Skip to content

Commit

Permalink
Fix dead lettering
Browse files Browse the repository at this point in the history
This commit fixes #11159, #11160, #11173.
  • Loading branch information
ansd committed May 7, 2024
1 parent 6ff7b65 commit 8241f5b
Show file tree
Hide file tree
Showing 12 changed files with 798 additions and 228 deletions.
5 changes: 4 additions & 1 deletion deps/rabbit/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -391,7 +391,7 @@ rabbitmq_integration_suite(
additional_beam = [
":test_queue_utils_beam",
],
shard_count = 7,
shard_count = 8,
)

rabbitmq_integration_suite(
Expand Down Expand Up @@ -789,6 +789,9 @@ rabbitmq_suite(
rabbitmq_suite(
name = "mc_unit_SUITE",
size = "small",
runtime_deps = [
"@meck//:erlang_app",
],
deps = [
"//deps/amqp10_common:erlang_app",
"//deps/rabbit_common:erlang_app",
Expand Down
41 changes: 27 additions & 14 deletions deps/rabbit/include/mc.hrl
Original file line number Diff line number Diff line change
@@ -1,17 +1,3 @@
-type death_key() :: {SourceQueue :: rabbit_misc:resource_name(), rabbit_dead_letter:reason()}.
-type death_anns() :: #{first_time := non_neg_integer(), %% the timestamp of the first
last_time := non_neg_integer(), %% the timestamp of the last
ttl => OriginalExpiration :: non_neg_integer()}.
-record(death, {exchange :: OriginalExchange :: rabbit_misc:resource_name(),
routing_keys = [] :: OriginalRoutingKeys :: [rabbit_types:routing_key()],
count = 0 :: non_neg_integer(),
anns :: death_anns()}).

-record(deaths, {first :: death_key(),
last :: death_key(),
records = #{} :: #{death_key() := #death{}}}).


%% good enough for most use cases
-define(IS_MC(Msg), element(1, Msg) == mc andalso tuple_size(Msg) == 5).

Expand All @@ -26,3 +12,30 @@
-define(ANN_RECEIVED_AT_TIMESTAMP, rts).
-define(ANN_DURABLE, d).
-define(ANN_PRIORITY, p).

%% RabbitMQ >= 3.13.3
-record(death_v2, {source_queue :: rabbit_misc:resource_name(),
reason :: rabbit_dead_letter:reason(),
%% how many times this message was dead lettered
%% from this source_queue for this reason
count :: pos_integer(),
%% timestamp when this message was dead lettered the first time
%% from this source_queue for this reason
first_death_timestamp :: pos_integer(),
original_exchange :: rabbit_misc:resource_name(),
original_routing_keys :: [rabbit_types:routing_key(),...],
%% set iff reason == expired
original_ttl :: undefined | non_neg_integer()}).

%% These records were used in RabbitMQ 3.13.0 - 3.13.2.
-type death_v1_key() :: {SourceQueue :: rabbit_misc:resource_name(), rabbit_dead_letter:reason()}.
-type death_v1_anns() :: #{first_time := non_neg_integer(),
last_time := non_neg_integer(),
ttl => OriginalExpiration :: non_neg_integer()}.
-record(death, {exchange :: OriginalExchange :: rabbit_misc:resource_name(),
routing_keys = [] :: OriginalRoutingKeys :: [rabbit_types:routing_key()],
count = 0 :: non_neg_integer(),
anns :: death_v1_anns()}).
-record(deaths, {first :: death_v1_key(),
last :: death_v1_key(),
records = #{} :: #{death_v1_key() := #death{}}}).
161 changes: 112 additions & 49 deletions deps/rabbit/src/mc.erl
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
prepare/2,
record_death/3,
is_death_cycle/2,
last_death/1,
death_queue_names/1
]).

Expand Down Expand Up @@ -359,10 +358,9 @@ protocol_state(BasicMsg) ->
SourceQueue :: rabbit_misc:resource_name(),
state()) -> state().
record_death(Reason, SourceQueue,
#?MODULE{protocol = _Mod,
data = _Data,
annotations = Anns0} = State)
when is_atom(Reason) andalso is_binary(SourceQueue) ->
#?MODULE{annotations = Anns0} = State)
when is_atom(Reason) andalso
is_binary(SourceQueue) ->
Key = {SourceQueue, Reason},
#{?ANN_EXCHANGE := Exchange,
?ANN_ROUTING_KEYS := RoutingKeys} = Anns0,
Expand All @@ -374,21 +372,27 @@ record_death(Reason, SourceQueue,
last_time => Timestamp}),
case maps:get(deaths, Anns0, undefined) of
undefined ->
Ds = #deaths{last = Key,
first = Key,
records = #{Key => #death{count = 1,
exchange = Exchange,
routing_keys = RoutingKeys,
anns = DeathAnns}}},
Anns = Anns0#{<<"x-first-death-reason">> => ReasonBin,
<<"x-first-death-queue">> => SourceQueue,
<<"x-first-death-exchange">> => Exchange,
<<"x-last-death-reason">> => ReasonBin,
<<"x-last-death-queue">> => SourceQueue,
<<"x-last-death-exchange">> => Exchange
},

