diff --git a/deps/rabbit/src/rabbit_fifo.erl b/deps/rabbit/src/rabbit_fifo.erl index 702dd8321218..0c79b478b7c6 100644 --- a/deps/rabbit/src/rabbit_fifo.erl +++ b/deps/rabbit/src/rabbit_fifo.erl @@ -75,11 +75,6 @@ -export([update_header/4]). -endif. --define(SETTLE_V2, '$s'). --define(RETURN_V2, '$r'). --define(DISCARD_V2, '$d'). --define(CREDIT_V2, '$c'). - %% command records representing all the protocol actions that are supported -record(enqueue, {pid :: option(pid()), seq :: option(msg_seqno()), @@ -362,9 +357,9 @@ apply(#{index := Index, %% a dequeue using the same consumer_id isn't possible at this point {State0, {dequeue, empty}}; _ -> - State1 = update_consumer(Meta, ConsumerId, ConsumerMeta, - {once, 1, simple_prefetch}, 0, - State0), + {_, State1} = update_consumer(Meta, ConsumerId, ConsumerMeta, + {once, 1, simple_prefetch}, 0, + State0), case checkout_one(Meta, false, State1, []) of {success, _, MsgId, ?MSG(RaftIdx, Header), ExpiredMsg, State2, Effects0} -> {State4, Effects1} = case Settlement of @@ -405,9 +400,19 @@ apply(#{index := Idx} = Meta, apply(Meta, #checkout{spec = Spec, meta = ConsumerMeta, consumer_id = {_, Pid} = ConsumerId}, State0) -> Priority = get_priority_from_args(ConsumerMeta), - State1 = update_consumer(Meta, ConsumerId, ConsumerMeta, Spec, Priority, State0), + {Consumer, State1} = update_consumer(Meta, ConsumerId, ConsumerMeta, + Spec, Priority, State0), {State2, Effs} = activate_next_consumer(State1, []), - checkout(Meta, State0, State2, [{monitor, process, Pid} | Effs]); + #consumer{checked_out = Checked, + credit = Credit, + delivery_count = DeliveryCount, + next_msg_id = NextMsgId} = Consumer, + %% reply with a consumer summary + Reply = {ok, #{next_msg_id => NextMsgId, + credit => Credit, + delivery_count => DeliveryCount, + num_checked_out => map_size(Checked)}}, + checkout(Meta, State0, State2, [{monitor, process, Pid} | Effs], Reply); apply(#{index := Index}, #purge{}, #?MODULE{messages_total = Total, returns = Returns, @@ -1024,6 +1029,28 @@ handle_aux(leader, cast, {#return{msg_ids = MsgIds, _ -> {no_reply, Aux0, Log0} end; +handle_aux(_, _, {get_checked_out, ConsumerId, MsgIds}, + Aux0, Log0, #?MODULE{cfg = #cfg{}, + consumers = Consumers}) -> + case Consumers of + #{ConsumerId := #consumer{checked_out = Checked}} -> + {Log, IdMsgs} = + maps:fold( + fun (MsgId, ?MSG(Idx, Header), {L0, Acc}) -> + %% it is possible this is not found if the consumer + %% crashed and the message got removed + case ra_log:fetch(Idx, L0) of + {{_, _, {_, _, Cmd, _}}, L} -> + Msg = get_msg(Cmd), + {L, [{MsgId, {Header, Msg}} | Acc]}; + {undefined, L} -> + {L, Acc} + end + end, {Log0, []}, maps:with(MsgIds, Checked)), + {reply, {ok, IdMsgs}, Aux0, Log}; + _ -> + {reply, {error, consumer_not_found}, Aux0, Log0} + end; handle_aux(leader, cast, {#return{} = Ret, Corr, Pid}, Aux0, Log, #?MODULE{}) -> %% for returns with a delivery limit set we can just return as before @@ -1827,9 +1854,12 @@ return_all(Meta, #?MODULE{consumers = Cons} = State0, Effects0, ConsumerId, return_one(Meta, MsgId, Msg, S, E, ConsumerId) end, {State, Effects0}, lists:sort(maps:to_list(Checked))). +checkout(Meta, OldState, State0, Effects0) -> + checkout(Meta, OldState, State0, Effects0, ok). + checkout(#{index := Index} = Meta, #?MODULE{cfg = #cfg{resource = _QName}} = OldState, - State0, Effects0) -> + State0, Effects0, Reply) -> {#?MODULE{cfg = #cfg{dead_letter_handler = DLH}, dlx = DlxState0} = State1, ExpiredMsg, Effects1} = checkout0(Meta, checkout_one(Meta, false, State0, Effects0), #{}), @@ -1840,9 +1870,9 @@ checkout(#{index := Index} = Meta, Effects2 = DlxDeliveryEffects ++ Effects1, case evaluate_limit(Index, false, OldState, State2, Effects2) of {State, false, Effects} when ExpiredMsg == false -> - {State, ok, Effects}; + {State, Reply, Effects}; {State, _, Effects} -> - update_smallest_raft_index(Index, State, Effects) + update_smallest_raft_index(Index, Reply, State, Effects) end. checkout0(Meta, {success, ConsumerId, MsgId, @@ -2137,7 +2167,7 @@ update_consumer(Meta, {Tag, Pid} = ConsumerId, ConsumerMeta, credit_mode = Mode}, credit = Credit} end, - update_or_remove_sub(Meta, ConsumerId, Consumer, State0); + {Consumer, update_or_remove_sub(Meta, ConsumerId, Consumer, State0)}; update_consumer(Meta, {Tag, Pid} = ConsumerId, ConsumerMeta, {Life, Credit, Mode0} = Spec, Priority, #?MODULE{cfg = #cfg{consumer_strategy = single_active}, @@ -2149,15 +2179,17 @@ update_consumer(Meta, {Tag, Pid} = ConsumerId, ConsumerMeta, %% one, then merge case active_consumer(Cons0) of {ConsumerId, #consumer{status = up} = Consumer0} -> - Consumer = merge_consumer(Meta, Consumer0, ConsumerMeta, Spec, Priority), - update_or_remove_sub(Meta, ConsumerId, Consumer, State0); + Consumer = merge_consumer(Meta, Consumer0, ConsumerMeta, + Spec, Priority), + {Consumer, update_or_remove_sub(Meta, ConsumerId, Consumer, State0)}; undefined when is_map_key(ConsumerId, Cons0) -> %% there is no active consumer and the current consumer is in the %% consumers map and thus must be cancelled, in this case we can just %% merge and effectively make this the current active one Consumer0 = maps:get(ConsumerId, Cons0), - Consumer = merge_consumer(Meta, Consumer0, ConsumerMeta, Spec, Priority), - update_or_remove_sub(Meta, ConsumerId, Consumer, State0); + Consumer = merge_consumer(Meta, Consumer0, ConsumerMeta, + Spec, Priority), + {Consumer, update_or_remove_sub(Meta, ConsumerId, Consumer, State0)}; _ -> %% add as a new waiting consumer Mode = credit_mode(Meta, Credit, Mode0), @@ -2169,7 +2201,9 @@ update_consumer(Meta, {Tag, Pid} = ConsumerId, ConsumerMeta, credit_mode = Mode}, credit = Credit}, - State0#?MODULE{waiting_consumers = Waiting ++ [{ConsumerId, Consumer}]} + {Consumer, + State0#?MODULE{waiting_consumers = + Waiting ++ [{ConsumerId, Consumer}]}} end. merge_consumer(Meta, #consumer{cfg = CCfg, checked_out = Checked} = Consumer, diff --git a/deps/rabbit/src/rabbit_fifo_client.erl b/deps/rabbit/src/rabbit_fifo_client.erl index f227c4f9668a..8fd8577aad4e 100644 --- a/deps/rabbit/src/rabbit_fifo_client.erl +++ b/deps/rabbit/src/rabbit_fifo_client.erl @@ -386,14 +386,25 @@ checkout(ConsumerTag, NumUnsettled, CreditMode, Meta, %% ??? Ack = maps:get(ack, Meta, true), - SDels = maps:update_with(ConsumerTag, - fun (V) -> - V#consumer{ack = Ack} - end, - #consumer{last_msg_id = -1, - ack = Ack}, CDels0), - try_process_command(Servers, Cmd, State0#state{consumer_deliveries = SDels}). - + case try_process_command(Servers, Cmd, State0) of + {ok, Reply, Leader} -> + LastMsgId = case Reply of + ok -> + %% this is the pre 3.11.1 / 3.10.9 + %% reply format + -1; + {ok, #{next_msg_id := NextMsgId}} -> + NextMsgId - 1 + end, + SDels = maps:update_with( + ConsumerTag, fun (C) -> C#consumer{ack = Ack} end, + #consumer{last_msg_id = LastMsgId, + ack = Ack}, CDels0), + {ok, State0#state{leader = Leader, + consumer_deliveries = SDels}}; + Err -> + Err + end. -spec query_single_active_consumer(state()) -> {ok, term()} | {error, term()} | {timeout, term()}. @@ -448,7 +459,12 @@ cancel_checkout(ConsumerTag, #state{consumer_deliveries = CDels} = State0) -> ConsumerId = {ConsumerTag, self()}, Cmd = rabbit_fifo:make_checkout(ConsumerId, cancel, #{}), State = State0#state{consumer_deliveries = maps:remove(ConsumerTag, CDels)}, - try_process_command(Servers, Cmd, State). + case try_process_command(Servers, Cmd, State) of + {ok, _, Leader} -> + {ok, State#state{leader = Leader}}; + Err -> + Err + end. %% @doc Purges all the messages from a rabbit_fifo queue and returns the number %% of messages purged. @@ -653,8 +669,8 @@ untracked_enqueue([Node | _], Msg) -> try_process_command([Server | Rem], Cmd, #state{cfg = #cfg{timeout = Timeout}} = State) -> case ra:process_command(Server, Cmd, Timeout) of - {ok, _, Leader} -> - {ok, State#state{leader = Leader}}; + {ok, _, _} = Res -> + Res; Err when length(Rem) =:= 0 -> Err; _ -> @@ -748,7 +764,7 @@ handle_delivery(Leader, {delivery, Tag, [{FstId, _} | _] = IdMsgs}, %% When the node is disconnected the leader will return all checked %% out messages to the main queue to ensure they don't get stuck in %% case the node never comes back. - case get_missing_deliveries(Leader, Prev+1, FstId-1, Tag) of + case get_missing_deliveries(State0, Prev+1, FstId-1, Tag) of {protocol_error, _, _, _} = Err -> Err; Missing -> @@ -808,22 +824,22 @@ update_consumer(Tag, LastId, DelCntIncr, Consumers). -get_missing_deliveries(Leader, From, To, ConsumerTag) -> +get_missing_deliveries(State, From, To, ConsumerTag) -> + %% find local server ConsumerId = consumer_id(ConsumerTag), - % ?INFO("get_missing_deliveries for ~w from ~b to ~b", - % [ConsumerId, From, To]), - Query = fun (State) -> - rabbit_fifo:get_checked_out(ConsumerId, From, To, State) - end, - case ra:local_query(Leader, Query, ?COMMAND_TIMEOUT) of - {ok, {_, Missing}, _} -> + rabbit_log:debug("get_missing_deliveries for ~w from ~b to ~b", + [ConsumerId, From, To]), + Cmd = {get_checked_out, ConsumerId, lists:seq(From, To)}, + ServerId = find_local_or_leader(State), + case ra:aux_command(ServerId, Cmd) of + {ok, Missing} -> Missing; {error, Error} -> {protocol_error, internal_error, "Cannot query missing deliveries from ~p: ~p", - [Leader, Error]}; + [ServerId, Error]}; {timeout, _} -> {protocol_error, internal_error, "Cannot query missing deliveries from ~p: timeout", - [Leader]} + [ServerId]} end. pick_server(#state{leader = undefined, @@ -906,6 +922,23 @@ cancel_timer(#state{timer_state = Ref} = State) -> erlang:cancel_timer(Ref, [{async, true}, {info, false}]), State#state{timer_state = undefined}. +find_local_or_leader(#state{leader = Leader, + cfg = #cfg{servers = Servers}}) -> + case find_local(Servers) of + undefined -> + Leader; + ServerId -> + ServerId + end. + +find_local([{_, N} = ServerId | _]) when N == node() -> + ServerId; +find_local([_ | Rem]) -> + find_local(Rem); +find_local([]) -> + undefined. + + find_leader([]) -> undefined; find_leader([Server | Servers]) -> diff --git a/deps/rabbit/test/quorum_queue_SUITE.erl b/deps/rabbit/test/quorum_queue_SUITE.erl index 818476f3b725..035398bd5691 100644 --- a/deps/rabbit/test/quorum_queue_SUITE.erl +++ b/deps/rabbit/test/quorum_queue_SUITE.erl @@ -149,7 +149,8 @@ all_tests() -> per_message_ttl_mixed_expiry, per_message_ttl_expiration_too_high, consumer_priorities, - cancel_consumer_gh_3729 + cancel_consumer_gh_3729, + cancel_and_consume_with_same_tag ]. memory_tests() -> @@ -2820,6 +2821,53 @@ cancel_consumer_gh_3729(Config) -> ok = rabbit_ct_client_helpers:close_channel(Ch). +cancel_and_consume_with_same_tag(Config) -> + %% https://github.com/rabbitmq/rabbitmq-server/issues/5927 + QQ = ?config(queue_name, Config), + + Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + + ExpectedDeclareRslt0 = #'queue.declare_ok'{queue = QQ, message_count = 0, consumer_count = 0}, + DeclareRslt0 = declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}]), + ?assertMatch(ExpectedDeclareRslt0, DeclareRslt0), + + ok = publish(Ch, QQ), + + ok = subscribe(Ch, QQ, false), + + DeliveryTag = receive + {#'basic.deliver'{delivery_tag = D}, _} -> + D + after 5000 -> + flush(100), + ct:fail("basic.deliver timeout") + end, + + ok = cancel(Ch), + + ok = subscribe(Ch, QQ, false), + + ok = publish(Ch, QQ), + + receive + {#'basic.deliver'{delivery_tag = _}, _} -> + ok + after 5000 -> + flush(100), + ct:fail("basic.deliver timeout 2") + end, + + + amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag, + multiple = true}), + + ok = cancel(Ch), + + + + ok. + leader_locator_client_local(Config) -> [Server1 | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), Q = ?config(queue_name, Config), diff --git a/deps/rabbit/test/rabbit_fifo_int_SUITE.erl b/deps/rabbit/test/rabbit_fifo_int_SUITE.erl index ed1409cd1d8a..1fdc83b40e2f 100644 --- a/deps/rabbit/test/rabbit_fifo_int_SUITE.erl +++ b/deps/rabbit/test/rabbit_fifo_int_SUITE.erl @@ -30,6 +30,7 @@ all_tests() -> dequeue, discard, cancel_checkout, + lost_delivery, credit, untracked_enqueue, flow, @@ -411,6 +412,38 @@ cancel_checkout(Config) -> {ok, _, {_, _, _, _, m1}, F5} = rabbit_fifo_client:dequeue(<<"d1">>, settled, F5), ok. +lost_delivery(Config) -> + ClusterName = ?config(cluster_name, Config), + ServerId = ?config(node_id, Config), + ok = start_cluster(ClusterName, [ServerId]), + F0 = rabbit_fifo_client:init(ClusterName, [ServerId], 4), + {ok, F1} = rabbit_fifo_client:enqueue(m1, F0), + {_, _, F2} = process_ra_events( + receive_ra_events(1, 0), F1, [], [], fun (_, S) -> S end), + {ok, F3} = rabbit_fifo_client:checkout(<<"tag">>, 10, simple_prefetch, #{}, F2), + %% drop a delivery, simulating e.g. a full distribution buffer + receive + {ra_event, _, Evt} -> + ct:pal("dropping event ~p", [Evt]), + ok + after 500 -> + exit(await_ra_event_timeout) + end, + % send another message + {ok, F4} = rabbit_fifo_client:enqueue(m2, F3), + %% this hsould trigger the fifo client to fetch any missing messages + %% from the server + {_, _, _F5} = process_ra_events( + receive_ra_events(1, 1), F4, [], [], + fun ({deliver, _, _, Dels}, S) -> + [{_, _, _, _, M1}, + {_, _, _, _, M2}] = Dels, + ?assertEqual(m1, M1), + ?assertEqual(m2, M2), + S + end), + ok. + credit(Config) -> ClusterName = ?config(cluster_name, Config), ServerId = ?config(node_id, Config), @@ -593,7 +626,8 @@ process_ra_events(Events, State) -> process_ra_events([], State0, Acc, Actions0, _DeliveryFun) -> {Acc, Actions0, State0}; -process_ra_events([{ra_event, From, Evt} | Events], State0, Acc, Actions0, DeliveryFun) -> +process_ra_events([{ra_event, From, Evt} | Events], State0, Acc, + Actions0, DeliveryFun) -> case rabbit_fifo_client:handle_ra_event(From, Evt, State0) of {ok, State1, Actions1} -> {Msgs, Actions, State} =