Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add federation support for quorum queues #2804

Merged
merged 4 commits into from
Feb 25, 2021
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 2 additions & 6 deletions deps/rabbit/src/rabbit_amqqueue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1634,12 +1634,8 @@ basic_cancel(Q, ConsumerTag, OkMsg, ActingUser, QStates) ->

-spec notify_decorators(amqqueue:amqqueue()) -> 'ok'.

notify_decorators(Q) when ?amqqueue_is_classic(Q) ->
QPid = amqqueue:get_pid(Q),
delegate:invoke_no_result(QPid, {gen_server2, cast, [notify_decorators]});
notify_decorators(_Q) ->
%% Not supported by any other queue type
ok.
notify_decorators(Q) ->
rabbit_queue_type:notify_decorators(Q).

notify_sent(QPid, ChPid) ->
rabbit_amqqueue_common:notify_sent(QPid, ChPid).
Expand Down
18 changes: 9 additions & 9 deletions deps/rabbit/src/rabbit_classic_queue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@
dequeue/4,
info/2,
state_info/1,
capabilities/0
capabilities/0,
notify_decorators/1
]).

-export([delete_crashed/1,
Expand Down Expand Up @@ -441,14 +442,9 @@ recover_durable_queues(QueuesAndRecoveryTerms) ->
[Q || {_, {new, Q}} <- Results].

capabilities() ->
#{policies => [<<"expires">>, <<"message-ttl">>, <<"dead-letter-exchange">>,
<<"dead-letter-routing-key">>, <<"max-length">>,
<<"max-length-bytes">>, <<"max-in-memory-length">>, <<"max-in-memory-bytes">>,
<<"max-priority">>, <<"overflow">>, <<"queue-mode">>,
<<"single-active-consumer">>, <<"delivery-limit">>,
<<"ha-mode">>, <<"ha-params">>, <<"ha-sync-mode">>,
<<"ha-promote-on-shutdown">>, <<"ha-promote-on-failure">>,
<<"queue-master-locator">>],
#{policies => [ %% Stream policies
<<"max-age">>, <<"max-segment-size">>,
<<"queue-leader-locator">>, <<"initial-cluster-size">>],
queue_arguments => [<<"x-expires">>, <<"x-message-ttl">>, <<"x-dead-letter-exchange">>,
<<"x-dead-letter-routing-key">>, <<"x-max-length">>,
<<"x-max-length-bytes">>, <<"x-max-in-memory-length">>,
Expand All @@ -460,6 +456,10 @@ capabilities() ->
],
server_named => true}.

notify_decorators(Q) when ?is_amqqueue(Q) ->
QPid = amqqueue:get_pid(Q),
delegate:invoke_no_result(QPid, {gen_server2, cast, [notify_decorators]}).

reject_seq_no(SeqNo, U0) ->
reject_seq_no(SeqNo, U0, []).

Expand Down
74 changes: 62 additions & 12 deletions deps/rabbit/src/rabbit_fifo.erl
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
query_single_active_consumer/1,
query_in_memory_usage/1,
query_peek/2,
query_notify_decorators_info/1,
usage/1,

