Skip to content

Commit

Permalink
Add properties to on_deliver hook, so that it can be used also for v3…
Browse files Browse the repository at this point in the history
… clients (#2256)

Co-authored-by: mths1 <mths1>
  • Loading branch information
mths1 committed Feb 19, 2024
1 parent 71f52a6 commit af957d5
Show file tree
Hide file tree
Showing 9 changed files with 56 additions and 35 deletions.
2 changes: 1 addition & 1 deletion apps/vmq_diversity/src/vmq_diversity.app.src
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
{vmq_diversity_plugin, on_publish, 6, []},
{vmq_diversity_plugin, on_subscribe, 3, []},
{vmq_diversity_plugin, on_unsubscribe, 3, []},
{vmq_diversity_plugin, on_deliver, 6, []},
{vmq_diversity_plugin, on_deliver, 7, []},

{vmq_diversity_plugin, auth_on_register_m5, 6, []},
{vmq_diversity_plugin, auth_on_publish_m5, 7, []},
Expand Down
9 changes: 5 additions & 4 deletions apps/vmq_diversity/src/vmq_diversity_plugin.erl
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
-behaviour(on_publish_hook).
-behaviour(on_subscribe_hook).
-behaviour(on_unsubscribe_hook).
-behaviour(on_deliver_hook).
%-behaviour(on_deliver_hook).
-behaviour(on_offline_message_hook).
-behaviour(on_client_wakeup_hook).
-behaviour(on_client_offline_hook).
Expand All @@ -47,7 +47,7 @@
on_publish/6,
on_subscribe/3,
on_unsubscribe/3,
on_deliver/6,
on_deliver/7,
on_offline_message/5,
on_client_wakeup/1,
on_client_offline/1,
Expand Down Expand Up @@ -589,7 +589,7 @@ on_unsubscribe(UserName, SubscriberId, Topics) ->
])
end.

on_deliver(UserName, SubscriberId, QoS, Topic, Payload, IsRetain) ->
on_deliver(UserName, SubscriberId, QoS, Topic, Payload, IsRetain, Props) ->
{MP, ClientId} = subscriber_id(SubscriberId),
all_till_ok(on_deliver, [
{username, nilify(UserName)},
Expand All @@ -598,7 +598,8 @@ on_deliver(UserName, SubscriberId, QoS, Topic, Payload, IsRetain) ->
{qos, QoS},
{topic, unword(Topic)},
{payload, Payload},
{retain, IsRetain}
{retain, IsRetain},
{properties, conv_args_props(Props)}
]).

on_offline_message(SubscriberId, QoS, Topic, Payload, Retain) ->
Expand Down
11 changes: 10 additions & 1 deletion apps/vmq_diversity/test/plugin_test.lua
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,16 @@ function on_deliver(pub)
assert(pub.mountpoint == "")
assert(pub.topic == "test/topic")
assert(pub.payload == "hello world")
assert(pub.retain == false)
if (pub.retain == true) then
properties = pub.properties
assert(properties.p_correlation_data == "correlation_data")
assert(properties.p_response_topic == "response/topic")
assert(properties.p_payload_format_indicator == "utf8")
assert(properties.p_content_type == "content_type")
assert(properties.p_user_property[1].k1 == "v1")
assert(properties.p_user_property[2].k2 == "v2")
end

print("on_deliver called")
return true
end
Expand Down
18 changes: 16 additions & 2 deletions apps/vmq_diversity/test/vmq_diversity_plugin_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -138,8 +138,22 @@ on_unsubscribe_test(_) ->

on_deliver_test(_) ->
ok = vmq_plugin:all_till_ok(on_deliver,
[username(), allowed_subscriber_id(), 1, topic(), payload(), false]).

[username(), allowed_subscriber_id(), 1, topic(), payload(), false, #{}]),
Args = [username(), allowed_subscriber_id(), 1, topic(), payload(), true,
#{?P_USER_PROPERTY =>
[{<<"k1">>, <<"v1">>},
{<<"k2">>, <<"v2">>}],
?P_CORRELATION_DATA => <<"correlation_data">>,
?P_RESPONSE_TOPIC => [<<"response">>,<<"topic">>],
?P_PAYLOAD_FORMAT_INDICATOR => utf8,
?P_CONTENT_TYPE => <<"content_type">>}],

ok = vmq_plugin:all_till_ok(on_deliver,
[username(), allowed_subscriber_id(), 1, topic(), payload(), false, Args]).




