Skip to content

Commit

Permalink
Merge pull request #6282 from rabbitmq/mergify/bp/v3.10.x/pr-6276
Browse files Browse the repository at this point in the history
QQ: don't try to contact non-connected nodes for stats (backport #6183) (backport #6276)
  • Loading branch information
michaelklishin committed Oct 28, 2022
2 parents bc7dc47 + 9753f34 commit 2824098
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 32 deletions.
95 changes: 64 additions & 31 deletions deps/rabbit/src/rabbit_quorum_queue.erl
Expand Up @@ -203,8 +203,8 @@ start_cluster(Q) ->
?SNAPSHOT_INTERVAL),
RaConfs = [make_ra_conf(NewQ, ServerId, TickTimeout, SnapshotInterval)
|| ServerId <- members(NewQ)],
Timeout = erpc_timeout(Leader, ?START_CLUSTER_RPC_TIMEOUT),
try erpc:call(Leader, ra, start_cluster, [?RA_SYSTEM, RaConfs], Timeout) of
try erpc_call(Leader, ra, start_cluster, [?RA_SYSTEM, RaConfs],
?START_CLUSTER_RPC_TIMEOUT) of
{ok, _, _} ->
%% ensure the latest config is evaluated properly
%% even when running the machine version from 0
Expand Down Expand Up @@ -285,17 +285,23 @@ single_active_consumer_on(Q) ->
_ -> false
end.

update_consumer_handler(QName, {ConsumerTag, ChPid}, Exclusive, AckRequired, Prefetch, Active, ActivityStatus, Args) ->
local_or_remote_handler(ChPid, rabbit_quorum_queue, update_consumer,
[QName, ChPid, ConsumerTag, Exclusive, AckRequired, Prefetch, Active, ActivityStatus, Args]).
update_consumer_handler(QName, {ConsumerTag, ChPid}, Exclusive, AckRequired,
Prefetch, Active, ActivityStatus, Args) ->
catch local_or_remote_handler(ChPid, rabbit_quorum_queue, update_consumer,
[QName, ChPid, ConsumerTag, Exclusive,
AckRequired, Prefetch, Active,
ActivityStatus, Args]).

update_consumer(QName, ChPid, ConsumerTag, Exclusive, AckRequired, Prefetch, Active, ActivityStatus, Args) ->
catch rabbit_core_metrics:consumer_updated(ChPid, ConsumerTag, Exclusive, AckRequired,
QName, Prefetch, Active, ActivityStatus, Args).
update_consumer(QName, ChPid, ConsumerTag, Exclusive, AckRequired, Prefetch,
Active, ActivityStatus, Args) ->
catch rabbit_core_metrics:consumer_updated(ChPid, ConsumerTag, Exclusive,
AckRequired,
QName, Prefetch, Active,
ActivityStatus, Args).

cancel_consumer_handler(QName, {ConsumerTag, ChPid}) ->
local_or_remote_handler(ChPid, rabbit_quorum_queue, cancel_consumer,
[QName, ChPid, ConsumerTag]).
catch local_or_remote_handler(ChPid, rabbit_quorum_queue, cancel_consumer,
[QName, ChPid, ConsumerTag]).

cancel_consumer(QName, ChPid, ConsumerTag) ->
catch rabbit_core_metrics:consumer_deleted(ChPid, ConsumerTag, QName),
Expand All @@ -309,7 +315,7 @@ local_or_remote_handler(ChPid, Module, Function, Args) ->
false ->
%% this could potentially block for a while if the node is
%% in disconnected state or tcp buffers are full
rpc:cast(Node, Module, Function, Args)
erpc:cast(Node, Module, Function, Args)
end.