State#?MODULE{annotations = Anns#{deaths => Ds}};
case rabbit_feature_flags:is_enabled(message_containers_deaths_v2) of
true ->
record_death_v2(SourceQueue, Reason, ReasonBin, Exchange,
RoutingKeys, Timestamp, Ttl, State);
false ->
Ds = #deaths{last = Key,
first = Key,
records = #{Key => #death{count = 1,
exchange = Exchange,
routing_keys = RoutingKeys,
anns = DeathAnns}}},
Anns = Anns0#{<<"x-first-death-reason">> => ReasonBin,
<<"x-first-death-queue">> => SourceQueue,
<<"x-first-death-exchange">> => Exchange,
<<"x-last-death-reason">> => ReasonBin,
<<"x-last-death-queue">> => SourceQueue,
<<"x-last-death-exchange">> => Exchange,
deaths => Ds
},
State#?MODULE{annotations = Anns}
end;
#deaths{records = Rs} = Ds0 ->
Death = #death{count = C,
anns = DA} = maps:get(Key, Rs,
Expand All @@ -408,37 +412,68 @@ record_death(Reason, SourceQueue,
record_death(Reason, SourceQueue, BasicMsg) ->
mc_compat:record_death(Reason, SourceQueue, BasicMsg).

record_death_v2(SourceQueue, Reason, ReasonBin, Exchange, RoutingKeys, Timestamp, Ttl,
#?MODULE{annotations = Anns0} = State) ->
Anns = case Anns0 of
#{deaths_v2 := Deaths0} ->
%% deaths_v2 is ordered by recency
Deaths = case deaths_take(SourceQueue, Reason, Deaths0) of
{value, Death0 = #death_v2{count = Count}, Deaths1} ->
Death = Death0#death_v2{count = Count + 1},
[Death | Deaths1];
false ->
Death = #death_v2{source_queue = SourceQueue,
reason = Reason,
count = 1,
first_death_timestamp = Timestamp,
original_exchange = Exchange,
original_routing_keys = RoutingKeys,
original_ttl = Ttl},
[Death | Deaths0]
end,
Anns0#{deaths_v2 := Deaths,
<<"x-last-death-reason">> := ReasonBin,
<<"x-last-death-queue">> := SourceQueue,
<<"x-last-death-exchange">> := Exchange};
_ ->
Death = #death_v2{source_queue = SourceQueue,
reason = Reason,
count = 1,
first_death_timestamp = Timestamp,
original_exchange = Exchange,
original_routing_keys = RoutingKeys,
original_ttl = Ttl},
Anns0#{deaths_v2 => [Death],
<<"x-first-death-reason">> => ReasonBin,
<<"x-first-death-queue">> => SourceQueue,
<<"x-first-death-exchange">> => Exchange,
<<"x-last-death-reason">> => ReasonBin,
<<"x-last-death-queue">> => SourceQueue,
<<"x-last-death-exchange">> => Exchange}
end,
State#?MODULE{annotations = Anns}.

