Skip to content

Commit

Permalink
MQTT progress
Browse files Browse the repository at this point in the history
  • Loading branch information
kjnilsson committed Feb 1, 2023
1 parent 73ba623 commit 8d6961f
Show file tree
Hide file tree
Showing 8 changed files with 92 additions and 104 deletions.
2 changes: 1 addition & 1 deletion deps/rabbit/src/rabbit_amqqueue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@
-type qlen() :: rabbit_types:ok(non_neg_integer()).
-type qfun(A) :: fun ((amqqueue:amqqueue()) -> A | no_return()).
-type qmsg() :: {name(), pid() | {atom(), pid()}, msg_id(),
boolean(), rabbit_types:message()}.
boolean(), mc:state()}.
-type msg_id() :: non_neg_integer().
-type ok_or_errors() ::
'ok' | {'error', [{'error' | 'exit' | 'throw', any()}]}.
Expand Down
3 changes: 2 additions & 1 deletion deps/rabbit/src/rabbit_fifo_client.erl
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@
pending_size/1,
stat/1,
stat/2,
query_single_active_consumer/1
query_single_active_consumer/1,
cluster_name/1
]).

-include_lib("rabbit_common/include/rabbit.hrl").
Expand Down
3 changes: 2 additions & 1 deletion deps/rabbit/src/rabbit_queue_type.erl
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@

-export_type([state/0,
consume_spec/0,
delivery_options/0,
action/0,
actions/0,
settle_op/0]).
Expand Down Expand Up @@ -183,7 +184,7 @@
{protocol_error, Type :: atom(), Reason :: string(), Args :: term()}.

-callback deliver([{amqqueue:amqqueue(), queue_state()}],
Delivery :: mc:state(),
Message :: mc:state(),
Options :: delivery_options()) ->
{[{amqqueue:amqqueue(), queue_state()}], actions()}.

Expand Down
10 changes: 4 additions & 6 deletions deps/rabbit/src/rabbit_quorum_queue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -873,8 +873,6 @@ deliver0(QName, Correlation, Msg, QState0) ->

deliver(QSs, Msg, Options) ->
Correlation = maps:get(correlation, Options, undefined),
% Content = prepare_content(Content0),
% Msg = Msg0#basic_message{content = Content},
lists:foldl(
fun({Q, stateless}, {Qs, Actions}) ->
QRef = amqqueue:get_pid(Q),
Expand All @@ -884,10 +882,10 @@ deliver(QSs, Msg, Options) ->
QName = amqqueue:get_name(Q),
case deliver0(QName, Correlation, Msg, S0) of
{reject_publish, S} ->
QName = rabbit_fifo_client:cluster_name(S),
{[{Q, S} | Qs], [{rejected, QName, [Correlation]} | Actions]};
{_, S} ->
{[{Q, S} | Qs], Actions}
{[{Q, S} | Qs],
[{rejected, QName, [Correlation]} | Actions]};
{ok, S, As} ->
{[{Q, S} | Qs], As ++ Actions}
end
end, {[], []}, QSs).

Expand Down
17 changes: 9 additions & 8 deletions deps/rabbit/src/rabbit_stream_queue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -374,9 +374,9 @@ deliver(QSs, Message, Options) ->
LeaderPid = amqqueue:get_pid(Q),
ok = osiris:write(LeaderPid, undefined, 0, msg_to_iodata(Message)),
{Qs, Actions};
({Q, S0}, {Qs, Actions}) ->
S = deliver0(maps:get(correlation, Options, undefined),
Message, S0),
({Q, S0}, {Qs, Actions0}) ->
{S, Actions} = deliver0(maps:get(correlation, Options, undefined),
Message, S0, Actions0),
{[{Q, S} | Qs], Actions}
end, {[], []}, QSs).

Expand All @@ -387,7 +387,8 @@ deliver0(MsgId, Msg,
next_seq = Seq,
correlation = Correlation0,
soft_limit = SftLmt,
slow = Slow0} = State) ->
slow = Slow0} = State,
Actions0) ->
ok = osiris:write(LeaderPid, WriterId, Seq, msg_to_iodata(Msg)),
Correlation = case MsgId of
undefined ->
Expand All @@ -397,9 +398,9 @@ deliver0(MsgId, Msg,
end,
{Slow, Actions} = case maps:size(Correlation) >= SftLmt of
true when not Slow0 ->
{true, [{block, Name}]};
{true, [{block, Name} | Actions0]};
Bool ->
{Bool, []}
{Bool, Actions0}
end,
{State#stream_client{next_seq = Seq + 1,
correlation = Correlation,
Expand Down Expand Up @@ -484,8 +485,8 @@ recover(_VHost, Queues) ->
end, {[], []}, Queues).

settle(QName, complete, CTag, MsgIds, #stream_client{readers = Readers0,
local_pid = LocalPid,
name = Name} = State) ->
local_pid = LocalPid,
name = Name} = State) ->
Credit = length(MsgIds),
{Readers, Msgs} = case Readers0 of
#{CTag := #stream{credit = Credit0} = Str0} ->
Expand Down
2 changes: 1 addition & 1 deletion deps/rabbit/src/rabbit_trace.erl
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ tap_in(Msg, QNames, ConnName, ChannelNum, Username, TraceX) ->
tap_out(Msg, ConnName, Username, State) ->
tap_out(Msg, ConnName, ?CONNECTION_GLOBAL_CHANNEL_NUM, Username, State).

