diff --git a/deps/rabbit/src/rabbit_stream_queue.erl b/deps/rabbit/src/rabbit_stream_queue.erl index 3d7d6427c7e1..2e4cac1a2c59 100644 --- a/deps/rabbit/src/rabbit_stream_queue.erl +++ b/deps/rabbit/src/rabbit_stream_queue.erl @@ -313,7 +313,8 @@ consume(Q, Spec, #stream_client{} = QState0) consumer_tag := ConsumerTag, exclusive_consume := ExclusiveConsume, args := Args, - ok_msg := OkMsg} = Spec, + ok_msg := OkMsg, + acting_user := ActingUser} = Spec, QName = amqqueue:get_name(Q), rabbit_log:debug("~s:~s Local pid resolved ~0p", [?MODULE, ?FUNCTION_NAME, LocalPid]), @@ -330,6 +331,15 @@ consume(Q, Spec, #stream_client{} = QState0) rabbit_core_metrics:consumer_created( ChPid, ConsumerTag, ExclusiveConsume, AckRequired, QName, ConsumerPrefetchCount, true, up, Args), + rabbit_event:notify(consumer_created, + [{consumer_tag, ConsumerTag}, + {exclusive, ExclusiveConsume}, + {ack_required, AckRequired}, + {channel, ChPid}, + {queue, QName}, + {prefetch_count, ConsumerPrefetchCount}, + {arguments, Args}, + {user_who_performed_action, ActingUser}]), %% reply needs to be sent before the stream %% begins sending maybe_send_reply(ChPid, OkMsg), diff --git a/deps/rabbit/test/rabbit_list_test_event_handler.erl b/deps/rabbit/test/rabbit_list_test_event_handler.erl new file mode 100644 index 000000000000..559795c387d3 --- /dev/null +++ b/deps/rabbit/test/rabbit_list_test_event_handler.erl @@ -0,0 +1,59 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 2.0 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at https://www.mozilla.org/en-US/MPL/2.0/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is Pivotal Software, Inc. +%% Copyright (c) 2025 Broadcom. All Rights Reserved. +%% The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. +%% + +-module(rabbit_list_test_event_handler). + +-behaviour(gen_event). + +-export([start_link/0, stop/0, get_events/0, clear_events/0]). + +%% callbacks +-export([init/1, handle_event/2, handle_call/2, handle_info/2, terminate/2, code_change/3]). + +start_link() -> + gen_event:start_link({local, ?MODULE}). + +stop() -> + gen_event:stop(?MODULE). + +get_events() -> + gen_event:call(?MODULE, ?MODULE, get_events). + +clear_events() -> + gen_event:call(?MODULE, ?MODULE, clear_events). + +%% Callbacks + +init([]) -> + {ok, []}. + +handle_event(Event, State) -> + {ok, [Event | State]}. + +handle_call(get_events, State) -> + {ok, lists:reverse(State), State}; +handle_call(clear_events, _) -> + {ok, ok, []}. + +handle_info(_Info, State) -> + {ok, State}. + +terminate(_Reason, _State) -> + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. diff --git a/deps/rabbit/test/rabbit_stream_queue_SUITE.erl b/deps/rabbit/test/rabbit_stream_queue_SUITE.erl index 3815f5df6bac..79d8ab617eb4 100644 --- a/deps/rabbit/test/rabbit_stream_queue_SUITE.erl +++ b/deps/rabbit/test/rabbit_stream_queue_SUITE.erl @@ -144,6 +144,7 @@ all_tests_3() -> consume_credit_multiple_ack, basic_cancel, consumer_metrics_cleaned_on_connection_close, + consume_cancel_should_create_events, receive_basic_cancel_on_queue_deletion, keep_consuming_on_leader_restart, max_length_bytes, @@ -1195,7 +1196,7 @@ consumer_metrics_cleaned_on_connection_close(Config) -> Conn = rabbit_ct_client_helpers:open_connection(Config, Server), {ok, Ch} = amqp_connection:open_channel(Conn), qos(Ch, 10, false), - CTag = <<"consumer_metrics_cleaned_on_connection_close">>, + CTag = rabbit_data_coercion:to_binary(?FUNCTION_NAME), subscribe(Ch, Q, false, 0, CTag), rabbit_ct_helpers:await_condition( fun() -> @@ -1211,6 +1212,49 @@ consumer_metrics_cleaned_on_connection_close(Config) -> rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]). +consume_cancel_should_create_events(Config) -> + HandlerMod = rabbit_list_test_event_handler, + rabbit_ct_broker_helpers:add_code_path_to_all_nodes(Config, HandlerMod), + rabbit_ct_broker_helpers:rpc(Config, 0, + gen_event, + add_handler, + [rabbit_event, HandlerMod, []]), + [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + Q = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', Q, 0, 0}, + declare(Config, Server, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])), + + Conn = rabbit_ct_client_helpers:open_connection(Config, Server), + {ok, Ch} = amqp_connection:open_channel(Conn), + qos(Ch, 10, false), + + ok = rabbit_ct_broker_helpers:rpc(Config, 0, + gen_event, + call, + [rabbit_event, HandlerMod, clear_events]), + + CTag = rabbit_data_coercion:to_binary(?FUNCTION_NAME), + + ?assertEqual([], filtered_events(Config, consumer_created, CTag)), + ?assertEqual([], filtered_events(Config, consumer_deleted, CTag)), + + subscribe(Ch, Q, false, 0, CTag), + + ?awaitMatch([{event, consumer_created, _, _, _}], filtered_events(Config, consumer_created, CTag), ?WAIT), + ?assertEqual([], filtered_events(Config, consumer_deleted, CTag)), + + amqp_channel:call(Ch, #'basic.cancel'{consumer_tag = CTag}), + + ?awaitMatch([{event, consumer_deleted, _, _, _}], filtered_events(Config, consumer_deleted, CTag), ?WAIT), + + rabbit_ct_broker_helpers:rpc(Config, 0, + gen_event, + delete_handler, + [rabbit_event, HandlerMod, []]), + + ok = rabbit_ct_client_helpers:close_connection(Conn), + rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]). receive_basic_cancel_on_queue_deletion(Config) -> [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), @@ -1395,6 +1439,18 @@ filter_consumers(Config, Server, CTag) -> end end, [], CInfo). + +filtered_events(Config, EventType, CTag) -> + Events = rabbit_ct_broker_helpers:rpc(Config, 0, + gen_event, + call, + [rabbit_event, rabbit_list_test_event_handler, get_events]), + lists:filter(fun({event, Type, Fields, _, _}) when Type =:= EventType -> + proplists:get_value(consumer_tag, Fields) =:= CTag; + (_) -> + false + end, Events). + consume_and_reject(Config) -> consume_and_(Config, fun (DT) -> #'basic.reject'{delivery_tag = DT} end). consume_and_nack(Config) -> diff --git a/deps/rabbitmq_stream/src/rabbit_stream_metrics.erl b/deps/rabbitmq_stream/src/rabbit_stream_metrics.erl index 3c1d12c8c7b5..4023944515bd 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream_metrics.erl +++ b/deps/rabbitmq_stream/src/rabbit_stream_metrics.erl @@ -20,9 +20,9 @@ %% API -export([init/0]). --export([consumer_created/9, +-export([consumer_created/10, consumer_updated/9, - consumer_cancelled/4]). + consumer_cancelled/5]). -export([publisher_created/4, publisher_updated/7, publisher_deleted/3]). @@ -42,7 +42,8 @@ consumer_created(Connection, Offset, OffsetLag, Active, - Properties) -> + Properties, + ActingUser) -> Values = [{credits, Credits}, {consumed, MessageCount}, @@ -55,16 +56,32 @@ consumer_created(Connection, ets:insert(?TABLE_CONSUMER, {{StreamResource, Connection, SubscriptionId}, Values}), rabbit_global_counters:consumer_created(stream), - rabbit_core_metrics:consumer_created(Connection, - consumer_tag(SubscriptionId), - false, - false, + CTag = consumer_tag(SubscriptionId), + ExclusiveConsume = false, + AckRequired = false, + Pid = Connection, + PrefetchCount = 0, + Args = rabbit_misc:to_amqp_table(Properties), + rabbit_core_metrics:consumer_created(Pid, + CTag, + ExclusiveConsume, + AckRequired, StreamResource, - 0, + PrefetchCount, Active, rabbit_stream_utils:consumer_activity_status(Active, Properties), - rabbit_misc:to_amqp_table(Properties)), + Args), + + rabbit_event:notify(consumer_created, + [{consumer_tag, CTag}, + {exclusive, ExclusiveConsume}, + {ack_required, AckRequired}, + {channel, Pid}, + {queue, StreamResource}, + {prefetch_count, PrefetchCount}, + {arguments, Args}, + {user_who_performed_action, ActingUser}]), ok. consumer_tag(SubscriptionId) -> @@ -104,7 +121,7 @@ consumer_updated(Connection, ok. -consumer_cancelled(Connection, StreamResource, SubscriptionId, Notify) -> +consumer_cancelled(Connection, StreamResource, SubscriptionId, ActingUser, Notify) -> ets:delete(?TABLE_CONSUMER, {StreamResource, Connection, SubscriptionId}), rabbit_global_counters:consumer_deleted(stream), @@ -115,7 +132,8 @@ consumer_cancelled(Connection, StreamResource, SubscriptionId, Notify) -> true -> rabbit_event:notify(consumer_deleted, [{consumer_tag, consumer_tag(SubscriptionId)}, - {channel, self()}, {queue, StreamResource}]); + {channel, self()}, {queue, StreamResource}, + {user_who_performed_action, ActingUser}]); _ -> ok end, ok. diff --git a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl index 65855d98cbe1..02233757103c 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl +++ b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl @@ -2924,9 +2924,8 @@ consumer_name(_Properties) -> maybe_dispatch_on_subscription(Transport, State, ConsumerState, - #stream_connection{deliver_version = - DeliverVersion} = - Connection, + #stream_connection{deliver_version = DeliverVersion, + user = #user{username = Username}} = Connection, Consumers, Stream, SubscriptionId, @@ -2970,13 +2969,14 @@ maybe_dispatch_on_subscription(Transport, ConsumerOffset, ConsumerOffsetLag, true, - SubscriptionProperties), + SubscriptionProperties, + Username), State#stream_connection_state{consumers = Consumers1} end; maybe_dispatch_on_subscription(_Transport, State, ConsumerState, - Connection, + #stream_connection{user = #user{username = Username}} = Connection, Consumers, Stream, SubscriptionId, @@ -3000,7 +3000,8 @@ maybe_dispatch_on_subscription(_Transport, Offset, 0, %% offset lag Active, - SubscriptionProperties), + SubscriptionProperties, + Username), Consumers1 = Consumers#{SubscriptionId => ConsumerState}, State#stream_connection_state{consumers = Consumers1}. @@ -3205,19 +3206,15 @@ partition_index(VirtualHost, Stream, Properties) -> -1 end. -notify_connection_closed(#statem_data{connection = - #stream_connection{name = Name, - publishers = - Publishers} = - Connection, - connection_state = - #stream_connection_state{consumers = - Consumers} = - ConnectionState}) -> +notify_connection_closed(#statem_data{ + connection = #stream_connection{name = Name, + user = #user{username = Username}, + publishers = Publishers} = Connection, + connection_state = #stream_connection_state{consumers = Consumers} = ConnectionState}) -> rabbit_core_metrics:connection_closed(self()), [rabbit_stream_metrics:consumer_cancelled(self(), stream_r(S, Connection), - SubId, false) + SubId, Username, false) || #consumer{configuration = #consumer_configuration{stream = S, subscription_id = SubId}} @@ -3275,24 +3272,15 @@ clean_state_after_super_stream_deletion(Partitions, Connection, State, Transport end, {Connection, State}, Partitions). clean_state_after_stream_deletion_or_failure(MemberPid, Stream, - #stream_connection{virtual_host = - VirtualHost, - stream_subscriptions - = - StreamSubscriptions, - publishers = - Publishers, - publisher_to_ids - = - PublisherToIds, - stream_leaders = - Leaders, - outstanding_requests = Requests0} = - C0, - #stream_connection_state{consumers - = - Consumers} = - S0) -> + #stream_connection{ + user = #user{username = Username}, + virtual_host = VirtualHost, + stream_subscriptions = StreamSubscriptions, + publishers = Publishers, + publisher_to_ids = PublisherToIds, + stream_leaders = Leaders, + outstanding_requests = Requests0} = C0, + #stream_connection_state{consumers = Consumers} = S0) -> {SubscriptionsCleaned, C1, S1} = case stream_has_subscriptions(Stream, C0) of true -> @@ -3306,6 +3294,7 @@ clean_state_after_stream_deletion_or_failure(MemberPid, Stream, stream_r(Stream, C0), SubId, + Username, false), maybe_unregister_consumer( VirtualHost, Consumer, @@ -3317,6 +3306,7 @@ clean_state_after_stream_deletion_or_failure(MemberPid, Stream, stream_r(Stream, C0), SubId, + Username, false), maybe_unregister_consumer( VirtualHost, Consumer, @@ -3429,11 +3419,11 @@ lookup_leader_from_manager(VirtualHost, Stream) -> rabbit_stream_manager:lookup_leader(VirtualHost, Stream). remove_subscription(SubscriptionId, - #stream_connection{virtual_host = VirtualHost, - outstanding_requests = Requests0, - stream_subscriptions = - StreamSubscriptions} = - Connection, + #stream_connection{ + user = #user{username = Username}, + virtual_host = VirtualHost, + outstanding_requests = Requests0, + stream_subscriptions = StreamSubscriptions} = Connection, #stream_connection_state{consumers = Consumers} = State, Notify) -> #{SubscriptionId := Consumer} = Consumers, @@ -3462,6 +3452,7 @@ remove_subscription(SubscriptionId, rabbit_stream_metrics:consumer_cancelled(self(), stream_r(Stream, Connection2), SubscriptionId, + Username, Notify), Requests1 = maybe_unregister_consumer( diff --git a/deps/rabbitmq_stream/test/rabbit_list_test_event_handler.erl b/deps/rabbitmq_stream/test/rabbit_list_test_event_handler.erl new file mode 100644 index 000000000000..54877de232fd --- /dev/null +++ b/deps/rabbitmq_stream/test/rabbit_list_test_event_handler.erl @@ -0,0 +1,54 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 2.0 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at https://www.mozilla.org/en-US/MPL/2.0/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is Pivotal Software, Inc. +%% Copyright (c) 2025 Broadcom. All Rights Reserved. +%% The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. +%% + +-module(rabbit_list_test_event_handler). + +-behaviour(gen_event). + +-export([start_link/0, stop/0, get_events/0]). + +%% callbacks +-export([init/1, handle_event/2, handle_call/2, handle_info/2, terminate/2, code_change/3]). + +start_link() -> + gen_event:start_link({local, ?MODULE}). + +stop() -> + gen_event:stop(?MODULE). + +get_events() -> + gen_event:call(?MODULE, ?MODULE, get_events). + +%% Callbacks + +init([]) -> + {ok, []}. + +handle_event(Event, State) -> + {ok, [Event | State]}. + +handle_call(get_events, State) -> + {ok, lists:reverse(State), State}. + +handle_info(_Info, State) -> + {ok, State}. + +terminate(_Reason, _State) -> + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. diff --git a/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl b/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl index 91644f1364f6..9c2a0c1df1c0 100644 --- a/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl +++ b/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is Pivotal Software, Inc. -%% Copyright (c) 2020-2024 Broadcom. All Rights Reserved. +%% Copyright (c) 2020-2025 Broadcom. All Rights Reserved. %% The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. %% @@ -66,7 +66,8 @@ groups() -> unauthorized_vhost_access_should_close_with_delay, sasl_anonymous, test_publisher_with_too_long_reference_errors, - test_consumer_with_too_long_reference_errors + test_consumer_with_too_long_reference_errors, + subscribe_unsubscribe_should_create_events ]}, %% Run `test_global_counters` on its own so the global metrics are %% initialised to 0 for each testcase @@ -489,7 +490,8 @@ test_gc_consumers(Config) -> 0, 0, true, - #{}]), + #{}, + <<"guest">>]), ?awaitMatch(0, consumer_count(Config), ?WAIT), ok. @@ -1011,6 +1013,57 @@ test_consumer_with_too_long_reference_errors(Config) -> test_close(T, S, C), ok. +subscribe_unsubscribe_should_create_events(Config) -> + HandlerMod = rabbit_list_test_event_handler, + rabbit_ct_broker_helpers:add_code_path_to_all_nodes(Config, HandlerMod), + rabbit_ct_broker_helpers:rpc(Config, 0, + gen_event, + add_handler, + [rabbit_event, HandlerMod, []]), + Stream = atom_to_binary(?FUNCTION_NAME, utf8), + Transport = gen_tcp, + Port = get_stream_port(Config), + Opts = get_opts(Transport), + {ok, S} = Transport:connect("localhost", Port, Opts), + C0 = rabbit_stream_core:init(0), + C1 = test_peer_properties(Transport, S, C0), + C2 = test_authenticate(Transport, S, C1), + C3 = test_create_stream(Transport, S, Stream, C2), + + ?assertEqual([], filtered_events(Config, consumer_created)), + ?assertEqual([], filtered_events(Config, consumer_deleted)), + + SubscriptionId = 42, + C4 = test_subscribe(Transport, S, SubscriptionId, Stream, C3), + + ?awaitMatch([{event, consumer_created, _, _, _}], filtered_events(Config, consumer_created), ?WAIT), + ?assertEqual([], filtered_events(Config, consumer_deleted)), + + C5 = test_unsubscribe(Transport, S, SubscriptionId, C4), + + ?awaitMatch([{event, consumer_deleted, _, _, _}], filtered_events(Config, consumer_deleted), ?WAIT), + + rabbit_ct_broker_helpers:rpc(Config, 0, + gen_event, + delete_handler, + [rabbit_event, HandlerMod, []]), + + C6 = test_delete_stream(Transport, S, Stream, C5, false), + _C7 = test_close(Transport, S, C6), + closed = wait_for_socket_close(Transport, S, 10), + ok. + +filtered_events(Config, EventType) -> + Events = rabbit_ct_broker_helpers:rpc(Config, 0, + gen_event, + call, + [rabbit_event, rabbit_list_test_event_handler, get_events]), + lists:filter(fun({event, Type, _, _, _}) when Type =:= EventType -> + true; + (_) -> + false + end, Events). + consumer_offset_info(Config, ConnectionName) -> [[{offset, Offset}, {offset_lag, Lag}]] = rpc(Config, 0, ?MODULE, diff --git a/deps/rabbitmq_stream/test/rabbit_stream_reader_SUITE.erl b/deps/rabbitmq_stream/test/rabbit_stream_reader_SUITE.erl index b4916a04de13..c32666706ca2 100644 --- a/deps/rabbitmq_stream/test/rabbit_stream_reader_SUITE.erl +++ b/deps/rabbitmq_stream/test/rabbit_stream_reader_SUITE.erl @@ -9,7 +9,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is Pivotal Software, Inc. -%% Copyright (c) 2024 Broadcom. All Rights Reserved. +%% Copyright (c) 2024-2025 Broadcom. All Rights Reserved. %% The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. %% @@ -143,7 +143,8 @@ evaluate_state_after_secret_update_test(_) -> {C1, S1} = Mod:evaluate_state_after_secret_update(ModTransport, #user{}, #stream_connection{publishers = Publishers, - stream_subscriptions = Subscriptions}, + stream_subscriptions = Subscriptions, + user = #user{}}, #stream_connection_state{consumers = Consumers}), meck:validate(ModLog), @@ -176,7 +177,7 @@ evaluate_state_after_secret_update_test(_) -> Now = os:system_time(second), meck:expect(rabbit_access_control, expiry_timestamp, fun (_) -> Now + 60 end), {C2, _} = Mod:evaluate_state_after_secret_update(ModTransport, #user{}, - #stream_connection{}, + #stream_connection{user = #user{}}, #stream_connection_state{}), #stream_connection{token_expiry_timer = TRef2} = C2, Cancel2 = erlang:cancel_timer(TRef2, [{async, false}, {info, true}]),