Skip to content

Commit

Permalink
Merge pull request #58 from larshesel/pass_modified_payload_to_on_pub…
Browse files Browse the repository at this point in the history
…lish

Pass modified payload to on_publish hook
  • Loading branch information
larshesel committed May 13, 2016
2 parents 3714cf1 + c4d2f5d commit 23ea562
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 40 deletions.
8 changes: 6 additions & 2 deletions src/vmq_mqtt_fsm.erl
Original file line number Diff line number Diff line change
Expand Up @@ -595,7 +595,8 @@ auth_on_publish(User, SubscriberId, #vmq_msg{routing_key=Topic,
ok ->
AuthSuccess(Msg, HookArgs);
{ok, ChangedPayload} when is_binary(ChangedPayload) ->
AuthSuccess(Msg#vmq_msg{payload=ChangedPayload}, HookArgs);
HookArgs1 = [User, SubscriberId, QoS, Topic, ChangedPayload, unflag(IsRetain)],
AuthSuccess(Msg#vmq_msg{payload=ChangedPayload}, HookArgs1);
{ok, Args} when is_list(Args) ->
#vmq_msg{reg_view=RegView, mountpoint=MP} = Msg,
ChangedTopic = proplists:get_value(topic, Args, Topic),
Expand All @@ -604,12 +605,15 @@ auth_on_publish(User, SubscriberId, #vmq_msg{routing_key=Topic,
ChangedQoS = proplists:get_value(qos, Args, QoS),
ChangedIsRetain = proplists:get_value(retain, Args, IsRetain),
ChangedMountpoint = proplists:get_value(mountpoint, Args, MP),
HookArgs1 = [User, SubscriberId, ChangedQoS,
ChangedTopic, ChangedPayload, ChangedIsRetain],
AuthSuccess(Msg#vmq_msg{routing_key=ChangedTopic,
payload=ChangedPayload,
reg_view=ChangedRegView,
qos=ChangedQoS,
retain=ChangedIsRetain,
mountpoint=ChangedMountpoint}, HookArgs);
mountpoint=ChangedMountpoint},
HookArgs1);
{error, Re} ->
lager:error("can't auth publish ~p due to ~p", [HookArgs, Re]),
{error, not_allowed}
Expand Down
91 changes: 53 additions & 38 deletions test/vmq_hook_rewrite_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,11 @@
]).


-export([hook_auth_on_subscribe/3,
hook_auth_on_publish/6,
hook_on_deliver/4]).
-export([hook_auth_on_subscribe/3
, hook_auth_on_publish/6
, hook_on_deliver/4
, hook_on_publish_modified_payload/6
]).

%% ===================================================================
%% common_test callbacks
Expand Down Expand Up @@ -49,9 +51,6 @@ all() ->
, on_deliver_rewrite_packet_test
].

%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%%% Actual Tests
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
auth_on_publish_rewrite_payload_test(_) ->
Connect = packet:gen_connect("pub-rewrite-test", [{keepalive, 60}]),
Connack = packet:gen_connack(0),
Expand All @@ -61,8 +60,9 @@ auth_on_publish_rewrite_payload_test(_) ->
Subscribe = packet:gen_subscribe(3265, "pub/rewrite/payload", 0),
Suback = packet:gen_suback(3265, 0),

enable_on_subscribe(),
enable_on_publish(),
enable_auth_on_subscribe(),
enable_auth_on_publish(),
enable_hook_on_publish_modified_payload(),

{ok, Socket} = packet:do_client_connect(Connect, Connack, []),
ok = gen_tcp:send(Socket, Subscribe),
Expand All @@ -72,12 +72,13 @@ auth_on_publish_rewrite_payload_test(_) ->
ok = packet:expect_packet(Socket, "puback", Puback),

%% receive publish with rewritten payload
PublishRewritten = packet:gen_publish("pub/rewrite/payload", 0, <<"hello world">>, [{mid, 1}]),
PublishRewritten = packet:gen_publish("pub/rewrite/payload", 0, <<"rewritten">>, [{mid, 1}]),
ok = packet:expect_packet(Socket, "publish", PublishRewritten),


disable_on_publish(),
disable_on_subscribe(),
disable_hook_on_publish_modified_payload(),
disable_auth_on_publish(),
disable_auth_on_subscribe(),
ok = gen_tcp:close(Socket).

