deps/rabbit/src/rabbit_fifo.erl (72 changes: 53 additions & 19 deletions)

@@ -75,11 +75,6 @@
-export([update_header/4]).
-endif.

-define(SETTLE_V2, '$s').
-define(RETURN_V2, '$r').
-define(DISCARD_V2, '$d').
-define(CREDIT_V2, '$c').

%% command records representing all the protocol actions that are supported
-record(enqueue, {pid :: option(pid()),
seq :: option(msg_seqno()),
@@ -362,9 +357,9 @@ apply(#{index := Index,
%% a dequeue using the same consumer_id isn't possible at this point
{State0, {dequeue, empty}};
_ ->
State1 = update_consumer(Meta, ConsumerId, ConsumerMeta,
{once, 1, simple_prefetch}, 0,
State0),
{_, State1} = update_consumer(Meta, ConsumerId, ConsumerMeta,
{once, 1, simple_prefetch}, 0,
State0),
case checkout_one(Meta, false, State1, []) of
{success, _, MsgId, ?MSG(RaftIdx, Header), ExpiredMsg, State2, Effects0} ->
{State4, Effects1} = case Settlement of
@@ -405,9 +400,19 @@ apply(#{index := Idx} = Meta,
apply(Meta, #checkout{spec = Spec, meta = ConsumerMeta,
consumer_id = {_, Pid} = ConsumerId}, State0) ->
Priority = get_priority_from_args(ConsumerMeta),
State1 = update_consumer(Meta, ConsumerId, ConsumerMeta, Spec, Priority, State0),
{Consumer, State1} = update_consumer(Meta, ConsumerId, ConsumerMeta,
Spec, Priority, State0),
{State2, Effs} = activate_next_consumer(State1, []),
checkout(Meta, State0, State2, [{monitor, process, Pid} | Effs]);
#consumer{checked_out = Checked,
credit = Credit,
delivery_count = DeliveryCount,
next_msg_id = NextMsgId} = Consumer,
%% reply with a consumer summary
Reply = {ok, #{next_msg_id => NextMsgId,
credit => Credit,
delivery_count => DeliveryCount,
num_checked_out => map_size(Checked)}},
checkout(Meta, State0, State2, [{monitor, process, Pid} | Effs], Reply);
apply(#{index := Index}, #purge{},
#?MODULE{messages_total = Total,
returns = Returns,
@@ -1024,6 +1029,28 @@ handle_aux(leader, cast, {#return{msg_ids = MsgIds,
_ ->
{no_reply, Aux0, Log0}
end;
handle_aux(_, _, {get_checked_out, ConsumerId, MsgIds},
Aux0, Log0, #?MODULE{cfg = #cfg{},
consumers = Consumers}) ->
case Consumers of
#{ConsumerId := #consumer{checked_out = Checked}} ->
{Log, IdMsgs} =
maps:fold(
fun (MsgId, ?MSG(Idx, Header), {L0, Acc}) ->
%% it is possible this is not found if the consumer
%% crashed and the message got removed
case ra_log:fetch(Idx, L0) of
{{_, _, {_, _, Cmd, _}}, L} ->
Msg = get_msg(Cmd),
{L, [{MsgId, {Header, Msg}} | Acc]};
{undefined, L} ->
{L, Acc}
end
end, {Log0, []}, maps:with(MsgIds, Checked)),
{reply, {ok, IdMsgs}, Aux0, Log};
_ ->
{reply, {error, consumer_not_found}, Aux0, Log0}
end;
handle_aux(leader, cast, {#return{} = Ret, Corr, Pid},
Aux0, Log, #?MODULE{}) ->
%% for returns with a delivery limit set we can just return as before
@@ -1827,9 +1854,12 @@ return_all(Meta, #?MODULE{consumers = Cons} = State0, Effects0, ConsumerId,
return_one(Meta, MsgId, Msg, S, E, ConsumerId)
end, {State, Effects0}, lists:sort(maps:to_list(Checked))).

checkout(Meta, OldState, State0, Effects0) ->
checkout(Meta, OldState, State0, Effects0, ok).

checkout(#{index := Index} = Meta,
#?MODULE{cfg = #cfg{resource = _QName}} = OldState,
State0, Effects0) ->
State0, Effects0, Reply) ->
{#?MODULE{cfg = #cfg{dead_letter_handler = DLH},
dlx = DlxState0} = State1, ExpiredMsg, Effects1} =
checkout0(Meta, checkout_one(Meta, false, State0, Effects0), #{}),
@@ -1840,9 +1870,9 @@ checkout(#{index := Index} = Meta,
Effects2 = DlxDeliveryEffects ++ Effects1,
case evaluate_limit(Index, false, OldState, State2, Effects2) of
{State, false, Effects} when ExpiredMsg == false ->
{State, ok, Effects};
{State, Reply, Effects};
{State, _, Effects} ->
update_smallest_raft_index(Index, State, Effects)
update_smallest_raft_index(Index, Reply, State, Effects)
end.

checkout0(Meta, {success, ConsumerId, MsgId,
@@ -2137,7 +2167,7 @@ update_consumer(Meta, {Tag, Pid} = ConsumerId, ConsumerMeta,
credit_mode = Mode},
credit = Credit}
end,
update_or_remove_sub(Meta, ConsumerId, Consumer, State0);
{Consumer, update_or_remove_sub(Meta, ConsumerId, Consumer, State0)};
update_consumer(Meta, {Tag, Pid} = ConsumerId, ConsumerMeta,
{Life, Credit, Mode0} = Spec, Priority,
#?MODULE{cfg = #cfg{consumer_strategy = single_active},
@@ -2149,15 +2179,17 @@ update_consumer(Meta, {Tag, Pid} = ConsumerId, ConsumerMeta,
%% one, then merge
case active_consumer(Cons0) of
{ConsumerId, #consumer{status = up} = Consumer0} ->
Consumer = merge_consumer(Meta, Consumer0, ConsumerMeta, Spec, Priority),
update_or_remove_sub(Meta, ConsumerId, Consumer, State0);
Consumer = merge_consumer(Meta, Consumer0, ConsumerMeta,
Spec, Priority),
{Consumer, update_or_remove_sub(Meta, ConsumerId, Consumer, State0)};
undefined when is_map_key(ConsumerId, Cons0) ->
%% there is no active consumer and the current consumer is in the
%% consumers map and thus must be cancelled, in this case we can just
%% merge and effectively make this the current active one
Consumer0 = maps:get(ConsumerId, Cons0),
Consumer = merge_consumer(Meta, Consumer0, ConsumerMeta, Spec, Priority),
update_or_remove_sub(Meta, ConsumerId, Consumer, State0);
Consumer = merge_consumer(Meta, Consumer0, ConsumerMeta,
Spec, Priority),
{Consumer, update_or_remove_sub(Meta, ConsumerId, Consumer, State0)};
_ ->
%% add as a new waiting consumer
Mode = credit_mode(Meta, Credit, Mode0),
@@ -2169,7 +2201,9 @@ update_consumer(Meta, {Tag, Pid} = ConsumerId, ConsumerMeta,
credit_mode = Mode},
credit = Credit},

State0#?MODULE{waiting_consumers = Waiting ++ [{ConsumerId, Consumer}]}
{Consumer,
State0#?MODULE{waiting_consumers =
Waiting ++ [{ConsumerId, Consumer}]}}
end.

merge_consumer(Meta, #consumer{cfg = CCfg, checked_out = Checked} = Consumer,
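For reference, a minimal sketch of how a caller could use the consumer summary that a #checkout{} command now replies with. The spec tuple, ServerId and variable names below are illustrative only and not taken from this PR; the real caller is rabbit_fifo_client:checkout/5 in the next file.

%% Illustrative sketch only: process a checkout command against a member and
%% read the new consumer summary reply; older members still reply plain 'ok'.
%% ServerId is assumed to be any {RaName, Node} member of the queue.
ConsumerId = {<<"ctag">>, self()},
Cmd = rabbit_fifo:make_checkout(ConsumerId, {auto, 10, simple_prefetch}, #{}),
case ra:process_command(ServerId, Cmd, 5000) of
    {ok, {ok, #{next_msg_id := NextMsgId,
                credit := _Credit,
                delivery_count := _DelCount,
                num_checked_out := _NumChecked}}, _Leader} ->
        %% a consumer re-attaching with the same tag can resume its delivery
        %% tracking from NextMsgId - 1 instead of assuming -1
        {ok, NextMsgId - 1};
    {ok, ok, _Leader} ->
        %% pre 3.11.1 / 3.10.9 reply format
        {ok, -1};
    Err ->
        Err
end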
deps/rabbit/src/rabbit_fifo_client.erl (77 changes: 55 additions & 22 deletions)

@@ -386,14 +386,25 @@ checkout(ConsumerTag, NumUnsettled, CreditMode, Meta,
%% ???
Ack = maps:get(ack, Meta, true),

SDels = maps:update_with(ConsumerTag,
fun (V) ->
V#consumer{ack = Ack}
end,
#consumer{last_msg_id = -1,
ack = Ack}, CDels0),
try_process_command(Servers, Cmd, State0#state{consumer_deliveries = SDels}).

case try_process_command(Servers, Cmd, State0) of
{ok, Reply, Leader} ->
LastMsgId = case Reply of
ok ->
%% this is the pre 3.11.1 / 3.10.9
%% reply format
-1;
{ok, #{next_msg_id := NextMsgId}} ->
NextMsgId - 1
end,
SDels = maps:update_with(
ConsumerTag, fun (C) -> C#consumer{ack = Ack} end,
#consumer{last_msg_id = LastMsgId,
ack = Ack}, CDels0),
{ok, State0#state{leader = Leader,
consumer_deliveries = SDels}};
Err ->
Err
end.

-spec query_single_active_consumer(state()) ->
{ok, term()} | {error, term()} | {timeout, term()}.
@@ -448,7 +459,12 @@ cancel_checkout(ConsumerTag, #state{consumer_deliveries = CDels} = State0) ->
ConsumerId = {ConsumerTag, self()},
Cmd = rabbit_fifo:make_checkout(ConsumerId, cancel, #{}),
State = State0#state{consumer_deliveries = maps:remove(ConsumerTag, CDels)},
try_process_command(Servers, Cmd, State).
case try_process_command(Servers, Cmd, State) of
{ok, _, Leader} ->
{ok, State#state{leader = Leader}};
Err ->
Err
end.

%% @doc Purges all the messages from a rabbit_fifo queue and returns the number
%% of messages purged.
@@ -653,8 +669,8 @@ untracked_enqueue([Node | _], Msg) ->
try_process_command([Server | Rem], Cmd,
#state{cfg = #cfg{timeout = Timeout}} = State) ->
case ra:process_command(Server, Cmd, Timeout) of
{ok, _, Leader} ->
{ok, State#state{leader = Leader}};
{ok, _, _} = Res ->
Res;
Err when length(Rem) =:= 0 ->
Err;
_ ->
@@ -748,7 +764,7 @@ handle_delivery(Leader, {delivery, Tag, [{FstId, _} | _] = IdMsgs},
%% When the node is disconnected the leader will return all checked
%% out messages to the main queue to ensure they don't get stuck in
%% case the node never comes back.
case get_missing_deliveries(Leader, Prev+1, FstId-1, Tag) of
case get_missing_deliveries(State0, Prev+1, FstId-1, Tag) of
{protocol_error, _, _, _} = Err ->
Err;
Missing ->
@@ -808,22 +824,22 @@ update_consumer(Tag, LastId, DelCntIncr,
Consumers).


get_missing_deliveries(Leader, From, To, ConsumerTag) ->
get_missing_deliveries(State, From, To, ConsumerTag) ->
%% find local server
ConsumerId = consumer_id(ConsumerTag),
% ?INFO("get_missing_deliveries for ~w from ~b to ~b",
% [ConsumerId, From, To]),
Query = fun (State) ->
rabbit_fifo:get_checked_out(ConsumerId, From, To, State)
end,
case ra:local_query(Leader, Query, ?COMMAND_TIMEOUT) of
{ok, {_, Missing}, _} ->
rabbit_log:debug("get_missing_deliveries for ~w from ~b to ~b",
[ConsumerId, From, To]),
Cmd = {get_checked_out, ConsumerId, lists:seq(From, To)},
ServerId = find_local_or_leader(State),
case ra:aux_command(ServerId, Cmd) of
{ok, Missing} ->
Missing;
{error, Error} ->
{protocol_error, internal_error, "Cannot query missing deliveries from ~p: ~p",
[Leader, Error]};
[ServerId, Error]};
{timeout, _} ->
{protocol_error, internal_error, "Cannot query missing deliveries from ~p: timeout",
[Leader]}
[ServerId]}
end.

pick_server(#state{leader = undefined,
@@ -906,6 +922,23 @@ cancel_timer(#state{timer_state = Ref} = State) ->
erlang:cancel_timer(Ref, [{async, true}, {info, false}]),
State#state{timer_state = undefined}.

find_local_or_leader(#state{leader = Leader,
cfg = #cfg{servers = Servers}}) ->
case find_local(Servers) of
undefined ->
Leader;
ServerId ->
ServerId
end.

find_local([{_, N} = ServerId | _]) when N == node() ->
ServerId;
find_local([_ | Rem]) ->
find_local(Rem);
find_local([]) ->
undefined.


find_leader([]) ->
undefined;
find_leader([Server | Servers]) ->
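As a usage note, the missing-delivery recovery path above boils down to the following sketch. The wrapper function name is illustrative; the real caller is get_missing_deliveries/4, which prefers a member on the local node and falls back to the known leader.

%% Illustrative sketch only: fetch the checked-out messages that were lost in
%% transit from a local member if one exists, otherwise from the leader.
recover_missing(State, ConsumerTag, From, To) ->
    ConsumerId = {ConsumerTag, self()},
    ServerId = find_local_or_leader(State),
    case ra:aux_command(ServerId, {get_checked_out, ConsumerId, lists:seq(From, To)}) of
        {ok, IdMsgs} ->
            %% IdMsgs :: [{MsgId, {Header, Msg}}], ready to be re-delivered
            {ok, IdMsgs};
        {error, consumer_not_found} ->
            {ok, []};
        {timeout, _} = Timeout ->
            Timeout
    end.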
deps/rabbit/test/quorum_queue_SUITE.erl (50 changes: 49 additions & 1 deletion)

@@ -149,7 +149,8 @@ all_tests() ->
per_message_ttl_mixed_expiry,
per_message_ttl_expiration_too_high,
consumer_priorities,
cancel_consumer_gh_3729
cancel_consumer_gh_3729,
cancel_and_consume_with_same_tag
].

memory_tests() ->
@@ -2820,6 +2821,53 @@ cancel_consumer_gh_3729(Config) ->

ok = rabbit_ct_client_helpers:close_channel(Ch).

cancel_and_consume_with_same_tag(Config) ->
%% https://github.com/rabbitmq/rabbitmq-server/issues/5927
QQ = ?config(queue_name, Config),

Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
Ch = rabbit_ct_client_helpers:open_channel(Config, Server),

ExpectedDeclareRslt0 = #'queue.declare_ok'{queue = QQ, message_count = 0, consumer_count = 0},
DeclareRslt0 = declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}]),
?assertMatch(ExpectedDeclareRslt0, DeclareRslt0),

ok = publish(Ch, QQ),

ok = subscribe(Ch, QQ, false),

DeliveryTag = receive
{#'basic.deliver'{delivery_tag = D}, _} ->
D
after 5000 ->
flush(100),
ct:fail("basic.deliver timeout")
end,

ok = cancel(Ch),

ok = subscribe(Ch, QQ, false),

ok = publish(Ch, QQ),

receive
{#'basic.deliver'{delivery_tag = _}, _} ->
ok
after 5000 ->
flush(100),
ct:fail("basic.deliver timeout 2")
end,


amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag,
multiple = true}),

ok = cancel(Ch),



ok.

leader_locator_client_local(Config) ->
[Server1 | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
Q = ?config(queue_name, Config),
deps/rabbit/test/rabbit_fifo_int_SUITE.erl (36 changes: 35 additions & 1 deletion)

@@ -30,6 +30,7 @@ all_tests() ->
dequeue,
discard,
cancel_checkout,
lost_delivery,
credit,
untracked_enqueue,
flow,
@@ -411,6 +412,38 @@ cancel_checkout(Config) ->
{ok, _, {_, _, _, _, m1}, F5} = rabbit_fifo_client:dequeue(<<"d1">>, settled, F5),
ok.

lost_delivery(Config) ->
ClusterName = ?config(cluster_name, Config),
ServerId = ?config(node_id, Config),
ok = start_cluster(ClusterName, [ServerId]),
F0 = rabbit_fifo_client:init(ClusterName, [ServerId], 4),
{ok, F1} = rabbit_fifo_client:enqueue(m1, F0),
{_, _, F2} = process_ra_events(
receive_ra_events(1, 0), F1, [], [], fun (_, S) -> S end),
{ok, F3} = rabbit_fifo_client:checkout(<<"tag">>, 10, simple_prefetch, #{}, F2),
%% drop a delivery, simulating e.g. a full distribution buffer
receive
{ra_event, _, Evt} ->
ct:pal("dropping event ~p", [Evt]),
ok
after 500 ->
exit(await_ra_event_timeout)
end,
% send another message
{ok, F4} = rabbit_fifo_client:enqueue(m2, F3),
%% this should trigger the fifo client to fetch any missing messages
%% from the server
{_, _, _F5} = process_ra_events(
receive_ra_events(1, 1), F4, [], [],
fun ({deliver, _, _, Dels}, S) ->
[{_, _, _, _, M1},
{_, _, _, _, M2}] = Dels,
?assertEqual(m1, M1),
?assertEqual(m2, M2),
S
end),
ok.

credit(Config) ->
ClusterName = ?config(cluster_name, Config),
ServerId = ?config(node_id, Config),
@@ -593,7 +626,8 @@ process_ra_events(Events, State) ->

process_ra_events([], State0, Acc, Actions0, _DeliveryFun) ->
{Acc, Actions0, State0};
process_ra_events([{ra_event, From, Evt} | Events], State0, Acc, Actions0, DeliveryFun) ->
process_ra_events([{ra_event, From, Evt} | Events], State0, Acc,
Actions0, DeliveryFun) ->
case rabbit_fifo_client:handle_ra_event(From, Evt, State0) of
{ok, State1, Actions1} ->
{Msgs, Actions, State} =