Skip to content

Commit

Permalink
Reuse death record
Browse files Browse the repository at this point in the history
Address PR feedback
#11174 (comment)
  • Loading branch information
ansd committed May 13, 2024
1 parent b056b45 commit 9a84c34
Show file tree
Hide file tree
Showing 4 changed files with 123 additions and 179 deletions.
42 changes: 18 additions & 24 deletions deps/rabbit/include/mc.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -13,29 +13,23 @@
-define(ANN_DURABLE, d).
-define(ANN_PRIORITY, p).

%% RabbitMQ >= 3.13.3
-type death_v2_anns() :: #{ttl => OriginalTtlHeader :: non_neg_integer()}.
-record(death_v2, {source_queue :: rabbit_misc:resource_name(),
reason :: rabbit_dead_letter:reason(),
%% how many times this message was dead lettered
%% from this source_queue for this reason
count :: pos_integer(),
%% timestamp when this message was dead lettered the first time
%% from this source_queue for this reason
first_death_timestamp :: pos_integer(),
original_exchange :: rabbit_misc:resource_name(),
original_routing_keys :: [rabbit_types:routing_key(),...],
annotations :: death_v2_anns()}).
-type death_key() :: {SourceQueue :: rabbit_misc:resource_name(), rabbit_dead_letter:reason()}.
-type death_anns() :: #{%% timestamp of the first time this message
%% was dead lettered from this queue for this reason
first_time := pos_integer(),
%% timestamp of the last time this message
%% was dead lettered from this queue for this reason
last_time := pos_integer(),
ttl => OriginalTtlHeader :: non_neg_integer()}.

%% RabbitMQ 3.13.0 - 3.13.2.
-type death_v1_key() :: {SourceQueue :: rabbit_misc:resource_name(), rabbit_dead_letter:reason()}.
-type death_v1_anns() :: #{first_time := non_neg_integer(),
last_time := non_neg_integer(),
ttl => OriginalExpiration :: non_neg_integer()}.
-record(death, {exchange :: OriginalExchange :: rabbit_misc:resource_name(),
routing_keys = [] :: OriginalRoutingKeys :: [rabbit_types:routing_key()],
count = 0 :: non_neg_integer(),
anns :: death_v1_anns()}).
-record(deaths, {first :: death_v1_key(),
last :: death_v1_key(),
records = #{} :: #{death_v1_key() := #death{}}}).
routing_keys :: OriginalRoutingKeys :: [rabbit_types:routing_key(),...],
%% how many times this message was dead lettered from this queue for this reason
count :: pos_integer(),
anns :: death_anns()}).

-record(deaths, {first :: death_key(), % redundant to mc annotations x-first-death-*
last :: death_key(), % redundant to mc annotations x-last-death-*
records :: #{death_key() := #death{}} | % feature flag message_containers_deaths_v2 disabled
[{death_key(), #death{}}] % feature flag message_containers_deaths_v2 enabled
}).
168 changes: 67 additions & 101 deletions deps/rabbit/src/mc.erl
Original file line number Diff line number Diff line change
Expand Up @@ -366,109 +366,89 @@ record_death(Reason, SourceQueue,
?ANN_ROUTING_KEYS := RoutingKeys} = Anns0,
Timestamp = os:system_time(millisecond),
Ttl = maps:get(ttl, Anns0, undefined),

ReasonBin = atom_to_binary(Reason),
DeathAnns = rabbit_misc:maps_put_truthy(ttl, Ttl, #{first_time => Timestamp,
last_time => Timestamp}),
case maps:get(deaths, Anns0, undefined) of
undefined ->
case rabbit_feature_flags:is_enabled(message_containers_deaths_v2) of
true ->
record_death_v2(SourceQueue, Reason, ReasonBin, Exchange,
RoutingKeys, Timestamp, Ttl, State);
false ->
Ds = #deaths{last = Key,
first = Key,
records = #{Key => #death{count = 1,
exchange = Exchange,
routing_keys = RoutingKeys,
anns = DeathAnns}}},
Anns = Anns0#{<<"x-first-death-reason">> => ReasonBin,
<<"x-first-death-queue">> => SourceQueue,
<<"x-first-death-exchange">> => Exchange,
<<"x-last-death-reason">> => ReasonBin,
<<"x-last-death-queue">> => SourceQueue,
<<"x-last-death-exchange">> => Exchange,
deaths => Ds
},
State#?MODULE{annotations = Anns}
end;
#deaths{records = Rs} = Ds0 ->
Death = #death{count = C,
anns = DA} = maps:get(Key, Rs,
#death{exchange = Exchange,
routing_keys = RoutingKeys,
anns = DeathAnns}),
Ds = Ds0#deaths{last = Key,
records = Rs#{Key =>
Death#death{count = C + 1,
anns = DA#{last_time => Timestamp}}}},
Anns = Anns0#{deaths => Ds,
case Anns0 of
#{deaths := Deaths = #deaths{records = Rs0}} ->
Rs = if is_list(Rs0) ->
%% records are ordered by recency
case lists:keytake(Key, 1, Rs0) of
{value, {Key, D0}, Rs1} ->
D = update_death(D0, Timestamp),
[{Key, D} | Rs1];
false ->
[{Key, #death{exchange = Exchange,
routing_keys = RoutingKeys,
count = 1,
anns = DeathAnns}} | Rs0]
end;
is_map(Rs0) ->
case Rs0 of
#{Key := Death} ->
Rs0#{Key := update_death(Death, Timestamp)};
_ ->
Rs0#{Key => #death{exchange = Exchange,
routing_keys = RoutingKeys,
count = 1,
anns = DeathAnns}}
end
end,
Anns = Anns0#{<<"x-last-death-reason">> := atom_to_binary(Reason),
<<"x-last-death-queue">> := SourceQueue,
<<"x-last-death-exchange">> := Exchange,
deaths := Deaths#deaths{last = Key,
records = Rs}},
State#?MODULE{annotations = Anns};
_ ->
Death = #death{exchange = Exchange,
routing_keys = RoutingKeys,
count = 1,
anns = DeathAnns},
Rs = case rabbit_feature_flags:is_enabled(message_containers_deaths_v2) of
true -> [{Key, Death}];
false -> #{Key => Death}
end,
ReasonBin = atom_to_binary(Reason),
Anns = Anns0#{<<"x-first-death-reason">> => ReasonBin,
<<"x-first-death-queue">> => SourceQueue,
<<"x-first-death-exchange">> => Exchange,
<<"x-last-death-reason">> => ReasonBin,
<<"x-last-death-queue">> => SourceQueue,
<<"x-last-death-exchange">> => Exchange},
<<"x-last-death-exchange">> => Exchange,
deaths => #deaths{last = Key,
first = Key,
records = Rs}},
State#?MODULE{annotations = Anns}
end;
record_death(Reason, SourceQueue, BasicMsg) ->
mc_compat:record_death(Reason, SourceQueue, BasicMsg).