auth_on_publish_rewrite_packet_test(_) ->
Expand All @@ -89,8 +90,9 @@ auth_on_publish_rewrite_packet_test(_) ->
Subscribe = packet:gen_subscribe(3265, "pub/rewrite/topic", 0),
Suback = packet:gen_suback(3265, 0),

enable_on_subscribe(),
enable_on_publish(),
enable_auth_on_subscribe(),
enable_auth_on_publish(),
enable_hook_on_publish_modified_payload(),

{ok, Socket} = packet:do_client_connect(Connect, Connack, []),
ok = gen_tcp:send(Socket, Subscribe),
Expand All @@ -100,11 +102,12 @@ auth_on_publish_rewrite_packet_test(_) ->
ok = packet:expect_packet(Socket, "puback", Puback),

%% receive publish with rewritten payload and rewritten topic
PublishRewritten = packet:gen_publish("pub/rewrite/topic", 0, <<"hello world">>, [{mid, 1}]),
PublishRewritten = packet:gen_publish("pub/rewrite/topic", 0, <<"rewritten">>, [{mid, 1}]),
ok = packet:expect_packet(Socket, "publish", PublishRewritten),

disable_on_publish(),
disable_on_subscribe(),
disable_hook_on_publish_modified_payload(),
disable_auth_on_publish(),
disable_auth_on_subscribe(),
ok = gen_tcp:close(Socket).

auth_on_subscribe_rewrite_test(_) ->
Expand All @@ -117,8 +120,8 @@ auth_on_subscribe_rewrite_test(_) ->
Subscribe = packet:gen_subscribe(3265, "sub/rewrite/me", 1),
Suback = packet:gen_suback(3265, [0]),

enable_on_subscribe(),
enable_on_publish(),
enable_auth_on_subscribe(),
enable_auth_on_publish(),

{ok, Socket} = packet:do_client_connect(Connect, Connack, []),
ok = gen_tcp:send(Socket, Subscribe),
Expand All @@ -132,8 +135,8 @@ auth_on_subscribe_rewrite_test(_) ->
Publish1 = packet:gen_publish("sub/rewrite/topic", 0, <<"message">>, []),
ok = packet:expect_packet(Socket, "publish", Publish1),

disable_on_publish(),
disable_on_subscribe(),
disable_auth_on_publish(),
disable_auth_on_subscribe(),
ok = gen_tcp:close(Socket).


Expand All @@ -145,9 +148,9 @@ on_deliver_rewrite_payload_test(_) ->
Subscribe = packet:gen_subscribe(3265, "dlvr/rewrite/payload", 0),
Suback = packet:gen_suback(3265, 0),

enable_on_subscribe(),
enable_auth_on_subscribe(),
enable_on_deliver(),
enable_on_publish(),
enable_auth_on_publish(),

{ok, Socket} = packet:do_client_connect(Connect, Connack, []),
ok = gen_tcp:send(Socket, Subscribe),
Expand All @@ -157,13 +160,14 @@ on_deliver_rewrite_payload_test(_) ->
ok = gen_tcp:send(Socket, Publish),
ok = packet:expect_packet(Socket, "puback", Puback),


%% receive publish
Publish1 = packet:gen_publish("dlvr/rewrite/payload", 0, <<"hello world">>, []),
Publish1 = packet:gen_publish("dlvr/rewrite/payload", 0, <<"deliver rewritten">>, []),
ok = packet:expect_packet(Socket, "publish", Publish1),

disable_on_publish(),
disable_auth_on_publish(),
disable_on_deliver(),
disable_on_subscribe(),
disable_auth_on_subscribe(),
ok = gen_tcp:close(Socket).

on_deliver_rewrite_packet_test(_) ->
Expand All @@ -174,9 +178,9 @@ on_deliver_rewrite_packet_test(_) ->
Subscribe = packet:gen_subscribe(3265, "dlvr/rewrite/me", 0),
Suback = packet:gen_suback(3265, 0),

enable_on_subscribe(),
enable_auth_on_subscribe(),
enable_on_deliver(),
enable_on_publish(),
enable_auth_on_publish(),

