Skip to content

Commit

Permalink
Avoid RPC roundtrips while listing items
Browse files Browse the repository at this point in the history
- Emit info about particular items in parallel on every node, with
  results delivered directly to a `rabbitmqctl` instance.
- `rabbit_control_misc:wait_for_info_messages/5` can wait for results of
  more than one emitting map.
- Stop passing around InfoItemKeys in
  `rabbit_control_misc:wait_for_info_messages/5`, the same information
  could be directly encoded in DisplayFun closure.
- Add `emit` to function names, to avoid confusion with regular ones
  which return result directly.

Part of rabbitmq/rabbitmq-server#683
  • Loading branch information
binarin authored and lemenkov committed Jul 15, 2016
1 parent 73e16c8 commit babe073
Show file tree
Hide file tree
Showing 5 changed files with 156 additions and 51 deletions.
48 changes: 28 additions & 20 deletions src/rabbit_amqqueue.erl
Expand Up @@ -25,10 +25,10 @@
check_exclusive_access/2, with_exclusive_access_or_die/3,
stat/1, deliver/2, requeue/3, ack/3, reject/4]).
-export([list/0, list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2,
info_all/6]).
emit_info_all/5, list_local/1]).
-export([list_down/1]).
-export([force_event_refresh/1, notify_policy_changed/1]).
-export([consumers/1, consumers_all/1, consumers_all/3, consumer_info_keys/0]).
-export([consumers/1, consumers_all/1, emit_consumers_all/4, consumer_info_keys/0]).
-export([basic_get/4, basic_consume/10, basic_cancel/4, notify_decorators/1]).
-export([notify_sent/2, notify_sent_queue_down/1, resume/2]).
-export([notify_down_all/2, notify_down_all/3, activate_limit_all/2, credit/5]).
Expand All @@ -41,7 +41,8 @@

%% internal
-export([internal_declare/2, internal_delete/1, run_backing_queue/3,
set_ram_duration_target/2, set_maximum_since_use/2]).
set_ram_duration_target/2, set_maximum_since_use/2,
emit_info_local/4, emit_info_down/4, emit_consumers_local/3]).

-include("rabbit.hrl").
-include_lib("stdlib/include/qlc.hrl").
Expand Down Expand Up @@ -117,10 +118,6 @@
-spec info_all(rabbit_types:vhost()) -> [rabbit_types:infos()].
-spec info_all(rabbit_types:vhost(), rabbit_types:info_keys()) ->
[rabbit_types:infos()].
-spec info_all
(rabbit_types:vhost(), rabbit_types:info_keys(), boolean(), boolean(),
reference(), pid()) ->
'ok'.
-spec force_event_refresh(reference()) -> 'ok'.
-spec notify_policy_changed(rabbit_types:amqqueue()) -> 'ok'.
-spec consumers(rabbit_types:amqqueue()) ->
Expand All @@ -130,7 +127,6 @@
-spec consumers_all(rabbit_types:vhost()) ->
[{name(), pid(), rabbit_types:ctag(), boolean(),
non_neg_integer(), rabbit_framing:amqp_table()}].
-spec consumers_all(rabbit_types:vhost(), reference(), pid()) -> 'ok'.
-spec stat(rabbit_types:amqqueue()) ->
{'ok', non_neg_integer(), non_neg_integer()}.
-spec delete_immediately(qpids()) -> 'ok'.
Expand Down Expand Up @@ -627,16 +623,23 @@ info_all(VHostPath, Items) ->
map(list(VHostPath), fun (Q) -> info(Q, Items) end) ++
map(list_down(VHostPath), fun (Q) -> info_down(Q, Items, down) end).

%% Streams queue info rows for VHostPath to AggregatorPid, tagged with Ref.
%% Queues from list/1 are emitted only when NeedOnline is true, queues from
%% list_down/1 only when NeedOffline is true. Both maps run in 'continue'
%% mode, so a last emitting_map/4 over an empty list finalizes the emission.
info_all(VHostPath, Items, NeedOnline, NeedOffline, Ref, AggregatorPid) ->
    OnlineInfo = fun(Queue) -> info(Queue, Items) end,
    DownInfo = fun(Queue) -> info_down(Queue, Items, down) end,
    NeedOnline andalso
        rabbit_control_misc:emitting_map_with_exit_handler(
          AggregatorPid, Ref, OnlineInfo, list(VHostPath), continue),
    NeedOffline andalso
        rabbit_control_misc:emitting_map_with_exit_handler(
          AggregatorPid, Ref, DownInfo, list_down(VHostPath), continue),
    %% The maps above are incomplete on their own; this call finalizes them.
    rabbit_control_misc:emitting_map(AggregatorPid, Ref, fun(_) -> no_op end, []).
