From a3b16dbb104e22d22e55b77aa734391224dbe87e Mon Sep 17 00:00:00 2001 From: David Ansari Date: Fri, 11 Oct 2024 09:35:46 +0200 Subject: [PATCH 1/4] Track requeue history Support tracking the requeue history as described in https://github.com/rabbitmq/rabbitmq-website/pull/2095 This commit: 1. adds a test case tracing the requeue history via AMQP 1.0 using the modified outcome and 2. fixes bugs in the broker which crashed if a modified message annotation value is an AMQP 1.0 list, map, or array. Complex modified annotation values (list, map, array) are stored as tagged values from now on. This means AMQP 0.9.1 consumers will not receive modified annotations of type list, map, or array (which is okay). (cherry picked from commit e6818f0040bb09cafe33da50d68909205408460c) --- .../src/amqp10_client_session.erl | 8 +- deps/rabbit/src/mc.erl | 2 +- deps/rabbit/src/mc_util.erl | 5 +- deps/rabbit/src/rabbit_amqp_session.erl | 13 ++- deps/rabbit/test/amqp_client_SUITE.erl | 85 +++++++++++++------ 5 files changed, 78 insertions(+), 35 deletions(-) diff --git a/deps/amqp10_client/src/amqp10_client_session.erl b/deps/amqp10_client/src/amqp10_client_session.erl index c1e5eb46214f..7af68be826d7 100644 --- a/deps/amqp10_client/src/amqp10_client_session.erl +++ b/deps/amqp10_client/src/amqp10_client_session.erl @@ -1180,14 +1180,16 @@ wrap_map_value(true) -> {boolean, true}; wrap_map_value(false) -> {boolean, false}; -wrap_map_value(V) when is_integer(V) -> - {uint, V}; +wrap_map_value(V) when is_integer(V) andalso V >= 0 -> + uint(V); wrap_map_value(V) when is_binary(V) -> utf8(V); wrap_map_value(V) when is_list(V) -> utf8(list_to_binary(V)); wrap_map_value(V) when is_atom(V) -> - utf8(atom_to_list(V)). + utf8(atom_to_list(V)); +wrap_map_value(TaggedValue) when is_atom(element(1, TaggedValue)) -> + TaggedValue. utf8(V) -> amqp10_client_types:utf8(V). diff --git a/deps/rabbit/src/mc.erl b/deps/rabbit/src/mc.erl index 465c7054f089..b122c4780110 100644 --- a/deps/rabbit/src/mc.erl +++ b/deps/rabbit/src/mc.erl @@ -44,7 +44,7 @@ -type str() :: atom() | string() | binary(). -type internal_ann_key() :: atom(). -type x_ann_key() :: binary(). %% should begin with x- or ideally x-opt- --type x_ann_value() :: str() | integer() | float() | [x_ann_value()]. +-type x_ann_value() :: str() | integer() | float() | TaggedValue :: tuple() | [x_ann_value()]. -type protocol() :: module(). -type annotations() :: #{internal_ann_key() => term(), x_ann_key() => x_ann_value()}. diff --git a/deps/rabbit/src/mc_util.erl b/deps/rabbit/src/mc_util.erl index 669dace41f45..1f20d15699db 100644 --- a/deps/rabbit/src/mc_util.erl +++ b/deps/rabbit/src/mc_util.erl @@ -52,9 +52,8 @@ infer_type(V) when is_integer(V) -> {long, V}; infer_type(V) when is_boolean(V) -> {boolean, V}; -infer_type({T, _} = V) when is_atom(T) -> - %% looks like a pre-tagged type - V. +infer_type(TaggedValue) when is_atom(element(1, TaggedValue)) -> + TaggedValue. utf8_string_is_ascii(UTF8String) -> utf8_scan(UTF8String, fun(Char) -> Char >= 0 andalso Char < 128 end). diff --git a/deps/rabbit/src/rabbit_amqp_session.erl b/deps/rabbit/src/rabbit_amqp_session.erl index 0c459b08a76b..7bdfab940a83 100644 --- a/deps/rabbit/src/rabbit_amqp_session.erl +++ b/deps/rabbit/src/rabbit_amqp_session.erl @@ -1921,7 +1921,7 @@ settle_op_from_outcome(#'v1_0.modified'{delivery_failed = DelFailed, Anns1 = lists:map( %% "all symbolic keys except those beginning with "x-" are reserved." [3.2.10] fun({{symbol, <<"x-", _/binary>> = K}, V}) -> - {K, unwrap(V)} + {K, unwrap_simple_type(V)} end, KVList), maps:from_list(Anns1) end, @@ -3603,7 +3603,14 @@ format_status( topic_permission_cache => TopicPermissionCache}, maps:update(state, State, Status). -unwrap({_Tag, V}) -> + +unwrap_simple_type(V = {list, _}) -> + V; +unwrap_simple_type(V = {map, _}) -> + V; +unwrap_simple_type(V = {array, _, _}) -> + V; +unwrap_simple_type({_SimpleType, V}) -> V; -unwrap(V) -> +unwrap_simple_type(V) -> V. diff --git a/deps/rabbit/test/amqp_client_SUITE.erl b/deps/rabbit/test/amqp_client_SUITE.erl index a6dc73e28e2c..9e9a6e954d3b 100644 --- a/deps/rabbit/test/amqp_client_SUITE.erl +++ b/deps/rabbit/test/amqp_client_SUITE.erl @@ -490,61 +490,96 @@ modified_quorum_queue(Config) -> ok = amqp10_client:send_msg(Sender, Msg2), ok = amqp10_client:detach_link(Sender), - {ok, Receiver} = amqp10_client:attach_receiver_link(Session, <<"receiver">>, Address, unsettled), + Receiver1Name = <<"receiver 1">>, + Receiver2Name = <<"receiver 2">>, + {ok, Receiver1} = amqp10_client:attach_receiver_link(Session, Receiver1Name, Address, unsettled), + {ok, Receiver2} = amqp10_client:attach_receiver_link(Session, Receiver2Name, Address, unsettled), - {ok, M1} = amqp10_client:get_msg(Receiver), + {ok, M1} = amqp10_client:get_msg(Receiver1), ?assertEqual([<<"m1">>], amqp10_msg:body(M1)), ?assertMatch(#{delivery_count := 0, first_acquirer := true}, amqp10_msg:headers(M1)), - ok = amqp10_client:settle_msg(Receiver, M1, {modified, false, true, #{}}), + ok = amqp10_client:settle_msg(Receiver1, M1, {modified, false, true, #{}}), - {ok, M2a} = amqp10_client:get_msg(Receiver), + {ok, M2a} = amqp10_client:get_msg(Receiver1), ?assertEqual([<<"m2">>], amqp10_msg:body(M2a)), ?assertMatch(#{delivery_count := 0, first_acquirer := true}, amqp10_msg:headers(M2a)), - ok = amqp10_client:settle_msg(Receiver, M2a, {modified, false, false, #{}}), + ok = amqp10_client:settle_msg(Receiver1, M2a, {modified, false, false, #{}}), - {ok, M2b} = amqp10_client:get_msg(Receiver), + {ok, M2b} = amqp10_client:get_msg(Receiver1), ?assertEqual([<<"m2">>], amqp10_msg:body(M2b)), ?assertMatch(#{delivery_count := 0, first_acquirer := false}, amqp10_msg:headers(M2b)), - ok = amqp10_client:settle_msg(Receiver, M2b, {modified, true, false, #{}}), + ok = amqp10_client:settle_msg(Receiver1, M2b, {modified, true, false, #{}}), - {ok, M2c} = amqp10_client:get_msg(Receiver), + {ok, M2c} = amqp10_client:get_msg(Receiver1), ?assertEqual([<<"m2">>], amqp10_msg:body(M2c)), ?assertMatch(#{delivery_count := 1, first_acquirer := false}, amqp10_msg:headers(M2c)), - ok = amqp10_client:settle_msg(Receiver, M2c, - {modified, true, false, - #{<<"x-opt-key">> => <<"val 1">>}}), - - {ok, M2d} = amqp10_client:get_msg(Receiver), + ok = amqp10_client:settle_msg( + Receiver1, M2c, + {modified, true, false, + %% Test that a history of requeue events can be tracked as described in + %% https://rabbitmq.com/blog/2024/10/11/modified-outcome + #{<<"x-opt-requeued-by">> => {array, utf8, [{utf8, Receiver1Name}]}, + <<"x-opt-requeue-reason">> => {list, [{utf8, <<"reason 1">>}]}, + <<"x-opt-my-map">> => {map, [ + {{utf8, <<"k1">>}, {byte, -1}}, + {{utf8, <<"k2">>}, {ulong, 2}} + ]}}}), + + {ok, M2d} = amqp10_client:get_msg(Receiver2), ?assertEqual([<<"m2">>], amqp10_msg:body(M2d)), ?assertMatch(#{delivery_count := 2, first_acquirer := false}, amqp10_msg:headers(M2d)), - ?assertMatch(#{<<"x-opt-key">> := <<"val 1">>}, amqp10_msg:message_annotations(M2d)), - ok = amqp10_client:settle_msg(Receiver, M2d, - {modified, false, false, - #{<<"x-opt-key">> => <<"val 2">>, - <<"x-other">> => 99}}), - - {ok, M2e} = amqp10_client:get_msg(Receiver), + #{<<"x-opt-requeued-by">> := {array, utf8, L0}, + <<"x-opt-requeue-reason">> := L1, + <<"x-opt-my-map">> := L2} = amqp10_msg:message_annotations(M2d), + ok = amqp10_client:settle_msg( + Receiver1, M2d, + {modified, false, false, + #{<<"x-opt-requeued-by">> => {array, utf8, [{utf8, Receiver2Name} | L0]}, + <<"x-opt-requeue-reason">> => {list, [{symbol, <<"reason 2">>} | L1]}, + <<"x-opt-my-map">> => {map, L2 ++ [{{symbol, <<"k3">>}, {symbol, <<"val 3">>}}]}, + <<"x-other">> => 99}}), + + {ok, M2e} = amqp10_client:get_msg(Receiver1), ?assertEqual([<<"m2">>], amqp10_msg:body(M2e)), ?assertMatch(#{delivery_count := 2, first_acquirer := false}, amqp10_msg:headers(M2e)), - ?assertMatch(#{<<"x-opt-key">> := <<"val 2">>, + ?assertMatch(#{<<"x-opt-requeued-by">> := {array, utf8, [{utf8, Receiver2Name}, {utf8, Receiver1Name}]}, + <<"x-opt-requeue-reason">> := [{symbol, <<"reason 2">>}, {utf8, <<"reason 1">>}], + <<"x-opt-my-map">> := [ + {{utf8, <<"k1">>}, {byte, -1}}, + {{utf8, <<"k2">>}, {ulong, 2}}, + {{symbol, <<"k3">>}, {symbol, <<"val 3">>}} + ], <<"x-other">> := 99}, amqp10_msg:message_annotations(M2e)), - ok = amqp10_client:settle_msg(Receiver, M2e, modified), + ok = amqp10_client:settle_msg(Receiver1, M2e, modified), - ok = amqp10_client:detach_link(Receiver), - ?assertMatch({ok, #{message_count := 1}}, - rabbitmq_amqp_client:delete_queue(LinkPair, QName)), + %% Test that we can consume via AMQP 0.9.1 + Ch = rabbit_ct_client_helpers:open_channel(Config), + {#'basic.get_ok'{}, + #amqp_msg{payload = <<"m2">>, + props = #'P_basic'{headers = Headers}} + } = amqp_channel:call(Ch, #'basic.get'{queue = QName, no_ack = true}), + %% We expect to receive only modified AMQP 1.0 message annotations that are of simple types + %% (i.e. excluding list, map, array). + ?assertEqual({value, {<<"x-other">>, long, 99}}, + lists:keysearch(<<"x-other">>, 1, Headers)), + ?assertEqual({value, {<<"x-delivery-count">>, long, 5}}, + lists:keysearch(<<"x-delivery-count">>, 1, Headers)), + ok = rabbit_ct_client_helpers:close_channel(Ch), + + ok = amqp10_client:detach_link(Receiver1), + {ok, _} = rabbitmq_amqp_client:delete_queue(LinkPair, QName), ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair), ok = end_session_sync(Session), ok = amqp10_client:close_connection(Connection). From 22ac040c8109367f461550d8cba6d5e2e42ca533 Mon Sep 17 00:00:00 2001 From: David Ansari Date: Fri, 11 Oct 2024 12:12:09 +0200 Subject: [PATCH 2/4] Add alternate exchange test assertion Test the use case described in https://github.com/rabbitmq/rabbitmq-website/pull/2095 (cherry picked from commit 855a32ab28ea7c33813f1e845f85bbb7f95b26d4) --- deps/rabbit/test/amqp_client_SUITE.erl | 41 ++++++++++++++++--- .../src/rabbitmq_amqp_client.erl | 6 +-- 2 files changed, 38 insertions(+), 9 deletions(-) diff --git a/deps/rabbit/test/amqp_client_SUITE.erl b/deps/rabbit/test/amqp_client_SUITE.erl index 9e9a6e954d3b..0e227012f42d 100644 --- a/deps/rabbit/test/amqp_client_SUITE.erl +++ b/deps/rabbit/test/amqp_client_SUITE.erl @@ -585,19 +585,32 @@ modified_quorum_queue(Config) -> ok = amqp10_client:close_connection(Connection). %% Test that a message can be routed based on the message-annotations -%% provided in the modified outcome. +%% provided in the modified outcome as described in +%% https://rabbitmq.com/blog/2024/10/11/modified-outcome modified_dead_letter_headers_exchange(Config) -> {Connection, Session, LinkPair} = init(Config), + HeadersXName = <<"my headers exchange">>, + AlternateXName = <<"my alternate exchange">>, SourceQName = <<"source quorum queue">>, AppleQName = <<"dead letter classic queue receiving apples">>, BananaQName = <<"dead letter quorum queue receiving bananas">>, + TrashQName = <<"trash queue receiving anything that doesn't match">>, + + ok = rabbitmq_amqp_client:declare_exchange( + LinkPair, + HeadersXName, + #{type => <<"headers">>, + arguments => #{<<"alternate-exchange">> => {utf8, AlternateXName}}}), + + ok = rabbitmq_amqp_client:declare_exchange(LinkPair, AlternateXName, #{type => <<"fanout">>}), + {ok, #{type := <<"quorum">>}} = rabbitmq_amqp_client:declare_queue( LinkPair, SourceQName, #{arguments => #{<<"x-queue-type">> => {utf8, <<"quorum">>}, <<"x-overflow">> => {utf8, <<"reject-publish">>}, <<"x-dead-letter-strategy">> => {utf8, <<"at-least-once">>}, - <<"x-dead-letter-exchange">> => {utf8, <<"amq.headers">>}}}), + <<"x-dead-letter-exchange">> => {utf8, HeadersXName}}}), {ok, #{type := <<"classic">>}} = rabbitmq_amqp_client:declare_queue( LinkPair, AppleQName, @@ -606,14 +619,16 @@ modified_dead_letter_headers_exchange(Config) -> LinkPair, BananaQName, #{arguments => #{<<"x-queue-type">> => {utf8, <<"quorum">>}}}), + {ok, _} = rabbitmq_amqp_client:declare_queue(LinkPair, TrashQName, #{}), ok = rabbitmq_amqp_client:bind_queue( - LinkPair, AppleQName, <<"amq.headers">>, <<>>, + LinkPair, AppleQName, HeadersXName, <<>>, #{<<"x-fruit">> => {utf8, <<"apple">>}, <<"x-match">> => {utf8, <<"any-with-x">>}}), ok = rabbitmq_amqp_client:bind_queue( - LinkPair, BananaQName, <<"amq.headers">>, <<>>, + LinkPair, BananaQName, HeadersXName, <<>>, #{<<"x-fruit">> => {utf8, <<"banana">>}, <<"x-match">> => {utf8, <<"any-with-x">>}}), + ok = rabbitmq_amqp_client:bind_queue(LinkPair, TrashQName, AlternateXName, <<>>, #{}), {ok, Sender} = amqp10_client:attach_sender_link( Session, <<"test-sender">>, rabbitmq_amqp_address:queue(SourceQName)), @@ -624,6 +639,8 @@ modified_dead_letter_headers_exchange(Config) -> Session, <<"receiver apple">>, rabbitmq_amqp_address:queue(AppleQName), unsettled), {ok, ReceiverBanana} = amqp10_client:attach_receiver_link( Session, <<"receiver banana">>, rabbitmq_amqp_address:queue(BananaQName), unsettled), + {ok, ReceiverTrash} = amqp10_client:attach_receiver_link( + Session, <<"receiver trash">>, rabbitmq_amqp_address:queue(TrashQName), unsettled), ok = amqp10_client:send_msg(Sender, amqp10_msg:new(<<"t1">>, <<"m1">>)), ok = amqp10_client:send_msg(Sender, amqp10_msg:new(<<"t2">>, <<"m2">>)), @@ -633,7 +650,8 @@ modified_dead_letter_headers_exchange(Config) -> ok = amqp10_client:send_msg(Sender, amqp10_msg:set_message_annotations( #{"x-fruit" => <<"apple">>}, amqp10_msg:new(<<"t4">>, <<"m4">>))), - ok = wait_for_accepts(3), + ok = amqp10_client:send_msg(Sender, amqp10_msg:new(<<"t5">>, <<"m5">>)), + ok = wait_for_accepts(5), {ok, Msg1} = amqp10_client:get_msg(Receiver), ?assertMatch(#{delivery_count := 0, @@ -674,6 +692,16 @@ modified_dead_letter_headers_exchange(Config) -> amqp10_msg:headers(MsgBanana2)), ok = amqp10_client:accept_msg(ReceiverBanana, MsgBanana2), + {ok, Msg5} = amqp10_client:get_msg(Receiver), + %% This message should be routed via the alternate exchange to the trash queue. + ok = amqp10_client:settle_msg(Receiver, Msg5, {modified, false, true, #{<<"x-fruit">> => <<"strawberry">>}}), + {ok, MsgTrash} = amqp10_client:get_msg(ReceiverTrash), + ?assertEqual([<<"m5">>], amqp10_msg:body(MsgTrash)), + ?assertMatch(#{delivery_count := 0, + first_acquirer := false}, + amqp10_msg:headers(MsgTrash)), + ok = amqp10_client:accept_msg(ReceiverTrash, MsgTrash), + ok = detach_link_sync(Sender), ok = detach_link_sync(Receiver), ok = detach_link_sync(ReceiverApple), @@ -681,6 +709,9 @@ modified_dead_letter_headers_exchange(Config) -> {ok, #{message_count := 0}} = rabbitmq_amqp_client:delete_queue(LinkPair, SourceQName), {ok, #{message_count := 0}} = rabbitmq_amqp_client:delete_queue(LinkPair, AppleQName), {ok, #{message_count := 0}} = rabbitmq_amqp_client:delete_queue(LinkPair, BananaQName), + {ok, #{message_count := 0}} = rabbitmq_amqp_client:delete_queue(LinkPair, TrashQName), + ok = rabbitmq_amqp_client:delete_exchange(LinkPair, HeadersXName), + ok = rabbitmq_amqp_client:delete_exchange(LinkPair, AlternateXName), ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair), ok = end_session_sync(Session), ok = amqp10_client:close_connection(Connection). diff --git a/deps/rabbitmq_amqp_client/src/rabbitmq_amqp_client.erl b/deps/rabbitmq_amqp_client/src/rabbitmq_amqp_client.erl index fc5da6c7b4e4..ce38b0241d10 100644 --- a/deps/rabbitmq_amqp_client/src/rabbitmq_amqp_client.erl +++ b/deps/rabbitmq_amqp_client/src/rabbitmq_amqp_client.erl @@ -48,14 +48,12 @@ replicas => [binary()], leader => binary()}. --type queue_properties() :: #{name := binary(), - durable => boolean(), +-type queue_properties() :: #{durable => boolean(), exclusive => boolean(), auto_delete => boolean(), arguments => arguments()}. --type exchange_properties() :: #{name := binary(), - type => binary(), +-type exchange_properties() :: #{type => binary(), durable => boolean(), auto_delete => boolean(), internal => boolean(), From 46a0f5653608edef71137d7c439c43cf65a55a3a Mon Sep 17 00:00:00 2001 From: David Ansari Date: Fri, 11 Oct 2024 12:57:33 +0200 Subject: [PATCH 3/4] Add custom dead letter history test Test the use case described in https://github.com/rabbitmq/rabbitmq-website/pull/2095: > Rather than relying solely on RabbitMQ's built-in dead lettering tracking via x-opt-deaths, consumers can customise dead lettering event tracking. (cherry picked from commit 2e90619a6285262dad81006ff659dda866b43eba) --- deps/rabbit/test/amqp_client_SUITE.erl | 83 +++++++++++++++++++++++++- 1 file changed, 82 insertions(+), 1 deletion(-) diff --git a/deps/rabbit/test/amqp_client_SUITE.erl b/deps/rabbit/test/amqp_client_SUITE.erl index 0e227012f42d..463306e1c90e 100644 --- a/deps/rabbit/test/amqp_client_SUITE.erl +++ b/deps/rabbit/test/amqp_client_SUITE.erl @@ -118,6 +118,7 @@ groups() -> modified_classic_queue, modified_quorum_queue, modified_dead_letter_headers_exchange, + modified_dead_letter_history, dead_letter_headers_exchange, dead_letter_reject, dead_letter_reject_message_order_classic_queue, @@ -253,7 +254,8 @@ init_per_testcase(T, Config) end; init_per_testcase(T, Config) when T =:= modified_quorum_queue orelse - T =:= modified_dead_letter_headers_exchange -> + T =:= modified_dead_letter_headers_exchange orelse + T =:= modified_dead_letter_history -> case rpc(Config, rabbit_feature_flags, is_enabled, ['rabbitmq_4.0.0']) of true -> rabbit_ct_helpers:testcase_started(Config, T); @@ -716,6 +718,85 @@ modified_dead_letter_headers_exchange(Config) -> ok = end_session_sync(Session), ok = amqp10_client:close_connection(Connection). +%% Test that custom dead lettering event tracking works as described in +%% https://rabbitmq.com/blog/2024/10/11/modified-outcome +modified_dead_letter_history(Config) -> + {Connection, Session, LinkPair} = init(Config), + Q1 = <<"qq 1">>, + Q2 = <<"qq 2">>, + + {ok, _} = rabbitmq_amqp_client:declare_queue( + LinkPair, Q1, + #{arguments => #{<<"x-queue-type">> => {utf8, <<"quorum">>}, + <<"x-dead-letter-strategy">> => {utf8, <<"at-most-once">>}, + <<"x-dead-letter-exchange">> => {utf8, <<"amq.fanout">>}}}), + {ok, _} = rabbitmq_amqp_client:declare_queue( + LinkPair, Q2, + #{arguments => #{<<"x-queue-type">> => {utf8, <<"quorum">>}, + <<"x-dead-letter-strategy">> => {utf8, <<"at-most-once">>}, + <<"x-dead-letter-exchange">> => {utf8, <<>>}}}), + ok = rabbitmq_amqp_client:bind_queue(LinkPair, Q2, <<"amq.fanout">>, <<>>, #{}), + + {ok, Sender} = amqp10_client:attach_sender_link( + Session, <<"test-sender">>, rabbitmq_amqp_address:queue(Q1)), + wait_for_credit(Sender), + {ok, Receiver1} = amqp10_client:attach_receiver_link( + Session, <<"receiver 1">>, rabbitmq_amqp_address:queue(Q1), unsettled), + {ok, Receiver2} = amqp10_client:attach_receiver_link( + Session, <<"receiver 2">>, rabbitmq_amqp_address:queue(Q2), unsettled), + + ok = amqp10_client:send_msg(Sender, amqp10_msg:new(<<"t">>, <<"m">>)), + ok = wait_for_accepts(1), + ok = detach_link_sync(Sender), + + {ok, Msg1} = amqp10_client:get_msg(Receiver1), + ?assertMatch(#{delivery_count := 0, + first_acquirer := true}, + amqp10_msg:headers(Msg1)), + ok = amqp10_client:settle_msg( + Receiver1, Msg1, + {modified, true, true, + #{<<"x-opt-history-list">> => {list, [{utf8, <<"l1">>}]}, + <<"x-opt-history-map">> => {map, [{{symbol, <<"k1">>}, {byte, -1}}]}, + <<"x-opt-history-array">> => {array, utf8, [{utf8, <<"a1">>}]}} + }), + + {ok, Msg2} = amqp10_client:get_msg(Receiver2), + ?assertMatch(#{delivery_count := 1, + first_acquirer := false}, + amqp10_msg:headers(Msg2)), + #{<<"x-opt-history-list">> := L1, + <<"x-opt-history-map">> := L2, + <<"x-opt-history-array">> := {array, utf8, L0} + } = amqp10_msg:message_annotations(Msg2), + ok = amqp10_client:settle_msg( + Receiver2, Msg2, + {modified, true, true, + #{<<"x-opt-history-list">> => {list, [{int, -99} | L1]}, + <<"x-opt-history-map">> => {map, [{{symbol, <<"k2">>}, {symbol, <<"v2">>}} | L2]}, + <<"x-opt-history-array">> => {array, utf8, [{utf8, <<"a2">>} | L0]}, + <<"x-other">> => 99}}), + + {ok, Msg3} = amqp10_client:get_msg(Receiver1), + ?assertEqual([<<"m">>], amqp10_msg:body(Msg3)), + ?assertMatch(#{delivery_count := 2, + first_acquirer := false}, + amqp10_msg:headers(Msg3)), + ?assertMatch(#{<<"x-opt-history-array">> := {array, utf8, [{utf8, <<"a2">>}, {utf8, <<"a1">>}]}, + <<"x-opt-history-list">> := [{int, -99}, {utf8, <<"l1">>}], + <<"x-opt-history-map">> := [{{symbol, <<"k2">>}, {symbol, <<"v2">>}}, + {{symbol, <<"k1">>}, {byte, -1}}], + <<"x-other">> := 99}, amqp10_msg:message_annotations(Msg3)), + ok = amqp10_client:accept_msg(Receiver1, Msg3), + + ok = detach_link_sync(Receiver1), + ok = detach_link_sync(Receiver2), + {ok, #{message_count := 0}} = rabbitmq_amqp_client:delete_queue(LinkPair, Q1), + {ok, #{message_count := 0}} = rabbitmq_amqp_client:delete_queue(LinkPair, Q2), + ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair), + ok = end_session_sync(Session), + ok = amqp10_client:close_connection(Connection). + %% Tests that confirmations are returned correctly %% when sending many messages async to a quorum queue. sender_settle_mode_unsettled(Config) -> From 67319f0d31db382b3843e150e62581e1866cd9b1 Mon Sep 17 00:00:00 2001 From: David Ansari Date: Fri, 11 Oct 2024 14:43:31 +0200 Subject: [PATCH 4/4] Support negative integers in modified annotations (cherry picked from commit b1064fddba5de487577f62a6ee807482e7830ab7) --- deps/amqp10_client/src/amqp10_client_session.erl | 9 +++++++-- deps/rabbit/test/amqp_client_SUITE.erl | 4 ++-- 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/deps/amqp10_client/src/amqp10_client_session.erl b/deps/amqp10_client/src/amqp10_client_session.erl index 7af68be826d7..6508589b026d 100644 --- a/deps/amqp10_client/src/amqp10_client_session.erl +++ b/deps/amqp10_client/src/amqp10_client_session.erl @@ -1180,8 +1180,13 @@ wrap_map_value(true) -> {boolean, true}; wrap_map_value(false) -> {boolean, false}; -wrap_map_value(V) when is_integer(V) andalso V >= 0 -> - uint(V); +wrap_map_value(V) when is_integer(V) -> + case V < 0 of + true -> + {int, V}; + false -> + uint(V) + end; wrap_map_value(V) when is_binary(V) -> utf8(V); wrap_map_value(V) when is_list(V) -> diff --git a/deps/rabbit/test/amqp_client_SUITE.erl b/deps/rabbit/test/amqp_client_SUITE.erl index 463306e1c90e..982e5922d81a 100644 --- a/deps/rabbit/test/amqp_client_SUITE.erl +++ b/deps/rabbit/test/amqp_client_SUITE.erl @@ -775,7 +775,7 @@ modified_dead_letter_history(Config) -> #{<<"x-opt-history-list">> => {list, [{int, -99} | L1]}, <<"x-opt-history-map">> => {map, [{{symbol, <<"k2">>}, {symbol, <<"v2">>}} | L2]}, <<"x-opt-history-array">> => {array, utf8, [{utf8, <<"a2">>} | L0]}, - <<"x-other">> => 99}}), + <<"x-other">> => -99}}), {ok, Msg3} = amqp10_client:get_msg(Receiver1), ?assertEqual([<<"m">>], amqp10_msg:body(Msg3)), @@ -786,7 +786,7 @@ modified_dead_letter_history(Config) -> <<"x-opt-history-list">> := [{int, -99}, {utf8, <<"l1">>}], <<"x-opt-history-map">> := [{{symbol, <<"k2">>}, {symbol, <<"v2">>}}, {{symbol, <<"k1">>}, {byte, -1}}], - <<"x-other">> := 99}, amqp10_msg:message_annotations(Msg3)), + <<"x-other">> := -99}, amqp10_msg:message_annotations(Msg3)), ok = amqp10_client:accept_msg(Receiver1, Msg3), ok = detach_link_sync(Receiver1),