Skip to content

Commit

Permalink
Remove BCC from x-death routing-keys
Browse files Browse the repository at this point in the history
This commit is a follow up of #11174
which broke the following Java client test:
```
./mvnw verify -P '!setup-test-cluster' -Drabbitmqctl.bin=DOCKER:rabbitmq -Dit.test=DeadLetterExchange#deadLetterNewRK
```

The desired documented behaviour is the following:
> routing-keys: the routing keys (including CC keys but excluding BCC ones) the message was published with

This behaviour should be respected also for messages dead lettered into a
stream. Therefore, instead of first including the BCC keys in the `#death.routing_keys` field
and removing it again in mc_amqpl before sending the routing-keys to the
client as done in v3.13.2 in
https://github.com/rabbitmq/rabbitmq-server/blob/dc25ef53292eb0b34588ab8eaae61082b966b784/deps/rabbit/src/mc_amqpl.erl#L527
we instead omit directly the BCC keys from `#death.routing_keys` when
recording a death event.

This commit records the BCC keys in their own mc `bcc` annotation in `mc_amqpl:init/1`.

(cherry picked from commit 90a4010)
  • Loading branch information
ansd authored and mergify[bot] committed May 14, 2024
1 parent cd079d9 commit f8ddfe6
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 16 deletions.
12 changes: 10 additions & 2 deletions deps/rabbit/src/mc.erl
Original file line number Diff line number Diff line change
Expand Up @@ -348,14 +348,22 @@ record_death(Reason, SourceQueue,
is_binary(SourceQueue) ->
Key = {SourceQueue, Reason},
#{?ANN_EXCHANGE := Exchange,
?ANN_ROUTING_KEYS := RoutingKeys} = Anns0,
?ANN_ROUTING_KEYS := RKeys0} = Anns0,
%% The routing keys that we record in the death history and will
%% report to the client should include CC, but exclude BCC.
RKeys = case Anns0 of
#{bcc := BccKeys} ->
RKeys0 -- BccKeys;
_ ->
RKeys0
end,
Timestamp = os:system_time(millisecond),
Ttl = maps:get(ttl, Anns0, undefined),
DeathAnns = rabbit_misc:maps_put_truthy(
ttl, Ttl, #{first_time => Timestamp,
last_time => Timestamp}),
NewDeath = #death{exchange = Exchange,
routing_keys = RoutingKeys,
routing_keys = RKeys,
count = 1,
anns = DeathAnns},
Anns = case Anns0 of
Expand Down
22 changes: 16 additions & 6 deletions deps/rabbit/src/mc_amqpl.erl
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,11 @@

