Skip to content

Commit

Permalink
rabbit_fifo: Suggest a fifo when all enqueuers disconnect
Browse files Browse the repository at this point in the history
Checkpoints are normally only taken after enqueuing many messages. We
can suggest a checkpoint when all enqueuers disconnect so that the
queue is likely to recover in minimal time from the current index.
  • Loading branch information
the-mikedavis committed Feb 8, 2024
1 parent 300ff68 commit 4e2b1a7
Showing 1 changed file with 30 additions and 11 deletions.
41 changes: 30 additions & 11 deletions deps/rabbit/src/rabbit_fifo.erl
Expand Up @@ -774,11 +774,9 @@ purge_node(Meta, Node, State, Effects) ->
end, {State, Effects}, all_pids_for(Node, State)).

%% any downs that re not noconnection
handle_down(Meta, Pid, #?MODULE{consumers = Cons0,
enqueuers = Enqs0} = State0) ->
% Remove any enqueuer for the down pid
State1 = State0#?MODULE{enqueuers = maps:remove(Pid, Enqs0)},
{Effects1, State2} = handle_waiting_consumer_down(Pid, State1),
handle_down(Meta, Pid, #?MODULE{consumers = Cons0} = State0) ->
{Effects0, State1} = handle_enqueuer_down(Meta, Pid, State0),
{Effects1, State2} = handle_waiting_consumer_down(Pid, State1, Effects0),
% return checked out messages to main queue
% Find the consumers for the down pid
DownConsumers = maps:keys(
Expand All @@ -787,6 +785,24 @@ handle_down(Meta, Pid, #?MODULE{consumers = Cons0,
cancel_consumer(Meta, ConsumerId, S, E, down)
end, {State2, Effects1}, DownConsumers).

handle_enqueuer_down(Meta, Pid, #?MODULE{enqueuers = Enqs0} = State0) ->
case maps:take(Pid, Enqs0) of
{_Enqueuer, Enqs} ->
State = State0#?MODULE{enqueuers = Enqs},
%% When there are no more enqueuers connected, suggest a checkpoint
%% so that recovery is fast.
case Enqs =:= #{} of
true ->
#{index := Idx, term := Term} = Meta,
Dehydrated = dehydrate_state(State),
{[{checkpoint, {Idx, Term}, Dehydrated}], State};
false ->
{[], State}
end;
false ->
{[], State0}
end.

consumer_active_flag_update_function(
#?MODULE{cfg = #cfg{consumer_strategy = competing}}) ->
fun(State, ConsumerId, Consumer, Active, ActivityStatus, Effects) ->
Expand All @@ -800,22 +816,25 @@ consumer_active_flag_update_function(
end.

handle_waiting_consumer_down(_Pid,
#?MODULE{cfg = #cfg{consumer_strategy = competing}} = State) ->
{[], State};
#?MODULE{cfg = #cfg{consumer_strategy = competing}} = State,
Effects0) ->
{Effects0, State};
handle_waiting_consumer_down(_Pid,
#?MODULE{cfg = #cfg{consumer_strategy = single_active},
waiting_consumers = []} = State) ->
{[], State};
waiting_consumers = []} = State,
Effects0) ->
{Effects0, State};
handle_waiting_consumer_down(Pid,
#?MODULE{cfg = #cfg{consumer_strategy = single_active},
waiting_consumers = WaitingConsumers0} = State0) ->
waiting_consumers = WaitingConsumers0} = State0,
Effects0) ->
% get cancel effects for down waiting consumers
Down = lists:filter(fun({{_, P}, _}) -> P =:= Pid end,
WaitingConsumers0),
Effects = lists:foldl(fun ({ConsumerId, _}, Effects) ->
cancel_consumer_effects(ConsumerId, State0,
Effects)
end, [], Down),
end, Effects0, Down),
% update state to have only up waiting consumers
StillUp = lists:filter(fun({{_, P}, _}) -> P =/= Pid end,
WaitingConsumers0),
Expand Down

0 comments on commit 4e2b1a7

Please sign in to comment.