-
Notifications
You must be signed in to change notification settings - Fork 187
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
4 changed files
with
276 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,146 @@ | ||
.. _issue: https://github.com/mosquito/aio-pika/issues | ||
.. _pull request: https://github.com/mosquito/aio-pika/compare | ||
.. _aio-pika: https://github.com/mosquito/aio-pika | ||
.. _official tutorial: https://www.rabbitmq.com/tutorials/tutorial-seven-php.html | ||
.. _publisher-confirms: | ||
|
||
Publisher Confirms | ||
================== | ||
|
||
.. warning:: | ||
|
||
This is a beta version of the port from `official tutorial`_. Please when you found an | ||
error create `issue`_ or `pull request`_ for me. | ||
|
||
|
||
.. note:: | ||
Using the `aio-pika`_ async Python client | ||
|
||
|
||
.. note:: | ||
|
||
**Prerequisites** | ||
|
||
This tutorial assumes RabbitMQ is installed_ and running on localhost on standard port (`5672`). | ||
In case you use a different host, port or credentials, connections settings would require adjusting. | ||
|
||
.. _installed: https://www.rabbitmq.com/download.html | ||
|
||
**Where to get help** | ||
|
||
If you're having trouble going through this tutorial you can `contact us`_ through the mailing list. | ||
|
||
.. _contact us: https://groups.google.com/forum/#!forum/rabbitmq-users | ||
|
||
|
||
`Publisher confirms <https://www.rabbitmq.com/confirms.html#publisher-confirms>`_ are a RabbitMQ | ||
extension to implement reliable publishing. | ||
When publisher confirms are enabled on a channel, messages the client publishes are confirmed | ||
asynchronously by the broker, meaning they have been taken care of on the server side. | ||
|
||
Overview | ||
++++++++ | ||
|
||
In this tutorial we're going to use publisher confirms to make sure published messages have safely reached the broker. | ||
We will cover several strategies to using publisher confirms and explain their pros and cons. | ||
|
||
Enabling Publisher Confirms on a Channel | ||
++++++++++++++++++++++++++++++++++++++++ | ||
|
||
Publisher confirms are a RabbitMQ extension to the AMQP 0.9.1 protocol. | ||
Publisher confirms are enabled at the channel level by setting the `publisher_confirms` parameter to `True`, | ||
which is the default. | ||
|
||
.. code-block:: python | ||
channel = await connection.channel( | ||
publisher_confirms=True, # This is the default | ||
) | ||
Strategy #1: Publishing Messages Individually | ||
+++++++++++++++++++++++++++++++++++++++++++++ | ||
|
||
Let's start with the simplest approach to publishing with confirms, that is, publishing a message and | ||
waiting synchronously for its confirmation: | ||
|
||
.. literalinclude:: examples/7-publisher-confirms/publish_individually.py | ||
:language: python | ||
:lines: 22-30 | ||
|
||
In the previous example we publish a message as usual and wait for its confirmation with the `asyncio.wait_for` method. | ||
The method returns as soon as the message has been confirmed. | ||
If the message is not confirmed within the timeout or if it is nack-ed (meaning the broker could not take care of it for | ||
some reason), the method will throw an exception. | ||
The handling of the exception usually consists in logging an error message and/or retrying to send the message. | ||
|
||
If a timeout isn't requied the `await` keyword can be used instead directly as before. | ||
|
||
Different client libraries have different ways to synchronously deal with publisher confirms, so make sure to read | ||
carefully the documentation of the client you are using. | ||
|
||
This technique is very straightforward but also has a major drawback: it significantly slows down publishing, as the | ||
confirmation of a message blocks the publishing of all subsequent messages. | ||
This approach is not going to deliver throughput of more than a few hundreds of published messages per second. | ||
Nevertheless, this can be good enough for some applications. | ||
|
||
.. | ||
TODO: Do we need to specify, that wait_for cancels the awaitables waiting for publisher confirmation when the timeout | ||
occurs? | ||
Strategy #2: Publishing Messages in Batches | ||
+++++++++++++++++++++++++++++++++++++++++++ | ||
|
||
To improve upon our previous example, we can publish a batch of messages and wait for this whole batch to be confirmed. | ||
The following example uses a batch of 100: | ||
|
||
.. literalinclude:: examples/7-publisher-confirms/publish_batches.py | ||
:language: python | ||
:lines: 22-45 | ||
|
||
Waiting for a batch of messages to be confirmed improves throughput drastically over waiting for a confirm for individual | ||
message (up to 20-30 times with a remote RabbitMQ node). | ||
One drawback is that we do not know exactly what went wrong in case of failure, so we may have to keep a whole batch in memory | ||
to log something meaningful or to re-publish the messages. | ||
And this solution is still synchronous, so it blocks the publishing of messages. | ||
|
||
Strategy #3: Handling Publisher Confirms Asynchronously | ||
+++++++++++++++++++++++++++++++++++++++++++++++++++++++ | ||
|
||
The broker confirms published messages asynchronously, one just needs to register a callback on the client to be notified of these confirms: | ||
|
||
.. literalinclude:: examples/7-publisher-confirms/publish_asynchronously.py | ||
:language: python | ||
:pyobject: handle_confirm | ||
|
||
.. literalinclude:: examples/7-publisher-confirms/publish_asynchronously.py | ||
:language: python | ||
:lines: 32-40 | ||
|
||
The `.result()` method will either return a `aiormq.abc.ConfirmationFrameType` for confirmed messages | ||
or raise an Exception for nack-ed messages (messages that can be considered lost by the broker). | ||
|
||
.. | ||
TODO: Which exceptions can be raised? | ||
How to determine which message the callback belongs to? | ||
From RabbitMQ Tutorial: | ||
Each callback has AMQPMessage $message parameter with returned message, so you don't need to handle sequence numbers (delivery tag) to understand which message this callback belongs to. | ||
|
||
Summary | ||
+++++++ | ||
|
||
Making sure published messages made it to the broker can be essential in some applications. | ||
Publisher confirms are a RabbitMQ feature that helps to meet this requirement. | ||
Publisher confirms are asynchronous in nature but it is also possible to handle them synchronously. | ||
There is no definitive way to implement publisher confirms, this usually comes down to the constraints in the application | ||
and in the overall system. Typical techniques are: | ||
|
||
* publishing messages individually, waiting for the confirmation synchronously: simple, but very limited throughput. | ||
* publishing messages in batch, waiting for the confirmation synchronously for a batch: simple, reasonable throughput, but hard to reason about when something goes wrong. | ||
* asynchronous handling: best performance and use of resources, good control in case of error, but can be involved to implement correctly. | ||
|
||
|
||
|
||
.. note:: | ||
|
||
This material was adopted from `official tutorial`_ on **rabbitmq.org**. |
45 changes: 45 additions & 0 deletions
45
docs/source/rabbitmq-tutorial/examples/7-publisher-confirms/publish_asynchronously.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,45 @@ | ||
import asyncio | ||
|
||
from aio_pika import Message, connect | ||
|
||
|
||
def getMessagesToPublish(): | ||
for i in range(10000): | ||
yield f"Hello World {i}!".encode() | ||
|
||
def handle_confirm(confirmation): | ||
try: | ||
result = confirmation.result() | ||
except: | ||
# code when message is nack-ed | ||
pass | ||
else: | ||
# code when message is confirmed | ||
pass | ||
|
||
|
||
async def main() -> None: | ||
# Perform connection | ||
connection = await connect("amqp://guest:guest@localhost/") | ||
|
||
async with connection: | ||
# Creating a channel | ||
channel = await connection.channel() | ||
|
||
# Declaring queue | ||
queue = await channel.declare_queue("hello") | ||
|
||
async with asyncio.TaskGroup() as tg: | ||
# Sending the messages | ||
for msg in getMessagesToPublish(): | ||
tg.create_task( | ||
channel.default_exchange.publish( | ||
Message(msg), | ||
routing_key=queue.name, | ||
) | ||
).add_done_callback(handle_confirm) | ||
|
||
print(" [x] Sent and confirmed multiple messages asynchronously. ") | ||
|
||
if __name__ == "__main__": | ||
asyncio.run(main()) |
50 changes: 50 additions & 0 deletions
50
docs/source/rabbitmq-tutorial/examples/7-publisher-confirms/publish_batches.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,50 @@ | ||
import asyncio | ||
|
||
from aio_pika import Message, connect | ||
|
||
|
||
def getMessagesToPublish(): | ||
for i in range(10000): | ||
yield f"Hello World {i}!".encode() | ||
|
||
|
||
async def main() -> None: | ||
# Perform connection | ||
connection = await connect("amqp://guest:guest@localhost/") | ||
|
||
async with connection: | ||
# Creating a channel | ||
channel = await connection.channel() | ||
|
||
# Declaring queue | ||
queue = await channel.declare_queue("hello") | ||
|
||
batchsize = 100 | ||
outstanding_messages = [] | ||
|
||
# Sending the messages | ||
for msg in getMessagesToPublish(): | ||
outstanding_messages.append( | ||
channel.default_exchange.publish( | ||
Message(msg), | ||
routing_key=queue.name, | ||
), | ||
) | ||
if len(outstanding_messages) == batchsize: | ||
await asyncio.wait_for( | ||
asyncio.gather(*outstanding_messages), | ||
timeout=5.0, | ||
) | ||
outstanding_messages.clear() | ||
|
||
if len(outstanding_messages) > 0: | ||
await asyncio.wait_for( | ||
asyncio.gather(*outstanding_messages), | ||
timeout=5.0, | ||
) | ||
outstanding_messages.clear() | ||
|
||
print(" [x] Sent and confirmed multiple messages in batches. ") | ||
|
||
if __name__ == "__main__": | ||
asyncio.run(main()) |
35 changes: 35 additions & 0 deletions
35
docs/source/rabbitmq-tutorial/examples/7-publisher-confirms/publish_individually.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,35 @@ | ||
import asyncio | ||
|
||
from aio_pika import Message, connect | ||
|
||
|
||
def getMessagesToPublish(): | ||
for i in range(10000): | ||
yield f"Hello World {i}!".encode() | ||
|
||
|
||
async def main() -> None: | ||
# Perform connection | ||
connection = await connect("amqp://guest:guest@localhost/") | ||
|
||
async with connection: | ||
# Creating a channel | ||
channel = await connection.channel() | ||
|
||
# Declaring queue | ||
queue = await channel.declare_queue("hello") | ||
|
||
# Sending the messages | ||
for msg in getMessagesToPublish(): | ||
# Waiting for publisher confirmation with timeout for every message | ||
await asyncio.wait_for( | ||
channel.default_exchange.publish( | ||
Message(msg), | ||
routing_key=queue.name, | ||
), timeout=5.0, | ||
) | ||
|
||
print(" [x] Sent and confirmed multiple messages indiviudally. ") | ||
|
||
if __name__ == "__main__": | ||
asyncio.run(main()) |