Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduces two new vmq-admin commands: disconnect clients and disconnect batch #2181

Merged
merged 1 commit into from
Aug 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
167 changes: 167 additions & 0 deletions apps/vmq_server/src/vmq_info_cli.erl
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,21 @@
register_cli() ->
vmq_session_list_cmd(),
vmq_session_disconnect_cmd(),
vmq_session_disconnect_clients_cmd(),
vmq_session_disconnect_batch_cmd(),
vmq_session_reauthorize_cmd(),
vmq_retain_show_cmd(),
vmq_retain_delete_cmd(),

clique:register_usage(["vmq-admin", "session"], session_usage()),
clique:register_usage(["vmq-admin", "session", "show"], vmq_session_show_usage()),
clique:register_usage(["vmq-admin", "session", "disconnect"], vmq_session_disconnect_usage()),
clique:register_usage(
["vmq-admin", "session", "disconnect", "clients"], vmq_session_disconnect_clients_usage()
),
clique:register_usage(
["vmq-admin", "session", "disconnect", "batch"], vmq_session_disconnect_batch_usage()
),
clique:register_usage(["vmq-admin", "session", "reauthorize"], vmq_session_reauthorize_usage()),
clique:register_usage(["vmq-admin", "retain"], retain_usage()),
clique:register_usage(["vmq-admin", "retain", "show"], retain_show_usage()),
Expand Down Expand Up @@ -136,6 +144,134 @@ vmq_session_disconnect_cmd() ->
end,
clique:register_command(Cmd, KeySpecs, FlagSpecs, Callback).

vmq_session_disconnect_clients_cmd() ->
Cmd = ["vmq-admin", "session", "disconnect", "clients"],
KeySpecs = [{'client-ids', [{typecast, fun(ClientId) -> ClientId end}]}],
FlagSpecs = [
{cleanup, [
{shortname, "c"},
{longname, "cleanup"}
]},
{mountpoint, [
{shortname, "m"},
{longname, "mountpoint"},
{typecast, fun(Mountpoint) -> Mountpoint end}
]}
],

Callback = fun
(_, [{'client-ids', ClientId}], Flags) ->
DoCleanup = lists:keymember(cleanup, 1, Flags),
ClientIdList = string:tokens(ClientId, ","),
OrConditions = lists:map(fun(Id) -> "client_id = \"" ++ Id ++ "\"" end, ClientIdList),
QueryString0 =
"SELECT queue_pid FROM sessions WHERE (" ++ string:join(OrConditions, " OR "),
case proplists:get_value(mountpoint, Flags, "") of
undefined ->
%% Unparsable mountpoint or without value
Text = clique_status:text("Invalid mountpoint value"),
[clique_status:alert([Text])];
Mountpoint ->
QueryString1 = QueryString0 ++ ") AND mountpoint=\"" ++ Mountpoint ++ "\"",
vmq_ql_query_mgr:fold_query(
fun(Row, _) ->
QueuePid = maps:get(queue_pid, Row),
vmq_queue:force_disconnect(QueuePid, ?ADMINISTRATIVE_ACTION, DoCleanup)
end,
ok,
QueryString1
),
[clique_status:text("Done")]
end;
(_, _, _) ->
Text = clique_status:text(vmq_session_disconnect_usage()),
[clique_status:alert([Text])]
end,
clique:register_command(Cmd, KeySpecs, FlagSpecs, Callback).

vmq_session_disconnect_batch_cmd() ->
Cmd = ["vmq-admin", "session", "disconnect", "batch"],
KeySpecs = [{'count', [{typecast, fun(Limit) -> Limit end}]}],
FlagSpecs = [
{cleanup, [
{shortname, "c"},
{longname, "cleanup"}
]},
{mountpoint, [
{shortname, "m"},
{longname, "mountpoint"},
{typecast, fun(Mountpoint) -> Mountpoint end}
]},
{filterIDs, [
{shortname, "f"},
{longname, "filter-client-ids"},
{typecast, fun(ClientId) -> ClientId end}
]},
{node, [
{shortname, "n"},
{longname, "node"},
{typecast, fun(Node) -> Node end}
]}
],

