Add :target_pools option to Xandra.Cluster (#289)
whatyouhide committed Mar 28, 2023
1 parent 0bcb146 commit 36a7735
Showing 8 changed files with 321 additions and 212 deletions.
106 changes: 79 additions & 27 deletions lib/xandra/cluster.ex
@@ -118,6 +118,9 @@ defmodule Xandra.Cluster do
# The registry where connections are registered.
:registry,

# The number of target pools.
:target_pools,

# A map of peername to pool PID pairs.
pools: %{},

@@ -128,12 +131,13 @@

@start_link_opts_schema [
load_balancing: [
type: {:in, [:priority, :random]},
type: {:or, [{:in, [:priority, :random]}, :mod_arg]},
default: :random,
doc: """
Load balancing "strategy". Either `:random` or `:priority`. See the "Load balancing
strategies" section in the module documentation. If `:autodiscovery` is `true`,
the only supported strategy is `:random`.
TODO: fix docs here
"""
],
nodes: [
@@ -152,14 +156,19 @@
],
autodiscovery: [
type: :boolean,
doc: """
Whether to enable autodiscovery. Since v0.15.0, this option is deprecated and
autodiscovery is always enabled.
""",
deprecated: """
:autodiscovery is deprecated since 0.15.0 and now always enabled due to internal changes
:autodiscovery is deprecated since v0.15.0 and now always enabled due to internal changes
to Xandra.Cluster.
"""
],
autodiscovered_nodes_port: [
type: {:in, 0..65535},
default: @default_port,
# TODO: use type_doc: "`t::inet.port_number/0`" when we depend on nimble_options 1.0.
doc: """
The port to use when connecting to autodiscovered nodes. Cassandra does not advertise
the port of nodes when discovering them, so you'll need to specify one explicitly.
@@ -176,6 +185,17 @@
the cluster. *Available since v0.15.0*.
"""
],
target_pools: [
type: :pos_integer,
default: 2,
doc: """
The number of nodes to start pools for. Each pool will use the `:pool_size` option
(see `Xandra.start_link/1`) to determine how many single connections to open to that
node. This number is a *target* number, which means that sometimes there might not
be enough nodes to start this many pools. Xandra won't ever start more than
`:target_pools` pools. *Available since v0.15.0*.
"""
],
name: [
type: :any,
doc: """
@@ -201,6 +221,7 @@
#{NimbleOptions.docs(@start_link_opts_schema)}
# TODO: Fix below
> #### Control connections {: .neutral}
>
> A `Xandra.Cluster` starts **one additional "control connection"** for each node.
@@ -441,20 +462,21 @@ defmodule Xandra.Cluster do

{:ok, _} = Registry.start_link(keys: :unique, name: registry_name)

load_balancing_mod =
{lb_mod, lb_opts} =
case Keyword.fetch!(cluster_opts, :load_balancing) do
:random -> LoadBalancingPolicy.Random
:random -> {LoadBalancingPolicy.Random, []}
:priority -> raise "not implemented yet"
module when is_atom(module) -> module
{mod, opts} -> {mod, opts}
end

state = %__MODULE__{
pool_options: pool_opts,
load_balancing_module: load_balancing_mod,
load_balancing_state: load_balancing_mod.init([]),
load_balancing_module: lb_mod,
load_balancing_state: lb_mod.init(lb_opts),
autodiscovered_nodes_port: Keyword.fetch!(cluster_opts, :autodiscovered_nodes_port),
xandra_mod: Keyword.fetch!(cluster_opts, :xandra_module),
control_conn_mod: Keyword.fetch!(cluster_opts, :control_connection_module),
target_pools: Keyword.fetch!(cluster_opts, :target_pools),
registry: registry_name
}

@@ -467,7 +489,7 @@ defmodule Xandra.Cluster do
contact_points: nodes,
connection_options: state.pool_options,
autodiscovered_nodes_port: state.autodiscovered_nodes_port,
load_balancing_module: load_balancing_mod,
load_balancing: {lb_mod, lb_opts},
refresh_topology_interval: Keyword.fetch!(cluster_opts, :refresh_topology_interval),
registry: registry_name
)
@@ -478,31 +500,27 @@

@impl true
def handle_call(:checkout, _from, %__MODULE__{} = state) do
case state do
%__MODULE__{pools: pools} when map_size(pools) == 0 ->
{:reply, {:error, :empty}, state}

%__MODULE__{} ->
{hosts_plan, state} =
get_and_update_in(state.load_balancing_state, fn lb_state ->
state.load_balancing_module.hosts_plan(lb_state)
end)

%Host{address: ip, port: port} = Enum.at(hosts_plan, 0)
pool = Map.fetch!(state.pools, {ip, port})
{:reply, {:ok, pool}, state}
end
{hosts_plan, state} =
get_and_update_in(state.load_balancing_state, fn lb_state ->
state.load_balancing_module.hosts_plan(lb_state)
end)

# Find the first host in the plan for which we have a pool.
reply =
hosts_plan
|> Stream.map(fn %Host{} = host -> Map.fetch(state.pools, Host.to_peername(host)) end)
|> Enum.find(_default = {:error, :empty}, &match?({:ok, _}, &1))

{:reply, reply, state}
end

@impl true
def handle_info(msg, state)

def handle_info({:host_up, %Host{} = host}, %__MODULE__{} = state) do
Logger.debug("Host marked as UP: #{Host.format_address(host)}")

state = update_in(state.load_balancing_state, &state.load_balancing_module.host_up(&1, host))

state = start_pool(state, host)
state = maybe_start_pools(state)
{:noreply, state}
end

@@ -513,6 +531,7 @@
update_in(state.load_balancing_state, &state.load_balancing_module.host_down(&1, host))

state = stop_pool(state, host)
state = maybe_start_pools(state)
{:noreply, state}
end

@@ -522,19 +541,23 @@
state =
update_in(state.load_balancing_state, &state.load_balancing_module.host_added(&1, host))

state = start_pool(state, host)
state = maybe_start_pools(state)
{:noreply, state}
end

def handle_info({:host_removed, %Host{} = host}, %__MODULE__{} = state) do
Logger.debug("Host removed from the cluster: #{Host.format_address(host)}")
_ = Supervisor.terminate_child(state.pool_supervisor, {host.address, host.port})
state = stop_pool(state, host)

# Also delete the child from the supervisor altogether.
_ = Supervisor.delete_child(state.pool_supervisor, {host.address, host.port})

state =
update_in(state.load_balancing_state, &state.load_balancing_module.host_removed(&1, host))

state = update_in(state.pools, &Map.delete(&1, {host.address, host.port}))

state = maybe_start_pools(state)
{:noreply, state}
end

@@ -585,4 +608,33 @@ defmodule Xandra.Cluster do
_ = Supervisor.terminate_child(state.pool_supervisor, {host.address, host.port})
update_in(state.pools, &Map.delete(&1, {host.address, host.port}))
end

defp maybe_start_pools(%__MODULE__{target_pools: target, pools: pools} = state)
when map_size(pools) == target do
state
end

defp maybe_start_pools(%__MODULE__{target_pools: target, pools: pools} = state)
when map_size(pools) < target do
{hosts_plan, state} =
get_and_update_in(state.load_balancing_state, fn lb_state ->
state.load_balancing_module.hosts_plan(lb_state)
end)

Enum.reduce_while(hosts_plan, state, fn %Host{} = host, state ->
case Map.fetch(pools, Host.to_peername(host)) do
{:ok, _pool} ->
{:cont, state}

:error ->
state = start_pool(state, host)

if map_size(state.pools) == target do
{:halt, state}
else
{:cont, state}
end
end
end)
end
end
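
As a usage sketch (not part of this commit), the new option plugs into `Xandra.Cluster.start_link/1` alongside the reworked `:load_balancing` option, which now also accepts a `{module, options}` tuple. The node addresses below are placeholders, and `:pool_size` is the per-node option documented for `Xandra.start_link/1`:

```elixir
{:ok, cluster} =
  Xandra.Cluster.start_link(
    nodes: ["10.0.0.1:9042", "10.0.0.2:9042"],
    # Open pools to at most 3 of the discovered nodes...
    target_pools: 3,
    # ...with 10 connections per pool.
    pool_size: 10,
    # Built-in policy; a custom {module, options} pair implementing the
    # Xandra.Cluster.LoadBalancingPolicy behaviour would also be accepted here.
    load_balancing: :random
  )

# Checkouts go through the policy's plan, falling back across available pools.
{:ok, _page} = Xandra.Cluster.execute(cluster, "SELECT * FROM system.local", [])
```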
25 changes: 16 additions & 9 deletions lib/xandra/cluster/control_connection.ex
@@ -21,7 +21,7 @@ defmodule Xandra.Cluster.ControlConnection do
contact_points: [type: :any, required: true],
connection_options: [type: :keyword_list, required: true],
autodiscovered_nodes_port: [type: :non_neg_integer, required: true],
load_balancing_module: [type: :atom, required: true],
load_balancing: [type: :mod_arg, required: true],
refresh_topology_interval: [type: :timeout, required: true],
registry: [type: :atom, required: true]
]
@@ -46,8 +46,9 @@
# The interval at which to refresh the cluster topology.
:refresh_topology_interval,

# The load balancing policy, as a {mod, state} tuple.
# The load balancing policy, as a {mod, state} tuple, and the options.
:lbp,
:lb_opts,

# The registry to use to register connections.
:registry,
@@ -87,14 +88,15 @@
|> Keyword.fetch!(:contact_points)
|> contact_points_to_hosts()

lb_module = Keyword.fetch!(options, :load_balancing_module)
{lb_module, lb_opts} = Keyword.fetch!(options, :load_balancing)

data = %__MODULE__{
cluster: Keyword.fetch!(options, :cluster),
contact_points: contact_points,
autodiscovered_nodes_port: Keyword.fetch!(options, :autodiscovered_nodes_port),
refresh_topology_interval: Keyword.fetch!(options, :refresh_topology_interval),
lbp: {lb_module, lb_module.init([])},
lbp: {lb_module, lb_module.init(lb_opts)},
lb_opts: lb_opts,
options: connection_options,
transport: transport,
transport_options: Keyword.merge(transport_options, @forced_transport_options),
@@ -406,11 +408,7 @@

data =
if final_peers != old_peers do
# "Reset" the load-balancing policy.
update_in(data.lbp, fn {lb_module, _} ->
hosts = Enum.map(final_peers, fn {_peername, %{host: host}} -> host end)
{lb_module, lb_module.init(hosts)}
end)
reset_lbp(data, Enum.map(final_peers, fn {_peername, %{host: host}} -> host end))
else
data
end
@@ -662,6 +660,15 @@
end
end

defp reset_lbp(%__MODULE__{lbp: {mod, _state}, lb_opts: lb_opts} = data, new_hosts) do
state =
Enum.reduce(new_hosts, mod.init(lb_opts), fn %Host{} = host, acc ->
mod.host_added(acc, host)
end)

%__MODULE__{data | lbp: {mod, state}}
end

# Returns {:error, reason} if the socket was closed or if there was any data
# coming from the socket. Otherwise, returns :ok.
defp assert_no_transport_message(socket) do
12 changes: 3 additions & 9 deletions lib/xandra/cluster/load_balancing_policy.ex
@@ -17,45 +17,39 @@ defmodule Xandra.Cluster.LoadBalancingPolicy do
Can be any term and is passed around to all callbacks.
"""
@typedoc since: "0.15.0"
@type state() :: term()

@doc """
Called to initialize the load-balancing policy.
Hosts is the initial list of hosts. You can assume that all of them are *up*.
`options` is given by the user when configuring the cluster, and is specific to
the load-balancing policy.
"""
@doc since: "0.15.0"
@callback init(hosts :: [Host.t()]) :: state()
@callback init(options :: term()) :: state()

@doc """
Called when the Cassandra cluster marks `host` as "up".
"""
@doc since: "0.15.0"
@callback host_up(state(), host :: Host.t()) :: state()

@doc """
Called when the Cassandra cluster marks `host` as "down".
"""
@doc since: "0.15.0"
@callback host_down(state(), host :: Host.t()) :: state()

@doc """
Called when the Cassandra cluster reports a new host that joined.
"""
@doc since: "0.15.0"
@callback host_added(state(), host :: Host.t()) :: state()

@doc """
Called when the Cassandra cluster reports a host that left the cluster.
"""
@doc since: "0.15.0"
@callback host_removed(state(), host :: Host.t()) :: state()

@doc """
Called to return a "plan", which is an enumerable of hosts to query in order.
"""
@doc since: "0.15.0"
# TODO: remove the check once we depend on Elixir 1.14+. Enumerable.t/1 was
# introduced in 1.14.
if Version.match?(System.version(), ">= 1.14.0") do
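
To make the updated behaviour concrete, here is a minimal sketch (not part of this commit) of a custom policy that keeps hosts in insertion order and never shuffles. `MyApp.OrderedPolicy` is a hypothetical name, and the state shape (a list of `{host, status}` tuples) simply mirrors the built-in `Random` policy shown below:

```elixir
defmodule MyApp.OrderedPolicy do
  @behaviour Xandra.Cluster.LoadBalancingPolicy

  alias Xandra.Cluster.Host

  @impl true
  def init(_options), do: []

  @impl true
  def host_added(hosts, %Host{} = new_host) do
    # Append and deduplicate by {address, port}.
    Enum.uniq_by(hosts ++ [{new_host, :up}], fn {host, _status} -> Host.to_peername(host) end)
  end

  @impl true
  def host_removed(hosts, %Host{} = host) do
    Enum.reject(hosts, fn {existing, _status} ->
      Host.to_peername(existing) == Host.to_peername(host)
    end)
  end

  @impl true
  def host_up(hosts, %Host{} = host), do: set_status(hosts, host, :up)

  @impl true
  def host_down(hosts, %Host{} = host), do: set_status(hosts, host, :down)

  @impl true
  def hosts_plan(hosts) do
    # Plan is the up hosts in insertion order; the state is returned unchanged.
    {for({host, :up} <- hosts, do: host), hosts}
  end

  defp set_status(hosts, %Host{} = target, status) do
    Enum.map(hosts, fn {host, current} ->
      if Host.to_peername(host) == Host.to_peername(target), do: {host, status}, else: {host, current}
    end)
  end
end
```

Such a module would be configured as `load_balancing: {MyApp.OrderedPolicy, []}` in `Xandra.Cluster.start_link/1`.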
26 changes: 10 additions & 16 deletions lib/xandra/cluster/load_balancing_policy/random.ex
@@ -12,41 +12,31 @@ defmodule Xandra.Cluster.LoadBalancingPolicy.Random do
@behaviour Xandra.Cluster.LoadBalancingPolicy

@impl true
def init(hosts) do
Enum.map(hosts, &{&1, :up})
def init([] = _options) do
[]
end

@impl true
def host_added(hosts, new_host) do
Enum.uniq_by([{new_host, :up}] ++ hosts, fn {host, _status} -> {host.address, host.port} end)
Enum.uniq_by([{new_host, :up}] ++ hosts, fn {host, _status} -> Host.to_peername(host) end)
end

@impl true
def host_removed(hosts, host) do
Enum.reject(hosts, fn {%Host{address: address, port: port}, _status} ->
address == host.address and port == host.port
end)
Enum.reject(hosts, fn {existing_host, _status} -> host_match?(existing_host, host) end)
end

@impl true
def host_up(hosts, new_host) do
Enum.map(hosts, fn {host, status} ->
if host.address == new_host.address and host.port == new_host.port do
{host, :up}
else
{host, status}
end
if host_match?(host, new_host), do: {host, :up}, else: {host, status}
end)
end

@impl true
def host_down(hosts, host_down) do
Enum.map(hosts, fn {host, status} ->
if host.address == host_down.address and host.port == host_down.port do
{host, :down}
else
{host, status}
end
if host_match?(host, host_down), do: {host, :down}, else: {host, status}
end)
end

@@ -55,4 +45,8 @@
up_hosts = for {host, :up} <- hosts, do: host
{Enum.shuffle(up_hosts), hosts}
end

defp host_match?(%Host{} = host1, %Host{} = host2) do
host1.address == host2.address and host1.port == host2.port
end
end
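
Finally, a small walkthrough of the `Random` policy's lifecycle as exercised by the cluster and control connection. The `%Host{}` literals are simplified placeholders (the real structs carry more fields, such as the data center); the point is only that `hosts_plan/1` returns up hosts and leaves the state untouched:

```elixir
alias Xandra.Cluster.Host
alias Xandra.Cluster.LoadBalancingPolicy.Random

# Hypothetical hosts with only the fields this policy inspects.
host_a = %Host{address: {10, 0, 0, 1}, port: 9042}
host_b = %Host{address: {10, 0, 0, 2}, port: 9042}

state = Random.init([])
state = Random.host_added(state, host_a)
state = Random.host_added(state, host_b)
state = Random.host_down(state, host_b)

# Only hosts marked :up make it into the plan.
{plan, _state} = Random.hosts_plan(state)
Enum.map(plan, &Host.to_peername/1)
#=> [{{10, 0, 0, 1}, 9042}]
```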
