diff --git a/deps/rabbit/include/mc.hrl b/deps/rabbit/include/mc.hrl index 1e24813b9e85..41ad438479bc 100644 --- a/deps/rabbit/include/mc.hrl +++ b/deps/rabbit/include/mc.hrl @@ -13,29 +13,23 @@ -define(ANN_DURABLE, d). -define(ANN_PRIORITY, p). -%% RabbitMQ >= 3.13.3 --type death_v2_anns() :: #{ttl => OriginalTtlHeader :: non_neg_integer()}. --record(death_v2, {source_queue :: rabbit_misc:resource_name(), - reason :: rabbit_dead_letter:reason(), - %% how many times this message was dead lettered - %% from this source_queue for this reason - count :: pos_integer(), - %% timestamp when this message was dead lettered the first time - %% from this source_queue for this reason - first_death_timestamp :: pos_integer(), - original_exchange :: rabbit_misc:resource_name(), - original_routing_keys :: [rabbit_types:routing_key(),...], - annotations :: death_v2_anns()}). +-type death_key() :: {SourceQueue :: rabbit_misc:resource_name(), rabbit_dead_letter:reason()}. +-type death_anns() :: #{%% timestamp of the first time this message + %% was dead lettered from this queue for this reason + first_time := pos_integer(), + %% timestamp of the last time this message + %% was dead lettered from this queue for this reason + last_time := pos_integer(), + ttl => OriginalTtlHeader :: non_neg_integer()}. -%% RabbitMQ 3.13.0 - 3.13.2. --type death_v1_key() :: {SourceQueue :: rabbit_misc:resource_name(), rabbit_dead_letter:reason()}. --type death_v1_anns() :: #{first_time := non_neg_integer(), - last_time := non_neg_integer(), - ttl => OriginalExpiration :: non_neg_integer()}. -record(death, {exchange :: OriginalExchange :: rabbit_misc:resource_name(), - routing_keys = [] :: OriginalRoutingKeys :: [rabbit_types:routing_key()], - count = 0 :: non_neg_integer(), - anns :: death_v1_anns()}). --record(deaths, {first :: death_v1_key(), - last :: death_v1_key(), - records = #{} :: #{death_v1_key() := #death{}}}). + routing_keys :: OriginalRoutingKeys :: [rabbit_types:routing_key(),...], + %% how many times this message was dead lettered from this queue for this reason + count :: pos_integer(), + anns :: death_anns()}). + +-record(deaths, {first :: death_key(), % redundant to mc annotations x-first-death-* + last :: death_key(), % redundant to mc annotations x-last-death-* + records :: #{death_key() := #death{}} | % feature flag message_containers_deaths_v2 disabled + [{death_key(), #death{}}] % feature flag message_containers_deaths_v2 enabled + }). diff --git a/deps/rabbit/src/mc.erl b/deps/rabbit/src/mc.erl index 6b60d5ff06c8..d1c80d9ca272 100644 --- a/deps/rabbit/src/mc.erl +++ b/deps/rabbit/src/mc.erl @@ -366,99 +366,75 @@ record_death(Reason, SourceQueue, ?ANN_ROUTING_KEYS := RoutingKeys} = Anns0, Timestamp = os:system_time(millisecond), Ttl = maps:get(ttl, Anns0, undefined), - - ReasonBin = atom_to_binary(Reason), DeathAnns = rabbit_misc:maps_put_truthy(ttl, Ttl, #{first_time => Timestamp, last_time => Timestamp}), - case maps:get(deaths, Anns0, undefined) of - undefined -> - case rabbit_feature_flags:is_enabled(message_containers_deaths_v2) of - true -> - record_death_v2(SourceQueue, Reason, ReasonBin, Exchange, - RoutingKeys, Timestamp, Ttl, State); - false -> - Ds = #deaths{last = Key, - first = Key, - records = #{Key => #death{count = 1, - exchange = Exchange, - routing_keys = RoutingKeys, - anns = DeathAnns}}}, - Anns = Anns0#{<<"x-first-death-reason">> => ReasonBin, - <<"x-first-death-queue">> => SourceQueue, - <<"x-first-death-exchange">> => Exchange, - <<"x-last-death-reason">> => ReasonBin, - <<"x-last-death-queue">> => SourceQueue, - <<"x-last-death-exchange">> => Exchange, - deaths => Ds - }, - State#?MODULE{annotations = Anns} - end; - #deaths{records = Rs} = Ds0 -> - Death = #death{count = C, - anns = DA} = maps:get(Key, Rs, - #death{exchange = Exchange, - routing_keys = RoutingKeys, - anns = DeathAnns}), - Ds = Ds0#deaths{last = Key, - records = Rs#{Key => - Death#death{count = C + 1, - anns = DA#{last_time => Timestamp}}}}, - Anns = Anns0#{deaths => Ds, + case Anns0 of + #{deaths := Deaths = #deaths{records = Rs0}} -> + Rs = if is_list(Rs0) -> + %% records are ordered by recency + case lists:keytake(Key, 1, Rs0) of + {value, {Key, D0}, Rs1} -> + D = update_death(D0, Timestamp), + [{Key, D} | Rs1]; + false -> + [{Key, #death{exchange = Exchange, + routing_keys = RoutingKeys, + count = 1, + anns = DeathAnns}} | Rs0] + end; + is_map(Rs0) -> + case Rs0 of + #{Key := Death} -> + Rs0#{Key := update_death(Death, Timestamp)}; + _ -> + Rs0#{Key => #death{exchange = Exchange, + routing_keys = RoutingKeys, + count = 1, + anns = DeathAnns}} + end + end, + Anns = Anns0#{<<"x-last-death-reason">> := atom_to_binary(Reason), + <<"x-last-death-queue">> := SourceQueue, + <<"x-last-death-exchange">> := Exchange, + deaths := Deaths#deaths{last = Key, + records = Rs}}, + State#?MODULE{annotations = Anns}; + _ -> + Death = #death{exchange = Exchange, + routing_keys = RoutingKeys, + count = 1, + anns = DeathAnns}, + Rs = case rabbit_feature_flags:is_enabled(message_containers_deaths_v2) of + true -> [{Key, Death}]; + false -> #{Key => Death} + end, + ReasonBin = atom_to_binary(Reason), + Anns = Anns0#{<<"x-first-death-reason">> => ReasonBin, + <<"x-first-death-queue">> => SourceQueue, + <<"x-first-death-exchange">> => Exchange, <<"x-last-death-reason">> => ReasonBin, <<"x-last-death-queue">> => SourceQueue, - <<"x-last-death-exchange">> => Exchange}, + <<"x-last-death-exchange">> => Exchange, + deaths => #deaths{last = Key, + first = Key, + records = Rs}}, State#?MODULE{annotations = Anns} end; record_death(Reason, SourceQueue, BasicMsg) -> mc_compat:record_death(Reason, SourceQueue, BasicMsg). -record_death_v2(SourceQueue, Reason, ReasonBin, Exchange, RoutingKeys, Timestamp, Ttl, - #?MODULE{annotations = Anns0} = State) -> - DeathAnns = rabbit_misc:maps_put_truthy(ttl, Ttl, #{}), - Anns = case Anns0 of - #{deaths_v2 := Deaths0} -> - %% deaths_v2 is ordered by recency - Deaths = case deaths_take(SourceQueue, Reason, Deaths0) of - {value, Death0 = #death_v2{count = Count}, Deaths1} -> - Death = Death0#death_v2{count = Count + 1}, - [Death | Deaths1]; - false -> - Death = #death_v2{source_queue = SourceQueue, - reason = Reason, - count = 1, - first_death_timestamp = Timestamp, - original_exchange = Exchange, - original_routing_keys = RoutingKeys, - annotations = DeathAnns}, - [Death | Deaths0] - end, - Anns0#{deaths_v2 := Deaths, - <<"x-last-death-reason">> := ReasonBin, - <<"x-last-death-queue">> := SourceQueue, - <<"x-last-death-exchange">> := Exchange}; - _ -> - Death = #death_v2{source_queue = SourceQueue, - reason = Reason, - count = 1, - first_death_timestamp = Timestamp, - original_exchange = Exchange, - original_routing_keys = RoutingKeys, - annotations = DeathAnns}, - Anns0#{deaths_v2 => [Death], - <<"x-first-death-reason">> => ReasonBin, - <<"x-first-death-queue">> => SourceQueue, - <<"x-first-death-exchange">> => Exchange, - <<"x-last-death-reason">> => ReasonBin, - <<"x-last-death-queue">> => SourceQueue, - <<"x-last-death-exchange">> => Exchange} - end, - State#?MODULE{annotations = Anns}. +update_death(#death{count = Count, + anns = DeathAnns} = Death, Timestamp) -> + Death#death{count = Count + 1, + anns = DeathAnns#{last_time := Timestamp}}. -spec is_death_cycle(rabbit_misc:resource_name(), state()) -> boolean(). -is_death_cycle(TargetQueue, #?MODULE{annotations = #{deaths_v2 := Deaths}}) -> - is_cycle_v2(TargetQueue, Deaths); -is_death_cycle(TargetQueue, #?MODULE{annotations = #{deaths := Deaths}}) -> - is_cycle_v1(TargetQueue, maps:keys(Deaths#deaths.records)); +is_death_cycle(TargetQueue, #?MODULE{annotations = #{deaths := #deaths{records = Recs}}}) + when is_list(Recs) -> + is_cycle_v2(TargetQueue, Recs); +is_death_cycle(TargetQueue, #?MODULE{annotations = #{deaths := #deaths{records = Recs}}}) + when is_map(Recs) -> + is_cycle_v1(TargetQueue, maps:keys(Recs)); is_death_cycle(_TargetQueue, #?MODULE{}) -> false; is_death_cycle(TargetQueue, BasicMsg) -> @@ -466,9 +442,13 @@ is_death_cycle(TargetQueue, BasicMsg) -> %% Returns death queue names ordered by recency. -spec death_queue_names(state()) -> [rabbit_misc:resource_name()]. -death_queue_names(#?MODULE{annotations = #{deaths_v2 := Deaths}}) -> - lists:map(fun(#death_v2{source_queue = Q}) -> Q end, Deaths); -death_queue_names(#?MODULE{annotations = #{deaths := #deaths{records = Records}}}) -> +death_queue_names(#?MODULE{annotations = #{deaths := #deaths{records = Records}}}) + when is_list(Records) -> + lists:map(fun({{Queue, _Reason}, _Death}) -> + Queue + end, Records); +death_queue_names(#?MODULE{annotations = #{deaths := #deaths{records = Records}}}) + when is_map(Records) -> proplists:get_keys(maps:keys(Records)); death_queue_names(#?MODULE{}) -> []; @@ -485,7 +465,7 @@ prepare(For, State) -> %% INTERNAL is_cycle_v2(TargetQueue, Deaths) -> - case lists:splitwith(fun(#death_v2{source_queue = SourceQueue}) -> + case lists:splitwith(fun({{SourceQueue, _Reason}, #death{}}) -> SourceQueue =/= TargetQueue end, Deaths) of {_, []} -> @@ -494,7 +474,7 @@ is_cycle_v2(TargetQueue, Deaths) -> %% There is a cycle, but we only want to drop the message %% if the cycle is "fully automatic", i.e. without a client %% expliclity rejecting the message somewhere in the cycle. - lists:all(fun(#death_v2{reason = Reason}) -> + lists:all(fun({{_SourceQueue, Reason}, #death{}}) -> Reason =/= rejected end, [H | L]) end. @@ -519,17 +499,3 @@ is_cycle_v1(Queue, [_ | Rem]) -> set_received_at_timestamp(Anns) -> Millis = os:system_time(millisecond), Anns#{?ANN_RECEIVED_AT_TIMESTAMP => Millis}. - -deaths_take(Queue, Reason, Deaths) -> - deaths_take(Queue, Reason, Deaths, []). - -deaths_take(Queue, - Reason, - [#death_v2{source_queue = Queue, - reason = Reason} = H | T], - Acc) -> - {value, H, lists:reverse(Acc, T)}; -deaths_take(Queue, Reason, [H|T], Acc) -> - deaths_take(Queue, Reason, T, [H|Acc]); -deaths_take(_Queue, _Reason, [], _Acc) -> - false. diff --git a/deps/rabbit/src/mc_amqp.erl b/deps/rabbit/src/mc_amqp.erl index a121b233922a..503dce6fc765 100644 --- a/deps/rabbit/src/mc_amqp.erl +++ b/deps/rabbit/src/mc_amqp.erl @@ -402,8 +402,9 @@ protocol_state_message_annotations(MA, Anns) -> maps_upsert(K, mc_util:infer_type(V), L); (<<"timestamp_in_ms">>, V, L) -> maps_upsert(<<"x-opt-rabbitmq-received-time">>, {timestamp, V}, L); - (deaths_v2, Deaths, L) -> - Maps = encode_deaths(Deaths), + (deaths, #deaths{records = Records}, L) + when is_list(Records) -> + Maps = encode_deaths(Records), maps_upsert(<<"x-opt-deaths">>, {array, map, Maps}, L); (_, _, Acc) -> Acc @@ -582,19 +583,17 @@ first_acquirer(Anns) -> encode_deaths(Deaths) -> lists:map( - fun(#death_v2{source_queue = Queue, - reason = Reason, - count = Count, - first_death_timestamp = Timestamp, - original_exchange = Exchange, - original_routing_keys = RoutingKeys, - annotations = Anns}) -> + fun({{Queue, Reason}, + #death{exchange = Exchange, + routing_keys = RoutingKeys, + count = Count, + anns = Anns = #{first_time := FirstTime}}}) -> RKeys = [{utf8, Rk} || Rk <- RoutingKeys], Map0 = [ {{symbol, <<"queue">>}, {utf8, Queue}}, {{symbol, <<"reason">>}, {symbol, atom_to_binary(Reason)}}, {{symbol, <<"count">>}, {ulong, Count}}, - {{symbol, <<"time">>}, {timestamp, Timestamp}}, + {{symbol, <<"time">>}, {timestamp, FirstTime}}, {{symbol, <<"exchange">>}, {utf8, Exchange}}, {{symbol, <<"routing-keys">>}, {array, utf8, RKeys}} ], diff --git a/deps/rabbit/src/mc_amqpl.erl b/deps/rabbit/src/mc_amqpl.erl index b1895741e293..65b332d08157 100644 --- a/deps/rabbit/src/mc_amqpl.erl +++ b/deps/rabbit/src/mc_amqpl.erl @@ -450,10 +450,8 @@ protocol_state(#content{properties = #'P_basic'{headers = H00, H00 end, Headers0 = case Anns of - #{deaths_v2 := Deaths} -> - deaths_v2_to_headers(Deaths, H0); #{deaths := Deaths} -> - deaths_v1_to_headers(Deaths, H0); + deaths_to_headers(Deaths, H0); _ -> H0 end, @@ -576,43 +574,23 @@ from_basic_message(#basic_message{content = Content, %% Internal -deaths_v1_to_headers(#deaths{records = Records}, Headers0) -> - %% sort records by the last timestamp - List = lists:sort( - fun({_, #death{anns = #{last_time := L1}}}, - {_, #death{anns = #{last_time := L2}}}) -> - L1 =< L2 - end, maps:to_list(Records)), - Infos = lists:foldl( - fun ({{QName, Reason}, #death{anns = #{first_time := Ts} = DA, - exchange = Ex, - count = Count, - routing_keys = RoutingKeys}}, - Acc) -> - %% The first routing key is the one specified in the - %% basic.publish; all others are CC or BCC keys. - RKs = [hd(RoutingKeys) | rabbit_basic:header_routes(Headers0)], - Ttl = maps:get(ttl, DA, undefined), - Tab = death_table(QName, Reason, Count, Ts, Ex, RKs, Ttl), - [Tab | Acc] - end, [], List), +deaths_to_headers(#deaths{records = Records}, Headers0) -> + Infos = if is_list(Records) -> + lists:map(fun death_table/1, Records); + is_map(Records) -> + %% sort records by the last timestamp + List = lists:sort( + fun({_, #death{anns = #{last_time := L1}}}, + {_, #death{anns = #{last_time := L2}}}) -> + L1 =< L2 + end, maps:to_list(Records)), + lists:foldl(fun(Record, Acc) -> + Table = death_table(Record), + [Table | Acc] + end, [], List) + end, rabbit_misc:set_table_value(Headers0, <<"x-death">>, array, Infos). -deaths_v2_to_headers(Deaths, Headers) -> - Infos = lists:map( - fun(#death_v2{ - source_queue = Queue, - reason = Reason, - count = Count, - first_death_timestamp = Timestamp, - original_exchange = Ex, - original_routing_keys = RKeys, - annotations = Anns}) -> - Ttl = maps:get(ttl, Anns, undefined), - death_table(Queue, Reason, Count, Timestamp, Ex, RKeys, Ttl) - end, Deaths), - rabbit_misc:set_table_value(Headers, <<"x-death">>, array, Infos). - convert_from_amqp_deaths({array, map, Maps}) -> L = lists:map( fun({map, KvList}) -> @@ -628,22 +606,29 @@ convert_from_amqp_deaths({array, map, Maps}) -> {{symbol, <<"count">>}, {ulong, Count}}, {{symbol, <<"time">>}, {timestamp, Timestamp}}, {{symbol, <<"exchange">>}, {utf8, Exchange}}, - {{symbol, <<"routing-keys">>}, {array, utf8, RKeys}} + {{symbol, <<"routing-keys">>}, {array, utf8, RKeys0}} ] = KvList1, - RoutingKeys = [Key || {utf8, Key} <- RKeys], - death_table(Queue, Reason, Count, Timestamp, - Exchange, RoutingKeys, Ttl) + RKeys = [Key || {utf8, Key} <- RKeys0], + death_table(Queue, Reason, Exchange, RKeys, Count, Timestamp, Ttl) end, Maps), {true, {<<"x-death">>, array, L}}; convert_from_amqp_deaths(_IgnoreUnknownValue) -> false. -death_table(QName, Reason, Count, Timestamp, Exchange, RoutingKeys, Ttl) -> +death_table({{QName, Reason}, + #death{exchange = Exchange, + routing_keys = RoutingKeys, + count = Count, + anns = DeathAnns = #{first_time := FirstTime}}}) -> + death_table(QName, Reason, Exchange, RoutingKeys, Count, FirstTime, + maps:get(ttl, DeathAnns, undefined)). + +death_table(QName, Reason, Exchange, RoutingKeys, Count, FirstTime, Ttl) -> L0 = [ {<<"count">>, long, Count}, {<<"reason">>, longstr, rabbit_data_coercion:to_binary(Reason)}, {<<"queue">>, longstr, QName}, - {<<"time">>, timestamp, Timestamp div 1000}, + {<<"time">>, timestamp, FirstTime div 1000}, {<<"exchange">>, longstr, Exchange}, {<<"routing-keys">>, array, [{longstr, Key} || Key <- RoutingKeys]} ],