From d8fd7711e047ca87490c5100f77a6abec85a3f9b Mon Sep 17 00:00:00 2001 From: Lois Soto Lopez Date: Fri, 8 Mar 2024 13:48:22 +0100 Subject: [PATCH] Prevent consulting Web MQTT cons pids for MQTT Discussion #9302 Previous commit tried to do the same as this one but changed an exported function so current commit provides and makes use of a new internal function listing only MQTT connections. --- deps/rabbit/src/rabbit_networking.erl | 10 +++++++++- deps/rabbitmq_mqtt/include/rabbit_mqtt.hrl | 5 ++++- ....Ctl.Commands.ListMqttConnectionsCommand.erl | 5 ++--- deps/rabbitmq_mqtt/src/rabbit_mqtt.erl | 17 ++++++++++++----- deps/rabbitmq_mqtt/src/rabbit_mqtt_sup.erl | 15 ++++++--------- ...l.Commands.ListWebMqttConnectionsCommand.erl | 2 +- 6 files changed, 34 insertions(+), 20 deletions(-) diff --git a/deps/rabbit/src/rabbit_networking.erl b/deps/rabbit/src/rabbit_networking.erl index 9ae8561a378c..4563da64d1b4 100644 --- a/deps/rabbit/src/rabbit_networking.erl +++ b/deps/rabbit/src/rabbit_networking.erl @@ -34,7 +34,8 @@ force_connection_event_refresh/1, force_non_amqp_connection_event_refresh/1, handshake/2, tcp_host/1, ranch_ref/1, ranch_ref/2, ranch_ref_of_protocol/1, - listener_of_protocol/1, stop_ranch_listener_of_protocol/1]). + listener_of_protocol/1, stop_ranch_listener_of_protocol/1, + list_local_connections_of_protocol/1]). %% Used by TCP-based transports, e.g. STOMP adapter -export([tcp_listener_addresses/1, @@ -252,6 +253,13 @@ stop_ranch_listener_of_protocol(Protocol) -> ranch:stop_listener(Ref) end. +-spec list_local_connections_of_protocol(atom()) -> [pid()]. +list_local_connections_of_protocol(Protocol) -> + case ranch_ref_of_protocol(Protocol) of + undefined -> []; + AcceptorRef -> ranch:procs(AcceptorRef, connections) + end. + -spec start_tcp_listener( listener_config(), integer()) -> 'ok' | {'error', term()}. diff --git a/deps/rabbitmq_mqtt/include/rabbit_mqtt.hrl b/deps/rabbitmq_mqtt/include/rabbit_mqtt.hrl index a8bbeea8cf1f..0a629b7cf6de 100644 --- a/deps/rabbitmq_mqtt/include/rabbit_mqtt.hrl +++ b/deps/rabbitmq_mqtt/include/rabbit_mqtt.hrl @@ -11,8 +11,11 @@ -define(PERSISTENT_TERM_MAILBOX_SOFT_LIMIT, mqtt_mailbox_soft_limit). -define(PERSISTENT_TERM_EXCHANGE, mqtt_exchange). -define(DEFAULT_MQTT_EXCHANGE, <<"amq.topic">>). --define(MQTT_GUIDE_URL, <<"https://rabbitmq.com/mqtt.html">>). +-define(MQTT_GUIDE_URL, <<"https://rabbitmq.com/docs/mqtt">>). +-define(WEB_MQTT_GUIDE_URL, <<"https://rabbitmq.com/docs/web-mqtt">>). +-define(MQTT_TCP_PROTOCOL, 'mqtt'). +-define(MQTT_TLS_PROTOCOL, 'mqtt/ssl'). -define(MQTT_PROTO_V3, mqtt310). -define(MQTT_PROTO_V4, mqtt311). -define(MQTT_PROTO_V5, mqtt50). diff --git a/deps/rabbitmq_mqtt/src/Elixir.RabbitMQ.CLI.Ctl.Commands.ListMqttConnectionsCommand.erl b/deps/rabbitmq_mqtt/src/Elixir.RabbitMQ.CLI.Ctl.Commands.ListMqttConnectionsCommand.erl index e5c0e83959c1..6b3204051912 100644 --- a/deps/rabbitmq_mqtt/src/Elixir.RabbitMQ.CLI.Ctl.Commands.ListMqttConnectionsCommand.erl +++ b/deps/rabbitmq_mqtt/src/Elixir.RabbitMQ.CLI.Ctl.Commands.ListMqttConnectionsCommand.erl @@ -71,15 +71,14 @@ run(Args, #{node := NodeName, Nodes = 'Elixir.RabbitMQ.CLI.Core.Helpers':nodes_in_cluster(NodeName), - AllItemsEnum = 'Elixir.RabbitMQ.CLI.Ctl.RpcStream':receive_list_items( + 'Elixir.RabbitMQ.CLI.Ctl.RpcStream':receive_list_items( NodeName, rabbit_mqtt, emit_connection_info_all, [Nodes, InfoKeys], Timeout, InfoKeys, - length(Nodes)), - 'Elixir.Enum':to_list(AllItemsEnum). + length(Nodes)). banner(_, _) -> <<"Listing MQTT connections ...">>. diff --git a/deps/rabbitmq_mqtt/src/rabbit_mqtt.erl b/deps/rabbitmq_mqtt/src/rabbit_mqtt.erl index 77dc0fb958ba..86cf09d149b4 100644 --- a/deps/rabbitmq_mqtt/src/rabbit_mqtt.erl +++ b/deps/rabbitmq_mqtt/src/rabbit_mqtt.erl @@ -62,7 +62,7 @@ emit_connection_info_all(Nodes, Items, Ref, AggregatorPid) -> -spec emit_connection_info_local(rabbit_types:info_keys(), reference(), pid()) -> ok. emit_connection_info_local(Items, Ref, AggregatorPid) -> - LocalPids = local_connection_pids(), + LocalPids = list_local_mqtt_connections(), emit_connection_info(Items, Ref, AggregatorPid, LocalPids). emit_connection_info(Items, Ref, AggregatorPid, Pids) -> @@ -87,12 +87,19 @@ local_connection_pids() -> AllPids = rabbit_mqtt_collector:list_pids(), lists:filter(fun(Pid) -> node(Pid) =:= node() end, AllPids); false -> - case rabbit_networking:ranch_ref_of_protocol('mqtt') of - undefined -> []; - AcceptorRef -> ranch:procs(AcceptorRef, connections) - end + PgScope = persistent_term:get(?PG_SCOPE), + lists:flatmap(fun(Group) -> + pg:get_local_members(PgScope, Group) + end, pg:which_groups(PgScope)) end. +%% This function excludes Web MQTT connections +list_local_mqtt_connections() -> + PlainPids = rabbit_networking:list_local_connections_of_protocol(?MQTT_TCP_PROTOCOL), + TLSPids = rabbit_networking:list_local_connections_of_protocol(?MQTT_TLS_PROTOCOL), + PlainPids ++ TLSPids. + + init_global_counters() -> lists:foreach(fun init_global_counters/1, [?MQTT_PROTO_V3, ?MQTT_PROTO_V4, diff --git a/deps/rabbitmq_mqtt/src/rabbit_mqtt_sup.erl b/deps/rabbitmq_mqtt/src/rabbit_mqtt_sup.erl index 79185a12348f..2bdacebb58e2 100644 --- a/deps/rabbitmq_mqtt/src/rabbit_mqtt_sup.erl +++ b/deps/rabbitmq_mqtt/src/rabbit_mqtt_sup.erl @@ -13,9 +13,6 @@ -export([start_link/2, init/1, stop_listeners/0]). --define(TCP_PROTOCOL, 'mqtt'). --define(TLS_PROTOCOL, 'mqtt/ssl'). - start_link(Listeners, []) -> supervisor:start_link({local, ?MODULE}, ?MODULE, [Listeners]). @@ -66,8 +63,8 @@ init([{Listeners, SslListeners0}]) -> -spec stop_listeners() -> ok. stop_listeners() -> - _ = rabbit_networking:stop_ranch_listener_of_protocol(?TCP_PROTOCOL), - _ = rabbit_networking:stop_ranch_listener_of_protocol(?TLS_PROTOCOL), + _ = rabbit_networking:stop_ranch_listener_of_protocol(?MQTT_TCP_PROTOCOL), + _ = rabbit_networking:stop_ranch_listener_of_protocol(?MQTT_TLS_PROTOCOL), ok. %% @@ -86,7 +83,7 @@ tcp_listener_spec([Address, SocketOpts, NumAcceptors, ConcurrentConnsSups]) -> rabbit_mqtt_listener_sup, Address, SocketOpts, - transport(?TCP_PROTOCOL), + transport(?MQTT_TCP_PROTOCOL), rabbit_mqtt_reader, [], mqtt, @@ -101,7 +98,7 @@ ssl_listener_spec([Address, SocketOpts, SslOpts, NumAcceptors, ConcurrentConnsSu rabbit_mqtt_listener_sup, Address, SocketOpts ++ SslOpts, - transport(?TLS_PROTOCOL), + transport(?MQTT_TLS_PROTOCOL), rabbit_mqtt_reader, [], 'mqtt/ssl', @@ -111,7 +108,7 @@ ssl_listener_spec([Address, SocketOpts, SslOpts, NumAcceptors, ConcurrentConnsSu "MQTT TLS listener" ). -transport(?TCP_PROTOCOL) -> +transport(?MQTT_TCP_PROTOCOL) -> ranch_tcp; -transport(?TLS_PROTOCOL) -> +transport(?MQTT_TLS_PROTOCOL) -> ranch_ssl. diff --git a/deps/rabbitmq_web_mqtt/src/Elixir.RabbitMQ.CLI.Ctl.Commands.ListWebMqttConnectionsCommand.erl b/deps/rabbitmq_web_mqtt/src/Elixir.RabbitMQ.CLI.Ctl.Commands.ListWebMqttConnectionsCommand.erl index 7ecae78cde4e..5a89ab3ca36a 100644 --- a/deps/rabbitmq_web_mqtt/src/Elixir.RabbitMQ.CLI.Ctl.Commands.ListWebMqttConnectionsCommand.erl +++ b/deps/rabbitmq_web_mqtt/src/Elixir.RabbitMQ.CLI.Ctl.Commands.ListWebMqttConnectionsCommand.erl @@ -59,7 +59,7 @@ usage_additional() -> ]. usage_doc_guides() -> - [?MQTT_GUIDE_URL]. + [?WEB_MQTT_GUIDE_URL]. run(Args, #{node := NodeName, timeout := Timeout,