Skip to content

Conversation

the-mikedavis
Copy link
Collaborator

@the-mikedavis the-mikedavis commented Oct 1, 2025

Fixes #14657

Previously direct 0-9-1 connections did not notice when memory or disk alarms were set. This could allow an 0-9-1 shovel where the destination is a direct connection to completely overload a broker which is already in alarm. With this change, direct connections register the connection process with rabbit_alarm and emit connection.blocked and
connection.unblocked to the blocked handler if one is registered. rabbit_amqp091_shovel already respects the connection.blocked, so the destination will not receive any messages.

@the-mikedavis the-mikedavis self-assigned this Oct 1, 2025
@ansd
Copy link
Member

ansd commented Oct 2, 2025

Just in case it helps, the following shows how to easily trigger memory and/or disk alarms in a test case:

%% Set memory alarm before beginning the session.
DefaultWatermark = rpc(Config, vm_memory_monitor, get_vm_memory_high_watermark, []),
ok = rpc(Config, vm_memory_monitor, set_vm_memory_high_watermark, [0]),
timer:sleep(100),
{ok, Session1} = amqp10_client:begin_session_sync(Connection),
Address = rabbitmq_amqp_address:queue(QName),
{ok, Sender} = amqp10_client:attach_sender_link(Session1, <<"test-sender">>, Address, unsettled),
%% We should still receive link credit since the target queue is fine.
ok = wait_for_credit(Sender),
%% However, RabbitMQ's incoming window shouldn't allow our client to send any TRANSFER.
%% In other words, the client is limited by session flow control, but not by link flow control.
Tag1 = <<"tag1">>,
Msg1 = amqp10_msg:new(Tag1, <<"m1">>, false),
?assertEqual({error, remote_incoming_window_exceeded},
amqp10_client:send_msg(Sender, Msg1)),
%% Set additionally disk alarm.
DefaultDiskFreeLimit = rpc(Config, rabbit_disk_monitor, get_disk_free_limit, []),
ok = rpc(Config, rabbit_disk_monitor, set_disk_free_limit, [999_000_000_000_000]), % 999 TB
timer:sleep(100),
?assertEqual({error, remote_incoming_window_exceeded},
amqp10_client:send_msg(Sender, Msg1)),
%% Clear memory alarm.
ok = rpc(Config, vm_memory_monitor, set_vm_memory_high_watermark, [DefaultWatermark]),
timer:sleep(100),
?assertEqual({error, remote_incoming_window_exceeded},
amqp10_client:send_msg(Sender, Msg1)),
%% Clear disk alarm.
ok = rpc(Config, rabbit_disk_monitor, set_disk_free_limit, [DefaultDiskFreeLimit]),
timer:sleep(100),

This change also refactors them to use two unclustered nodes. This is
a prerequisite for the child change which will remove the workaround of
using a direct connection to be able to publish while a node is in
alarm.
Previously direct 0-9-1 connections did not notice when memory or disk
alarms were set. This could allow an 0-9-1 shovel where the destination
is a direct connection to completely overload a broker which is already
in alarm. With this change, direct connections register the connection
process with `rabbit_alarm` and emit `connection.blocked` and
`connection.unblocked` to the blocked handler if one is registered.
`rabbit_amqp091_shovel` already respects the `connection.blocked`, so
the destination will not receive any messages.
@the-mikedavis the-mikedavis force-pushed the md/block-direct-publishes branch from c75597c to 9393ec9 Compare October 2, 2025 17:26
@the-mikedavis
Copy link
Collaborator Author

the-mikedavis commented Oct 2, 2025

Thanks for the pointers @ansd & @gomoripeti! I moved the existing tests for alarms and 0-9-1 shovels to a new suite and added a group there that uses a direct connection

@the-mikedavis the-mikedavis marked this pull request as ready for review October 2, 2025 19:25
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Block direct shovel publishes during alarms
2 participants