on_offline_message_test(_) ->
[next] = vmq_plugin:all(on_offline_message, [allowed_subscriber_id(), 2,
topic(), payload(), false]).
Expand Down
20 changes: 8 additions & 12 deletions apps/vmq_server/src/vmq_mqtt_fsm.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1237,11 +1237,12 @@ prepare_frame(#deliver{qos = QoS, msg_id = MsgId, msg = Msg}, State) ->
payload = Payload,
retain = IsRetained,
dup = IsDup,
qos = MsgQoS
qos = MsgQoS,
properties = Props0
} = Msg,
NewQoS = maybe_upgrade_qos(QoS, MsgQoS, State),
{NewTopic, NewPayload} =
case on_deliver_hook(User, SubscriberId, QoS, Topic, Payload, IsRetained) of
case on_deliver_hook(User, SubscriberId, QoS, Topic, Payload, IsRetained, Props0) of
{error, _} ->
%% no on_deliver hook specified... that's ok
{Topic, Payload};
Expand Down Expand Up @@ -1277,16 +1278,11 @@ prepare_frame(#deliver{qos = QoS, msg_id = MsgId, msg = Msg}, State) ->
}}
end.

-spec on_deliver_hook(username(), subscriber_id(), qos(), topic(), payload(), flag()) -> any().
on_deliver_hook(User, SubscriberId, QoS, Topic, Payload, IsRetain) ->
HookArgs0 = [User, SubscriberId, Topic, Payload],
case vmq_plugin:all_till_ok(on_deliver, HookArgs0) of
{error, _} ->
HookArgs1 = [User, SubscriberId, QoS, Topic, Payload, IsRetain],
vmq_plugin:all_till_ok(on_deliver, HookArgs1);
Other ->
Other
end.
-spec on_deliver_hook(username(), subscriber_id(), qos(), topic(), payload(), flag(), any()) ->
any().
on_deliver_hook(User, SubscriberId, QoS, Topic, Payload, IsRetain, Properties) ->
HookArgs = [User, SubscriberId, QoS, Topic, Payload, IsRetain, Properties],
vmq_plugin:all_till_ok(on_deliver, HookArgs).

-spec maybe_publish_last_will(state(), any()) -> ok.
maybe_publish_last_will(_, ?CLIENT_DISCONNECT) ->
Expand Down
14 changes: 7 additions & 7 deletions apps/vmq_server/test/vmq_hook_rewrite_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -217,14 +217,14 @@ hook_on_publish_modified_payload(_UserName, _SubscriberId, _QoS, _Topic, <<"rewr
hook_on_publish_modified_payload(_UserName, _SubscriberId, _QoS, _Topic, Payload, _IsRetain) ->
throw({expected_payload, <<"rewritten">>, got, Payload}).

hook_on_deliver(_User, {"", <<"dlvr-rewrite-test">>}, [<<"dlvr">>, <<"rewrite">>, <<"payload">>],
<<"message">>) ->
hook_on_deliver(_User, {"", <<"dlvr-rewrite-test">>}, _, [<<"dlvr">>, <<"rewrite">>, <<"payload">>],
<<"message">>, _, _) ->
{ok, <<"deliver rewritten">>};
hook_on_deliver(_User, {"", <<"dlvr-rewrite-test">>}, [<<"dlvr">>, <<"rewrite">>, <<"me">>],
<<"message">>) ->
hook_on_deliver(_User, {"", <<"dlvr-rewrite-test">>}, _, [<<"dlvr">>, <<"rewrite">>, <<"me">>],
<<"message">>, _, _) ->
{ok, [{topic, [<<"dlvr">>, <<"rewrite">>, <<"payload">>]},
{payload, <<"deliver rewritten">>}]};
hook_on_deliver(_, _, _, _) -> ok.
hook_on_deliver(_, _, _, _, _, _, _) -> ok.


%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
Expand All @@ -238,7 +238,7 @@ enable_auth_on_publish() ->
auth_on_publish, ?MODULE, hook_auth_on_publish, 6).
enable_on_deliver() ->
ok = vmq_plugin_mgr:enable_module_plugin(
on_deliver, ?MODULE, hook_on_deliver, 4).
on_deliver, ?MODULE, hook_on_deliver, 7).
enable_hook_on_publish_modified_payload() ->
ok = vmq_plugin_mgr:enable_module_plugin(
on_publish, ?MODULE, hook_on_publish_modified_payload, 6).
Expand All @@ -250,7 +250,7 @@ disable_auth_on_publish() ->
auth_on_publish, ?MODULE, hook_auth_on_publish, 6).
disable_on_deliver() ->
ok = vmq_plugin_mgr:disable_module_plugin(
on_deliver, ?MODULE, hook_on_deliver, 4).
on_deliver, ?MODULE, hook_on_deliver, 7).
disable_hook_on_publish_modified_payload() ->
ok = vmq_plugin_mgr:disable_module_plugin(
on_publish, ?MODULE, hook_on_publish_modified_payload, 6).
Expand Down
2 changes: 1 addition & 1 deletion apps/vmq_webhooks/src/vmq_webhooks.app.src
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
{vmq_webhooks_plugin, on_publish, 6, []},
{vmq_webhooks_plugin, on_subscribe, 3, []},
{vmq_webhooks_plugin, on_unsubscribe, 3, []},
{vmq_webhooks_plugin, on_deliver, 6, []},
{vmq_webhooks_plugin, on_deliver, 7, []},