%% Emits info (restricted to Items) for every queue returned by
%% list_local/1 for VHostPath, delivering the rows to AggregatorPid
%% under Ref.
emit_info_local(VHostPath, Items, Ref, AggregatorPid) ->
    QueueInfo = fun(Queue) -> info(Queue, Items) end,
    LocalQueues = list_local(VHostPath),
    rabbit_control_misc:emitting_map_with_exit_handler(
      AggregatorPid, Ref, QueueInfo, LocalQueues).

%% Starts one linked emit_info_local/4 emitter on each node in Nodes and
%% returns only after rabbit_control_misc:await_emitters_termination/1
%% has seen all of them terminate.
emit_info_all(Nodes, VHostPath, Items, Ref, AggregatorPid) ->
    EmitterArgs = [VHostPath, Items, Ref, AggregatorPid],
    Emitters = lists:map(
                 fun(Node) ->
                         spawn_link(Node, rabbit_amqqueue, emit_info_local, EmitterArgs)
                 end,
                 Nodes),
    rabbit_control_misc:await_emitters_termination(Emitters).

%% Emits info_down(Q, Items, down) for every queue returned by
%% list_down/1 for VHostPath, delivering the rows to AggregatorPid
%% under Ref.
emit_info_down(VHostPath, Items, Ref, AggregatorPid) ->
    DownInfo = fun(Queue) -> info_down(Queue, Items, down) end,
    DownQueues = list_down(VHostPath),
    rabbit_control_misc:emitting_map_with_exit_handler(
      AggregatorPid, Ref, DownInfo, DownQueues).

%% Returns the queues of VHostPath whose state is not 'crashed' and
%% whose queue pid lives on the node evaluating this call.
list_local(VHostPath) ->
    ThisNode = node(),
    [Queue || #amqqueue{state = QState, pid = OwnerPid} = Queue <- list(VHostPath),
              QState =/= crashed,
              node(OwnerPid) =:= ThisNode].

force_event_refresh(Ref) ->
[gen_server2:cast(Q#amqqueue.pid,
Expand All @@ -656,12 +659,17 @@ consumers_all(VHostPath) ->
map(list(VHostPath),
fun(Q) -> get_queue_consumer_info(Q, ConsumerInfoKeys) end)).

consumers_all(VHostPath, Ref, AggregatorPid) ->
%% Starts one linked emit_consumers_local/3 emitter on each node in
%% Nodes, waits until all of them have terminated, then returns 'ok'.
emit_consumers_all(Nodes, VHostPath, Ref, AggregatorPid) ->
    EmitterArgs = [VHostPath, Ref, AggregatorPid],
    Emitters = lists:map(
                 fun(Node) ->
                         spawn_link(Node, rabbit_amqqueue, emit_consumers_local, EmitterArgs)
                 end,
                 Nodes),
    rabbit_control_misc:await_emitters_termination(Emitters),
    ok.

%% Emits consumer info for queues of VHostPath to AggregatorPid under Ref.
%% Each queue is mapped through get_queue_consumer_info/2 using the keys
%% from consumer_info_keys/0, via rabbit_control_misc:emitting_map/4.
%%
%% NOTE(review): this rendering carries both sides of the change for the
%% final argument - `list(VHostPath)` and `list_local(VHostPath)`. Only one
%% of the two closing lines can be live; confirm against the real file.
emit_consumers_local(VHostPath, Ref, AggregatorPid) ->
ConsumerInfoKeys = consumer_info_keys(),
rabbit_control_misc:emitting_map(
AggregatorPid, Ref,
fun(Q) -> get_queue_consumer_info(Q, ConsumerInfoKeys) end,
list(VHostPath)).
list_local(VHostPath)).