Callback = fun
(_, [{'count', Limit}], Flags) ->
DoCleanup = lists:keymember(cleanup, 1, Flags),
ClientId = proplists:get_value(filterIDs, Flags, ""),
QueryWhereFilter =
case ClientId of
undefined ->
"";
"" ->
"";
_ ->
ClientIdList = string:tokens(ClientId, ","),
OrConditions = lists:map(
fun(Id) -> "client_id != \"" ++ Id ++ "\"" end, ClientIdList
),
"(" ++ string:join(OrConditions, " AND ") ++ ") AND "
end,
Node = proplists:get_value(node, Flags, ""),
QueryWhereNodeFilter =
case Node of
undefined ->
"";
"" ->
"";
_ ->
NodeIdList = string:tokens(Node, ","),
OrConditions2 = lists:map(
fun(Id) -> "node = \"" ++ Id ++ "\"" end, NodeIdList
),
"(" ++ string:join(OrConditions2, " AND ") ++ ") AND "
end,
QueryString0 =
"SELECT queue_pid FROM sessions WHERE " ++ QueryWhereFilter ++ QueryWhereNodeFilter,

case proplists:get_value(mountpoint, Flags, "") of
undefined ->
%% Unparsable mountpoint or without value
Text = clique_status:text("Invalid mountpoint value"),
[clique_status:alert([Text])];
Mountpoint ->
QueryString1 =
QueryString0 ++ "mountpoint=\"" ++ Mountpoint ++ "\" LIMIT " ++ Limit,
vmq_ql_query_mgr:fold_query(
fun(Row, _) ->
QueuePid = maps:get(queue_pid, Row),
vmq_queue:force_disconnect(QueuePid, ?ADMINISTRATIVE_ACTION, DoCleanup)
end,
ok,
QueryString1
),
[clique_status:text("Done")]
end;
(_, _, _) ->
Text = clique_status:text(vmq_session_disconnect_usage()),
[clique_status:alert([Text])]
end,
clique:register_command(Cmd, KeySpecs, FlagSpecs, Callback).

vmq_session_reauthorize_cmd() ->
Cmd = ["vmq-admin", "session", "reauthorize"],
KeySpecs = [
Expand Down Expand Up @@ -293,6 +429,37 @@ vmq_session_disconnect_usage() ->
" removes the stored cluster state of this client like stored\n",
" messages and subscriptions.",
"\n\n"
" Sub-commands:\n",
" clients Forcefully disconnect multiple running sessions\n",
" batch Forcefully disconnect a number of (random) session\n",
"\n\n"
].
vmq_session_disconnect_clients_usage() ->
[
"vmq-admin session disconnect clients client-ids=<Comma Seperated List of ClientId>\n\n",
" Forcefully disconnects a number of clients from the cluster. \n\n",
" --mountpoint=<Mountpoint>, -m\n",
" specifies the mountpoint, defaults to the default mountpoint\n",
" --cleanup, -c\n",
" removes the stored cluster state of this client like stored\n",
" messages and subscriptions.",
"\n\n"
].

vmq_session_disconnect_batch_usage() ->
[
"vmq-admin session disconnect batch count=Number of clients to be disconnected\n\n",
" Forcefully disconnects a number of (random) clients from the cluster. \n\n",
" --mountpoint=<Mountpoint>, -m\n",
" specifies the mountpoint, defaults to the default mountpoint\n",
" --cleanup, -c\n",
" removes the stored cluster state of this client like stored\n",
" messages and subscriptions.\n",
" --filter-client-ids=<list of client-ids>, -f\n"
" the clients will be filtered and not disconnected from the cluster\n",
" --node=<list of nodes>, -n\n"
" limits disconnects to certain nodes\n",
"\n\n"
].

vmq_session_reauthorize_usage() ->
Expand Down
3 changes: 2 additions & 1 deletion changelog.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
- 'vmq_admin': Add commands allowing batch disconnects (vmq-admin session disconnect batch and vmq-admin session disconnect clients)
- 'vmq_http_pub': Allow anonymous access (allow_anonymous = on)

## VerneMQ 1.13.0

- New Plugin: 'vmq_http_pub', allows to ingest MQTT messages via a HTTP REST interface
Expand Down
Loading