Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix dead lettering #11174

Merged
merged 1 commit into from
May 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion deps/rabbit/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -391,7 +391,13 @@ rabbitmq_integration_suite(
additional_beam = [
":test_queue_utils_beam",
],
shard_count = 7,
shard_count = 8,
)

rabbitmq_integration_suite(
name = "message_containers_deaths_v2_SUITE",
size = "medium",
shard_count = 1,
)

rabbitmq_integration_suite(
Expand Down
9 changes: 9 additions & 0 deletions deps/rabbit/app.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -2219,3 +2219,12 @@ def test_suite_beam_files(name = "test_suite_beam_files"):
erlc_opts = "//:test_erlc_opts",
deps = ["//deps/amqp10_common:erlang_app", "//deps/rabbitmq_amqp_client:erlang_app"],
)
erlang_bytecode(
name = "message_containers_deaths_v2_SUITE_beam_files",
testonly = True,
srcs = ["test/message_containers_deaths_v2_SUITE.erl"],
outs = ["test/message_containers_deaths_v2_SUITE.beam"],
app_name = "rabbit",
erlc_opts = "//:test_erlc_opts",
deps = ["//deps/amqp_client:erlang_app", "//deps/rabbitmq_ct_helpers:erlang_app"],
)
36 changes: 22 additions & 14 deletions deps/rabbit/include/mc.hrl
Original file line number Diff line number Diff line change
@@ -1,17 +1,3 @@
-type death_key() :: {SourceQueue :: rabbit_misc:resource_name(), rabbit_dead_letter:reason()}.
-type death_anns() :: #{first_time := non_neg_integer(), %% the timestamp of the first
last_time := non_neg_integer(), %% the timestamp of the last
ttl => OriginalExpiration :: non_neg_integer()}.
-record(death, {exchange :: OriginalExchange :: rabbit_misc:resource_name(),
routing_keys = [] :: OriginalRoutingKeys :: [rabbit_types:routing_key()],
count = 0 :: non_neg_integer(),
anns :: death_anns()}).

-record(deaths, {first :: death_key(),
last :: death_key(),
records = #{} :: #{death_key() := #death{}}}).


%% good enough for most use cases
-define(IS_MC(Msg), element(1, Msg) == mc andalso tuple_size(Msg) == 5).

Expand All @@ -26,3 +12,25 @@
-define(ANN_RECEIVED_AT_TIMESTAMP, rts).
-define(ANN_DURABLE, d).
-define(ANN_PRIORITY, p).

-define(FF_MC_DEATHS_V2, message_containers_deaths_v2).

-type death_key() :: {SourceQueue :: rabbit_misc:resource_name(), rabbit_dead_letter:reason()}.
-type death_anns() :: #{%% timestamp of the first time this message
%% was dead lettered from this queue for this reason
first_time := pos_integer(),
%% timestamp of the last time this message
%% was dead lettered from this queue for this reason
last_time := pos_integer(),
ttl => OriginalTtlHeader :: non_neg_integer()}.

-record(death, {exchange :: OriginalExchange :: rabbit_misc:resource_name(),
routing_keys :: OriginalRoutingKeys :: [rabbit_types:routing_key(),...],
%% how many times this message was dead lettered from this queue for this reason
count :: pos_integer(),
anns :: death_anns()}).

-record(deaths, {first :: death_key(), % redundant to mc annotations x-first-death-*
last :: death_key(), % redundant to mc annotations x-last-death-*
records :: #{death_key() := #death{}}
}).
175 changes: 101 additions & 74 deletions deps/rabbit/src/mc.erl
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,8 @@
convert/3,
protocol_state/1,
prepare/2,
record_death/3,
record_death/4,
is_death_cycle/2,
last_death/1,
death_queue_names/1
]).

Expand Down Expand Up @@ -356,89 +355,103 @@ protocol_state(BasicMsg) ->
mc_compat:protocol_state(BasicMsg).