get_queue_consumer_info(Q, ConsumerInfoKeys) ->
[lists:zip(ConsumerInfoKeys,
Expand Down
15 changes: 11 additions & 4 deletions src/rabbit_channel.erl
Expand Up @@ -56,15 +56,15 @@
-export([send_command/2, deliver/4, deliver_reply/2,
send_credit_reply/2, send_drained/2]).
-export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1,
info_all/3]).
emit_info_all/4]).
-export([refresh_config_local/0, ready_for_close/1]).
-export([force_event_refresh/1]).

-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
handle_info/2, handle_pre_hibernate/1, prioritise_call/4,
prioritise_cast/3, prioritise_info/3, format_message_queue/2]).
%% Internal
-export([list_local/0, deliver_reply_local/3]).
-export([list_local/0, emit_info_local/3, deliver_reply_local/3]).
-export([get_vhost/1, get_user/1]).

-record(ch, {
Expand Down Expand Up @@ -326,9 +326,16 @@ info_all() ->
%% Applies info(C, Items) to every entry returned by list/0 through
%% rabbit_misc:filter_exit_map/2 and returns its result.
info_all(Items) ->
    ChannelInfo = fun(Channel) -> info(Channel, Items) end,
    rabbit_misc:filter_exit_map(ChannelInfo, list()).

info_all(Items, Ref, AggregatorPid) ->
%% Starts one linked emit_info_local/3 emitter on each node in Nodes and
%% returns only after rabbit_control_misc:await_emitters_termination/1
%% has seen all of them terminate.
emit_info_all(Nodes, Items, Ref, AggregatorPid) ->
    EmitterArgs = [Items, Ref, AggregatorPid],
    Emitters = lists:map(
                 fun(Node) ->
                         spawn_link(Node, rabbit_channel, emit_info_local, EmitterArgs)
                 end,
                 Nodes),
    rabbit_control_misc:await_emitters_termination(Emitters).

%% Emits info for the entries returned by list_local/0 by delegating
%% to emit_info/4.
emit_info_local(Items, Ref, AggregatorPid) ->
    LocalPids = list_local(),
    emit_info(LocalPids, Items, Ref, AggregatorPid).

%% Maps info(C, InfoItems) over PidList with
%% rabbit_control_misc:emitting_map_with_exit_handler/4, delivering the
%% results to AggregatorPid under Ref.
%%
%% NOTE(review): the argument line that mentions `Items` and `list()` looks
%% like the removed side of the change (neither name is bound by this
%% head); the line using `InfoItems` and `PidList` matches the head.
%% Confirm against the real file.
emit_info(PidList, InfoItems, Ref, AggregatorPid) ->
rabbit_control_misc:emitting_map_with_exit_handler(
AggregatorPid, Ref, fun(C) -> info(C, Items) end, list()).
AggregatorPid, Ref, fun(C) -> info(C, InfoItems) end, PidList).

refresh_config_local() ->
rabbit_misc:upmap(
Expand Down
125 changes: 107 additions & 18 deletions src/rabbit_control_misc.erl
Expand Up @@ -17,15 +17,23 @@
-module(rabbit_control_misc).

-export([emitting_map/4, emitting_map/5, emitting_map_with_exit_handler/4,
emitting_map_with_exit_handler/5, wait_for_info_messages/5,
emitting_map_with_exit_handler/5, wait_for_info_messages/6,
spawn_emitter_caller/7, await_emitters_termination/1,
print_cmd_result/2]).

-spec emitting_map(pid(), reference(), fun(), list()) -> 'ok'.
-spec emitting_map(pid(), reference(), fun(), list(), atom()) -> 'ok'.
-spec emitting_map_with_exit_handler
(pid(), reference(), fun(), list()) -> 'ok'.
-spec emitting_map_with_exit_handler
(pid(), reference(), fun(), list(), atom()) -> 'ok'.
(pid(), reference(), fun(), list(), 'continue') -> 'ok'.

-type fold_fun() :: fun(term(), term()) -> term().

-spec wait_for_info_messages (pid(), reference(), fold_fun(), term(), timeout(), non_neg_integer()) -> {'ok', term()} | {'error', term()}.
-spec spawn_emitter_caller (node(), module(), atom(), [term()], reference(), pid(), timeout()) -> 'ok'.
-spec await_emitters_termination ([pid()]) -> 'ok'.

-spec print_cmd_result(atom(), term()) -> 'ok'.

emitting_map(AggregatorPid, Ref, Fun, List) ->
Expand Down Expand Up @@ -65,27 +73,108 @@ step_with_exit_handler(AggregatorPid, Ref, Fun, Item) ->
ok
end.

%% Arms the optional timeout notification for Pid (see
%% notify_if_timeout/3), then blocks in wait_for_info_messages/3
%% consuming the messages tagged with Ref.
wait_for_info_messages(Pid, Ref, ArgAtoms, DisplayFun, Timeout) ->
    _TimerResult = notify_if_timeout(Pid, Ref, Timeout),
    wait_for_info_messages(Ref, ArgAtoms, DisplayFun).
%% Runs an asynchronous info-collection RPC in a helper process that is
%% monitored by the caller (spawn_monitor/1). The helper blocks on the
%% RPC and, when the RPC returns an {error, _}, {bad_argument, _} or
%% {badrpc, _} tuple, forwards it to Pid as {Ref, error, Failure} - the
%% error form understood by wait_for_info_messages/6. The calling
%% process is then expected to block in wait_for_info_messages/6.
%%
%% The remote function MUST deliver its results through emitting_map/4
%% (or another member of the emitting_map family) so that they reach
%% the aggregator process.
%%
%% If, for performance reasons, the remote function runs several
%% emitting_map's in parallel, it MUST NOT return before all of them
%% are done, and the remote RPC process MUST stay linked to the
%% emitting processes for that whole time. Ending the remote function
%% with await_emitters_termination/1 ensures this behaviour.
spawn_emitter_caller(Node, Mod, Fun, Args, Ref, Pid, Timeout) ->
    Caller =
        fun() ->
                case rpc_call_emitter(Node, Mod, Fun, Args, Ref, Pid, Timeout) of
                    {Tag, _} = Failure when Tag =:= error;
                                            Tag =:= bad_argument;
                                            Tag =:= badrpc ->
                        Pid ! {Ref, error, Failure};
                    _Success ->
                        ok
                end
        end,
    {_HelperPid, _MonitorRef} = spawn_monitor(Caller),
    ok.

%% Calls Mod:Fun on Node through rabbit_misc:rpc_call/5, with Ref and Pid
%% appended (in that order) to the end of Args.
rpc_call_emitter(Node, Mod, Fun, Args, Ref, Pid, Timeout) ->
    EmitterArgs = lists:append(Args, [Ref, Pid]),
    rabbit_misc:rpc_call(Node, Mod, Fun, EmitterArgs, Timeout).

%% The aggregator process expects an exact number of explicit ACKs for
%% finished emissions. Although everything is linked, we still have to
%% wait for every emitter to terminate before returning from the RPC
%% call - otherwise the links would simply be broken with reason
%% 'normal', some errors could be missed, and the caller could
%% subsequently hang.
await_emitters_termination(Pids) ->
    MonitorRefs = lists:map(
                    fun(Emitter) -> erlang:monitor(process, Emitter) end,
                    Pids),
    collect_monitors(MonitorRefs).

wait_for_info_messages(Ref, InfoItemKeys, DisplayFun) when is_reference(Ref) ->
%% Blocks until every monitor reference in the list has reported the
%% termination of its process, handling the references in list order.
%%
%% Returns 'ok' once all monitored processes have gone down with reason
%% 'normal' or 'noproc'. Exits with {emitter_exit, Pid, Reason} as soon
%% as a 'DOWN' message carrying any other reason is received.
%%
%% Monitor notifications are 5-tuples of the form
%% {'DOWN', MonitorRef, process, Pid, Reason}. The patterns must include
%% the 'process' element, otherwise no clause can ever match and the
%% receive blocks forever.
collect_monitors([]) ->
    ok;
collect_monitors([Monitor|Rest]) ->
    receive
        {'DOWN', Monitor, process, _Pid, normal} ->
            collect_monitors(Rest);
        {'DOWN', Monitor, process, _Pid, noproc} ->
            %% There is a link and a monitor to a process. Matching
            %% this clause means that process has gracefully
            %% terminated even before we've started monitoring.
            collect_monitors(Rest);
        {'DOWN', _, process, Pid, Reason} when Reason =/= normal,
                                               Reason =/= noproc ->
            %% The guard keeps graceful 'DOWN' messages that belong to
            %% monitors further down the list in the mailbox, so they
            %% are not mistaken for failures when they arrive early.
            exit({emitter_exit, Pid, Reason})
    end.

%% Waits for the results of one or more calls to emitting_map-family
%% functions, folding every delivered result into an accumulator.
%%
%% ChunkCount is the number of acknowledgments to expect. It will
%% usually equal the number of live nodes, but that is not mandatory -
%% hence the more generic name 'ChunkCount'.
%%
%% A timeout notification for Pid is armed first (see
%% notify_if_timeout/3); the actual receive loop is
%% wait_for_info_messages/4.
wait_for_info_messages(Pid, Ref, FoldFun, InitialAcc, Timeout, ChunkCount) ->
    _TimerResult = notify_if_timeout(Pid, Ref, Timeout),
    wait_for_info_messages(Ref, FoldFun, InitialAcc, ChunkCount).

%% Receive loop behind wait_for_info_messages/6.
%%
%% Going by the clauses that use Fun, Acc0 and ChunksLeft:
%%   {Ref, finished}          - returns {ok, Acc0} when ChunksLeft is 1,
%%                              otherwise loops with ChunksLeft - 1;
%%   {Ref, {timeout, T}}      - exits with {error, {timeout, T / 1000}};
%%   {Ref, []}                - ignored, keeps waiting;
%%   {Ref, Result, continue}  - folds Result into the accumulator via Fun;
%%   {Ref, error, Error}      - returns {error, simplify_emission_error(Error)};
%%   'DOWN' with 'normal'     - ignored, keeps waiting;
%%   'DOWN' with other reason - returns {error, simplify_emission_error(Reason)};
%%   anything else            - dropped, keeps waiting.
%%
%% NOTE(review): the clauses that mention InfoItemKeys and DisplayFun (and
%% the bare {error, Error} / `_` clauses next to them) use names this head
%% does not bind; they look like the removed side of the change shown
%% inline. Confirm against the real file.
%% NOTE(review): a ChunksLeft of 0 never satisfies the `=:= 1` guard, so
%% the loop would only end through a timeout or an error - verify that
%% callers never pass 0.
wait_for_info_messages(Ref, Fun, Acc0, ChunksLeft) ->
receive
{Ref, finished} when ChunksLeft =:= 1 ->
{ok, Acc0};
{Ref, finished} ->
wait_for_info_messages(Ref, Fun, Acc0, ChunksLeft - 1);
{Ref, {timeout, T}} ->
exit({error, {timeout, (T / 1000)}});
{Ref, []} ->
wait_for_info_messages(Ref, InfoItemKeys, DisplayFun);
{Ref, Result, continue} ->
DisplayFun(Result, InfoItemKeys),
wait_for_info_messages(Ref, InfoItemKeys, DisplayFun);
{error, Error} ->
Error;
_ ->
wait_for_info_messages(Ref, InfoItemKeys, DisplayFun)
{Ref, []} ->
wait_for_info_messages(Ref, Fun, Acc0, ChunksLeft);
{Ref, Result, continue} ->
wait_for_info_messages(Ref, Fun, Fun(Result, Acc0), ChunksLeft);
{Ref, error, Error} ->
{error, simplify_emission_error(Error)};
{'DOWN', _MRef, process, _Pid, normal} ->
wait_for_info_messages(Ref, Fun, Acc0, ChunksLeft);
{'DOWN', _MRef, process, _Pid, Reason} ->
{error, simplify_emission_error(Reason)};
_Msg ->
wait_for_info_messages(Ref, Fun, Acc0, ChunksLeft)
end.

%% Unwraps an emission failure. A term thrown by the emitter (seen as
%% {nocatch, Term}, either bare with its stacktrace or nested inside a
%% {badrpc, {'EXIT', _}} tuple) is returned as is; any other term is
%% returned wrapped as {error, Term}.
simplify_emission_error(Failure) ->
    case Failure of
        {badrpc, {'EXIT', {{nocatch, Thrown}, _Stacktrace}}} ->
            Thrown;
        {{nocatch, Thrown}, _Stacktrace} ->
            Thrown;
        Other ->
            {error, Other}
    end.

%% Schedules a {Ref, {timeout, Timeout}} message to Pid after Timeout
%% via timer:send_after/3 and returns that call's result. With a
%% Timeout of 'infinity' nothing is scheduled and 'ok' is returned.
notify_if_timeout(Pid, Ref, Timeout) ->
    case Timeout of
        infinity ->
            ok;
        _ ->
            timer:send_after(Timeout, Pid, {Ref, {timeout, Timeout}})
    end.

Expand Down
7 changes: 1 addition & 6 deletions src/rabbit_misc.erl
Expand Up @@ -75,7 +75,7 @@
-export([get_env/3]).
-export([get_channel_operation_timeout/0]).
-export([random/1]).
-export([rpc_call/4, rpc_call/5, rpc_call/7]).
-export([rpc_call/4, rpc_call/5]).
-export([report_default_thread_pool_size/0]).

%% Horrible macro to use in guards
Expand Down Expand Up @@ -262,8 +262,6 @@
-spec random(non_neg_integer()) -> non_neg_integer().
-spec rpc_call(node(), atom(), atom(), [any()]) -> any().
-spec rpc_call(node(), atom(), atom(), [any()], number()) -> any().
-spec rpc_call
(node(), atom(), atom(), [any()], reference(), pid(), number()) -> any().
-spec report_default_thread_pool_size() -> 'ok'.

%%----------------------------------------------------------------------------
Expand Down Expand Up @@ -1173,9 +1171,6 @@ rpc_call(Node, Mod, Fun, Args, Timeout) ->
rpc:call(Node, Mod, Fun, Args, Timeout)
end.

%% Delegates to rpc_call/5 with Ref and Pid appended (in that order) to
%% the end of Args.
rpc_call(Node, Mod, Fun, Args, Ref, Pid, Timeout) ->
    ExtendedArgs = lists:append(Args, [Ref, Pid]),
    rpc_call(Node, Mod, Fun, ExtendedArgs, Timeout).

guess_number_of_cpu_cores() ->
case erlang:system_info(logical_processors_available) of
unknown -> % Happens on Mac OS X.
Expand Down
12 changes: 9 additions & 3 deletions src/rabbit_networking.erl
Expand Up @@ -33,7 +33,8 @@
node_listeners/1, register_connection/1, unregister_connection/1,
connections/0, connection_info_keys/0,
connection_info/1, connection_info/2,
connection_info_all/0, connection_info_all/1, connection_info_all/3,
connection_info_all/0, connection_info_all/1,
emit_connection_info_all/4, emit_connection_info_local/3,
close_connection/2, force_connection_event_refresh/1, tcp_host/1]).

