From 52c89ab7a3cadcac8689a8f54cdc3a0a61501c56 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= <514737+acogoluegnes@users.noreply.github.com> Date: Wed, 28 May 2025 17:10:50 +0200 Subject: [PATCH] Always emit consumer_deleted event when stream consumer goes away Not only when it is removed explicitly by the client. This is necessary to make sure the consumer record is removed from the management ETS tables (consumer_stats) and to avoid ghost consumers. For other protocols like AMQP 091, the consumer_status ETS table is cleaned up when a channel goes down, but there is no channel concept in the stream protocol. This is not consistent with other protocols or queue implementations (which emits the event only on explicit consumer cancellation) but is necessary to clean up stats correctly. References #13092 --- .../src/rabbit_stream_metrics.erl | 16 ++++++---------- .../src/rabbit_stream_reader.erl | 18 +++++++----------- 2 files changed, 13 insertions(+), 21 deletions(-) diff --git a/deps/rabbitmq_stream/src/rabbit_stream_metrics.erl b/deps/rabbitmq_stream/src/rabbit_stream_metrics.erl index 4023944515bd..b73c3667ad4b 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream_metrics.erl +++ b/deps/rabbitmq_stream/src/rabbit_stream_metrics.erl @@ -22,7 +22,7 @@ -export([init/0]). -export([consumer_created/10, consumer_updated/9, - consumer_cancelled/5]). + consumer_cancelled/4]). -export([publisher_created/4, publisher_updated/7, publisher_deleted/3]). @@ -121,21 +121,17 @@ consumer_updated(Connection, ok. -consumer_cancelled(Connection, StreamResource, SubscriptionId, ActingUser, Notify) -> +consumer_cancelled(Connection, StreamResource, SubscriptionId, ActingUser) -> ets:delete(?TABLE_CONSUMER, {StreamResource, Connection, SubscriptionId}), rabbit_global_counters:consumer_deleted(stream), rabbit_core_metrics:consumer_deleted(Connection, consumer_tag(SubscriptionId), StreamResource), - case Notify of - true -> - rabbit_event:notify(consumer_deleted, - [{consumer_tag, consumer_tag(SubscriptionId)}, - {channel, self()}, {queue, StreamResource}, - {user_who_performed_action, ActingUser}]); - _ -> ok - end, + rabbit_event:notify(consumer_deleted, + [{consumer_tag, consumer_tag(SubscriptionId)}, + {channel, self()}, {queue, StreamResource}, + {user_who_performed_action, ActingUser}]), ok. publisher_created(Connection, diff --git a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl index e5931ce041e3..f2f054bdd1e3 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl +++ b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl @@ -2155,7 +2155,7 @@ handle_frame_post_auth(Transport, {Connection, State}; true -> {Connection1, State1} = - remove_subscription(SubscriptionId, Connection, State, true), + remove_subscription(SubscriptionId, Connection, State), response_ok(Transport, Connection, unsubscribe, CorrelationId), {Connection1, State1} end; @@ -3084,7 +3084,7 @@ evaluate_state_after_secret_update(Transport, _ -> {C1, S1} = lists:foldl(fun(SubId, {Conn, St}) -> - remove_subscription(SubId, Conn, St, false) + remove_subscription(SubId, Conn, St) end, {C0, S0}, Subs), {Acc#{Str => ok}, C1, S1} end @@ -3216,7 +3216,7 @@ notify_connection_closed(#statem_data{ rabbit_core_metrics:connection_closed(self()), [rabbit_stream_metrics:consumer_cancelled(self(), stream_r(S, Connection), - SubId, Username, false) + SubId, Username) || #consumer{configuration = #consumer_configuration{stream = S, subscription_id = SubId}} @@ -3298,8 +3298,7 @@ clean_state_after_stream_deletion_or_failure(MemberPid, Stream, stream_r(Stream, C0), SubId, - Username, - false), + Username), maybe_unregister_consumer( VirtualHost, Consumer, single_active_consumer(Consumer), @@ -3310,8 +3309,7 @@ clean_state_after_stream_deletion_or_failure(MemberPid, Stream, stream_r(Stream, C0), SubId, - Username, - false), + Username), maybe_unregister_consumer( VirtualHost, Consumer, single_active_consumer(Consumer), @@ -3428,8 +3426,7 @@ remove_subscription(SubscriptionId, virtual_host = VirtualHost, outstanding_requests = Requests0, stream_subscriptions = StreamSubscriptions} = Connection, - #stream_connection_state{consumers = Consumers} = State, - Notify) -> + #stream_connection_state{consumers = Consumers} = State) -> #{SubscriptionId := Consumer} = Consumers, #consumer{log = Log, configuration = #consumer_configuration{stream = Stream, member_pid = MemberPid}} = @@ -3456,8 +3453,7 @@ remove_subscription(SubscriptionId, rabbit_stream_metrics:consumer_cancelled(self(), stream_r(Stream, Connection2), SubscriptionId, - Username, - Notify), + Username), Requests1 = maybe_unregister_consumer( VirtualHost, Consumer,