{ok, Socket} = packet:do_client_connect(Connect, Connack, []),
ok = gen_tcp:send(Socket, Subscribe),
Expand All @@ -187,12 +191,12 @@ on_deliver_rewrite_packet_test(_) ->
ok = packet:expect_packet(Socket, "puback", Puback),

%% receive publish
Publish1 = packet:gen_publish("dlvr/rewrite/payload", 0, <<"hello world">>, []),
Publish1 = packet:gen_publish("dlvr/rewrite/payload", 0, <<"deliver rewritten">>, []),
ok = packet:expect_packet(Socket, "publish", Publish1),

disable_on_publish(),
disable_auth_on_publish(),
disable_on_deliver(),
disable_on_subscribe(),
disable_auth_on_subscribe(),
ok = gen_tcp:close(Socket).


Expand All @@ -208,46 +212,57 @@ hook_auth_on_subscribe(_, _, _) -> ok.
hook_auth_on_publish(_, {"", <<"pub-rewrite-test">>}, _MsgId, [<<"pub">>, <<"rewrite">>, <<"payload">>],
<<"message">>, false) ->
%% REWRITE PAYLOAD
{ok, <<"hello world">>};
{ok, <<"rewritten">>};

hook_auth_on_publish(_, {"", <<"pub-rewrite-test">>}, _MsgId, [<<"pub">>, <<"rewrite">>, <<"packet">>],
<<"message">>, false) ->
%% REWRITE PAYLOAD
{ok, [{payload, <<"hello world">>}, {topic, [<<"pub">>, <<"rewrite">>, <<"topic">>]}]};
{ok, [{payload, <<"rewritten">>}, {topic, [<<"pub">>, <<"rewrite">>, <<"topic">>]}]};

hook_auth_on_publish(_, _, _MsgId, _, _, _) ->
ok.

hook_on_publish_modified_payload(_UserName, _SubscriberId, _QoS, _Topic, <<"rewritten">>, _IsRetain) ->
ok;
hook_on_publish_modified_payload(_UserName, _SubscriberId, _QoS, _Topic, Payload, _IsRetain) ->
throw({expected_payload, <<"rewritten">>, got, Payload}).

hook_on_deliver(_User, {"", <<"dlvr-rewrite-test">>}, [<<"dlvr">>, <<"rewrite">>, <<"payload">>],
<<"message">>) ->
{ok, <<"hello world">>};
{ok, <<"deliver rewritten">>};
hook_on_deliver(_User, {"", <<"dlvr-rewrite-test">>}, [<<"dlvr">>, <<"rewrite">>, <<"me">>],
<<"message">>) ->
{ok, [{topic, [<<"dlvr">>, <<"rewrite">>, <<"payload">>]},
{payload, <<"hello world">>}]};
{payload, <<"deliver rewritten">>}]};
hook_on_deliver(_, _, _, _) -> ok.


%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%%% Helper
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
enable_on_subscribe() ->
enable_auth_on_subscribe() ->
vmq_plugin_mgr:enable_module_plugin(
auth_on_subscribe, ?MODULE, hook_auth_on_subscribe, 3).
enable_on_publish() ->
enable_auth_on_publish() ->
vmq_plugin_mgr:enable_module_plugin(
auth_on_publish, ?MODULE, hook_auth_on_publish, 6).
enable_on_deliver() ->
vmq_plugin_mgr:enable_module_plugin(
on_deliver, ?MODULE, hook_on_deliver, 4).
disable_on_subscribe() ->
enable_hook_on_publish_modified_payload() ->
vmq_plugin_mgr:enable_module_plugin(
on_publish, ?MODULE, hook_on_publish_modified_payload, 6).
disable_auth_on_subscribe() ->
vmq_plugin_mgr:disable_module_plugin(
auth_on_subscribe, ?MODULE, hook_auth_on_subscribe, 3).
disable_on_publish() ->
disable_auth_on_publish() ->
vmq_plugin_mgr:disable_module_plugin(
auth_on_publish, ?MODULE, hook_auth_on_publish, 6).
disable_on_deliver() ->
vmq_plugin_mgr:disable_module_plugin(
on_deliver, ?MODULE, hook_on_deliver, 4).
disable_hook_on_publish_modified_payload() ->
vmq_plugin_mgr:disable_module_plugin(
on_publish, ?MODULE, hook_on_publish_modified_payload, 6).


0 comments on commit 23ea562

Please sign in to comment.