Skip to content

Commit

Permalink
Use helper function to handle confirmations asynchronously in strategy
Browse files Browse the repository at this point in the history
  • Loading branch information
MaPePeR committed Jun 7, 2023
1 parent 01b2b37 commit 2813b50
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 31 deletions.
24 changes: 9 additions & 15 deletions docs/source/rabbitmq-tutorial/7-publisher-confirms.rst
Expand Up @@ -117,28 +117,22 @@ And this solution is still synchronous, so it blocks the publishing of messages.
Strategy #3: Handling Publisher Confirms Asynchronously
+++++++++++++++++++++++++++++++++++++++++++++++++++++++

The broker confirms published messages asynchronously, one just needs to register a callback on the client to be notified of these confirms:

.. literalinclude:: examples/7-publisher-confirms/publish_asynchronously.py
:language: python
:pyobject: handle_confirm
The broker confirms published messages asynchronously, our helper function will publish the messages and be notified of these confirms:

.. literalinclude:: examples/7-publisher-confirms/publish_asynchronously.py
:language: python
:start-at: with asyncio.TaskGroup
:end-at: add_done_callback
:end-at: asyncio.sleep(0)

The `.result()` method will either return a `aiormq.abc.ConfirmationFrameType` for confirmed messages
or raise an Exception for nack-ed messages (messages that can be considered lost by the broker).
The :code:`TaskGroup` is required to ensure that all tasks are awaited properly.

The callback does not have the `Message` that corresponds to the `ConfirmationFrame` that is returned by `.result()` or
contained in the `DeliveryError`.
You can use sequence numbers (delivery tag) to understand which message this callback belongs to or retrieve additional
information using a `ContextVar`_.
The `TimeoutError` does not contain a `ConfirmationFrame`, so a `ContextVar` is required to get additional information
about the message that triggered the timeout.
The helper function publishes the message and awaits the confirmation.
This way the helper function knows which message the confirmation, timeout or rejection belongs to.

.. literalinclude:: examples/7-publisher-confirms/publish_asynchronously.py
:language: python
:pyobject: publish_and_handle_confirm

.. _ContextVar: https://docs.python.org/3/library/contextvars.html#contextvars.ContextVar

Summary
+++++++
Expand Down
@@ -1,28 +1,29 @@
import asyncio

from aio_pika import Message, connect

from aiormq.exceptions import DeliveryError
from pamqp.commands import Basic


def get_messages_to_publish():
for i in range(10000):
yield f"Hello World {i}!".encode()


def handle_confirm(confirmation):
async def publish_and_handle_confirm(exchange, queue_name, message_body):
try:
_ = confirmation.result()
# code when message is ack-ed
except DeliveryError:
# code when message is nack-ed
pass
confirmation = await exchange.publish(
Message(message_body),
routing_key=queue_name,
timeout=5.0,
)
except DeliveryError as e:
print(f"Delivery of {message_body!r} failed with exception: {e}")
except TimeoutError:
# code for message timeout
pass
print(f"Timeout occured for {message_body!r}")
else:
# code when message is confirmed
pass
if not isinstance(confirmation, Basic.Ack):
print(f"Message {message_body!r} was not acknowledged by broker!")


async def main() -> None:
Expand All @@ -40,12 +41,15 @@ async def main() -> None:
# Sending the messages
for msg in get_messages_to_publish():
tg.create_task(
channel.default_exchange.publish(
Message(msg),
routing_key=queue.name,
timeout=5.0,
publish_and_handle_confirm(
channel.default_exchange,
queue.name,
msg,
)
).add_done_callback(handle_confirm)
)
# Yield control flow to event loop,
# so message sending is initiated:
await asyncio.sleep(0)

print(" [x] Sent and confirmed multiple messages asynchronously. ")

Expand Down

0 comments on commit 2813b50

Please sign in to comment.