Skip to content

Commit

Permalink
Fix dead lettering
Browse files Browse the repository at this point in the history
This commit fixes #11159, #11160, #11173.
  • Loading branch information
ansd committed May 7, 2024
1 parent e96d100 commit e334bd6
Show file tree
Hide file tree
Showing 12 changed files with 626 additions and 227 deletions.
5 changes: 4 additions & 1 deletion deps/rabbit/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -391,7 +391,7 @@ rabbitmq_integration_suite(
additional_beam = [
":test_queue_utils_beam",
],
shard_count = 7,
shard_count = 8,
)

rabbitmq_integration_suite(
Expand Down Expand Up @@ -789,6 +789,9 @@ rabbitmq_suite(
rabbitmq_suite(
name = "mc_unit_SUITE",
size = "small",
runtime_deps = [
"@meck//:erlang_app",
],
deps = [
"//deps/amqp10_common:erlang_app",
"//deps/rabbit_common:erlang_app",
Expand Down
41 changes: 27 additions & 14 deletions deps/rabbit/include/mc.hrl
Original file line number Diff line number Diff line change
@@ -1,17 +1,3 @@
%% Key of one entry in #deaths.records: the name of the queue the message
%% was dead lettered from, paired with the dead-letter reason.
-type death_key() :: {SourceQueue :: rabbit_misc:resource_name(), rabbit_dead_letter:reason()}.
%% Per-entry annotations. first_time and last_time are mandatory keys (:=),
%% ttl is optional (=>).
-type death_anns() :: #{first_time := non_neg_integer(), %% the timestamp of the first
last_time := non_neg_integer(), %% the timestamp of the last
ttl => OriginalExpiration :: non_neg_integer()}.
%% One death entry. routing_keys defaults to [] and count to 0; exchange and
%% anns have no default.
-record(death, {exchange :: OriginalExchange :: rabbit_misc:resource_name(),
routing_keys = [] :: OriginalRoutingKeys :: [rabbit_types:routing_key()],
count = 0 :: non_neg_integer(),
anns :: death_anns()}).

%% Death history of a message: a map from death_key() to #death{} plus two
%% keys into that map.
%% NOTE(review): first/last presumably name the oldest and the most recent
%% death; record_death/3 sets both to the same key on the first death.
-record(deaths, {first :: death_key(),
last :: death_key(),
records = #{} :: #{death_key() := #death{}}}).


%% good enough for most use cases
-define(IS_MC(Msg), element(1, Msg) == mc andalso tuple_size(Msg) == 5).

Expand All @@ -26,3 +12,30 @@
-define(ANN_RECEIVED_AT_TIMESTAMP, rts).
-define(ANN_DURABLE, d).
-define(ANN_PRIORITY, p).

%% RabbitMQ >= 3.13.3
%% One entry of the deaths_v2 annotation, which is a list of these records
%% ordered by recency. An entry is identified by the pair
%% {source_queue, reason}: dead lettering again from the same queue for the
%% same reason increments count instead of adding another entry.
-record(death_v2, {source_queue :: rabbit_misc:resource_name(),
reason :: rabbit_dead_letter:reason(),
%% how many times this message was dead lettered
%% from this source_queue for this reason
count :: pos_integer(),
%% timestamp when this message was dead lettered the first time
%% from this source_queue for this reason
first_death_timestamp :: pos_integer(),
%% exchange and (non-empty list of) routing keys recorded when this
%% entry was first created
original_exchange :: rabbit_misc:resource_name(),
original_routing_keys :: [rabbit_types:routing_key(),...],
%% set iff reason == expired
original_ttl :: undefined | non_neg_integer()}).

%% These records were used in RabbitMQ 3.13.0 - 3.13.2.
%% Key of one entry in #deaths.records: source queue name paired with the
%% dead-letter reason.
-type death_v1_key() :: {SourceQueue :: rabbit_misc:resource_name(), rabbit_dead_letter:reason()}.
%% Per-entry annotations. first_time and last_time are mandatory keys (:=),
%% ttl is optional (=>).
-type death_v1_anns() :: #{first_time := non_neg_integer(),
last_time := non_neg_integer(),
ttl => OriginalExpiration :: non_neg_integer()}.
%% One v1 death entry. routing_keys defaults to [] and count to 0; exchange
%% and anns have no default.
-record(death, {exchange :: OriginalExchange :: rabbit_misc:resource_name(),
routing_keys = [] :: OriginalRoutingKeys :: [rabbit_types:routing_key()],
count = 0 :: non_neg_integer(),
anns :: death_v1_anns()}).
%% v1 death history: a map from death_v1_key() to #death{} plus two keys
%% into that map. Being a map, records carries no ordering of the deaths.
-record(deaths, {first :: death_v1_key(),
last :: death_v1_key(),
records = #{} :: #{death_v1_key() := #death{}}}).
161 changes: 112 additions & 49 deletions deps/rabbit/src/mc.erl
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
prepare/2,
record_death/3,
is_death_cycle/2,
last_death/1,
death_queue_names/1
]).

Expand Down Expand Up @@ -359,10 +358,9 @@ protocol_state(BasicMsg) ->
SourceQueue :: rabbit_misc:resource_name(),
state()) -> state().
record_death(Reason, SourceQueue,
#?MODULE{protocol = _Mod,
data = _Data,
annotations = Anns0} = State)
when is_atom(Reason) andalso is_binary(SourceQueue) ->
#?MODULE{annotations = Anns0} = State)
when is_atom(Reason) andalso
is_binary(SourceQueue) ->
Key = {SourceQueue, Reason},
#{?ANN_EXCHANGE := Exchange,
?ANN_ROUTING_KEYS := RoutingKeys} = Anns0,
Expand All @@ -374,21 +372,27 @@ record_death(Reason, SourceQueue,
last_time => Timestamp}),
case maps:get(deaths, Anns0, undefined) of
undefined ->
Ds = #deaths{last = Key,
first = Key,
records = #{Key => #death{count = 1,
exchange = Exchange,
routing_keys = RoutingKeys,
anns = DeathAnns}}},
Anns = Anns0#{<<"x-first-death-reason">> => ReasonBin,
<<"x-first-death-queue">> => SourceQueue,
<<"x-first-death-exchange">> => Exchange,
<<"x-last-death-reason">> => ReasonBin,
<<"x-last-death-queue">> => SourceQueue,
<<"x-last-death-exchange">> => Exchange
},

