Skip to content

Commit

Permalink
Introduce outbound RabbitMQ internal AMQP flow control
Browse files Browse the repository at this point in the history
 ## What?

Introduce RabbitMQ internal flow control for messages sent to AMQP
clients.

Prior this PR, when an AMQP client granted a large amount of link
credit (e.g. 100k) to the sending queue, the sending queue sent
that amount of messages to the session process no matter what.
This becomes problematic for memory usage when the session process
cannot send out messages fast enough to the AMQP client, especially if
1. The writer proc cannot send fast enough. This can happen when
the AMQP client does not receive fast enough and causes TCP
back-pressure to the server. Or
2. The server session proc is limited by remote-incoming-window.

Both scenarios are now added as test cases.
Tests
* tcp_back_pressure_rabbitmq_internal_flow_quorum_queue
* tcp_back_pressure_rabbitmq_internal_flow_classic_queue
cover scenario 1.

Tests
* incoming_window_closed_rabbitmq_internal_flow_quorum_queue
* incoming_window_closed_rabbitmq_internal_flow_classic_queue
cover scenario 2.

This PR sends messages from queues to AMQP clients in a more controlled
manner.

To illustrate:
```
make run-broker PLUGINS="rabbitmq_management" RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS="+S 4"
observer_cli:start()
mq
```
where `mq` sorts by message queue length.
Create a stream:
```
deps/rabbitmq_management/bin/rabbitmqadmin declare queue name=s1 queue_type=stream durable=true
```
Next, send and receive from the Stream via AMQP.
Grant a large number of link credit to the sending stream:
```
docker run -it --rm --add-host host.docker.internal:host-gateway ssorj/quiver:latest
bash-5.1# quiver --version
quiver 0.4.0-SNAPSHOT
bash-5.1# quiver //host.docker.internal//queue/s1 --durable -d 30s --credit 100000
```

**Before** to this PR:
```
RESULTS

Count ............................................... 100,696 messages
Duration ............................................... 30.0 seconds
Sender rate ......................................... 120,422 messages/s
Receiver rate ......................................... 3,363 messages/s
End-to-end rate ....................................... 3,359 messages/s
```
We observe that all 100k link credit worth of messages are buffered in the
writer proc's mailbox:
```
|No | Pid        | MsgQueue  |Name or Initial Call                 |      Memory | Reductions          |Current Function                  |
|1  |<0.845.0>   |100001     |rabbit_amqp_writer:init/1            |  126.0734 MB| 466633491           |prim_inet:send/5                  |
```

**After** to this PR:
```
RESULTS

Count ............................................. 2,973,440 messages
Duration ............................................... 30.0 seconds
Sender rate ......................................... 123,322 messages/s
Receiver rate ........................................ 99,250 messages/s
End-to-end rate ...................................... 99,148 messages/s
```
We observe that the message queue lengths of both writer and session
procs are low.

 ## How?

Our goal is to have queues send out messages in a controlled manner
without overloading RabbitMQ itself.
We want RabbitMQ internal flow control between:
```
AMQP writer proc <--- session proc <--- queue proc
```
A similar concept exists for classic queues sending via AMQP 0.9.1.
We want an approach that applies to AMQP and works generic for all queue
types.

For the interaction between AMQP writer proc and session proc we use a
simple credit based approach reusing module `credit_flow`.

For the interaction between session proc and queue proc, the following options
exist:

 ### Option 1
The session process provides expliclity feedback to the queue after it
has sent N messages.
This approach is implemented in
https://github.com/ansd/rabbitmq-server/tree/amqp-flow-control-poc-1
and works well.
A new `rabbit_queue_type:sent/4` API was added which lets the queue proc know
that it can send further messages to the session proc.

Pros:
* Will work equally well for AMQP 0.9.1, e.g. when quorum queues send messages
  in auto ack mode to AMQP 0.9.1 clients.
* Simple for the session proc

Cons:
* Sligthly added complexity in every queue type implementation
* Multiple Ra commands (settle, credit, sent) to decide when a quorum
  queue sends more messages.

 ### Option 2
A dual link approach where two AMQP links exists between
```
AMQP client <---link--> session proc <---link---> queue proc
```
When the client grants a large amount of credits, the session proc will
top up credits to the queue proc periodically in smaller batches.

Pros:
* No queue type modifications required.
* Re-uses AMQP link flow control

Cons:
* Significant added complexity in the session proc. A client can
  dynamically decrease or increase credits and dynamically change the drain
  mode while the session tops up credit to the queue.

 ### Option 3
Credit is a 32 bit unsigned integer.
The spec mandates that the receiver independently chooses a credit.
Nothing in the spec prevents the receiver to choose a credit of 1 billion.
However the credit value is merely a **maximum**:
> The link-credit variable defines the current maximum legal amount that the delivery-count can be increased by.

Therefore, the server is not required to send all available messages to this
receiver.

For delivery-count:
> Only the sender MAY independently modify this field.

"independently" could be interpreted as the sender could add to the delivery-count
irrespective of what the client chose for drain and link-credit.

