diff --git a/deps/rabbitmq_stream/src/rabbit_stream_metrics.erl b/deps/rabbitmq_stream/src/rabbit_stream_metrics.erl index 4023944515bd..b73c3667ad4b 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream_metrics.erl +++ b/deps/rabbitmq_stream/src/rabbit_stream_metrics.erl @@ -22,7 +22,7 @@ -export([init/0]). -export([consumer_created/10, consumer_updated/9, - consumer_cancelled/5]). + consumer_cancelled/4]). -export([publisher_created/4, publisher_updated/7, publisher_deleted/3]). @@ -121,21 +121,17 @@ consumer_updated(Connection, ok. -consumer_cancelled(Connection, StreamResource, SubscriptionId, ActingUser, Notify) -> +consumer_cancelled(Connection, StreamResource, SubscriptionId, ActingUser) -> ets:delete(?TABLE_CONSUMER, {StreamResource, Connection, SubscriptionId}), rabbit_global_counters:consumer_deleted(stream), rabbit_core_metrics:consumer_deleted(Connection, consumer_tag(SubscriptionId), StreamResource), - case Notify of - true -> - rabbit_event:notify(consumer_deleted, - [{consumer_tag, consumer_tag(SubscriptionId)}, - {channel, self()}, {queue, StreamResource}, - {user_who_performed_action, ActingUser}]); - _ -> ok - end, + rabbit_event:notify(consumer_deleted, + [{consumer_tag, consumer_tag(SubscriptionId)}, + {channel, self()}, {queue, StreamResource}, + {user_who_performed_action, ActingUser}]), ok. publisher_created(Connection, diff --git a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl index e5931ce041e3..f2f054bdd1e3 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl +++ b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl @@ -2155,7 +2155,7 @@ handle_frame_post_auth(Transport, {Connection, State}; true -> {Connection1, State1} = - remove_subscription(SubscriptionId, Connection, State, true), + remove_subscription(SubscriptionId, Connection, State), response_ok(Transport, Connection, unsubscribe, CorrelationId), {Connection1, State1} end; @@ -3084,7 +3084,7 @@ evaluate_state_after_secret_update(Transport, _ -> {C1, S1} = lists:foldl(fun(SubId, {Conn, St}) -> - remove_subscription(SubId, Conn, St, false) + remove_subscription(SubId, Conn, St) end, {C0, S0}, Subs), {Acc#{Str => ok}, C1, S1} end @@ -3216,7 +3216,7 @@ notify_connection_closed(#statem_data{ rabbit_core_metrics:connection_closed(self()), [rabbit_stream_metrics:consumer_cancelled(self(), stream_r(S, Connection), - SubId, Username, false) + SubId, Username) || #consumer{configuration = #consumer_configuration{stream = S, subscription_id = SubId}} @@ -3298,8 +3298,7 @@ clean_state_after_stream_deletion_or_failure(MemberPid, Stream, stream_r(Stream, C0), SubId, - Username, - false), + Username), maybe_unregister_consumer( VirtualHost, Consumer, single_active_consumer(Consumer), @@ -3310,8 +3309,7 @@ clean_state_after_stream_deletion_or_failure(MemberPid, Stream, stream_r(Stream, C0), SubId, - Username, - false), + Username), maybe_unregister_consumer( VirtualHost, Consumer, single_active_consumer(Consumer), @@ -3428,8 +3426,7 @@ remove_subscription(SubscriptionId, virtual_host = VirtualHost, outstanding_requests = Requests0, stream_subscriptions = StreamSubscriptions} = Connection, - #stream_connection_state{consumers = Consumers} = State, - Notify) -> + #stream_connection_state{consumers = Consumers} = State) -> #{SubscriptionId := Consumer} = Consumers, #consumer{log = Log, configuration = #consumer_configuration{stream = Stream, member_pid = MemberPid}} = @@ -3456,8 +3453,7 @@ remove_subscription(SubscriptionId, rabbit_stream_metrics:consumer_cancelled(self(), stream_r(Stream, Connection2), SubscriptionId, - Username, - Notify), + Username), Requests1 = maybe_unregister_consumer( VirtualHost, Consumer,