Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 19 additions & 8 deletions deps/rabbit/src/rabbit_amqqueue_process.erl
Original file line number Diff line number Diff line change
Expand Up @@ -722,16 +722,27 @@ maybe_deliver_or_enqueue(Delivery = #delivery{message = Message},
%% Drop publish and nack to publisher
send_reject_publish(Delivery, State);
_ ->
{IsDuplicate, BQS1} = BQ:is_duplicate(Message, BQS),
State1 = State#q{backing_queue_state = BQS1},
case IsDuplicate of
true -> State1;
{true, drop} -> State1;
%% Drop publish and nack to publisher
{true, reject} ->
case BQ:is_duplicate(Message, BQS) of
{true, State1} ->
%% Publish to DLX
_ = with_dlx(
DLX,
fun (X) ->
rabbit_global_counters:messages_dead_lettered(maxlen,
rabbit_classic_queue,
at_most_once, 1),
QName = qname(State1),
rabbit_dead_letter:publish(Message, maxlen, X, RK, QName)
end,
fun () ->
rabbit_global_counters:messages_dead_lettered(maxlen,
rabbit_classic_queue,
disabled, 1)
end),
%% Drop publish and nack to publisher
send_reject_publish(Delivery, State1);
%% Enqueue and maybe drop head later
false ->
{false, State1} ->
deliver_or_enqueue(Delivery, Delivered, State1)
end
end.
Expand Down
5 changes: 2 additions & 3 deletions deps/rabbit/src/rabbit_backing_queue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -220,9 +220,8 @@

%% Called prior to a publish or publish_delivered call. Allows the BQ
%% to signal that it's already seen this message, (e.g. it was published
%% or discarded previously) specifying whether to drop the message or reject it.
-callback is_duplicate(mc:state(), state())
-> {{true, drop} | {true, reject} | boolean(), state()}.
%% or discarded previously).
-callback is_duplicate(mc:state(), state()) -> {boolean(), state()}.

-callback set_queue_mode(queue_mode(), state()) -> state().

Expand Down
Loading