Skip to content

Commit

Permalink
Introduce internal connection pool for clusters
Browse files Browse the repository at this point in the history
  • Loading branch information
whatyouhide committed Oct 9, 2023
1 parent 6dd035b commit c47ff39
Show file tree
Hide file tree
Showing 9 changed files with 140 additions and 48 deletions.
22 changes: 14 additions & 8 deletions lib/xandra/cluster.ex
Original file line number Diff line number Diff line change
Expand Up @@ -172,11 +172,10 @@ defmodule Xandra.Cluster do
type: :pos_integer,
default: 2,
doc: """
The number of nodes to start pools to. Each pool will use the `:pool_size` option
(see `Xandra.start_link/1`) to determine how many single connections to open to that
node. This number is a *target* number, which means that sometimes there might not
be enough nodes to start this many pools. Xandra won't ever start more than
`:target_pools` pools. *Available since v0.15.0*.
The number of nodes to start pools to. Each pool will use the `:pool_size` option to
determine how many single connections to open to that node. This number is a *target*
number, which means that sometimes there might not be enough nodes to start this many
pools. Xandra won't ever start more than `:target_pools` pools. *Available since v0.15.0*.
"""
],
name: [
Expand Down Expand Up @@ -238,6 +237,13 @@ defmodule Xandra.Cluster do
],
default: []
],
pool_size: [
type: :pos_integer,
default: 1,
doc: """
The number of connections to open to each node in the cluster. *Available since v0.18.0*.
"""
],
debug: [
type: :any,
doc: """
Expand All @@ -259,7 +265,7 @@ defmodule Xandra.Cluster do
],

# Internal for testing, not exposed.
xandra_module: [type: :atom, default: Xandra, doc: false],
xandra_module: [type: :atom, default: Xandra.Cluster.ConnectionPool, doc: false],
control_connection_module: [type: :atom, default: ControlConnection, doc: false],
test_discovered_hosts: [type: :any, default: [], doc: false]
]
Expand Down Expand Up @@ -318,9 +324,9 @@ defmodule Xandra.Cluster do
@spec start_link([option]) :: GenServer.on_start()
when option: Xandra.start_option() | start_option()
def start_link(options) when is_list(options) do
{cluster_opts, pool_opts} = Keyword.split(options, @start_link_opts_schema_keys)
{cluster_opts, connection_opts} = Keyword.split(options, @start_link_opts_schema_keys)
cluster_opts = NimbleOptions.validate!(cluster_opts, @start_link_opts_schema)
Pool.start_link(cluster_opts, pool_opts)
Pool.start_link(cluster_opts, connection_opts)
end

@doc false
Expand Down
34 changes: 34 additions & 0 deletions lib/xandra/cluster/connection_pool.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
defmodule Xandra.Cluster.ConnectionPool do
  @moduledoc false

  # Internal supervisor holding a fixed number of `Xandra` children that all share
  # the same connection options. `checkout/1` hands out one running child at random.

  use Supervisor

  # Starts the pool supervisor.
  #
  # `opts` must contain:
  #
  #   * `:connection_options` - passed as-is to every `Xandra` child.
  #   * `:pool_size` - how many children to start.
  #
  # A missing key raises `KeyError` (via `Keyword.fetch!/2`) in the caller.
  @spec start_link(keyword()) :: Supervisor.on_start()
  def start_link(opts) when is_list(opts) do
    connection_opts = Keyword.fetch!(opts, :connection_options)
    pool_size = Keyword.fetch!(opts, :pool_size)
    Supervisor.start_link(__MODULE__, {connection_opts, pool_size})
  end

  # Returns the pid of a randomly chosen running child of `sup_pid`, or `nil` when
  # no child is currently running.
  #
  # `Supervisor.which_children/1` reports a non-pid value in the pid position for
  # children that are not running; those entries are skipped by the `is_pid/1` guard.
  # If that leaves nothing, we return `nil` rather than letting `Enum.random/1` raise
  # `Enum.EmptyError` in the calling process.
  @spec checkout(pid()) :: pid() | nil
  def checkout(sup_pid) when is_pid(sup_pid) do
    running_pids =
      for {_id, pid, _type, _modules} when is_pid(pid) <- Supervisor.which_children(sup_pid) do
        pid
      end

    case running_pids do
      [] -> nil
      [_ | _] -> Enum.random(running_pids)
    end
  end

  ## Callbacks

  @impl true
  def init({connection_opts, pool_size}) do
    # The explicit `//1` step keeps the range ascending, so a size of 0 yields no
    # children instead of iterating over the descending range `1..0`.
    children =
      for index <- 1..pool_size//1 do
        # Each child needs a unique id since they are all built from the same module.
        Supervisor.child_spec({Xandra, connection_opts}, id: {Xandra, index})
      end

    Supervisor.init(children, strategy: :one_for_one)
  end
end
64 changes: 40 additions & 24 deletions lib/xandra/cluster/pool.ex
Original file line number Diff line number Diff line change
Expand Up @@ -9,23 +9,23 @@ defmodule Xandra.Cluster.Pool do

@behaviour :gen_statem

alias Xandra.Cluster.{Host, LoadBalancingPolicy}
alias Xandra.Cluster.{ConnectionPool, Host, LoadBalancingPolicy}
alias Xandra.GenStatemHelpers

## Public API

@spec start_link(keyword(), keyword()) :: :gen_statem.start_ret()
def start_link(cluster_opts, pool_opts) do
def start_link(cluster_opts, connection_opts) do
{sync_connect_timeout, cluster_opts} = Keyword.pop!(cluster_opts, :sync_connect)

# Split out gen_statem-specific options from the cluster options.
{gen_statem_opts, cluster_opts} = Keyword.split(cluster_opts, GenStatemHelpers.start_opts())
{gen_statem_opts, cluster_opts} = GenStatemHelpers.split_opts(cluster_opts)

sync_connect_alias_or_nil = if sync_connect_timeout, do: Process.alias([:reply]), else: nil

case GenStatemHelpers.start_link_with_name_registration(
__MODULE__,
{cluster_opts, pool_opts, sync_connect_alias_or_nil},
{cluster_opts, connection_opts, sync_connect_alias_or_nil},
gen_statem_opts
) do
{:ok, pid} when is_integer(sync_connect_timeout) ->
Expand Down Expand Up @@ -71,7 +71,7 @@ defmodule Xandra.Cluster.Pool do

defstruct [
# Options for the underlying connection pools.
:pool_options,
:connection_options,

# Contact nodes.
:contact_nodes,
Expand All @@ -98,6 +98,9 @@ defmodule Xandra.Cluster.Pool do
# The number of target pools.
:target_pools,

# The number of connections in each pool to a node.
:pool_size,

# Erlang alias to send back the ":connected" message to make :sync_connect work.
# This is nil if :sync_connect was not used.
:sync_connect_alias,
Expand Down Expand Up @@ -137,7 +140,7 @@ defmodule Xandra.Cluster.Pool do
def init({cluster_opts, pool_opts, sync_connect_alias_or_nil}) do
Process.flag(:trap_exit, true)

# Start supervisor for the pools.
# Start supervisor for the connections.
{:ok, pool_sup} = Supervisor.start_link([], strategy: :one_for_one)

{lb_mod, lb_opts} =
Expand All @@ -151,7 +154,7 @@ defmodule Xandra.Cluster.Pool do
queue_before_connecting_timeout = Keyword.fetch!(queue_before_connecting_opts, :timeout)

data = %__MODULE__{
pool_options: pool_opts,
connection_options: pool_opts,
contact_nodes: Keyword.fetch!(cluster_opts, :nodes),
load_balancing_module: lb_mod,
load_balancing_state: lb_mod.init(lb_opts),
Expand All @@ -160,6 +163,7 @@ defmodule Xandra.Cluster.Pool do
control_conn_mod: Keyword.fetch!(cluster_opts, :control_connection_module),
target_pools: Keyword.fetch!(cluster_opts, :target_pools),
name: Keyword.get(cluster_opts, :name),
pool_size: Keyword.fetch!(cluster_opts, :pool_size),
pool_supervisor: pool_sup,
refresh_topology_interval: Keyword.fetch!(cluster_opts, :refresh_topology_interval),
reqs_before_connecting: %{
Expand Down Expand Up @@ -214,7 +218,7 @@ defmodule Xandra.Cluster.Pool do
{:keep_state, data, reply_actions}
end

# If we connected once, we already flush this queue, so we ignore this timeout.
# If we connected once, we already flushed this queue, so we ignore this timeout.
def handle_event(
{:timeout, :flush_queue_before_connecting},
nil,
Expand Down Expand Up @@ -331,7 +335,12 @@ defmodule Xandra.Cluster.Pool do
end

# For testing purposes
def handle_event(:info, {:add_test_hosts, hosts_with_status}, _state, %__MODULE__{} = data) do
def handle_event(
{:call, from},
{:add_test_hosts, hosts_with_status},
_state,
%__MODULE__{} = data
) do
data =
Enum.reduce(hosts_with_status, data, fn {%Host{} = host, status}, data_acc ->
data_acc =
Expand All @@ -340,18 +349,17 @@ defmodule Xandra.Cluster.Pool do
apply(data_acc.load_balancing_module, :"host_#{status}", [current_state, host])
end)

update_in(
data_acc.peers,
&Map.put(&1, Host.to_peername(host), %{
host: host,
status: status,
pool_pid: Process.spawn(fn -> nil end, []),
pool_ref: make_ref()
})
)
put_in(data_acc.peers[Host.to_peername(host)], %{
host: host,
status: status,
pool_pid: nil,
pool_ref: nil
})
end)

{:keep_state, data}
data = maybe_start_pools(data)

{:keep_state, data, {:reply, from, :ok}}
end

# Sent by the connection itself.
Expand Down Expand Up @@ -466,9 +474,10 @@ defmodule Xandra.Cluster.Pool do
# Find all connected hosts
connected_hosts =
for host <- query_plan,
%{pool_pid: pid, host: host} = Map.get(data.peers, Host.to_peername(host)),
%{pool_pid: pool_pid, host: host} = Map.get(data.peers, Host.to_peername(host)),
not is_nil(host),
is_pid(pid),
is_pid(pool_pid) and Process.alive?(pool_pid),
pid = ConnectionPool.checkout(pool_pid),
do: {pid, host}

reply =
Expand Down Expand Up @@ -515,12 +524,19 @@ defmodule Xandra.Cluster.Pool do
# peer, and it'll only start it once.
defp start_pool(%__MODULE__{} = data, %Host{} = host) do
conn_options =
Keyword.merge(data.pool_options, nodes: [Host.format_address(host)], cluster_pid: self())
Keyword.merge(data.connection_options,
nodes: [Host.format_address(host)],
cluster_pid: self()
)

peername = Host.to_peername(host)

pool_spec =
Supervisor.child_spec({data.xandra_mod, conn_options}, id: peername, restart: :transient)
Supervisor.child_spec(
{data.xandra_mod, connection_options: conn_options, pool_size: data.pool_size},
id: peername,
restart: :transient
)

case Supervisor.start_child(data.pool_supervisor, pool_spec) do
{:ok, pool} ->
Expand Down Expand Up @@ -615,7 +631,7 @@ defmodule Xandra.Cluster.Pool do
cluster_pid: self(),
cluster_name: data.name,
contact_node: {host.address, host.port},
connection_options: data.pool_options,
connection_options: data.connection_options,
autodiscovered_nodes_port: data.autodiscovered_nodes_port,
refresh_topology_interval: data.refresh_topology_interval
]
Expand Down
2 changes: 1 addition & 1 deletion lib/xandra/connection.ex
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ defmodule Xandra.Connection do

@spec start_link(keyword()) :: :gen_statem.start_ret()
def start_link(opts) when is_list(opts) do
{gen_statem_opts, opts} = Keyword.split(opts, GenStatemHelpers.start_opts())
{gen_statem_opts, opts} = GenStatemHelpers.split_opts(opts)
GenStatemHelpers.start_link_with_name_registration(__MODULE__, opts, gen_statem_opts)
end

Expand Down
10 changes: 6 additions & 4 deletions lib/xandra/gen_statem_helpers.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@ defmodule Xandra.GenStatemHelpers do
# We use :gen_statem in some places, but name registration for it is tricky for example.
# This module provides a few helpers to mitigate that.

@spec start_opts() :: [atom(), ...]
def start_opts do
[:debug, :hibernate_after, :spawn_opt, :name]
@spec split_opts(keyword()) :: {keyword(), keyword()}
def split_opts(opts) when is_list(opts) do
{gen_statem_opts, other_opts} = Keyword.split(opts, [:debug, :hibernate_after, :spawn_opt])
gen_statem_opts = Keyword.merge(gen_statem_opts, Keyword.take(other_opts, [:name]))
{gen_statem_opts, other_opts}
end

@spec start_link_with_name_registration(module(), term(), keyword()) :: :gen_statem.start_ret()
Expand Down Expand Up @@ -37,7 +39,7 @@ defmodule Xandra.GenStatemHelpers do
* {:global, term}
* {:via, module, term}
Instead, got: #{inspect(other)}
Instead, got: #{inspect(other)}\
"""
end
end
Expand Down
4 changes: 1 addition & 3 deletions test/integration/telemetry_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,7 @@ defmodule TelemetryTest do
test "sends event on connection/disconnection", %{start_options: start_options} do
ref = :telemetry_test.attach_event_handlers(self(), [[:xandra, :connected]])

start_supervised!(
{Xandra, [name: :telemetry_test_connection, pool_size: 1] ++ start_options}
)
start_supervised!({Xandra, [name: :telemetry_test_connection] ++ start_options})

assert_receive {[:xandra, :connected], ^ref, measurements, metadata}

Expand Down
3 changes: 3 additions & 0 deletions test/xandra/cluster/connection_pool_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# Placeholder test module for Xandra.Cluster.ConnectionPool: it sets up an async
# ExUnit case but does not define any tests yet.
defmodule Xandra.Cluster.ConnectionPoolTest do
use ExUnit.Case, async: true
end
39 changes: 36 additions & 3 deletions test/xandra/cluster/pool_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,19 @@ defmodule Xandra.Cluster.PoolTest do
alias Xandra.Cluster.Host
alias Xandra.Cluster.Pool

defmodule PoolMock do
  # Test double for a per-node pool: an empty supervisor that reports the options
  # it was initialized with back to the test process.
  use Supervisor

  def start_link(opts) do
    init_arg = Map.new(opts)
    Supervisor.start_link(__MODULE__, init_arg)
  end

  @impl true
  def init(opts) do
    # The {pid, ref} pair is read from :persistent_term under :clustering_test_info;
    # the lookup raises if nothing was stored under that key.
    {test_pid, test_ref} = :persistent_term.get(:clustering_test_info)
    notification = {test_ref, __MODULE__, :init_called, opts}
    send(test_pid, notification)

    Supervisor.init([], strategy: :one_for_one)
  end
end

defmacrop assert_telemetry(postfix, meta) do
quote do
event = [:xandra, :cluster] ++ unquote(postfix)
Expand All @@ -28,7 +41,8 @@ defmodule Xandra.Cluster.PoolTest do
nodes: [{~c"127.0.0.1", @port}],
load_balancing: :random,
autodiscovered_nodes_port: @port,
xandra_module: Xandra,
xandra_module: Xandra.Cluster.ConnectionPool,
pool_size: 1,
target_pools: 2,
sync_connect: false,
refresh_topology_interval: 60_000,
Expand Down Expand Up @@ -313,6 +327,7 @@ defmodule Xandra.Cluster.PoolTest do

test "returns all connected pools",

Check failure on line 328 in test/xandra/cluster/pool_test.exs

View workflow job for this annotation

GitHub Actions / Test (Elixir 1.15.4, OTP 25.3, C* 4.1, Scylla 5.1.6, Native protocol v3)

test checkout returns all connected pools (Xandra.Cluster.PoolTest)

Check failure on line 328 in test/xandra/cluster/pool_test.exs

View workflow job for this annotation

GitHub Actions / Test (Elixir 1.15.4, OTP 25.3, C* 4.1, Scylla 5.1.6, Native protocol v4)

test checkout returns all connected pools (Xandra.Cluster.PoolTest)

Check failure on line 328 in test/xandra/cluster/pool_test.exs

View workflow job for this annotation

GitHub Actions / Test (Elixir 1.15.4, OTP 25.3, C* 3, Scylla 4.6.3, Native protocol v3)

test checkout returns all connected pools (Xandra.Cluster.PoolTest)
%{cluster_options: cluster_options, pool_options: pool_options} do
cluster_options = Keyword.put(cluster_options, :xandra_mod, PoolMock)
assert {:ok, pid} = start_supervised(spec(cluster_options, pool_options))

hosts_with_statuses = [
Expand All @@ -322,7 +337,7 @@ defmodule Xandra.Cluster.PoolTest do
]

wait_until_connected(pid)
send(pid, {:add_test_hosts, hosts_with_statuses})
:ok = :gen_statem.call(pid, {:add_test_hosts, hosts_with_statuses})

assert {:ok, pids_with_hosts} = Pool.checkout(pid)
assert is_list(pids_with_hosts)
Expand Down Expand Up @@ -353,12 +368,30 @@ defmodule Xandra.Cluster.PoolTest do
end

describe "resiliency" do
@tag :capture_log
test "if a single connection crashes, the pool process stays up and cleans up",
%{cluster_options: cluster_options, pool_options: pool_options} do
cluster_options = Keyword.merge(cluster_options, sync_connect: 1000)
cluster = start_supervised!(spec(cluster_options, pool_options))

assert %{pool_pid: pool_pid} = get_state(cluster).peers[{{127, 0, 0, 1}, @port}]

assert {:ok, [{conn_pid, %Host{}}]} = Pool.checkout(cluster)
ref = Process.monitor(conn_pid)

Process.exit(conn_pid, :kill)
assert_receive {:DOWN, ^ref, _, _, _}

assert %{pool_pid: ^pool_pid} = get_state(cluster).peers[{{127, 0, 0, 1}, @port}]
end

@tag :capture_log
test "if a connection pool crashes, the pool process stays up and cleans up",
%{cluster_options: cluster_options, pool_options: pool_options} do
cluster_options = Keyword.merge(cluster_options, sync_connect: 1000)
cluster = start_supervised!(spec(cluster_options, pool_options))

assert {:ok, [{pool_pid, %Host{}}]} = Pool.checkout(cluster)
assert %{pool_pid: pool_pid} = get_state(cluster).peers[{{127, 0, 0, 1}, @port}]
ref = Process.monitor(pool_pid)

Process.exit(pool_pid, :kill)
Expand Down
Loading

0 comments on commit c47ff39

Please sign in to comment.