Skip to content

Commit

Permalink
Do not depend on feature flags in mc
Browse files Browse the repository at this point in the history
Addresses PR feedback
#11174 (comment)
  • Loading branch information
ansd committed May 13, 2024
1 parent ac60d3a commit 0a9afb9
Show file tree
Hide file tree
Showing 7 changed files with 36 additions and 43 deletions.
3 changes: 0 additions & 3 deletions deps/rabbit/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -795,9 +795,6 @@ rabbitmq_suite(
rabbitmq_suite(
name = "mc_unit_SUITE",
size = "small",
runtime_deps = [
"@meck//:erlang_app",
],
deps = [
"//deps/amqp10_common:erlang_app",
"//deps/rabbit_common:erlang_app",
Expand Down
2 changes: 2 additions & 0 deletions deps/rabbit/include/mc.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
-define(ANN_DURABLE, d).
-define(ANN_PRIORITY, p).

-define(FF_MC_DEATHS_V2, message_containers_deaths_v2).

-type death_key() :: {SourceQueue :: rabbit_misc:resource_name(), rabbit_dead_letter:reason()}.
-type death_anns() :: #{%% timestamp of the first time this message
%% was dead lettered from this queue for this reason
Expand Down
20 changes: 12 additions & 8 deletions deps/rabbit/src/mc.erl
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
convert/3,
protocol_state/1,
prepare/2,
record_death/3,
record_death/4,
is_death_cycle/2,
death_queue_names/1
]).
Expand Down Expand Up @@ -356,9 +356,11 @@ protocol_state(BasicMsg) ->

-spec record_death(rabbit_dead_letter:reason(),
rabbit_misc:resource_name(),
state()) -> state().
state(),
environment()) -> state().
record_death(Reason, SourceQueue,
#?MODULE{annotations = Anns0} = State)
#?MODULE{annotations = Anns0} = State,
Env)
when is_atom(Reason) andalso
is_binary(SourceQueue) ->
Key = {SourceQueue, Reason},
Expand Down Expand Up @@ -398,9 +400,11 @@ record_death(Reason, SourceQueue,
deaths := Deaths#deaths{last = Key,
records = Rs}};
_ ->
Rs = case rabbit_feature_flags:is_enabled(message_containers_deaths_v2) of
true -> [{Key, NewDeath}];
false -> #{Key => NewDeath}
Rs = case Env of
#{?FF_MC_DEATHS_V2 := false} ->
#{Key => NewDeath};
_ ->
[{Key, NewDeath}]
end,
ReasonBin = atom_to_binary(Reason),
Anns0#{<<"x-first-death-reason">> => ReasonBin,
Expand All @@ -414,8 +418,8 @@ record_death(Reason, SourceQueue,
records = Rs}}
end,
State#?MODULE{annotations = Anns};
record_death(Reason, SourceQueue, BasicMsg) ->
mc_compat:record_death(Reason, SourceQueue, BasicMsg).
record_death(Reason, SourceQueue, BasicMsg, Env) ->
mc_compat:record_death(Reason, SourceQueue, BasicMsg, Env).