-spec tap_out(mc:state(), binary(),
-spec tap_out(rabbit_amqqueue:qmsg(), binary(),
rabbit_channel:channel_number(),
rabbit_types:username(), state()) -> 'ok'.
tap_out(_Msg, _ConnName, _ChannelNum, _Username, none) -> ok;
Expand Down
96 changes: 41 additions & 55 deletions deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1211,7 +1211,6 @@ publish_to_queues(
auth_state = #auth_state{username = Username}
} = State) ->
RoutingKey = mqtt_to_amqp(Topic),
Confirm = Qos > ?QOS_0,
Headers = [{<<"x-mqtt-publish-qos">>, byte, Qos},
{<<"x-mqtt-dup">>, bool, Dup}],
Props = #'P_basic'{
Expand All @@ -1225,82 +1224,62 @@ publish_to_queues(
protocol = none,
payload_fragments_rev = [Payload]
},
BasicMessage = #basic_message{
exchange_name = ExchangeName,
routing_keys = [RoutingKey],
content = Content,
id = <<>>, %% GUID set in rabbit_classic_queue
is_persistent = Confirm
},
Delivery = #delivery{
mandatory = false,
confirm = Confirm,
sender = self(),
message = BasicMessage,
msg_seq_no = PacketId,
flow = Flow
},