{vmq_webhooks_plugin, auth_on_register_m5, 6, []},
{vmq_webhooks_plugin, auth_on_publish_m5, 7, []},
Expand Down
11 changes: 6 additions & 5 deletions apps/vmq_webhooks/src/vmq_webhooks_plugin.erl
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
-behaviour(on_publish_hook).
-behaviour(on_subscribe_hook).
-behaviour(on_unsubscribe_hook).
-behaviour(on_deliver_hook).
%-behaviour(on_deliver_hook).
-behaviour(on_offline_message_hook).
-behaviour(on_client_wakeup_hook).
-behaviour(on_client_offline_hook).
Expand All @@ -50,7 +50,7 @@
on_publish/6,
on_subscribe/3,
on_unsubscribe/3,
on_deliver/6,
on_deliver/7,
on_offline_message/5,
on_client_wakeup/1,
on_client_offline/1,
Expand Down Expand Up @@ -487,9 +487,9 @@ on_unsubscribe_m5(UserName, SubscriberId, Topics, Props) ->
{properties, Props}
]).

-spec on_deliver(username(), subscriber_id(), qos(), topic(), payload(), flag()) ->
-spec on_deliver(username(), subscriber_id(), qos(), topic(), payload(), flag(), properties()) ->
'next' | 'ok' | {'ok', payload() | [on_deliver_hook:msg_modifier()]}.
on_deliver(UserName, SubscriberId, QoS, Topic, Payload, IsRetain) ->
on_deliver(UserName, SubscriberId, QoS, Topic, Payload, IsRetain, Props) ->
{MP, ClientId} = subscriber_id(SubscriberId),
all_till_ok(on_deliver, [
{username, nullify(UserName)},
Expand All @@ -498,7 +498,8 @@ on_deliver(UserName, SubscriberId, QoS, Topic, Payload, IsRetain) ->
{qos, QoS},
{topic, unword(Topic)},
{payload, Payload},
{retain, IsRetain}
{retain, IsRetain},
{properties, Props}
]).

-spec on_deliver_m5(username(), subscriber_id(), qos(), topic(), payload(), flag(), properties()) ->
Expand Down
4 changes: 2 additions & 2 deletions apps/vmq_webhooks/test/vmq_webhooks_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,7 @@ on_deliver_test(_) ->
register_hook(on_deliver, ?ENDPOINT),
Self = pid_to_bin(self()),
ok = vmq_plugin:all_till_ok(on_deliver,
[Self, {?MOUNTPOINT, ?ALLOWED_CLIENT_ID}, 1, ?TOPIC, ?PAYLOAD, false]),
[Self, {?MOUNTPOINT, ?ALLOWED_CLIENT_ID}, 1, ?TOPIC, ?PAYLOAD, false, #{}]),
ok = exp_response(on_deliver_ok),
deregister_hook(on_deliver, ?ENDPOINT).

Expand Down Expand Up @@ -685,7 +685,7 @@ base_https_test(Config, ServerOpts, ClientSSLEnv) ->
register_hook(on_deliver, ?HTTPS_ENDPOINT),
Self = pid_to_bin(self()),
_ = vmq_plugin:all_till_ok(on_deliver,
[Self, {?MOUNTPOINT, ?ALLOWED_CLIENT_ID}, 1, ?TOPIC, ?PAYLOAD, false]),
[Self, {?MOUNTPOINT, ?ALLOWED_CLIENT_ID}, 1, ?TOPIC, ?PAYLOAD, false, #{}]),
ExpResponse = exp_response(on_deliver_ok),
clear_ssl_app_env(),
deregister_hook(on_deliver, ?HTTPS_ENDPOINT),
Expand Down

0 comments on commit af957d5

Please sign in to comment.