Skip to content

Commit

Permalink
Make death_v2{} record extensible
Browse files Browse the repository at this point in the history
similar to how death{} record is extensible
  • Loading branch information
ansd committed May 13, 2024
1 parent d0b0ec8 commit b056b45
Show file tree
Hide file tree
Showing 4 changed files with 14 additions and 11 deletions.
6 changes: 3 additions & 3 deletions deps/rabbit/include/mc.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
-define(ANN_PRIORITY, p).

%% RabbitMQ >= 3.13.3
-type death_v2_anns() :: #{ttl => OriginalTtlHeader :: non_neg_integer()}.
-record(death_v2, {source_queue :: rabbit_misc:resource_name(),
reason :: rabbit_dead_letter:reason(),
%% how many times this message was dead lettered
Expand All @@ -24,10 +25,9 @@
first_death_timestamp :: pos_integer(),
original_exchange :: rabbit_misc:resource_name(),
original_routing_keys :: [rabbit_types:routing_key(),...],
%% original message ttl header if reason is 'expired'
original_ttl :: undefined | non_neg_integer()}).
annotations :: death_v2_anns()}).

%% These records were used in RabbitMQ 3.13.0 - 3.13.2.
%% RabbitMQ 3.13.0 - 3.13.2.
-type death_v1_key() :: {SourceQueue :: rabbit_misc:resource_name(), rabbit_dead_letter:reason()}.
-type death_v1_anns() :: #{first_time := non_neg_integer(),
last_time := non_neg_integer(),
Expand Down
5 changes: 3 additions & 2 deletions deps/rabbit/src/mc.erl
Original file line number Diff line number Diff line change
Expand Up @@ -414,6 +414,7 @@ record_death(Reason, SourceQueue, BasicMsg) ->

record_death_v2(SourceQueue, Reason, ReasonBin, Exchange, RoutingKeys, Timestamp, Ttl,
#?MODULE{annotations = Anns0} = State) ->
DeathAnns = rabbit_misc:maps_put_truthy(ttl, Ttl, #{}),
Anns = case Anns0 of
#{deaths_v2 := Deaths0} ->
%% deaths_v2 is ordered by recency
Expand All @@ -428,7 +429,7 @@ record_death_v2(SourceQueue, Reason, ReasonBin, Exchange, RoutingKeys, Timestamp
first_death_timestamp = Timestamp,
original_exchange = Exchange,
original_routing_keys = RoutingKeys,
original_ttl = Ttl},
annotations = DeathAnns},
[Death | Deaths0]
end,
Anns0#{deaths_v2 := Deaths,
Expand All @@ -442,7 +443,7 @@ record_death_v2(SourceQueue, Reason, ReasonBin, Exchange, RoutingKeys, Timestamp
first_death_timestamp = Timestamp,
original_exchange = Exchange,
original_routing_keys = RoutingKeys,
original_ttl = Ttl},
annotations = DeathAnns},
Anns0#{deaths_v2 => [Death],
<<"x-first-death-reason">> => ReasonBin,
<<"x-first-death-queue">> => SourceQueue,
Expand Down
11 changes: 6 additions & 5 deletions deps/rabbit/src/mc_amqp.erl
Original file line number Diff line number Diff line change
Expand Up @@ -588,7 +588,7 @@ encode_deaths(Deaths) ->
first_death_timestamp = Timestamp,
original_exchange = Exchange,
original_routing_keys = RoutingKeys,
original_ttl = Ttl}) ->
annotations = Anns}) ->
RKeys = [{utf8, Rk} || Rk <- RoutingKeys],
Map0 = [
{{symbol, <<"queue">>}, {utf8, Queue}},
Expand All @@ -598,10 +598,11 @@ encode_deaths(Deaths) ->
{{symbol, <<"exchange">>}, {utf8, Exchange}},
{{symbol, <<"routing-keys">>}, {array, utf8, RKeys}}
],
Map = if Ttl =:= undefined ->
Map0;
is_integer(Ttl) ->
[{{symbol, <<"ttl">>}, {uint, Ttl}} | Map0]
Map = case Anns of
#{ttl := Ttl} ->
[{{symbol, <<"ttl">>}, {uint, Ttl}} | Map0];
_ ->
Map0
end,
{map, Map}
end, Deaths).
Expand Down
3 changes: 2 additions & 1 deletion deps/rabbit/src/mc_amqpl.erl
Original file line number Diff line number Diff line change
Expand Up @@ -607,7 +607,8 @@ deaths_v2_to_headers(Deaths, Headers) ->
first_death_timestamp = Timestamp,
original_exchange = Ex,
original_routing_keys = RKeys,
original_ttl = Ttl}) ->
annotations = Anns}) ->
Ttl = maps:get(ttl, Anns, undefined),
death_table(Queue, Reason, Count, Timestamp, Ex, RKeys, Ttl)
end, Deaths),
rabbit_misc:set_table_value(Headers, <<"x-death">>, array, Infos).
Expand Down

0 comments on commit b056b45

Please sign in to comment.