record_death_v2(SourceQueue, Reason, ReasonBin, Exchange, RoutingKeys, Timestamp, Ttl,
#?MODULE{annotations = Anns0} = State) ->
DeathAnns = rabbit_misc:maps_put_truthy(ttl, Ttl, #{}),
Anns = case Anns0 of
#{deaths_v2 := Deaths0} ->
%% deaths_v2 is ordered by recency
Deaths = case deaths_take(SourceQueue, Reason, Deaths0) of
{value, Death0 = #death_v2{count = Count}, Deaths1} ->
Death = Death0#death_v2{count = Count + 1},
[Death | Deaths1];
false ->
Death = #death_v2{source_queue = SourceQueue,
reason = Reason,
count = 1,
first_death_timestamp = Timestamp,
original_exchange = Exchange,
original_routing_keys = RoutingKeys,
annotations = DeathAnns},
[Death | Deaths0]
end,
Anns0#{deaths_v2 := Deaths,
<<"x-last-death-reason">> := ReasonBin,
<<"x-last-death-queue">> := SourceQueue,
<<"x-last-death-exchange">> := Exchange};
_ ->
Death = #death_v2{source_queue = SourceQueue,
reason = Reason,
count = 1,
first_death_timestamp = Timestamp,
original_exchange = Exchange,
original_routing_keys = RoutingKeys,
annotations = DeathAnns},
Anns0#{deaths_v2 => [Death],
<<"x-first-death-reason">> => ReasonBin,
<<"x-first-death-queue">> => SourceQueue,
<<"x-first-death-exchange">> => Exchange,
<<"x-last-death-reason">> => ReasonBin,
<<"x-last-death-queue">> => SourceQueue,
<<"x-last-death-exchange">> => Exchange}
end,
State#?MODULE{annotations = Anns}.
update_death(#death{count = Count,
anns = DeathAnns} = Death, Timestamp) ->
Death#death{count = Count + 1,
anns = DeathAnns#{last_time := Timestamp}}.

