Skip to content

Commit

Permalink
QQ: don't try to contact non-connected nodes for stats
Browse files Browse the repository at this point in the history
Some systems may incur a substantial latency penalty when attempting
reconnections to down nodes. To avoid this, some stats-related functions
that gather info from all QQ member nodes now only try those nodes
that are connected. This should help keep things like the mgmt API
functions and ctl commands a bit more responsive.
  • Loading branch information
kjnilsson committed Oct 19, 2022
1 parent 1f6ead8 commit 4486843
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 7 deletions.
19 changes: 13 additions & 6 deletions deps/rabbit/src/rabbit_quorum_queue.erl
Expand Up @@ -971,9 +971,8 @@ cleanup_data_dir() ->
end
|| Q <- rabbit_amqqueue:list_by_type(?MODULE),
lists:member(node(), get_nodes(Q))],
NoQQClusters = rabbit_ra_registry:list_not_quorum_clusters(),
Registered = ra_directory:list_registered(?RA_SYSTEM),
Running = Names ++ NoQQClusters,
Running = Names,
_ = [maybe_delete_data_dir(UId) || {Name, UId} <- Registered,
not lists:member(Name, Running)],
ok.
Expand Down Expand Up @@ -1436,7 +1435,7 @@ i(memory, Q) when ?is_amqqueue(Q) ->
i(state, Q) when ?is_amqqueue(Q) ->
{Name, Node} = amqqueue:get_pid(Q),
%% Check against the leader or last known leader
case rpc:call(Node, ?MODULE, cluster_state, [Name], ?RPC_TIMEOUT) of
case erpc:call(Node, ?MODULE, cluster_state, [Name], ?RPC_TIMEOUT) of
{badrpc, _} -> down;
State -> State
end;
Expand All @@ -1457,7 +1456,7 @@ i(online, Q) -> online(Q);
i(leader, Q) -> leader(Q);
i(open_files, Q) when ?is_amqqueue(Q) ->
{Name, _} = amqqueue:get_pid(Q),
Nodes = get_nodes(Q),
Nodes = get_connected_nodes(Q),
{Data, _} = rpc:multicall(Nodes, ?MODULE, open_files, [Name]),
lists:flatten(Data);
i(single_active_consumer_pid, Q) when ?is_amqqueue(Q) ->
Expand Down Expand Up @@ -1559,7 +1558,7 @@ peek(_Pos, Q) when ?is_amqqueue(Q) andalso ?amqqueue_is_classic(Q) ->
{error, classic_queue_not_supported}.

%% Returns the member nodes of quorum queue Q on which the queue's
%% registered process is currently alive.
%%
%% Only members that are already connected to this node are probed
%% (see get_connected_nodes/1). Nodes is bound exactly once: binding it
%% first to get_nodes(Q) and then to get_connected_nodes(Q) would turn the
%% second binding into a match assertion that fails with badmatch as soon
%% as any member is disconnected.
online(Q) when ?is_amqqueue(Q) ->
    Nodes = get_connected_nodes(Q),
    %% the first element of the queue "pid" is the locally registered name
    {Name, _} = amqqueue:get_pid(Q),
    [Node || Node <- Nodes, is_process_alive(Name, Node)].

Expand All @@ -1568,7 +1567,11 @@ format(Q) when ?is_amqqueue(Q) ->
[{members, Nodes}, {online, online(Q)}, {leader, leader(Q)}].

%% Returns true when a process is registered as Name on Node, and false
%% otherwise.
%%
%% Unlike rpc:call/5, erpc:call/5 does not return {badrpc, _} on failure;
%% it raises error:{erpc, Reason} (e.g. timeout, noconnection). Those
%% failures are mapped to false so that an unreachable or slow node is
%% reported as "not alive" rather than crashing the caller.
is_process_alive(Name, Node) ->
    %% don't attempt rpc if node is not already connected
    %% as this function is used for metrics and stats and the additional
    %% latency isn't warranted
    lists:member(Node, [node() | nodes()]) andalso
        try
            erlang:is_pid(erpc:call(Node, erlang, whereis, [Name], ?RPC_TIMEOUT))
        catch
            %% only transport-level erpc failures are treated as "down"
            error:{erpc, _Reason} -> false
        end.

-spec quorum_messages(rabbit_amqqueue:name()) -> non_neg_integer().

Expand Down Expand Up @@ -1626,6 +1629,10 @@ get_nodes(Q) when ?is_amqqueue(Q) ->
#{nodes := Nodes} = amqqueue:get_type_state(Q),
Nodes.

%% Returns the subset of Q's member nodes (as reported by get_nodes/1)
%% that are either the local node or currently connected to it, i.e.
%% present in [node() | nodes()]. Member order is preserved.
get_connected_nodes(Q) when ?is_amqqueue(Q) ->
    Connected = [node() | nodes()],
    lists:filter(fun(Member) -> lists:member(Member, Connected) end,
                 get_nodes(Q)).

%% Applies Fun to the current type state of queue Q and stores the result
%% via amqqueue:set_type_state/2, returning whatever that call returns.
update_type_state(Q, Fun) when ?is_amqqueue(Q) ->
    NewTypeState = Fun(amqqueue:get_type_state(Q)),
    amqqueue:set_type_state(Q, NewTypeState).
Expand Down
2 changes: 1 addition & 1 deletion deps/rabbit/test/quorum_queue_SUITE.erl
Expand Up @@ -1705,7 +1705,7 @@ add_member(Config) ->
ok = rabbit_control_helper:command(stop_app, Server1),
ok = rabbit_control_helper:command(join_cluster, Server1, [atom_to_list(Server0)], []),
rabbit_control_helper:command(start_app, Server1),
?assertEqual(ok, rpc:call(Server0, rabbit_quorum_queue, add_member,
?assertEqual(ok, rpc:call(Server1, rabbit_quorum_queue, add_member,
[<<"/">>, QQ, Server1, 5000])),
Info = rpc:call(Server0, rabbit_quorum_queue, infos,
[rabbit_misc:r(<<"/">>, queue, QQ)]),
Expand Down

0 comments on commit 4486843

Please sign in to comment.