Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 30 additions & 2 deletions deps/amqp_client/src/amqp_gen_connection.erl
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
server_properties,
%% connection.block, connection.unblock handler
block_handler,
blocked_by = sets:new([{version, 2}]),
closing = false %% #closing{} | false
}).

Expand Down Expand Up @@ -199,9 +200,36 @@ handle_cast({server_misbehaved, AmqpError}, State) ->
server_misbehaved_close(AmqpError, State);
handle_cast({server_close, #'connection.close'{} = Close}, State) ->
server_initiated_close(Close, State);
handle_cast({register_blocked_handler, HandlerPid}, State) ->
handle_cast({register_blocked_handler, HandlerPid},
#state{blocked_by = BlockedBy} = State) ->
Ref = erlang:monitor(process, HandlerPid),
{noreply, State#state{block_handler = {HandlerPid, Ref}}}.
State1 = State#state{block_handler = {HandlerPid, Ref}},
%% If an alarm is already active, immediately block the handler.
_ = case sets:is_empty(BlockedBy) of
false ->
HandlerPid ! #'connection.blocked'{};
true ->
ok
end,
{noreply, State1};
handle_cast({conserve_resources, Source, Conserve},
#state{blocked_by = BlockedBy} = State) ->
WasNotBlocked = sets:is_empty(BlockedBy),
BlockedBy1 = case Conserve of
true ->
sets:add_element(Source, BlockedBy);
false ->
sets:del_element(Source, BlockedBy)
end,
State1 = State#state{blocked_by = BlockedBy1},
case sets:is_empty(BlockedBy1) of
true ->
handle_method(#'connection.unblocked'{}, State1);
false when WasNotBlocked ->
handle_method(#'connection.blocked'{}, State1);
false ->
{noreply, State1}
end.

%% @private
handle_info({'DOWN', _, process, BlockHandler, Reason},
Expand Down
11 changes: 10 additions & 1 deletion deps/rabbit/src/rabbit_direct.erl
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@
-deprecated([{force_event_refresh, 1, eventually}]).

%% Internal
-export([list_local/0]).
-export([list_local/0,
conserve_resources/3]).

%% For testing only
-export([extract_extra_auth_props/4]).
Expand Down Expand Up @@ -206,6 +207,8 @@ connect1(User = #user{username = Username}, VHost, Protocol, Pid, Infos) ->
ok -> ok = pg_local:join(rabbit_direct, Pid),
rabbit_core_metrics:connection_created(Pid, Infos),
rabbit_event:notify(connection_created, Infos),
_ = rabbit_alarm:register(
Pid, {?MODULE, conserve_resources, []}),
{ok, {User, rabbit_reader:server_properties(Protocol)}}
catch
exit:#amqp_error{name = Reason = not_allowed} ->
Expand Down Expand Up @@ -252,3 +255,9 @@ disconnect(Pid, Infos) ->
pg_local:leave(rabbit_direct, Pid),
rabbit_core_metrics:connection_closed(Pid),
rabbit_event:notify(connection_closed, Infos).

-spec conserve_resources(pid(),
rabbit_alarm:resource_alarm_source(),
rabbit_alarm:resource_alert()) -> 'ok'.
conserve_resources(ChannelPid, Source, {_, Conserve, _}) ->
gen_server:cast(ChannelPid, {conserve_resources, Source, Conserve}).
283 changes: 283 additions & 0 deletions deps/rabbitmq_shovel/test/amqp091_alarm_SUITE.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,283 @@
%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
%%

-module(amqp091_alarm_SUITE).

-include_lib("common_test/include/ct.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("amqp_client/include/amqp_client.hrl").
-include_lib("rabbitmq_ct_helpers/include/rabbit_assert.hrl").

-compile([export_all, nowarn_export_all]).

all() ->
[
{group, network_connection},
{group, direct_connection}
].

groups() ->
[
{network_connection, [], [
dest_resource_alarm_on_confirm,
dest_resource_alarm_on_publish,
dest_resource_alarm_no_ack
]},
{direct_connection, [], [
dest_resource_alarm_on_confirm
]}
].

all_tests() ->
[
dest_resource_alarm_on_confirm,
dest_resource_alarm_on_publish,
dest_resource_alarm_no_ack
].

%% -------------------------------------------------------------------
%% Testsuite setup/teardown.
%% -------------------------------------------------------------------

init_per_suite(Config) ->
rabbit_ct_helpers:log_environment(),
Config1 = rabbit_ct_helpers:set_config(Config, [
{rmq_nodename_suffix, ?MODULE},
{rmq_nodes_count, 2},
{rmq_nodes_clustered, false}
]),
rabbit_ct_helpers:run_setup_steps(Config1,
rabbit_ct_broker_helpers:setup_steps() ++
rabbit_ct_client_helpers:setup_steps()).

end_per_suite(Config) ->
rabbit_ct_helpers:run_teardown_steps(Config,
rabbit_ct_client_helpers:teardown_steps() ++
rabbit_ct_broker_helpers:teardown_steps()).

init_per_group(network_connection, Config) ->
rabbit_ct_helpers:set_config(
Config,
[{shovel_source_uri, shovel_test_utils:make_uri(Config, 1)},
{shovel_source_idx, 1},
{shovel_dest_uri, shovel_test_utils:make_uri(Config, 0)},
{shovel_dest_idx, 0}
]);
init_per_group(direct_connection, Config) ->
rabbit_ct_helpers:set_config(
Config,
[{shovel_source_uri, shovel_test_utils:make_uri(Config, 1)},
{shovel_source_idx, 1},
{shovel_dest_uri, <<"amqp://">>},
{shovel_dest_idx, 0}
]).

end_per_group(_, Config) ->
Config.

init_per_testcase(Testcase, Config) ->
rabbit_ct_helpers:testcase_started(Config, Testcase).

end_per_testcase(Testcase, Config) ->
rabbit_ct_helpers:testcase_finished(Config, Testcase).

%% -------------------------------------------------------------------
%% Testcases.
%% -------------------------------------------------------------------

dest_resource_alarm_on_confirm(Config) ->
dest_resource_alarm(<<"on-confirm">>, Config).

dest_resource_alarm_on_publish(Config) ->
dest_resource_alarm(<<"on-publish">>, Config).

dest_resource_alarm_no_ack(Config) ->
dest_resource_alarm(<<"no-ack">>, Config).

dest_resource_alarm(AckMode, Config) ->
SourceUri = ?config(shovel_source_uri, Config),
SourceIdx = ?config(shovel_source_idx, Config),
DestUri = ?config(shovel_dest_uri, Config),
DestIdx = ?config(shovel_dest_idx, Config),

{Conn1, Ch1} = rabbit_ct_client_helpers:open_connection_and_channel(
Config, SourceIdx),
amqp_channel:call(Ch1, #'confirm.select'{}),
amqp_channel:call(Ch1, #'queue.declare'{queue = <<"src">>}),
publish(Ch1, <<>>, <<"src">>, <<"hello">>),
true = amqp_channel:wait_for_confirms(Ch1),
#{messages := 1} = message_count(Config, SourceIdx, <<"src">>),
while_blocked(Config, DestIdx,
fun() ->
ok = rabbit_ct_broker_helpers:rpc(
Config, DestIdx,
rabbit_runtime_parameters, set,
[
<<"/">>, <<"shovel">>, <<"test">>,
[{<<"src-uri">>, SourceUri},
{<<"dest-uri">>, [DestUri]},
{<<"src-queue">>, <<"src">>},
{<<"dest-queue">>, <<"dest">>},
{<<"src-prefetch-count">>, 50},
{<<"ack-mode">>, AckMode},
{<<"src-delete-after">>, <<"never">>}], none]),
%% The destination is blocked, so the shovel is blocked.
?awaitMatch(
blocked,
shovel_test_utils:get_shovel_status(Config, DestIdx,
<<"test">>),
3_000),

%% The shoveled message triggered a
%% connection.blocked notification, but hasn't
%% reached the dest queue because of the resource
%% alarm
InitialMsgCnt =
case AckMode of
<<"on-confirm">> -> 1;
_ -> 0
end,
#{messages := InitialMsgCnt,
messages_unacknowledged := InitialMsgCnt
} = message_count(Config, SourceIdx, <<"src">>),
#{messages := 0} = message_count(Config, DestIdx, <<"dest">>),

%% Now publish more messages to "src" queue.
publish_count(Ch1, <<>>, <<"src">>, <<"hello">>, 1000),
true = amqp_channel:wait_for_confirms(Ch1),

%% No messages reached the dest queue
#{messages := 0} = message_count(Config, DestIdx, <<"dest">>),

%% When the shovel sets a prefetch_count
%% (on-confirm/on-publish mode), all messages are in
%% the source queue, prefrech count are
%% unacknowledged and buffered in the shovel
MsgCnts =
case AckMode of
<<"on-confirm">> ->
#{messages => 1001,
messages_unacknowledged => 50};
<<"on-publish">> ->
#{messages => 1000,
messages_unacknowledged => 50};
<<"no-ack">> ->
%% no prefetch limit, all messages are
%% buffered in the shovel
#{messages => 0,
messages_unacknowledged => 0}
end,

MsgCnts = message_count(Config, SourceIdx, <<"src">>),

%% There should be no process with a message buildup
?awaitMatch(
0,
begin
Top = [{_, P, _}] = rabbit_ct_broker_helpers:rpc(
Config, 0, recon, proc_count, [message_queue_len, 1]),
ct:pal("Top process by message queue length: ~p", [Top]),
P
end, 5_000),

ok
end),

%% After the alarm clears, all messages should arrive in the dest queue.
?awaitMatch(
#{messages := 1001},
message_count(Config, DestIdx, <<"dest">>),
5_000),
#{messages := 0} = message_count(Config, SourceIdx, <<"src">>),
running = shovel_test_utils:get_shovel_status(Config, DestIdx, <<"test">>),

rabbit_ct_client_helpers:close_connection_and_channel(Conn1, Ch1),
cleanup(Config),
ok.

%%----------------------------------------------------------------------------

conserve_resources(Pid, Source, {_, Conserve, AlarmedNode}) ->
case Conserve of
true ->
ct:log("node ~w set alarm for resource ~ts",
[AlarmedNode, Source]),
Pid ! {block, Source};
false ->
ct:log("node ~w cleared alarm for resource ~ts",
[AlarmedNode, Source]),
Pid ! {unblock, Source}
end,
ok.

while_blocked(Config, Node, Fun) when is_function(Fun, 0) ->
OrigLimit = rabbit_ct_broker_helpers:rpc(Config, Node,
vm_memory_monitor,
get_vm_memory_high_watermark, []),

ok = rabbit_ct_broker_helpers:add_code_path_to_node(
rabbit_ct_broker_helpers:get_node_config(Config, Node, nodename),
?MODULE),
[] = rabbit_ct_broker_helpers:rpc(Config, Node, rabbit_alarm, register,
[self(),
{?MODULE, conserve_resources, []}]),
ok = rabbit_ct_broker_helpers:rpc(Config, Node, vm_memory_monitor,
set_vm_memory_high_watermark, [0]),
Source = receive
{block, S} ->
S
after
15_000 ->
ct:fail(alarm_set_timeout)
end,
try
Fun()
after
ok = rabbit_ct_broker_helpers:rpc(Config, Node, vm_memory_monitor,
set_vm_memory_high_watermark,
[OrigLimit]),
receive
{unblock, Source} ->
ok
after
10_000 ->
ct:fail(alarm_clear_timeout)
end
end.

publish(Ch, X, Key, Payload) when is_binary(Payload) ->
publish(Ch, X, Key, #amqp_msg{payload = Payload});

publish(Ch, X, Key, Msg = #amqp_msg{}) ->
amqp_channel:cast(Ch, #'basic.publish'{exchange = X,
routing_key = Key}, Msg).

publish_count(Ch, X, Key, M, Count) ->
[begin

publish(Ch, X, Key, M)
end || _ <- lists:seq(1, Count)].

message_count(Config, Node, QueueName) ->
Resource = rabbit_misc:r(<<"/">>, queue, QueueName),
{ok, Q} = rabbit_ct_broker_helpers:rpc(Config, Node, rabbit_amqqueue,
lookup, [Resource]),
maps:from_list(
rabbit_ct_broker_helpers:rpc(Config, Node, rabbit_amqqueue, info,
[Q, [messages, messages_unacknowledged]])).

cleanup(Config) ->
rabbit_ct_broker_helpers:rpc_all(Config, ?MODULE, cleanup1, []).

cleanup1() ->
[rabbit_runtime_parameters:clear(rabbit_misc:pget(vhost, P),
rabbit_misc:pget(component, P),
rabbit_misc:pget(name, P),
<<"acting-user">>) ||
P <- rabbit_runtime_parameters:list()],
[rabbit_amqqueue:delete(Q, false, false, <<"acting-user">>)
|| Q <- rabbit_amqqueue:list()].
Loading
Loading