diff --git a/deps/amqp10_client/src/amqp10_client_session.erl b/deps/amqp10_client/src/amqp10_client_session.erl index c1e5eb46214f..6508589b026d 100644 --- a/deps/amqp10_client/src/amqp10_client_session.erl +++ b/deps/amqp10_client/src/amqp10_client_session.erl @@ -1181,13 +1181,20 @@ wrap_map_value(true) -> wrap_map_value(false) -> {boolean, false}; wrap_map_value(V) when is_integer(V) -> - {uint, V}; + case V < 0 of + true -> + {int, V}; + false -> + uint(V) + end; wrap_map_value(V) when is_binary(V) -> utf8(V); wrap_map_value(V) when is_list(V) -> utf8(list_to_binary(V)); wrap_map_value(V) when is_atom(V) -> - utf8(atom_to_list(V)). + utf8(atom_to_list(V)); +wrap_map_value(TaggedValue) when is_atom(element(1, TaggedValue)) -> + TaggedValue. utf8(V) -> amqp10_client_types:utf8(V). diff --git a/deps/rabbit/src/mc.erl b/deps/rabbit/src/mc.erl index 465c7054f089..b122c4780110 100644 --- a/deps/rabbit/src/mc.erl +++ b/deps/rabbit/src/mc.erl @@ -44,7 +44,7 @@ -type str() :: atom() | string() | binary(). -type internal_ann_key() :: atom(). -type x_ann_key() :: binary(). %% should begin with x- or ideally x-opt- --type x_ann_value() :: str() | integer() | float() | [x_ann_value()]. +-type x_ann_value() :: str() | integer() | float() | TaggedValue :: tuple() | [x_ann_value()]. -type protocol() :: module(). -type annotations() :: #{internal_ann_key() => term(), x_ann_key() => x_ann_value()}. diff --git a/deps/rabbit/src/mc_util.erl b/deps/rabbit/src/mc_util.erl index 669dace41f45..1f20d15699db 100644 --- a/deps/rabbit/src/mc_util.erl +++ b/deps/rabbit/src/mc_util.erl @@ -52,9 +52,8 @@ infer_type(V) when is_integer(V) -> {long, V}; infer_type(V) when is_boolean(V) -> {boolean, V}; -infer_type({T, _} = V) when is_atom(T) -> - %% looks like a pre-tagged type - V. +infer_type(TaggedValue) when is_atom(element(1, TaggedValue)) -> + TaggedValue. utf8_string_is_ascii(UTF8String) -> utf8_scan(UTF8String, fun(Char) -> Char >= 0 andalso Char < 128 end). diff --git a/deps/rabbit/src/rabbit_amqp_session.erl b/deps/rabbit/src/rabbit_amqp_session.erl index 0c459b08a76b..7bdfab940a83 100644 --- a/deps/rabbit/src/rabbit_amqp_session.erl +++ b/deps/rabbit/src/rabbit_amqp_session.erl @@ -1921,7 +1921,7 @@ settle_op_from_outcome(#'v1_0.modified'{delivery_failed = DelFailed, Anns1 = lists:map( %% "all symbolic keys except those beginning with "x-" are reserved." [3.2.10] fun({{symbol, <<"x-", _/binary>> = K}, V}) -> - {K, unwrap(V)} + {K, unwrap_simple_type(V)} end, KVList), maps:from_list(Anns1) end, @@ -3603,7 +3603,14 @@ format_status( topic_permission_cache => TopicPermissionCache}, maps:update(state, State, Status). -unwrap({_Tag, V}) -> + +unwrap_simple_type(V = {list, _}) -> + V; +unwrap_simple_type(V = {map, _}) -> + V; +unwrap_simple_type(V = {array, _, _}) -> + V; +unwrap_simple_type({_SimpleType, V}) -> V; -unwrap(V) -> +unwrap_simple_type(V) -> V. diff --git a/deps/rabbit/test/amqp_client_SUITE.erl b/deps/rabbit/test/amqp_client_SUITE.erl index a6dc73e28e2c..982e5922d81a 100644 --- a/deps/rabbit/test/amqp_client_SUITE.erl +++ b/deps/rabbit/test/amqp_client_SUITE.erl @@ -118,6 +118,7 @@ groups() -> modified_classic_queue, modified_quorum_queue, modified_dead_letter_headers_exchange, + modified_dead_letter_history, dead_letter_headers_exchange, dead_letter_reject, dead_letter_reject_message_order_classic_queue, @@ -253,7 +254,8 @@ init_per_testcase(T, Config) end; init_per_testcase(T, Config) when T =:= modified_quorum_queue orelse - T =:= modified_dead_letter_headers_exchange -> + T =:= modified_dead_letter_headers_exchange orelse + T =:= modified_dead_letter_history -> case rpc(Config, rabbit_feature_flags, is_enabled, ['rabbitmq_4.0.0']) of true -> rabbit_ct_helpers:testcase_started(Config, T); @@ -490,79 +492,127 @@ modified_quorum_queue(Config) -> ok = amqp10_client:send_msg(Sender, Msg2), ok = amqp10_client:detach_link(Sender), - {ok, Receiver} = amqp10_client:attach_receiver_link(Session, <<"receiver">>, Address, unsettled), + Receiver1Name = <<"receiver 1">>, + Receiver2Name = <<"receiver 2">>, + {ok, Receiver1} = amqp10_client:attach_receiver_link(Session, Receiver1Name, Address, unsettled), + {ok, Receiver2} = amqp10_client:attach_receiver_link(Session, Receiver2Name, Address, unsettled), - {ok, M1} = amqp10_client:get_msg(Receiver), + {ok, M1} = amqp10_client:get_msg(Receiver1), ?assertEqual([<<"m1">>], amqp10_msg:body(M1)), ?assertMatch(#{delivery_count := 0, first_acquirer := true}, amqp10_msg:headers(M1)), - ok = amqp10_client:settle_msg(Receiver, M1, {modified, false, true, #{}}), + ok = amqp10_client:settle_msg(Receiver1, M1, {modified, false, true, #{}}), - {ok, M2a} = amqp10_client:get_msg(Receiver), + {ok, M2a} = amqp10_client:get_msg(Receiver1), ?assertEqual([<<"m2">>], amqp10_msg:body(M2a)), ?assertMatch(#{delivery_count := 0, first_acquirer := true}, amqp10_msg:headers(M2a)), - ok = amqp10_client:settle_msg(Receiver, M2a, {modified, false, false, #{}}), + ok = amqp10_client:settle_msg(Receiver1, M2a, {modified, false, false, #{}}), - {ok, M2b} = amqp10_client:get_msg(Receiver), + {ok, M2b} = amqp10_client:get_msg(Receiver1), ?assertEqual([<<"m2">>], amqp10_msg:body(M2b)), ?assertMatch(#{delivery_count := 0, first_acquirer := false}, amqp10_msg:headers(M2b)), - ok = amqp10_client:settle_msg(Receiver, M2b, {modified, true, false, #{}}), + ok = amqp10_client:settle_msg(Receiver1, M2b, {modified, true, false, #{}}), - {ok, M2c} = amqp10_client:get_msg(Receiver), + {ok, M2c} = amqp10_client:get_msg(Receiver1), ?assertEqual([<<"m2">>], amqp10_msg:body(M2c)), ?assertMatch(#{delivery_count := 1, first_acquirer := false}, amqp10_msg:headers(M2c)), - ok = amqp10_client:settle_msg(Receiver, M2c, - {modified, true, false, - #{<<"x-opt-key">> => <<"val 1">>}}), - - {ok, M2d} = amqp10_client:get_msg(Receiver), + ok = amqp10_client:settle_msg( + Receiver1, M2c, + {modified, true, false, + %% Test that a history of requeue events can be tracked as described in + %% https://rabbitmq.com/blog/2024/10/11/modified-outcome + #{<<"x-opt-requeued-by">> => {array, utf8, [{utf8, Receiver1Name}]}, + <<"x-opt-requeue-reason">> => {list, [{utf8, <<"reason 1">>}]}, + <<"x-opt-my-map">> => {map, [ + {{utf8, <<"k1">>}, {byte, -1}}, + {{utf8, <<"k2">>}, {ulong, 2}} + ]}}}), + + {ok, M2d} = amqp10_client:get_msg(Receiver2), ?assertEqual([<<"m2">>], amqp10_msg:body(M2d)), ?assertMatch(#{delivery_count := 2, first_acquirer := false}, amqp10_msg:headers(M2d)), - ?assertMatch(#{<<"x-opt-key">> := <<"val 1">>}, amqp10_msg:message_annotations(M2d)), - ok = amqp10_client:settle_msg(Receiver, M2d, - {modified, false, false, - #{<<"x-opt-key">> => <<"val 2">>, - <<"x-other">> => 99}}), - - {ok, M2e} = amqp10_client:get_msg(Receiver), + #{<<"x-opt-requeued-by">> := {array, utf8, L0}, + <<"x-opt-requeue-reason">> := L1, + <<"x-opt-my-map">> := L2} = amqp10_msg:message_annotations(M2d), + ok = amqp10_client:settle_msg( + Receiver1, M2d, + {modified, false, false, + #{<<"x-opt-requeued-by">> => {array, utf8, [{utf8, Receiver2Name} | L0]}, + <<"x-opt-requeue-reason">> => {list, [{symbol, <<"reason 2">>} | L1]}, + <<"x-opt-my-map">> => {map, L2 ++ [{{symbol, <<"k3">>}, {symbol, <<"val 3">>}}]}, + <<"x-other">> => 99}}), + + {ok, M2e} = amqp10_client:get_msg(Receiver1), ?assertEqual([<<"m2">>], amqp10_msg:body(M2e)), ?assertMatch(#{delivery_count := 2, first_acquirer := false}, amqp10_msg:headers(M2e)), - ?assertMatch(#{<<"x-opt-key">> := <<"val 2">>, + ?assertMatch(#{<<"x-opt-requeued-by">> := {array, utf8, [{utf8, Receiver2Name}, {utf8, Receiver1Name}]}, + <<"x-opt-requeue-reason">> := [{symbol, <<"reason 2">>}, {utf8, <<"reason 1">>}], + <<"x-opt-my-map">> := [ + {{utf8, <<"k1">>}, {byte, -1}}, + {{utf8, <<"k2">>}, {ulong, 2}}, + {{symbol, <<"k3">>}, {symbol, <<"val 3">>}} + ], <<"x-other">> := 99}, amqp10_msg:message_annotations(M2e)), - ok = amqp10_client:settle_msg(Receiver, M2e, modified), + ok = amqp10_client:settle_msg(Receiver1, M2e, modified), - ok = amqp10_client:detach_link(Receiver), - ?assertMatch({ok, #{message_count := 1}}, - rabbitmq_amqp_client:delete_queue(LinkPair, QName)), + %% Test that we can consume via AMQP 0.9.1 + Ch = rabbit_ct_client_helpers:open_channel(Config), + {#'basic.get_ok'{}, + #amqp_msg{payload = <<"m2">>, + props = #'P_basic'{headers = Headers}} + } = amqp_channel:call(Ch, #'basic.get'{queue = QName, no_ack = true}), + %% We expect to receive only modified AMQP 1.0 message annotations that are of simple types + %% (i.e. excluding list, map, array). + ?assertEqual({value, {<<"x-other">>, long, 99}}, + lists:keysearch(<<"x-other">>, 1, Headers)), + ?assertEqual({value, {<<"x-delivery-count">>, long, 5}}, + lists:keysearch(<<"x-delivery-count">>, 1, Headers)), + ok = rabbit_ct_client_helpers:close_channel(Ch), + + ok = amqp10_client:detach_link(Receiver1), + {ok, _} = rabbitmq_amqp_client:delete_queue(LinkPair, QName), ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair), ok = end_session_sync(Session), ok = amqp10_client:close_connection(Connection). %% Test that a message can be routed based on the message-annotations -%% provided in the modified outcome. +%% provided in the modified outcome as described in +%% https://rabbitmq.com/blog/2024/10/11/modified-outcome modified_dead_letter_headers_exchange(Config) -> {Connection, Session, LinkPair} = init(Config), + HeadersXName = <<"my headers exchange">>, + AlternateXName = <<"my alternate exchange">>, SourceQName = <<"source quorum queue">>, AppleQName = <<"dead letter classic queue receiving apples">>, BananaQName = <<"dead letter quorum queue receiving bananas">>, + TrashQName = <<"trash queue receiving anything that doesn't match">>, + + ok = rabbitmq_amqp_client:declare_exchange( + LinkPair, + HeadersXName, + #{type => <<"headers">>, + arguments => #{<<"alternate-exchange">> => {utf8, AlternateXName}}}), + + ok = rabbitmq_amqp_client:declare_exchange(LinkPair, AlternateXName, #{type => <<"fanout">>}), + {ok, #{type := <<"quorum">>}} = rabbitmq_amqp_client:declare_queue( LinkPair, SourceQName, #{arguments => #{<<"x-queue-type">> => {utf8, <<"quorum">>}, <<"x-overflow">> => {utf8, <<"reject-publish">>}, <<"x-dead-letter-strategy">> => {utf8, <<"at-least-once">>}, - <<"x-dead-letter-exchange">> => {utf8, <<"amq.headers">>}}}), + <<"x-dead-letter-exchange">> => {utf8, HeadersXName}}}), {ok, #{type := <<"classic">>}} = rabbitmq_amqp_client:declare_queue( LinkPair, AppleQName, @@ -571,14 +621,16 @@ modified_dead_letter_headers_exchange(Config) -> LinkPair, BananaQName, #{arguments => #{<<"x-queue-type">> => {utf8, <<"quorum">>}}}), + {ok, _} = rabbitmq_amqp_client:declare_queue(LinkPair, TrashQName, #{}), ok = rabbitmq_amqp_client:bind_queue( - LinkPair, AppleQName, <<"amq.headers">>, <<>>, + LinkPair, AppleQName, HeadersXName, <<>>, #{<<"x-fruit">> => {utf8, <<"apple">>}, <<"x-match">> => {utf8, <<"any-with-x">>}}), ok = rabbitmq_amqp_client:bind_queue( - LinkPair, BananaQName, <<"amq.headers">>, <<>>, + LinkPair, BananaQName, HeadersXName, <<>>, #{<<"x-fruit">> => {utf8, <<"banana">>}, <<"x-match">> => {utf8, <<"any-with-x">>}}), + ok = rabbitmq_amqp_client:bind_queue(LinkPair, TrashQName, AlternateXName, <<>>, #{}), {ok, Sender} = amqp10_client:attach_sender_link( Session, <<"test-sender">>, rabbitmq_amqp_address:queue(SourceQName)), @@ -589,6 +641,8 @@ modified_dead_letter_headers_exchange(Config) -> Session, <<"receiver apple">>, rabbitmq_amqp_address:queue(AppleQName), unsettled), {ok, ReceiverBanana} = amqp10_client:attach_receiver_link( Session, <<"receiver banana">>, rabbitmq_amqp_address:queue(BananaQName), unsettled), + {ok, ReceiverTrash} = amqp10_client:attach_receiver_link( + Session, <<"receiver trash">>, rabbitmq_amqp_address:queue(TrashQName), unsettled), ok = amqp10_client:send_msg(Sender, amqp10_msg:new(<<"t1">>, <<"m1">>)), ok = amqp10_client:send_msg(Sender, amqp10_msg:new(<<"t2">>, <<"m2">>)), @@ -598,7 +652,8 @@ modified_dead_letter_headers_exchange(Config) -> ok = amqp10_client:send_msg(Sender, amqp10_msg:set_message_annotations( #{"x-fruit" => <<"apple">>}, amqp10_msg:new(<<"t4">>, <<"m4">>))), - ok = wait_for_accepts(3), + ok = amqp10_client:send_msg(Sender, amqp10_msg:new(<<"t5">>, <<"m5">>)), + ok = wait_for_accepts(5), {ok, Msg1} = amqp10_client:get_msg(Receiver), ?assertMatch(#{delivery_count := 0, @@ -639,6 +694,16 @@ modified_dead_letter_headers_exchange(Config) -> amqp10_msg:headers(MsgBanana2)), ok = amqp10_client:accept_msg(ReceiverBanana, MsgBanana2), + {ok, Msg5} = amqp10_client:get_msg(Receiver), + %% This message should be routed via the alternate exchange to the trash queue. + ok = amqp10_client:settle_msg(Receiver, Msg5, {modified, false, true, #{<<"x-fruit">> => <<"strawberry">>}}), + {ok, MsgTrash} = amqp10_client:get_msg(ReceiverTrash), + ?assertEqual([<<"m5">>], amqp10_msg:body(MsgTrash)), + ?assertMatch(#{delivery_count := 0, + first_acquirer := false}, + amqp10_msg:headers(MsgTrash)), + ok = amqp10_client:accept_msg(ReceiverTrash, MsgTrash), + ok = detach_link_sync(Sender), ok = detach_link_sync(Receiver), ok = detach_link_sync(ReceiverApple), @@ -646,6 +711,88 @@ modified_dead_letter_headers_exchange(Config) -> {ok, #{message_count := 0}} = rabbitmq_amqp_client:delete_queue(LinkPair, SourceQName), {ok, #{message_count := 0}} = rabbitmq_amqp_client:delete_queue(LinkPair, AppleQName), {ok, #{message_count := 0}} = rabbitmq_amqp_client:delete_queue(LinkPair, BananaQName), + {ok, #{message_count := 0}} = rabbitmq_amqp_client:delete_queue(LinkPair, TrashQName), + ok = rabbitmq_amqp_client:delete_exchange(LinkPair, HeadersXName), + ok = rabbitmq_amqp_client:delete_exchange(LinkPair, AlternateXName), + ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair), + ok = end_session_sync(Session), + ok = amqp10_client:close_connection(Connection). + +%% Test that custom dead lettering event tracking works as described in +%% https://rabbitmq.com/blog/2024/10/11/modified-outcome +modified_dead_letter_history(Config) -> + {Connection, Session, LinkPair} = init(Config), + Q1 = <<"qq 1">>, + Q2 = <<"qq 2">>, + + {ok, _} = rabbitmq_amqp_client:declare_queue( + LinkPair, Q1, + #{arguments => #{<<"x-queue-type">> => {utf8, <<"quorum">>}, + <<"x-dead-letter-strategy">> => {utf8, <<"at-most-once">>}, + <<"x-dead-letter-exchange">> => {utf8, <<"amq.fanout">>}}}), + {ok, _} = rabbitmq_amqp_client:declare_queue( + LinkPair, Q2, + #{arguments => #{<<"x-queue-type">> => {utf8, <<"quorum">>}, + <<"x-dead-letter-strategy">> => {utf8, <<"at-most-once">>}, + <<"x-dead-letter-exchange">> => {utf8, <<>>}}}), + ok = rabbitmq_amqp_client:bind_queue(LinkPair, Q2, <<"amq.fanout">>, <<>>, #{}), + + {ok, Sender} = amqp10_client:attach_sender_link( + Session, <<"test-sender">>, rabbitmq_amqp_address:queue(Q1)), + wait_for_credit(Sender), + {ok, Receiver1} = amqp10_client:attach_receiver_link( + Session, <<"receiver 1">>, rabbitmq_amqp_address:queue(Q1), unsettled), + {ok, Receiver2} = amqp10_client:attach_receiver_link( + Session, <<"receiver 2">>, rabbitmq_amqp_address:queue(Q2), unsettled), + + ok = amqp10_client:send_msg(Sender, amqp10_msg:new(<<"t">>, <<"m">>)), + ok = wait_for_accepts(1), + ok = detach_link_sync(Sender), + + {ok, Msg1} = amqp10_client:get_msg(Receiver1), + ?assertMatch(#{delivery_count := 0, + first_acquirer := true}, + amqp10_msg:headers(Msg1)), + ok = amqp10_client:settle_msg( + Receiver1, Msg1, + {modified, true, true, + #{<<"x-opt-history-list">> => {list, [{utf8, <<"l1">>}]}, + <<"x-opt-history-map">> => {map, [{{symbol, <<"k1">>}, {byte, -1}}]}, + <<"x-opt-history-array">> => {array, utf8, [{utf8, <<"a1">>}]}} + }), + + {ok, Msg2} = amqp10_client:get_msg(Receiver2), + ?assertMatch(#{delivery_count := 1, + first_acquirer := false}, + amqp10_msg:headers(Msg2)), + #{<<"x-opt-history-list">> := L1, + <<"x-opt-history-map">> := L2, + <<"x-opt-history-array">> := {array, utf8, L0} + } = amqp10_msg:message_annotations(Msg2), + ok = amqp10_client:settle_msg( + Receiver2, Msg2, + {modified, true, true, + #{<<"x-opt-history-list">> => {list, [{int, -99} | L1]}, + <<"x-opt-history-map">> => {map, [{{symbol, <<"k2">>}, {symbol, <<"v2">>}} | L2]}, + <<"x-opt-history-array">> => {array, utf8, [{utf8, <<"a2">>} | L0]}, + <<"x-other">> => -99}}), + + {ok, Msg3} = amqp10_client:get_msg(Receiver1), + ?assertEqual([<<"m">>], amqp10_msg:body(Msg3)), + ?assertMatch(#{delivery_count := 2, + first_acquirer := false}, + amqp10_msg:headers(Msg3)), + ?assertMatch(#{<<"x-opt-history-array">> := {array, utf8, [{utf8, <<"a2">>}, {utf8, <<"a1">>}]}, + <<"x-opt-history-list">> := [{int, -99}, {utf8, <<"l1">>}], + <<"x-opt-history-map">> := [{{symbol, <<"k2">>}, {symbol, <<"v2">>}}, + {{symbol, <<"k1">>}, {byte, -1}}], + <<"x-other">> := -99}, amqp10_msg:message_annotations(Msg3)), + ok = amqp10_client:accept_msg(Receiver1, Msg3), + + ok = detach_link_sync(Receiver1), + ok = detach_link_sync(Receiver2), + {ok, #{message_count := 0}} = rabbitmq_amqp_client:delete_queue(LinkPair, Q1), + {ok, #{message_count := 0}} = rabbitmq_amqp_client:delete_queue(LinkPair, Q2), ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair), ok = end_session_sync(Session), ok = amqp10_client:close_connection(Connection). diff --git a/deps/rabbitmq_amqp_client/src/rabbitmq_amqp_client.erl b/deps/rabbitmq_amqp_client/src/rabbitmq_amqp_client.erl index fc5da6c7b4e4..ce38b0241d10 100644 --- a/deps/rabbitmq_amqp_client/src/rabbitmq_amqp_client.erl +++ b/deps/rabbitmq_amqp_client/src/rabbitmq_amqp_client.erl @@ -48,14 +48,12 @@ replicas => [binary()], leader => binary()}. --type queue_properties() :: #{name := binary(), - durable => boolean(), +-type queue_properties() :: #{durable => boolean(), exclusive => boolean(), auto_delete => boolean(), arguments => arguments()}. --type exchange_properties() :: #{name := binary(), - type => binary(), +-type exchange_properties() :: #{type => binary(), durable => boolean(), auto_delete => boolean(), internal => boolean(),