Skip to content

Commit

Permalink
WIP Supervise rabbit_fifo_dlx_worker
Browse files Browse the repository at this point in the history
Start worker when Ra node becomes leader.
Stop worker when Ra node is not leader anymore.

Next step:
Currently the state_enter callbacks are called for rabbit_fifo_v0.
"applying new machine version 2 current 0" is only reported after the node
becomes leader.
Check why that's the case.
  • Loading branch information
ansd committed Nov 5, 2021
1 parent 7431b46 commit 15c633a
Show file tree
Hide file tree
Showing 5 changed files with 95 additions and 26 deletions.
35 changes: 25 additions & 10 deletions deps/rabbit/src/rabbit_fifo.erl
Expand Up @@ -20,11 +20,13 @@
-include_lib("rabbit_common/include/rabbit.hrl").

-export([
%% ra_machine callbacks
init/1,
apply/3,
state_enter/2,
tick/2,
overview/1,

get_checked_out/4,
%% versioning
version/0,
Expand Down Expand Up @@ -210,15 +212,14 @@ apply(Meta,

end;
apply(Meta, #discard{msg_ids = MsgIds, consumer_id = ConsumerId},
#?MODULE{consumers = Cons0,
cfg = #cfg{dead_letter_handling = DLH}} = State0) ->
#?MODULE{consumers = Cons0} = State0) ->
case Cons0 of
#{ConsumerId := #consumer{checked_out = Checked} = Con0} ->
case DLH of
{at_least_once, _} ->
case at_least_once_dlx(State0) of
true ->
{State, ok, Effects} = discard(MsgIds, Meta, ConsumerId, Checked, [], State0),
checkout(Meta, State0, State, Effects, false);
_ ->
false ->
% Discarded maintains same order as MsgIds (so that publishing to
% dead-letter exchange will be in same order as messages got rejected)
Discarded = lists:filtermap(fun(Id) ->
Expand Down Expand Up @@ -783,7 +784,17 @@ update_waiting_consumer_status(Node,
Consumer#consumer.status =/= cancelled].

-spec state_enter(ra_server:ra_state(), state()) -> ra_machine:effects().
state_enter(leader, #?MODULE{consumers = Cons,
%% Ra state-transition hook. When at-least-once dead-lettering is configured,
%% rabbit_fifo_dlx:state_enter/3 is invoked first for the new Ra state; its
%% result is discarded. The effects handed back to Ra are always the ones
%% computed by state_enter0/2.
state_enter(RaState, #?MODULE{cfg = #cfg{name = QueueName},
                              dlx = Dlx} = FifoState) ->
    %% Only the call itself matters here, so the value of the short-circuit
    %% expression is deliberately thrown away.
    _ = at_least_once_dlx(FifoState) andalso
        rabbit_fifo_dlx:state_enter(RaState, QueueName, Dlx),
    state_enter0(RaState, FifoState).

state_enter0(leader, #?MODULE{consumers = Cons,
enqueuers = Enqs,
waiting_consumers = WaitingConsumers,
cfg = #cfg{name = Name,
Expand All @@ -806,7 +817,7 @@ state_enter(leader, #?MODULE{consumers = Cons,
{Mod, Fun, Args} ->
[{mod_call, Mod, Fun, Args ++ [Name]} | Effects]
end;
state_enter(eol, #?MODULE{enqueuers = Enqs,
state_enter0(eol, #?MODULE{enqueuers = Enqs,
consumers = Custs0,
waiting_consumers = WaitingConsumers0}) ->
Custs = maps:fold(fun({_, P}, V, S) -> S#{P => V} end, #{}, Custs0),
Expand All @@ -817,14 +828,13 @@ state_enter(eol, #?MODULE{enqueuers = Enqs,
|| P <- maps:keys(maps:merge(Enqs, AllConsumers))] ++
[{aux, eol},
{mod_call, rabbit_quorum_queue, file_handle_release_reservation, []}];
state_enter(State, #?MODULE{cfg = #cfg{resource = _Resource}}) when State =/= leader ->
state_enter0(State, #?MODULE{cfg = #cfg{resource = _Resource}}) when State =/= leader ->
FHReservation = {mod_call, rabbit_quorum_queue, file_handle_other_reservation, []},
[FHReservation];
state_enter(_, _) ->
state_enter0(_, _) ->
%% catch all as not handling all states
[].


-spec tick(non_neg_integer(), state()) -> ra_machine:effects().
tick(Ts, #?MODULE{cfg = #cfg{name = Name,
resource = QName},
Expand Down Expand Up @@ -2401,3 +2411,8 @@ smallest_raft_index(#?MODULE{cfg = _Cfg,
{undefined, State}
end
end.

%% Returns true when the configured dead_letter_handling of the given
%% rabbit_fifo state is an {at_least_once, _} tuple, and false for any other
%% argument.
at_least_once_dlx(#?MODULE{cfg = #cfg{dead_letter_handling = Strategy}}) ->
    case Strategy of
        {at_least_once, _} -> true;
        _ -> false
    end;
at_least_once_dlx(_Other) ->
    false.
25 changes: 23 additions & 2 deletions deps/rabbit/src/rabbit_fifo_dlx.erl
Expand Up @@ -7,7 +7,8 @@
make_settle/1]).

% called by rabbit_fifo delegating DLX handling to this module
-export([init/0, apply/3, discard/2, overview/1, checkout/1]).
-export([init/0, apply/3, discard/2, overview/1,
checkout/1, state_enter/3]).

%% This module handles the dead letter (DLX) part of the rabbit_fifo state machine.
%% This is a separate module to better unit test and provide separation of concerns.
Expand All @@ -25,7 +26,7 @@
%% We don't require a consumer tag because a consumer tag is a means to distinguish
%% multiple consumers in the same channel. The channel-like rabbit_fifo_dlx_worker process, however,
%% creates only a single consumer for this quorum queue's discards queue.
pid :: pid(),
pid :: pid(), %%TODO replace with locally registered name
prefetch :: non_neg_integer(),
checked_out = #{} :: #{msg_id() => indexed_msg()},
next_msg_id = 0 :: msg_id() % part of snapshot data
Expand Down Expand Up @@ -186,3 +187,23 @@ delivery_effects(CPid, {InMemMsgs, IdxMsgs0}) ->
[{send_msg, CPid, {delivery, Msgs}, [local, ra_event]}]
end,
{local, node(CPid)}}].

%% Called by rabbit_fifo when the Ra server enters a new Raft state.
%%
%% leader:
%%   A rabbit_fifo_dlx_worker is started under rabbit_fifo_dlx_sup. The start
%%   is performed inline instead of via an effect because it must succeed: if
%%   it does not, the assertive match below fails, the intent being that the
%%   whole Ra server process crashes and another Ra node becomes leader.
%%   supervisor:start_child/2 blocks until rabbit_fifo_dlx_worker:init/1
%%   returns (TODO check if this is correct). That is acceptable because
%%   init/1 returns immediately and delegates the initial setup to
%%   handle_continue/2.
%%
%% any other state, while the DLX state holds a consumer:
%%   The consumer's worker pid is terminated through the supervisor.
%%   TODO Why don't we pass OldRaftState to the ra_machine:state_enter
%%   callback? It would be easy to do so, and then terminate_child would only
%%   need to be called when the old state was 'leader' and the new state is
%%   not 'leader'. Right now the call fails most of the time with not_found;
%%   that error is ignored.
%%   A mod_call effect cannot be returned here instead, because mod_call is
%%   executed on the leader only.
%%
%% anything else:
%%   Nothing to do.
state_enter(leader, QueueName, _DlxState) ->
    WorkerArgs = [QueueName],
    {ok, _WorkerPid} = supervisor:start_child(rabbit_fifo_dlx_sup, WorkerArgs);
state_enter(_RaState, _QueueName,
            #state{consumer = #dlx_consumer{pid = WorkerPid}}) ->
    supervisor:terminate_child(rabbit_fifo_dlx_sup, WorkerPid);
state_enter(_RaState, _QueueName, _DlxState) ->
    ok.
35 changes: 35 additions & 0 deletions deps/rabbit/src/rabbit_fifo_dlx_sup.erl
@@ -0,0 +1,35 @@
-module(rabbit_fifo_dlx_sup).

-behaviour(supervisor).

-rabbit_boot_step({?MODULE,
[{description, "supervisor of quorum queue dead-letter workers"},
{mfa, {rabbit_sup, start_supervisor_child, [?MODULE]}},
{requires, routing_ready}]}).

%% supervisor callback
-export([init/1]).
%% client API
-export([start_link/0]).

%% Starts the dead-letter worker supervisor, registered locally under the
%% name of this module. init/1 is called with an empty argument list.
-spec start_link() -> {ok, pid()} | ignore | {error, term()}.
start_link() ->
    RegisteredName = {local, ?MODULE},
    supervisor:start_link(RegisteredName, ?MODULE, []).

%% supervisor callback.
%%
%% Returns the specification of a simple_one_for_one supervisor whose
%% dynamically added children are rabbit_fifo_dlx_worker processes. Each child
%% is started through rabbit_fifo_dlx_worker:start_link with the argument list
%% passed to supervisor:start_child/2 appended.
%%
%% If the quorum_queue feature flag is disabled, an info message is logged and
%% 'ignore' is returned, so no supervisor process is started.
init([]) ->
    FeatureFlag = quorum_queue,
    case rabbit_ff_registry:is_enabled(FeatureFlag) of
        true ->
            %% The child spec relies on the default restart type (permanent),
            %% so a crashed worker gets restarted. With an intensity of 0 the
            %% very first restart already exceeds the allowed restart
            %% frequency, which makes the supervisor terminate itself together
            %% with the workers of all other queues. Allow the OTP default
            %% budget of 1 restart within 5 seconds instead, so that a single
            %% worker crash stays isolated.
            SupFlags = #{strategy => simple_one_for_one,
                         intensity => 1,
                         period => 5},
            Worker = rabbit_fifo_dlx_worker,
            ChildSpec = #{id => Worker,
                          start => {Worker, start_link, []},
                          type => worker,
                          modules => [Worker]},
            {ok, {SupFlags, [ChildSpec]}};
        false ->
            rabbit_log:info("not starting supervisor ~s because feature flag ~s is disabled",
                            [?MODULE, FeatureFlag]),
            ignore
    end.
18 changes: 11 additions & 7 deletions deps/rabbit/src/rabbit_fifo_dlx_worker.erl
Expand Up @@ -19,7 +19,9 @@

-export([start_link/1]).
%% gen_server2 callbacks
-export([init/1, terminate/2, handle_cast/2, handle_call/3, handle_info/2, code_change/3]).
-export([init/1, terminate/2, handle_continue/2,
handle_cast/2, handle_call/3, handle_info/2,
code_change/3]).

%%TODO make configurable or leave at 0 which means 2000 as in
%% https://github.com/rabbitmq/rabbitmq-server/blob/1e7df8c436174735b1d167673afd3f1642da5cdc/deps/rabbit/src/rabbit_quorum_queue.erl#L726-L729
Expand Down Expand Up @@ -75,13 +77,15 @@ start_link(QRef) ->

%% init callback. Returns immediately with an empty state and defers the
%% actual setup to handle_continue/2, handing the queue name over as the
%% continue term.
%% The spec includes the {continue, _} element so that it matches the value
%% that is really returned.
-spec init(rabbit_amqqueue:name()) ->
    {ok, state(), {continue, rabbit_amqqueue:name()}}.
init(QRef) ->
    {ok, #state{}, {continue, QRef}}.

handle_continue(QRef, State) ->
{ok, Q} = rabbit_amqqueue:lookup(QRef),
Node = node(),
{_ClusterName, Node} = Leader = amqqueue:get_pid(Q),
{ok, ConsumerState} = rabbit_fifo_dlx_client:checkout(QRef, Leader, ?CONSUMER_PREFETCH_COUNT),
{ok, #state{consumer_queue_ref = QRef,
dlx_client_state = ConsumerState,
queue_type_state = rabbit_queue_type:init()}}.
{ClusterName, _MaybeOldLeaderNode} = amqqueue:get_pid(Q),
{ok, ConsumerState} = rabbit_fifo_dlx_client:checkout(QRef, {ClusterName, node()}, ?CONSUMER_PREFETCH_COUNT),
{noreply, State#state{consumer_queue_ref = QRef,
dlx_client_state = ConsumerState,
queue_type_state = rabbit_queue_type:init()}}.

terminate(_Reason, _State) ->
%% cancel subscription?
Expand Down
8 changes: 1 addition & 7 deletions deps/rabbit/src/rabbit_quorum_queue.erl
Expand Up @@ -310,13 +310,7 @@ become_leader(QName, Name) ->
|| Node <- Nodes, Node =/= node()];
_ ->
ok
end,
%%TODO
%% 1. Spawn leader companion process directly in Ra.
%% 2. Supervise rabbit_fifo_dlx_worker (see rabbit_channel_sup for inspiration).
%% 3. Spawn only if dead-letter-strategy at-least-once.
{ok, Pid} = rabbit_fifo_dlx_worker:start_link(QName),
rabbit_log:debug("Started rabbit_fifo_dlx_worker process ~p", [Pid])
end
end).

-spec all_replica_states() -> {node(), #{atom() => atom()}}.
Expand Down

0 comments on commit 15c633a

Please sign in to comment.