Skip to content

Commit

Permalink
Add MQTT v5 feature Message Expiry Interval
Browse files Browse the repository at this point in the history
This commit does not yet implement Message Expiry Interval of
* retained messages: "If the current retained message for a Topic
  expires, it is discarded and there will be no retained message for
  that topic."
  • Loading branch information
ansd committed Mar 2, 2023
1 parent 185c6e7 commit 290974d
Show file tree
Hide file tree
Showing 3 changed files with 153 additions and 28 deletions.
8 changes: 7 additions & 1 deletion deps/rabbitmq_mqtt/include/rabbit_mqtt_packet.hrl
Expand Up @@ -216,13 +216,19 @@
props = #{} :: properties()
}).

%% TODO Add compatibility for old #mqtt_msg{} record that could still be stored on disk
%% in retained message stores (both ETS and DETS): Could either migrate or do the conversion
%% lazily in rabbit_mqtt_retained_msg_store:lookup/2.
%%
%% MQTT application message.
-record(mqtt_msg, {retain :: boolean(),
qos :: qos(),
topic :: binary(),
dup :: boolean(),
packet_id :: option(packet_id()) | ?WILL_MSG_QOS_1_CORRELATION,
payload :: binary()
payload :: binary(),
%% PUBLISH or Will properties
props :: properties()
}).

-type mqtt_msg() :: #mqtt_msg{}.
Expand Down
83 changes: 63 additions & 20 deletions deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl
Expand Up @@ -297,21 +297,23 @@ process_request(?PUBLISH,
#mqtt_packet{
fixed = #mqtt_packet_fixed{qos = Qos,
retain = Retain,
dup = Dup },
dup = Dup},
variable = #mqtt_packet_publish{topic_name = Topic,
packet_id = PacketId },
packet_id = PacketId,
props = Props},
payload = Payload},
State0 = #state{unacked_client_pubs = U,
cfg = #cfg{proto_ver = ProtoVer}}) ->
EffectiveQos = maybe_downgrade_qos(Qos),
rabbit_global_counters:messages_received(ProtoVer, 1),
State = maybe_increment_publisher(State0),
Msg = #mqtt_msg{retain = Retain,
qos = EffectiveQos,
topic = Topic,
dup = Dup,
Msg = #mqtt_msg{retain = Retain,
qos = EffectiveQos,
topic = Topic,
dup = Dup,
packet_id = PacketId,
payload = Payload},
payload = Payload,
props = Props},
case EffectiveQos of
?QOS_0 ->
publish_to_queues_with_checks(Msg, State);
Expand Down Expand Up @@ -696,6 +698,7 @@ make_will_msg(#mqtt_packet_connect{will_flag = true,
will_retain = Retain,
will_qos = Qos,
will_topic = Topic,
will_props = Props,
will_msg = Msg}) ->
EffectiveQos = maybe_downgrade_qos(Qos),
Correlation = case EffectiveQos of
Expand All @@ -707,6 +710,7 @@ make_will_msg(#mqtt_packet_connect{will_flag = true,
packet_id = Correlation,
topic = Topic,
dup = false,
props = Props,
payload = Msg}.