Message = rabbit_mc_amqp_legacy:message(ExchangeName#resource.name,
RoutingKey,
Content),
case rabbit_exchange:lookup(ExchangeName) of
{ok, Exchange} ->
QNames = rabbit_exchange:route(Exchange, Delivery),
rabbit_trace:tap_in(BasicMessage, QNames, ConnName, Username, TraceState),
deliver_to_queues(Delivery, QNames, State);
QNames = rabbit_exchange:route(Exchange, Message),
rabbit_trace:tap_in(Message, QNames, ConnName, Username, TraceState),
Options = maps_put_truthy(
flow, Flow, maps_put_truthy(correlation, PacketId, #{})),
deliver_to_queues(Message, Options, QNames, State);
{error, not_found} ->
?LOG_ERROR("~s not found", [rabbit_misc:rs(ExchangeName)]),
{error, exchange_not_found, State}
end.

deliver_to_queues(Delivery,
deliver_to_queues(Message,
Options,
RoutedToQNames,
State0 = #state{queue_states = QStates0,
cfg = #cfg{proto_ver = ProtoVer}}) ->
Qs0 = rabbit_amqqueue:lookup(RoutedToQNames),
Qs = rabbit_amqqueue:prepend_extra_bcc(Qs0),
case rabbit_queue_type:deliver(Qs, Delivery, QStates0) of
case rabbit_queue_type:deliver(Qs, Message, Options, QStates0) of
{ok, QStates, Actions} ->
rabbit_global_counters:messages_routed(ProtoVer, length(Qs)),
State = process_routing_confirm(Delivery, Qs,
State = process_routing_confirm(Options, Qs,
State0#state{queue_states = QStates}),
%% Actions must be processed after registering confirms as actions may
%% contain rejections of publishes.
{ok, handle_queue_actions(Actions, State)};
{error, Reason} ->
Corr = maps:get(correlation, Options, undefined),
?LOG_ERROR("Failed to deliver message with packet_id=~p to queues: ~p",
[Delivery#delivery.msg_seq_no, Reason]),
[Corr, Reason]),
{error, Reason, State0}
end.

process_routing_confirm(#delivery{confirm = false},
[], State = #state{cfg = #cfg{proto_ver = ProtoVer}}) ->
rabbit_global_counters:messages_unroutable_dropped(ProtoVer, 1),
State;
process_routing_confirm(#delivery{confirm = true,
msg_seq_no = undefined},
[], State = #state{cfg = #cfg{proto_ver = ProtoVer}}) ->
%% unroutable will message with QoS > 0
rabbit_global_counters:messages_unroutable_dropped(ProtoVer, 1),
State;
process_routing_confirm(#delivery{confirm = true,
msg_seq_no = PktId},
process_routing_confirm(#{correlation := PktId},
[], State = #state{cfg = #cfg{proto_ver = ProtoVer}}) ->
rabbit_global_counters:messages_unroutable_returned(ProtoVer, 1),
%% MQTT 5 spec:
%% If the Server knows that there are no matching subscribers, it MAY use
%% Reason Code 0x10 (No matching subscribers) instead of 0x00 (Success).
send_puback(PktId, State),
State;
process_routing_confirm(#delivery{confirm = false}, _, State) ->
State;
process_routing_confirm(#delivery{confirm = true,
msg_seq_no = undefined}, [_|_], State) ->
%% routable will message with QoS > 0
process_routing_confirm(#{}, [], State = #state{cfg = #cfg{proto_ver = ProtoVer}}) ->
rabbit_global_counters:messages_unroutable_dropped(ProtoVer, 1),
State;
process_routing_confirm(#delivery{confirm = true,
msg_seq_no = PktId},
process_routing_confirm(#{correlation := PktId},
Qs, State = #state{unacked_client_pubs = U0}) ->
QNames = lists:map(fun amqqueue:get_name/1, Qs),
U = rabbit_mqtt_confirms:insert(PktId, QNames, U0),
State#state{unacked_client_pubs = U}.
State#state{unacked_client_pubs = U};
process_routing_confirm(#{}, _, State) ->
State.

send_puback(PktIds0, State)
when is_list(PktIds0) ->
Expand Down Expand Up @@ -1582,15 +1561,14 @@ deliver_to_client(Msgs, Ack, State) ->
deliver_one_to_client(Msg, Ack, S)
end, State, Msgs).

deliver_one_to_client(Msg = {QNameOrType, QPid, QMsgId, _Redelivered,
#basic_message{content = #content{properties = #'P_basic'{headers = Headers}}}},
deliver_one_to_client({QNameOrType, QPid, QMsgId, _Redelivered, Msg} = Delivery,
AckRequired, State0) ->
PublisherQoS = case rabbit_mqtt_util:table_lookup(Headers, <<"x-mqtt-publish-qos">>) of
{byte, QoS0} ->
QoS0;
PublisherQoS = case mc:proto_header(<<"x-mqtt-publish-qos">>, Msg) of
undefined ->
%% non-MQTT publishes are assumed to be QoS 1 regardless of delivery_mode
?QOS_1
?QOS_1;
QoS0 ->
QoS0
end,
SubscriberQoS = case AckRequired of
true ->
Expand All @@ -1599,7 +1577,7 @@ deliver_one_to_client(Msg = {QNameOrType, QPid, QMsgId, _Redelivered,
?QOS_0
end,
QoS = effective_qos(PublisherQoS, SubscriberQoS),
State1 = maybe_publish_to_client(Msg, QoS, State0),
State1 = maybe_publish_to_client(Delivery, QoS, State0),
State = maybe_auto_ack(AckRequired, QoS, QNameOrType, QMsgId, State1),
ok = maybe_notify_sent(QNameOrType, QPid, State),
State.
Expand All @@ -1615,12 +1593,12 @@ maybe_publish_to_client({_, _, _, _Redelivered = true, _}, ?QOS_0, State) ->
%% Do not redeliver to MQTT subscriber who gets message at most once.
State;
maybe_publish_to_client(
{QNameOrType, _QPid, QMsgId, Redelivered,
#basic_message{
routing_keys = [RoutingKey | _CcRoutes],
content = #content{payload_fragments_rev = FragmentsRev}}} = Msg,
{QNameOrType, _QPid, QMsgId, Redelivered, Msg} = Delivery,
QoS, State0 = #state{cfg = #cfg{send_fun = SendFun}}) ->
[RoutingKey | _] = mc:get_annotation(routing_keys, Msg),
{PacketId, State} = msg_id_to_packet_id(QMsgId, QoS, State0),
AmqpLegMsg = mc:convert(rabbit_mc_amqp_legacy, Msg),
#content{payload_fragments_rev = FragmentsRev} = mc:protocol_state(AmqpLegMsg),
Packet =
#mqtt_packet{
fixed = #mqtt_packet_fixed{
Expand All @@ -1638,7 +1616,7 @@ maybe_publish_to_client(
topic_name = amqp_to_mqtt(RoutingKey)},
payload = lists:reverse(FragmentsRev)},
SendFun(Packet, State),
trace_tap_out(Msg, State),
trace_tap_out(Delivery, State),
message_delivered(QNameOrType, Redelivered, QoS, State),
State.

Expand Down Expand Up @@ -2002,3 +1980,11 @@ format_status(
register_state => RegisterState,
queues_soft_limit_exceeded => QSLE,
qos0_messages_dropped => Qos0MsgsDropped}.


maps_put_truthy(_K, undefined, M) ->
M;
% maps_put_truthy(_K, false, M) ->
% M;
maps_put_truthy(K, V, M) ->
maps:put(K, V, M).
63 changes: 32 additions & 31 deletions deps/rabbitmq_mqtt/src/rabbit_mqtt_qos0_queue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
is_stateful/0,
declare/2,
delete/4,
deliver/2,
deliver/3,
is_enabled/0,
is_compatible/3,
is_recoverable/1,
Expand Down Expand Up @@ -100,38 +100,39 @@ delete(Q, _IfUnused, _IfEmpty, ActingUser) ->
ok = rabbit_amqqueue:internal_delete(QName, ActingUser),
{ok, 0}.

-spec deliver([{amqqueue:amqqueue(), stateless}], Delivery :: term()) ->
-spec deliver([{amqqueue:amqqueue(), stateless}],
Msg :: mc:state(),
rabbit_queue_type:delivery_options()) ->
{[], rabbit_queue_type:actions()}.
deliver(Qs, #delivery{message = BasicMessage,
confirm = Confirm,
msg_seq_no = SeqNo}) ->
Msg = {queue_event, ?MODULE,
{?MODULE, _QPid = none, _QMsgId = none, _Redelivered = false, BasicMessage}},
deliver(Qs, Msg, Options) ->
Evt = {queue_event, ?MODULE,
{?MODULE, _QPid = none, _QMsgId = none, _Redelivered = false, Msg}},
{Pids, Actions} =
case Confirm of
false ->
Pids0 = lists:map(fun({Q, stateless}) -> amqqueue:get_pid(Q) end, Qs),
{Pids0, []};
true ->
%% We confirm the message directly here in the queue client.
%% Alternatively, we could have the target MQTT connection process confirm the message.
%% However, given that this message might be lost anyway between target MQTT connection
%% process and MQTT subscriber, and we know that the MQTT subscriber wants to receive
%% this message at most once, we confirm here directly.
%% Benefits:
%% 1. We do not block sending the confirmation back to the publishing client just because a single
%% (at-most-once) target queue out of potentially many (e.g. million) queues might be unavailable.
%% 2. Memory usage in this (publishing) process is kept lower because the target queue name can be
%% directly removed from rabbit_mqtt_confirms and rabbit_confirms.
%% 3. Reduced network traffic across RabbitMQ nodes.
%% 4. Lower latency of sending publisher confirmation back to the publishing client.
SeqNos = [SeqNo],
lists:mapfoldl(fun({Q, stateless}, Actions) ->
{amqqueue:get_pid(Q),
[{settled, amqqueue:get_name(Q), SeqNos} | Actions]}
end, [], Qs)
end,
delegate:invoke_no_result(Pids, {gen_server, cast, [Msg]}),
case maps:get(correlation, Options, undefined) of
undefined ->
Pids0 = lists:map(fun({Q, stateless}) -> amqqueue:get_pid(Q) end, Qs),
{Pids0, []};
Corr ->
%% We confirm the message directly here in the queue client.
%% Alternatively, we could have the target MQTT connection process confirm the message.
%% However, given that this message might be lost anyway between target MQTT connection
%% process and MQTT subscriber, and we know that the MQTT subscriber wants to receive
%% this message at most once, we confirm here directly.
%% Benefits:
%% 1. We do not block sending the confirmation back to the publishing client just because a single
%% (at-most-once) target queue out of potentially many (e.g. million) queues might be unavailable.
%% 2. Memory usage in this (publishing) process is kept lower because the target queue name can be
%% directly removed from rabbit_mqtt_confirms and rabbit_confirms.
%% 3. Reduced network traffic across RabbitMQ nodes.
%% 4. Lower latency of sending publisher confirmation back to the publishing client.
Corrs = [Corr],
lists:mapfoldl(fun({Q, stateless}, Actions) ->
{amqqueue:get_pid(Q),
[{settled, amqqueue:get_name(Q), Corrs}
| Actions]}
end, [], Qs)
end,
delegate:invoke_no_result(Pids, {gen_server, cast, [Evt]}),
{[], Actions}.

-spec is_enabled() ->
Expand Down

0 comments on commit 8d6961f

Please sign in to comment.