From 584707d53f43c7ac36cc869a63adbaa9fd77fd16 Mon Sep 17 00:00:00 2001 From: MaPePeR Date: Mon, 29 May 2023 15:42:56 +0200 Subject: [PATCH 1/7] Transfer RabbitMQ publisher confirms tutorial to aio-pika. --- .../7-publisher-confirms.rst | 149 ++++++++++++++++++ .../publish_asynchronously.py | 53 +++++++ .../7-publisher-confirms/publish_batches.py | 52 ++++++ .../publish_individually.py | 36 +++++ 4 files changed, 290 insertions(+) create mode 100644 docs/source/rabbitmq-tutorial/7-publisher-confirms.rst create mode 100644 docs/source/rabbitmq-tutorial/examples/7-publisher-confirms/publish_asynchronously.py create mode 100644 docs/source/rabbitmq-tutorial/examples/7-publisher-confirms/publish_batches.py create mode 100644 docs/source/rabbitmq-tutorial/examples/7-publisher-confirms/publish_individually.py diff --git a/docs/source/rabbitmq-tutorial/7-publisher-confirms.rst b/docs/source/rabbitmq-tutorial/7-publisher-confirms.rst new file mode 100644 index 00000000..e62d9c69 --- /dev/null +++ b/docs/source/rabbitmq-tutorial/7-publisher-confirms.rst @@ -0,0 +1,149 @@ +.. _issue: https://github.com/mosquito/aio-pika/issues +.. _pull request: https://github.com/mosquito/aio-pika/compare +.. _aio-pika: https://github.com/mosquito/aio-pika +.. _official tutorial: https://www.rabbitmq.com/tutorials/tutorial-seven-php.html +.. _publisher-confirms: + +Publisher Confirms +================== + +.. warning:: + + This is a beta version of the port from `official tutorial`_. Please when you found an + error create `issue`_ or `pull request`_ for me. + + +.. note:: + Using the `aio-pika`_ async Python client + + +.. note:: + + **Prerequisites** + + This tutorial assumes RabbitMQ is installed_ and running on localhost on standard port (`5672`). + In case you use a different host, port or credentials, connections settings would require adjusting. + + .. _installed: https://www.rabbitmq.com/download.html + + **Where to get help** + + If you're having trouble going through this tutorial you can `contact us`_ through the mailing list. + + .. _contact us: https://groups.google.com/forum/#!forum/rabbitmq-users + + +`Publisher confirms `_ are a RabbitMQ +extension to implement reliable publishing. +When publisher confirms are enabled on a channel, messages the client publishes are confirmed +asynchronously by the broker, meaning they have been taken care of on the server side. + +Overview +++++++++ + +In this tutorial we're going to use publisher confirms to make sure published messages have safely reached the broker. +We will cover several strategies to using publisher confirms and explain their pros and cons. + +Enabling Publisher Confirms on a Channel +++++++++++++++++++++++++++++++++++++++++ + +Publisher confirms are a RabbitMQ extension to the AMQP 0.9.1 protocol. +Publisher confirms are enabled at the channel level by setting the :code:`publisher_confirms` parameter to :code:`True`, +which is the default. + +.. code-block:: python + + channel = await connection.channel( + publisher_confirms=True, # This is the default + ) + +Strategy #1: Publishing Messages Individually ++++++++++++++++++++++++++++++++++++++++++++++ + +Let's start with the simplest approach to publishing with confirms, that is, publishing a message and +waiting synchronously for its confirmation: + +.. literalinclude:: examples/7-publisher-confirms/publish_individually.py + :language: python + :start-at: # Sending the messages + :end-before: # Done sending messages + +In the previous example we publish a message as usual and wait for its confirmation with the `asyncio.wait_for` method. +The method returns as soon as the message has been confirmed. +If the message is not confirmed within the timeout or if it is nack-ed (meaning the broker could not take care of it for +some reason), the method will throw an exception. +The handling of the exception usually consists in logging an error message and/or retrying to send the message. + +If a timeout isn't requied the `await` keyword can be used instead directly as before. + +Different client libraries have different ways to synchronously deal with publisher confirms, so make sure to read +carefully the documentation of the client you are using. + +This technique is very straightforward but also has a major drawback: it **significantly slows down publishing**, as the +confirmation of a message blocks the publishing of all subsequent messages. +This approach is not going to deliver throughput of more than a few hundreds of published messages per second. +Nevertheless, this can be good enough for some applications. + +.. + TODO: Do we need to specify, that wait_for cancels the awaitables waiting for publisher confirmation when the timeout + occurs? + +Strategy #2: Publishing Messages in Batches ++++++++++++++++++++++++++++++++++++++++++++ + +To improve upon our previous example, we can publish a batch of messages and wait for this whole batch to be confirmed. +The following example uses a batch of 100: + +.. literalinclude:: examples/7-publisher-confirms/publish_batches.py + :language: python + :start-at: batchsize = 100 + :end-before: # Done sending messages + +Waiting for a batch of messages to be confirmed improves throughput drastically over waiting for a confirm for individual +message (up to 20-30 times with a remote RabbitMQ node). +One drawback is that we do not know exactly what went wrong in case of failure, so we may have to keep a whole batch in memory +to log something meaningful or to re-publish the messages. +And this solution is still synchronous, so it blocks the publishing of messages. + +Strategy #3: Handling Publisher Confirms Asynchronously ++++++++++++++++++++++++++++++++++++++++++++++++++++++++ + +The broker confirms published messages asynchronously, one just needs to register a callback on the client to be notified of these confirms: + +.. literalinclude:: examples/7-publisher-confirms/publish_asynchronously.py + :language: python + :pyobject: handle_confirm + +.. literalinclude:: examples/7-publisher-confirms/publish_asynchronously.py + :language: python + :start-at: with asyncio.TaskGroup + :end-at: add_done_callback + +The `.result()` method will either return a `aiormq.abc.ConfirmationFrameType` for confirmed messages +or raise an Exception for nack-ed messages (messages that can be considered lost by the broker). + +.. + TODO: Which exceptions can be raised? + How to determine which message the callback belongs to? + From RabbitMQ Tutorial: + + Each callback has AMQPMessage $message parameter with returned message, so you don't need to handle sequence numbers (delivery tag) to understand which message this callback belongs to. + +Summary ++++++++ + +Making sure published messages made it to the broker can be essential in some applications. +Publisher confirms are a RabbitMQ feature that helps to meet this requirement. +Publisher confirms are asynchronous in nature but it is also possible to handle them synchronously. +There is no definitive way to implement publisher confirms, this usually comes down to the constraints in the application +and in the overall system. Typical techniques are: + +* publishing messages individually, waiting for the confirmation synchronously: simple, but very limited throughput. +* publishing messages in batch, waiting for the confirmation synchronously for a batch: simple, reasonable throughput, but hard to reason about when something goes wrong. +* asynchronous handling: best performance and use of resources, good control in case of error, but can be involved to implement correctly. + + + +.. note:: + + This material was adopted from `official tutorial`_ on **rabbitmq.org**. diff --git a/docs/source/rabbitmq-tutorial/examples/7-publisher-confirms/publish_asynchronously.py b/docs/source/rabbitmq-tutorial/examples/7-publisher-confirms/publish_asynchronously.py new file mode 100644 index 00000000..fb007eae --- /dev/null +++ b/docs/source/rabbitmq-tutorial/examples/7-publisher-confirms/publish_asynchronously.py @@ -0,0 +1,53 @@ +import asyncio + +from aio_pika import Message, connect + +from aiormq.exceptions import DeliveryError + + +def get_messages_to_publish(): + for i in range(10000): + yield f"Hello World {i}!".encode() + + +def handle_confirm(confirmation): + try: + _ = confirmation.result() + # code when message is ack-ed + except DeliveryError: + # code when message is nack-ed + pass + except TimeoutError: + # code for message timeout + pass + else: + # code when message is confirmed + pass + + +async def main() -> None: + # Perform connection + connection = await connect("amqp://guest:guest@localhost/") + + async with connection: + # Creating a channel + channel = await connection.channel() + + # Declaring queue + queue = await channel.declare_queue("hello") + + async with asyncio.TaskGroup() as tg: + # Sending the messages + for msg in get_messages_to_publish(): + tg.create_task( + channel.default_exchange.publish( + Message(msg), + routing_key=queue.name, + ) + ).add_done_callback(handle_confirm) + + print(" [x] Sent and confirmed multiple messages asynchronously. ") + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/docs/source/rabbitmq-tutorial/examples/7-publisher-confirms/publish_batches.py b/docs/source/rabbitmq-tutorial/examples/7-publisher-confirms/publish_batches.py new file mode 100644 index 00000000..6fff8184 --- /dev/null +++ b/docs/source/rabbitmq-tutorial/examples/7-publisher-confirms/publish_batches.py @@ -0,0 +1,52 @@ +import asyncio + +from aio_pika import Message, connect + + +def get_messages_to_publish(): + for i in range(10000): + yield f"Hello World {i}!".encode() + + +async def main() -> None: + # Perform connection + connection = await connect("amqp://guest:guest@localhost/") + + async with connection: + # Creating a channel + channel = await connection.channel() + + # Declaring queue + queue = await channel.declare_queue("hello") + + batchsize = 100 + outstanding_messages = [] + + # Sending the messages + for msg in get_messages_to_publish(): + outstanding_messages.append( + channel.default_exchange.publish( + Message(msg), + routing_key=queue.name, + ), + ) + if len(outstanding_messages) == batchsize: + await asyncio.wait_for( + asyncio.gather(*outstanding_messages), + timeout=5.0, + ) + outstanding_messages.clear() + + if len(outstanding_messages) > 0: + await asyncio.wait_for( + asyncio.gather(*outstanding_messages), + timeout=5.0, + ) + outstanding_messages.clear() + # Done sending messages + + print(" [x] Sent and confirmed multiple messages in batches. ") + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/docs/source/rabbitmq-tutorial/examples/7-publisher-confirms/publish_individually.py b/docs/source/rabbitmq-tutorial/examples/7-publisher-confirms/publish_individually.py new file mode 100644 index 00000000..b5d5fef1 --- /dev/null +++ b/docs/source/rabbitmq-tutorial/examples/7-publisher-confirms/publish_individually.py @@ -0,0 +1,36 @@ +import asyncio + +from aio_pika import Message, connect + + +def get_messages_to_publish(): + for i in range(10000): + yield f"Hello World {i}!".encode() + + +async def main() -> None: + # Perform connection + connection = await connect("amqp://guest:guest@localhost/") + + async with connection: + # Creating a channel + channel = await connection.channel() + + # Declaring queue + queue = await channel.declare_queue("hello") + + # Sending the messages + for msg in get_messages_to_publish(): + # Waiting for publisher confirmation with timeout for every message + await asyncio.wait_for( + channel.default_exchange.publish( + Message(msg), + routing_key=queue.name, + ), timeout=5.0, + ) + # Done sending messages + print(" [x] Sent and confirmed multiple messages individually. ") + + +if __name__ == "__main__": + asyncio.run(main()) From 57938148dbadb706d5ead0a2c83cd32939ca7d02 Mon Sep 17 00:00:00 2001 From: MaPePeR Date: Tue, 30 May 2023 17:11:16 +0200 Subject: [PATCH 2/7] Use `timeout=` parameter instead of calling `wait_for` ourselves. --- .../rabbitmq-tutorial/7-publisher-confirms.rst | 12 +++--------- .../7-publisher-confirms/publish_asynchronously.py | 1 + .../7-publisher-confirms/publish_batches.py | 13 ++++--------- .../7-publisher-confirms/publish_individually.py | 9 ++++----- 4 files changed, 12 insertions(+), 23 deletions(-) diff --git a/docs/source/rabbitmq-tutorial/7-publisher-confirms.rst b/docs/source/rabbitmq-tutorial/7-publisher-confirms.rst index e62d9c69..24620b53 100644 --- a/docs/source/rabbitmq-tutorial/7-publisher-confirms.rst +++ b/docs/source/rabbitmq-tutorial/7-publisher-confirms.rst @@ -68,14 +68,12 @@ waiting synchronously for its confirmation: :start-at: # Sending the messages :end-before: # Done sending messages -In the previous example we publish a message as usual and wait for its confirmation with the `asyncio.wait_for` method. -The method returns as soon as the message has been confirmed. +In the previous example we publish a message as usual and wait for its confirmation with the :code:`await` keyword. +The :code:`await` returns as soon as the message has been confirmed. If the message is not confirmed within the timeout or if it is nack-ed (meaning the broker could not take care of it for -some reason), the method will throw an exception. +some reason), the :code:`await` will throw an exception. The handling of the exception usually consists in logging an error message and/or retrying to send the message. -If a timeout isn't requied the `await` keyword can be used instead directly as before. - Different client libraries have different ways to synchronously deal with publisher confirms, so make sure to read carefully the documentation of the client you are using. @@ -84,10 +82,6 @@ confirmation of a message blocks the publishing of all subsequent messages. This approach is not going to deliver throughput of more than a few hundreds of published messages per second. Nevertheless, this can be good enough for some applications. -.. - TODO: Do we need to specify, that wait_for cancels the awaitables waiting for publisher confirmation when the timeout - occurs? - Strategy #2: Publishing Messages in Batches +++++++++++++++++++++++++++++++++++++++++++ diff --git a/docs/source/rabbitmq-tutorial/examples/7-publisher-confirms/publish_asynchronously.py b/docs/source/rabbitmq-tutorial/examples/7-publisher-confirms/publish_asynchronously.py index fb007eae..a7057aff 100644 --- a/docs/source/rabbitmq-tutorial/examples/7-publisher-confirms/publish_asynchronously.py +++ b/docs/source/rabbitmq-tutorial/examples/7-publisher-confirms/publish_asynchronously.py @@ -43,6 +43,7 @@ async def main() -> None: channel.default_exchange.publish( Message(msg), routing_key=queue.name, + timeout=5.0, ) ).add_done_callback(handle_confirm) diff --git a/docs/source/rabbitmq-tutorial/examples/7-publisher-confirms/publish_batches.py b/docs/source/rabbitmq-tutorial/examples/7-publisher-confirms/publish_batches.py index 6fff8184..cf37bb8b 100644 --- a/docs/source/rabbitmq-tutorial/examples/7-publisher-confirms/publish_batches.py +++ b/docs/source/rabbitmq-tutorial/examples/7-publisher-confirms/publish_batches.py @@ -28,20 +28,15 @@ async def main() -> None: channel.default_exchange.publish( Message(msg), routing_key=queue.name, - ), - ) - if len(outstanding_messages) == batchsize: - await asyncio.wait_for( - asyncio.gather(*outstanding_messages), timeout=5.0, ) + ) + if len(outstanding_messages) == batchsize: + await asyncio.gather(*outstanding_messages) outstanding_messages.clear() if len(outstanding_messages) > 0: - await asyncio.wait_for( - asyncio.gather(*outstanding_messages), - timeout=5.0, - ) + await asyncio.gather(*outstanding_messages) outstanding_messages.clear() # Done sending messages diff --git a/docs/source/rabbitmq-tutorial/examples/7-publisher-confirms/publish_individually.py b/docs/source/rabbitmq-tutorial/examples/7-publisher-confirms/publish_individually.py index b5d5fef1..f6d02b16 100644 --- a/docs/source/rabbitmq-tutorial/examples/7-publisher-confirms/publish_individually.py +++ b/docs/source/rabbitmq-tutorial/examples/7-publisher-confirms/publish_individually.py @@ -22,11 +22,10 @@ async def main() -> None: # Sending the messages for msg in get_messages_to_publish(): # Waiting for publisher confirmation with timeout for every message - await asyncio.wait_for( - channel.default_exchange.publish( - Message(msg), - routing_key=queue.name, - ), timeout=5.0, + await channel.default_exchange.publish( + Message(msg), + routing_key=queue.name, + timeout=5.0, ) # Done sending messages print(" [x] Sent and confirmed multiple messages individually. ") From 59fa123f198285ca1fa6504ed7dd156b06b1abfd Mon Sep 17 00:00:00 2001 From: MaPePeR Date: Tue, 30 May 2023 18:27:37 +0200 Subject: [PATCH 3/7] Suggest `ContextVar` to find message for ConfirmationFrame asynchronously. --- .../rabbitmq-tutorial/7-publisher-confirms.rst | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/docs/source/rabbitmq-tutorial/7-publisher-confirms.rst b/docs/source/rabbitmq-tutorial/7-publisher-confirms.rst index 24620b53..86fbaec1 100644 --- a/docs/source/rabbitmq-tutorial/7-publisher-confirms.rst +++ b/docs/source/rabbitmq-tutorial/7-publisher-confirms.rst @@ -116,12 +116,14 @@ The broker confirms published messages asynchronously, one just needs to registe The `.result()` method will either return a `aiormq.abc.ConfirmationFrameType` for confirmed messages or raise an Exception for nack-ed messages (messages that can be considered lost by the broker). -.. - TODO: Which exceptions can be raised? - How to determine which message the callback belongs to? - From RabbitMQ Tutorial: - - Each callback has AMQPMessage $message parameter with returned message, so you don't need to handle sequence numbers (delivery tag) to understand which message this callback belongs to. +The callback does not have the `Message` that corresponds to the `ConfirmationFrame` that is returned by `.result()` or +contained in the `DeliveryError`. +You can use sequence numbers (delivery tag) to understand which message this callback belongs to or retrieve additional +information using a `ContextVar`_. +The `TimeoutError` does not contain a `ConfirmationFrame`, so a `ContextVar` is required to get additional information +about the message that triggered the timeout. + +.. _ContextVar: https://docs.python.org/3/library/contextvars.html#contextvars.ContextVar Summary +++++++ From 01618df73b12392de1ecaed26ae93cb92f96c467 Mon Sep 17 00:00:00 2001 From: MaPePeR Date: Wed, 7 Jun 2023 10:54:55 +0200 Subject: [PATCH 4/7] Mention `on_return_raises` in publisher confirms tutorial. --- docs/source/rabbitmq-tutorial/7-publisher-confirms.rst | 2 ++ 1 file changed, 2 insertions(+) diff --git a/docs/source/rabbitmq-tutorial/7-publisher-confirms.rst b/docs/source/rabbitmq-tutorial/7-publisher-confirms.rst index 86fbaec1..a72376f2 100644 --- a/docs/source/rabbitmq-tutorial/7-publisher-confirms.rst +++ b/docs/source/rabbitmq-tutorial/7-publisher-confirms.rst @@ -72,6 +72,8 @@ In the previous example we publish a message as usual and wait for its confirmat The :code:`await` returns as soon as the message has been confirmed. If the message is not confirmed within the timeout or if it is nack-ed (meaning the broker could not take care of it for some reason), the :code:`await` will throw an exception. +The :code:`on_return_raises` parameter of :code:`aio_pika.connect()` and :code:`connection.channel()` controls this behaivior for if a mandatory +message is returned. The handling of the exception usually consists in logging an error message and/or retrying to send the message. Different client libraries have different ways to synchronously deal with publisher confirms, so make sure to read From 01b2b37eeda6f40170fac2689334e9f5840411fa Mon Sep 17 00:00:00 2001 From: MaPePeR Date: Wed, 7 Jun 2023 11:23:53 +0200 Subject: [PATCH 5/7] Create task to send batched messages and handle confirmations with helper. --- .../rabbitmq-tutorial/7-publisher-confirms.rst | 13 +++++++++++++ .../7-publisher-confirms/publish_batches.py | 13 +++++++++---- 2 files changed, 22 insertions(+), 4 deletions(-) diff --git a/docs/source/rabbitmq-tutorial/7-publisher-confirms.rst b/docs/source/rabbitmq-tutorial/7-publisher-confirms.rst index a72376f2..e70275b2 100644 --- a/docs/source/rabbitmq-tutorial/7-publisher-confirms.rst +++ b/docs/source/rabbitmq-tutorial/7-publisher-confirms.rst @@ -101,6 +101,19 @@ One drawback is that we do not know exactly what went wrong in case of failure, to log something meaningful or to re-publish the messages. And this solution is still synchronous, so it blocks the publishing of messages. +.. note:: + + To initiate message sending asynchronously, a task is created with :code:`asyncio.create_task`, so the execution of our function + is handled by the event-loop. + The :code:`await asyncio.sleep(0)` is required to make the event loop switch to our coroutine. + Any :code:`await` would have sufficed, though. + Using :code:`async for` with an :code:`async` generator also requires the generator to yield control flow with :code:`await` for message + sending to be initiated. + + Without the task and the :code:`await` the message sending would only be initiated with the :code:`asyncio.gather` call. + For some applications this behaivior might be acceptable. + + Strategy #3: Handling Publisher Confirms Asynchronously +++++++++++++++++++++++++++++++++++++++++++++++++++++++ diff --git a/docs/source/rabbitmq-tutorial/examples/7-publisher-confirms/publish_batches.py b/docs/source/rabbitmq-tutorial/examples/7-publisher-confirms/publish_batches.py index cf37bb8b..ceafdb66 100644 --- a/docs/source/rabbitmq-tutorial/examples/7-publisher-confirms/publish_batches.py +++ b/docs/source/rabbitmq-tutorial/examples/7-publisher-confirms/publish_batches.py @@ -25,12 +25,17 @@ async def main() -> None: # Sending the messages for msg in get_messages_to_publish(): outstanding_messages.append( - channel.default_exchange.publish( - Message(msg), - routing_key=queue.name, - timeout=5.0, + asyncio.create_task( + channel.default_exchange.publish( + Message(msg), + routing_key=queue.name, + timeout=5.0, + ) ) ) + # Yield control flow to event loop, so message sending is initiated: + await asyncio.sleep(0) + if len(outstanding_messages) == batchsize: await asyncio.gather(*outstanding_messages) outstanding_messages.clear() From 2813b50d7c38443b348e334ec81cc33de42dcc5f Mon Sep 17 00:00:00 2001 From: MaPePeR Date: Wed, 7 Jun 2023 11:56:48 +0200 Subject: [PATCH 6/7] Use helper function to handle confirmations asynchronously in strategy #3. --- .../7-publisher-confirms.rst | 24 +++++-------- .../publish_asynchronously.py | 36 ++++++++++--------- 2 files changed, 29 insertions(+), 31 deletions(-) diff --git a/docs/source/rabbitmq-tutorial/7-publisher-confirms.rst b/docs/source/rabbitmq-tutorial/7-publisher-confirms.rst index e70275b2..3fca8af4 100644 --- a/docs/source/rabbitmq-tutorial/7-publisher-confirms.rst +++ b/docs/source/rabbitmq-tutorial/7-publisher-confirms.rst @@ -117,28 +117,22 @@ And this solution is still synchronous, so it blocks the publishing of messages. Strategy #3: Handling Publisher Confirms Asynchronously +++++++++++++++++++++++++++++++++++++++++++++++++++++++ -The broker confirms published messages asynchronously, one just needs to register a callback on the client to be notified of these confirms: - -.. literalinclude:: examples/7-publisher-confirms/publish_asynchronously.py - :language: python - :pyobject: handle_confirm +The broker confirms published messages asynchronously, our helper function will publish the messages and be notified of these confirms: .. literalinclude:: examples/7-publisher-confirms/publish_asynchronously.py :language: python :start-at: with asyncio.TaskGroup - :end-at: add_done_callback + :end-at: asyncio.sleep(0) -The `.result()` method will either return a `aiormq.abc.ConfirmationFrameType` for confirmed messages -or raise an Exception for nack-ed messages (messages that can be considered lost by the broker). +The :code:`TaskGroup` is required to ensure that all tasks are awaited properly. -The callback does not have the `Message` that corresponds to the `ConfirmationFrame` that is returned by `.result()` or -contained in the `DeliveryError`. -You can use sequence numbers (delivery tag) to understand which message this callback belongs to or retrieve additional -information using a `ContextVar`_. -The `TimeoutError` does not contain a `ConfirmationFrame`, so a `ContextVar` is required to get additional information -about the message that triggered the timeout. +The helper function publishes the message and awaits the confirmation. +This way the helper function knows which message the confirmation, timeout or rejection belongs to. + +.. literalinclude:: examples/7-publisher-confirms/publish_asynchronously.py + :language: python + :pyobject: publish_and_handle_confirm -.. _ContextVar: https://docs.python.org/3/library/contextvars.html#contextvars.ContextVar Summary +++++++ diff --git a/docs/source/rabbitmq-tutorial/examples/7-publisher-confirms/publish_asynchronously.py b/docs/source/rabbitmq-tutorial/examples/7-publisher-confirms/publish_asynchronously.py index a7057aff..28bdc850 100644 --- a/docs/source/rabbitmq-tutorial/examples/7-publisher-confirms/publish_asynchronously.py +++ b/docs/source/rabbitmq-tutorial/examples/7-publisher-confirms/publish_asynchronously.py @@ -1,8 +1,8 @@ import asyncio from aio_pika import Message, connect - from aiormq.exceptions import DeliveryError +from pamqp.commands import Basic def get_messages_to_publish(): @@ -10,19 +10,20 @@ def get_messages_to_publish(): yield f"Hello World {i}!".encode() -def handle_confirm(confirmation): +async def publish_and_handle_confirm(exchange, queue_name, message_body): try: - _ = confirmation.result() - # code when message is ack-ed - except DeliveryError: - # code when message is nack-ed - pass + confirmation = await exchange.publish( + Message(message_body), + routing_key=queue_name, + timeout=5.0, + ) + except DeliveryError as e: + print(f"Delivery of {message_body!r} failed with exception: {e}") except TimeoutError: - # code for message timeout - pass + print(f"Timeout occured for {message_body!r}") else: - # code when message is confirmed - pass + if not isinstance(confirmation, Basic.Ack): + print(f"Message {message_body!r} was not acknowledged by broker!") async def main() -> None: @@ -40,12 +41,15 @@ async def main() -> None: # Sending the messages for msg in get_messages_to_publish(): tg.create_task( - channel.default_exchange.publish( - Message(msg), - routing_key=queue.name, - timeout=5.0, + publish_and_handle_confirm( + channel.default_exchange, + queue.name, + msg, ) - ).add_done_callback(handle_confirm) + ) + # Yield control flow to event loop, + # so message sending is initiated: + await asyncio.sleep(0) print(" [x] Sent and confirmed multiple messages asynchronously. ") From 6d3d07318aae9b797a4f043754a24f2b05bdb430 Mon Sep 17 00:00:00 2001 From: MaPePeR Date: Wed, 7 Jun 2023 16:00:12 +0200 Subject: [PATCH 7/7] Replace `TaskGroup` with `list` and `gather` for python<3.11 compatability. --- .../7-publisher-confirms.rst | 6 ++-- .../publish_asynchronously.py | 28 +++++++++++-------- 2 files changed, 19 insertions(+), 15 deletions(-) diff --git a/docs/source/rabbitmq-tutorial/7-publisher-confirms.rst b/docs/source/rabbitmq-tutorial/7-publisher-confirms.rst index 3fca8af4..88d99593 100644 --- a/docs/source/rabbitmq-tutorial/7-publisher-confirms.rst +++ b/docs/source/rabbitmq-tutorial/7-publisher-confirms.rst @@ -121,10 +121,10 @@ The broker confirms published messages asynchronously, our helper function will .. literalinclude:: examples/7-publisher-confirms/publish_asynchronously.py :language: python - :start-at: with asyncio.TaskGroup - :end-at: asyncio.sleep(0) + :start-at: # List for storing tasks + :end-at: await asyncio.gather(*tasks) -The :code:`TaskGroup` is required to ensure that all tasks are awaited properly. +In Python 3.11 a :code:`TaskGroup` can be used instead of the :code:`list` with :code:`asyncio.gather`. The helper function publishes the message and awaits the confirmation. This way the helper function knows which message the confirmation, timeout or rejection belongs to. diff --git a/docs/source/rabbitmq-tutorial/examples/7-publisher-confirms/publish_asynchronously.py b/docs/source/rabbitmq-tutorial/examples/7-publisher-confirms/publish_asynchronously.py index 28bdc850..52b23b23 100644 --- a/docs/source/rabbitmq-tutorial/examples/7-publisher-confirms/publish_asynchronously.py +++ b/docs/source/rabbitmq-tutorial/examples/7-publisher-confirms/publish_asynchronously.py @@ -37,19 +37,23 @@ async def main() -> None: # Declaring queue queue = await channel.declare_queue("hello") - async with asyncio.TaskGroup() as tg: - # Sending the messages - for msg in get_messages_to_publish(): - tg.create_task( - publish_and_handle_confirm( - channel.default_exchange, - queue.name, - msg, - ) + # List for storing tasks + tasks = [] + # Sending the messages + for msg in get_messages_to_publish(): + task = asyncio.create_task( + publish_and_handle_confirm( + channel.default_exchange, + queue.name, + msg, ) - # Yield control flow to event loop, - # so message sending is initiated: - await asyncio.sleep(0) + ) + tasks.append(task) + # Yield control flow to event loop, so message sending is initiated: + await asyncio.sleep(0) + + # Await all tasks + await asyncio.gather(*tasks) print(" [x] Sent and confirmed multiple messages asynchronously. ")