diff --git a/apps/vmq_server/priv/vmq_server.schema b/apps/vmq_server/priv/vmq_server.schema index a4dc91f9f..318a6e469 100644 --- a/apps/vmq_server/priv/vmq_server.schema +++ b/apps/vmq_server/priv/vmq_server.schema @@ -43,6 +43,23 @@ [{default, on}, {datatype, flag}]}. +%% Retrain subscriber mode defines if subscribers on topics with retrained flag set should wait for +%% a retrain server sync or process immmediatly. +{mapping, "subscriber_retain_mode", "vmq_server.subscriber_retain_mode", [ + {default, immediate}, + {datatype, atom}, + hidden + ]}. + +{translation, "vmq_server.subscriber_retain_mode", + fun(Conf) -> + case cuttlefish:conf_get("subscriber_retain_mode", Conf) of + immediate -> immediate; + syncwait -> syncwait; + _ -> cuttlefish:invalid("subscriber_retain_mode must be either 'immediate' or 'syncwait'!") + end + end}. + %% @doc queue_type enables to change the default queue delivery behaviour from %% 'fifo' to 'lifo'. In order to make this work the underlying message store %% must deliver the messages in proper order. diff --git a/apps/vmq_server/src/vmq_config_cli.erl b/apps/vmq_server/src/vmq_config_cli.erl index 6dfc5d979..ccd32fd2d 100644 --- a/apps/vmq_server/src/vmq_config_cli.erl +++ b/apps/vmq_server/src/vmq_config_cli.erl @@ -64,7 +64,8 @@ register_config_() -> "suppress_lwt_on_session_takeover", "coordinate_registrations", "mqtt_connect_timeout", - "disconnect_on_unauthorized_publish_v3" + "disconnect_on_unauthorized_publish_v3", + "subscriber_retain_mode" ], _ = [ clique:register_config([Key], fun register_config_callback/2) diff --git a/apps/vmq_server/src/vmq_queue.erl b/apps/vmq_server/src/vmq_queue.erl index 9f50c7e88..043e9f7bf 100644 --- a/apps/vmq_server/src/vmq_queue.erl +++ b/apps/vmq_server/src/vmq_queue.erl @@ -44,6 +44,7 @@ notify/1, notify_recv/1, enqueue/2, + front/2, status/1, info/1, add_session/3, @@ -109,6 +110,7 @@ max_msgs_per_drain_step, waiting_call, opts, + insert_fun = enqueue, delayed_will :: {Delay :: non_neg_integer(), Fun :: function()} | undefined, @@ -140,6 +142,9 @@ notify_recv(Queue) when is_pid(Queue) -> enqueue(Queue, Msg) when is_pid(Queue) -> gen_fsm:send_event(Queue, {enqueue, to_internal(Msg)}). +front(Queue, Msg) when is_pid(Queue) -> + gen_fsm:send_event(Queue, {front, to_internal(Msg)}). + enqueue_many(Queue, Msgs) when is_pid(Queue) and is_list(Msgs) -> NMsgs = lists:map(fun to_internal/1, Msgs), gen_fsm:sync_send_event(Queue, {enqueue_many, NMsgs}, infinity). @@ -246,6 +251,12 @@ online({notify_recv, SessionPid}, #state{id = SId, sessions = Sessions} = State) online({enqueue, Msg}, State) -> _ = vmq_metrics:incr_queue_in(), {next_state, online, insert(Msg, State)}; +online({front, Msg}, State) -> + _ = vmq_metrics:incr_queue_in(), + State0 = State#state{insert_fun = front}, + State1 = insert(Msg, State0), + State2 = State1#state{insert_fun = enqueue}, + {next_state, online, State2}; online(Event, State) -> ?LOG_ERROR("got unknown event in online state ~p", [Event]), {next_state, online, State}. @@ -923,7 +934,9 @@ handle_session_down( end. handle_waiting_acks_and_msgs( - WAcks, NextMsgId, #state{id = SId, sessions = Sessions, offline = Offline} = State + WAcks, + NextMsgId, + #state{id = SId, sessions = Sessions, offline = Offline, insert_fun = QFun} = State ) -> %% we can only handle the last waiting acks and msgs if this is %% the last session active for this queue. @@ -936,12 +949,13 @@ handle_waiting_acks_and_msgs( (#deliver{msg = #vmq_msg{persisted = true} = Msg} = D, AccOffline) -> queue_insert( true, + QFun, D#deliver{msg = Msg#vmq_msg{persisted = false}}, AccOffline, SId ); (Msg, AccOffline) -> - queue_insert(true, Msg, AccOffline, SId) + queue_insert(true, QFun, Msg, AccOffline, SId) end, Offline, WAcks @@ -1079,45 +1093,58 @@ insert( %% no session online, skip QoS0 message for QoS1 or QoS2 Subscription (without QoS upgrade) _ = vmq_metrics:incr_queue_unhandled(1), State; -insert(MsgOrRef, #state{id = SId, offline = Offline, sessions = Sessions} = State) when +insert( + MsgOrRef, #state{id = SId, offline = Offline, sessions = Sessions, insert_fun = QFun} = State +) when Sessions == #{} -> %% no session online, insert in offline queue - State#state{offline = queue_insert(true, maybe_set_expiry_ts(MsgOrRef), Offline, SId)}; + State#state{offline = queue_insert(true, QFun, maybe_set_expiry_ts(MsgOrRef), Offline, SId)}; %% Online Queue -insert(MsgOrRef, #state{id = SId, deliver_mode = fanout, sessions = Sessions} = State) -> +insert( + MsgOrRef, + #state{id = SId, deliver_mode = fanout, sessions = Sessions, insert_fun = QFun} = State +) -> {NewSessions, _} = session_fold( - SId, fun session_insert/3, maybe_set_expiry_ts(MsgOrRef), Sessions + SId, QFun, fun session_insert/4, maybe_set_expiry_ts(MsgOrRef), Sessions ), State#state{sessions = NewSessions}; -insert(MsgOrRef, #state{id = SId, deliver_mode = balance, sessions = Sessions} = State) -> +insert( + MsgOrRef, + #state{id = SId, deliver_mode = balance, sessions = Sessions, insert_fun = QFun} = State +) -> Pids = maps:keys(Sessions), RandomPid = lists:nth(rand:uniform(maps:size(Sessions)), Pids), RandomSession = maps:get(RandomPid, Sessions), - {UpdatedSession, _} = session_insert(SId, RandomSession, maybe_set_expiry_ts(MsgOrRef)), + {UpdatedSession, _} = session_insert(SId, QFun, RandomSession, maybe_set_expiry_ts(MsgOrRef)), State#state{sessions = maps:update(RandomPid, UpdatedSession, Sessions)}. -session_insert(SId, #session{status = active, queue = Q} = Session, MsgOrRef) -> +session_insert(SId, QFun, #session{status = active, queue = Q} = Session, MsgOrRef) -> { - send(SId, Session#session{queue = queue_insert(false, MsgOrRef, Q, SId)}), + send(SId, Session#session{queue = queue_insert(false, QFun, MsgOrRef, Q, SId)}), MsgOrRef }; -session_insert(SId, #session{status = passive, queue = Q} = Session, MsgOrRef) -> +session_insert(SId, QFun, #session{status = passive, queue = Q} = Session, MsgOrRef) -> { - Session#session{queue = queue_insert(false, MsgOrRef, Q, SId)}, + Session#session{queue = queue_insert(false, QFun, MsgOrRef, Q, SId)}, MsgOrRef }; -session_insert(SId, #session{status = notify, queue = Q} = Session, MsgOrRef) -> - {send_notification(Session#session{queue = queue_insert(false, MsgOrRef, Q, SId)}), MsgOrRef}. +session_insert(SId, QFun, #session{status = notify, queue = Q} = Session, MsgOrRef) -> + { + send_notification(Session#session{queue = queue_insert(false, QFun, MsgOrRef, Q, SId)}), + MsgOrRef + }. %% unlimited messages accepted -queue_insert(Offline, MsgOrRef, #queue{max = -1, size = Size, queue = Queue} = Q, SId) -> +queue_insert(Offline, enqueue, MsgOrRef, #queue{max = -1, size = Size, queue = Queue} = Q, SId) -> Q#queue{queue = queue:in(maybe_offline_store(Offline, SId, MsgOrRef), Queue), size = Size + 1}; -queue_insert(Offline, MsgOrRef, #queue{ignore_max = true, size = Size, queue = Queue} = Q, SId) -> +queue_insert( + Offline, enqueue, MsgOrRef, #queue{ignore_max = true, size = Size, queue = Queue} = Q, SId +) -> Q#queue{queue = queue:in(maybe_offline_store(Offline, SId, MsgOrRef), Queue), size = Size + 1}; %% tail drop in case of fifo queue_insert( - _Offline, MsgOrRef, #queue{type = fifo, max = Max, size = Size, drop = Drop} = Q, SId + _Offline, _QFun, MsgOrRef, #queue{type = fifo, max = Max, size = Size, drop = Drop} = Q, SId ) when Size >= Max -> @@ -1128,6 +1155,7 @@ queue_insert( %% drop oldest in case of lifo queue_insert( Offline, + enqueue, MsgOrRef, #queue{type = lifo, max = Max, size = Size, queue = Queue, drop = Drop} = Q, SId @@ -1142,7 +1170,11 @@ queue_insert( queue = queue:in(maybe_offline_store(Offline, SId, MsgOrRef), NewQueue), drop = Drop + 1 }; %% normal enqueue -queue_insert(Offline, MsgOrRef, #queue{queue = Queue, size = Size} = Q, SId) -> +queue_insert(Offline, front, MsgOrRef, #queue{queue = Queue, size = Size} = Q, SId) -> + Q#queue{ + queue = queue:in_r(maybe_offline_store(Offline, SId, MsgOrRef), Queue), size = Size + 1 + }; +queue_insert(Offline, enqueue, MsgOrRef, #queue{queue = Queue, size = Size} = Q, SId) -> Q#queue{queue = queue:in(maybe_offline_store(Offline, SId, MsgOrRef), Queue), size = Size + 1}. send( @@ -1223,13 +1255,13 @@ cleanup_queue_(SId, {{value, {{qos2, _}, _}}, NewQueue}) -> cleanup_queue_(_, {empty, _}) -> ok. -session_fold(SId, Fun, Acc, Map) -> - session_fold(SId, Fun, Acc, Map, maps:keys(Map)). +session_fold(SId, QFun, Fun, Acc, Map) -> + session_fold(SId, QFun, Fun, Acc, Map, maps:keys(Map)). -session_fold(SId, Fun, Acc, Map, [K | Rest]) -> - {NewV, NewAcc} = Fun(SId, maps:get(K, Map), Acc), - session_fold(SId, Fun, NewAcc, maps:update(K, NewV, Map), Rest); -session_fold(_, _, Acc, Map, []) -> +session_fold(SId, QFun, Fun, Acc, Map, [K | Rest]) -> + {NewV, NewAcc} = Fun(SId, QFun, maps:get(K, Map), Acc), + session_fold(SId, QFun, Fun, NewAcc, maps:update(K, NewV, Map), Rest); +session_fold(_, _, _, Acc, Map, []) -> {Map, Acc}. maybe_set_expiry_timer( diff --git a/apps/vmq_server/src/vmq_reg.erl b/apps/vmq_server/src/vmq_reg.erl index b5b20ec6f..4cfff88a2 100644 --- a/apps/vmq_server/src/vmq_reg.erl +++ b/apps/vmq_server/src/vmq_reg.erl @@ -93,25 +93,25 @@ subscribe_op(SubscriberId, Topics) -> OldSubs = subscriptions_for_subscriber_id(SubscriberId), Existing = subscriptions_exist(OldSubs, Topics), add_subscriber(lists:usort(Topics), OldSubs, SubscriberId), - QoSTable = + {QoSTable, _Ign} = lists:foldl( fun %% MQTTv4 clauses - ({_, {_, not_allowed}}, AccQoSTable) -> - [not_allowed | AccQoSTable]; - ({Exists, {T, QoS}}, AccQoSTable) when is_integer(QoS) -> - deliver_retained(SubscriberId, T, QoS, #{}, Exists), - [QoS | AccQoSTable]; + ({_, {_, not_allowed}}, {AccQoSTable, WaitSync}) -> + {[not_allowed | AccQoSTable], WaitSync}; + ({Exists, {T, QoS}}, {AccQoSTable, WaitSync}) when is_integer(QoS) -> + WaitSync0 = deliver_retained(SubscriberId, T, QoS, #{}, Exists, WaitSync), + {[QoS | AccQoSTable], WaitSync0}; %% MQTTv5 clauses - ({_, {_, {not_allowed, _}}}, AccQoSTable) -> - [not_allowed | AccQoSTable]; - ({Exists, {T, {QoS, SubOpts}}}, AccQoSTable) when + ({_, {_, {not_allowed, _}}}, {AccQoSTable, WaitSync}) -> + {[not_allowed | AccQoSTable], WaitSync}; + ({Exists, {T, {QoS, SubOpts}}}, {AccQoSTable, WaitSync}) when is_integer(QoS), is_map(SubOpts) -> - deliver_retained(SubscriberId, T, QoS, SubOpts, Exists), - [QoS | AccQoSTable] + WaitSync0 = deliver_retained(SubscriberId, T, QoS, SubOpts, Exists, WaitSync), + {[QoS | AccQoSTable], WaitSync0} end, - [], + {[], (vmq_config:get_env(subscriber_retain_mode, immediate) == syncwait)}, lists:zip(Existing, Topics) ), {ok, lists:reverse(QoSTable)}. @@ -574,17 +574,28 @@ add_to_subscriber_group({Node, Group, SubscriberId, SubInfo}, SubscriberGroups, SubscriberGroups ). --spec deliver_retained(subscriber_id(), topic(), qos(), subopts(), boolean()) -> 'ok'. -deliver_retained(_, _, _, #{retain_handling := dont_send}, _) -> +-spec deliver_retained(subscriber_id(), topic(), qos(), subopts(), boolean(), boolean()) -> 'ok'. +deliver_retained(_, _, _, #{retain_handling := dont_send}, _, WaitSync) -> %% don't send, skip - ok; -deliver_retained(_, _, _, #{retain_handling := send_if_new_sub}, true) -> + WaitSync; +deliver_retained(_, _, _, #{retain_handling := send_if_new_sub}, true, WaitSync) -> %% subscription already existed, skip. - ok; -deliver_retained(_SubscriberId, [<<"$share">> | _], _QoS, _SubOpts, _) -> + WaitSync; +deliver_retained(_SubscriberId, [<<"$share">> | _], _QoS, _SubOpts, _, WaitSync) -> %% Never deliver retained messages to subscriber groups. - ok; -deliver_retained({MP, _} = SubscriberId, Topic, QoS, SubOpts, _) -> + WaitSync; +deliver_retained(SubscriberId, Topic, QoS, SubOpts, _, WaitSync) -> + deliver_retained(SubscriberId, Topic, QoS, SubOpts, WaitSync). + +deliver_retained({MP, _} = SubscriberId, Topic, QoS, SubOpts, WaitSync) -> + Ret0 = + case WaitSync of + true -> + timer:sleep(vmq_config:get_env(retain_persist_interval, 1000) * 2), + false; + _ -> + false + end, QPid = get_queue_pid(SubscriberId), vmq_retain_srv:match_fold( fun @@ -609,7 +620,7 @@ deliver_retained({MP, _} = SubscriberId, Topic, QoS, SubOpts, _) -> }, Msg1 = maybe_add_sub_id({QoS, SubOpts}, Msg), maybe_delete_expired(ExpiryTs, MP, Topic), - vmq_queue:enqueue(QPid, {deliver, QoS, Msg1}); + vmq_queue:front(QPid, {deliver, QoS, Msg1}); ({{_M, T}, Payload}, _) when is_binary(Payload) -> %% compatibility with old style retained messages. Msg = #vmq_msg{ @@ -622,12 +633,13 @@ deliver_retained({MP, _} = SubscriberId, Topic, QoS, SubOpts, _) -> msg_ref = vmq_mqtt_fsm_util:msg_ref(), properties = #{} }, - vmq_queue:enqueue(QPid, {deliver, QoS, Msg}) + vmq_queue:front(QPid, {deliver, QoS, Msg}) end, ok, MP, Topic - ). + ), + Ret0. maybe_delete_expired(undefined, _, _) -> ok; diff --git a/apps/vmq_server/src/vmq_server.app.src b/apps/vmq_server/src/vmq_server.app.src index e5d77ee58..4f52d86dd 100644 --- a/apps/vmq_server/src/vmq_server.app.src +++ b/apps/vmq_server/src/vmq_server.app.src @@ -49,6 +49,8 @@ {max_offline_messages, -1}, % balance vs fanout {queue_deliver_mode, fanout}, + % subscriber retain mode immediate + {subscriber_retain_mode, immediate}, % fifo vs lifo {queue_type, fifo}, % never diff --git a/apps/vmq_server/test/vmq_retain_SUITE.erl b/apps/vmq_server/test/vmq_retain_SUITE.erl index 4902a470d..10b48b7b2 100644 --- a/apps/vmq_server/test/vmq_retain_SUITE.erl +++ b/apps/vmq_server/test/vmq_retain_SUITE.erl @@ -28,8 +28,16 @@ end_per_suite(_Config) -> _Config. init_per_group(mqttv4, Config) -> + vmq_server_cmd:set_config(subscriber_retain_mode, immediate), [{protover, 4}|Config]; init_per_group(mqttv5, Config) -> + vmq_server_cmd:set_config(subscriber_retain_mode, immediate), + [{protover, 5}|Config]; +init_per_group(mqttv4waitsync, Config) -> + vmq_server_cmd:set_config(subscriber_retain_mode, waitsync), + [{protover, 4}|Config]; +init_per_group(mqttv5waitsync, Config) -> + vmq_server_cmd:set_config(subscriber_retain_mode, waitsync), [{protover, 5}|Config]. end_per_group(_Group, _Config) -> @@ -44,7 +52,9 @@ end_per_testcase(_, Config) -> all() -> [ {group, mqttv4}, - {group, mqttv5} + {group, mqttv4waitsync}, + {group, mqttv5}, + {group, mqttv5waitsync} ]. groups() -> diff --git a/changelog.md b/changelog.md index 6dec2a981..b618a22fc 100644 --- a/changelog.md +++ b/changelog.md @@ -1,3 +1,4 @@ +- New feature: Synchronized wait for retrained message store - Add option for a default rule ("*") in database Lua scripts, so that Clients can fallback to default ACLs. ## VerneMQ 2.0 Release Candidate