Skip to content

Commit

Permalink
Translate Response Topic between MQTT and AMQP
Browse files Browse the repository at this point in the history
Translate MQTT 5.0 Response Topic to AMQP 1.0 reply-to address and vice
versa.

The Response Topic must be a UTF-8 encoded string.

This commit re-uses the already defined RabbitMQ target addresses:
```
"/topic/"     RK        Publish to amq.topic with routing key RK
"/exchange/"  X "/" RK  Publish to exchange X with routing key RK
```

By default, the MQTT topic exchange is configure dto be amq.topic using
the 1st target address.

When an operator modifies the mqtt.exchange, the 2nd target address is
used.
  • Loading branch information
ansd committed Aug 24, 2023
1 parent 6eb035d commit 1cc35cf
Show file tree
Hide file tree
Showing 7 changed files with 108 additions and 46 deletions.
1 change: 1 addition & 0 deletions deps/rabbitmq_mqtt/include/rabbit_mqtt.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
-define(PG_SCOPE, pg_scope_rabbitmq_mqtt_clientid).
-define(QUEUE_TYPE_QOS_0, rabbit_mqtt_qos0_queue).
-define(PERSISTENT_TERM_MAILBOX_SOFT_LIMIT, mqtt_mailbox_soft_limit).
-define(PERSISTENT_TERM_EXCHANGE, mqtt_exchange).
-define(MQTT_GUIDE_URL, <<"https://rabbitmq.com/mqtt.html">>).

-define(MQTT_PROTO_V3, mqtt310).
Expand Down
85 changes: 56 additions & 29 deletions deps/rabbitmq_mqtt/src/mc_mqtt.erl
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@
-behaviour(mc).

-include("rabbit_mqtt_packet.hrl").
-include("rabbit_mqtt.hrl").
-include_lib("rabbit_common/include/rabbit_framing.hrl").
-include_lib("amqp10_common/include/amqp10_framing.hrl").
-include_lib("rabbit_common/include/rabbit.hrl").
-include_lib("rabbit/include/mc.hrl").

-define(CONTENT_TYPE_AMQP, <<"message/vnd.rabbitmq.amqp">>).
-define(DEFAULT_MQTT_EXCHANGE, <<"amq.topic">>).

-export([
init/1,
Expand Down Expand Up @@ -89,24 +91,37 @@ convert_from(mc_amqp, Sections) ->
true -> #{'Payload-Format-Indicator' => 1};
false -> #{}
end,
%% TODO convert #'v1_0.properties'{reply_to} to Response-Topic
%%
Props1 = case AmqpProps of
#'v1_0.properties'{correlation_id = {_Type, _Val} = Corr} ->
Props0#{'Correlation-Data' => correlation_id(Corr)};
#'v1_0.properties'{reply_to = {utf8, Address}} ->
MqttX = persistent_term:get(?PERSISTENT_TERM_EXCHANGE),
case Address of
<<"/topic/", Topic/binary>>
when MqttX =:= ?DEFAULT_MQTT_EXCHANGE ->
add_response_topic(Topic, Props0);
<<"/exchange/", MqttX:(byte_size(MqttX))/binary, "/", RoutingKey/binary>> ->
add_response_topic(RoutingKey, Props0);
_ ->
Props0
end;
_ ->
Props0
end,
Props2 = case ContentType of
Props2 = case AmqpProps of
#'v1_0.properties'{correlation_id = {_Type, _Val} = Corr} ->
Props1#{'Correlation-Data' => correlation_id(Corr)};
_ ->
Props1
end,
Props3 = case ContentType of
undefined ->
case AmqpProps of
#'v1_0.properties'{content_type = {symbol, ContentType1}} ->
Props1#{'Content-Type' => rabbit_data_coercion:to_binary(ContentType1)};
Props2#{'Content-Type' => rabbit_data_coercion:to_binary(ContentType1)};
_ ->
Props1
Props2
end;
_ ->
Props1#{'Content-Type' => ContentType}
Props2#{'Content-Type' => ContentType}
end,
UserProp0 = lists:filtermap(fun({{symbol, <<"x-", _/binary>> = Key}, Val}) ->
filter_map_amqp_to_utf8_string(Key, Val);
Expand All @@ -118,8 +133,8 @@ convert_from(mc_amqp, Sections) ->
filter_map_amqp_to_utf8_string(Key, Val)
end, AppProps),
Props = case UserProp0 ++ UserProp1 of
[] -> Props2;
UserProp -> Props2#{'User-Property' => UserProp}
[] -> Props3;
UserProp -> Props3#{'User-Property' => UserProp}
end,
Payload = lists:flatten(lists:reverse(PayloadRev)),
#mqtt_msg{retain = false,
Expand Down Expand Up @@ -246,18 +261,26 @@ convert_to(mc_amqp, #mqtt_msg{qos = Qos,
_ ->
undefined
end,
%% TODO Translate MQTT Response-Topic to AMQP topic.
%% If operator did not modify mqtt.exchange, set reply-to address to "/topic/" RK.
%% If operator modified mqtt.exchange, set reply-to address to "/exchange/" X "/" RK.
% case Props of
% #{'Response-Topic' := Topic} ->
% rabbit_mqtt_util:mqtt_to_amqp(Topic)
S2 = case {ContentType, CorrId} of
{undefined, undefined} ->
ReplyTo = case Props of
#{'Response-Topic' := MqttTopic} ->
Topic = rabbit_mqtt_util:mqtt_to_amqp(MqttTopic),
Address = case persistent_term:get(?PERSISTENT_TERM_EXCHANGE) of
?DEFAULT_MQTT_EXCHANGE ->
<<"/topic/", Topic/binary>>;
Exchange ->
<<"/exchange/", Exchange/binary, "/", Topic/binary>>
end,
{utf8, Address};
_ ->
undefined
end,
S2 = case {ContentType, CorrId, ReplyTo} of
{undefined, undefined, undefined} ->
S1;
_ ->
[#'v1_0.properties'{content_type = ContentType,
correlation_id = CorrId} | S1]
correlation_id = CorrId,
reply_to = ReplyTo} | S1]
end,

