diff --git a/deps/amqp_client/src/amqp_gen_connection.erl b/deps/amqp_client/src/amqp_gen_connection.erl index 5681845779da..886a06d45f05 100644 --- a/deps/amqp_client/src/amqp_gen_connection.erl +++ b/deps/amqp_client/src/amqp_gen_connection.erl @@ -31,6 +31,7 @@ server_properties, %% connection.block, connection.unblock handler block_handler, + blocked_by = sets:new([{version, 2}]), closing = false %% #closing{} | false }). @@ -199,9 +200,36 @@ handle_cast({server_misbehaved, AmqpError}, State) -> server_misbehaved_close(AmqpError, State); handle_cast({server_close, #'connection.close'{} = Close}, State) -> server_initiated_close(Close, State); -handle_cast({register_blocked_handler, HandlerPid}, State) -> +handle_cast({register_blocked_handler, HandlerPid}, + #state{blocked_by = BlockedBy} = State) -> Ref = erlang:monitor(process, HandlerPid), - {noreply, State#state{block_handler = {HandlerPid, Ref}}}. + State1 = State#state{block_handler = {HandlerPid, Ref}}, + %% If an alarm is already active, immediately block the handler. + _ = case sets:is_empty(BlockedBy) of + false -> + HandlerPid ! #'connection.blocked'{}; + true -> + ok + end, + {noreply, State1}; +handle_cast({conserve_resources, Source, Conserve}, + #state{blocked_by = BlockedBy} = State) -> + WasNotBlocked = sets:is_empty(BlockedBy), + BlockedBy1 = case Conserve of + true -> + sets:add_element(Source, BlockedBy); + false -> + sets:del_element(Source, BlockedBy) + end, + State1 = State#state{blocked_by = BlockedBy1}, + case sets:is_empty(BlockedBy1) of + true -> + handle_method(#'connection.unblocked'{}, State1); + false when WasNotBlocked -> + handle_method(#'connection.blocked'{}, State1); + false -> + {noreply, State1} + end. %% @private handle_info({'DOWN', _, process, BlockHandler, Reason}, diff --git a/deps/rabbit/src/rabbit_direct.erl b/deps/rabbit/src/rabbit_direct.erl index 9f9a601bb25a..62d34c49776a 100644 --- a/deps/rabbit/src/rabbit_direct.erl +++ b/deps/rabbit/src/rabbit_direct.erl @@ -13,7 +13,8 @@ -deprecated([{force_event_refresh, 1, eventually}]). %% Internal --export([list_local/0]). +-export([list_local/0, + conserve_resources/3]). %% For testing only -export([extract_extra_auth_props/4]). @@ -206,6 +207,8 @@ connect1(User = #user{username = Username}, VHost, Protocol, Pid, Infos) -> ok -> ok = pg_local:join(rabbit_direct, Pid), rabbit_core_metrics:connection_created(Pid, Infos), rabbit_event:notify(connection_created, Infos), + _ = rabbit_alarm:register( + Pid, {?MODULE, conserve_resources, []}), {ok, {User, rabbit_reader:server_properties(Protocol)}} catch exit:#amqp_error{name = Reason = not_allowed} -> @@ -252,3 +255,9 @@ disconnect(Pid, Infos) -> pg_local:leave(rabbit_direct, Pid), rabbit_core_metrics:connection_closed(Pid), rabbit_event:notify(connection_closed, Infos). + +-spec conserve_resources(pid(), + rabbit_alarm:resource_alarm_source(), + rabbit_alarm:resource_alert()) -> 'ok'. +conserve_resources(ChannelPid, Source, {_, Conserve, _}) -> + gen_server:cast(ChannelPid, {conserve_resources, Source, Conserve}). diff --git a/deps/rabbitmq_shovel/test/amqp091_alarm_SUITE.erl b/deps/rabbitmq_shovel/test/amqp091_alarm_SUITE.erl new file mode 100644 index 000000000000..8ca9ae238ae9 --- /dev/null +++ b/deps/rabbitmq_shovel/test/amqp091_alarm_SUITE.erl @@ -0,0 +1,283 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. +%% + +-module(amqp091_alarm_SUITE). + +-include_lib("common_test/include/ct.hrl"). +-include_lib("eunit/include/eunit.hrl"). +-include_lib("amqp_client/include/amqp_client.hrl"). +-include_lib("rabbitmq_ct_helpers/include/rabbit_assert.hrl"). + +-compile([export_all, nowarn_export_all]). + +all() -> + [ + {group, network_connection}, + {group, direct_connection} + ]. + +groups() -> + [ + {network_connection, [], [ + dest_resource_alarm_on_confirm, + dest_resource_alarm_on_publish, + dest_resource_alarm_no_ack + ]}, + {direct_connection, [], [ + dest_resource_alarm_on_confirm + ]} + ]. + +all_tests() -> + [ + dest_resource_alarm_on_confirm, + dest_resource_alarm_on_publish, + dest_resource_alarm_no_ack + ]. + +%% ------------------------------------------------------------------- +%% Testsuite setup/teardown. +%% ------------------------------------------------------------------- + +init_per_suite(Config) -> + rabbit_ct_helpers:log_environment(), + Config1 = rabbit_ct_helpers:set_config(Config, [ + {rmq_nodename_suffix, ?MODULE}, + {rmq_nodes_count, 2}, + {rmq_nodes_clustered, false} + ]), + rabbit_ct_helpers:run_setup_steps(Config1, + rabbit_ct_broker_helpers:setup_steps() ++ + rabbit_ct_client_helpers:setup_steps()). + +end_per_suite(Config) -> + rabbit_ct_helpers:run_teardown_steps(Config, + rabbit_ct_client_helpers:teardown_steps() ++ + rabbit_ct_broker_helpers:teardown_steps()). + +init_per_group(network_connection, Config) -> + rabbit_ct_helpers:set_config( + Config, + [{shovel_source_uri, shovel_test_utils:make_uri(Config, 1)}, + {shovel_source_idx, 1}, + {shovel_dest_uri, shovel_test_utils:make_uri(Config, 0)}, + {shovel_dest_idx, 0} + ]); +init_per_group(direct_connection, Config) -> + rabbit_ct_helpers:set_config( + Config, + [{shovel_source_uri, shovel_test_utils:make_uri(Config, 1)}, + {shovel_source_idx, 1}, + {shovel_dest_uri, <<"amqp://">>}, + {shovel_dest_idx, 0} + ]). + +end_per_group(_, Config) -> + Config. + +init_per_testcase(Testcase, Config) -> + rabbit_ct_helpers:testcase_started(Config, Testcase). + +end_per_testcase(Testcase, Config) -> + rabbit_ct_helpers:testcase_finished(Config, Testcase). + +%% ------------------------------------------------------------------- +%% Testcases. +%% ------------------------------------------------------------------- + +dest_resource_alarm_on_confirm(Config) -> + dest_resource_alarm(<<"on-confirm">>, Config). + +dest_resource_alarm_on_publish(Config) -> + dest_resource_alarm(<<"on-publish">>, Config). + +dest_resource_alarm_no_ack(Config) -> + dest_resource_alarm(<<"no-ack">>, Config). + +dest_resource_alarm(AckMode, Config) -> + SourceUri = ?config(shovel_source_uri, Config), + SourceIdx = ?config(shovel_source_idx, Config), + DestUri = ?config(shovel_dest_uri, Config), + DestIdx = ?config(shovel_dest_idx, Config), + + {Conn1, Ch1} = rabbit_ct_client_helpers:open_connection_and_channel( + Config, SourceIdx), + amqp_channel:call(Ch1, #'confirm.select'{}), + amqp_channel:call(Ch1, #'queue.declare'{queue = <<"src">>}), + publish(Ch1, <<>>, <<"src">>, <<"hello">>), + true = amqp_channel:wait_for_confirms(Ch1), + #{messages := 1} = message_count(Config, SourceIdx, <<"src">>), + while_blocked(Config, DestIdx, + fun() -> + ok = rabbit_ct_broker_helpers:rpc( + Config, DestIdx, + rabbit_runtime_parameters, set, + [ + <<"/">>, <<"shovel">>, <<"test">>, + [{<<"src-uri">>, SourceUri}, + {<<"dest-uri">>, [DestUri]}, + {<<"src-queue">>, <<"src">>}, + {<<"dest-queue">>, <<"dest">>}, + {<<"src-prefetch-count">>, 50}, + {<<"ack-mode">>, AckMode}, + {<<"src-delete-after">>, <<"never">>}], none]), + %% The destination is blocked, so the shovel is blocked. + ?awaitMatch( + blocked, + shovel_test_utils:get_shovel_status(Config, DestIdx, + <<"test">>), + 3_000), + + %% The shoveled message triggered a + %% connection.blocked notification, but hasn't + %% reached the dest queue because of the resource + %% alarm + InitialMsgCnt = + case AckMode of + <<"on-confirm">> -> 1; + _ -> 0 + end, + #{messages := InitialMsgCnt, + messages_unacknowledged := InitialMsgCnt + } = message_count(Config, SourceIdx, <<"src">>), + #{messages := 0} = message_count(Config, DestIdx, <<"dest">>), + + %% Now publish more messages to "src" queue. + publish_count(Ch1, <<>>, <<"src">>, <<"hello">>, 1000), + true = amqp_channel:wait_for_confirms(Ch1), + + %% No messages reached the dest queue + #{messages := 0} = message_count(Config, DestIdx, <<"dest">>), + + %% When the shovel sets a prefetch_count + %% (on-confirm/on-publish mode), all messages are in + %% the source queue, prefrech count are + %% unacknowledged and buffered in the shovel + MsgCnts = + case AckMode of + <<"on-confirm">> -> + #{messages => 1001, + messages_unacknowledged => 50}; + <<"on-publish">> -> + #{messages => 1000, + messages_unacknowledged => 50}; + <<"no-ack">> -> + %% no prefetch limit, all messages are + %% buffered in the shovel + #{messages => 0, + messages_unacknowledged => 0} + end, + + MsgCnts = message_count(Config, SourceIdx, <<"src">>), + + %% There should be no process with a message buildup + ?awaitMatch( + 0, + begin + Top = [{_, P, _}] = rabbit_ct_broker_helpers:rpc( + Config, 0, recon, proc_count, [message_queue_len, 1]), + ct:pal("Top process by message queue length: ~p", [Top]), + P + end, 5_000), + + ok + end), + + %% After the alarm clears, all messages should arrive in the dest queue. + ?awaitMatch( + #{messages := 1001}, + message_count(Config, DestIdx, <<"dest">>), + 5_000), + #{messages := 0} = message_count(Config, SourceIdx, <<"src">>), + running = shovel_test_utils:get_shovel_status(Config, DestIdx, <<"test">>), + + rabbit_ct_client_helpers:close_connection_and_channel(Conn1, Ch1), + cleanup(Config), + ok. + +%%---------------------------------------------------------------------------- + +conserve_resources(Pid, Source, {_, Conserve, AlarmedNode}) -> + case Conserve of + true -> + ct:log("node ~w set alarm for resource ~ts", + [AlarmedNode, Source]), + Pid ! {block, Source}; + false -> + ct:log("node ~w cleared alarm for resource ~ts", + [AlarmedNode, Source]), + Pid ! {unblock, Source} + end, + ok. + +while_blocked(Config, Node, Fun) when is_function(Fun, 0) -> + OrigLimit = rabbit_ct_broker_helpers:rpc(Config, Node, + vm_memory_monitor, + get_vm_memory_high_watermark, []), + + ok = rabbit_ct_broker_helpers:add_code_path_to_node( + rabbit_ct_broker_helpers:get_node_config(Config, Node, nodename), + ?MODULE), + [] = rabbit_ct_broker_helpers:rpc(Config, Node, rabbit_alarm, register, + [self(), + {?MODULE, conserve_resources, []}]), + ok = rabbit_ct_broker_helpers:rpc(Config, Node, vm_memory_monitor, + set_vm_memory_high_watermark, [0]), + Source = receive + {block, S} -> + S + after + 15_000 -> + ct:fail(alarm_set_timeout) + end, + try + Fun() + after + ok = rabbit_ct_broker_helpers:rpc(Config, Node, vm_memory_monitor, + set_vm_memory_high_watermark, + [OrigLimit]), + receive + {unblock, Source} -> + ok + after + 10_000 -> + ct:fail(alarm_clear_timeout) + end + end. + +publish(Ch, X, Key, Payload) when is_binary(Payload) -> + publish(Ch, X, Key, #amqp_msg{payload = Payload}); + +publish(Ch, X, Key, Msg = #amqp_msg{}) -> + amqp_channel:cast(Ch, #'basic.publish'{exchange = X, + routing_key = Key}, Msg). + +publish_count(Ch, X, Key, M, Count) -> + [begin + + publish(Ch, X, Key, M) + end || _ <- lists:seq(1, Count)]. + +message_count(Config, Node, QueueName) -> + Resource = rabbit_misc:r(<<"/">>, queue, QueueName), + {ok, Q} = rabbit_ct_broker_helpers:rpc(Config, Node, rabbit_amqqueue, + lookup, [Resource]), + maps:from_list( + rabbit_ct_broker_helpers:rpc(Config, Node, rabbit_amqqueue, info, + [Q, [messages, messages_unacknowledged]])). + +cleanup(Config) -> + rabbit_ct_broker_helpers:rpc_all(Config, ?MODULE, cleanup1, []). + +cleanup1() -> + [rabbit_runtime_parameters:clear(rabbit_misc:pget(vhost, P), + rabbit_misc:pget(component, P), + rabbit_misc:pget(name, P), + <<"acting-user">>) || + P <- rabbit_runtime_parameters:list()], + [rabbit_amqqueue:delete(Q, false, false, <<"acting-user">>) + || Q <- rabbit_amqqueue:list()]. diff --git a/deps/rabbitmq_shovel/test/amqp091_dynamic_SUITE.erl b/deps/rabbitmq_shovel/test/amqp091_dynamic_SUITE.erl index b2ada51afd02..2f169fdf7bb8 100644 --- a/deps/rabbitmq_shovel/test/amqp091_dynamic_SUITE.erl +++ b/deps/rabbitmq_shovel/test/amqp091_dynamic_SUITE.erl @@ -42,9 +42,6 @@ groups() -> security_validation, get_connection_name, credit_flow, - dest_resource_alarm_on_confirm, - dest_resource_alarm_on_publish, - dest_resource_alarm_no_ack, missing_src_queue_with_src_predeclared, missing_dest_queue_with_dest_predeclared ]}, @@ -744,130 +741,6 @@ credit_flow(Config) -> end end). -dest_resource_alarm_on_confirm(Config) -> - dest_resource_alarm(<<"on-confirm">>, Config). - -dest_resource_alarm_on_publish(Config) -> - dest_resource_alarm(<<"on-publish">>, Config). - -dest_resource_alarm_no_ack(Config) -> - dest_resource_alarm(<<"no-ack">>, Config). - -dest_resource_alarm(AckMode, Config) -> - with_ch(Config, - fun (Ch) -> - amqp_channel:call(Ch, #'confirm.select'{}), - amqp_channel:call(Ch, #'queue.declare'{queue = <<"src">>}), - publish(Ch, <<>>, <<"src">>, <<"hello">>), - amqp_channel:call(Ch, #'queue.declare'{queue = <<"temp">>}), - publish_count(Ch, <<>>, <<"temp">>, <<"hello">>, 1000), - true = amqp_channel:wait_for_confirms(Ch), - - #{messages := 1} = message_count(Config, <<"src">>), - %%#{messages := 0} = message_count(Config, <<"dest">>), - - %% A resource alarm will block publishing connections - OrigLimit = set_vm_memory_high_watermark(Config, 0.00000001), - %% Let connection block. - timer:sleep(100), - - try - shovel_test_utils:set_param( - Config, - <<"test">>, [{<<"src-queue">>, <<"src">>}, - {<<"dest-queue">>, <<"dest">>}, - {<<"src-prefetch-count">>, 50}, - {<<"ack-mode">>, AckMode}, - {<<"src-delete-after">>, <<"never">>}]), - - %% The shovel is blocked - blocked = shovel_test_utils:get_shovel_status(Config, <<"test">>), - - %% The shoveled message triggered a - %% connection.blocked notification, but hasn't - %% reached the dest queue because of the resource - %% alarm - InitialMsgCnt = - case AckMode of - <<"on-confirm">> -> 1; - _ -> 0 - end, - - #{messages := InitialMsgCnt, - messages_unacknowledged := InitialMsgCnt} = message_count(Config, <<"src">>), - #{messages := 0} = message_count(Config, <<"dest">>), - - %% Now publish messages to "src" queue - %% (network connections are blocked from publishing - %% so we use a temporary shovel with direct - %% connections to populate "src" queue with messages - %% from the "temp" queue) - ok = rabbit_ct_broker_helpers:rpc( - Config, 0, - rabbit_runtime_parameters, set, - [ - <<"/">>, <<"shovel">>, <<"temp">>, - [{<<"src-uri">>, <<"amqp://">>}, - {<<"dest-uri">>, [<<"amqp://">>]}, - {<<"src-queue">>, <<"temp">>}, - {<<"dest-queue">>, <<"src">>}, - {<<"src-delete-after">>, <<"queue-length">>}], none]), - shovel_test_utils:await( - fun() -> - #{messages := Cnt} = message_count(Config, <<"temp">>), - Cnt =:= 0 - end, - 5000), - - %% No messages reached the dest queue - #{messages := 0} = message_count(Config, <<"dest">>), - - %% When the shovel sets a prefetch_count - %% (on-confirm/on-publish mode), all messages are in - %% the source queue, prefrech count are - %% unacknowledged and buffered in the shovel - MsgCnts = - case AckMode of - <<"on-confirm">> -> - #{messages => 1001, - messages_unacknowledged => 50}; - <<"on-publish">> -> - #{messages => 1000, - messages_unacknowledged => 50}; - <<"no-ack">> -> - %% no prefetch limit, all messages are - %% buffered in the shovel - #{messages => 0, - messages_unacknowledged => 0} - end, - - MsgCnts = message_count(Config, <<"src">>), - - %% There should be no process with a message buildup - eventually(?_assertEqual(0, begin - Top = [{_, P, _}] = rabbit_ct_broker_helpers:rpc( - Config, 0, recon, proc_count, [message_queue_len, 1]), - ct:pal("Top process by message queue length: ~p", [Top]), - P - end)), - - %% Clear the resource alarm, all messages should - %% arrive to the dest queue - set_vm_memory_high_watermark(Config, OrigLimit), - - catch shovel_test_utils:await( - fun() -> - #{messages := Cnt} = message_count(Config, <<"dest">>), - Cnt =:= 1001 - end, - 5000), - #{messages := 0} = message_count(Config, <<"src">>), - running = shovel_test_utils:get_shovel_status(Config, <<"test">>) - after - set_vm_memory_high_watermark(Config, OrigLimit) - end - end). - %%---------------------------------------------------------------------------- with_ch(Config, Fun) -> @@ -999,15 +872,6 @@ set_default_credit(Config, Value) -> ok = rabbit_ct_broker_helpers:rpc(Config, persistent_term, put, [Key, Value]), OrigValue. -set_vm_memory_high_watermark(Config, Limit) -> - OrigLimit = - rabbit_ct_broker_helpers:rpc( - Config, 0, vm_memory_monitor, get_vm_memory_high_watermark, []), - ok = - rabbit_ct_broker_helpers:rpc( - Config, 0, vm_memory_monitor, set_vm_memory_high_watermark, [Limit]), - OrigLimit. - message_count(Config, QueueName) -> Resource = rabbit_misc:r(<<"/">>, queue, QueueName), {ok, Q} = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_amqqueue, lookup, [Resource]), @@ -1073,9 +937,6 @@ process_name(Pid) -> undefined end. -proc_info(Pid) -> - erpc:call(node(Pid), erlang, process_info, [Pid]). - proc_info(Pid, Item) -> {Item, Value} = erpc:call(node(Pid), erlang, process_info, [Pid, Item]), Value. diff --git a/deps/rabbitmq_shovel/test/shovel_test_utils.erl b/deps/rabbitmq_shovel/test/shovel_test_utils.erl index e0e26b570725..e0448db5b4b1 100644 --- a/deps/rabbitmq_shovel/test/shovel_test_utils.erl +++ b/deps/rabbitmq_shovel/test/shovel_test_utils.erl @@ -131,8 +131,12 @@ get_shovel_status(Config, Node, Name) -> not_found -> not_found; _ -> - {Status, Info} = proplists:get_value(info, S), - proplists:get_value(blocked_status, Info, Status) + case proplists:get_value(info, S) of + starting -> + starting; + {Status, Info} -> + proplists:get_value(blocked_status, Info, Status) + end end. await(Pred) ->