From 6b300a2f3464496277b899682ab6bfd9448bafa1 Mon Sep 17 00:00:00 2001 From: David Ansari Date: Sat, 4 May 2024 17:12:02 +0200 Subject: [PATCH] Fix dead lettering MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit # What? This commit fixes #11159, #11160, #11173. # How? ## Background RabbitMQ allows to dead letter messages for four different reasons, out of which three reasons cause messages to be dead lettered automatically internally in the broker: (maxlen, expired, delivery_limit) and 1 reason is caused by an explicit client action (rejected). RabbitMQ also allows dead letter topologies. When a message is dead lettered, it is re-published to an exchange, and therefore zero to multiple target queues. These target queues can in turn dead letter messages. Hence it is possible to create a cycle of queues where messages get dead lettered endlessly, which is what we want to avoid. ## Alternative approach One approach to avoid such endless cycles is to use a similar concept of the TTL field of the IPv4 datagram, or the hop limit field of an IPv6 datagram. These fields ensure that IP packets aren't cicrulating forever in the Internet. Each router decrements this counter. If this counter reaches 0, the sender will be notified and the message gets dropped. We could use the same approach in RabbitMQ: Whenever a queue dead letters a message, a dead_letter_hop_limit field could be decremented. If this field reaches 0, the message will be dropped. Such a hop limit field could have a sensible default value, for example 32. The sender of the message could override this value. Likewise, the client rejecting a message could set a new value via the Modified outcome. Such an approach has multiple advantages: 1. No dead letter cycle detection per se needs to be performed within the broker which is a slight simplification to what we have today. 2. Simpler dead letter topologies. One very common use case is that clients re-try sending the message after some time by consuming from a dead-letter queue and rejecting the message such that the message gets republished to the original queue. Instead of requiring explicit client actions, which increases complexity, a x-message-ttl argument could be set on the dead-letter queue to automatically retry after some time. This is a big simplification because it eliminates the need of various frameworks that retry, such as https://docs.spring.io/spring-cloud-stream/reference/rabbit/rabbit_overview/rabbitmq-retry.html 3. No dead letter history information needs to be compressed because there is a clear limit on how often a message gets dead lettered. Therefore, the full history including timestamps of every dead letter event will be available to clients. Disadvantages: 1. Breaks a lot of clients, even for 4.0. ## 3.12 approach Instead of decrementing a counter, the approach up to 3.12 has been to drop the message if the message cycled automatically. A message cycled automatically if no client expliclity rejected the message, i.e. the mesage got dead lettered due to maxlen, expired, or delivery_limit, but not due to rejected. In this approach, the broker must be able to detect such cycles reliably. Reliably detecting dead letter cycles broke in 3.13 due to #11159 and #11160. To reliably detect cycles, the broker must be able to obtain the exact order of dead letter events for a given message. In 3.13.0 - 3.13.2, the order cannot exactly be determined because wall clock time is used to record the death time. This commit uses the same approach as done in 3.12: a list ordered by death recency is used with the most recent death at the head of the list. To not grow this list endlessly (for example when a client rejects the same message hundreds of times), this list should be compacted. This commit, like 3.12, compacts by tuple `{Queue, Reason}`: If this message got already dead lettered from this Queue for this Reason, then only a counter is incremented and the element is moved to the front of the list. ## Streams & AMQP 1.0 clients Dead lettering from a stream doesn't make sense because: 1. a client cannot reject a message from a stream since the stream must maintain the total order of events to be consumed by multiple clients. 2. TTL is implemented by Stream retention where only old Stream segments are automatically deleted (or archived in the future). 3. same applies to maxlen Although messages cannot be dead lettered **from** a stream, messages can be dead lettered **into** a stream. This commit provides clients consuming from a stream the death history: #11173 Additionally, this commit provides AMQP 1.0 clients the death history via message annotation `x-opt-deaths` which contains the same information as AMQP 0.9.1 header `x-death`. Both, storing the death history in a stream and providing death history to an AMQP 1.0 client, use the same encoding: a message annoation `x-opt-deaths` that contains an array of maps ordered by death recency. The information encoded is the same as in the AMQP 0.9.1 x-death header. Instead of providing an array of maps, a better approach could be to use an array of a custom AMQP death type, such as: ```xml
``` However, encoding and decoding custom AMQP types that are nested within arrays which in turn are nested within the message annotation map can be difficult for clients and the broker. Also, each client will need to know the custom AMQP type. For now, therefore we use an array of maps. ## Feature flag The new way to record death information is done via mc annotation `deaths_v2`. Because old nodes do not know this new annotation, recording death information via mc annotation `deaths_v2` is hidden behind a new feature flag `message_containers_deaths_v2`. If this feature flag is disabled, a message will continue to use the 3.13.0 - 3.13.2 way to record death information in mc annotation `deaths`, or even the older way within `x-death` header directly if feature flag message_containers is also disabled. Only if feature flag `message_containers_deaths_v2` is enabled and this message hasn't been dead lettered before, will the new mc annotation `deaths_v2` be used. --- deps/rabbit/BUILD.bazel | 8 +- deps/rabbit/app.bzl | 9 + deps/rabbit/include/mc.hrl | 36 +-- deps/rabbit/src/mc.erl | 175 ++++++++------ deps/rabbit/src/mc_amqp.erl | 77 +++--- deps/rabbit/src/mc_amqpl.erl | 110 ++++++--- deps/rabbit/src/mc_compat.erl | 25 +- deps/rabbit/src/rabbit_core_ff.erl | 8 + deps/rabbit/src/rabbit_dead_letter.erl | 61 ++--- deps/rabbit/src/rabbit_fifo_dlx_worker.erl | 35 ++- deps/rabbit/test/amqp_client_SUITE.erl | 220 ++++++++++++++++- deps/rabbit/test/dead_lettering_SUITE.erl | 221 +++++++++++++++--- deps/rabbit/test/mc_unit_SUITE.erl | 87 ++++++- .../message_containers_deaths_v2_SUITE.erl | 127 ++++++++++ 14 files changed, 926 insertions(+), 273 deletions(-) create mode 100644 deps/rabbit/test/message_containers_deaths_v2_SUITE.erl diff --git a/deps/rabbit/BUILD.bazel b/deps/rabbit/BUILD.bazel index 80891cf7382c..6ba91e324d61 100644 --- a/deps/rabbit/BUILD.bazel +++ b/deps/rabbit/BUILD.bazel @@ -391,7 +391,13 @@ rabbitmq_integration_suite( additional_beam = [ ":test_queue_utils_beam", ], - shard_count = 7, + shard_count = 8, +) + +rabbitmq_integration_suite( + name = "message_containers_deaths_v2_SUITE", + size = "medium", + shard_count = 1, ) rabbitmq_integration_suite( diff --git a/deps/rabbit/app.bzl b/deps/rabbit/app.bzl index 6d18557b546b..177b49355c2e 100644 --- a/deps/rabbit/app.bzl +++ b/deps/rabbit/app.bzl @@ -2219,3 +2219,12 @@ def test_suite_beam_files(name = "test_suite_beam_files"): erlc_opts = "//:test_erlc_opts", deps = ["//deps/amqp10_common:erlang_app", "//deps/rabbitmq_amqp_client:erlang_app"], ) + erlang_bytecode( + name = "message_containers_deaths_v2_SUITE_beam_files", + testonly = True, + srcs = ["test/message_containers_deaths_v2_SUITE.erl"], + outs = ["test/message_containers_deaths_v2_SUITE.beam"], + app_name = "rabbit", + erlc_opts = "//:test_erlc_opts", + deps = ["//deps/amqp_client:erlang_app", "//deps/rabbitmq_ct_helpers:erlang_app"], + ) diff --git a/deps/rabbit/include/mc.hrl b/deps/rabbit/include/mc.hrl index 2d5f5b396d19..abe38fb7c617 100644 --- a/deps/rabbit/include/mc.hrl +++ b/deps/rabbit/include/mc.hrl @@ -1,17 +1,3 @@ --type death_key() :: {SourceQueue :: rabbit_misc:resource_name(), rabbit_dead_letter:reason()}. --type death_anns() :: #{first_time := non_neg_integer(), %% the timestamp of the first - last_time := non_neg_integer(), %% the timestamp of the last - ttl => OriginalExpiration :: non_neg_integer()}. --record(death, {exchange :: OriginalExchange :: rabbit_misc:resource_name(), - routing_keys = [] :: OriginalRoutingKeys :: [rabbit_types:routing_key()], - count = 0 :: non_neg_integer(), - anns :: death_anns()}). - --record(deaths, {first :: death_key(), - last :: death_key(), - records = #{} :: #{death_key() := #death{}}}). - - %% good enough for most use cases -define(IS_MC(Msg), element(1, Msg) == mc andalso tuple_size(Msg) == 5). @@ -26,3 +12,25 @@ -define(ANN_RECEIVED_AT_TIMESTAMP, rts). -define(ANN_DURABLE, d). -define(ANN_PRIORITY, p). + +-define(FF_MC_DEATHS_V2, message_containers_deaths_v2). + +-type death_key() :: {SourceQueue :: rabbit_misc:resource_name(), rabbit_dead_letter:reason()}. +-type death_anns() :: #{%% timestamp of the first time this message + %% was dead lettered from this queue for this reason + first_time := pos_integer(), + %% timestamp of the last time this message + %% was dead lettered from this queue for this reason + last_time := pos_integer(), + ttl => OriginalTtlHeader :: non_neg_integer()}. + +-record(death, {exchange :: OriginalExchange :: rabbit_misc:resource_name(), + routing_keys :: OriginalRoutingKeys :: [rabbit_types:routing_key(),...], + %% how many times this message was dead lettered from this queue for this reason + count :: pos_integer(), + anns :: death_anns()}). + +-record(deaths, {first :: death_key(), % redundant to mc annotations x-first-death-* + last :: death_key(), % redundant to mc annotations x-last-death-* + records :: #{death_key() := #death{}} + }). diff --git a/deps/rabbit/src/mc.erl b/deps/rabbit/src/mc.erl index bd7679e7345d..f9bafd949a66 100644 --- a/deps/rabbit/src/mc.erl +++ b/deps/rabbit/src/mc.erl @@ -34,9 +34,8 @@ convert/3, protocol_state/1, prepare/2, - record_death/3, + record_death/4, is_death_cycle/2, - last_death/1, death_queue_names/1 ]). @@ -356,89 +355,103 @@ protocol_state(BasicMsg) -> mc_compat:protocol_state(BasicMsg). -spec record_death(rabbit_dead_letter:reason(), - SourceQueue :: rabbit_misc:resource_name(), - state()) -> state(). + rabbit_misc:resource_name(), + state(), + environment()) -> state(). record_death(Reason, SourceQueue, - #?MODULE{protocol = _Mod, - data = _Data, - annotations = Anns0} = State) - when is_atom(Reason) andalso is_binary(SourceQueue) -> + #?MODULE{annotations = Anns0} = State, + Env) + when is_atom(Reason) andalso + is_binary(SourceQueue) -> Key = {SourceQueue, Reason}, #{?ANN_EXCHANGE := Exchange, ?ANN_ROUTING_KEYS := RoutingKeys} = Anns0, Timestamp = os:system_time(millisecond), Ttl = maps:get(ttl, Anns0, undefined), - - ReasonBin = atom_to_binary(Reason), - DeathAnns = rabbit_misc:maps_put_truthy(ttl, Ttl, #{first_time => Timestamp, - last_time => Timestamp}), - case maps:get(deaths, Anns0, undefined) of - undefined -> - Ds = #deaths{last = Key, - first = Key, - records = #{Key => #death{count = 1, - exchange = Exchange, - routing_keys = RoutingKeys, - anns = DeathAnns}}}, - Anns = Anns0#{<<"x-first-death-reason">> => ReasonBin, + DeathAnns = rabbit_misc:maps_put_truthy( + ttl, Ttl, #{first_time => Timestamp, + last_time => Timestamp}), + NewDeath = #death{exchange = Exchange, + routing_keys = RoutingKeys, + count = 1, + anns = DeathAnns}, + Anns = case Anns0 of + #{deaths := Deaths0} -> + Deaths = case Deaths0 of + #deaths{records = Rs0} -> + Rs = maps:update_with( + Key, + fun(Death) -> + update_death(Death, Timestamp) + end, + NewDeath, + Rs0), + Deaths0#deaths{last = Key, + records = Rs}; + _ -> + %% Deaths are ordered by recency + case lists:keytake(Key, 1, Deaths0) of + {value, {Key, D0}, Deaths1} -> + D = update_death(D0, Timestamp), + [{Key, D} | Deaths1]; + false -> + [{Key, NewDeath} | Deaths0] + end + end, + Anns0#{<<"x-last-death-reason">> := atom_to_binary(Reason), + <<"x-last-death-queue">> := SourceQueue, + <<"x-last-death-exchange">> := Exchange, + deaths := Deaths}; + _ -> + Deaths = case Env of + #{?FF_MC_DEATHS_V2 := false} -> + #deaths{last = Key, + first = Key, + records = #{Key => NewDeath}}; + _ -> + [{Key, NewDeath}] + end, + ReasonBin = atom_to_binary(Reason), + Anns0#{<<"x-first-death-reason">> => ReasonBin, <<"x-first-death-queue">> => SourceQueue, <<"x-first-death-exchange">> => Exchange, <<"x-last-death-reason">> => ReasonBin, <<"x-last-death-queue">> => SourceQueue, - <<"x-last-death-exchange">> => Exchange - }, - - State#?MODULE{annotations = Anns#{deaths => Ds}}; - #deaths{records = Rs} = Ds0 -> - Death = #death{count = C, - anns = DA} = maps:get(Key, Rs, - #death{exchange = Exchange, - routing_keys = RoutingKeys, - anns = DeathAnns}), - Ds = Ds0#deaths{last = Key, - records = Rs#{Key => - Death#death{count = C + 1, - anns = DA#{last_time => Timestamp}}}}, - Anns = Anns0#{deaths => Ds, - <<"x-last-death-reason">> => ReasonBin, - <<"x-last-death-queue">> => SourceQueue, - <<"x-last-death-exchange">> => Exchange}, - State#?MODULE{annotations = Anns} - end; -record_death(Reason, SourceQueue, BasicMsg) -> - mc_compat:record_death(Reason, SourceQueue, BasicMsg). - + <<"x-last-death-exchange">> => Exchange, + deaths => Deaths} + end, + State#?MODULE{annotations = Anns}; +record_death(Reason, SourceQueue, BasicMsg, Env) -> + mc_compat:record_death(Reason, SourceQueue, BasicMsg, Env). + +update_death(#death{count = Count, + anns = DeathAnns} = Death, Timestamp) -> + Death#death{count = Count + 1, + anns = DeathAnns#{last_time := Timestamp}}. -spec is_death_cycle(rabbit_misc:resource_name(), state()) -> boolean(). +is_death_cycle(TargetQueue, #?MODULE{annotations = #{deaths := #deaths{records = Rs}}}) -> + is_cycle_v1(TargetQueue, maps:keys(Rs)); is_death_cycle(TargetQueue, #?MODULE{annotations = #{deaths := Deaths}}) -> - is_cycle(TargetQueue, maps:keys(Deaths#deaths.records)); + is_cycle_v2(TargetQueue, Deaths); is_death_cycle(_TargetQueue, #?MODULE{}) -> false; is_death_cycle(TargetQueue, BasicMsg) -> mc_compat:is_death_cycle(TargetQueue, BasicMsg). +%% Returns death queue names ordered by recency. -spec death_queue_names(state()) -> [rabbit_misc:resource_name()]. -death_queue_names(#?MODULE{annotations = Anns}) -> - case maps:get(deaths, Anns, undefined) of - undefined -> - []; - #deaths{records = Records} -> - proplists:get_keys(maps:keys(Records)) - end; +death_queue_names(#?MODULE{annotations = #{deaths := #deaths{records = Rs}}}) -> + proplists:get_keys(maps:keys(Rs)); +death_queue_names(#?MODULE{annotations = #{deaths := Deaths}}) -> + lists:map(fun({{Queue, _Reason}, _Death}) -> + Queue + end, Deaths); +death_queue_names(#?MODULE{}) -> + []; death_queue_names(BasicMsg) -> mc_compat:death_queue_names(BasicMsg). --spec last_death(state()) -> - undefined | {death_key(), #death{}}. -last_death(#?MODULE{annotations = Anns}) - when not is_map_key(deaths, Anns) -> - undefined; -last_death(#?MODULE{annotations = #{deaths := #deaths{last = Last, - records = Rs}}}) -> - {Last, maps:get(Last, Rs)}; -last_death(BasicMsg) -> - mc_compat:last_death(BasicMsg). - -spec prepare(read | store, state()) -> state(). prepare(For, #?MODULE{protocol = Proto, data = Data} = State) -> @@ -448,24 +461,38 @@ prepare(For, State) -> %% INTERNAL -%% if there is a death with a source queue that is the same as the target +is_cycle_v2(TargetQueue, Deaths) -> + case lists:splitwith(fun({{SourceQueue, _Reason}, #death{}}) -> + SourceQueue =/= TargetQueue + end, Deaths) of + {_, []} -> + false; + {L, [H | _]} -> + %% There is a cycle, but we only want to drop the message + %% if the cycle is "fully automatic", i.e. without a client + %% expliclity rejecting the message somewhere in the cycle. + lists:all(fun({{_SourceQueue, Reason}, _Death}) -> + Reason =/= rejected + end, [H | L]) + end. + +%% The desired v1 behaviour is the following: +%% "If there is a death with a source queue that is the same as the target %% queue name and there are no newer deaths with the 'rejected' reason then -%% consider this a cycle -is_cycle(_Queue, []) -> +%% consider this a cycle." +%% However, the correct death order cannot be reliably determined in v1. +%% deaths_v2 fixes this bug. +is_cycle_v1(_Queue, []) -> false; -is_cycle(_Queue, [{_Q, rejected} | _]) -> +is_cycle_v1(_Queue, [{_Q, rejected} | _]) -> %% any rejection breaks the cycle false; -is_cycle(Queue, [{Queue, Reason} | _]) +is_cycle_v1(Queue, [{Queue, Reason} | _]) when Reason =/= rejected -> true; -is_cycle(Queue, [_ | Rem]) -> - is_cycle(Queue, Rem). +is_cycle_v1(Queue, [_ | Rem]) -> + is_cycle_v1(Queue, Rem). set_received_at_timestamp(Anns) -> Millis = os:system_time(millisecond), Anns#{?ANN_RECEIVED_AT_TIMESTAMP => Millis}. - --ifdef(TEST). --include_lib("eunit/include/eunit.hrl"). --endif. diff --git a/deps/rabbit/src/mc_amqp.erl b/deps/rabbit/src/mc_amqp.erl index a0cba333e69b..3a90e2879842 100644 --- a/deps/rabbit/src/mc_amqp.erl +++ b/deps/rabbit/src/mc_amqp.erl @@ -402,6 +402,10 @@ protocol_state_message_annotations(MA, Anns) -> maps_upsert(K, mc_util:infer_type(V), L); (<<"timestamp_in_ms">>, V, L) -> maps_upsert(<<"x-opt-rabbitmq-received-time">>, {timestamp, V}, L); + (deaths, Deaths, L) + when is_list(Deaths) -> + Maps = encode_deaths(Deaths), + maps_upsert(<<"x-opt-deaths">>, {array, map, Maps}, L); (_, _, Acc) -> Acc end, MA, Anns). @@ -569,13 +573,6 @@ msg_body_encoded([{{pos, Pos}, {body, Code}}], BarePos, Msg) binary_part_bare_and_footer(Payload, Start) -> binary_part(Payload, Start, byte_size(Payload) - Start). -key_find(K, [{{_, K}, {_, V}} | _]) -> - V; -key_find(K, [_ | Rem]) -> - key_find(K, Rem); -key_find(_K, []) -> - undefined. - -spec first_acquirer(mc:annotations()) -> boolean(). first_acquirer(Anns) -> Redelivered = case Anns of @@ -584,48 +581,38 @@ first_acquirer(Anns) -> end, not Redelivered. -recover_deaths([], Acc) -> - Acc; -recover_deaths([{map, Kvs} | Rem], Acc) -> - Queue = key_find(<<"queue">>, Kvs), - Reason = binary_to_existing_atom(key_find(<<"reason">>, Kvs)), - DA0 = case key_find(<<"original-expiration">>, Kvs) of - undefined -> - #{}; - Exp -> - #{ttl => binary_to_integer(Exp)} - end, - RKeys = [RK || {_, RK} <- key_find(<<"routing-keys">>, Kvs)], - Ts = key_find(<<"time">>, Kvs), - DA = DA0#{first_time => Ts, - last_time => Ts}, - recover_deaths(Rem, - Acc#{{Queue, Reason} => - #death{anns = DA, - exchange = key_find(<<"exchange">>, Kvs), - count = key_find(<<"count">>, Kvs), - routing_keys = RKeys}}). +encode_deaths(Deaths) -> + lists:map( + fun({{Queue, Reason}, + #death{exchange = Exchange, + routing_keys = RoutingKeys, + count = Count, + anns = Anns = #{first_time := FirstTime, + last_time := LastTime}}}) -> + RKeys = [{utf8, Rk} || Rk <- RoutingKeys], + Map0 = [ + {{symbol, <<"queue">>}, {utf8, Queue}}, + {{symbol, <<"reason">>}, {symbol, atom_to_binary(Reason)}}, + {{symbol, <<"count">>}, {ulong, Count}}, + {{symbol, <<"first-time">>}, {timestamp, FirstTime}}, + {{symbol, <<"last-time">>}, {timestamp, LastTime}}, + {{symbol, <<"exchange">>}, {utf8, Exchange}}, + {{symbol, <<"routing-keys">>}, {array, utf8, RKeys}} + ], + Map = case Anns of + #{ttl := Ttl} -> + [{{symbol, <<"ttl">>}, {uint, Ttl}} | Map0]; + _ -> + Map0 + end, + {map, Map} + end, Deaths). essential_properties(#msg_body_encoded{message_annotations = MA} = Msg) -> Durable = get_property(durable, Msg), Priority = get_property(priority, Msg), Timestamp = get_property(timestamp, Msg), Ttl = get_property(ttl, Msg), - - Deaths = case message_annotation(<<"x-death">>, Msg, undefined) of - {list, DeathMaps} -> - %% TODO: make more correct? - Def = {utf8, <<>>}, - {utf8, FstQ} = message_annotation(<<"x-first-death-queue">>, Msg, Def), - {utf8, FstR} = message_annotation(<<"x-first-death-reason">>, Msg, Def), - {utf8, LastQ} = message_annotation(<<"x-last-death-queue">>, Msg, Def), - {utf8, LastR} = message_annotation(<<"x-last-death-reason">>, Msg, Def), - #deaths{first = {FstQ, binary_to_existing_atom(FstR)}, - last = {LastQ, binary_to_existing_atom(LastR)}, - records = recover_deaths(DeathMaps, #{})}; - _ -> - undefined - end, Anns0 = #{?ANN_DURABLE => Durable}, Anns = maps_put_truthy( ?ANN_PRIORITY, Priority, @@ -633,9 +620,7 @@ essential_properties(#msg_body_encoded{message_annotations = MA} = Msg) -> ?ANN_TIMESTAMP, Timestamp, maps_put_truthy( ttl, Ttl, - maps_put_truthy( - deaths, Deaths, - Anns0)))), + Anns0))), case MA of [] -> Anns; diff --git a/deps/rabbit/src/mc_amqpl.erl b/deps/rabbit/src/mc_amqpl.erl index 5d5f67d8eb0b..ac5bb31a8732 100644 --- a/deps/rabbit/src/mc_amqpl.erl +++ b/deps/rabbit/src/mc_amqpl.erl @@ -157,6 +157,8 @@ convert_from(mc_amqp, Sections, Env) -> {true, to_091(<<"CC">>, V)}; ({{symbol, <<"x-opt-rabbitmq-received-time">>}, {timestamp, Ts}}) -> {true, {<<"timestamp_in_ms">>, long, Ts}}; + ({{symbol, <<"x-opt-deaths">>}, V}) -> + convert_from_amqp_deaths(V); ({{symbol, <<"x-", _/binary>> = K}, V}) when ?IS_SHORTSTR_LEN(K) -> case is_internal_header(K) of @@ -447,8 +449,12 @@ protocol_state(#content{properties = #'P_basic'{headers = H00, _ -> H00 end, - Deaths = maps:get(deaths, Anns, undefined), - Headers0 = deaths_to_headers(Deaths, H0), + Headers0 = case Anns of + #{deaths := Deaths} -> + deaths_to_headers(Deaths, H0); + _ -> + H0 + end, Headers1 = maps:fold( fun (<<"x-", _/binary>> = Key, Val, H) when is_integer(Val) -> [{Key, long, Val} | H]; @@ -568,44 +574,74 @@ from_basic_message(#basic_message{content = Content, %% Internal -deaths_to_headers(undefined, Headers) -> - Headers; -deaths_to_headers(#deaths{records = Records}, Headers0) -> - %% sort records by the last timestamp - List = lists:sort( - fun({_, #death{anns = #{last_time := L1}}}, - {_, #death{anns = #{last_time := L2}}}) -> - L1 < L2 - end, maps:to_list(Records)), - Infos = lists:foldl( - fun ({{QName, Reason}, #death{anns = #{first_time := Ts} = DA, - exchange = Ex, - count = Count, - routing_keys = RoutingKeys}}, - Acc) -> - %% The first routing key is the one specified in the - %% basic.publish; all others are CC or BCC keys. - RKs = [hd(RoutingKeys) | rabbit_basic:header_routes(Headers0)], - RKeys = [{longstr, Key} || Key <- RKs], - ReasonBin = atom_to_binary(Reason, utf8), - PerMsgTTL = case maps:get(ttl, DA, undefined) of - undefined -> []; - Ttl when is_integer(Ttl) -> - Expiration = integer_to_binary(Ttl), - [{<<"original-expiration">>, longstr, - Expiration}] - end, - [{table, [{<<"count">>, long, Count}, - {<<"reason">>, longstr, ReasonBin}, - {<<"queue">>, longstr, QName}, - {<<"time">>, timestamp, Ts div 1000}, - {<<"exchange">>, longstr, Ex}, - {<<"routing-keys">>, array, RKeys}] ++ PerMsgTTL} - | Acc] - end, [], List), +deaths_to_headers(Deaths, Headers0) -> + Infos = case Deaths of + #deaths{records = Records} -> + %% sort records by the last timestamp + List = lists:sort( + fun({_, #death{anns = #{last_time := L1}}}, + {_, #death{anns = #{last_time := L2}}}) -> + L1 =< L2 + end, maps:to_list(Records)), + lists:foldl(fun(Record, Acc) -> + Table = death_table(Record), + [Table | Acc] + end, [], List); + _ -> + lists:map(fun death_table/1, Deaths) + end, rabbit_misc:set_table_value(Headers0, <<"x-death">>, array, Infos). +convert_from_amqp_deaths({array, map, Maps}) -> + L = lists:map( + fun({map, KvList}) -> + {Ttl, KvList1} = case KvList of + [{{symbol, <<"ttl">>}, {uint, Ttl0}} | Tail] -> + {Ttl0, Tail}; + _ -> + {undefined, KvList} + end, + [ + {{symbol, <<"queue">>}, {utf8, Queue}}, + {{symbol, <<"reason">>}, {symbol, Reason}}, + {{symbol, <<"count">>}, {ulong, Count}}, + {{symbol, <<"first-time">>}, {timestamp, FirstTime}}, + {{symbol, <<"last-time">>}, {timestamp, _LastTime}}, + {{symbol, <<"exchange">>}, {utf8, Exchange}}, + {{symbol, <<"routing-keys">>}, {array, utf8, RKeys0}} + ] = KvList1, + RKeys = [Key || {utf8, Key} <- RKeys0], + death_table(Queue, Reason, Exchange, RKeys, Count, FirstTime, Ttl) + end, Maps), + {true, {<<"x-death">>, array, L}}; +convert_from_amqp_deaths(_IgnoreUnknownValue) -> + false. +death_table({{QName, Reason}, + #death{exchange = Exchange, + routing_keys = RoutingKeys, + count = Count, + anns = DeathAnns = #{first_time := FirstTime}}}) -> + death_table(QName, Reason, Exchange, RoutingKeys, Count, FirstTime, + maps:get(ttl, DeathAnns, undefined)). + +death_table(QName, Reason, Exchange, RoutingKeys, Count, FirstTime, Ttl) -> + L0 = [ + {<<"count">>, long, Count}, + {<<"reason">>, longstr, rabbit_data_coercion:to_binary(Reason)}, + {<<"queue">>, longstr, QName}, + {<<"time">>, timestamp, FirstTime div 1000}, + {<<"exchange">>, longstr, Exchange}, + {<<"routing-keys">>, array, [{longstr, Key} || Key <- RoutingKeys]} + ], + L = case Ttl of + undefined -> + L0; + _ -> + Expiration = integer_to_binary(Ttl), + [{<<"original-expiration">>, longstr, Expiration} | L0] + end, + {table, L}. strip_header(#content{properties = #'P_basic'{headers = undefined}} = DecodedContent, _Key) -> diff --git a/deps/rabbit/src/mc_compat.erl b/deps/rabbit/src/mc_compat.erl index 8a2a407ceaae..702f8c0f64ca 100644 --- a/deps/rabbit/src/mc_compat.erl +++ b/deps/rabbit/src/mc_compat.erl @@ -26,10 +26,9 @@ protocol_state/1, %serialize/1, prepare/2, - record_death/3, + record_death/4, is_death_cycle/2, %deaths/1, - last_death/1, death_queue_names/1 ]). @@ -156,7 +155,7 @@ prepare(store, Msg) -> record_death(Reason, SourceQueue, #basic_message{content = Content, exchange_name = Exchange, - routing_keys = RoutingKeys} = Msg) -> + routing_keys = RoutingKeys} = Msg, _Env) -> % HeadersFun1 = fun (H) -> lists:keydelete(<<"CC">>, 1, H) end, ReasonBin = atom_to_binary(Reason), TimeSec = os:system_time(seconds), @@ -361,26 +360,6 @@ death_queue_names(#basic_message{content = Content}) -> [] end. -last_death(#basic_message{content = Content}) -> - #content{properties = #'P_basic'{headers = Headers}} = - rabbit_binary_parser:ensure_content_decoded(Content), - %% TODO: review this conversion and/or change the API - case rabbit_misc:table_lookup(Headers, <<"x-death">>) of - {array, [{table, Info} | _]} -> - X = x_death_event_key(Info, <<"exchange">>), - Q = x_death_event_key(Info, <<"queue">>), - T = x_death_event_key(Info, <<"time">>, 0), - Keys = x_death_event_key(Info, <<"routing_keys">>), - Count = x_death_event_key(Info, <<"count">>), - {Q, #death{exchange = X, - anns = #{first_time => T * 1000, - last_time => T * 1000}, - routing_keys = Keys, - count = Count}}; - _ -> - undefined - end. - get_property(P, #content{properties = none} = Content) -> %% this is inefficient but will only apply to old messages that are %% not containerized diff --git a/deps/rabbit/src/rabbit_core_ff.erl b/deps/rabbit/src/rabbit_core_ff.erl index 8e6b4969568c..01e8a8dc544d 100644 --- a/deps/rabbit/src/rabbit_core_ff.erl +++ b/deps/rabbit/src/rabbit_core_ff.erl @@ -177,3 +177,11 @@ stability => stable, depends_on => [message_containers] }}). + +-rabbit_feature_flag( + {message_containers_deaths_v2, + #{desc => "Bug fix for dead letter cycle detection", + doc_url => "https://github.com/rabbitmq/rabbitmq-server/issues/11159", + stability => stable, + depends_on => [message_containers] + }}). diff --git a/deps/rabbit/src/rabbit_dead_letter.erl b/deps/rabbit/src/rabbit_dead_letter.erl index c7647a5f1a36..a8c6b4515eda 100644 --- a/deps/rabbit/src/rabbit_dead_letter.erl +++ b/deps/rabbit/src/rabbit_dead_letter.erl @@ -31,46 +31,53 @@ publish(Msg0, Reason, #exchange{name = XName} = DLX, RK, _ -> [RK] end, - Msg1 = mc:record_death(Reason, SourceQName, Msg0), + Env = case rabbit_feature_flags:is_enabled(?FF_MC_DEATHS_V2) of + true -> #{}; + false -> #{?FF_MC_DEATHS_V2 => false} + end, + Msg1 = mc:record_death(Reason, SourceQName, Msg0, Env), {Ttl, Msg2} = mc:take_annotation(dead_letter_ttl, Msg1), Msg3 = mc:set_ttl(Ttl, Msg2), Msg4 = mc:set_annotation(?ANN_ROUTING_KEYS, DLRKeys, Msg3), DLMsg = mc:set_annotation(?ANN_EXCHANGE, XName#resource.name, Msg4), - Routed = rabbit_exchange:route(DLX, DLMsg, #{return_binding_keys => true}), - {QNames, Cycles} = detect_cycles(Reason, DLMsg, Routed), + Routed0 = rabbit_exchange:route(DLX, DLMsg, #{return_binding_keys => true}), + {Cycles, Routed} = detect_cycles(Reason, DLMsg, Routed0), lists:foreach(fun log_cycle_once/1, Cycles), - Qs0 = rabbit_amqqueue:lookup_many(QNames), + Qs0 = rabbit_amqqueue:lookup_many(Routed), Qs = rabbit_amqqueue:prepend_extra_bcc(Qs0), _ = rabbit_queue_type:deliver(Qs, DLMsg, #{}, stateless), ok. detect_cycles(rejected, _Msg, Queues) -> - {Queues, []}; + %% shortcut + {[], Queues}; detect_cycles(_Reason, Msg, Queues) -> - {Cycling, NotCycling} = - lists:partition(fun (#resource{name = Queue}) -> - mc:is_death_cycle(Queue, Msg); - ({#resource{name = Queue}, _RouteInfos}) -> - mc:is_death_cycle(Queue, Msg) - end, Queues), - DeathQueues = mc:death_queue_names(Msg), - CycleKeys = lists:foldl(fun(#resource{name = Q}, Acc) -> - [Q | Acc]; - ({#resource{name = Q}, _RouteInfos}, Acc) -> - [Q | Acc] - end, DeathQueues, Cycling), - {NotCycling, CycleKeys}. + {Cycling, + NotCycling} = lists:partition(fun(#resource{name = Queue}) -> + mc:is_death_cycle(Queue, Msg); + ({#resource{name = Queue}, _RouteInfos}) -> + mc:is_death_cycle(Queue, Msg) + end, Queues), + Names = mc:death_queue_names(Msg), + Cycles = lists:map(fun(#resource{name = Q}) -> + [Q | Names]; + ({#resource{name = Q}, _RouteInfos}) -> + [Q | Names] + end, Cycling), + {Cycles, NotCycling}. -log_cycle_once(Queues) -> - %% using a hash won't eliminate this as a potential memory leak but it will - %% reduce the potential amount of memory used whilst probably being - %% "good enough" - Key = {queue_cycle, erlang:phash2(Queues)}, +log_cycle_once(Cycle) -> + %% Using a hash won't eliminate this as a potential memory leak but it will + %% reduce the potential amount of memory used whilst probably being "good enough". + Key = {dead_letter_cycle, erlang:phash2(Cycle)}, case get(Key) of - true -> ok; + true -> + ok; undefined -> - rabbit_log:warning("Message dropped. Dead-letter queues cycle detected" - ": ~tp~nThis cycle will NOT be reported again.", - [Queues]), + rabbit_log:warning( + "Message dropped because the following list of queues (ordered by " + "death recency) contains a dead letter cycle without reason 'rejected'. " + "This list will not be logged again: ~tp", + [Cycle]), put(Key, true) end. diff --git a/deps/rabbit/src/rabbit_fifo_dlx_worker.erl b/deps/rabbit/src/rabbit_fifo_dlx_worker.erl index 01e97e7dfaab..f96d8de1e491 100644 --- a/deps/rabbit/src/rabbit_fifo_dlx_worker.erl +++ b/deps/rabbit/src/rabbit_fifo_dlx_worker.erl @@ -46,6 +46,8 @@ consumed_msg_id :: non_neg_integer(), delivery :: mc:state(), reason :: rabbit_dead_letter:reason(), + %% routing keys (including CC keys) the message was published with to the source quorum queue + original_routing_keys :: [rabbit_types:routing_key(),...], %% target queues for which publisher confirm has not been received yet unsettled = [] :: [rabbit_amqqueue:name()], %% target queues for which publisher rejection was received recently @@ -315,13 +317,18 @@ forward(ConsumedMsg, ConsumedMsgId, ConsumedQRef, DLX, Reason, Now = os:system_time(millisecond), #resource{name = SourceQName} = ConsumedQRef, #resource{name = DLXName} = DLXRef, + OriginalRoutingKeys = mc:routing_keys(ConsumedMsg), DLRKeys = case RKey of undefined -> - mc:routing_keys(ConsumedMsg); + OriginalRoutingKeys; _ -> [RKey] end, - Msg0 = mc:record_death(Reason, SourceQName, ConsumedMsg), + Env = case rabbit_feature_flags:is_enabled(?FF_MC_DEATHS_V2) of + true -> #{}; + false -> #{?FF_MC_DEATHS_V2 => false} + end, + Msg0 = mc:record_death(Reason, SourceQName, ConsumedMsg, Env), Msg1 = mc:set_ttl(undefined, Msg0), Msg2 = mc:set_annotation(?ANN_ROUTING_KEYS, DLRKeys, Msg1), Msg = mc:set_annotation(?ANN_EXCHANGE, DLXName, Msg2), @@ -331,7 +338,7 @@ forward(ConsumedMsg, ConsumedMsgId, ConsumedQRef, DLX, Reason, {[], State0}; _ -> RouteToQs0 = rabbit_exchange:route(DLX, Msg), - {RouteToQs1, Cycles} = rabbit_dead_letter:detect_cycles( + {Cycles, RouteToQs1} = rabbit_dead_letter:detect_cycles( Reason, Msg, RouteToQs0), State1 = log_cycles(Cycles, [RKey], State0), RouteToQs2 = rabbit_amqqueue:lookup_many(RouteToQs1), @@ -347,7 +354,8 @@ forward(ConsumedMsg, ConsumedMsgId, ConsumedQRef, DLX, Reason, Pend0 = #pending{consumed_msg_id = ConsumedMsgId, consumed_at = Now, delivery = Msg, - reason = Reason}, + reason = Reason, + original_routing_keys = OriginalRoutingKeys}, case TargetQs of [] -> %% We can't deliver this message since there is no target queue we can route to. @@ -453,15 +461,18 @@ redeliver_messages(#state{pendings = Pendings, end, State, Pendings) end. -redeliver(#pending{delivery = Msg} = Pend, - DLX, OutSeq, #state{routing_key = undefined} = State) -> +redeliver(#pending{original_routing_keys = RKeys} = Pend, + DLX, + OutSeq, + #state{routing_key = undefined} = State) -> %% No dead-letter-routing-key defined for source quorum queue. %% Therefore use all of messages's original routing keys (which can include CC and BCC recipients). %% This complies with the behaviour of the rabbit_dead_letter module. - %% We stored these original routing keys in the 1st (i.e. most recent) x-death entry. - {_, #death{routing_keys = Routes}} = mc:last_death(Msg), - redeliver0(Pend, DLX, Routes, OutSeq, State); -redeliver(Pend, DLX, OutSeq, #state{routing_key = DLRKey} = State) -> + redeliver0(Pend, DLX, RKeys, OutSeq, State); +redeliver(Pend, + DLX, + OutSeq, + #state{routing_key = DLRKey} = State) -> redeliver0(Pend, DLX, [DLRKey], OutSeq, State). redeliver0(#pending{delivery = Msg0, @@ -501,7 +512,7 @@ redeliver0(#pending{delivery = Msg0, %% Note that a quorum queue client does not redeliver on our behalf if it previously %% rejected the message. This is why we always redeliver rejected messages here. RouteToQs1 = Unsettled -- clients_redeliver(Unsettled0, QTypeState), - {RouteToQs, Cycles} = rabbit_dead_letter:detect_cycles(Reason, Msg, RouteToQs1), + {Cycles, RouteToQs} = rabbit_dead_letter:detect_cycles(Reason, Msg, RouteToQs1), State1 = log_cycles(Cycles, DLRKeys, State0), case RouteToQs of [] -> @@ -599,6 +610,7 @@ format_status(_Opt, [_PDict, #state{ format_pending(#pending{consumed_msg_id = ConsumedMsgId, delivery = _DoNotLogLargeBinary, reason = Reason, + original_routing_keys = OriginalRoutingKeys, unsettled = Unsettled, rejected = Rejected, settled = Settled, @@ -607,6 +619,7 @@ format_pending(#pending{consumed_msg_id = ConsumedMsgId, consumed_at = ConsumedAt}) -> #{consumed_msg_id => ConsumedMsgId, reason => Reason, + original_routing_keys => OriginalRoutingKeys, unsettled => Unsettled, rejected => Rejected, settled => Settled, diff --git a/deps/rabbit/test/amqp_client_SUITE.erl b/deps/rabbit/test/amqp_client_SUITE.erl index 295ac15b118b..4f875ab835c9 100644 --- a/deps/rabbit/test/amqp_client_SUITE.erl +++ b/deps/rabbit/test/amqp_client_SUITE.erl @@ -105,11 +105,13 @@ groups() -> attach_to_exclusive_queue, classic_priority_queue, dead_letter_headers_exchange, + dead_letter_reject, immutable_bare_message ]}, {cluster_size_3, [shuffle], [ + dead_letter_into_stream, last_queue_confirms, target_queue_deleted, target_classic_queue_down, @@ -185,6 +187,22 @@ init_per_testcase(T = immutable_bare_message, Config) -> {skip, "RabbitMQ is known to wrongfully modify the bare message with feature " "flag message_containers_store_amqp_v1 disabled"} end; +init_per_testcase(T = dead_letter_into_stream, Config) -> + case rpc(Config, rabbit_feature_flags, is_enabled, [message_containers_deaths_v2]) of + true -> + rabbit_ct_helpers:testcase_started(Config, T); + false -> + {skip, "This test is known to fail with feature flag message_containers_deaths_v2 disabled " + "due to missing feature https://github.com/rabbitmq/rabbitmq-server/issues/11173"} + end; +init_per_testcase(T = dead_letter_reject, Config) -> + case rpc(Config, rabbit_feature_flags, is_enabled, [message_containers_deaths_v2]) of + true -> + rabbit_ct_helpers:testcase_started(Config, T); + false -> + {skip, "This test is known to fail with feature flag message_containers_deaths_v2 disabled " + "due bug https://github.com/rabbitmq/rabbitmq-server/issues/11159"} + end; init_per_testcase(T, Config) when T =:= classic_queue_on_new_node orelse T =:= quorum_queue_on_new_node -> @@ -3497,6 +3515,7 @@ dead_letter_headers_exchange(Config) -> #{<<"x-my key">> => 6}, amqp10_msg:new(<<"tag 4">>, <<"m4">>, false)), + Now = os:system_time(millisecond), [ok = amqp10_client:send_msg(Sender, M) || M <- [M1, M2, M3, M4]], ok = wait_for_accepts(4), flush(accepted), @@ -3507,9 +3526,36 @@ dead_letter_headers_exchange(Config) -> ?assertEqual(<<"m2">>, amqp10_msg:body_bin(Msg2)), ?assertEqual(#{message_id => <<"my ID">>}, amqp10_msg:properties(Msg1)), ?assertEqual(0, maps:size(amqp10_msg:properties(Msg2))), + case rpc(Config, rabbit_feature_flags, is_enabled, [message_containers_deaths_v2]) of + true -> + ?assertMatch( + #{<<"x-first-death-queue">> := QName1, + <<"x-first-death-exchange">> := <<>>, + <<"x-first-death-reason">> := <<"expired">>, + <<"x-last-death-queue">> := QName1, + <<"x-last-death-exchange">> := <<>>, + <<"x-last-death-reason">> := <<"expired">>, + <<"x-opt-deaths">> := {array, + map, + [{map, + [ + {{symbol, <<"queue">>}, {utf8, QName1}}, + {{symbol, <<"reason">>}, {symbol, <<"expired">>}}, + {{symbol, <<"count">>}, {ulong, 1}}, + {{symbol, <<"first-time">>}, {timestamp, Timestamp}}, + {{symbol, <<"last-time">>}, {timestamp, Timestamp}}, + {{symbol, <<"exchange">>},{utf8, <<>>}}, + {{symbol, <<"routing-keys">>}, {array, utf8, [{utf8, QName1}]}} + ]}]} + } when is_integer(Timestamp) andalso + Timestamp > Now - 5000 andalso + Timestamp < Now + 5000, + amqp10_msg:message_annotations(Msg1)); + false -> + ok + end, %% We expect M3 and M4 to get dropped. - %% Since the queue has no messages yet, we shouldn't receive any message. receive Unexp -> ct:fail({unexpected, Unexp}) after 10 -> ok end, @@ -3522,6 +3568,173 @@ dead_letter_headers_exchange(Config) -> ok = end_session_sync(Session), ok = amqp10_client:close_connection(Connection). +dead_letter_reject(Config) -> + {Connection, Session, LinkPair} = init(Config), + QName1 = <<"q1">>, + QName2 = <<"q2">>, + QName3 = <<"q3">>, + {ok, #{type := <<"quorum">>}} = rabbitmq_amqp_client:declare_queue( + LinkPair, + QName1, + #{arguments => #{<<"x-queue-type">> => {utf8, <<"quorum">>}, + <<"x-message-ttl">> => {ulong, 20}, + <<"x-dead-letter-exchange">> => {utf8, <<>>}, + <<"x-dead-letter-routing-key">> => {utf8, QName2} + }}), + {ok, #{type := <<"quorum">>}} = rabbitmq_amqp_client:declare_queue( + LinkPair, + QName2, + #{arguments => #{<<"x-queue-type">> => {utf8, <<"quorum">>}, + <<"x-dead-letter-exchange">> => {utf8, <<>>}, + <<"x-dead-letter-routing-key">> => {utf8, QName3} + }}), + {ok, #{type := <<"classic">>}} = rabbitmq_amqp_client:declare_queue( + LinkPair, + QName3, + #{arguments => #{<<"x-queue-type">> => {utf8, <<"classic">>}, + <<"x-message-ttl">> => {ulong, 20}, + <<"x-dead-letter-exchange">> => {utf8, <<>>}, + <<"x-dead-letter-routing-key">> => {utf8, QName1} + }}), + {ok, Receiver} = amqp10_client:attach_receiver_link( + Session, <<"receiver">>, <<"/queue/", QName2/binary>>, unsettled), + {ok, Sender} = amqp10_client:attach_sender_link( + Session, <<"sender">>, <<"/queue/", QName1/binary>>, unsettled), + wait_for_credit(Sender), + Tag = <<"my tag">>, + Body = <<"my body">>, + M = amqp10_msg:new(Tag, Body), + ok = amqp10_client:send_msg(Sender, M), + ok = wait_for_settlement(Tag), + + {ok, Msg1} = amqp10_client:get_msg(Receiver), + ok = amqp10_client:settle_msg(Receiver, Msg1, rejected), + {ok, Msg2} = amqp10_client:get_msg(Receiver), + ok = amqp10_client:settle_msg(Receiver, Msg2, rejected), + {ok, Msg3} = amqp10_client:get_msg(Receiver), + ok = amqp10_client:settle_msg(Receiver, Msg3, accepted), + ?assertEqual(Body, amqp10_msg:body_bin(Msg3)), + Annotations = amqp10_msg:message_annotations(Msg3), + ?assertMatch( + #{<<"x-first-death-queue">> := QName1, + <<"x-first-death-exchange">> := <<>>, + <<"x-first-death-reason">> := <<"expired">>, + <<"x-last-death-queue">> := QName1, + <<"x-last-death-exchange">> := <<>>, + <<"x-last-death-reason">> := <<"expired">>}, + Annotations), + %% The array should be ordered by death recency. + {ok, {array, map, [D1, D3, D2]}} = maps:find(<<"x-opt-deaths">>, Annotations), + {map, [ + {{symbol, <<"queue">>}, {utf8, QName1}}, + {{symbol, <<"reason">>}, {symbol, <<"expired">>}}, + {{symbol, <<"count">>}, {ulong, 3}}, + {{symbol, <<"first-time">>}, {timestamp, Ts1}}, + {{symbol, <<"last-time">>}, {timestamp, Ts2}}, + {{symbol, <<"exchange">>},{utf8, <<>>}}, + {{symbol, <<"routing-keys">>}, {array, utf8, [{utf8, QName1}]}} + ]} = D1, + {map, [ + {{symbol, <<"queue">>}, {utf8, QName2}}, + {{symbol, <<"reason">>}, {symbol, <<"rejected">>}}, + {{symbol, <<"count">>}, {ulong, 2}}, + {{symbol, <<"first-time">>}, {timestamp, Ts3}}, + {{symbol, <<"last-time">>}, {timestamp, Ts4}}, + {{symbol, <<"exchange">>},{utf8, <<>>}}, + {{symbol, <<"routing-keys">>}, {array, utf8, [{utf8, QName2}]}} + ]} = D2, + {map, [ + {{symbol, <<"queue">>}, {utf8, QName3}}, + {{symbol, <<"reason">>}, {symbol, <<"expired">>}}, + {{symbol, <<"count">>}, {ulong, 2}}, + {{symbol, <<"first-time">>}, {timestamp, Ts5}}, + {{symbol, <<"last-time">>}, {timestamp, Ts6}}, + {{symbol, <<"exchange">>},{utf8, <<>>}}, + {{symbol, <<"routing-keys">>}, {array, utf8, [{utf8, QName3}]}} + ]} = D3, + ?assertEqual([Ts1, Ts3, Ts5, Ts4, Ts6, Ts2], + lists:sort([Ts1, Ts2, Ts3, Ts4, Ts5, Ts6])), + + ok = amqp10_client:detach_link(Receiver), + ok = amqp10_client:detach_link(Sender), + {ok, #{message_count := 0}} = rabbitmq_amqp_client:delete_queue(LinkPair, QName1), + {ok, #{message_count := 0}} = rabbitmq_amqp_client:delete_queue(LinkPair, QName2), + {ok, #{message_count := 0}} = rabbitmq_amqp_client:delete_queue(LinkPair, QName3), + ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair), + ok = end_session_sync(Session), + ok = amqp10_client:close_connection(Connection). + +%% Dead letter from a quorum queue into a stream. +dead_letter_into_stream(Config) -> + {Connection0, Session0, LinkPair0} = init(0, Config), + {Connection1, Session1, LinkPair1} = init(1, Config), + QName0 = <<"q0">>, + QName1 = <<"q1">>, + {ok, #{type := <<"quorum">>}} = rabbitmq_amqp_client:declare_queue( + LinkPair0, + QName0, + #{arguments => #{<<"x-queue-type">> => {utf8, <<"quorum">>}, + <<"x-quorum-initial-group-size">> => {ulong, 1}, + <<"x-dead-letter-exchange">> => {utf8, <<>>}, + <<"x-dead-letter-routing-key">> => {utf8, QName1} + }}), + {ok, #{type := <<"stream">>}} = rabbitmq_amqp_client:declare_queue( + LinkPair1, + QName1, + #{arguments => #{<<"x-queue-type">> => {utf8, <<"stream">>}, + <<"x-initial-cluster-size">> => {ulong, 1} + }}), + {ok, Receiver} = amqp10_client:attach_receiver_link( + Session1, <<"receiver">>, <<"/queue/", QName1/binary>>, settled), + {ok, Sender} = amqp10_client:attach_sender_link( + Session0, <<"sender">>, <<"/queue/", QName0/binary>>), + wait_for_credit(Sender), + Ttl = 10, + M = amqp10_msg:set_headers( + #{durable => true, + ttl => Ttl}, + amqp10_msg:new(<<"tag">>, <<"msg">>, true)), + Now = os:system_time(millisecond), + ok = amqp10_client:send_msg(Sender, M), + + {ok, Msg} = amqp10_client:get_msg(Receiver), + ?assertEqual(<<"msg">>, amqp10_msg:body_bin(Msg)), + ?assertMatch( + #{<<"x-first-death-queue">> := QName0, + <<"x-first-death-exchange">> := <<>>, + <<"x-first-death-reason">> := <<"expired">>, + <<"x-last-death-queue">> := QName0, + <<"x-last-death-exchange">> := <<>>, + <<"x-last-death-reason">> := <<"expired">>, + <<"x-opt-deaths">> := {array, + map, + [{map, + [ + {{symbol, <<"ttl">>}, {uint, Ttl}}, + {{symbol, <<"queue">>}, {utf8, QName0}}, + {{symbol, <<"reason">>}, {symbol, <<"expired">>}}, + {{symbol, <<"count">>}, {ulong, 1}}, + {{symbol, <<"first-time">>}, {timestamp, Timestamp}}, + {{symbol, <<"last-time">>}, {timestamp, Timestamp}}, + {{symbol, <<"exchange">>},{utf8, <<>>}}, + {{symbol, <<"routing-keys">>}, {array, utf8, [{utf8, QName0}]}} + ]}]} + } when is_integer(Timestamp) andalso + Timestamp > Now - 5000 andalso + Timestamp < Now + 5000, + amqp10_msg:message_annotations(Msg)), + + ok = amqp10_client:detach_link(Receiver), + ok = amqp10_client:detach_link(Sender), + {ok, _} = rabbitmq_amqp_client:delete_queue(LinkPair0, QName0), + {ok, _} = rabbitmq_amqp_client:delete_queue(LinkPair1, QName1), + ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair0), + ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair1), + ok = end_session_sync(Session0), + ok = end_session_sync(Session1), + ok = amqp10_client:close_connection(Connection0), + ok = amqp10_client:close_connection(Connection1). + %% This test asserts the following §3.2 requirement: %% "The bare message is immutable within the AMQP network. That is, none of the sections can be %% changed by any node acting as an AMQP intermediary. If a section of the bare message is @@ -3689,7 +3902,10 @@ footer_checksum(FooterOpt, Config) -> %% init(Config) -> - OpnConf = connection_config(Config), + init(0, Config). + +init(Node, Config) -> + OpnConf = connection_config(Node, Config), {ok, Connection} = amqp10_client:open_connection(OpnConf), {ok, Session} = amqp10_client:begin_session_sync(Connection), {ok, LinkPair} = rabbitmq_amqp_client:attach_management_link_pair_sync(Session, <<"my link pair">>), diff --git a/deps/rabbit/test/dead_lettering_SUITE.erl b/deps/rabbit/test/dead_lettering_SUITE.erl index 488e794f01f0..8b8e4b54a9aa 100644 --- a/deps/rabbit/test/dead_lettering_SUITE.erl +++ b/deps/rabbit/test/dead_lettering_SUITE.erl @@ -19,7 +19,7 @@ all() -> [ - {group, dead_letter_tests} + {group, tests} ]. groups() -> @@ -28,6 +28,7 @@ groups() -> dead_letter_nack_requeue, dead_letter_nack_requeue_multiple, dead_letter_reject, + dead_letter_reject_expire_expire, dead_letter_reject_many, dead_letter_reject_requeue, dead_letter_max_length_drop_head, @@ -62,7 +63,7 @@ groups() -> metric_expired_per_msg_msg_ttl], Opts = [shuffle], [ - {dead_letter_tests, Opts, + {tests, Opts, [ {classic_queue, Opts, [{at_most_once, Opts, [dead_letter_max_length_reject_publish_dlx | DeadLetterTests]}, {disabled, Opts, DisabledMetricTests}]}, @@ -80,7 +81,9 @@ groups() -> dead_letter_missing_exchange ]} ] - }]}]. + }, + {stream_queue, Opts, [stream]} + ]}]. suite() -> [ @@ -178,28 +181,37 @@ end_per_group(Group, Config) -> Config end. +init_per_testcase(T, Config) + when T =:= dead_letter_reject_expire_expire orelse + T =:= stream -> + case rabbit_ct_broker_helpers:enable_feature_flag(Config, message_containers_deaths_v2) of + ok -> + init_per_testcase0(T, Config); + {skip, _} = Skip -> + %% With feature flag message_containers_deaths_v2 disabled, test case: + %% * dead_letter_reject_expire_expire is known to fail due to https://github.com/rabbitmq/rabbitmq-server/issues/11159 + %% * stream is known to fail due to https://github.com/rabbitmq/rabbitmq-server/issues/11173 + Skip + end; init_per_testcase(Testcase, Config) -> - IsMixed = rabbit_ct_helpers:is_mixed_versions(), - case Testcase of - dead_letter_headers_should_not_be_appended_for_republish when IsMixed -> - {skip, "dead_letter_headers_should_not_be_appended_for_republish isn't mixed versions compatible"}; - _ -> - Group = proplists:get_value(name, ?config(tc_group_properties, Config)), - Q = rabbit_data_coercion:to_binary(io_lib:format("~p_~tp", [Group, Testcase])), - Q2 = rabbit_data_coercion:to_binary(io_lib:format("~p_~p_2", [Group, Testcase])), - Q3 = rabbit_data_coercion:to_binary(io_lib:format("~p_~p_3", [Group, Testcase])), - Policy = rabbit_data_coercion:to_binary(io_lib:format("~p_~p_policy", [Group, Testcase])), - DLXExchange = rabbit_data_coercion:to_binary(io_lib:format("~p_~p_dlx_exchange", - [Group, Testcase])), - Counters = get_global_counters(Config), - Config1 = rabbit_ct_helpers:set_config(Config, [{dlx_exchange, DLXExchange}, - {queue_name, Q}, - {queue_name_dlx, Q2}, - {queue_name_dlx_2, Q3}, - {policy, Policy}, - {counters, Counters}]), - rabbit_ct_helpers:testcase_started(Config1, Testcase) - end. + init_per_testcase0(Testcase, Config). + +init_per_testcase0(Testcase, Config) -> + Group = proplists:get_value(name, ?config(tc_group_properties, Config)), + Q = rabbit_data_coercion:to_binary(io_lib:format("~p_~p", [Group, Testcase])), + Q2 = rabbit_data_coercion:to_binary(io_lib:format("~p_~p_2", [Group, Testcase])), + Q3 = rabbit_data_coercion:to_binary(io_lib:format("~p_~p_3", [Group, Testcase])), + Policy = rabbit_data_coercion:to_binary(io_lib:format("~p_~p_policy", [Group, Testcase])), + DLXExchange = rabbit_data_coercion:to_binary(io_lib:format("~p_~p_dlx_exchange", + [Group, Testcase])), + Counters = get_global_counters(Config), + Config1 = rabbit_ct_helpers:set_config(Config, [{dlx_exchange, DLXExchange}, + {queue_name, Q}, + {queue_name_dlx, Q2}, + {queue_name_dlx_2, Q3}, + {policy, Policy}, + {counters, Counters}]), + rabbit_ct_helpers:testcase_started(Config1, Testcase). end_per_testcase(Testcase, Config) -> rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_queues, []), @@ -377,6 +389,65 @@ dead_letter_reject(Config) -> consume_empty(Ch, QName), ?assertEqual(1, counted(messages_dead_lettered_rejected_total, Config)). +dead_letter_reject_expire_expire(Config) -> + {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + %% In 3.13.0 - 3.13.2 there is a bug in mc:is_death_cycle/2 where the queue names matter: + %% https://github.com/rabbitmq/rabbitmq-server/issues/11159 + %% The following queue names triggered the bug because they affect the order returned by maps:keys/1. + Q1 = <<"b">>, + Q2 = <<"a2">>, + Q3 = <<"a3">>, + Args = ?config(queue_args, Config), + Durable = ?config(queue_durable, Config), + + %% Test the followig topology message flow: + %% Q1 --rejected--> Q2 --expired--> Q3 --expired--> + %% Q1 --rejected--> Q2 --expired--> Q3 --expired--> + %% Q1 + + #'queue.declare_ok'{} = amqp_channel:call( + Ch, + #'queue.declare'{ + queue = Q1, + arguments = Args ++ [{<<"x-dead-letter-exchange">>, longstr, <<>>}, + {<<"x-dead-letter-routing-key">>, longstr, Q2}], + durable = Durable}), + #'queue.declare_ok'{} = amqp_channel:call( + Ch, + #'queue.declare'{ + queue = Q2, + arguments = Args ++ [{<<"x-dead-letter-exchange">>, longstr, <<>>}, + {<<"x-dead-letter-routing-key">>, longstr, Q3}, + {<<"x-message-ttl">>, long, 5}], + durable = Durable}), + #'queue.declare_ok'{} = amqp_channel:call( + Ch, + #'queue.declare'{ + queue = Q3, + arguments = Args ++ [{<<"x-dead-letter-exchange">>, longstr, <<>>}, + {<<"x-dead-letter-routing-key">>, longstr, Q1}, + {<<"x-message-ttl">>, long, 5}], + durable = Durable}), + + %% Send a single message. + P = <<"msg">>, + publish(Ch, Q1, [P]), + wait_for_messages(Config, [[Q1, <<"1">>, <<"1">>, <<"0">>]]), + + %% Reject the 1st time. + [DTag1] = consume(Ch, Q1, [P]), + amqp_channel:cast(Ch, #'basic.reject'{delivery_tag = DTag1, + requeue = false}), + %% Message should now flow from Q1 -> Q2 -> Q3 -> Q1 + wait_for_messages(Config, [[Q1, <<"1">>, <<"1">>, <<"0">>]]), + + %% Reject the 2nd time. + [DTag2] = consume(Ch, Q1, [P]), + amqp_channel:cast(Ch, #'basic.reject'{delivery_tag = DTag2, + requeue = false}), + %% Message should again flow from Q1 -> Q2 -> Q3 -> Q1 + wait_for_messages(Config, [[Q1, <<"1">>, <<"1">>, <<"0">>]]). + %% 1) Many messages are rejected. They get dead-lettered in correct order. dead_letter_reject_many(Config) -> {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), @@ -752,7 +823,9 @@ dead_letter_routing_key_cycle_max_length(Config) -> DeadLetterArgs = [{<<"x-max-length">>, long, 1}, {<<"x-dead-letter-exchange">>, longstr, <<>>}], - #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName, arguments = DeadLetterArgs ++ Args, durable = Durable}), + #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName, + arguments = DeadLetterArgs ++ Args, + durable = Durable}), P1 = <<"msg1">>, P2 = <<"msg2">>, @@ -777,7 +850,9 @@ dead_letter_routing_key_cycle_ttl(Config) -> DeadLetterArgs = [{<<"x-message-ttl">>, long, 1}, {<<"x-dead-letter-exchange">>, longstr, <<>>}], - #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName, arguments = DeadLetterArgs ++ Args, durable = Durable}), + #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName, + arguments = DeadLetterArgs ++ Args, + durable = Durable}), P1 = <<"msg1">>, P2 = <<"msg2">>, @@ -796,7 +871,9 @@ dead_letter_routing_key_cycle_with_reject(Config) -> QName = ?config(queue_name, Config), DeadLetterArgs = [{<<"x-dead-letter-exchange">>, longstr, <<>>}], - #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName, arguments = DeadLetterArgs ++ Args, durable = Durable}), + #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName, + arguments = DeadLetterArgs ++ Args, + durable = Durable}), P = <<"msg1">>, @@ -1560,6 +1637,96 @@ metric_expired_per_msg_msg_ttl(Config) -> || Payload <- Payloads], ?awaitMatch(1000, counted(messages_dead_lettered_expired_total, Config), 3000, 300). +%% The final dead letter queue is a stream. +stream(Config) -> + Ch0 = rabbit_ct_client_helpers:open_channel(Config, 0), + Ch1 = rabbit_ct_client_helpers:open_channel(Config, 1), + Q1 = <<"q1">>, + Q2 = <<"q2">>, + Q3 = <<"q3">>, + #'queue.declare_ok'{} = amqp_channel:call( + Ch0, + #'queue.declare'{queue = Q1, + arguments = [{<<"x-dead-letter-exchange">>, longstr, <<>>}, + {<<"x-dead-letter-routing-key">>, longstr, Q2}]}), + #'queue.declare_ok'{} = amqp_channel:call( + Ch0, + #'queue.declare'{queue = Q2, + arguments = [{<<"x-message-ttl">>, long, 2500}, + {<<"x-dead-letter-exchange">>, longstr, <<>>}, + {<<"x-dead-letter-routing-key">>, longstr, Q3}]}), + #'queue.declare_ok'{} = amqp_channel:call( + Ch1, + #'queue.declare'{queue = Q3, + arguments = [{<<"x-queue-type">>, longstr, <<"stream">>}, + {<<"x-initial-cluster-size">>, long, 1}], + durable = true}), + + Payload = <<"my payload">>, + %% Message should travel Q1 -> Q2 -> Q3 + amqp_channel:call( + Ch0, + #'basic.publish'{routing_key = Q1}, + #amqp_msg{payload = Payload, + props = #'P_basic'{expiration = <<"0">>, + headers = [{<<"CC">>, array, [{longstr, <<"other key">>}]}]} + }), + + #'basic.qos_ok'{} = amqp_channel:call(Ch1, #'basic.qos'{prefetch_count = 1}), + Ctag = <<"my ctag">>, + amqp_channel:subscribe( + Ch1, + #'basic.consume'{queue = Q3, + consumer_tag = Ctag, + arguments = [{<<"x-stream-offset">>, longstr, <<"first">>}]}, + self()), + receive + #'basic.consume_ok'{consumer_tag = Ctag} -> ok + after 5000 -> ct:fail({missing_event, ?LINE}) + end, + + Headers = receive {#'basic.deliver'{delivery_tag = DeliveryTag}, + #amqp_msg{payload = Payload, + props = #'P_basic'{headers = Headers0} + }} -> + ok = amqp_channel:cast(Ch1, #'basic.ack'{delivery_tag = DeliveryTag, + multiple = false}), + Headers0 + after 10_000 -> ct:fail({missing_event, ?LINE}) + end, + + Reason = <<"expired">>, + ?assertEqual({longstr, Reason}, rabbit_misc:table_lookup(Headers, <<"x-first-death-reason">>)), + ?assertEqual({longstr, Q1}, rabbit_misc:table_lookup(Headers, <<"x-first-death-queue">>)), + ?assertEqual({longstr, <<>>}, rabbit_misc:table_lookup(Headers, <<"x-first-death-exchange">>)), + ?assertEqual({longstr, Reason}, rabbit_misc:table_lookup(Headers, <<"x-last-death-reason">>)), + ?assertEqual({longstr, Q2}, rabbit_misc:table_lookup(Headers, <<"x-last-death-queue">>)), + ?assertEqual({longstr, <<>>}, rabbit_misc:table_lookup(Headers, <<"x-last-death-exchange">>)), + + %% We expect the array to be ordered by death recency. + {array, [{table, Death2}, {table, Death1}]} = rabbit_misc:table_lookup(Headers, <<"x-death">>), + + ?assertEqual({longstr, Q1}, rabbit_misc:table_lookup(Death1, <<"queue">>)), + ?assertEqual({longstr, Reason}, rabbit_misc:table_lookup(Death1, <<"reason">>)), + ?assertEqual({longstr, <<>>}, rabbit_misc:table_lookup(Death1, <<"exchange">>)), + ?assertEqual({long, 1}, rabbit_misc:table_lookup(Death1, <<"count">>)), + ?assertEqual({array, [{longstr, Q1}, {longstr, <<"other key">>}]}, + rabbit_misc:table_lookup(Death1, <<"routing-keys">>)), + ?assertEqual({longstr, <<"0">>}, rabbit_misc:table_lookup(Death1, <<"original-expiration">>)), + {timestamp, T1} = rabbit_misc:table_lookup(Death1, <<"time">>), + + ?assertEqual({longstr, Q2}, rabbit_misc:table_lookup(Death2, <<"queue">>)), + ?assertEqual({longstr, Reason}, rabbit_misc:table_lookup(Death2, <<"reason">>)), + ?assertEqual({longstr, <<>>}, rabbit_misc:table_lookup(Death2, <<"exchange">>)), + ?assertEqual({long, 1}, rabbit_misc:table_lookup(Death2, <<"count">>)), + ?assertEqual({array, [{longstr, Q2}]}, rabbit_misc:table_lookup(Death2, <<"routing-keys">>)), + ?assertEqual(undefined, rabbit_misc:table_lookup(Death2, <<"original-expiration">>)), + {timestamp, T2} = rabbit_misc:table_lookup(Death2, <<"time">>), + ?assert(T1 < T2), + + ok = rabbit_ct_client_helpers:close_channel(Ch0), + ok = rabbit_ct_client_helpers:close_channel(Ch1). + %%%%%%%%%%%%%%%%%%%%%%%% %% Test helpers %%%%%%%%%%%%%%%%%%%%%%%% diff --git a/deps/rabbit/test/mc_unit_SUITE.erl b/deps/rabbit/test/mc_unit_SUITE.erl index 3bd9c9c916b5..19182603207e 100644 --- a/deps/rabbit/test/mc_unit_SUITE.erl +++ b/deps/rabbit/test/mc_unit_SUITE.erl @@ -29,7 +29,9 @@ all_tests() -> amqpl_compat, amqpl_table_x_header, amqpl_table_x_header_array_of_tbls, - amqpl_death_records, + amqpl_death_v1_records, + amqpl_death_v2_records, + is_death_cycle, amqpl_amqp_bin_amqpl, amqpl_cc_amqp_bin_amqpl, amqp_amqpl_amqp_uuid_correlation_id, @@ -191,27 +193,29 @@ amqpl_table_x_header_array_of_tbls(_Config) -> ok. -amqpl_death_records(_Config) -> +amqpl_death_v1_records(_Config) -> + ok = amqpl_death_records(#{?FF_MC_DEATHS_V2 => false}). + +amqpl_death_v2_records(_Config) -> + ok = amqpl_death_records(#{?FF_MC_DEATHS_V2 => true}). + +amqpl_death_records(Env) -> Content = #content{class_id = 60, properties = #'P_basic'{headers = []}, payload_fragments_rev = [<<"data">>]}, Msg0 = mc:prepare(store, mc:init(mc_amqpl, Content, annotations())), - Msg1 = mc:record_death(rejected, <<"q1">>, Msg0), + Msg1 = mc:record_death(rejected, <<"q1">>, Msg0, Env), ?assertEqual([<<"q1">>], mc:death_queue_names(Msg1)), - ?assertMatch({{<<"q1">>, rejected}, - #death{exchange = <<"exch">>, - routing_keys = [<<"apple">>], - count = 1}}, mc:last_death(Msg1)), ?assertEqual(false, mc:is_death_cycle(<<"q1">>, Msg1)), #content{properties = #'P_basic'{headers = H1}} = mc:protocol_state(Msg1), ?assertMatch({_, array, [_]}, header(<<"x-death">>, H1)), ?assertMatch({_, longstr, <<"q1">>}, header(<<"x-first-death-queue">>, H1)), - ?assertMatch({_, longstr, <<"q1">>}, header(<<"x-last-death-queue">>, H1)), ?assertMatch({_, longstr, <<"exch">>}, header(<<"x-first-death-exchange">>, H1)), - ?assertMatch({_, longstr, <<"exch">>}, header(<<"x-last-death-exchange">>, H1)), ?assertMatch({_, longstr, <<"rejected">>}, header(<<"x-first-death-reason">>, H1)), + ?assertMatch({_, longstr, <<"q1">>}, header(<<"x-last-death-queue">>, H1)), + ?assertMatch({_, longstr, <<"exch">>}, header(<<"x-last-death-exchange">>, H1)), ?assertMatch({_, longstr, <<"rejected">>}, header(<<"x-last-death-reason">>, H1)), {_, array, [{table, T1}]} = header(<<"x-death">>, H1), ?assertMatch({_, long, 1}, header(<<"count">>, T1)), @@ -222,12 +226,12 @@ amqpl_death_records(_Config) -> ?assertMatch({_, array, [{longstr, <<"apple">>}]}, header(<<"routing-keys">>, T1)), - %% second dead letter, e.g. a ttl reason returning to source queue + %% second dead letter, e.g. an expired reason returning to source queue %% record_death uses a timestamp for death record ordering, ensure %% it is definitely higher than the last timestamp taken timer:sleep(2), - Msg2 = mc:record_death(ttl, <<"dl">>, Msg1), + Msg2 = mc:record_death(expired, <<"dl">>, Msg1, Env), #content{properties = #'P_basic'{headers = H2}} = mc:protocol_state(Msg2), {_, array, [{table, T2a}, {table, T2b}]} = header(<<"x-death">>, H2), @@ -235,6 +239,67 @@ amqpl_death_records(_Config) -> ?assertMatch({_, longstr, <<"q1">>}, header(<<"queue">>, T2b)), ok. +is_death_cycle(_Config) -> + Content = #content{class_id = 60, + properties = #'P_basic'{headers = []}, + payload_fragments_rev = [<<"data">>]}, + Msg0 = mc:prepare(store, mc:init(mc_amqpl, Content, annotations())), + + %% Test the followig topology: + %% Q1 --rejected--> Q2 --expired--> Q3 --expired--> + %% Q1 --rejected--> Q2 --expired--> Q3 + + Msg1 = mc:record_death(rejected, <<"q1">>, Msg0, #{}), + ?assertNot(mc:is_death_cycle(<<"q1">>, Msg1), + "A queue that dead letters to itself due to rejected is not considered a cycle."), + ?assertNot(mc:is_death_cycle(<<"q2">>, Msg1)), + ?assertNot(mc:is_death_cycle(<<"q3">>, Msg1)), + + Msg2 = mc:record_death(expired, <<"q2">>, Msg1, #{}), + ?assertNot(mc:is_death_cycle(<<"q1">>, Msg2)), + ?assert(mc:is_death_cycle(<<"q2">>, Msg2), + "A queue that dead letters to itself due to expired is considered a cycle."), + ?assertNot(mc:is_death_cycle(<<"q3">>, Msg2)), + + Msg3 = mc:record_death(expired, <<"q3">>, Msg2, #{}), + ?assertNot(mc:is_death_cycle(<<"q1">>, Msg3)), + ?assert(mc:is_death_cycle(<<"q2">>, Msg3)), + ?assert(mc:is_death_cycle(<<"q3">>, Msg3)), + + Msg4 = mc:record_death(rejected, <<"q1">>, Msg3, #{}), + ?assertNot(mc:is_death_cycle(<<"q1">>, Msg4)), + ?assertNot(mc:is_death_cycle(<<"q2">>, Msg4)), + ?assertNot(mc:is_death_cycle(<<"q3">>, Msg4)), + + Msg5 = mc:record_death(expired, <<"q2">>, Msg4, #{}), + ?assertNot(mc:is_death_cycle(<<"q1">>, Msg5)), + ?assert(mc:is_death_cycle(<<"q2">>, Msg5)), + ?assertNot(mc:is_death_cycle(<<"q3">>, Msg5)), + + DeathQsOrderedByRecency = [<<"q2">>, <<"q1">>, <<"q3">>], + ?assertEqual(DeathQsOrderedByRecency, mc:death_queue_names(Msg5)), + + #content{properties = #'P_basic'{headers = H}} = mc:protocol_state(Msg5), + ?assertMatch({_, longstr, <<"q1">>}, header(<<"x-first-death-queue">>, H)), + ?assertMatch({_, longstr, <<"rejected">>}, header(<<"x-first-death-reason">>, H)), + ?assertMatch({_, longstr, <<"q2">>}, header(<<"x-last-death-queue">>, H)), + ?assertMatch({_, longstr, <<"expired">>}, header(<<"x-last-death-reason">>, H)), + + %% We expect the array to be ordered by recency. + {_, array, [{table, T1}, {table, T2}, {table, T3}]} = header(<<"x-death">>, H), + + ?assertMatch({_, longstr, <<"q2">>}, header(<<"queue">>, T1)), + ?assertMatch({_, longstr, <<"expired">>}, header(<<"reason">>, T1)), + ?assertMatch({_, long, 2}, header(<<"count">>, T1)), + + ?assertMatch({_, longstr, <<"q1">>}, header(<<"queue">>, T2)), + ?assertMatch({_, longstr, <<"rejected">>}, header(<<"reason">>, T2)), + ?assertMatch({_, long, 2}, header(<<"count">>, T2)), + + ?assertMatch({_, longstr, <<"q3">>}, header(<<"queue">>, T3)), + ?assertMatch({_, longstr, <<"expired">>}, header(<<"reason">>, T3)), + ?assertMatch({_, long, 1}, header(<<"count">>, T3)). + header(K, H) -> rabbit_basic:header(K, H). diff --git a/deps/rabbit/test/message_containers_deaths_v2_SUITE.erl b/deps/rabbit/test/message_containers_deaths_v2_SUITE.erl new file mode 100644 index 000000000000..a0888073411d --- /dev/null +++ b/deps/rabbit/test/message_containers_deaths_v2_SUITE.erl @@ -0,0 +1,127 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2024 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. + +%% This SUITE should be deleted when feature flag message_containers_deaths_v2 becomes required. +-module(message_containers_deaths_v2_SUITE). + +-define(FEATURE_FLAG, message_containers_deaths_v2). + +-compile([export_all, nowarn_export_all]). + +-include_lib("eunit/include/eunit.hrl"). +-include_lib("amqp_client/include/amqp_client.hrl"). +-include_lib("rabbitmq_ct_helpers/include/rabbit_assert.hrl"). + +all() -> + [ + {group, cluster_size_1} + ]. + +groups() -> + [ + {cluster_size_1, [], [enable_feature_flag]} + ]. + +init_per_suite(Config) -> + rabbit_ct_helpers:log_environment(), + rabbit_ct_helpers:run_setup_steps(Config, []). + +end_per_suite(Config) -> + rabbit_ct_helpers:run_teardown_steps(Config). + +init_per_group(_Group, Config0) -> + Config = rabbit_ct_helpers:merge_app_env( + Config0, {rabbit, [{forced_feature_flags_on_init, []}]}), + rabbit_ct_helpers:run_steps(Config, + rabbit_ct_broker_helpers:setup_steps() ++ + rabbit_ct_client_helpers:setup_steps()). + +end_per_group(_Group, Config) -> + rabbit_ct_helpers:run_steps(Config, + rabbit_ct_client_helpers:teardown_steps() ++ + rabbit_ct_broker_helpers:teardown_steps()). + +enable_feature_flag(Config) -> + Ch = rabbit_ct_client_helpers:open_channel(Config), + Q1 = <<"q1">>, + Q2 = <<"q2">>, + #'queue.declare_ok'{} = amqp_channel:call( + Ch, #'queue.declare'{queue = Q1, + arguments = [{<<"x-dead-letter-exchange">>, longstr, <<>>}, + {<<"x-dead-letter-routing-key">>, longstr, Q2}, + {<<"x-message-ttl">>, long, 3}]}), + #'queue.declare_ok'{} = amqp_channel:call( + Ch, #'queue.declare'{queue = Q2, + arguments = [{<<"x-dead-letter-exchange">>, longstr, <<>>}, + {<<"x-dead-letter-routing-key">>, longstr, Q1}]}), + P1 = <<"payload 1">>, + P2 = <<"payload 2">>, + amqp_channel:call(Ch, + #'basic.publish'{routing_key = Q1}, + #amqp_msg{payload = P1}), + ?assertNot(rabbit_ct_broker_helpers:is_feature_flag_enabled(Config, ?FEATURE_FLAG)), + ?assertEqual(ok, rabbit_ct_broker_helpers:enable_feature_flag(Config, ?FEATURE_FLAG)), + amqp_channel:call(Ch, + #'basic.publish'{routing_key = Q1}, + #amqp_msg{payload = P2}), + + %% We now have 2 messages in Q2 with different mc annotations: + %% * deaths for v1 in the 1st msg + %% * deaths_v2 for v2 in the 2nd msg + + reject(Ch, Q2, P1), + reject(Ch, Q2, P2), + reject(Ch, Q2, P1), + reject(Ch, Q2, P2), + + {#'basic.get_ok'{}, #amqp_msg{props = #'P_basic'{headers = H1}}} = + ?awaitMatch({#'basic.get_ok'{}, + #amqp_msg{payload = P1}}, + amqp_channel:call(Ch, #'basic.get'{queue = Q2}), + 5000), + + {#'basic.get_ok'{}, #amqp_msg{props = #'P_basic'{headers = H2}}} = + ?awaitMatch({#'basic.get_ok'{}, + #amqp_msg{payload = P2}}, + amqp_channel:call(Ch, #'basic.get'{queue = Q2}), + 5000), + + lists:foreach( + fun(Headers) -> + ?assertEqual({longstr, <<"expired">>}, rabbit_misc:table_lookup(Headers, <<"x-first-death-reason">>)), + ?assertEqual({longstr, Q1}, rabbit_misc:table_lookup(Headers, <<"x-first-death-queue">>)), + ?assertEqual({longstr, <<>>}, rabbit_misc:table_lookup(Headers, <<"x-first-death-exchange">>)), + ?assertEqual({longstr, <<"expired">>}, rabbit_misc:table_lookup(Headers, <<"x-last-death-reason">>)), + ?assertEqual({longstr, Q1}, rabbit_misc:table_lookup(Headers, <<"x-last-death-queue">>)), + ?assertEqual({longstr, <<>>}, rabbit_misc:table_lookup(Headers, <<"x-last-death-exchange">>)), + + {array, [{table, Death1}, + {table, Death2}]} = rabbit_misc:table_lookup(H1, <<"x-death">>), + + ?assertEqual({longstr, Q1}, rabbit_misc:table_lookup(Death1, <<"queue">>)), + ?assertEqual({longstr, <<"expired">>}, rabbit_misc:table_lookup(Death1, <<"reason">>)), + ?assertMatch({timestamp, _}, rabbit_misc:table_lookup(Death1, <<"time">>)), + ?assertEqual({longstr, <<>>}, rabbit_misc:table_lookup(Death1, <<"exchange">>)), + ?assertEqual({long, 3}, rabbit_misc:table_lookup(Death1, <<"count">>)), + ?assertEqual({array, [{longstr, Q1}]}, rabbit_misc:table_lookup(Death1, <<"routing-keys">>)), + + ?assertEqual({longstr, Q2}, rabbit_misc:table_lookup(Death2, <<"queue">>)), + ?assertEqual({longstr, <<"rejected">>}, rabbit_misc:table_lookup(Death2, <<"reason">>)), + ?assertMatch({timestamp, _}, rabbit_misc:table_lookup(Death2, <<"time">>)), + ?assertEqual({longstr, <<>>}, rabbit_misc:table_lookup(Death2, <<"exchange">>)), + ?assertEqual({long, 2}, rabbit_misc:table_lookup(Death2, <<"count">>)), + ?assertEqual({array, [{longstr, Q2}]}, rabbit_misc:table_lookup(Death2, <<"routing-keys">>)) + end, [H1, H2]), + ok. + +reject(Ch, Queue, Payload) -> + {#'basic.get_ok'{delivery_tag = DTag}, #amqp_msg{}} = + ?awaitMatch({#'basic.get_ok'{}, + #amqp_msg{payload = Payload}}, + amqp_channel:call(Ch, #'basic.get'{queue = Queue}), + 5000), + amqp_channel:cast(Ch, #'basic.reject'{delivery_tag = DTag, + requeue = false}).