Option 3: The queue proc could at credit time already consume credit
and advance the delivery-count if credit is too large before checking out any messages.
For example if credit is 100k, but the queue only wants to send 1k, the queue could
consume 99k of credits and advance the delivery-count, and subsequently send maximum 1k messages.
If the queue advanced the delivery-count, RabbitMQ must send a FLOW to the receiver,
otherwise the receiver wouldn’t know that it ran out of link-credit.

Pros:
* Very simple

Cons:
* Possibly unexpected behaviour for receiving AMQP clients
* Possibly poor end-to-end throughput in auto-ack mode because the queue
  would send a batch of messages followed by a FLOW containing the advanced
  delivery-count. Only therafter the client will learn that it ran out of
  credits and top-up again. This feels like synchronously pulling a batch
  of messages. In contrast, option 2 sends out more messages as soon as
  the previous messages left RabbitMQ without requiring again a credit top
  up from the receiver.
* drain mode with large credits requires the queue to send all available
  messages and only thereafter advance the delivery-count. Therefore,
  drain mode breaks option 3 somewhat.

 ### Option 4
Session proc drops message payload when its outgoing-pending queue gets
too large and re-reads payloads from the queue once the message can be
sent (see `get_checked_out` Ra command for quorum queues).

Cons:
* Would need to be implemented for every queue type, especially classic queues
* Doesn't limit the amount of message metadata in the session proc's
  outgoing-pending queue

 ### Decision: Option 2
This commit implements option 2 to avoid any queue type modification.
At most one credit request is in-flight between session process and
queue process for a given queue consumer.
If the AMQP client sends another FLOW in between, the session proc
stashes the FLOW until it processes the previous credit reply.

A delivery is only sent from the outgoing-pending queue if the
session proc is not blocked by
1. writer proc, or
2. remote-incoming-window

The credit reply is placed into the outgoing-pending queue.
This ensures that the session proc will only top up the next batch of
credits if sufficient messages were sent out to the writer proc.

A future commit could additionally have each queue limit the number of
unacked messages for a given AMQP consumer, which keeps memory usage
lower than delaying credit top ups by the session's outgoing-window.
  • Loading branch information
