Skip to content

Commit

Permalink
Send discovered_nodes in bulk from control connection (#299)
Browse files Browse the repository at this point in the history
Co-authored-by: Harun Zengin <h.zengin@sonnen.de>
  • Loading branch information
harunzengin and harunzengin committed Apr 15, 2023
1 parent 8fe68c1 commit 4dbbd84
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 19 deletions.
12 changes: 12 additions & 0 deletions lib/xandra/cluster.ex
Original file line number Diff line number Diff line change
Expand Up @@ -627,6 +627,18 @@ defmodule Xandra.Cluster do
{:noreply, state}
end

def handle_info({:discovered_hosts, hosts}, %__MODULE__{} = state) when is_list(hosts) do
Logger.debug("Discovered hosts: #{Enum.map_join(hosts, ", ", &Host.format_address/1)}")

state =
Enum.reduce(hosts, state, fn %Host{} = host, acc ->
update_in(acc.load_balancing_state, &state.load_balancing_module.host_added(&1, host))
end)

state = maybe_start_pools(state)
{:noreply, state}
end

## Helpers

# This function is idempotent: you can call it as many times as you want with the same
Expand Down
20 changes: 14 additions & 6 deletions lib/xandra/cluster/control_connection.ex
Original file line number Diff line number Diff line change
Expand Up @@ -451,13 +451,13 @@ defmodule Xandra.Cluster.ControlConnection do
})
end)

