From f14efc60bcd2646af248386ee9e99d12f3eb91ae Mon Sep 17 00:00:00 2001 From: Matteo Cafasso Date: Sun, 1 Oct 2023 21:52:23 +0300 Subject: [PATCH 1/2] rabbit_backing_queue: pass the whole message to discard callback The previous behaviour was passing solely the message ID making queue implementations such as, for example, the priority one hard to fulfil. Signed-off-by: Matteo Cafasso --- deps/rabbit/src/rabbit_backing_queue.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/deps/rabbit/src/rabbit_backing_queue.erl b/deps/rabbit/src/rabbit_backing_queue.erl index f4ae7d3c67ad..55d2dbb51115 100644 --- a/deps/rabbit/src/rabbit_backing_queue.erl +++ b/deps/rabbit/src/rabbit_backing_queue.erl @@ -117,7 +117,7 @@ %% Called to inform the BQ about messages which have reached the %% queue, but are not going to be further passed to BQ. --callback discard(rabbit_types:msg_id(), pid(), flow(), state()) -> state(). +-callback discard(rabbit_types:basic_message(), pid(), flow(), state()) -> state(). %% Return ids of messages which have been confirmed since the last %% invocation of this function (or initialisation). From 4945d392aa881156d7922a395525515e252aeb65 Mon Sep 17 00:00:00 2001 From: Matteo Cafasso Date: Sun, 1 Oct 2023 21:52:40 +0300 Subject: [PATCH 2/2] Adopt new rabbit_backing_queue:discard implementation Signed-off-by: Matteo Cafasso --- deps/rabbit/src/rabbit_amqqueue_process.erl | 4 ++-- .../rabbit/src/rabbit_mirror_queue_master.erl | 17 ++++++++------ deps/rabbit/src/rabbit_mirror_queue_slave.erl | 3 +-- deps/rabbit/src/rabbit_priority_queue.erl | 22 +++++-------------- deps/rabbit/src/rabbit_variable_queue.erl | 2 +- 5 files changed, 20 insertions(+), 28 deletions(-) diff --git a/deps/rabbit/src/rabbit_amqqueue_process.erl b/deps/rabbit/src/rabbit_amqqueue_process.erl index 42ee87b40edc..cfb42c154556 100644 --- a/deps/rabbit/src/rabbit_amqqueue_process.erl +++ b/deps/rabbit/src/rabbit_amqqueue_process.erl @@ -647,7 +647,7 @@ discard(#delivery{confirm = Confirm, true -> confirm_messages([MsgId], MTC, QName); false -> MTC end, - BQS1 = BQ:discard(MsgId, SenderPid, Flow, BQS), + BQS1 = BQ:discard(Msg, SenderPid, Flow, BQS), {BQS1, MTC1}. run_message_queue(State) -> run_message_queue(false, State). @@ -821,7 +821,7 @@ send_reject_publish(#delivery{confirm = true, amqqueue:get_name(Q), MsgSeqNo), MTC1 = maps:remove(MsgId, MTC), - BQS1 = BQ:discard(MsgId, SenderPid, Flow, BQS), + BQS1 = BQ:discard(Msg, SenderPid, Flow, BQS), State#q{ backing_queue_state = BQS1, msg_id_to_channel = MTC1 }; send_reject_publish(#delivery{confirm = false}, _Delivered, State) -> diff --git a/deps/rabbit/src/rabbit_mirror_queue_master.erl b/deps/rabbit/src/rabbit_mirror_queue_master.erl index 2b46a5ba9be9..2f4c1f5e71eb 100644 --- a/deps/rabbit/src/rabbit_mirror_queue_master.erl +++ b/deps/rabbit/src/rabbit_mirror_queue_master.erl @@ -327,15 +327,18 @@ batch_publish_delivered(Publishes, ChPid, Flow, State1 = State #state { backing_queue_state = BQS1 }, {AckTags, ensure_monitoring(ChPid, State1)}. -discard(MsgId, ChPid, Flow, State = #state { gm = GM, - backing_queue = BQ, - backing_queue_state = BQS, - seen_status = SS }) -> - false = maps:is_key(MsgId, SS), %% ASSERTION - ok = gm:broadcast(GM, {discard, ChPid, Flow, MsgId}), +discard(Msg, ChPid, Flow, State = #state { backing_queue = BQ, + backing_queue_state = BQS }) -> + MsgId = mc:get_annotation(id, Msg), ensure_monitoring(ChPid, State #state { backing_queue_state = - BQ:discard(MsgId, ChPid, Flow, BQS) }). + BQ:discard(Msg, ChPid, Flow, BQS) }), + broadcast_discard(MsgId, ChPid, Flow, State). + +broadcast_discard(MsgId, ChPid, Flow, #state { gm = GM, + seen_status = SS }) -> + false = maps:is_key(MsgId, SS), %% ASSERTION + ok = gm:broadcast(GM, {discard, ChPid, Flow, MsgId}). dropwhile(Pred, State = #state{backing_queue = BQ, backing_queue_state = BQS }) -> diff --git a/deps/rabbit/src/rabbit_mirror_queue_slave.erl b/deps/rabbit/src/rabbit_mirror_queue_slave.erl index 3acc3d30e81d..f3b37f187de3 100644 --- a/deps/rabbit/src/rabbit_mirror_queue_slave.erl +++ b/deps/rabbit/src/rabbit_mirror_queue_slave.erl @@ -1016,8 +1016,7 @@ process_instruction({discard, ChPid, Flow, MsgId}, State) -> maybe_flow_ack(ChPid, Flow), State1 = #state { backing_queue = BQ, backing_queue_state = BQS } = publish_or_discard(discarded, ChPid, MsgId, State), - BQS1 = BQ:discard(MsgId, ChPid, Flow, BQS), - {ok, State1 #state { backing_queue_state = BQS1 }}; + {ok, State1 #state { backing_queue_state = BQS }}; process_instruction({drop, Length, Dropped, AckRequired}, State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> diff --git a/deps/rabbit/src/rabbit_priority_queue.erl b/deps/rabbit/src/rabbit_priority_queue.erl index d649773190d5..3edc15051219 100644 --- a/deps/rabbit/src/rabbit_priority_queue.erl +++ b/deps/rabbit/src/rabbit_priority_queue.erl @@ -248,22 +248,12 @@ batch_publish_delivered(Publishes, ChPid, Flow, State = #passthrough{bq = BQ, bqs = BQS}) -> ?passthrough2(batch_publish_delivered(Publishes, ChPid, Flow, BQS)). -%% TODO this is a hack. The BQ api does not give us enough information -%% here - if we had the Msg we could look at its priority and forward -%% to the appropriate sub-BQ. But we don't so we are stuck. -%% -%% But fortunately VQ ignores discard/4, so we can too, *assuming we -%% are talking to VQ*. discard/4 is used by HA, but that's "above" us -%% (if in use) so we don't break that either, just some hypothetical -%% alternate BQ implementation. -discard(_MsgId, _ChPid, _Flow, State = #state{}) -> - State; - %% We should have something a bit like this here: - %% pick1(fun (_P, BQSN) -> - %% BQ:discard(MsgId, ChPid, Flow, BQSN) - %% end, Msg, State); -discard(MsgId, ChPid, Flow, State = #passthrough{bq = BQ, bqs = BQS}) -> - ?passthrough1(discard(MsgId, ChPid, Flow, BQS)). +discard(Msg, ChPid, Flow, State = #state{bq = BQ}) -> + pick1(fun (_P, BQSN) -> + BQ:discard(Msg, ChPid, Flow, BQSN) + end, Msg, State); +discard(Msg, ChPid, Flow, State = #passthrough{bq = BQ, bqs = BQS}) -> + ?passthrough1(discard(Msg, ChPid, Flow, BQS)). drain_confirmed(State = #state{bq = BQ}) -> fold_append2(fun (_P, BQSN) -> BQ:drain_confirmed(BQSN) end, State); diff --git a/deps/rabbit/src/rabbit_variable_queue.erl b/deps/rabbit/src/rabbit_variable_queue.erl index 2c35f38df08c..c71014d01492 100644 --- a/deps/rabbit/src/rabbit_variable_queue.erl +++ b/deps/rabbit/src/rabbit_variable_queue.erl @@ -588,7 +588,7 @@ batch_publish_delivered(Publishes, ChPid, Flow, State) -> State2 = ui(State1), {lists:reverse(SeqIds), a(maybe_update_rates(State2))}. -discard(_MsgId, _ChPid, _Flow, State) -> State. +discard(_Msg, _ChPid, _Flow, State) -> State. drain_confirmed(State = #vqstate { confirmed = C }) -> case sets:is_empty(C) of