update_death(#death{count = Count,
anns = DeathAnns} = Death, Timestamp) ->
Expand Down
4 changes: 2 additions & 2 deletions deps/rabbit/src/mc_compat.erl
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
protocol_state/1,
%serialize/1,
prepare/2,
record_death/3,
record_death/4,
is_death_cycle/2,
%deaths/1,
death_queue_names/1
Expand Down Expand Up @@ -155,7 +155,7 @@ prepare(store, Msg) ->
record_death(Reason, SourceQueue,
#basic_message{content = Content,
exchange_name = Exchange,
routing_keys = RoutingKeys} = Msg) ->
routing_keys = RoutingKeys} = Msg, _Env) ->
% HeadersFun1 = fun (H) -> lists:keydelete(<<"CC">>, 1, H) end,
ReasonBin = atom_to_binary(Reason),
TimeSec = os:system_time(seconds),
Expand Down
6 changes: 5 additions & 1 deletion deps/rabbit/src/rabbit_dead_letter.erl
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,11 @@ publish(Msg0, Reason, #exchange{name = XName} = DLX, RK,
_ ->
[RK]
end,
Msg1 = mc:record_death(Reason, SourceQName, Msg0),
Env = case rabbit_feature_flags:is_enabled(?FF_MC_DEATHS_V2) of
true -> #{};
false -> #{?FF_MC_DEATHS_V2 => false}
end,
Msg1 = mc:record_death(Reason, SourceQName, Msg0, Env),
{Ttl, Msg2} = mc:take_annotation(dead_letter_ttl, Msg1),
Msg3 = mc:set_ttl(Ttl, Msg2),
Msg4 = mc:set_annotation(?ANN_ROUTING_KEYS, DLRKeys, Msg3),
Expand Down
6 changes: 5 additions & 1 deletion deps/rabbit/src/rabbit_fifo_dlx_worker.erl
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,11 @@ forward(ConsumedMsg, ConsumedMsgId, ConsumedQRef, DLX, Reason,
_ ->
[RKey]
end,
Msg0 = mc:record_death(Reason, SourceQName, ConsumedMsg),
Env = case rabbit_feature_flags:is_enabled(?FF_MC_DEATHS_V2) of
true -> #{};
false -> #{?FF_MC_DEATHS_V2 => false}
end,
Msg0 = mc:record_death(Reason, SourceQName, ConsumedMsg, Env),
Msg1 = mc:set_ttl(undefined, Msg0),
Msg2 = mc:set_annotation(?ANN_ROUTING_KEYS, DLRKeys, Msg1),
Msg = mc:set_annotation(?ANN_EXCHANGE, DLXName, Msg2),
Expand Down
38 changes: 10 additions & 28 deletions deps/rabbit/test/mc_unit_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,6 @@ all_tests() ->
amqp_amqpl_amqp_bodies
].

init_per_suite(Config) ->
meck:new(rabbit_feature_flags, [passthrough, no_link]),
Config.

end_per_suite(Config) ->
meck:unload(rabbit_feature_flags),
Config.

%%%===================================================================
%%% Test cases
%%%===================================================================
Expand Down Expand Up @@ -202,24 +194,18 @@ amqpl_table_x_header_array_of_tbls(_Config) ->
ok.

amqpl_death_v1_records(_Config) ->
meck:expect(rabbit_feature_flags,
is_enabled,
fun(message_containers_deaths_v2) -> false end),
ok = amqpl_death_records().
ok = amqpl_death_records(#{?FF_MC_DEATHS_V2 => false}).

amqpl_death_v2_records(_Config) ->
meck:expect(rabbit_feature_flags,
is_enabled,
fun(message_containers_deaths_v2) -> true end),
ok = amqpl_death_records().
ok = amqpl_death_records(#{?FF_MC_DEATHS_V2 => true}).

amqpl_death_records() ->
amqpl_death_records(Env) ->
Content = #content{class_id = 60,
properties = #'P_basic'{headers = []},
payload_fragments_rev = [<<"data">>]},
Msg0 = mc:prepare(store, mc:init(mc_amqpl, Content, annotations())),

Msg1 = mc:record_death(rejected, <<"q1">>, Msg0),
Msg1 = mc:record_death(rejected, <<"q1">>, Msg0, Env),
?assertEqual([<<"q1">>], mc:death_queue_names(Msg1)),
?assertEqual(false, mc:is_death_cycle(<<"q1">>, Msg1)),

Expand All @@ -245,7 +231,7 @@ amqpl_death_records() ->
%% record_death uses a timestamp for death record ordering, ensure
%% it is definitely higher than the last timestamp taken
timer:sleep(2),
Msg2 = mc:record_death(expired, <<"dl">>, Msg1),
Msg2 = mc:record_death(expired, <<"dl">>, Msg1, Env),

#content{properties = #'P_basic'{headers = H2}} = mc:protocol_state(Msg2),
{_, array, [{table, T2a}, {table, T2b}]} = header(<<"x-death">>, H2),
Expand All @@ -254,10 +240,6 @@ amqpl_death_records() ->
ok.

is_death_cycle(_Config) ->
meck:expect(rabbit_feature_flags,
is_enabled,
fun(message_containers_deaths_v2) -> true end),

Content = #content{class_id = 60,
properties = #'P_basic'{headers = []},
payload_fragments_rev = [<<"data">>]},
Expand All @@ -267,29 +249,29 @@ is_death_cycle(_Config) ->
%% Q1 --rejected--> Q2 --expired--> Q3 --expired-->
%% Q1 --rejected--> Q2 --expired--> Q3

Msg1 = mc:record_death(rejected, <<"q1">>, Msg0),
Msg1 = mc:record_death(rejected, <<"q1">>, Msg0, #{}),
?assertNot(mc:is_death_cycle(<<"q1">>, Msg1),
"A queue that dead letters to itself due to rejected is not considered a cycle."),
?assertNot(mc:is_death_cycle(<<"q2">>, Msg1)),
?assertNot(mc:is_death_cycle(<<"q3">>, Msg1)),

Msg2 = mc:record_death(expired, <<"q2">>, Msg1),
Msg2 = mc:record_death(expired, <<"q2">>, Msg1, #{}),
?assertNot(mc:is_death_cycle(<<"q1">>, Msg2)),
?assert(mc:is_death_cycle(<<"q2">>, Msg2),
"A queue that dead letters to itself due to expired is considered a cycle."),
?assertNot(mc:is_death_cycle(<<"q3">>, Msg2)),

Msg3 = mc:record_death(expired, <<"q3">>, Msg2),
Msg3 = mc:record_death(expired, <<"q3">>, Msg2, #{}),
?assertNot(mc:is_death_cycle(<<"q1">>, Msg3)),
?assert(mc:is_death_cycle(<<"q2">>, Msg3)),
?assert(mc:is_death_cycle(<<"q3">>, Msg3)),

Msg4 = mc:record_death(rejected, <<"q1">>, Msg3),
Msg4 = mc:record_death(rejected, <<"q1">>, Msg3, #{}),
?assertNot(mc:is_death_cycle(<<"q1">>, Msg4)),
?assertNot(mc:is_death_cycle(<<"q2">>, Msg4)),
?assertNot(mc:is_death_cycle(<<"q3">>, Msg4)),

Msg5 = mc:record_death(expired, <<"q2">>, Msg4),
Msg5 = mc:record_death(expired, <<"q2">>, Msg4, #{}),
?assertNot(mc:is_death_cycle(<<"q1">>, Msg5)),
?assert(mc:is_death_cycle(<<"q2">>, Msg5)),
?assertNot(mc:is_death_cycle(<<"q3">>, Msg5)),
Expand Down

0 comments on commit 0a9afb9

Please sign in to comment.