become_leader(QName, Name) ->
Expand All @@ -329,8 +335,8 @@ become_leader(QName, Name) ->
case rabbit_amqqueue:lookup(QName) of
{ok, Q0} when ?is_amqqueue(Q0) ->
Nodes = get_nodes(Q0),
[rpc:call(Node, ?MODULE, rpc_delete_metrics,
[QName], ?RPC_TIMEOUT)
[_ = erpc_call(Node, ?MODULE, rpc_delete_metrics,
[QName], ?RPC_TIMEOUT)
|| Node <- Nodes, Node =/= node()];
_ ->
ok
Expand Down Expand Up @@ -676,8 +682,8 @@ delete(Q, _IfUnused, _IfEmpty, ActingUser) when ?amqqueue_is_quorum(Q) ->
end,
notify_decorators(QName, shutdown),
ok = delete_queue_data(QName, ActingUser),
rpc:call(LeaderNode, rabbit_core_metrics, queue_deleted, [QName],
?RPC_TIMEOUT),
_ = erpc:call(LeaderNode, rabbit_core_metrics, queue_deleted, [QName],
?RPC_TIMEOUT),
{ok, ReadyMsgs};
{error, {no_more_servers_to_try, Errs}} ->
case lists:all(fun({{error, noproc}, _}) -> true;
Expand Down Expand Up @@ -971,9 +977,8 @@ cleanup_data_dir() ->
end
|| Q <- rabbit_amqqueue:list_by_type(?MODULE),
lists:member(node(), get_nodes(Q))],
NoQQClusters = rabbit_ra_registry:list_not_quorum_clusters(),
Registered = ra_directory:list_registered(?RA_SYSTEM),
Running = Names ++ NoQQClusters,
Running = Names,
_ = [maybe_delete_data_dir(UId) || {Name, UId} <- Registered,
not lists:member(Name, Running)],
ok.
Expand Down Expand Up @@ -1436,9 +1441,11 @@ i(memory, Q) when ?is_amqqueue(Q) ->
i(state, Q) when ?is_amqqueue(Q) ->
{Name, Node} = amqqueue:get_pid(Q),
%% Check against the leader or last known leader
case rpc:call(Node, ?MODULE, cluster_state, [Name], ?RPC_TIMEOUT) of
{badrpc, _} -> down;
State -> State
case erpc_call(Node, ?MODULE, cluster_state, [Name], ?RPC_TIMEOUT) of
{error, _} ->
down;
State ->
State
end;
i(local_state, Q) when ?is_amqqueue(Q) ->
{Name, _} = amqqueue:get_pid(Q),
Expand All @@ -1457,7 +1464,7 @@ i(online, Q) -> online(Q);
i(leader, Q) -> leader(Q);
i(open_files, Q) when ?is_amqqueue(Q) ->
{Name, _} = amqqueue:get_pid(Q),
Nodes = get_nodes(Q),
Nodes = get_connected_nodes(Q),
{Data, _} = rpc:multicall(Nodes, ?MODULE, open_files, [Name]),
lists:flatten(Data);
i(single_active_consumer_pid, Q) when ?is_amqqueue(Q) ->
Expand Down Expand Up @@ -1559,7 +1566,7 @@ peek(_Pos, Q) when ?is_amqqueue(Q) andalso ?amqqueue_is_classic(Q) ->
{error, classic_queue_not_supported}.

online(Q) when ?is_amqqueue(Q) ->
Nodes = get_nodes(Q),
Nodes = get_connected_nodes(Q),
{Name, _} = amqqueue:get_pid(Q),
[Node || Node <- Nodes, is_process_alive(Name, Node)].

Expand All @@ -1568,7 +1575,10 @@ format(Q) when ?is_amqqueue(Q) ->
[{members, Nodes}, {online, online(Q)}, {leader, leader(Q)}].

is_process_alive(Name, Node) ->
erlang:is_pid(rpc:call(Node, erlang, whereis, [Name], ?RPC_TIMEOUT)).
%% don't attempt rpc if node is not already connected
%% as this function is used for metrics and stats and the additional
%% latency isn't warranted
erlang:is_pid(erpc_call(Node, erlang, whereis, [Name], ?RPC_TIMEOUT)).

-spec quorum_messages(rabbit_amqqueue:name()) -> non_neg_integer().

Expand Down Expand Up @@ -1626,6 +1636,10 @@ get_nodes(Q) when ?is_amqqueue(Q) ->
#{nodes := Nodes} = amqqueue:get_type_state(Q),
Nodes.

get_connected_nodes(Q) when ?is_amqqueue(Q) ->
ErlangNodes = [node() | nodes()],
[N || N <- get_nodes(Q), lists:member(N, ErlangNodes)].

update_type_state(Q, Fun) when ?is_amqqueue(Q) ->
Ts = amqqueue:get_type_state(Q),
amqqueue:set_type_state(Q, Fun(Ts)).
Expand Down Expand Up @@ -1691,18 +1705,37 @@ prepare_content(Content) ->
%% rabbit_fifo can directly parse it without having to decode again.
Content.

erpc_timeout(Node, _)
when Node =:= node() ->
%% Only timeout 'infinity' optimises the local call in OTP 23-25 avoiding a new process being spawned:
%% https://github.com/erlang/otp/blob/47f121af8ee55a0dbe2a8c9ab85031ba052bad6b/lib/kernel/src/erpc.erl#L121
infinity;
erpc_timeout(_, Timeout) ->
Timeout.

ets_lookup_element(Tbl, Key, Pos, Default) ->
try ets:lookup_element(Tbl, Key, Pos) of
V -> V
catch
_:badarg ->
Default
end.

erpc_call(Node, M, F, A, Timeout)
when is_integer(Timeout) andalso Node == node() ->
%% Only timeout 'infinity' optimises the local call in OTP 23-25 avoiding a new process being spawned:
%% https://github.com/erlang/otp/blob/47f121af8ee55a0dbe2a8c9ab85031ba052bad6b/lib/kernel/src/erpc.erl#L121
try erpc:call(Node, M, F, A, Timeout) of
Result ->
Result
catch
error:Err ->
{error, Err}
end;
erpc_call(Node, M, F, A, Timeout) ->
case lists:member(Node, nodes()) of
true ->
try erpc:call(Node, M, F, A, Timeout) of
Result ->
Result
catch
error:Err ->
{error, Err}
end;
false ->
{error, noconnection}
end.


2 changes: 1 addition & 1 deletion deps/rabbit/test/quorum_queue_SUITE.erl
Expand Up @@ -1751,7 +1751,7 @@ add_member(Config) ->
ok = rabbit_control_helper:command(stop_app, Server1),
ok = rabbit_control_helper:command(join_cluster, Server1, [atom_to_list(Server0)], []),
rabbit_control_helper:command(start_app, Server1),
?assertEqual(ok, rpc:call(Server0, rabbit_quorum_queue, add_member,
?assertEqual(ok, rpc:call(Server1, rabbit_quorum_queue, add_member,
[<<"/">>, QQ, Server1, 5000])),
Info = rpc:call(Server0, rabbit_quorum_queue, infos,
[rabbit_misc:r(<<"/">>, queue, QQ)]),
Expand Down

0 comments on commit 2824098

Please sign in to comment.