Introduce outbound RabbitMQ internal AMQP flow control #11307

Merged: 2 commits merged into main from amqp-flow-control-poc-2 on Jun 5, 2024

Conversation

@ansd (Member) commented on May 23, 2024

## What?

Introduce RabbitMQ internal flow control for messages sent to AMQP
clients.

Prior to this PR, when an AMQP client granted a large amount of link
credit (e.g. 100k) to the sending queue, the sending queue sent
that amount of messages to the session process no matter what.
This becomes problematic for memory usage when the session process
cannot send out messages fast enough to the AMQP client, especially if

  1. The writer proc cannot send fast enough. This can happen when
    the AMQP client does not receive fast enough and causes TCP
    back-pressure to the server, or
  2. The server session proc is limited by remote-incoming-window
    (see the sketch after the test list below).

Both scenarios are now added as test cases.
Tests

  • tcp_back_pressure_rabbitmq_internal_flow_quorum_queue
  • tcp_back_pressure_rabbitmq_internal_flow_classic_queue
    cover scenario 1.

Tests

  • incoming_window_closed_rabbitmq_internal_flow_quorum_queue
  • incoming_window_closed_rabbitmq_internal_flow_classic_queue
    cover scenario 2.
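
Scenario 2 refers to AMQP 1.0 session flow control: the server may emit at most
remote-incoming-window further transfer frames before the session proc has to pause.
As a minimal sketch of that arithmetic (this restates spec section 2.5.6 and is not
code from this PR; 32-bit serial-number wrap-around is ignored):

```erlang
%% Sketch of AMQP 1.0 session flow control arithmetic (spec section 2.5.6),
%% not code from this PR; 32-bit serial-number wrap-around is ignored.
-module(session_window_sketch).
-export([remote_incoming_window/3, after_transfer/1]).

%% Recomputed whenever the peer's FLOW announces its next-incoming-id and
%% incoming-window: how many more transfer frames we may send.
remote_incoming_window(PeerNextIncomingId, PeerIncomingWindow, OurNextOutgoingId) ->
    PeerNextIncomingId + PeerIncomingWindow - OurNextOutgoingId.

%% Every transfer frame we send shrinks the window by one; at zero the
%% session proc is blocked (scenario 2 above).
after_transfer(RemoteIncomingWindow) ->
    RemoteIncomingWindow - 1.
```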

This PR sends messages from queues to AMQP clients in a more controlled
manner.

To illustrate:

```
make run-broker PLUGINS="rabbitmq_management" RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS="+S 4"
observer_cli:start()
mq
```
where `mq` sorts by message queue length.
Create a stream:

```
deps/rabbitmq_management/bin/rabbitmqadmin declare queue name=s1 queue_type=stream durable=true
```

Next, send and receive from the stream via AMQP.
Grant a large amount of link credit to the sending stream:
```
docker run -it --rm --add-host host.docker.internal:host-gateway ssorj/quiver:latest
bash-5.1# quiver --version
quiver 0.4.0-SNAPSHOT
bash-5.1# quiver //host.docker.internal//queue/s1 --durable -d 30s --credit 100000
```

**Before** this PR:
```
RESULTS

Count ............................................... 100,696 messages
Duration ............................................... 30.0 seconds
Sender rate ......................................... 120,422 messages/s
Receiver rate ......................................... 3,363 messages/s
End-to-end rate ....................................... 3,359 messages/s
```

We observe that all 100k link credit worth of messages are buffered in the
writer proc's mailbox:

```
|No | Pid        | MsgQueue  |Name or Initial Call                 |      Memory | Reductions          |Current Function                  |
|1  |<0.845.0>   |100001     |rabbit_amqp_writer:init/1            |  126.0734 MB| 466633491           |prim_inet:send/5                  |
```

**After** this PR:
```
RESULTS

Count ............................................. 2,973,440 messages
Duration ............................................... 30.0 seconds
Sender rate ......................................... 123,322 messages/s
Receiver rate ........................................ 99,250 messages/s
End-to-end rate ...................................... 99,148 messages/s
```

We observe that the message queue lengths of both writer and session
procs are low.

## How?

Our goal is to have queues send out messages in a controlled manner
without overloading RabbitMQ itself.
We want RabbitMQ internal flow control between:

```
AMQP writer proc <--- session proc <--- queue proc
```

A similar concept exists for classic queues sending via AMQP 0.9.1.
We want an approach that applies to AMQP and works generically for all queue
types.

For the interaction between the AMQP writer proc and the session proc we use a
simple credit-based approach, reusing the `credit_flow` module.
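
`credit_flow` works pairwise between a sending and a receiving process: the sender
accounts for each message it dispatches, the receiver acks it and periodically bumps
the sender's credit, and the sender considers itself blocked once its credit runs out.
A minimal sketch of that calling pattern (illustrative only, assuming rabbit_common's
`credit_flow` is on the code path; these are not the functions added by this PR):

```erlang
%% Sketch of the credit_flow calling pattern (not the PR's actual code);
%% assumes rabbit_common's credit_flow module is on the code path. The
%% module keeps its counters in the calling process's dictionary, so each
%% function below must run inside the process named in its comment.
-module(credit_flow_sketch).
-export([session_send_to_writer/2, writer_handle_frame/2,
         session_handle_bump/1, session_may_send/0]).

%% Session proc: account for one message sent to the writer, then send it.
session_send_to_writer(WriterPid, Frame) ->
    credit_flow:send(WriterPid),
    WriterPid ! {send_frame, self(), Frame}.

%% Writer proc: acknowledge a processed message; after enough acks,
%% credit_flow sends a {bump_credit, ...} message back to the session.
writer_handle_frame(SessionPid, _Frame) ->
    credit_flow:ack(SessionPid).

%% Session proc: apply a credit bump received from the writer.
session_handle_bump({bump_credit, Msg}) ->
    credit_flow:handle_bump_msg(Msg).

%% Session proc: check whether any peer currently blocks us.
session_may_send() ->
    not credit_flow:blocked().
```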

For the interaction between session proc and queue proc, the following options
exist:

### Option 1

The session process provides explicit feedback to the queue after it
has sent N messages.
This approach is implemented in
https://github.com/ansd/rabbitmq-server/tree/amqp-flow-control-poc-1
and works well.
A new `rabbit_queue_type:sent/4` API was added which lets the queue proc know
that it can send further messages to the session proc.
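
As a rough illustration of the idea (the real `rabbit_queue_type:sent/4` lives only
in the PoC branch linked above; the argument names, batch size, and return shape
below are guesses, not its actual signature):

```erlang
%% Illustration of option 1 only; rabbit_queue_type:sent/4 exists solely in
%% the PoC branch and its real argument list and return value may differ.
-module(option1_sketch).
-export([maybe_notify_queue/4]).

-define(NOTIFY_EVERY, 200).  %% hypothetical batch size

%% Session proc: after pushing deliveries to the writer, tell the queue every
%% ?NOTIFY_EVERY messages that it may send more. We assume (guess) that
%% sent/4 returns the updated queue type states.
maybe_notify_queue(QRef, CTag, SentCount, QStates)
  when SentCount rem ?NOTIFY_EVERY =:= 0 ->
    %% guessed call shape: (queue ref, consumer tag, message count, queue type states)
    rabbit_queue_type:sent(QRef, CTag, ?NOTIFY_EVERY, QStates);
maybe_notify_queue(_QRef, _CTag, _SentCount, QStates) ->
    QStates.
```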

Pros:

  • Will work equally well for AMQP 0.9.1, e.g. when quorum queues send messages
    in auto ack mode to AMQP 0.9.1 clients.
  • Simple for the session proc

Cons:

  • Slightly added complexity in every queue type implementation
  • Multiple Ra commands (settle, credit, sent) to decide when a quorum
    queue sends more messages.

### Option 2

A dual-link approach where two AMQP links exist between
```
AMQP client <---link--> session proc <---link---> queue proc
```

When the client grants a large amount of credit, the session proc tops up
credit to the queue proc periodically in smaller batches.
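
In other words, the session proc never forwards the full client credit to the queue;
each grant is capped and topped up as the previous grant is consumed. A tiny sketch
of that batching arithmetic (my illustration; the function name and the 256 cap are
made up, not taken from this PR):

```erlang
%% Sketch of option 2's batching idea (not the PR's code); the constant and
%% the function name are made up for illustration.
-module(option2_sketch).
-export([next_queue_grant/2]).

-define(MAX_QUEUE_CREDIT, 256).  %% hypothetical per-grant cap

%% ClientCredit: link credit currently granted by the AMQP client.
%% OutstandingQueueCredit: credit the queue still holds from earlier grants.
%% Returns how much credit to grant the queue in the next top-up.
next_queue_grant(ClientCredit, OutstandingQueueCredit) ->
    min(?MAX_QUEUE_CREDIT, max(0, ClientCredit - OutstandingQueueCredit)).
```

With 100k of client credit and nothing outstanding, the first grant under this sketch
would be 256, and further top-ups follow as the queue uses that credit.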

Pros:

  • No queue type modifications required.
  • Re-uses AMQP link flow control

Cons:

  • Significant added complexity in the session proc. A client can
    dynamically decrease or increase credits and dynamically change the drain
    mode while the session tops up credit to the queue.

### Option 3

Credit is a 32-bit unsigned integer.
The spec mandates that the receiver independently chooses a credit.
Nothing in the spec prevents the receiver from choosing a credit of 1 billion.
However, the credit value is merely a **maximum**:

> The link-credit variable defines the current maximum legal amount that the delivery-count can be increased by.

Therefore, the server is not required to send all available messages to this
receiver.

For delivery-count:

> Only the sender MAY independently modify this field.

"Independently" could be interpreted to mean that the sender may add to the
delivery-count irrespective of what the client chose for drain and link-credit.

With option 3, the queue proc could already consume credit at credit time
and advance the delivery-count if the credit is too large, before checking out any messages.
For example, if the credit is 100k but the queue only wants to send 1k, the queue could
consume 99k of credit, advance the delivery-count, and subsequently send at most 1k messages.
If the queue advanced the delivery-count, RabbitMQ must send a FLOW to the receiver,
otherwise the receiver wouldn't know that it ran out of link-credit.
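
A sketch of that bookkeeping (not implemented by this PR; plain integers stand in
for AMQP 1.0's 32-bit serial-number arithmetic):

```erlang
%% Sketch of option 3 (not implemented by this PR); plain integers stand in
%% for AMQP 1.0's 32-bit serial-number arithmetic.
-module(option3_sketch).
-export([cap_credit/3]).

%% cap_credit(DeliveryCount, LinkCredit, MaxBatch) caps the credit the queue
%% is willing to use, consuming the excess by advancing delivery-count.
cap_credit(DeliveryCount, LinkCredit, MaxBatch) when LinkCredit > MaxBatch ->
    Excess = LinkCredit - MaxBatch,
    %% the advanced delivery-count must be announced to the receiver in a
    %% FLOW frame, otherwise it cannot know its credit was consumed.
    {DeliveryCount + Excess, MaxBatch, send_flow};
cap_credit(DeliveryCount, LinkCredit, _MaxBatch) ->
    {DeliveryCount, LinkCredit, no_flow}.
```

For the 100k example above, `cap_credit(0, 100000, 1000)` yields `{99000, 1000, send_flow}`.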

Pros:

  • Very simple

Cons:

  • Possibly unexpected behaviour for receiving AMQP clients
  • Possibly poor end-to-end throughput in auto-ack mode because the queue
    would send a batch of messages followed by a FLOW containing the advanced
    delivery-count. Only thereafter does the client learn that it ran out of
    credit and top up again. This feels like synchronously pulling a batch
    of messages. In contrast, option 2 sends out more messages as soon as
    the previous messages have left RabbitMQ, without requiring another
    credit top-up from the receiver.
  • drain mode with large credits requires the queue to send all available
    messages and only thereafter advance the delivery-count. Therefore,
    drain mode breaks option 3 somewhat.

### Option 4

The session proc drops message payloads when its outgoing-pending queue gets
too large and re-reads the payloads from the queue once the messages can be
sent (see the `get_checked_out` Ra command for quorum queues).

Cons:

  • Would need to be implemented for every queue type, especially classic queues
  • Doesn't limit the amount of message metadata in the session proc's
    outgoing-pending queue

### Decision: Option 2

This commit implements option 2 to avoid any queue type modification.
At most one credit request is in-flight between session process and
queue process for a given queue consumer.
If the AMQP client sends another FLOW in between, the session proc
stashes the FLOW until it processes the previous credit reply.

A delivery is only sent from the outgoing-pending queue if the
session proc is not blocked by

  1. writer proc, or
  2. remote-incoming-window

The credit reply is placed into the outgoing-pending queue.
This ensures that the session proc will only top up the next batch of
credits if sufficient messages were sent out to the writer proc.
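
Put differently, a credit reply only reaches the front of the outgoing-pending
queue once every delivery queued before it has been handed to the writer, and the
session only pops from that queue while neither the writer (via `credit_flow`) nor
the remote-incoming-window blocks it. A rough sketch of that guard (my paraphrase,
not the PR's actual functions; the tuple tags are made up):

```erlang
%% Rough sketch of the outgoing-pending guard (paraphrase, not the PR's
%% actual functions); assumes credit_flow on the code path and OTP's queue
%% module for the pending queue.
-module(decision_sketch).
-export([pop_outgoing/2]).

pop_outgoing(Pending0, RemoteIncomingWindow) ->
    case credit_flow:blocked() orelse RemoteIncomingWindow =< 0 of
        true ->
            %% blocked by the writer proc or by remote-incoming-window:
            %% leave everything, including any credit reply, queued.
            {blocked, Pending0};
        false ->
            case queue:out(Pending0) of
                {{value, {delivery, _Transfer} = Delivery}, Pending} ->
                    {send_to_writer, Delivery, Pending};
                {{value, {credit_reply, _QueueCredit} = Reply}, Pending} ->
                    %% only now is it safe to top up the queue's credit:
                    %% all deliveries queued before this reply went out.
                    {top_up_queue, Reply, Pending};
                {empty, Pending} ->
                    {empty, Pending}
            end
    end.
```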

A future commit could additionally have each queue limit the number of
unacked messages for a given AMQP consumer, or alternatively make use
of session outgoing-window.

@ansd force-pushed the amqp-flow-control-poc-2 branch 8 times, most recently from 4eb03d9 to 83f8318, on May 30, 2024
@ansd force-pushed the amqp-flow-control-poc-2 branch 13 times, most recently from 1dae7d6 to ae9ce6c, on June 3, 2024
@ansd changed the title from "AMQP flow control PoC 2" to "Introduce outbound RabbitMQ internal AMQP flow control" on Jun 3, 2024
@ansd force-pushed the amqp-flow-control-poc-2 branch from ae9ce6c to 3a03d44 on June 4, 2024
@ansd force-pushed the amqp-flow-control-poc-2 branch from e10f6f2 to d70e529 on June 4, 2024
@ansd marked this pull request as ready for review on June 4, 2024
@kjnilsson self-requested a review on June 4, 2024
Avoid #resource{} record creation in every queue type
interaction by storing the queue #resource{} in the
session state.
@kjnilsson merged commit ebbff46 into main on Jun 5, 2024 (18 checks passed)
@kjnilsson deleted the amqp-flow-control-poc-2 branch on June 5, 2024
@michaelklishin added this to the 4.0.0 milestone on Jun 5, 2024