diff --git a/deps/rabbit/BUILD.bazel b/deps/rabbit/BUILD.bazel index 51d1ce97524b..6ba91e324d61 100644 --- a/deps/rabbit/BUILD.bazel +++ b/deps/rabbit/BUILD.bazel @@ -795,9 +795,6 @@ rabbitmq_suite( rabbitmq_suite( name = "mc_unit_SUITE", size = "small", - runtime_deps = [ - "@meck//:erlang_app", - ], deps = [ "//deps/amqp10_common:erlang_app", "//deps/rabbit_common:erlang_app", diff --git a/deps/rabbit/include/mc.hrl b/deps/rabbit/include/mc.hrl index 12620e991924..53e0149c0269 100644 --- a/deps/rabbit/include/mc.hrl +++ b/deps/rabbit/include/mc.hrl @@ -13,6 +13,8 @@ -define(ANN_DURABLE, d). -define(ANN_PRIORITY, p). +-define(FF_MC_DEATHS_V2, message_containers_deaths_v2). + -type death_key() :: {SourceQueue :: rabbit_misc:resource_name(), rabbit_dead_letter:reason()}. -type death_anns() :: #{%% timestamp of the first time this message %% was dead lettered from this queue for this reason diff --git a/deps/rabbit/src/mc.erl b/deps/rabbit/src/mc.erl index 00799eba024e..20020b0a5c11 100644 --- a/deps/rabbit/src/mc.erl +++ b/deps/rabbit/src/mc.erl @@ -34,7 +34,7 @@ convert/3, protocol_state/1, prepare/2, - record_death/3, + record_death/4, is_death_cycle/2, death_queue_names/1 ]). @@ -356,9 +356,11 @@ protocol_state(BasicMsg) -> -spec record_death(rabbit_dead_letter:reason(), rabbit_misc:resource_name(), - state()) -> state(). + state(), + environment()) -> state(). record_death(Reason, SourceQueue, - #?MODULE{annotations = Anns0} = State) + #?MODULE{annotations = Anns0} = State, + Env) when is_atom(Reason) andalso is_binary(SourceQueue) -> Key = {SourceQueue, Reason}, @@ -398,9 +400,11 @@ record_death(Reason, SourceQueue, deaths := Deaths#deaths{last = Key, records = Rs}}; _ -> - Rs = case rabbit_feature_flags:is_enabled(message_containers_deaths_v2) of - true -> [{Key, NewDeath}]; - false -> #{Key => NewDeath} + Rs = case Env of + #{?FF_MC_DEATHS_V2 := false} -> + #{Key => NewDeath}; + _ -> + [{Key, NewDeath}] end, ReasonBin = atom_to_binary(Reason), Anns0#{<<"x-first-death-reason">> => ReasonBin, @@ -414,8 +418,8 @@ record_death(Reason, SourceQueue, records = Rs}} end, State#?MODULE{annotations = Anns}; -record_death(Reason, SourceQueue, BasicMsg) -> - mc_compat:record_death(Reason, SourceQueue, BasicMsg). +record_death(Reason, SourceQueue, BasicMsg, Env) -> + mc_compat:record_death(Reason, SourceQueue, BasicMsg, Env). update_death(#death{count = Count, anns = DeathAnns} = Death, Timestamp) -> diff --git a/deps/rabbit/src/mc_compat.erl b/deps/rabbit/src/mc_compat.erl index 0514c095d362..702f8c0f64ca 100644 --- a/deps/rabbit/src/mc_compat.erl +++ b/deps/rabbit/src/mc_compat.erl @@ -26,7 +26,7 @@ protocol_state/1, %serialize/1, prepare/2, - record_death/3, + record_death/4, is_death_cycle/2, %deaths/1, death_queue_names/1 @@ -155,7 +155,7 @@ prepare(store, Msg) -> record_death(Reason, SourceQueue, #basic_message{content = Content, exchange_name = Exchange, - routing_keys = RoutingKeys} = Msg) -> + routing_keys = RoutingKeys} = Msg, _Env) -> % HeadersFun1 = fun (H) -> lists:keydelete(<<"CC">>, 1, H) end, ReasonBin = atom_to_binary(Reason), TimeSec = os:system_time(seconds), diff --git a/deps/rabbit/src/rabbit_dead_letter.erl b/deps/rabbit/src/rabbit_dead_letter.erl index 98b87de3b721..a8c6b4515eda 100644 --- a/deps/rabbit/src/rabbit_dead_letter.erl +++ b/deps/rabbit/src/rabbit_dead_letter.erl @@ -31,7 +31,11 @@ publish(Msg0, Reason, #exchange{name = XName} = DLX, RK, _ -> [RK] end, - Msg1 = mc:record_death(Reason, SourceQName, Msg0), + Env = case rabbit_feature_flags:is_enabled(?FF_MC_DEATHS_V2) of + true -> #{}; + false -> #{?FF_MC_DEATHS_V2 => false} + end, + Msg1 = mc:record_death(Reason, SourceQName, Msg0, Env), {Ttl, Msg2} = mc:take_annotation(dead_letter_ttl, Msg1), Msg3 = mc:set_ttl(Ttl, Msg2), Msg4 = mc:set_annotation(?ANN_ROUTING_KEYS, DLRKeys, Msg3), diff --git a/deps/rabbit/src/rabbit_fifo_dlx_worker.erl b/deps/rabbit/src/rabbit_fifo_dlx_worker.erl index c5a00bd7492c..f96d8de1e491 100644 --- a/deps/rabbit/src/rabbit_fifo_dlx_worker.erl +++ b/deps/rabbit/src/rabbit_fifo_dlx_worker.erl @@ -324,7 +324,11 @@ forward(ConsumedMsg, ConsumedMsgId, ConsumedQRef, DLX, Reason, _ -> [RKey] end, - Msg0 = mc:record_death(Reason, SourceQName, ConsumedMsg), + Env = case rabbit_feature_flags:is_enabled(?FF_MC_DEATHS_V2) of + true -> #{}; + false -> #{?FF_MC_DEATHS_V2 => false} + end, + Msg0 = mc:record_death(Reason, SourceQName, ConsumedMsg, Env), Msg1 = mc:set_ttl(undefined, Msg0), Msg2 = mc:set_annotation(?ANN_ROUTING_KEYS, DLRKeys, Msg1), Msg = mc:set_annotation(?ANN_EXCHANGE, DLXName, Msg2), diff --git a/deps/rabbit/test/mc_unit_SUITE.erl b/deps/rabbit/test/mc_unit_SUITE.erl index dc9cf67cdae4..19182603207e 100644 --- a/deps/rabbit/test/mc_unit_SUITE.erl +++ b/deps/rabbit/test/mc_unit_SUITE.erl @@ -45,14 +45,6 @@ all_tests() -> amqp_amqpl_amqp_bodies ]. -init_per_suite(Config) -> - meck:new(rabbit_feature_flags, [passthrough, no_link]), - Config. - -end_per_suite(Config) -> - meck:unload(rabbit_feature_flags), - Config. - %%%=================================================================== %%% Test cases %%%=================================================================== @@ -202,24 +194,18 @@ amqpl_table_x_header_array_of_tbls(_Config) -> ok. amqpl_death_v1_records(_Config) -> - meck:expect(rabbit_feature_flags, - is_enabled, - fun(message_containers_deaths_v2) -> false end), - ok = amqpl_death_records(). + ok = amqpl_death_records(#{?FF_MC_DEATHS_V2 => false}). amqpl_death_v2_records(_Config) -> - meck:expect(rabbit_feature_flags, - is_enabled, - fun(message_containers_deaths_v2) -> true end), - ok = amqpl_death_records(). + ok = amqpl_death_records(#{?FF_MC_DEATHS_V2 => true}). -amqpl_death_records() -> +amqpl_death_records(Env) -> Content = #content{class_id = 60, properties = #'P_basic'{headers = []}, payload_fragments_rev = [<<"data">>]}, Msg0 = mc:prepare(store, mc:init(mc_amqpl, Content, annotations())), - Msg1 = mc:record_death(rejected, <<"q1">>, Msg0), + Msg1 = mc:record_death(rejected, <<"q1">>, Msg0, Env), ?assertEqual([<<"q1">>], mc:death_queue_names(Msg1)), ?assertEqual(false, mc:is_death_cycle(<<"q1">>, Msg1)), @@ -245,7 +231,7 @@ amqpl_death_records() -> %% record_death uses a timestamp for death record ordering, ensure %% it is definitely higher than the last timestamp taken timer:sleep(2), - Msg2 = mc:record_death(expired, <<"dl">>, Msg1), + Msg2 = mc:record_death(expired, <<"dl">>, Msg1, Env), #content{properties = #'P_basic'{headers = H2}} = mc:protocol_state(Msg2), {_, array, [{table, T2a}, {table, T2b}]} = header(<<"x-death">>, H2), @@ -254,10 +240,6 @@ amqpl_death_records() -> ok. is_death_cycle(_Config) -> - meck:expect(rabbit_feature_flags, - is_enabled, - fun(message_containers_deaths_v2) -> true end), - Content = #content{class_id = 60, properties = #'P_basic'{headers = []}, payload_fragments_rev = [<<"data">>]}, @@ -267,29 +249,29 @@ is_death_cycle(_Config) -> %% Q1 --rejected--> Q2 --expired--> Q3 --expired--> %% Q1 --rejected--> Q2 --expired--> Q3 - Msg1 = mc:record_death(rejected, <<"q1">>, Msg0), + Msg1 = mc:record_death(rejected, <<"q1">>, Msg0, #{}), ?assertNot(mc:is_death_cycle(<<"q1">>, Msg1), "A queue that dead letters to itself due to rejected is not considered a cycle."), ?assertNot(mc:is_death_cycle(<<"q2">>, Msg1)), ?assertNot(mc:is_death_cycle(<<"q3">>, Msg1)), - Msg2 = mc:record_death(expired, <<"q2">>, Msg1), + Msg2 = mc:record_death(expired, <<"q2">>, Msg1, #{}), ?assertNot(mc:is_death_cycle(<<"q1">>, Msg2)), ?assert(mc:is_death_cycle(<<"q2">>, Msg2), "A queue that dead letters to itself due to expired is considered a cycle."), ?assertNot(mc:is_death_cycle(<<"q3">>, Msg2)), - Msg3 = mc:record_death(expired, <<"q3">>, Msg2), + Msg3 = mc:record_death(expired, <<"q3">>, Msg2, #{}), ?assertNot(mc:is_death_cycle(<<"q1">>, Msg3)), ?assert(mc:is_death_cycle(<<"q2">>, Msg3)), ?assert(mc:is_death_cycle(<<"q3">>, Msg3)), - Msg4 = mc:record_death(rejected, <<"q1">>, Msg3), + Msg4 = mc:record_death(rejected, <<"q1">>, Msg3, #{}), ?assertNot(mc:is_death_cycle(<<"q1">>, Msg4)), ?assertNot(mc:is_death_cycle(<<"q2">>, Msg4)), ?assertNot(mc:is_death_cycle(<<"q3">>, Msg4)), - Msg5 = mc:record_death(expired, <<"q2">>, Msg4), + Msg5 = mc:record_death(expired, <<"q2">>, Msg4, #{}), ?assertNot(mc:is_death_cycle(<<"q1">>, Msg5)), ?assert(mc:is_death_cycle(<<"q2">>, Msg5)), ?assertNot(mc:is_death_cycle(<<"q3">>, Msg5)),