%% Used by TCP-based transports, e.g. STOMP adapter
Expand Down Expand Up @@ -365,10 +366,15 @@ connection_info(Pid, Items) -> rabbit_reader:info(Pid, Items).
%% Applies connection_info/1 through cmap/1 and returns its result.
connection_info_all() ->
    cmap(fun connection_info/1).
%% Applies connection_info(Conn, Items) through cmap/1 and returns its
%% result.
connection_info_all(Items) ->
    ConnInfo = fun(Conn) -> connection_info(Conn, Items) end,
    cmap(ConnInfo).

connection_info_all(Items, Ref, AggregatorPid) ->
%% Starts one linked emit_connection_info_local/3 emitter on each node
%% in Nodes, waits until all of them have terminated, then returns 'ok'.
emit_connection_info_all(Nodes, Items, Ref, AggregatorPid) ->
    EmitterArgs = [Items, Ref, AggregatorPid],
    Emitters = lists:map(
                 fun(Node) ->
                         spawn_link(Node, rabbit_networking,
                                    emit_connection_info_local, EmitterArgs)
                 end,
                 Nodes),
    rabbit_control_misc:await_emitters_termination(Emitters),
    ok.

%% Emits connection_info(Q, Items) for a list of connections to
%% AggregatorPid under Ref, using
%% rabbit_control_misc:emitting_map_with_exit_handler/4.
%%
%% NOTE(review): this rendering carries both sides of the change for the
%% final argument - `connections()` and `connections_local()`. Only one of
%% the two closing lines can be live; confirm against the real file.
emit_connection_info_local(Items, Ref, AggregatorPid) ->
rabbit_control_misc:emitting_map_with_exit_handler(
AggregatorPid, Ref, fun(Q) -> connection_info(Q, Items) end,
connections()).
connections_local()).

close_connection(Pid, Explanation) ->
rabbit_log:info("Closing connection ~p because ~p~n", [Pid, Explanation]),
Expand Down

0 comments on commit babe073

Please sign in to comment.