-spec is_death_cycle(rabbit_misc:resource_name(), state()) -> boolean().
is_death_cycle(TargetQueue, #?MODULE{annotations = #{deaths_v2 := Deaths}}) ->
is_cycle_v2(TargetQueue, Deaths);
is_death_cycle(TargetQueue, #?MODULE{annotations = #{deaths := Deaths}}) ->
is_cycle(TargetQueue, maps:keys(Deaths#deaths.records));
is_cycle_v1(TargetQueue, maps:keys(Deaths#deaths.records));
is_death_cycle(_TargetQueue, #?MODULE{}) ->
false;
is_death_cycle(TargetQueue, BasicMsg) ->
mc_compat:is_death_cycle(TargetQueue, BasicMsg).

%% Returns death queue names ordered by recency.
-spec death_queue_names(state()) -> [rabbit_misc:resource_name()].
death_queue_names(#?MODULE{annotations = Anns}) ->
case maps:get(deaths, Anns, undefined) of
undefined ->
[];
#deaths{records = Records} ->
proplists:get_keys(maps:keys(Records))
end;
death_queue_names(#?MODULE{annotations = #{deaths_v2 := Deaths}}) ->
lists:map(fun(#death_v2{source_queue = Q}) -> Q end, Deaths);
death_queue_names(#?MODULE{annotations = #{deaths := #deaths{records = Records}}}) ->
proplists:get_keys(maps:keys(Records));
death_queue_names(#?MODULE{}) ->
[];
death_queue_names(BasicMsg) ->
mc_compat:death_queue_names(BasicMsg).

-spec last_death(state()) ->
undefined | {death_key(), #death{}}.
last_death(#?MODULE{annotations = Anns})
when not is_map_key(deaths, Anns) ->
undefined;
last_death(#?MODULE{annotations = #{deaths := #deaths{last = Last,
records = Rs}}}) ->
{Last, maps:get(Last, Rs)};
last_death(BasicMsg) ->
mc_compat:last_death(BasicMsg).

-spec prepare(read | store, state()) -> state().
prepare(store, #?MODULE{protocol = mc_amqp} = State) ->
case rabbit_feature_flags:is_enabled(message_containers_store_amqp_v1) of
Expand All @@ -456,24 +491,52 @@ prepare(For, State) ->

%% INTERNAL

%% if there is a death with a source queue that is the same as the target
is_cycle_v2(TargetQueue, Deaths) ->
case lists:splitwith(fun(#death_v2{source_queue = SourceQueue}) ->
SourceQueue =/= TargetQueue
end, Deaths) of
{_, []} ->
false;
{L, [H | _]} ->
%% There is a cycle, but we only want to drop the message
%% if the cycle is "fully automatic", i.e. without a client
%% expliclity rejecting the message somewhere in the cycle.
lists:all(fun(#death_v2{reason = Reason}) ->
Reason =/= rejected
end, [H | L])
end.

%% The desired v1 behaviour is the following:
%% "If there is a death with a source queue that is the same as the target
%% queue name and there are no newer deaths with the 'rejected' reason then
%% consider this a cycle
is_cycle(_Queue, []) ->
%% consider this a cycle."
%% However, the correct death order cannot be reliably determined in v1.
%% deaths_v2 fixes this bug.
is_cycle_v1(_Queue, []) ->
false;
is_cycle(_Queue, [{_Q, rejected} | _]) ->
is_cycle_v1(_Queue, [{_Q, rejected} | _]) ->
%% any rejection breaks the cycle
false;
is_cycle(Queue, [{Queue, Reason} | _])
is_cycle_v1(Queue, [{Queue, Reason} | _])
when Reason =/= rejected ->
true;
is_cycle(Queue, [_ | Rem]) ->
is_cycle(Queue, Rem).
is_cycle_v1(Queue, [_ | Rem]) ->
is_cycle_v1(Queue, Rem).

set_received_at_timestamp(Anns) ->
Millis = os:system_time(millisecond),
Anns#{?ANN_RECEIVED_AT_TIMESTAMP => Millis}.

-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
-endif.
deaths_take(Queue, Reason, Deaths) ->
deaths_take(Queue, Reason, Deaths, []).

deaths_take(Queue,
Reason,
[#death_v2{source_queue = Queue,
reason = Reason} = H | T],
Acc) ->
{value, H, lists:reverse(Acc, T)};
deaths_take(Queue, Reason, [H|T], Acc) ->
deaths_take(Queue, Reason, T, [H|Acc]);
deaths_take(_Queue, _Reason, [], _Acc) ->
false.
75 changes: 29 additions & 46 deletions deps/rabbit/src/mc_amqp.erl
Original file line number Diff line number Diff line change
Expand Up @@ -402,6 +402,9 @@ protocol_state_message_annotations(MA, Anns) ->
maps_upsert(K, mc_util:infer_type(V), L);
(<<"timestamp_in_ms">>, V, L) ->
maps_upsert(<<"x-opt-rabbitmq-received-time">>, {timestamp, V}, L);
(deaths_v2, Deaths, L) ->
Maps = encode_deaths(Deaths),
maps_upsert(<<"x-opt-deaths">>, {array, map, Maps}, L);
(_, _, Acc) ->
Acc
end, MA, Anns).
Expand Down Expand Up @@ -569,13 +572,6 @@ msg_body_encoded([{{pos, Pos}, {body, Code}}], BarePos, Msg)
binary_part_bare_and_footer(Payload, Start) ->
binary_part(Payload, Start, byte_size(Payload) - Start).

key_find(K, [{{_, K}, {_, V}} | _]) ->
V;
key_find(K, [_ | Rem]) ->
key_find(K, Rem);
key_find(_K, []) ->
undefined.

-spec first_acquirer(mc:annotations()) -> boolean().
first_acquirer(Anns) ->
Redelivered = case Anns of
Expand All @@ -584,58 +580,45 @@ first_acquirer(Anns) ->
end,
not Redelivered.

recover_deaths([], Acc) ->
Acc;
recover_deaths([{map, Kvs} | Rem], Acc) ->
Queue = key_find(<<"queue">>, Kvs),
Reason = binary_to_existing_atom(key_find(<<"reason">>, Kvs)),
DA0 = case key_find(<<"original-expiration">>, Kvs) of
undefined ->
#{};
Exp ->
#{ttl => binary_to_integer(Exp)}
end,
RKeys = [RK || {_, RK} <- key_find(<<"routing-keys">>, Kvs)],
Ts = key_find(<<"time">>, Kvs),
DA = DA0#{first_time => Ts,
last_time => Ts},
recover_deaths(Rem,
Acc#{{Queue, Reason} =>
#death{anns = DA,
exchange = key_find(<<"exchange">>, Kvs),
count = key_find(<<"count">>, Kvs),
routing_keys = RKeys}}).
encode_deaths(Deaths) ->
lists:map(
fun(#death_v2{source_queue = Queue,
reason = Reason,
count = Count,
first_death_timestamp = Timestamp,
original_exchange = Exchange,
original_routing_keys = RoutingKeys,
original_ttl = Ttl}) ->
RKeys = [{utf8, Rk} || Rk <- RoutingKeys],
Map0 = [
{{symbol, <<"queue">>}, {utf8, Queue}},
{{symbol, <<"reason">>}, {symbol, atom_to_binary(Reason)}},
{{symbol, <<"count">>}, {ulong, Count}},
{{symbol, <<"time">>}, {timestamp, Timestamp}},
{{symbol, <<"exchange">>}, {utf8, Exchange}},
{{symbol, <<"routing-keys">>}, {array, utf8, RKeys}}
],
Map = if Ttl =:= undefined ->
Map0;
is_integer(Ttl) ->
[{{symbol, <<"ttl">>}, {uint, Ttl}} | Map0]
end,
{map, Map}
end, Deaths).

essential_properties(#msg_body_encoded{message_annotations = MA} = Msg) ->
Durable = get_property(durable, Msg),
Priority = get_property(priority, Msg),
Timestamp = get_property(timestamp, Msg),
Ttl = get_property(ttl, Msg),

Deaths = case message_annotation(<<"x-death">>, Msg, undefined) of
{list, DeathMaps} ->
%% TODO: make more correct?
Def = {utf8, <<>>},
{utf8, FstQ} = message_annotation(<<"x-first-death-queue">>, Msg, Def),
{utf8, FstR} = message_annotation(<<"x-first-death-reason">>, Msg, Def),
{utf8, LastQ} = message_annotation(<<"x-last-death-queue">>, Msg, Def),
{utf8, LastR} = message_annotation(<<"x-last-death-reason">>, Msg, Def),
#deaths{first = {FstQ, binary_to_existing_atom(FstR)},
last = {LastQ, binary_to_existing_atom(LastR)},
records = recover_deaths(DeathMaps, #{})};
_ ->
undefined
end,
Anns0 = #{?ANN_DURABLE => Durable},
Anns = maps_put_truthy(
?ANN_PRIORITY, Priority,
maps_put_truthy(
?ANN_TIMESTAMP, Timestamp,
maps_put_truthy(
ttl, Ttl,
maps_put_truthy(
deaths, Deaths,
Anns0)))),
Anns0))),
case MA of
[] ->
Anns;
Expand Down
Loading

0 comments on commit 8241f5b

Please sign in to comment.