State#?MODULE{annotations = Anns#{deaths => Ds}};
case rabbit_feature_flags:is_enabled(message_containers_deaths_v2) of
true ->
record_death_v2(SourceQueue, Reason, ReasonBin, Exchange,
RoutingKeys, Timestamp, Ttl, State);
false ->
Ds = #deaths{last = Key,
first = Key,
records = #{Key => #death{count = 1,
exchange = Exchange,
routing_keys = RoutingKeys,
anns = DeathAnns}}},
Anns = Anns0#{<<"x-first-death-reason">> => ReasonBin,
<<"x-first-death-queue">> => SourceQueue,
<<"x-first-death-exchange">> => Exchange,
<<"x-last-death-reason">> => ReasonBin,
<<"x-last-death-queue">> => SourceQueue,
<<"x-last-death-exchange">> => Exchange,
deaths => Ds
},
State#?MODULE{annotations = Anns}
end;
#deaths{records = Rs} = Ds0 ->
Death = #death{count = C,
anns = DA} = maps:get(Key, Rs,
Expand All @@ -408,37 +412,68 @@ record_death(Reason, SourceQueue,
record_death(Reason, SourceQueue, BasicMsg) ->
mc_compat:record_death(Reason, SourceQueue, BasicMsg).

%% Records one dead-lettering event in the deaths_v2 annotation and returns
%% the updated message state.
%%
%% deaths_v2 is a list of #death_v2{} kept ordered by recency (most recent
%% first):
%%  * If an entry for {SourceQueue, Reason} already exists, it is moved to
%%    the front with its count incremented; all its other fields are kept.
%%  * Otherwise a new entry with count 1 is put at the front.
%%
%% The <<"x-last-death-*">> annotations are always refreshed. The
%% <<"x-first-death-*">> annotations are only written when the message has
%% no deaths_v2 annotation yet.
record_death_v2(SourceQueue, Reason, ReasonBin, Exchange, RoutingKeys, Timestamp, Ttl,
                #?MODULE{annotations = Anns0} = State) ->
    %% Entry used whenever this {SourceQueue, Reason} pair is seen for the
    %% first time.
    NewDeath = #death_v2{source_queue = SourceQueue,
                         reason = Reason,
                         count = 1,
                         first_death_timestamp = Timestamp,
                         original_exchange = Exchange,
                         original_routing_keys = RoutingKeys,
                         original_ttl = Ttl},
    Anns = case Anns0 of
               #{deaths_v2 := History0} ->
                   History =
                       case deaths_take(SourceQueue, Reason, History0) of
                           false ->
                               [NewDeath | History0];
                           {value, #death_v2{count = N} = Seen, Others} ->
                               [Seen#death_v2{count = N + 1} | Others]
                       end,
                   %% Updated with `:=`: these keys must already be present
                   %% on a message that carries deaths_v2.
                   Anns0#{deaths_v2 := History,
                          <<"x-last-death-reason">> := ReasonBin,
                          <<"x-last-death-queue">> := SourceQueue,
                          <<"x-last-death-exchange">> := Exchange};
               _ ->
                   %% Very first death: first and last death coincide.
                   Anns0#{deaths_v2 => [NewDeath],
                          <<"x-first-death-reason">> => ReasonBin,
                          <<"x-first-death-queue">> => SourceQueue,
                          <<"x-first-death-exchange">> => Exchange,
                          <<"x-last-death-reason">> => ReasonBin,
                          <<"x-last-death-queue">> => SourceQueue,
                          <<"x-last-death-exchange">> => Exchange}
           end,
    State#?MODULE{annotations = Anns}.

%% Reports whether dead lettering this message to TargetQueue would close a
%% dead-letter cycle.
%%
%% Dispatch is on the shape of the message:
%%  * annotations carry deaths_v2 -> decided by is_cycle_v2/2 on that list;
%%  * annotations carry deaths (a #deaths{} record) -> decided by
%%    is_cycle_v1/2 on the {SourceQueue, Reason} keys of its records map;
%%  * a #?MODULE{} state with neither annotation -> false;
%%  * anything else is delegated to mc_compat:is_death_cycle/2.
-spec is_death_cycle(rabbit_misc:resource_name(), state()) -> boolean().
is_death_cycle(TargetQueue, #?MODULE{annotations = #{deaths_v2 := Deaths}}) ->
    is_cycle_v2(TargetQueue, Deaths);
is_death_cycle(TargetQueue, #?MODULE{annotations = #{deaths := Deaths}}) ->
    %% maps:keys/1 gives the keys in no particular death order.
    is_cycle_v1(TargetQueue, maps:keys(Deaths#deaths.records));
is_death_cycle(_TargetQueue, #?MODULE{}) ->
    false;
is_death_cycle(TargetQueue, BasicMsg) ->
    mc_compat:is_death_cycle(TargetQueue, BasicMsg).

%% Returns the names of the queues this message was dead lettered from.
%%
%%  * deaths_v2: one name per #death_v2{} entry, in list order. deaths_v2 is
%%    kept ordered by recency, so the result is ordered by recency too. A
%%    queue appears once per distinct reason it was dead lettered for.
%%  * deaths (v1): the distinct queue names taken from the {Queue, Reason}
%%    keys of the records map, in no particular order.
%%  * a #?MODULE{} state with neither annotation: [].
%%  * anything else is delegated to mc_compat:death_queue_names/1.
%%
%% The deaths_v2 clause must stay ahead of every less specific #?MODULE{}
%% clause; a leading clause matching any #?MODULE{} would shadow it.
-spec death_queue_names(state()) -> [rabbit_misc:resource_name()].
death_queue_names(#?MODULE{annotations = #{deaths_v2 := Deaths}}) ->
    lists:map(fun(#death_v2{source_queue = Q}) -> Q end, Deaths);
death_queue_names(#?MODULE{annotations = #{deaths := #deaths{records = Records}}}) ->
    proplists:get_keys(maps:keys(Records));
death_queue_names(#?MODULE{}) ->
    [];
death_queue_names(BasicMsg) ->
    mc_compat:death_queue_names(BasicMsg).

%% Returns the death entry stored under the `last` key of the deaths
%% annotation as {Key, #death{}}, or undefined when the message state has no
%% deaths annotation. Any other input is delegated to mc_compat:last_death/1.
-spec last_death(state()) ->
    undefined | {death_key(), #death{}}.
last_death(#?MODULE{annotations = #{deaths := #deaths{last = LastKey,
                                                      records = Records}}}) ->
    {LastKey, maps:get(LastKey, Records)};
last_death(#?MODULE{annotations = Anns})
  when not is_map_key(deaths, Anns) ->
    undefined;
last_death(BasicMsg) ->
    mc_compat:last_death(BasicMsg).

-spec prepare(read | store, state()) -> state().
prepare(store, #?MODULE{protocol = mc_amqp} = State) ->
case rabbit_feature_flags:is_enabled(message_containers_store_amqp_v1) of
Expand All @@ -456,24 +491,52 @@ prepare(For, State) ->

%% INTERNAL

%% Decides whether dead lettering to TargetQueue closes a cycle, given the
%% deaths_v2 list (ordered by recency, most recent death first).
%%
%% There is a cycle if there is a death with a source queue that is the same
%% as the target queue. The message is only reported as cycling if the cycle
%% is "fully automatic", i.e. without a client explicitly rejecting the
%% message somewhere in the cycle: neither the most recent death from
%% TargetQueue nor any death newer than it may have the reason 'rejected'.
is_cycle_v2(TargetQueue, Deaths) ->
    IsFromOtherQueue = fun(#death_v2{source_queue = Q}) ->
                               Q =/= TargetQueue
                       end,
    {Newer, FromTargetOnwards} = lists:splitwith(IsFromOtherQueue, Deaths),
    case FromTargetOnwards of
        [] ->
            %% TargetQueue never dead lettered this message.
            false;
        [Latest | _] ->
            IsRejected = fun(#death_v2{reason = R}) ->
                                 R =:= rejected
                         end,
            not lists:any(IsRejected, [Latest | Newer])
    end.

%% The desired v1 behaviour is the following:
%% "If there is a death with a source queue that is the same as the target
%% queue name and there are no newer deaths with the 'rejected' reason then
%% consider this a cycle."
%% However, the correct death order cannot be reliably determined in v1.
%% deaths_v2 fixes this bug.
%%
%% Walks the given {SourceQueue, Reason} keys front to back and stops at the
%% first decisive one: a 'rejected' key yields false, a key whose queue equals
%% Queue yields true. Running out of keys yields false.
is_cycle_v1(_Queue, []) ->
    false;
is_cycle_v1(_Queue, [{_Q, rejected} | _]) ->
    %% any rejection breaks the cycle
    false;
is_cycle_v1(Queue, [{Queue, Reason} | _])
  when Reason =/= rejected ->
    true;
is_cycle_v1(Queue, [_ | Rem]) ->
    is_cycle_v1(Queue, Rem).

%% Stores the current OS system time, in milliseconds, in the annotations map
%% under ?ANN_RECEIVED_AT_TIMESTAMP, replacing any value already there.
set_received_at_timestamp(Anns) ->
    maps:put(?ANN_RECEIVED_AT_TIMESTAMP, os:system_time(millisecond), Anns).

-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
-endif.
%% Removes the #death_v2{} recorded for the given source queue and reason
%% from Deaths.
%%
%% Returns {value, Death, Rest}, where Rest is Deaths without Death and with
%% the relative order of the remaining entries unchanged, or false if no
%% entry matches both Queue and Reason.
deaths_take(Queue, Reason, Deaths) ->
    deaths_take(Queue, Reason, Deaths, []).

%% Skipped holds the entries inspected so far, most recently inspected first.
deaths_take(_Queue, _Reason, [], _Skipped) ->
    false;
deaths_take(Queue,
            Reason,
            [#death_v2{source_queue = Queue,
                       reason = Reason} = Found | Rest],
            Skipped) ->
    {value, Found, lists:reverse(Skipped) ++ Rest};
deaths_take(Queue, Reason, [Other | Rest], Skipped) ->
    deaths_take(Queue, Reason, Rest, [Other | Skipped]).
Loading

0 comments on commit e334bd6

Please sign in to comment.