Skip to content

Commit

Permalink
Introduces two new vmq-admin commands: session disconnect clients tha…
Browse files Browse the repository at this point in the history
…t allows to disconnect a comma-separeted list of clients and disconnect batch which allows to disconnect a number of (random) clients (#2181)

Co-authored-by: mths1 <mths1>
  • Loading branch information
mths1 committed Aug 24, 2023
1 parent de87dad commit ea95cb9
Show file tree
Hide file tree
Showing 2 changed files with 169 additions and 1 deletion.
167 changes: 167 additions & 0 deletions apps/vmq_server/src/vmq_info_cli.erl
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,21 @@
register_cli() ->
vmq_session_list_cmd(),
vmq_session_disconnect_cmd(),
vmq_session_disconnect_clients_cmd(),
vmq_session_disconnect_batch_cmd(),
vmq_session_reauthorize_cmd(),
vmq_retain_show_cmd(),
vmq_retain_delete_cmd(),

clique:register_usage(["vmq-admin", "session"], session_usage()),
clique:register_usage(["vmq-admin", "session", "show"], vmq_session_show_usage()),
clique:register_usage(["vmq-admin", "session", "disconnect"], vmq_session_disconnect_usage()),
clique:register_usage(
["vmq-admin", "session", "disconnect", "clients"], vmq_session_disconnect_clients_usage()
),
clique:register_usage(
["vmq-admin", "session", "disconnect", "batch"], vmq_session_disconnect_batch_usage()
),
clique:register_usage(["vmq-admin", "session", "reauthorize"], vmq_session_reauthorize_usage()),
clique:register_usage(["vmq-admin", "retain"], retain_usage()),
clique:register_usage(["vmq-admin", "retain", "show"], retain_show_usage()),
Expand Down Expand Up @@ -136,6 +144,134 @@ vmq_session_disconnect_cmd() ->
end,
clique:register_command(Cmd, KeySpecs, FlagSpecs, Callback).

vmq_session_disconnect_clients_cmd() ->
Cmd = ["vmq-admin", "session", "disconnect", "clients"],
KeySpecs = [{'client-ids', [{typecast, fun(ClientId) -> ClientId end}]}],
FlagSpecs = [
{cleanup, [
{shortname, "c"},
{longname, "cleanup"}
]},
{mountpoint, [
{shortname, "m"},
{longname, "mountpoint"},
{typecast, fun(Mountpoint) -> Mountpoint end}
]}
],

Callback = fun
(_, [{'client-ids', ClientId}], Flags) ->
DoCleanup = lists:keymember(cleanup, 1, Flags),
ClientIdList = string:tokens(ClientId, ","),
OrConditions = lists:map(fun(Id) -> "client_id = \"" ++ Id ++ "\"" end, ClientIdList),
QueryString0 =
"SELECT queue_pid FROM sessions WHERE (" ++ string:join(OrConditions, " OR "),
case proplists:get_value(mountpoint, Flags, "") of
undefined ->
%% Unparsable mountpoint or without value
Text = clique_status:text("Invalid mountpoint value"),
[clique_status:alert([Text])];
Mountpoint ->
QueryString1 = QueryString0 ++ ") AND mountpoint=\"" ++ Mountpoint ++ "\"",
vmq_ql_query_mgr:fold_query(
fun(Row, _) ->
QueuePid = maps:get(queue_pid, Row),
vmq_queue:force_disconnect(QueuePid, ?ADMINISTRATIVE_ACTION, DoCleanup)
end,
ok,
QueryString1
),
[clique_status:text("Done")]
end;
(_, _, _) ->
Text = clique_status:text(vmq_session_disconnect_usage()),
[clique_status:alert([Text])]
end,
clique:register_command(Cmd, KeySpecs, FlagSpecs, Callback).

vmq_session_disconnect_batch_cmd() ->
Cmd = ["vmq-admin", "session", "disconnect", "batch"],
KeySpecs = [{'count', [{typecast, fun(Limit) -> Limit end}]}],
FlagSpecs = [
{cleanup, [
{shortname, "c"},
{longname, "cleanup"}
]},
{mountpoint, [
{shortname, "m"},
{longname, "mountpoint"},
{typecast, fun(Mountpoint) -> Mountpoint end}
]},
{filterIDs, [
{shortname, "f"},
{longname, "filter-client-ids"},
{typecast, fun(ClientId) -> ClientId end}
]},
{node, [
{shortname, "n"},
{longname, "node"},
{typecast, fun(Node) -> Node end}
]}
],

Callback = fun
(_, [{'count', Limit}], Flags) ->
DoCleanup = lists:keymember(cleanup, 1, Flags),
ClientId = proplists:get_value(filterIDs, Flags, ""),
QueryWhereFilter =
case ClientId of
undefined ->
"";
"" ->
"";
_ ->
ClientIdList = string:tokens(ClientId, ","),
OrConditions = lists:map(
fun(Id) -> "client_id != \"" ++ Id ++ "\"" end, ClientIdList
),
"(" ++ string:join(OrConditions, " AND ") ++ ") AND "
end,
Node = proplists:get_value(node, Flags, ""),
QueryWhereNodeFilter =
case Node of
undefined ->
"";
"" ->
"";
_ ->
NodeIdList = string:tokens(Node, ","),
OrConditions2 = lists:map(
fun(Id) -> "node = \"" ++ Id ++ "\"" end, NodeIdList
),
"(" ++ string:join(OrConditions2, " AND ") ++ ") AND "
end,
QueryString0 =
"SELECT queue_pid FROM sessions WHERE " ++ QueryWhereFilter ++ QueryWhereNodeFilter,

case proplists:get_value(mountpoint, Flags, "") of
undefined ->
%% Unparsable mountpoint or without value
Text = clique_status:text("Invalid mountpoint value"),
[clique_status:alert([Text])];
Mountpoint ->
QueryString1 =
QueryString0 ++ "mountpoint=\"" ++ Mountpoint ++ "\" LIMIT " ++ Limit,
vmq_ql_query_mgr:fold_query(
fun(Row, _) ->
QueuePid = maps:get(queue_pid, Row),
vmq_queue:force_disconnect(QueuePid, ?ADMINISTRATIVE_ACTION, DoCleanup)
end,
ok,
QueryString1
),
[clique_status:text("Done")]
end;
(_, _, _) ->
Text = clique_status:text(vmq_session_disconnect_usage()),
[clique_status:alert([Text])]
end,
clique:register_command(Cmd, KeySpecs, FlagSpecs, Callback).

vmq_session_reauthorize_cmd() ->
Cmd = ["vmq-admin", "session", "reauthorize"],
KeySpecs = [
Expand Down Expand Up @@ -293,6 +429,37 @@ vmq_session_disconnect_usage() ->
" removes the stored cluster state of this client like stored\n",
" messages and subscriptions.",
"\n\n"
" Sub-commands:\n",
" clients Forcefully disconnect multiple running sessions\n",
" batch Forcefully disconnect a number of (random) session\n",
"\n\n"
].
vmq_session_disconnect_clients_usage() ->
[
"vmq-admin session disconnect clients client-ids=<Comma Seperated List of ClientId>\n\n",
" Forcefully disconnects a number of clients from the cluster. \n\n",
" --mountpoint=<Mountpoint>, -m\n",
" specifies the mountpoint, defaults to the default mountpoint\n",
" --cleanup, -c\n",
" removes the stored cluster state of this client like stored\n",
" messages and subscriptions.",
"\n\n"
].

vmq_session_disconnect_batch_usage() ->
[
"vmq-admin session disconnect batch count=Number of clients to be disconnected\n\n",
" Forcefully disconnects a number of (random) clients from the cluster. \n\n",
" --mountpoint=<Mountpoint>, -m\n",
" specifies the mountpoint, defaults to the default mountpoint\n",
" --cleanup, -c\n",
" removes the stored cluster state of this client like stored\n",
" messages and subscriptions.\n",
" --filter-client-ids=<list of client-ids>, -f\n"
" the clients will be filtered and not disconnected from the cluster\n",
" --node=<list of nodes>, -n\n"
" limits disconnects to certain nodes\n",
"\n\n"
].

vmq_session_reauthorize_usage() ->
Expand Down
3 changes: 2 additions & 1 deletion changelog.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
- 'vmq_admin': Add commands allowing batch disconnects (vmq-admin session disconnect batch and vmq-admin session disconnect clients)
- 'vmq_http_pub': Allow anonymous access (allow_anonymous = on)

## VerneMQ 1.13.0

- New Plugin: 'vmq_http_pub', allows to ingest MQTT messages via a HTTP REST interface
Expand Down

0 comments on commit ea95cb9

Please sign in to comment.