diff --git a/deps/rabbitmq_mqtt/include/rabbit_mqtt.hrl b/deps/rabbitmq_mqtt/include/rabbit_mqtt.hrl index ddf1ef36a7de..17c90aa6aaf8 100644 --- a/deps/rabbitmq_mqtt/include/rabbit_mqtt.hrl +++ b/deps/rabbitmq_mqtt/include/rabbit_mqtt.hrl @@ -9,6 +9,7 @@ -define(PG_SCOPE, pg_scope_rabbitmq_mqtt_clientid). -define(QUEUE_TYPE_QOS_0, rabbit_mqtt_qos0_queue). -define(PERSISTENT_TERM_MAILBOX_SOFT_LIMIT, mqtt_mailbox_soft_limit). +-define(PERSISTENT_TERM_EXCHANGE, mqtt_exchange). -define(MQTT_GUIDE_URL, <<"https://rabbitmq.com/mqtt.html">>). -define(MQTT_PROTO_V3, mqtt310). diff --git a/deps/rabbitmq_mqtt/src/mc_mqtt.erl b/deps/rabbitmq_mqtt/src/mc_mqtt.erl index 38d67e6a8bcb..2d183518cf6a 100644 --- a/deps/rabbitmq_mqtt/src/mc_mqtt.erl +++ b/deps/rabbitmq_mqtt/src/mc_mqtt.erl @@ -2,12 +2,14 @@ -behaviour(mc). -include("rabbit_mqtt_packet.hrl"). +-include("rabbit_mqtt.hrl"). -include_lib("rabbit_common/include/rabbit_framing.hrl"). -include_lib("amqp10_common/include/amqp10_framing.hrl"). -include_lib("rabbit_common/include/rabbit.hrl"). -include_lib("rabbit/include/mc.hrl"). -define(CONTENT_TYPE_AMQP, <<"message/vnd.rabbitmq.amqp">>). +-define(DEFAULT_MQTT_EXCHANGE, <<"amq.topic">>). -export([ init/1, @@ -89,24 +91,37 @@ convert_from(mc_amqp, Sections) -> true -> #{'Payload-Format-Indicator' => 1}; false -> #{} end, - %% TODO convert #'v1_0.properties'{reply_to} to Response-Topic - %% Props1 = case AmqpProps of - #'v1_0.properties'{correlation_id = {_Type, _Val} = Corr} -> - Props0#{'Correlation-Data' => correlation_id(Corr)}; + #'v1_0.properties'{reply_to = {utf8, Address}} -> + MqttX = persistent_term:get(?PERSISTENT_TERM_EXCHANGE), + case Address of + <<"/topic/", Topic/binary>> + when MqttX =:= ?DEFAULT_MQTT_EXCHANGE -> + add_response_topic(Topic, Props0); + <<"/exchange/", MqttX:(byte_size(MqttX))/binary, "/", RoutingKey/binary>> -> + add_response_topic(RoutingKey, Props0); + _ -> + Props0 + end; _ -> Props0 end, - Props2 = case ContentType of + Props2 = case AmqpProps of + #'v1_0.properties'{correlation_id = {_Type, _Val} = Corr} -> + Props1#{'Correlation-Data' => correlation_id(Corr)}; + _ -> + Props1 + end, + Props3 = case ContentType of undefined -> case AmqpProps of #'v1_0.properties'{content_type = {symbol, ContentType1}} -> - Props1#{'Content-Type' => rabbit_data_coercion:to_binary(ContentType1)}; + Props2#{'Content-Type' => rabbit_data_coercion:to_binary(ContentType1)}; _ -> - Props1 + Props2 end; _ -> - Props1#{'Content-Type' => ContentType} + Props2#{'Content-Type' => ContentType} end, UserProp0 = lists:filtermap(fun({{symbol, <<"x-", _/binary>> = Key}, Val}) -> filter_map_amqp_to_utf8_string(Key, Val); @@ -118,8 +133,8 @@ convert_from(mc_amqp, Sections) -> filter_map_amqp_to_utf8_string(Key, Val) end, AppProps), Props = case UserProp0 ++ UserProp1 of - [] -> Props2; - UserProp -> Props2#{'User-Property' => UserProp} + [] -> Props3; + UserProp -> Props3#{'User-Property' => UserProp} end, Payload = lists:flatten(lists:reverse(PayloadRev)), #mqtt_msg{retain = false, @@ -246,18 +261,26 @@ convert_to(mc_amqp, #mqtt_msg{qos = Qos, _ -> undefined end, - %% TODO Translate MQTT Response-Topic to AMQP topic. - %% If operator did not modify mqtt.exchange, set reply-to address to "/topic/" RK. - %% If operator modified mqtt.exchange, set reply-to address to "/exchange/" X "/" RK. - % case Props of - % #{'Response-Topic' := Topic} -> - % rabbit_mqtt_util:mqtt_to_amqp(Topic) - S2 = case {ContentType, CorrId} of - {undefined, undefined} -> + ReplyTo = case Props of + #{'Response-Topic' := MqttTopic} -> + Topic = rabbit_mqtt_util:mqtt_to_amqp(MqttTopic), + Address = case persistent_term:get(?PERSISTENT_TERM_EXCHANGE) of + ?DEFAULT_MQTT_EXCHANGE -> + <<"/topic/", Topic/binary>>; + Exchange -> + <<"/exchange/", Exchange/binary, "/", Topic/binary>> + end, + {utf8, Address}; + _ -> + undefined + end, + S2 = case {ContentType, CorrId, ReplyTo} of + {undefined, undefined, undefined} -> S1; _ -> [#'v1_0.properties'{content_type = ContentType, - correlation_id = CorrId} | S1] + correlation_id = CorrId, + reply_to = ReplyTo} | S1] end, S3 = case MsgAnns of @@ -296,16 +319,16 @@ convert_to(mc_amqpl, #mqtt_msg{qos = Qos, Hs0 end, {CorrId, Hs2} = case Props of - #{'Correlation-Data' := Corr} -> - case mc_util:is_valid_shortstr(Corr) of - true -> - {Corr, Hs1}; - false -> - {undefined, [{<<"x-correlation-id">>, longstr, Corr} | Hs1]} - end; - _ -> - {undefined, Hs1} - end, + #{'Correlation-Data' := Corr} -> + case mc_util:is_valid_shortstr(Corr) of + true -> + {Corr, Hs1}; + false -> + {undefined, [{<<"x-correlation-id">>, longstr, Corr} | Hs1]} + end; + _ -> + {undefined, Hs1} + end, Expiration = case Props of #{'Message-Expiry-Interval' := Seconds} -> integer_to_binary(timer:seconds(Seconds)); @@ -517,3 +540,7 @@ amqp_encode(Data, Acc0) -> Bin = amqp10_framing:encode_bin(Data), Acc = setelement(5, Acc0, [Bin | element(5, Acc0)]), setelement(7, Acc, ?CONTENT_TYPE_AMQP). + +add_response_topic(AmqpTopic, PublishProperties) -> + MqttTopic = rabbit_mqtt_util:amqp_to_mqtt(AmqpTopic), + PublishProperties#{'Response-Topic' => MqttTopic}. diff --git a/deps/rabbitmq_mqtt/src/rabbit_mqtt.erl b/deps/rabbitmq_mqtt/src/rabbit_mqtt.erl index 7bb91193d2b1..785ec0d334e3 100644 --- a/deps/rabbitmq_mqtt/src/rabbit_mqtt.erl +++ b/deps/rabbitmq_mqtt/src/rabbit_mqtt.erl @@ -110,6 +110,10 @@ init_global_counters(ProtoVer) -> persist_static_configuration() -> rabbit_mqtt_util:init_sparkplug(), + {ok, Exchange} = application:get_env(?APP_NAME, exchange), + ?assert(is_binary(Exchange)), + ok = persistent_term:put(?PERSISTENT_TERM_EXCHANGE, Exchange), + {ok, MailboxSoftLimit} = application:get_env(?APP_NAME, mailbox_soft_limit), ?assert(is_integer(MailboxSoftLimit)), ok = persistent_term:put(?PERSISTENT_TERM_MAILBOX_SOFT_LIMIT, MailboxSoftLimit), diff --git a/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl b/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl index 6fcaf03b3b57..5933bc9ee9e0 100644 --- a/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl +++ b/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl @@ -200,6 +200,7 @@ process_connect( {ok, WillMsg} ?= make_will_msg(Packet), {TraceState, ConnName} = init_trace(VHost, ConnName0), ok = rabbit_mqtt_keepalive:start(KeepaliveSecs, Socket), + Exchange = rabbit_misc:r(VHost, exchange, persistent_term:get(?PERSISTENT_TERM_EXCHANGE)), S = #state{ cfg = #cfg{socket = Socket, proto_ver = proto_integer_to_atom(ProtoVer), @@ -215,7 +216,7 @@ process_connect( peer_ip_addr = PeerIp, peer_port = PeerPort, send_fun = SendFun, - exchange = rabbit_misc:r(VHost, exchange, rabbit_mqtt_util:env(exchange)), + exchange = Exchange, retainer_pid = rabbit_mqtt_retainer_sup:start_child_for_vhost(VHost), vhost = VHost, client_id = ClientId, diff --git a/deps/rabbitmq_mqtt/src/rabbit_mqtt_util.erl b/deps/rabbitmq_mqtt/src/rabbit_mqtt_util.erl index 3aba2258a0a6..236af0e0899a 100644 --- a/deps/rabbitmq_mqtt/src/rabbit_mqtt_util.erl +++ b/deps/rabbitmq_mqtt/src/rabbit_mqtt_util.erl @@ -143,7 +143,6 @@ env(Key) -> coerce_env_value(default_pass, Val) -> rabbit_data_coercion:to_binary(Val); coerce_env_value(default_user, Val) -> rabbit_data_coercion:to_binary(Val); -coerce_env_value(exchange, Val) -> rabbit_data_coercion:to_binary(Val); coerce_env_value(vhost, Val) -> rabbit_data_coercion:to_binary(Val); coerce_env_value(_, Val) -> Val. diff --git a/deps/rabbitmq_mqtt/test/mc_mqtt_SUITE.erl b/deps/rabbitmq_mqtt/test/mc_mqtt_SUITE.erl index de67ef3c0ae5..7720974b5394 100644 --- a/deps/rabbitmq_mqtt/test/mc_mqtt_SUITE.erl +++ b/deps/rabbitmq_mqtt/test/mc_mqtt_SUITE.erl @@ -15,9 +15,10 @@ all() -> groups() -> [ - {lossless, [parallel], + {lossless, [shuffle], [roundtrip_amqp, roundtrip_amqp_payload_format_indicator, + roundtrip_amqp_response_topic, roundtrip_amqpl, roundtrip_amqpl_correlation, amqp_to_mqtt_amqp_value_section_binary, @@ -27,10 +28,12 @@ groups() -> amqp_to_mqtt_amqp_value_section_boolean ] }, - {lossy, [parallel], + {lossy, [shuffle], [roundtrip_amqp_user_property, roundtrip_amqpl_user_property, - roundtrip_amqp_content_type + roundtrip_amqp_content_type, + amqp_to_mqtt_reply_to, + amqp_to_mqtt_footer ] } ]. @@ -112,6 +115,20 @@ roundtrip_amqp_payload_format_indicator(_Config) -> iolist_to_binary(Payload)), ?assertMatch(#{'Payload-Format-Indicator' := 1}, Props). +roundtrip_amqp_response_topic(_Config) -> + Topic = <<"/rabbit/🐇"/utf8>>, + Msg0 = mqtt_msg(), + Key = mqtt_exchange, + MqttExchanges = [<<"amq.topic">>, + <<"some-other-topic-exchange">>], + [begin + ok = persistent_term:put(Key, X), + Msg = Msg0#mqtt_msg{props = #{'Response-Topic' => Topic}}, + ?assertMatch(#mqtt_msg{props = #{'Response-Topic' := Topic}}, + roundtrip(mc_amqp, Msg)), + true = persistent_term:erase(Key) + end || X <- MqttExchanges]. + roundtrip_amqpl(_Config) -> Msg = #mqtt_msg{ qos = 1, @@ -248,6 +265,30 @@ roundtrip_amqp_content_type(_Config) -> #mqtt_msg{props = Props} = roundtrip(mc_amqp, Msg), ?assertNot(maps:is_key('Content-Type', Props)). +amqp_to_mqtt_reply_to(_Config) -> + Val = amqp_value({utf8, <<"hey">>}), + Key = mqtt_exchange, + ok = persistent_term:put(Key, <<"mqtt-topic-exchange">>), + + AmqpProps1 = #'v1_0.properties'{reply_to = {utf8, <<"/exchange/mqtt-topic-exchange/my.routing.key">>}}, + #mqtt_msg{props = Props1} = amqp_to_mqtt([AmqpProps1, Val]), + ?assertEqual({ok, <<"my/routing/key">>}, + maps:find('Response-Topic', Props1)), + + AmqpProps2 = #'v1_0.properties'{reply_to = {utf8, <<"/exchange/NON-mqtt-topic-exchange/my.routing.key">>}}, + #mqtt_msg{props = Props2} = amqp_to_mqtt([AmqpProps2, Val]), + ?assertEqual(error, + maps:find('Response-Topic', Props2)), + + true = persistent_term:erase(Key). + +amqp_to_mqtt_footer(_Config) -> + Val = amqp_value({utf8, <<"hey">>}), + Footer = #'v1_0.footer'{content = [{symbol, <<"key">>}, {utf8, <<"value">>}]}, + %% We can translate, but lose the footer. + #mqtt_msg{payload = Payload} = amqp_to_mqtt([Val, Footer]), + ?assertEqual(<<"hey">>, iolist_to_binary(Payload)). + mqtt_msg() -> #mqtt_msg{qos = 0, topic = <<"my/topic">>, diff --git a/deps/rabbitmq_mqtt/test/util_SUITE.erl b/deps/rabbitmq_mqtt/test/util_SUITE.erl index 3d058500abce..2f9e00e738df 100644 --- a/deps/rabbitmq_mqtt/test/util_SUITE.erl +++ b/deps/rabbitmq_mqtt/test/util_SUITE.erl @@ -7,7 +7,6 @@ -module(util_SUITE). -compile([export_all, nowarn_export_all]). --include_lib("common_test/include/ct.hrl"). -include_lib("eunit/include/eunit.hrl"). all() -> @@ -18,7 +17,6 @@ all() -> groups() -> [ {tests, [parallel], [ - coerce_exchange, coerce_vhost, coerce_default_user, coerce_default_pass, @@ -27,22 +25,13 @@ groups() -> } ]. -suite() -> - [{timetrap, {seconds, 60}}]. - init_per_suite(Config) -> ok = application:load(rabbitmq_mqtt), Config. + end_per_suite(Config) -> ok = application:unload(rabbitmq_mqtt), Config. -init_per_group(_, Config) -> Config. -end_per_group(_, Config) -> Config. -init_per_testcase(_, Config) -> Config. -end_per_testcase(_, Config) -> Config. - -coerce_exchange(_) -> - ?assertEqual(<<"amq.topic">>, rabbit_mqtt_util:env(exchange)). coerce_vhost(_) -> ?assertEqual(<<"/">>, rabbit_mqtt_util:env(vhost)).