Skip to content

Commit

Permalink
Add a new delayed subscription strategy for topics with retained mess…
Browse files Browse the repository at this point in the history
…ages.
  • Loading branch information
mths1 committed Apr 10, 2024
1 parent fa93526 commit 6ebf9d5
Show file tree
Hide file tree
Showing 7 changed files with 124 additions and 49 deletions.
17 changes: 17 additions & 0 deletions apps/vmq_server/priv/vmq_server.schema
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,23 @@
[{default, on},
{datatype, flag}]}.

%% Retrain subscriber mode defines if subscribers on topics with retrained flag set should wait for
%% a retrain server sync or process immmediatly.
{mapping, "subscriber_retain_mode", "vmq_server.subscriber_retain_mode", [
{default, immediate},
{datatype, atom},
hidden
]}.

{translation, "vmq_server.subscriber_retain_mode",
fun(Conf) ->
case cuttlefish:conf_get("subscriber_retain_mode", Conf) of
immediate -> immediate;
syncwait -> syncwait;
_ -> cuttlefish:invalid("subscriber_retain_mode must be either 'immediate' or 'syncwait'!")
end
end}.

%% @doc queue_type enables to change the default queue delivery behaviour from
%% 'fifo' to 'lifo'. In order to make this work the underlying message store
%% must deliver the messages in proper order.
Expand Down
3 changes: 2 additions & 1 deletion apps/vmq_server/src/vmq_config_cli.erl
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@ register_config_() ->
"suppress_lwt_on_session_takeover",
"coordinate_registrations",
"mqtt_connect_timeout",
"disconnect_on_unauthorized_publish_v3"
"disconnect_on_unauthorized_publish_v3",
"subscriber_retain_mode"
],
_ = [
clique:register_config([Key], fun register_config_callback/2)
Expand Down
80 changes: 56 additions & 24 deletions apps/vmq_server/src/vmq_queue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
notify/1,
notify_recv/1,
enqueue/2,
front/2,
status/1,
info/1,
add_session/3,
Expand Down Expand Up @@ -109,6 +110,7 @@
max_msgs_per_drain_step,
waiting_call,
opts,
insert_fun = enqueue,
delayed_will ::
{Delay :: non_neg_integer(), Fun :: function()}
| undefined,
Expand Down Expand Up @@ -140,6 +142,9 @@ notify_recv(Queue) when is_pid(Queue) ->
enqueue(Queue, Msg) when is_pid(Queue) ->
gen_fsm:send_event(Queue, {enqueue, to_internal(Msg)}).

front(Queue, Msg) when is_pid(Queue) ->
gen_fsm:send_event(Queue, {front, to_internal(Msg)}).

enqueue_many(Queue, Msgs) when is_pid(Queue) and is_list(Msgs) ->
NMsgs = lists:map(fun to_internal/1, Msgs),
gen_fsm:sync_send_event(Queue, {enqueue_many, NMsgs}, infinity).
Expand Down Expand Up @@ -246,6 +251,12 @@ online({notify_recv, SessionPid}, #state{id = SId, sessions = Sessions} = State)
online({enqueue, Msg}, State) ->
_ = vmq_metrics:incr_queue_in(),
{next_state, online, insert(Msg, State)};
online({front, Msg}, State) ->
_ = vmq_metrics:incr_queue_in(),
State0 = State#state{insert_fun = front},
State1 = insert(Msg, State0),
State2 = State1#state{insert_fun = enqueue},
{next_state, online, State2};
online(Event, State) ->
?LOG_ERROR("got unknown event in online state ~p", [Event]),
{next_state, online, State}.
Expand Down Expand Up @@ -923,7 +934,9 @@ handle_session_down(
end.

handle_waiting_acks_and_msgs(
WAcks, NextMsgId, #state{id = SId, sessions = Sessions, offline = Offline} = State
WAcks,
NextMsgId,
#state{id = SId, sessions = Sessions, offline = Offline, insert_fun = QFun} = State
) ->
%% we can only handle the last waiting acks and msgs if this is
%% the last session active for this queue.
Expand All @@ -936,12 +949,13 @@ handle_waiting_acks_and_msgs(
(#deliver{msg = #vmq_msg{persisted = true} = Msg} = D, AccOffline) ->
queue_insert(
true,
QFun,
D#deliver{msg = Msg#vmq_msg{persisted = false}},
AccOffline,
SId
);
(Msg, AccOffline) ->
queue_insert(true, Msg, AccOffline, SId)
queue_insert(true, QFun, Msg, AccOffline, SId)
end,
Offline,
WAcks
Expand Down Expand Up @@ -1079,45 +1093,58 @@ insert(
%% no session online, skip QoS0 message for QoS1 or QoS2 Subscription (without QoS upgrade)
_ = vmq_metrics:incr_queue_unhandled(1),
State;
insert(MsgOrRef, #state{id = SId, offline = Offline, sessions = Sessions} = State) when
insert(
MsgOrRef, #state{id = SId, offline = Offline, sessions = Sessions, insert_fun = QFun} = State
) when
Sessions == #{}
->
%% no session online, insert in offline queue
State#state{offline = queue_insert(true, maybe_set_expiry_ts(MsgOrRef), Offline, SId)};
State#state{offline = queue_insert(true, QFun, maybe_set_expiry_ts(MsgOrRef), Offline, SId)};
%% Online Queue
insert(MsgOrRef, #state{id = SId, deliver_mode = fanout, sessions = Sessions} = State) ->
insert(
MsgOrRef,
#state{id = SId, deliver_mode = fanout, sessions = Sessions, insert_fun = QFun} = State
) ->
{NewSessions, _} = session_fold(
SId, fun session_insert/3, maybe_set_expiry_ts(MsgOrRef), Sessions
SId, QFun, fun session_insert/4, maybe_set_expiry_ts(MsgOrRef), Sessions
),
State#state{sessions = NewSessions};
insert(MsgOrRef, #state{id = SId, deliver_mode = balance, sessions = Sessions} = State) ->
insert(
MsgOrRef,
#state{id = SId, deliver_mode = balance, sessions = Sessions, insert_fun = QFun} = State
) ->
Pids = maps:keys(Sessions),
RandomPid = lists:nth(rand:uniform(maps:size(Sessions)), Pids),
RandomSession = maps:get(RandomPid, Sessions),
{UpdatedSession, _} = session_insert(SId, RandomSession, maybe_set_expiry_ts(MsgOrRef)),
{UpdatedSession, _} = session_insert(SId, QFun, RandomSession, maybe_set_expiry_ts(MsgOrRef)),
State#state{sessions = maps:update(RandomPid, UpdatedSession, Sessions)}.

session_insert(SId, #session{status = active, queue = Q} = Session, MsgOrRef) ->
session_insert(SId, QFun, #session{status = active, queue = Q} = Session, MsgOrRef) ->
{
send(SId, Session#session{queue = queue_insert(false, MsgOrRef, Q, SId)}),
send(SId, Session#session{queue = queue_insert(false, QFun, MsgOrRef, Q, SId)}),
MsgOrRef
};
session_insert(SId, #session{status = passive, queue = Q} = Session, MsgOrRef) ->
session_insert(SId, QFun, #session{status = passive, queue = Q} = Session, MsgOrRef) ->
{
Session#session{queue = queue_insert(false, MsgOrRef, Q, SId)},
Session#session{queue = queue_insert(false, QFun, MsgOrRef, Q, SId)},
MsgOrRef
};
session_insert(SId, #session{status = notify, queue = Q} = Session, MsgOrRef) ->
{send_notification(Session#session{queue = queue_insert(false, MsgOrRef, Q, SId)}), MsgOrRef}.
session_insert(SId, QFun, #session{status = notify, queue = Q} = Session, MsgOrRef) ->
{
send_notification(Session#session{queue = queue_insert(false, QFun, MsgOrRef, Q, SId)}),
MsgOrRef
}.

%% unlimited messages accepted
queue_insert(Offline, MsgOrRef, #queue{max = -1, size = Size, queue = Queue} = Q, SId) ->
queue_insert(Offline, enqueue, MsgOrRef, #queue{max = -1, size = Size, queue = Queue} = Q, SId) ->
Q#queue{queue = queue:in(maybe_offline_store(Offline, SId, MsgOrRef), Queue), size = Size + 1};
queue_insert(Offline, MsgOrRef, #queue{ignore_max = true, size = Size, queue = Queue} = Q, SId) ->
queue_insert(
Offline, enqueue, MsgOrRef, #queue{ignore_max = true, size = Size, queue = Queue} = Q, SId
) ->
Q#queue{queue = queue:in(maybe_offline_store(Offline, SId, MsgOrRef), Queue), size = Size + 1};
%% tail drop in case of fifo
queue_insert(
_Offline, MsgOrRef, #queue{type = fifo, max = Max, size = Size, drop = Drop} = Q, SId
_Offline, _QFun, MsgOrRef, #queue{type = fifo, max = Max, size = Size, drop = Drop} = Q, SId
) when
Size >= Max
->
Expand All @@ -1128,6 +1155,7 @@ queue_insert(
%% drop oldest in case of lifo
queue_insert(
Offline,
enqueue,
MsgOrRef,
#queue{type = lifo, max = Max, size = Size, queue = Queue, drop = Drop} = Q,
SId
Expand All @@ -1142,7 +1170,11 @@ queue_insert(
queue = queue:in(maybe_offline_store(Offline, SId, MsgOrRef), NewQueue), drop = Drop + 1
};
%% normal enqueue
queue_insert(Offline, MsgOrRef, #queue{queue = Queue, size = Size} = Q, SId) ->
queue_insert(Offline, front, MsgOrRef, #queue{queue = Queue, size = Size} = Q, SId) ->
Q#queue{
queue = queue:in_r(maybe_offline_store(Offline, SId, MsgOrRef), Queue), size = Size + 1
};
queue_insert(Offline, enqueue, MsgOrRef, #queue{queue = Queue, size = Size} = Q, SId) ->
Q#queue{queue = queue:in(maybe_offline_store(Offline, SId, MsgOrRef), Queue), size = Size + 1}.

send(
Expand Down Expand Up @@ -1223,13 +1255,13 @@ cleanup_queue_(SId, {{value, {{qos2, _}, _}}, NewQueue}) ->
cleanup_queue_(_, {empty, _}) ->
ok.

session_fold(SId, Fun, Acc, Map) ->
session_fold(SId, Fun, Acc, Map, maps:keys(Map)).
session_fold(SId, QFun, Fun, Acc, Map) ->
session_fold(SId, QFun, Fun, Acc, Map, maps:keys(Map)).

session_fold(SId, Fun, Acc, Map, [K | Rest]) ->
{NewV, NewAcc} = Fun(SId, maps:get(K, Map), Acc),
session_fold(SId, Fun, NewAcc, maps:update(K, NewV, Map), Rest);
session_fold(_, _, Acc, Map, []) ->
session_fold(SId, QFun, Fun, Acc, Map, [K | Rest]) ->
{NewV, NewAcc} = Fun(SId, QFun, maps:get(K, Map), Acc),
session_fold(SId, QFun, Fun, NewAcc, maps:update(K, NewV, Map), Rest);
session_fold(_, _, _, Acc, Map, []) ->
{Map, Acc}.

maybe_set_expiry_timer(
Expand Down
58 changes: 35 additions & 23 deletions apps/vmq_server/src/vmq_reg.erl
Original file line number Diff line number Diff line change
Expand Up @@ -93,25 +93,25 @@ subscribe_op(SubscriberId, Topics) ->
OldSubs = subscriptions_for_subscriber_id(SubscriberId),
Existing = subscriptions_exist(OldSubs, Topics),
add_subscriber(lists:usort(Topics), OldSubs, SubscriberId),
QoSTable =
{QoSTable, _Ign} =
lists:foldl(
fun
%% MQTTv4 clauses
({_, {_, not_allowed}}, AccQoSTable) ->
[not_allowed | AccQoSTable];
({Exists, {T, QoS}}, AccQoSTable) when is_integer(QoS) ->
deliver_retained(SubscriberId, T, QoS, #{}, Exists),
[QoS | AccQoSTable];
({_, {_, not_allowed}}, {AccQoSTable, WaitSync}) ->
{[not_allowed | AccQoSTable], WaitSync};
({Exists, {T, QoS}}, {AccQoSTable, WaitSync}) when is_integer(QoS) ->
WaitSync0 = deliver_retained(SubscriberId, T, QoS, #{}, Exists, WaitSync),
{[QoS | AccQoSTable], WaitSync0};
%% MQTTv5 clauses
({_, {_, {not_allowed, _}}}, AccQoSTable) ->
[not_allowed | AccQoSTable];
({Exists, {T, {QoS, SubOpts}}}, AccQoSTable) when
({_, {_, {not_allowed, _}}}, {AccQoSTable, WaitSync}) ->
{[not_allowed | AccQoSTable], WaitSync};
({Exists, {T, {QoS, SubOpts}}}, {AccQoSTable, WaitSync}) when
is_integer(QoS), is_map(SubOpts)
->
deliver_retained(SubscriberId, T, QoS, SubOpts, Exists),
[QoS | AccQoSTable]
WaitSync0 = deliver_retained(SubscriberId, T, QoS, SubOpts, Exists, WaitSync),
{[QoS | AccQoSTable], WaitSync0}
end,
[],
{[], (vmq_config:get_env(subscriber_retain_mode, immediate) == syncwait)},
lists:zip(Existing, Topics)
),
{ok, lists:reverse(QoSTable)}.
Expand Down Expand Up @@ -574,17 +574,28 @@ add_to_subscriber_group({Node, Group, SubscriberId, SubInfo}, SubscriberGroups,
SubscriberGroups
).

-spec deliver_retained(subscriber_id(), topic(), qos(), subopts(), boolean()) -> 'ok'.
deliver_retained(_, _, _, #{retain_handling := dont_send}, _) ->
-spec deliver_retained(subscriber_id(), topic(), qos(), subopts(), boolean(), boolean()) -> 'ok'.
deliver_retained(_, _, _, #{retain_handling := dont_send}, _, WaitSync) ->
%% don't send, skip
ok;
deliver_retained(_, _, _, #{retain_handling := send_if_new_sub}, true) ->
WaitSync;
deliver_retained(_, _, _, #{retain_handling := send_if_new_sub}, true, WaitSync) ->
%% subscription already existed, skip.
ok;
deliver_retained(_SubscriberId, [<<"$share">> | _], _QoS, _SubOpts, _) ->
WaitSync;
deliver_retained(_SubscriberId, [<<"$share">> | _], _QoS, _SubOpts, _, WaitSync) ->
%% Never deliver retained messages to subscriber groups.
ok;
deliver_retained({MP, _} = SubscriberId, Topic, QoS, SubOpts, _) ->
WaitSync;
deliver_retained(SubscriberId, Topic, QoS, SubOpts, _, WaitSync) ->
deliver_retained(SubscriberId, Topic, QoS, SubOpts, WaitSync).

deliver_retained({MP, _} = SubscriberId, Topic, QoS, SubOpts, WaitSync) ->
Ret0 =
case WaitSync of
true ->
timer:sleep(vmq_config:get_env(retain_persist_interval, 1000) * 2),
false;
_ ->
false
end,
QPid = get_queue_pid(SubscriberId),
vmq_retain_srv:match_fold(
fun
Expand All @@ -609,7 +620,7 @@ deliver_retained({MP, _} = SubscriberId, Topic, QoS, SubOpts, _) ->
},
Msg1 = maybe_add_sub_id({QoS, SubOpts}, Msg),
maybe_delete_expired(ExpiryTs, MP, Topic),
vmq_queue:enqueue(QPid, {deliver, QoS, Msg1});
vmq_queue:front(QPid, {deliver, QoS, Msg1});
({{_M, T}, Payload}, _) when is_binary(Payload) ->
%% compatibility with old style retained messages.
Msg = #vmq_msg{
Expand All @@ -622,12 +633,13 @@ deliver_retained({MP, _} = SubscriberId, Topic, QoS, SubOpts, _) ->
msg_ref = vmq_mqtt_fsm_util:msg_ref(),
properties = #{}
},
vmq_queue:enqueue(QPid, {deliver, QoS, Msg})
vmq_queue:front(QPid, {deliver, QoS, Msg})
end,
ok,
MP,
Topic
).
),
Ret0.

maybe_delete_expired(undefined, _, _) ->
ok;
Expand Down
2 changes: 2 additions & 0 deletions apps/vmq_server/src/vmq_server.app.src
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@
{max_offline_messages, -1},
% balance vs fanout
{queue_deliver_mode, fanout},
% subscriber retain mode immediate
{subscriber_retain_mode, immediate},
% fifo vs lifo
{queue_type, fifo},
% never
Expand Down
12 changes: 11 additions & 1 deletion apps/vmq_server/test/vmq_retain_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,16 @@ end_per_suite(_Config) ->
_Config.

init_per_group(mqttv4, Config) ->
vmq_server_cmd:set_config(subscriber_retain_mode, immediate),
[{protover, 4}|Config];
init_per_group(mqttv5, Config) ->
vmq_server_cmd:set_config(subscriber_retain_mode, immediate),
[{protover, 5}|Config];
init_per_group(mqttv4waitsync, Config) ->
vmq_server_cmd:set_config(subscriber_retain_mode, waitsync),
[{protover, 4}|Config];
init_per_group(mqttv5waitsync, Config) ->
vmq_server_cmd:set_config(subscriber_retain_mode, waitsync),
[{protover, 5}|Config].

end_per_group(_Group, _Config) ->
Expand All @@ -44,7 +52,9 @@ end_per_testcase(_, Config) ->
all() ->
[
{group, mqttv4},
{group, mqttv5}
{group, mqttv4waitsync},
{group, mqttv5},
{group, mqttv5waitsync}
].

groups() ->
Expand Down
1 change: 1 addition & 0 deletions changelog.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
- New feature: Synchronized wait for retrained message store
- Add option for a default rule ("*") in database Lua scripts, so that Clients can fallback to default ACLs.

## VerneMQ 2.0 Release Candidate
Expand Down

0 comments on commit 6ebf9d5

Please sign in to comment.