-spec is_death_cycle(rabbit_misc:resource_name(), state()) -> boolean().
is_death_cycle(TargetQueue, #?MODULE{annotations = #{deaths_v2 := Deaths}}) ->
is_cycle_v2(TargetQueue, Deaths);
is_death_cycle(TargetQueue, #?MODULE{annotations = #{deaths := Deaths}}) ->
is_cycle_v1(TargetQueue, maps:keys(Deaths#deaths.records));
is_death_cycle(TargetQueue, #?MODULE{annotations = #{deaths := #deaths{records = Recs}}})
when is_list(Recs) ->
is_cycle_v2(TargetQueue, Recs);
is_death_cycle(TargetQueue, #?MODULE{annotations = #{deaths := #deaths{records = Recs}}})
when is_map(Recs) ->
is_cycle_v1(TargetQueue, maps:keys(Recs));
is_death_cycle(_TargetQueue, #?MODULE{}) ->
false;
is_death_cycle(TargetQueue, BasicMsg) ->
mc_compat:is_death_cycle(TargetQueue, BasicMsg).

%% Returns death queue names ordered by recency.
-spec death_queue_names(state()) -> [rabbit_misc:resource_name()].
death_queue_names(#?MODULE{annotations = #{deaths_v2 := Deaths}}) ->
lists:map(fun(#death_v2{source_queue = Q}) -> Q end, Deaths);
death_queue_names(#?MODULE{annotations = #{deaths := #deaths{records = Records}}}) ->
death_queue_names(#?MODULE{annotations = #{deaths := #deaths{records = Records}}})
when is_list(Records) ->
lists:map(fun({{Queue, _Reason}, _Death}) ->
Queue
end, Records);
death_queue_names(#?MODULE{annotations = #{deaths := #deaths{records = Records}}})
when is_map(Records) ->
proplists:get_keys(maps:keys(Records));
death_queue_names(#?MODULE{}) ->
[];
Expand All @@ -485,7 +465,7 @@ prepare(For, State) ->
%% INTERNAL

is_cycle_v2(TargetQueue, Deaths) ->
case lists:splitwith(fun(#death_v2{source_queue = SourceQueue}) ->
case lists:splitwith(fun({{SourceQueue, _Reason}, #death{}}) ->
SourceQueue =/= TargetQueue
end, Deaths) of
{_, []} ->
Expand All @@ -494,7 +474,7 @@ is_cycle_v2(TargetQueue, Deaths) ->
%% There is a cycle, but we only want to drop the message
%% if the cycle is "fully automatic", i.e. without a client
%% expliclity rejecting the message somewhere in the cycle.
lists:all(fun(#death_v2{reason = Reason}) ->
lists:all(fun({{_SourceQueue, Reason}, #death{}}) ->
Reason =/= rejected
end, [H | L])
end.
Expand All @@ -519,17 +499,3 @@ is_cycle_v1(Queue, [_ | Rem]) ->
set_received_at_timestamp(Anns) ->
Millis = os:system_time(millisecond),
Anns#{?ANN_RECEIVED_AT_TIMESTAMP => Millis}.

deaths_take(Queue, Reason, Deaths) ->
deaths_take(Queue, Reason, Deaths, []).

deaths_take(Queue,
Reason,
[#death_v2{source_queue = Queue,
reason = Reason} = H | T],
Acc) ->
{value, H, lists:reverse(Acc, T)};
deaths_take(Queue, Reason, [H|T], Acc) ->
deaths_take(Queue, Reason, T, [H|Acc]);
deaths_take(_Queue, _Reason, [], _Acc) ->
false.
19 changes: 9 additions & 10 deletions deps/rabbit/src/mc_amqp.erl
Original file line number Diff line number Diff line change
Expand Up @@ -402,8 +402,9 @@ protocol_state_message_annotations(MA, Anns) ->
maps_upsert(K, mc_util:infer_type(V), L);
(<<"timestamp_in_ms">>, V, L) ->
maps_upsert(<<"x-opt-rabbitmq-received-time">>, {timestamp, V}, L);
(deaths_v2, Deaths, L) ->
Maps = encode_deaths(Deaths),
(deaths, #deaths{records = Records}, L)
when is_list(Records) ->
Maps = encode_deaths(Records),
maps_upsert(<<"x-opt-deaths">>, {array, map, Maps}, L);
(_, _, Acc) ->
Acc
Expand Down Expand Up @@ -582,19 +583,17 @@ first_acquirer(Anns) ->

encode_deaths(Deaths) ->
lists:map(
fun(#death_v2{source_queue = Queue,
reason = Reason,
count = Count,
first_death_timestamp = Timestamp,
original_exchange = Exchange,
original_routing_keys = RoutingKeys,
annotations = Anns}) ->
fun({{Queue, Reason},
#death{exchange = Exchange,
routing_keys = RoutingKeys,
count = Count,
anns = Anns = #{first_time := FirstTime}}}) ->
RKeys = [{utf8, Rk} || Rk <- RoutingKeys],
Map0 = [
{{symbol, <<"queue">>}, {utf8, Queue}},
{{symbol, <<"reason">>}, {symbol, atom_to_binary(Reason)}},
{{symbol, <<"count">>}, {ulong, Count}},
{{symbol, <<"time">>}, {timestamp, Timestamp}},
{{symbol, <<"time">>}, {timestamp, FirstTime}},
{{symbol, <<"exchange">>}, {utf8, Exchange}},
{{symbol, <<"routing-keys">>}, {array, utf8, RKeys}}
],
Expand Down

0 comments on commit 9a84c34

Please sign in to comment.