ansd committed Jun 3, 2024
1 parent 096015b commit 1dae7d6
Show file tree
Hide file tree
Showing 18 changed files with 1,244 additions and 336 deletions.
12 changes: 8 additions & 4 deletions deps/amqp10_client/src/amqp10_client_frame_reader.erl
Original file line number Diff line number Diff line change
Expand Up @@ -286,14 +286,18 @@ handle_input(StateName, Data, State) ->
defer_heartbeat_timer(State =
#state{heartbeat_timer_ref = TRef,
connection_config = #{idle_time_out := T}})
when is_number(T) andalso T > 0 ->
when is_integer(T) andalso T > 0 ->
_ = case TRef of
undefined -> ok;
_ -> _ = erlang:cancel_timer(TRef)
undefined ->
ok;
_ ->
erlang:cancel_timer(TRef, [{async, true},
{info, false}])
end,
NewTRef = erlang:send_after(T * 2, self(), heartbeat),
State#state{heartbeat_timer_ref = NewTRef};
defer_heartbeat_timer(State) -> State.
defer_heartbeat_timer(State) ->
State.

route_frame(Channel, FrameType, {Performative, Payload} = Frame, State0) ->
{DestinationPid, State} = find_destination(Channel, FrameType, Performative,
Expand Down
8 changes: 6 additions & 2 deletions deps/amqp10_client/src/amqp10_client_session.erl
Original file line number Diff line number Diff line change
Expand Up @@ -902,9 +902,10 @@ handle_link_flow(#'v1_0.flow'{delivery_count = MaybeTheirDC,
handle_link_flow(#'v1_0.flow'{delivery_count = TheirDC,
link_credit = {uint, TheirCredit},
available = Available,
drain = Drain},
drain = Drain0},
Link0 = #link{role = receiver}) ->
Link = case Drain andalso TheirCredit =< 0 of
Drain = default(Drain0, false),
Link = case Drain andalso TheirCredit =:= 0 of
true ->
notify_credit_exhausted(Link0),
Link0#link{delivery_count = unpack(TheirDC),
Expand Down Expand Up @@ -1212,6 +1213,9 @@ boolean_to_role(?AMQP_ROLE_SENDER) ->
boolean_to_role(?AMQP_ROLE_RECEIVER) ->
receiver.

default(undefined, Default) -> Default;
default(Thing, _Default) -> Thing.

format_status(Status = #{data := Data0}) ->
#state{channel = Channel,
remote_channel = RemoteChannel,
Expand Down
21 changes: 12 additions & 9 deletions deps/rabbit/src/rabbit_amqp_management.erl
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@
-import(rabbit_amqp_session,
[check_resource_access/4,
check_read_permitted_on_topic/4]).
-import(rabbit_misc,
[queue_resource/2,
exchange_resource/2]).

-type permission_caches() :: {rabbit_amqp_session:permission_cache(),
rabbit_amqp_session:topic_permission_cache()}.
Expand Down Expand Up @@ -77,7 +80,7 @@ handle_http_req(<<"GET">>,
_ConnPid,
PermCaches) ->
QNameBin = uri_string:unquote(QNameBinQuoted),
QName = rabbit_misc:r(Vhost, queue, QNameBin),
QName = queue_resource(Vhost, QNameBin),
case rabbit_amqqueue:with(
QName,
fun(Q) ->
Expand Down Expand Up @@ -118,7 +121,7 @@ handle_http_req(HttpMethod = <<"PUT">>,
_ -> ok
end,
ok = prohibit_cr_lf(QNameBin),
QName = rabbit_misc:r(Vhost, queue, QNameBin),
QName = queue_resource(Vhost, QNameBin),
ok = prohibit_reserved_amq(QName),
PermCache1 = check_resource_access(QName, configure, User, PermCache0),
rabbit_core_metrics:queue_declared(QName),
Expand Down Expand Up @@ -192,7 +195,7 @@ handle_http_req(<<"PUT">>,
catch exit:#amqp_error{explanation = Explanation} ->
throw(<<"400">>, Explanation, [])
end,
XName = rabbit_misc:r(Vhost, exchange, XNameBin),
XName = exchange_resource(Vhost, XNameBin),
ok = prohibit_default_exchange(XName),
PermCache = check_resource_access(XName, configure, User, PermCache0),
X = case rabbit_exchange:lookup(XName) of
Expand Down Expand Up @@ -223,7 +226,7 @@ handle_http_req(<<"DELETE">>,
ConnPid,
{PermCache0, TopicPermCache}) ->
QNameBin = uri_string:unquote(QNameBinQuoted),
QName = rabbit_misc:r(Vhost, queue, QNameBin),
QName = queue_resource(Vhost, QNameBin),
PermCache = check_resource_access(QName, read, User, PermCache0),
try rabbit_amqqueue:with_exclusive_access_or_die(
QName, ConnPid,
Expand Down Expand Up @@ -251,7 +254,7 @@ handle_http_req(<<"DELETE">>,
ConnPid,
{PermCache0, TopicPermCache}) ->
QNameBin = uri_string:unquote(QNameBinQuoted),
QName = rabbit_misc:r(Vhost, queue, QNameBin),
QName = queue_resource(Vhost, QNameBin),
ok = prohibit_cr_lf(QNameBin),
PermCache = check_resource_access(QName, configure, User, PermCache0),
try rabbit_amqqueue:delete_with(QName, ConnPid, false, false, Username, true) of
Expand All @@ -271,7 +274,7 @@ handle_http_req(<<"DELETE">>,
_ConnPid,
{PermCache0, TopicPermCache}) ->
XNameBin = uri_string:unquote(XNameBinQuoted),
XName = rabbit_misc:r(Vhost, exchange, XNameBin),
XName = exchange_resource(Vhost, XNameBin),
ok = prohibit_cr_lf(XNameBin),
ok = prohibit_default_exchange(XName),
ok = prohibit_reserved_amq(XName),
Expand All @@ -296,7 +299,7 @@ handle_http_req(<<"POST">>,
#{destination_exchange := Bin} ->
{exchange, Bin}
end,
SrcXName = rabbit_misc:r(Vhost, exchange, SrcXNameBin),
SrcXName = exchange_resource(Vhost, SrcXNameBin),
DstName = rabbit_misc:r(Vhost, DstKind, DstNameBin),
PermCaches = binding_checks(SrcXName, DstName, BindingKey, User, PermCaches0),
Binding = #binding{source = SrcXName,
Expand All @@ -319,7 +322,7 @@ handle_http_req(<<"DELETE">>,
DstNameBin,
BindingKey,
ArgsHash} = decode_binding_path_segment(BindingSegment),
SrcXName = rabbit_misc:r(Vhost, exchange, SrcXNameBin),
SrcXName = exchange_resource(Vhost, SrcXNameBin),
DstName = rabbit_misc:r(Vhost, DstKind, DstNameBin),
PermCaches = binding_checks(SrcXName, DstName, BindingKey, User, PermCaches0),
Bindings = rabbit_binding:list_for_source_and_destination(SrcXName, DstName),
Expand Down Expand Up @@ -351,7 +354,7 @@ handle_http_req(<<"GET">>,
"missing 'dste' or 'dstq' in query: ~tp",
QueryMap)
end,
SrcXName = rabbit_misc:r(Vhost, exchange, SrcXNameBin),
SrcXName = exchange_resource(Vhost, SrcXNameBin),
DstName = rabbit_misc:r(Vhost, DstKind, DstNameBin),
Bindings0 = rabbit_binding:list_for_source_and_destination(SrcXName, DstName),
Bindings = [B || B = #binding{key = K} <- Bindings0, K =:= Key],
Expand Down
Loading

0 comments on commit 1dae7d6

Please sign in to comment.