From 79663d3de6a6fdc85ceee89e4beb88da7af52f4a Mon Sep 17 00:00:00 2001 From: Matteo Cafasso Date: Tue, 10 Dec 2024 23:37:59 +0200 Subject: [PATCH 1/2] backing_queue: simplify `is_duplicate` callback signature `is_duplicate` callback signature was changed in order to support both the mirroring queues as well as the de-duplication ones. As the mirroring queues are now deprecated and removed, we can fall back to a simpler boolean as return value. Signed-off-by: Matteo Cafasso (cherry picked from commit c927446e17c825208db7c734be160b91b6090c67) --- deps/rabbit/src/rabbit_backing_queue.erl | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/deps/rabbit/src/rabbit_backing_queue.erl b/deps/rabbit/src/rabbit_backing_queue.erl index ffa0a791f1b5..3169cbf87bfd 100644 --- a/deps/rabbit/src/rabbit_backing_queue.erl +++ b/deps/rabbit/src/rabbit_backing_queue.erl @@ -220,9 +220,8 @@ %% Called prior to a publish or publish_delivered call. Allows the BQ %% to signal that it's already seen this message, (e.g. it was published -%% or discarded previously) specifying whether to drop the message or reject it. --callback is_duplicate(mc:state(), state()) - -> {{true, drop} | {true, reject} | boolean(), state()}. +%% or discarded previously). +-callback is_duplicate(mc:state(), state()) -> {boolean(), state()}. -callback set_queue_mode(queue_mode(), state()) -> state(). From 0a9e0b77e265202df9f533ace2774614be18c089 Mon Sep 17 00:00:00 2001 From: Matteo Cafasso Date: Tue, 10 Dec 2024 23:57:44 +0200 Subject: [PATCH 2/2] amqqueue_process: adopt new `is_duplicate` backing queue callback As the de-duplication plugin is the only adopter of the `is_duplicate` callback, we now use a simpler signature. When a message is deemed duplicated, we discard it and re-route it to dead letter exchange. Signed-off-by: Matteo Cafasso (cherry picked from commit 6a979b6c7dcd74ed37e7bc55378ad52f4db19fc4) --- deps/rabbit/src/rabbit_amqqueue_process.erl | 27 +++++++++++++++------ 1 file changed, 19 insertions(+), 8 deletions(-) diff --git a/deps/rabbit/src/rabbit_amqqueue_process.erl b/deps/rabbit/src/rabbit_amqqueue_process.erl index 63f886bd3763..899a77656199 100644 --- a/deps/rabbit/src/rabbit_amqqueue_process.erl +++ b/deps/rabbit/src/rabbit_amqqueue_process.erl @@ -722,16 +722,27 @@ maybe_deliver_or_enqueue(Delivery = #delivery{message = Message}, %% Drop publish and nack to publisher send_reject_publish(Delivery, State); _ -> - {IsDuplicate, BQS1} = BQ:is_duplicate(Message, BQS), - State1 = State#q{backing_queue_state = BQS1}, - case IsDuplicate of - true -> State1; - {true, drop} -> State1; - %% Drop publish and nack to publisher - {true, reject} -> + case BQ:is_duplicate(Message, BQS) of + {true, State1} -> + %% Publish to DLX + _ = with_dlx( + DLX, + fun (X) -> + rabbit_global_counters:messages_dead_lettered(maxlen, + rabbit_classic_queue, + at_most_once, 1), + QName = qname(State1), + rabbit_dead_letter:publish(Message, maxlen, X, RK, QName) + end, + fun () -> + rabbit_global_counters:messages_dead_lettered(maxlen, + rabbit_classic_queue, + disabled, 1) + end), + %% Drop publish and nack to publisher send_reject_publish(Delivery, State1); %% Enqueue and maybe drop head later - false -> + {false, State1} -> deliver_or_enqueue(Delivery, Delivered, State1) end end.