final_peers =
Enum.reduce(new_peers, %{}, fn %Host{} = host, acc ->
{existing_hosts, discovered_hosts} =
Enum.reduce(new_peers, {[], []}, fn %Host{} = host, {existing_acc, discovered_acc} ->
peername = Host.to_peername(host)

case Map.fetch(old_peers, peername) do
{:ok, %{status: :up}} ->
Map.put(acc, peername, %{host: host, status: :up})
{existing_acc ++ [host], discovered_acc}

{:ok, %{status: :down}} ->
execute_telemetry(data, [:change_event], %{}, %{
Expand All @@ -468,7 +468,7 @@ defmodule Xandra.Cluster.ControlConnection do
})

send(data.cluster, {:host_up, host})
Map.put(acc, peername, %{host: host, status: :up})
{existing_acc ++ [host], discovered_acc}

:error ->
execute_telemetry(data, [:change_event], %{}, %{
Expand All @@ -478,11 +478,19 @@ defmodule Xandra.Cluster.ControlConnection do
source: :xandra
})

send(data.cluster, {:host_added, host})
Map.put(acc, peername, %{host: host, status: :up})
{existing_acc, discovered_acc ++ [host]}
end
end)

if discovered_hosts != [] do
send(data.cluster, {:discovered_hosts, discovered_hosts})
end

final_peers =
Enum.reduce(existing_hosts ++ discovered_hosts, %{}, fn host, acc ->
Map.put(acc, Host.to_peername(host), %{host: host, status: :up})
end)

data =
if final_peers != old_peers do
reset_lbp(data, Enum.map(final_peers, fn {_peername, %{host: host}} -> host end))
Expand Down
28 changes: 15 additions & 13 deletions test/xandra/cluster/control_connection_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ defmodule Xandra.Cluster.ControlConnectionTest do
test "reporting data upon successful connection",
%{mirror_ref: mirror_ref, start_options: start_options} do
start_control_connection!(start_options)
assert_receive {^mirror_ref, {:host_added, local_peer}}
assert_receive {^mirror_ref, {:discovered_hosts, [local_peer]}}
assert %Host{address: {127, 0, 0, 1}, data_center: "datacenter1", rack: "rack1"} = local_peer
end

Expand All @@ -49,7 +49,7 @@ defmodule Xandra.Cluster.ControlConnectionTest do
log =
capture_log(fn ->
start_control_connection!(start_options, contact_points: ["bad-domain", "127.0.0.1"])
assert_receive {^mirror_ref, {:host_added, local_peer}}
assert_receive {^mirror_ref, {:discovered_hosts, [local_peer]}}
assert %Host{address: {127, 0, 0, 1}} = local_peer
end)

Expand Down Expand Up @@ -94,7 +94,7 @@ defmodule Xandra.Cluster.ControlConnectionTest do

ctrl_conn = start_control_connection!(start_options)

assert_receive {^mirror_ref, {:host_added, _peer}}
assert_receive {^mirror_ref, {:discovered_hosts, _peers}}

assert %{cluster_name: nil, cluster_pid: ^mirror, host: %Host{address: {127, 0, 0, 1}}} =
assert_telemetry.(:connected)
Expand Down Expand Up @@ -140,7 +140,7 @@ defmodule Xandra.Cluster.ControlConnectionTest do

ctrl_conn = start_control_connection!(start_options)

assert_receive {^mirror_ref, {:host_added, _peer}}
assert_receive {^mirror_ref, {:discovered_hosts, _peers}}
assert_receive {[:xandra, :cluster, :change_event], ^telemetry_ref, _, _}

# No-op: sending a UP event for a node that is already up.
Expand Down Expand Up @@ -238,7 +238,7 @@ defmodule Xandra.Cluster.ControlConnectionTest do

ctrl_conn = start_control_connection!(start_options)

assert_receive {^mirror_ref, {:host_added, _peer}}
assert_receive {^mirror_ref, {:discovered_hosts, _peers}}
assert_receive {[:xandra, :cluster, :change_event], ^telemetry_ref, _, _}

:gen_statem.cast(
Expand All @@ -262,7 +262,7 @@ defmodule Xandra.Cluster.ControlConnectionTest do
%{mirror_ref: mirror_ref, start_options: start_options} do
ctrl_conn = start_control_connection!(start_options)

assert_receive {^mirror_ref, {:host_added, _peer}}
assert_receive {^mirror_ref, {:discovered_hosts, _peers}}
assert {{:connected, _connected_node}, _data} = :sys.get_state(ctrl_conn)

log =
Expand All @@ -284,7 +284,7 @@ defmodule Xandra.Cluster.ControlConnectionTest do
:telemetry_test.attach_event_handlers(self(), [[:xandra, :cluster, :change_event]])

ctrl_conn = start_control_connection!(start_options)
assert_receive {^mirror_ref, {:host_added, %Host{address: {127, 0, 0, 1}}}}
assert_receive {^mirror_ref, {:discovered_hosts, [%Host{address: {127, 0, 0, 1}}]}}
assert_receive {[:xandra, :cluster, :change_event], ^telemetry_ref, _, _}

new_peers = [
Expand All @@ -295,8 +295,10 @@ defmodule Xandra.Cluster.ControlConnectionTest do
:gen_statem.cast(ctrl_conn, {:refresh_topology, new_peers})

assert_receive {^mirror_ref, {:host_removed, %Host{address: {127, 0, 0, 1}}}}
assert_receive {^mirror_ref, {:host_added, %Host{address: {192, 168, 1, 1}}}}
assert_receive {^mirror_ref, {:host_added, %Host{address: {192, 168, 1, 2}}}}

assert_receive {^mirror_ref,
{:discovered_hosts,
[%Host{address: {192, 168, 1, 1}}, %Host{address: {192, 168, 1, 2}}]}}

assert_receive {[:xandra, :cluster, :change_event], ^telemetry_ref, %{},
%{
Expand Down Expand Up @@ -330,7 +332,7 @@ defmodule Xandra.Cluster.ControlConnectionTest do
:gen_statem.cast(ctrl_conn, {:refresh_topology, new_peers})

assert_receive {^mirror_ref, {:host_removed, %Host{address: {192, 168, 1, 1}}}}
assert_receive {^mirror_ref, {:host_added, %Host{address: {192, 168, 1, 3}}}}
assert_receive {^mirror_ref, {:discovered_hosts, [%Host{address: {192, 168, 1, 3}}]}}

assert_receive {[:xandra, :cluster, :change_event], ^telemetry_ref, %{},
%{
Expand Down Expand Up @@ -392,7 +394,7 @@ defmodule Xandra.Cluster.ControlConnectionTest do
assert_receive {:ready, ^task2_pid}

ctrl_conn = start_control_connection!(start_options)
assert_receive {^mirror_ref, {:host_added, %Host{address: {127, 0, 0, 1}}}}
assert_receive {^mirror_ref, {:discovered_hosts, [%Host{address: {127, 0, 0, 1}}]}}

send(task1_pid, {:disconnect, ctrl_conn})
refute_receive {^mirror_ref, {:host_down, %Host{address: {127, 0, 0, 1}}}}, 100
Expand All @@ -416,7 +418,7 @@ defmodule Xandra.Cluster.ControlConnectionTest do

ctrl_conn = start_control_connection!(start_options)

assert_receive {^mirror_ref, {:host_added, %Host{address: {127, 0, 0, 1}}}}
assert_receive {^mirror_ref, {:discovered_hosts, [%Host{address: {127, 0, 0, 1}}]}}
assert_receive {[:xandra, :cluster, :change_event], ^telemetry_ref, _, _}

parent = self()
Expand All @@ -438,7 +440,7 @@ defmodule Xandra.Cluster.ControlConnectionTest do

:gen_statem.cast(ctrl_conn, {:refresh_topology, new_peers})

assert_receive {^mirror_ref, {:host_added, %Host{address: {192, 168, 1, 1}}}}
assert_receive {^mirror_ref, {:discovered_hosts, [%Host{address: {192, 168, 1, 1}}]}}

assert_receive {[:xandra, :cluster, :change_event], ^telemetry_ref, %{},
%{
Expand Down

0 comments on commit 4dbbd84

Please sign in to comment.