S3 = case MsgAnns of
Expand Down Expand Up @@ -296,16 +319,16 @@ convert_to(mc_amqpl, #mqtt_msg{qos = Qos,
Hs0
end,
{CorrId, Hs2} = case Props of
#{'Correlation-Data' := Corr} ->
case mc_util:is_valid_shortstr(Corr) of
true ->
{Corr, Hs1};
false ->
{undefined, [{<<"x-correlation-id">>, longstr, Corr} | Hs1]}
end;
_ ->
{undefined, Hs1}
end,
#{'Correlation-Data' := Corr} ->
case mc_util:is_valid_shortstr(Corr) of
true ->
{Corr, Hs1};
false ->
{undefined, [{<<"x-correlation-id">>, longstr, Corr} | Hs1]}
end;
_ ->
{undefined, Hs1}
end,
Expiration = case Props of
#{'Message-Expiry-Interval' := Seconds} ->
integer_to_binary(timer:seconds(Seconds));
Expand Down Expand Up @@ -517,3 +540,7 @@ amqp_encode(Data, Acc0) ->
Bin = amqp10_framing:encode_bin(Data),
Acc = setelement(5, Acc0, [Bin | element(5, Acc0)]),
setelement(7, Acc, ?CONTENT_TYPE_AMQP).

add_response_topic(AmqpTopic, PublishProperties) ->
MqttTopic = rabbit_mqtt_util:amqp_to_mqtt(AmqpTopic),
PublishProperties#{'Response-Topic' => MqttTopic}.
4 changes: 4 additions & 0 deletions deps/rabbitmq_mqtt/src/rabbit_mqtt.erl
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,10 @@ init_global_counters(ProtoVer) ->
persist_static_configuration() ->
rabbit_mqtt_util:init_sparkplug(),

{ok, Exchange} = application:get_env(?APP_NAME, exchange),
?assert(is_binary(Exchange)),
ok = persistent_term:put(?PERSISTENT_TERM_EXCHANGE, Exchange),

{ok, MailboxSoftLimit} = application:get_env(?APP_NAME, mailbox_soft_limit),
?assert(is_integer(MailboxSoftLimit)),
ok = persistent_term:put(?PERSISTENT_TERM_MAILBOX_SOFT_LIMIT, MailboxSoftLimit),
Expand Down
3 changes: 2 additions & 1 deletion deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,7 @@ process_connect(
{ok, WillMsg} ?= make_will_msg(Packet),
{TraceState, ConnName} = init_trace(VHost, ConnName0),
ok = rabbit_mqtt_keepalive:start(KeepaliveSecs, Socket),
Exchange = rabbit_misc:r(VHost, exchange, persistent_term:get(?PERSISTENT_TERM_EXCHANGE)),
S = #state{
cfg = #cfg{socket = Socket,
proto_ver = proto_integer_to_atom(ProtoVer),
Expand All @@ -215,7 +216,7 @@ process_connect(
peer_ip_addr = PeerIp,
peer_port = PeerPort,
send_fun = SendFun,
exchange = rabbit_misc:r(VHost, exchange, rabbit_mqtt_util:env(exchange)),
exchange = Exchange,
retainer_pid = rabbit_mqtt_retainer_sup:start_child_for_vhost(VHost),
vhost = VHost,
client_id = ClientId,
Expand Down
1 change: 0 additions & 1 deletion deps/rabbitmq_mqtt/src/rabbit_mqtt_util.erl
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,6 @@ env(Key) ->

coerce_env_value(default_pass, Val) -> rabbit_data_coercion:to_binary(Val);
coerce_env_value(default_user, Val) -> rabbit_data_coercion:to_binary(Val);
coerce_env_value(exchange, Val) -> rabbit_data_coercion:to_binary(Val);
coerce_env_value(vhost, Val) -> rabbit_data_coercion:to_binary(Val);
coerce_env_value(_, Val) -> Val.

Expand Down
47 changes: 44 additions & 3 deletions deps/rabbitmq_mqtt/test/mc_mqtt_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,10 @@ all() ->

groups() ->
[
{lossless, [parallel],
{lossless, [shuffle],
[roundtrip_amqp,
roundtrip_amqp_payload_format_indicator,
roundtrip_amqp_response_topic,
roundtrip_amqpl,
roundtrip_amqpl_correlation,
amqp_to_mqtt_amqp_value_section_binary,
Expand All @@ -27,10 +28,12 @@ groups() ->
amqp_to_mqtt_amqp_value_section_boolean
]
},
{lossy, [parallel],
{lossy, [shuffle],
[roundtrip_amqp_user_property,
roundtrip_amqpl_user_property,
roundtrip_amqp_content_type
roundtrip_amqp_content_type,
amqp_to_mqtt_reply_to,
amqp_to_mqtt_footer
]
}
].
Expand Down Expand Up @@ -112,6 +115,20 @@ roundtrip_amqp_payload_format_indicator(_Config) ->
iolist_to_binary(Payload)),
?assertMatch(#{'Payload-Format-Indicator' := 1}, Props).

roundtrip_amqp_response_topic(_Config) ->
Topic = <<"/rabbit/🐇"/utf8>>,
Msg0 = mqtt_msg(),
Key = mqtt_exchange,
MqttExchanges = [<<"amq.topic">>,
<<"some-other-topic-exchange">>],
[begin
ok = persistent_term:put(Key, X),
Msg = Msg0#mqtt_msg{props = #{'Response-Topic' => Topic}},
?assertMatch(#mqtt_msg{props = #{'Response-Topic' := Topic}},
roundtrip(mc_amqp, Msg)),
true = persistent_term:erase(Key)
end || X <- MqttExchanges].

roundtrip_amqpl(_Config) ->
Msg = #mqtt_msg{
qos = 1,
Expand Down Expand Up @@ -248,6 +265,30 @@ roundtrip_amqp_content_type(_Config) ->
#mqtt_msg{props = Props} = roundtrip(mc_amqp, Msg),
?assertNot(maps:is_key('Content-Type', Props)).

amqp_to_mqtt_reply_to(_Config) ->
Val = amqp_value({utf8, <<"hey">>}),
Key = mqtt_exchange,
ok = persistent_term:put(Key, <<"mqtt-topic-exchange">>),

AmqpProps1 = #'v1_0.properties'{reply_to = {utf8, <<"/exchange/mqtt-topic-exchange/my.routing.key">>}},
#mqtt_msg{props = Props1} = amqp_to_mqtt([AmqpProps1, Val]),
?assertEqual({ok, <<"my/routing/key">>},
maps:find('Response-Topic', Props1)),

AmqpProps2 = #'v1_0.properties'{reply_to = {utf8, <<"/exchange/NON-mqtt-topic-exchange/my.routing.key">>}},
#mqtt_msg{props = Props2} = amqp_to_mqtt([AmqpProps2, Val]),
?assertEqual(error,
maps:find('Response-Topic', Props2)),

true = persistent_term:erase(Key).

amqp_to_mqtt_footer(_Config) ->
Val = amqp_value({utf8, <<"hey">>}),
Footer = #'v1_0.footer'{content = [{symbol, <<"key">>}, {utf8, <<"value">>}]},
%% We can translate, but lose the footer.
#mqtt_msg{payload = Payload} = amqp_to_mqtt([Val, Footer]),
?assertEqual(<<"hey">>, iolist_to_binary(Payload)).

mqtt_msg() ->
#mqtt_msg{qos = 0,
topic = <<"my/topic">>,
Expand Down
13 changes: 1 addition & 12 deletions deps/rabbitmq_mqtt/test/util_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
-module(util_SUITE).
-compile([export_all, nowarn_export_all]).

-include_lib("common_test/include/ct.hrl").
-include_lib("eunit/include/eunit.hrl").

all() ->
Expand All @@ -18,7 +17,6 @@ all() ->
groups() ->
[
{tests, [parallel], [
coerce_exchange,
coerce_vhost,
coerce_default_user,
coerce_default_pass,
Expand All @@ -27,22 +25,13 @@ groups() ->
}
].

suite() ->
[{timetrap, {seconds, 60}}].

init_per_suite(Config) ->
ok = application:load(rabbitmq_mqtt),
Config.

end_per_suite(Config) ->
ok = application:unload(rabbitmq_mqtt),
Config.
init_per_group(_, Config) -> Config.
end_per_group(_, Config) -> Config.
init_per_testcase(_, Config) -> Config.
end_per_testcase(_, Config) -> Config.

coerce_exchange(_) ->
?assertEqual(<<"amq.topic">>, rabbit_mqtt_util:env(exchange)).

coerce_vhost(_) ->
?assertEqual(<<"/">>, rabbit_mqtt_util:env(vhost)).
Expand Down

0 comments on commit 1cc35cf

Please sign in to comment.