-spec record_death(rabbit_dead_letter:reason(),
SourceQueue :: rabbit_misc:resource_name(),
state()) -> state().
rabbit_misc:resource_name(),
state(),
environment()) -> state().
record_death(Reason, SourceQueue,
#?MODULE{protocol = _Mod,
data = _Data,
annotations = Anns0} = State)
when is_atom(Reason) andalso is_binary(SourceQueue) ->
#?MODULE{annotations = Anns0} = State,
Env)
when is_atom(Reason) andalso
is_binary(SourceQueue) ->
Key = {SourceQueue, Reason},
#{?ANN_EXCHANGE := Exchange,
?ANN_ROUTING_KEYS := RoutingKeys} = Anns0,
Timestamp = os:system_time(millisecond),
Ttl = maps:get(ttl, Anns0, undefined),

ReasonBin = atom_to_binary(Reason),
DeathAnns = rabbit_misc:maps_put_truthy(ttl, Ttl, #{first_time => Timestamp,
last_time => Timestamp}),
case maps:get(deaths, Anns0, undefined) of
undefined ->
Ds = #deaths{last = Key,
first = Key,
records = #{Key => #death{count = 1,
exchange = Exchange,
routing_keys = RoutingKeys,
anns = DeathAnns}}},
Anns = Anns0#{<<"x-first-death-reason">> => ReasonBin,
DeathAnns = rabbit_misc:maps_put_truthy(
ttl, Ttl, #{first_time => Timestamp,
last_time => Timestamp}),
NewDeath = #death{exchange = Exchange,
routing_keys = RoutingKeys,
count = 1,
anns = DeathAnns},
Anns = case Anns0 of
#{deaths := Deaths0} ->
Deaths = case Deaths0 of
#deaths{records = Rs0} ->
Rs = maps:update_with(
Key,
fun(Death) ->
update_death(Death, Timestamp)
end,
NewDeath,
Rs0),
Deaths0#deaths{last = Key,
records = Rs};
_ ->
%% Deaths are ordered by recency
case lists:keytake(Key, 1, Deaths0) of
{value, {Key, D0}, Deaths1} ->
D = update_death(D0, Timestamp),
[{Key, D} | Deaths1];
false ->
[{Key, NewDeath} | Deaths0]
end
end,
Anns0#{<<"x-last-death-reason">> := atom_to_binary(Reason),
<<"x-last-death-queue">> := SourceQueue,
<<"x-last-death-exchange">> := Exchange,
deaths := Deaths};
_ ->
Deaths = case Env of
#{?FF_MC_DEATHS_V2 := false} ->
#deaths{last = Key,
first = Key,
records = #{Key => NewDeath}};
_ ->
[{Key, NewDeath}]
end,
ReasonBin = atom_to_binary(Reason),
Anns0#{<<"x-first-death-reason">> => ReasonBin,
<<"x-first-death-queue">> => SourceQueue,
<<"x-first-death-exchange">> => Exchange,
<<"x-last-death-reason">> => ReasonBin,
<<"x-last-death-queue">> => SourceQueue,
<<"x-last-death-exchange">> => Exchange
},

State#?MODULE{annotations = Anns#{deaths => Ds}};
#deaths{records = Rs} = Ds0 ->
Death = #death{count = C,
anns = DA} = maps:get(Key, Rs,
#death{exchange = Exchange,
routing_keys = RoutingKeys,
anns = DeathAnns}),
Ds = Ds0#deaths{last = Key,
records = Rs#{Key =>
Death#death{count = C + 1,
anns = DA#{last_time => Timestamp}}}},
Anns = Anns0#{deaths => Ds,
<<"x-last-death-reason">> => ReasonBin,
<<"x-last-death-queue">> => SourceQueue,
<<"x-last-death-exchange">> => Exchange},
State#?MODULE{annotations = Anns}
end;
record_death(Reason, SourceQueue, BasicMsg) ->
mc_compat:record_death(Reason, SourceQueue, BasicMsg).

<<"x-last-death-exchange">> => Exchange,
deaths => Deaths}
end,
State#?MODULE{annotations = Anns};
record_death(Reason, SourceQueue, BasicMsg, Env) ->
mc_compat:record_death(Reason, SourceQueue, BasicMsg, Env).