%% mc implementation
init(#content{} = Content0) ->
Content = rabbit_binary_parser:ensure_content_decoded(Content0),
Content1 = rabbit_binary_parser:ensure_content_decoded(Content0),
%% project essential properties into annotations
Anns = essential_properties(Content),
{strip_header(Content, ?DELETED_HEADER), Anns}.
Anns = essential_properties(Content1),
Content = strip_header(Content1, ?DELETED_HEADER),
{Content, Anns}.

convert_from(mc_amqp, Sections, _Env) ->
{H, MAnn, Prop, AProp, BodyRev} =
Expand Down Expand Up @@ -480,7 +481,7 @@ message(#resource{name = ExchangeNameBin}, RoutingKey,
Error;
HeaderRoutes ->
{ok, mc:init(?MODULE,
rabbit_basic:strip_bcc_header(Content),
Content,
Anns#{?ANN_ROUTING_KEYS => [RoutingKey | HeaderRoutes],
?ANN_EXCHANGE => ExchangeNameBin})}
end;
Expand Down Expand Up @@ -734,7 +735,8 @@ message_id(undefined, _HKey, H) ->
essential_properties(#content{} = C) ->
#'P_basic'{delivery_mode = Mode,
priority = Priority,
timestamp = TimestampRaw} = Props = C#content.properties,
timestamp = TimestampRaw,
headers = Headers} = Props = C#content.properties,
{ok, MsgTTL} = rabbit_basic:parse_expiration(Props),
Timestamp = case TimestampRaw of
undefined ->
Expand All @@ -744,6 +746,12 @@ essential_properties(#content{} = C) ->
TimestampRaw * 1000
end,
Durable = Mode == 2,
BccKeys = case rabbit_basic:header(<<"BCC">>, Headers) of
{<<"BCC">>, array, Routes} ->
[Route || {longstr, Route} <- Routes];
_ ->
undefined
end,
maps_put_truthy(
?ANN_PRIORITY, Priority,
maps_put_truthy(
Expand All @@ -752,7 +760,9 @@ essential_properties(#content{} = C) ->
?ANN_TIMESTAMP, Timestamp,
maps_put_falsy(
?ANN_DURABLE, Durable,
#{})))).
maps_put_truthy(
bcc, BccKeys,
#{}))))).

%% headers that are added as annotations during conversions
is_internal_header(<<"x-basic-", _/binary>>) ->
Expand Down
29 changes: 21 additions & 8 deletions deps/rabbit/test/dead_lettering_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1391,17 +1391,18 @@ dead_letter_headers_BCC(Config) ->
routing_key = DLXQName}),

P1 = <<"msg1">>,
BCCHeader = {<<"BCC">>, array, [{longstr, DLXQName}]},
publish(Ch, QName, [P1], [BCCHeader]),
CCHeader = {<<"CC">>, array, [{longstr, <<"cc 1">>}, {longstr, <<"cc 2">>}]},
BCCHeader = {<<"BCC">>, array, [{longstr, DLXQName}, {longstr, <<"bcc 2">>}]},
publish(Ch, QName, [P1], [CCHeader, BCCHeader]),
%% Message is published to both queues because of BCC header and DLX queue bound to both
%% exchanges
wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]),
{#'basic.get_ok'{delivery_tag = DTag1}, #amqp_msg{payload = P1,
props = #'P_basic'{headers = Headers1}}} =
amqp_channel:call(Ch, #'basic.get'{queue = QName}),
amqp_channel:call(Ch, #'basic.get'{queue = QName}),
{#'basic.get_ok'{}, #amqp_msg{payload = P1,
props = #'P_basic'{headers = Headers2}}} =
amqp_channel:call(Ch, #'basic.get'{queue = DLXQName}),
amqp_channel:call(Ch, #'basic.get'{queue = DLXQName}),
%% We check the headers to ensure no dead lettering has happened
?assertEqual(undefined, header_lookup(Headers1, <<"x-death">>)),
?assertEqual(undefined, header_lookup(Headers2, <<"x-death">>)),
Expand All @@ -1413,10 +1414,15 @@ dead_letter_headers_BCC(Config) ->
wait_for_messages(Config, [[DLXQName, <<"2">>, <<"1">>, <<"1">>]]),
{#'basic.get_ok'{}, #amqp_msg{payload = P1,
props = #'P_basic'{headers = Headers3}}} =
amqp_channel:call(Ch, #'basic.get'{queue = DLXQName}),
amqp_channel:call(Ch, #'basic.get'{queue = DLXQName}),
consume_empty(Ch, QName),
?assertEqual(undefined, rabbit_misc:table_lookup(Headers3, <<"BCC">>)),
?assertMatch({array, _}, rabbit_misc:table_lookup(Headers3, <<"x-death">>)).
{array, [{table, Death}]} = rabbit_misc:table_lookup(Headers3, <<"x-death">>),
{array, RKeys0} = rabbit_misc:table_lookup(Death, <<"routing-keys">>),
RKeys = [RKey || {longstr, RKey} <- RKeys0],
%% routing-keys in the death history should include CC but exclude BCC keys
?assertEqual(lists:sort([QName, <<"cc 1">>, <<"cc 2">>]),
lists:sort(RKeys)).

%% Three top-level headers are added for the very first dead-lettering event.
%% They are
Expand Down Expand Up @@ -1681,7 +1687,11 @@ stream(Config) ->
#'basic.publish'{routing_key = Q1},
#amqp_msg{payload = Payload,
props = #'P_basic'{expiration = <<"0">>,
headers = [{<<"CC">>, array, [{longstr, <<"other key">>}]}]}
headers = [{<<"CC">>, array, [{longstr, <<"cc 1">>},
{longstr, <<"cc 2">>}]},
{<<"BCC">>, array, [{longstr, <<"bcc 1">>},
{longstr, <<"bcc 2">>}]}
]}
}),

#'basic.qos_ok'{} = amqp_channel:call(Ch1, #'basic.qos'{prefetch_count = 1}),
Expand Down Expand Up @@ -1722,7 +1732,10 @@ stream(Config) ->
?assertEqual({longstr, Reason}, rabbit_misc:table_lookup(Death1, <<"reason">>)),
?assertEqual({longstr, <<>>}, rabbit_misc:table_lookup(Death1, <<"exchange">>)),
?assertEqual({long, 1}, rabbit_misc:table_lookup(Death1, <<"count">>)),
?assertEqual({array, [{longstr, Q1}, {longstr, <<"other key">>}]},
%% routing-keys in the death history should include CC but exclude BCC keys
?assertEqual({array, [{longstr, Q1},
{longstr, <<"cc 1">>},
{longstr, <<"cc 2">>}]},
rabbit_misc:table_lookup(Death1, <<"routing-keys">>)),
?assertEqual({longstr, <<"0">>}, rabbit_misc:table_lookup(Death1, <<"original-expiration">>)),
{timestamp, T1} = rabbit_misc:table_lookup(Death1, <<"time">>),
Expand Down

0 comments on commit f8ddfe6

Please sign in to comment.