diff --git a/deps/rabbit/src/rabbit_fifo.erl b/deps/rabbit/src/rabbit_fifo.erl index 34ab7c70fb98..21d16c81188c 100644 --- a/deps/rabbit/src/rabbit_fifo.erl +++ b/deps/rabbit/src/rabbit_fifo.erl @@ -774,11 +774,9 @@ purge_node(Meta, Node, State, Effects) -> end, {State, Effects}, all_pids_for(Node, State)). %% any downs that re not noconnection -handle_down(Meta, Pid, #?MODULE{consumers = Cons0, - enqueuers = Enqs0} = State0) -> - % Remove any enqueuer for the down pid - State1 = State0#?MODULE{enqueuers = maps:remove(Pid, Enqs0)}, - {Effects1, State2} = handle_waiting_consumer_down(Pid, State1), +handle_down(Meta, Pid, #?MODULE{consumers = Cons0} = State0) -> + {Effects0, State1} = handle_enqueuer_down(Meta, Pid, State0), + {Effects1, State2} = handle_waiting_consumer_down(Pid, State1, Effects0), % return checked out messages to main queue % Find the consumers for the down pid DownConsumers = maps:keys( @@ -787,6 +785,24 @@ handle_down(Meta, Pid, #?MODULE{consumers = Cons0, cancel_consumer(Meta, ConsumerId, S, E, down) end, {State2, Effects1}, DownConsumers). +handle_enqueuer_down(Meta, Pid, #?MODULE{enqueuers = Enqs0} = State0) -> + case maps:take(Pid, Enqs0) of + {_Enqueuer, Enqs} -> + State = State0#?MODULE{enqueuers = Enqs}, + %% When there are no more enqueuers connected, suggest a checkpoint + %% so that recovery is fast. + case Enqs =:= #{} of + true -> + #{index := Idx, term := Term} = Meta, + Dehydrated = dehydrate_state(State), + {[{checkpoint, {Idx, Term}, Dehydrated}], State}; + false -> + {[], State} + end; + false -> + {[], State0} + end. + consumer_active_flag_update_function( #?MODULE{cfg = #cfg{consumer_strategy = competing}}) -> fun(State, ConsumerId, Consumer, Active, ActivityStatus, Effects) -> @@ -800,22 +816,25 @@ consumer_active_flag_update_function( end. handle_waiting_consumer_down(_Pid, - #?MODULE{cfg = #cfg{consumer_strategy = competing}} = State) -> - {[], State}; + #?MODULE{cfg = #cfg{consumer_strategy = competing}} = State, + Effects0) -> + {Effects0, State}; handle_waiting_consumer_down(_Pid, #?MODULE{cfg = #cfg{consumer_strategy = single_active}, - waiting_consumers = []} = State) -> - {[], State}; + waiting_consumers = []} = State, + Effects0) -> + {Effects0, State}; handle_waiting_consumer_down(Pid, #?MODULE{cfg = #cfg{consumer_strategy = single_active}, - waiting_consumers = WaitingConsumers0} = State0) -> + waiting_consumers = WaitingConsumers0} = State0, + Effects0) -> % get cancel effects for down waiting consumers Down = lists:filter(fun({{_, P}, _}) -> P =:= Pid end, WaitingConsumers0), Effects = lists:foldl(fun ({ConsumerId, _}, Effects) -> cancel_consumer_effects(ConsumerId, State0, Effects) - end, [], Down), + end, Effects0, Down), % update state to have only up waiting consumers StillUp = lists:filter(fun({{_, P}, _}) -> P =/= Pid end, WaitingConsumers0),