check_vhost_exists(VHost, Username, PeerIp) ->
Expand Down Expand Up @@ -1116,10 +1120,11 @@ binding_action(ExchangeName, TopicName, QName, BindingFun, #auth_state{user = #u
BindingFun(Binding, Username).

publish_to_queues(
#mqtt_msg{qos = Qos,
topic = Topic,
packet_id = PacketId,
payload = Payload},
#mqtt_msg{qos = Qos,
topic = Topic,
packet_id = PacketId,
payload = Payload,
props = Props},
#state{cfg = #cfg{exchange = ExchangeName,
delivery_flow = Flow,
conn_name = ConnName,
Expand All @@ -1128,13 +1133,22 @@ publish_to_queues(
} = State) ->
RoutingKey = mqtt_to_amqp(Topic),
Confirm = Qos > ?QOS_0,
Props = #'P_basic'{
headers = [{<<"x-mqtt-publish-qos">>, byte, Qos}],
delivery_mode = delivery_mode(Qos)},
{Expiration, Timestamp} = case Props of
#{'Message-Expiry-Interval' := ExpirySeconds} ->
{integer_to_binary(ExpirySeconds * 1000),
os:system_time(seconds)};
_ ->
{undefined, undefined}
end,
PBasic = #'P_basic'{
headers = [{<<"x-mqtt-publish-qos">>, byte, Qos}],
delivery_mode = delivery_mode(Qos),
expiration = Expiration,
timestamp = Timestamp},
{ClassId, _MethodId} = rabbit_framing_amqp_0_9_1:method_id('basic.publish'),
Content = #content{
class_id = ClassId,
properties = Props,
properties = PBasic,
properties_bin = none,
protocol = none,
payload_fragments_rev = [Payload]
Expand Down Expand Up @@ -1532,11 +1546,13 @@ maybe_publish_to_client({_, _, _, _Redelivered = true, _}, ?QOS_0, State) ->
%% Do not redeliver to MQTT subscriber who gets message at most once.
{complete, State};
maybe_publish_to_client(
{QNameOrType, _QPid, QMsgId, Redelivered,
#basic_message{
routing_keys = [RoutingKey | _CcRoutes],
content = #content{payload_fragments_rev = FragmentsRev}}} = Msg,
Msg = {QNameOrType, _QPid, QMsgId, Redelivered,
#basic_message{
routing_keys = [RoutingKey | _CcRoutes],
content = #content{payload_fragments_rev = FragmentsRev,
properties = PBasic}}},
QoS, State0) ->
Props = p_basic_to_publish_properties(PBasic),
{PacketId, State} = msg_id_to_packet_id(QMsgId, QoS, State0),
Packet =
#mqtt_packet{
Expand All @@ -1546,7 +1562,8 @@ maybe_publish_to_client(
dup = Redelivered},
variable = #mqtt_packet_publish{
packet_id = PacketId,
topic_name = amqp_to_mqtt(RoutingKey)},
topic_name = amqp_to_mqtt(RoutingKey),
props = Props},
payload = lists:reverse(FragmentsRev)},
SettleOp = case send(Packet, State) of
ok ->
Expand All @@ -1558,6 +1575,32 @@ maybe_publish_to_client(
end,
{SettleOp, State}.

%% Converts AMQP 0.9.1 properties to MQTT v5 properties.
%% TODO map more properties such as content_encoding, content_type, correlation_id, headers, etc.
-spec p_basic_to_publish_properties(#'P_basic'{}) -> properties().
p_basic_to_publish_properties(#'P_basic'{headers = Headers,
expiration = Expiration,
timestamp = TimestampSeconds
})
when is_binary(Expiration) andalso
is_integer(TimestampSeconds) ->
%% Check whether source protocol is MQTT
case lists:keymember(<<"x-mqtt-publish-qos">>, 1, Headers) of
true ->
ExpirationMs = binary_to_integer(Expiration),
ExpirationSeconds = ExpirationMs div 1000,
%% "The PUBLISH packet sent to a Client by the Server MUST contain a Message
%% Expiry Interval set to the received value minus the time that the
%% Application Message has been waiting in the Server" [MQTT-3.3.2-6]
WaitingSeconds = os:system_time(seconds) - TimestampSeconds,
Expiry = max(0, ExpirationSeconds - WaitingSeconds),
#{'Message-Expiry-Interval' => Expiry};
false ->
#{}
end;
p_basic_to_publish_properties(#'P_basic'{}) ->
#{}.

msg_id_to_packet_id(_, ?QOS_0, State) ->
%% "A PUBLISH packet MUST NOT contain a Packet Identifier if its QoS value is set to 0 [MQTT-2.2.1-2]."
{undefined, State};
Expand Down
90 changes: 83 additions & 7 deletions deps/rabbitmq_mqtt/test/v5_SUITE.erl
Expand Up @@ -37,7 +37,9 @@ cluster_size_1_tests() ->
[
client_set_max_packet_size_publish,
client_set_max_packet_size_connack,
client_set_max_packet_size_invalid
client_set_max_packet_size_invalid,
message_expiry_interval,
message_expiry_interval_will_message
].

cluster_size_3_tests() ->
Expand Down Expand Up @@ -116,12 +118,8 @@ client_set_max_packet_size_publish(Config) ->
?assertMatch({ok, _}, emqtt:publish(C, Topic, PayloadTooLarge, [{qos, 1}])),
%% We expect the server to drop the PUBLISH packet prior to sending to the client
%% because the packet is larger than what the client is able to receive.
receive Unexpected -> ct:fail("Unexpected message: ~p", [Unexpected])
after 500 -> ok
end,
Counters = rabbit_ct_broker_helpers:rpc(Config, rabbit_global_counters, overview, []),
M = maps:get([{queue_type, rabbit_classic_queue}, {dead_letter_strategy, disabled}], Counters),
?assertEqual(1, maps:get(messages_dead_lettered_rejected_total, M)),
assert_nothing_received(),
?assertEqual(1, dead_letter_metric(messages_dead_lettered_rejected_total, Config)),
ok = emqtt:disconnect(C).

client_set_max_packet_size_connack(Config) ->
Expand All @@ -139,5 +137,83 @@ client_set_max_packet_size_invalid(Config) ->
unlink(C),
?assertMatch({error, _}, Connect(C)).

message_expiry_interval(Config) ->
NumExpiredBefore = dead_letter_metric(messages_dead_lettered_expired_total, Config),
Topic = ClientId = atom_to_binary(?FUNCTION_NAME),
Pub = connect(<<"publisher">>, Config),
Sub1 = connect(ClientId, Config, [{clean_start, false}]),
{ok, _, [1]} = emqtt:subscribe(Sub1, Topic, qos1),
ok = emqtt:disconnect(Sub1),

{ok, _} = emqtt:publish(Pub, Topic, #{'Message-Expiry-Interval' => 1}, <<"m1">>, [{qos, 1}]),
{ok, _} = emqtt:publish(Pub, Topic, #{}, <<"m2">>, [{qos, 1}]),
{ok, _} = emqtt:publish(Pub, Topic, #{'Message-Expiry-Interval' => 10}, <<"m3">>, [{qos, 1}]),
{ok, _} = emqtt:publish(Pub, Topic, #{'Message-Expiry-Interval' => 2}, <<"m4">>, [{qos, 1}]),
timer:sleep(2001),
Sub2 = connect(ClientId, Config, [{clean_start, false}]),
receive {publish, #{client_pid := Sub2,
topic := Topic,
payload := <<"m2">>,
properties := Props}}
when map_size(Props) =:= 0 -> ok
after 1000 -> ct:fail("did not receive m2")
end,

receive {publish, #{client_pid := Sub2,
topic := Topic,
payload := <<"m3">>,
%% "The PUBLISH packet sent to a Client by the Server MUST contain a Message
%% Expiry Interval set to the received value minus the time that the
%% Application Message has been waiting in the Server" [MQTT-3.3.2-6]
properties := #{'Message-Expiry-Interval' := 10-2}}} -> ok
after 100 -> ct:fail("did not receive m3")
end,
assert_nothing_received(),
NumExpired = dead_letter_metric(messages_dead_lettered_expired_total, Config) - NumExpiredBefore,
?assertEqual(2, NumExpired),

ok = emqtt:disconnect(Pub),
ok = emqtt:disconnect(Sub2),
Sub3 = connect(ClientId, Config, [{clean_start, true}]),
ok = emqtt:disconnect(Sub3).

message_expiry_interval_will_message(Config) ->
NumExpiredBefore = dead_letter_metric(messages_dead_lettered_expired_total, Config),
Topic = ClientId = atom_to_binary(?FUNCTION_NAME),
Opts = [{will_topic, Topic},
{will_payload, <<"will payload">>},
{will_qos, 1},
{will_props, #{'Message-Expiry-Interval' => 1}}
],
Pub = connect(<<"will-publisher">>, Config, Opts),
timer:sleep(100),
[ServerPublisherPid] = util:all_connection_pids(Config),

Sub1 = connect(ClientId, Config, [{clean_start, false}]),
{ok, _, [1]} = emqtt:subscribe(Sub1, Topic, qos1),
ok = emqtt:disconnect(Sub1),

unlink(Pub),
%% Trigger sending of will message.
erlang:exit(ServerPublisherPid, test_will),
%% Wait for will message to expire.
timer:sleep(1100),
NumExpired = dead_letter_metric(messages_dead_lettered_expired_total, Config) - NumExpiredBefore,
?assertEqual(1, NumExpired),

Sub2 = connect(ClientId, Config, [{clean_start, true}]),
assert_nothing_received(),
ok = emqtt:disconnect(Sub2).

satisfy_bazel(_Config) ->
ok.

dead_letter_metric(Metric, Config) ->
Counters = rabbit_ct_broker_helpers:rpc(Config, rabbit_global_counters, overview, []),
Map = maps:get([{queue_type, rabbit_classic_queue}, {dead_letter_strategy, disabled}], Counters),
maps:get(Metric, Map).

assert_nothing_received() ->
receive Unexpected -> ct:fail("Received unexpected message: ~p", [Unexpected])
after 500 -> ok
end.

0 comments on commit 290974d

Please sign in to comment.