update_death(#death{count = Count,
anns = DeathAnns} = Death, Timestamp) ->
Death#death{count = Count + 1,
anns = DeathAnns#{last_time := Timestamp}}.

-spec is_death_cycle(rabbit_misc:resource_name(), state()) -> boolean().
is_death_cycle(TargetQueue, #?MODULE{annotations = #{deaths := #deaths{records = Rs}}}) ->
is_cycle_v1(TargetQueue, maps:keys(Rs));
is_death_cycle(TargetQueue, #?MODULE{annotations = #{deaths := Deaths}}) ->
is_cycle(TargetQueue, maps:keys(Deaths#deaths.records));
is_cycle_v2(TargetQueue, Deaths);
is_death_cycle(_TargetQueue, #?MODULE{}) ->
false;
is_death_cycle(TargetQueue, BasicMsg) ->
mc_compat:is_death_cycle(TargetQueue, BasicMsg).

%% Returns death queue names ordered by recency.
-spec death_queue_names(state()) -> [rabbit_misc:resource_name()].
death_queue_names(#?MODULE{annotations = Anns}) ->
case maps:get(deaths, Anns, undefined) of
undefined ->
[];
#deaths{records = Records} ->
proplists:get_keys(maps:keys(Records))
end;
death_queue_names(#?MODULE{annotations = #{deaths := #deaths{records = Rs}}}) ->
proplists:get_keys(maps:keys(Rs));
death_queue_names(#?MODULE{annotations = #{deaths := Deaths}}) ->
lists:map(fun({{Queue, _Reason}, _Death}) ->
Queue
end, Deaths);
death_queue_names(#?MODULE{}) ->
[];
death_queue_names(BasicMsg) ->
mc_compat:death_queue_names(BasicMsg).

-spec last_death(state()) ->
undefined | {death_key(), #death{}}.
last_death(#?MODULE{annotations = Anns})
when not is_map_key(deaths, Anns) ->
undefined;
last_death(#?MODULE{annotations = #{deaths := #deaths{last = Last,
records = Rs}}}) ->
{Last, maps:get(Last, Rs)};
last_death(BasicMsg) ->
mc_compat:last_death(BasicMsg).

-spec prepare(read | store, state()) -> state().
prepare(For, #?MODULE{protocol = Proto,
data = Data} = State) ->
Expand All @@ -448,24 +461,38 @@ prepare(For, State) ->

%% INTERNAL

%% if there is a death with a source queue that is the same as the target
is_cycle_v2(TargetQueue, Deaths) ->
case lists:splitwith(fun({{SourceQueue, _Reason}, #death{}}) ->
SourceQueue =/= TargetQueue
end, Deaths) of
{_, []} ->
false;
{L, [H | _]} ->
%% There is a cycle, but we only want to drop the message
%% if the cycle is "fully automatic", i.e. without a client
%% expliclity rejecting the message somewhere in the cycle.
lists:all(fun({{_SourceQueue, Reason}, _Death}) ->
Reason =/= rejected
end, [H | L])
end.

%% The desired v1 behaviour is the following:
%% "If there is a death with a source queue that is the same as the target
%% queue name and there are no newer deaths with the 'rejected' reason then
%% consider this a cycle
is_cycle(_Queue, []) ->
%% consider this a cycle."
%% However, the correct death order cannot be reliably determined in v1.
%% deaths_v2 fixes this bug.
is_cycle_v1(_Queue, []) ->
false;
is_cycle(_Queue, [{_Q, rejected} | _]) ->
is_cycle_v1(_Queue, [{_Q, rejected} | _]) ->
%% any rejection breaks the cycle
false;
is_cycle(Queue, [{Queue, Reason} | _])
is_cycle_v1(Queue, [{Queue, Reason} | _])
when Reason =/= rejected ->
true;
is_cycle(Queue, [_ | Rem]) ->
is_cycle(Queue, Rem).
is_cycle_v1(Queue, [_ | Rem]) ->
is_cycle_v1(Queue, Rem).

set_received_at_timestamp(Anns) ->
Millis = os:system_time(millisecond),
Anns#{?ANN_RECEIVED_AT_TIMESTAMP => Millis}.

-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
-endif.
Loading
Loading