zero/1,
Expand Down Expand Up @@ -241,7 +242,7 @@ apply(Meta, #credit{credit = NewCredit, delivery_count = RemoteDelCnt,
{State1, ok, Effects} =
checkout(Meta, State0,
State0#?MODULE{service_queue = ServiceQueue,
consumers = Cons}, []),
consumers = Cons}, [], false),
Response = {send_credit_reply, messages_ready(State1)},
%% by this point all checkouts for the updated credit value
%% should be processed so we can evaluate the drain
Expand Down Expand Up @@ -299,7 +300,8 @@ apply(#{index := Index,
Exists = maps:is_key(ConsumerId, Consumers),
case messages_ready(State0) of
0 ->
update_smallest_raft_index(Index, {dequeue, empty}, State0, []);
update_smallest_raft_index(Index, {dequeue, empty}, State0,
[notify_decorators_effect(State0)]);
dcorbacho marked this conversation as resolved.
Show resolved Hide resolved
_ when Exists ->
%% a dequeue using the same consumer_id isn't possible at this point
{State0, {dequeue, empty}};
Expand Down Expand Up @@ -330,8 +332,8 @@ apply(#{index := Index,
{{dequeue, {MsgId, Msg}, Ready-1}, Effects1}

end,

case evaluate_limit(Index, false, State0, State4, Effects2) of
NotifyEffect = notify_decorators_effect(State4),
case evaluate_limit(Index, false, State0, State4, [NotifyEffect | Effects2]) of
{State, true, Effects} ->
update_smallest_raft_index(Index, Reply, State, Effects);
{State, false, Effects} ->
Expand Down Expand Up @@ -456,6 +458,7 @@ apply(#{system_time := Ts} = Meta, {down, Pid, noconnection},
% Monitor the node so that we can "unsuspect" these processes when the node
% comes back, then re-issue all monitors and discover the final fate of
% these processes

Effects = case maps:size(State#?MODULE.consumers) of
0 ->
[{aux, inactive}, {monitor, node, Node}];
Expand Down Expand Up @@ -959,6 +962,21 @@ query_peek(Pos, State0) when Pos > 0 ->
query_peek(Pos-1, State)
end.

query_notify_decorators_info(#?MODULE{consumers = Consumers} = State) ->
MaxActivePriority = maps:fold(fun(_, #consumer{credit = C,
status = up,
priority = P0}, MaxP) when C > 0 ->
P = -P0,
case MaxP of
empty -> P;
MaxP when MaxP > P -> MaxP;
_ -> P
end;
(_, _, MaxP) ->
MaxP
end, empty, Consumers),
IsEmpty = (messages_ready(State) == 0),
{MaxActivePriority, IsEmpty}.

-spec usage(atom()) -> float().
usage(Name) when is_atom(Name) ->
Expand Down Expand Up @@ -1062,11 +1080,13 @@ cancel_consumer0(Meta, ConsumerId,
#{ConsumerId := Consumer} ->
{S, Effects2} = maybe_return_all(Meta, ConsumerId, Consumer,
S0, Effects0, Reason),

%% The effects are emitted before the consumer is actually removed
%% if the consumer has unacked messages. This is a bit weird but
%% in line with what classic queues do (from an external point of
%% view)
Effects = cancel_consumer_effects(ConsumerId, S, Effects2),

case maps:size(S#?MODULE.consumers) of
0 ->
{S, [{aux, inactive} | Effects]};
Expand Down Expand Up @@ -1129,7 +1149,7 @@ apply_enqueue(#{index := RaftIdx} = Meta, From, Seq, RawMsg, State0) ->
case maybe_enqueue(RaftIdx, From, Seq, RawMsg, [], State0) of
{ok, State1, Effects1} ->
State2 = append_to_master_index(RaftIdx, State1),
{State, ok, Effects} = checkout(Meta, State0, State2, Effects1),
{State, ok, Effects} = checkout(Meta, State0, State2, Effects1, false),
{maybe_store_dehydrated_state(RaftIdx, State), ok, Effects};
{duplicate, State, Effects} ->
{State, ok, Effects}
Expand Down Expand Up @@ -1287,7 +1307,7 @@ return(#{index := IncomingRaftIdx} = Meta, ConsumerId, Returned,
_ ->
State1
end,
{State, ok, Effects} = checkout(Meta, State0, State2, Effects1),
{State, ok, Effects} = checkout(Meta, State0, State2, Effects1, false),
update_smallest_raft_index(IncomingRaftIdx, State, Effects).

% used to processes messages that are finished
Expand Down Expand Up @@ -1331,7 +1351,7 @@ complete_and_checkout(#{index := IncomingRaftIdx} = Meta, MsgIds, ConsumerId,
Discarded = maps:with(MsgIds, Checked0),
{State2, Effects1} = complete(Meta, ConsumerId, Discarded, Con0,
Effects0, State0),
{State, ok, Effects} = checkout(Meta, State0, State2, Effects1),
{State, ok, Effects} = checkout(Meta, State0, State2, Effects1, false),
update_smallest_raft_index(IncomingRaftIdx, State, Effects).

dead_letter_effects(_Reason, _Discarded,
Expand Down Expand Up @@ -1363,9 +1383,10 @@ dead_letter_effects(Reason, Discarded,
end} | Effects].

cancel_consumer_effects(ConsumerId,
#?MODULE{cfg = #cfg{resource = QName}}, Effects) ->
#?MODULE{cfg = #cfg{resource = QName}} = State, Effects) ->
[{mod_call, rabbit_quorum_queue,
cancel_consumer_handler, [QName, ConsumerId]} | Effects].
cancel_consumer_handler, [QName, ConsumerId]},
notify_decorators_effect(State) | Effects].

update_smallest_raft_index(Idx, State, Effects) ->
update_smallest_raft_index(Idx, ok, State, Effects).
Expand Down Expand Up @@ -1500,14 +1521,30 @@ return_all(Meta, #?MODULE{consumers = Cons} = State0, Effects0, ConsumerId,
end, {State, Effects0}, Checked).

%% checkout new messages to consumers
checkout(#{index := Index} = Meta, OldState, State0, Effects0) ->
checkout(Meta, OldState, State, Effects) ->
checkout(Meta, OldState, State, Effects, true).
michaelklishin marked this conversation as resolved.
Show resolved Hide resolved

checkout(#{index := Index} = Meta, #?MODULE{cfg = #cfg{resource = QName}} = OldState, State0,
Effects0, HandleConsumerChanges) ->
{State1, _Result, Effects1} = checkout0(Meta, checkout_one(Meta, State0),
Effects0, {#{}, #{}}),
case evaluate_limit(Index, false, OldState, State1, Effects1) of
{State, true, Effects} ->
update_smallest_raft_index(Index, State, Effects);
case have_active_consumers_changed(State, HandleConsumerChanges) of
michaelklishin marked this conversation as resolved.
Show resolved Hide resolved
{true, {MaxActivePriority, IsEmpty}} ->
NotifyEffect = notify_decorators_effect(QName, MaxActivePriority, IsEmpty),
update_smallest_raft_index(Index, State, [NotifyEffect | Effects]);
false ->
update_smallest_raft_index(Index, State, Effects)
end;
{State, false, Effects} ->
{State, ok, Effects}
case have_active_consumers_changed(State, HandleConsumerChanges) of
{true, {MaxActivePriority, IsEmpty}} ->
NotifyEffect = notify_decorators_effect(QName, MaxActivePriority, IsEmpty),
{State, ok, [NotifyEffect | Effects]};
false ->
{State, ok, Effects}
end
end.

checkout0(Meta, {success, ConsumerId, MsgId, {RaftIdx, {Header, 'empty'}}, State},
Expand Down Expand Up @@ -2126,3 +2163,16 @@ get_priority_from_args(#{args := Args}) ->
end;
get_priority_from_args(_) ->
0.

have_active_consumers_changed(_, false) ->
dcorbacho marked this conversation as resolved.
Show resolved Hide resolved
false;
have_active_consumers_changed(State, _) ->
{true, query_notify_decorators_info(State)}.

notify_decorators_effect(#?MODULE{cfg = #cfg{resource = QName}} = State) ->
{MaxActivePriority, IsEmpty} = query_notify_decorators_info(State),
notify_decorators_effect(QName, MaxActivePriority, IsEmpty).

notify_decorators_effect(QName, MaxActivePriority, IsEmpty) ->
{mod_call, rabbit_quorum_queue, spawn_notify_decorators,
[QName, consumer_state_changed, [MaxActivePriority, IsEmpty]]}.
14 changes: 11 additions & 3 deletions deps/rabbit/src/rabbit_queue_type.erl
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@
dequeue/5,
fold_state/3,
is_policy_applicable/2,
is_server_named_allowed/1
is_server_named_allowed/1,
notify_decorators/1
]).

-type queue_name() :: rabbit_types:r(queue).
Expand Down Expand Up @@ -198,6 +199,9 @@
-callback capabilities() ->
#{atom() := term()}.

-callback notify_decorators(amqqueue:amqqueue()) ->
ok.

%% TODO: this should be controlled by a registry that is populated on boot
discover(<<"quorum">>) ->
rabbit_quorum_queue;
Expand Down Expand Up @@ -298,15 +302,19 @@ i_down(_K, _Q, _DownReason) -> ''.
is_policy_applicable(Q, Policy) ->
Mod = amqqueue:get_type(Q),
Capabilities = Mod:capabilities(),
Applicable = maps:get(policies, Capabilities, []),
NotApplicable = maps:get(policies, Capabilities, []),
lists:all(fun({P, _}) ->
lists:member(P, Applicable)
not lists:member(P, NotApplicable)
end, Policy).

is_server_named_allowed(Type) ->
Capabilities = Type:capabilities(),
maps:get(server_named, Capabilities, false).

notify_decorators(Q) ->
Mod = amqqueue:get_type(Q),
Mod:notify_decorators(Q).

-spec init() -> state().
init() ->
#?STATE{}.
Expand Down
45 changes: 42 additions & 3 deletions deps/rabbit/src/rabbit_quorum_queue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@
repair_amqqueue_nodes/2
]).
-export([reclaim_memory/2]).
-export([notify_decorators/1,
notify_decorators/3,
spawn_notify_decorators/3]).

-export([is_enabled/0,
declare/2]).
Expand Down Expand Up @@ -172,6 +175,7 @@ start_cluster(Q) ->
ra_machine_config(NewQ)),
%% force a policy change to ensure the latest config is
%% updated even when running the machine version from 0
notify_decorators(QName, startup),
rabbit_event:notify(queue_created,
[{name, QName},
{durable, Durable},
Expand Down Expand Up @@ -346,9 +350,14 @@ filter_quorum_critical(Queues, ReplicaStates) ->
end, Queues).

capabilities() ->
#{policies => [<<"max-length">>, <<"max-length-bytes">>, <<"overflow">>,
<<"expires">>, <<"max-in-memory-length">>, <<"max-in-memory-bytes">>,
<<"delivery-limit">>, <<"dead-letter-exchange">>, <<"dead-letter-routing-key">>],
#{policies => [ %% Classic policies
<<"message-ttl">>, <<"max-priority">>, <<"queue-mode">>,
<<"single-active-consumer">>, <<"ha-mode">>, <<"ha-params">>,
<<"ha-sync-mode">>, <<"ha-promote-on-shutdown">>, <<"ha-promote-on-failure">>,
<<"queue-master-locator">>,
%% Stream policies
<<"max-age">>, <<"max-segment-size">>,
<<"queue-leader-locator">>, <<"initial-cluster-size">>],
queue_arguments => [<<"x-expires">>, <<"x-dead-letter-exchange">>,
<<"x-dead-letter-routing-key">>, <<"x-max-length">>,
<<"x-max-length-bytes">>, <<"x-max-in-memory-length">>,
Expand All @@ -369,6 +378,11 @@ spawn_deleter(QName) ->
delete(Q, false, false, <<"expired">>)
end).

spawn_notify_decorators(QName, Fun, Args) ->
spawn(fun () ->
notify_decorators(QName, Fun, Args)
end).

handle_tick(QName,
{Name, MR, MU, M, C, MsgBytesReady, MsgBytesUnack},
Nodes) ->
Expand Down Expand Up @@ -568,6 +582,7 @@ delete(Q, _IfUnused, _IfEmpty, ActingUser) when ?amqqueue_is_quorum(Q) ->
after Timeout ->
ok = force_delete_queue(Servers)
end,
notify_decorators(QName, shutdown),
ok = delete_queue_data(QName, ActingUser),
rpc:call(LeaderNode, rabbit_core_metrics, queue_deleted, [QName],
?RPC_TIMEOUT),
Expand All @@ -589,6 +604,7 @@ delete(Q, _IfUnused, _IfEmpty, ActingUser) when ?amqqueue_is_quorum(Q) ->
" Attempting force delete.",
[rabbit_misc:rs(QName), Errs]),
ok = force_delete_queue(Servers),
notify_decorators(QName, shutdown),
delete_queue_data(QName, ActingUser),
{ok, ReadyMsgs}
end
Expand Down Expand Up @@ -1525,3 +1541,26 @@ parse_credit_args(Default, Args) ->
undefined ->
{simple_prefetch, Default, false}
end.

-spec notify_decorators(amqqueue:amqqueue()) -> 'ok'.
notify_decorators(Q) when ?is_amqqueue(Q) ->
QName = amqqueue:get_name(Q),
QPid = amqqueue:get_pid(Q),
case ra:local_query(QPid, fun rabbit_fifo:query_notify_decorators_info/1) of
{ok, {_, {MaxActivePriority, IsEmpty}}, _} ->
notify_decorators(QName, consumer_state_changed, [MaxActivePriority, IsEmpty]);
_ -> ok
end.

notify_decorators(QName, Event) -> notify_decorators(QName, Event, []).
dcorbacho marked this conversation as resolved.
Show resolved Hide resolved

notify_decorators(QName, F, A) ->
michaelklishin marked this conversation as resolved.
Show resolved Hide resolved
%% Look up again in case policy and hence decorators have changed
case rabbit_amqqueue:lookup(QName) of
{ok, Q} ->
Ds = amqqueue:get_decorators(Q),
[ok = apply(M, F, [Q|A]) || M <- rabbit_queue_decorator:select(Ds)],
ok;
{error, not_found} ->
ok
end.
18 changes: 15 additions & 3 deletions deps/rabbit/src/rabbit_stream_queue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@
update/2,
state_info/1,
stat/1,
capabilities/0]).
capabilities/0,
notify_decorators/1]).

-export([set_retention_policy/3]).
-export([add_replica/3,
Expand Down Expand Up @@ -744,8 +745,15 @@ msg_to_iodata(#basic_message{exchange_name = #resource{name = Exchange},
rabbit_msg_record:to_iodata(R).

capabilities() ->
#{policies => [<<"max-length-bytes">>, <<"max-age">>, <<"max-segment-size">>,
<<"queue-leader-locator">>, <<"initial-cluster-size">>],
#{policies => [ %% Classic policies
<<"expires">>, <<"message-ttl">>, <<"dead-letter-exchange">>,
<<"dead-letter-routing-key">>, <<"max-length">>,
<<"max-in-memory-length">>, <<"max-in-memory-bytes">>,
<<"max-priority">>, <<"overflow">>, <<"queue-mode">>,
<<"single-active-consumer">>, <<"delivery-limit">>,
<<"ha-mode">>, <<"ha-params">>, <<"ha-sync-mode">>,
<<"ha-promote-on-shutdown">>, <<"ha-promote-on-failure">>,
<<"queue-master-locator">>],
queue_arguments => [<<"x-dead-letter-exchange">>, <<"x-dead-letter-routing-key">>,
<<"x-max-length">>, <<"x-max-length-bytes">>,
<<"x-single-active-consumer">>, <<"x-queue-type">>,
Expand All @@ -754,6 +762,10 @@ capabilities() ->
consumer_arguments => [<<"x-stream-offset">>],
server_named => false}.

notify_decorators(Q) when ?is_amqqueue(Q) ->
%% Not supported
ok.

resend_all(#stream_client{leader = LeaderPid,
writer_id = WriterId,
correlation = Corrs} = State) ->
Expand Down
Loading