
Transfer RabbitMQ publisher confirms tutorial to aio-pika. #550

Merged (7 commits) on Jun 7, 2023

Conversation

MaPePeR (Contributor) commented May 29, 2023

Because I was experimenting with it anyway, I gave it a shot to transfer the RabbitMQ Publisher Confirms tutorial to aio-pika.

Someone who actually knows what they are doing should take a close look at this, though (I don't).
Especially my example code. (For some reason the batch example is faster than the asynchronous example in my limited testing?)

  • Is await asyncio.wait_for(asyncio.gather(...)) the correct construct?
  • wait_for cancels the tasks when a timeout occurs. Is it OK to cancel the confirmation tasks when a timeout occurs or should they be shielded?
  • I used a TaskGroup in the asynchronous example to make sure the tasks are awaited somewhere. Is this the correct approach?
  • Which exceptions are raised and how to use them in the asynchronous example?
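For illustration, here is a minimal pure-asyncio sketch of the cancellation question above (all names are hypothetical; `confirm()` merely stands in for awaiting a publisher confirm):

```python
import asyncio

async def confirm(delay):
    # hypothetical stand-in for awaiting a single publisher confirm
    await asyncio.sleep(delay)
    return delay

async def main():
    tasks = [asyncio.create_task(confirm(d)) for d in (0, 60)]
    try:
        # wait_for cancels the gather (and with it the child tasks) on timeout
        await asyncio.wait_for(asyncio.gather(*tasks), timeout=0.1)
    except asyncio.TimeoutError:
        pass
    cancelled = [t.cancelled() for t in tasks]
    print(cancelled)  # the fast confirm completed, the slow one was cancelled
    return cancelled

if __name__ == "__main__":
    asyncio.run(main())
```

Wrapping the gather in `asyncio.shield()` before passing it to `wait_for` would instead leave the confirmation tasks running past the timeout.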

MaPePeR (Contributor, Author) commented May 30, 2023

I just noticed that it would probably have been better to use the timeout= parameter instead of wait_for with a timeout.

coveralls commented May 30, 2023

Coverage Status: 88.26%, remained the same when pulling 6d3d073 on MaPePeR:publisher_confirms_tutorial into 02c731a on mosquito:master.

mosquito (Owner):

@MaPePeR thank you for these changes; please fix the linter issues in Actions.

docs/source/rabbitmq-tutorial/7-publisher-confirms.rst: two review threads (outdated, resolved)

def handle_confirm(confirmation):
    try:
        result = confirmation.result()

mosquito (Owner):

I think it would be better to check the instance type against ConfirmationFrameType, and add separate branches for Basic.Ack, Basic.Nack, and Basic.Reject.

MaPePeR (Contributor, Author):

Admittedly I didn't put a lot of time into it, but I couldn't figure out a good way to import Basic.Ack without going through aiormq, which was unexpected to me (maybe my expectations are wrong, though).
I expected something like aio_pika.messages.Basic.Ack or just aio_pika.Basic.Ack (what does work is aio_pika.message.aiormq.spec.Basic.Ack, but that is unwieldy).
There is no example or documentation that uses aiormq, yet.

mosquito (Owner):

I just remembered why it's done that way: the point is that it just creates asyncio.Future instances, and you need to explicitly return them without creating a coroutine. This gives a good balance against heavy task creation if the user wants to pass them to asyncio.wait or asyncio.gather.

So asyncio.wait is a really good way in this case to separate responses with ack from the others. And we shouldn't forget about returned messages, which are also considered an error; if the broker passed the message body back, it will be in the exception instance.
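A rough sketch of that idea in pure asyncio, where `DeliveryError` and `fake_publish` are hypothetical stand-ins for aio-pika's exception and `exchange.publish`:

```python
import asyncio

class DeliveryError(Exception):
    # hypothetical stand-in for aio_pika.exceptions.DeliveryError
    pass

async def fake_publish(ok):
    # stand-in for exchange.publish(): returns on ack, raises on nack
    await asyncio.sleep(0)
    if not ok:
        raise DeliveryError("nack")
    return "ack"

async def main():
    futures = [asyncio.ensure_future(fake_publish(ok)) for ok in (True, False, True)]
    # asyncio.wait lets us separate the acked deliveries from the failed ones
    done, pending = await asyncio.wait(futures)
    acked = sum(1 for f in done if f.exception() is None)
    failed = sum(1 for f in done if f.exception() is not None)
    return acked, failed

if __name__ == "__main__":
    print(asyncio.run(main()))  # (2, 1)
```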

MaPePeR (Contributor, Author) commented May 31, 2023

I changed the paragraph to state that one needs to use the delivery tag to determine which Message a ConfirmationFrame belongs to, because aio-pika does not provide it in the callback.

But I just noticed: I don't think there is a way in aio-pika to actually retrieve the generated delivery tag or message_id for a message?

The PHP library, for example, writes DeliveryInfo back into the provided message object, but aio-pika does not.

If it did, one could do something like this to carry the message and delivery tag to the handler:

# This is a reference, so publish() can modify it before it is picked up in handle_confirm
message_var.set(Message(msg))
task = tg.create_task(
    channel.default_exchange.publish(
        message_var.get(),
        routing_key=queue.name,
        timeout=5.0,
    )
)
task.add_done_callback(handle_confirm)

But without that I don't really see a good way to actually handle these confirmations asynchronously. Maybe I'm missing something here; I'm quite new to this, after all.

Comment on lines 13 to 27

def handle_confirm(confirmation):
    try:
        _ = confirmation.result()
        # code when message is ack-ed
    except DeliveryError:
        # code when message is nack-ed
        pass
    except TimeoutError:
        # code for message timeout
        pass
    else:
        # code when message is confirmed
        pass

mosquito (Owner):

Suggested change:

def handle_confirm(confirmation):
    try:
        _ = confirmation.result()
        # code when message is ack-ed
    except DeliveryError:
        # code when message is nack-ed
        pass
    except TimeoutError:
        # code for message timeout
        pass
    else:
        # code when message is confirmed
        pass

Comment on lines 39 to 54

async with asyncio.TaskGroup() as tg:
    # Sending the messages
    for msg in get_messages_to_publish():
        tg.create_task(
            channel.default_exchange.publish(
                Message(msg),
                routing_key=queue.name,
                timeout=5.0,
            )
        ).add_done_callback(handle_confirm)

print(" [x] Sent and confirmed multiple messages asynchronously. ")
mosquito (Owner):

Suggested change. Replace:

async with asyncio.TaskGroup() as tg:
    # Sending the messages
    for msg in get_messages_to_publish():
        tg.create_task(
            channel.default_exchange.publish(
                Message(msg),
                routing_key=queue.name,
                timeout=5.0,
            )
        ).add_done_callback(handle_confirm)
print(" [x] Sent and confirmed multiple messages asynchronously. ")

with:

# List for async tasks
tasks = []

# Sending the messages
for msg in get_messages_to_publish():
    task = asyncio.create_task(
        channel.default_exchange.publish(
            Message(msg),
            routing_key=queue.name,
            timeout=5,
        )
    )
    tasks.append(task)

results = await asyncio.gather(*tasks, return_exceptions=True)
success = 0
error = 0
for result in results:
    if isinstance(result, (DeliveryError, asyncio.TimeoutError)):
        error += 1
    else:
        success += 1
print(" [x] Sent and confirmed multiple messages asynchronously. ")
print("Success:", success)
print("Errored:", error)

MaPePeR (Contributor, Author):

I don't think this solves the problem. This is more like a variation of batch confirmation, except it sends all messages as a single batch. It has no way to determine which messages failed or succeeded, it only detects failures after all messages have been published (so it cannot react to delivery errors or confirmations asynchronously), and it only works for a fixed number of messages, not for an unbounded stream.

So I don't think this code suggestion matches the ideas presented as the asynchronous "Strategy 3" in the original RabbitMQ tutorial.

I suppose with the new get_underlay_channel() method one could use channel.get_underlay_channel().delivery_tag to get the latest delivery tag:

awaiting_confirmations = {}
delivery_tag_var = ContextVar('delivery_tag')
# ...

for msg in get_messages_to_publish():
    task = asyncio.create_task(
        channel.default_exchange.publish(
            Message(msg),
            routing_key=queue.name,
            timeout=5,
        )
    )
    # Get the last used delivery tag from the underlying channel
    delivery_tag = channel.get_underlay_channel().delivery_tag
    # Required so we also know the delivery_tag in case of a timeout.
    # For DeliveryError we can get it from the frame contained in the exception.
    delivery_tag_var.set(delivery_tag)
    awaiting_confirmations[delivery_tag] = msg  # Store message body to maybe resend
    task.add_done_callback(handle_confirms)

What do you think?

MaPePeR (Contributor, Author) commented Jun 3, 2023

I think I just noticed a fatal flaw in my examples (and possibly in aio-pika itself?). I assumed that calling publish would schedule the message to be sent immediately, and that awaiting the returned Awaitable would just wait for the send to complete and for its confirmation. (Is that a reasonable assumption? I thought so.)

But thinking about it more, I don't think this is the case at all.

In Python, a coroutine (very much in contrast to JavaScript's async functions) is only scheduled for execution if it is awaited, or if it is turned into a task and something else yields control.

asyncio docs:

Note that simply calling a coroutine will not schedule it to be executed:

In my current batch example that never happens, meaning the messages don't even start being sent to the broker until a batch is completed with asyncio.gather? A simple async for to yield control between iterations would not fix it, because the coroutines are not created as tasks. That does not feel very asynchronous to me.
And creating a Task for every message sent seems wasteful/undesirable to me if the goal is batch processing?
(This could also mess with the message sending order, which in some use cases might be very important, because message order now depends on the event loop?)

I haven't confirmed that suspicion yet, though.

If this is correct, it makes me wonder whether publish should even be an async function.
asyncio.StreamWriter.write, for example, is not a coroutine and attempts to write immediately.

It does not have to be async to return an Awaitable; it could call an internal coroutine and return it without awaiting it.
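A minimal sketch of that shape, with all names hypothetical and the I/O simulated by a list append:

```python
import asyncio

def eager_send(data, sent_log):
    # The synchronous part runs immediately, like StreamWriter.write:
    # the "send" is initiated right here, before anyone awaits anything.
    sent_log.append(data)
    loop = asyncio.get_running_loop()
    confirmation = loop.create_future()
    # simulate the broker confirming a moment later
    loop.call_soon(confirmation.set_result, "ack")
    return confirmation  # the caller awaits only the confirmation

async def main():
    sent_log = []
    pending = eager_send(b"hello", sent_log)
    # the send already happened, even though nothing has been awaited yet
    assert sent_log == [b"hello"]
    return await pending

if __name__ == "__main__":
    print(asyncio.run(main()))  # ack
```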

mosquito (Owner) commented Jun 5, 2023

@MaPePeR asyncio.create_task schedules a task and starts executing it at the first context switch, just like in JavaScript: if you write while (1) {} it won't continue, because the context switch never happens.

Even if you make StreamWriter.write be called as soon as possible, it just writes data to a buffer; it may never be sent over the network if the event loop is blocked (the same while 1: pass). This really is like in JS, except that there the event loop is implicitly hidden by the JS runtime.

That's the way the world works; not perfectly, of course.

MaPePeR (Contributor, Author) commented Jun 5, 2023

just like in javascript

This is not true.
In JavaScript, when an async function is called, the context immediately switches into that function; only on await does it switch to something else.

Compare these two examples between JavaScript and Python:

async function asyncFunction() {
    console.log("2. Starting async function synchronously");
    await new Promise(resolve => setTimeout(resolve, 1)); // something like asyncio.sleep
    console.log("4. End of async function");
}

console.log("1. Scheduling async function");
const a = asyncFunction();
console.log("3. Do something else, then await");
await a;
console.log("5. Done");

outputs:

1. Scheduling async function
2. Starting async function synchronously
3. Do something else, then await
4. End of async function
5. Done

compare to the python version:

async def asyncFunction():
    print("3. Starting async function only after it has been awaited")
    await asyncio.sleep(1)
    print("4. End of async function")


print("1. Scheduling async function")
a = asyncFunction()
print("2. Do something else, then await")
await a
print("5. Done")

outputs:

1. Scheduling async function
2. Do something else, then await
3. Starting async function only after it has been awaited
4. End of async function
5. Done

This means that, in Python, if your async def function has a synchronous part that is required to initiate the I/O, then your async function is not actually doing I/O asynchronously, because the I/O was never initiated.

From the Javascript documentation:

The body of an async function can be thought of as being split by zero or more await expressions. Top-level code, up to and including the first await expression (if there is one), is run synchronously. In this way, an async function without an await expression will run synchronously. If there is an await expression inside the function body, however, the async function will always complete asynchronously.

This is even true for the Promise in Javascript:

This promise is already resolved at the time when it's created (because the resolveOuter is called synchronously)

Compare to python documentation:

Note that simply calling a coroutine will not schedule it to be executed:

>>> main()
<coroutine object main at 0x1053bb7c8>

Even if you try to make StreamWriter.write be called as soon as possible, it is just writing data to the buffer, it may not be sent over the network

If StreamWriter.write is implemented correctly, it writes the data into a buffer that is handled by the operating system, handing the write over so that it no longer depends on our program's runtime. So it can happen asynchronously to our program's execution.

mosquito (Owner) commented Jun 5, 2023

This is not true.
In Javascript, when an async function is called, the context is immediately switched to that function. Only on await does it switch context to something else.

That is exactly what I wrote: the context switch happens at the first await statement.

function sleep (time) {
  return new Promise((resolve) => setTimeout(resolve, time));
}

async function delayed() {
    await sleep(1);
    console.log("Hello");
}

function main() {
    var result = delayed();
    while (1) {};
}

console.log("Running");
main();

// "Hello" is never written

MaPePeR (Contributor, Author) commented Jun 5, 2023

If you put a console.log before the await sleep(1), it will be executed regardless of the infinite loop. In Python it won't.

And that is the location of the synchronous code that creates the asynchronous I/O work and hands it over to the OS, so it can be done asynchronously.

mosquito (Owner) commented Jun 5, 2023

@MaPePeR here is the analogous example in Python; no differences, IMHO:

import asyncio


def sleep(time):
    print("start sleeping")
    future = asyncio.Future()
    asyncio.get_running_loop().call_later(time, future.set_result, None)
    return future


async def delayed():
    await sleep(1)
    print("Hello")


async def main():
    result = delayed()
    while True:
        pass

print("Running")
asyncio.run(main())

MaPePeR (Contributor, Author) commented Jun 5, 2023

The difference occurs if you put the print/console.log before the await sleep(1).

I already provided example code with output above:

Javascript:                              | Python:
1. Scheduling async function             | 1. Scheduling async function
2. Starting async function synchronously | 2. Do something else, then await
3. Do something else, then await         | 3. Starting async function only after it has been awaited
4. End of async function                 | 4. End of async function
5. Done                                  | 5. Done 

See the swapped order in 2. and 3.

mosquito (Owner) commented Jun 5, 2023

@MaPePeR the real reason is that JS has no coroutines, just Promises (aka Futures in Python). Coroutines are like generators: they don't start before __await__ is called. But I really don't understand why that's a problem?

MaPePeR (Contributor, Author) commented Jun 5, 2023

In the batch example I assumed that calling the publish method without awaiting (or "gathering") its result would already have initiated sending the message to the broker, doing the I/O in the background while the program chugs along.
This is what I understand as "asynchronous I/O":

In computer science, asynchronous I/O (also non-sequential I/O) is a form of input/output processing that permits other processing to continue before the transmission has finished.
[...]
[...] it is possible to start the communication and then perform processing that does not require that the I/O be completed. This approach is called asynchronous input/output. Any task that depends on the I/O having completed (this includes both using the input values and critical operations that claim to assure that a write operation has been completed) still needs to wait for the I/O operation to complete, and thus is still blocked, but other processing that does not have a dependency on the I/O operation can continue.

This is what happens if you use aio_write, and probably what happens with StreamWriter.write.

What happens instead is that the message is queued locally, and only upon calling await is the sending initiated. This results in a (potentially) huge message spike when using asyncio.gather, and also adds latency to every message.

For some, maybe many, use cases that might not be a dealbreaker. But asynchronous I/O it is not.

Slapping async on a function also makes it incredibly hard to actually get this asynchronous behavior. The only way I have found so far is to use asyncio.create_task(coro()) and then asyncio.sleep(0), so the event loop switches to the coroutine. But I don't think many people actually do that, so execution is not actually asynchronous, and it requires creating a Task plus a "weird" call to asyncio.sleep.
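The create_task plus asyncio.sleep(0) workaround described above can be demonstrated with plain asyncio (a sketch, nothing aio-pika-specific):

```python
import asyncio

async def main():
    log = []

    async def coro():
        log.append("started")   # runs only once the task first gets control
        await asyncio.sleep(0)
        log.append("finished")

    task = asyncio.create_task(coro())
    assert log == []            # scheduled, but not started yet
    await asyncio.sleep(0)      # one context switch: the task runs to its first await
    assert log == ["started"]
    await task
    return log

if __name__ == "__main__":
    print(asyncio.run(main()))  # ['started', 'finished']
```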

In JavaScript you get this behavior for free. In Python, from a library perspective, you have to do something like this to get it:

def sendMessage(msg) -> Awaitable:
    # do message preprocessing
    awaitable = some.internal.lib.sendMessageAsync(preprocessed_msg)
    # Calling an async function from a non-async function is fine.
    # We can't await it here, so we leave the awaiting to the caller.
    return asyncWaitForSend(awaitable)

async def asyncWaitForSend(awaitable):
    send_result = await awaitable
    # do message post processing
    return processed_send_result

MaPePeR (Contributor, Author) commented Jun 5, 2023

There was a discussion about the "sync write"/"async read" asymmetry in a Google group. I think Martin Teichmann gives a good explanation:

[On the topic of why write is not async, but read is]
This is actually very simple and obvious: when you call read, you
need the result of that to continue working, with write, you don't.
So you need to wait for the former, but not for the latter.
There simply is no symmetry between read and write when it comes
to asyncio. This is like asking "why does write have a parameter
data, while read doesn't?", well, because, they are different.

The entire coolness of asyncio lies in the fact that you clearly mark
the places where concurrency enters. By making just everything a
coroutine, this great idea just disappears. And write is a very good
example for this. Sometimes in my code I have two writes:

writer.write("something")
writer.write("else")

because this is asyncio, I know that those two writes will be called
directly after each other, and that the data goes out into the world
just like that. This is the large advantage over multi-threaded
programming: there some other thread might write something
in between those lines, and the result is just some mingled data.

Sure, in multi-threaded programming I can just use mutexes. But
you quickly run into large problems with hardly debuggable code.

So in short: when designing an API for use with asyncio, try
to limit the use of coroutines to where it is really, really
necessary.

mosquito (Owner) commented Jun 5, 2023

@MaPePeR as I wrote earlier, first of all you have no guarantee that the data is sent, because write just puts it into the buffer, and if that buffer is full, or the TCP window is full and the remote party doesn't respond, the data won't be sent anywhere. Secondly, the AMQP 0.9.x protocol is designed so that one message consists of at least three frames, and until all of them are sent, no other frames can be sent on that channel; otherwise the channel is closed with an error. For this reason, you must take a lock while sending a message on a particular channel.

MaPePeR (Contributor, Author) commented Jun 5, 2023

Yes, there is no guarantee that the data is sent, but there is a pledge to attempt to send it as soon as possible. The StreamWriter.write docs describe it like this:

The method attempts to write the data to the underlying socket immediately. If that fails, the data is queued in an internal write buffer until it can be sent.

aio_write promises to have the request enqueued (I think that means the call blocks if the queue is full):

The "asynchronous" means that this call returns as soon as the request has been enqueued; the write may or may not have completed when the call returns.

It doesn't really matter that there are cases in which the data is queued instead of sent immediately. The big benefit of asyncio is that data is sent immediately when the buffer and the TCP window aren't full.

I don't see how the multiple frames are a problem. Only one coroutine/task is ever active anyway, and as long as it does not await between writing the frames, they will stay in order. (Given that the write method used also isn't defined as async and doesn't have to be awaited, of course.)

But fixing this is probably not really feasible, as it is a problem that goes through all the layers, from exchange.publish down to StreamWriter.write, without an await. The asyncio.Queue also makes it impossible, because it enforces a context switch to read from it. (A separate writer thread and a janus.Queue could circumvent that.)

Maybe my mindset is just wrong. If I see an asynchronous function call, I think it will do something, and that the await waits for its completion. I don't read it as "gently ask to execute something later, then wait for that execution to happen; and if you really want it executed, create a Task and make sure some context switches occur".

confirmations = []
async for i in async_range(1000000):
    confirmations.append(exchange.publish(...))  # Don't await individually, so we can send a lot faster
await asyncio.sleep(5)
# Surely all messages are published by now; just gather the confirmations...
await asyncio.gather(*confirmations)  # Nevermind, I will start sending messages now

The fact that the above code only ever starts to send messages when asyncio.gather is reached is just mind-boggling to me.
This lazy evaluation also means that I cannot use (await channel.get_underlay_channel()).delivery_tag to reliably get the delivery_tag of the last message for batch confirmations, because I have no way to determine whether, or how many, coroutines/tasks were executed; the tag might not have been incremented at all, or might already have been incremented multiple times.

I think another pain point for using it asynchronously is that await exchange.publish is not "single purpose": it always waits for both the sending and the confirmation, so information is only returned about the confirmation, not about the sending.
Having something like an Awaitable[MessageReceipt] that resolves after sending and carries things like delivery_tag, plus an Awaitable[ConfirmationFrame] for the confirmation, would make this easier to control and would expose the required information.
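To make that idea concrete, here is a toy model of the proposed split API (entirely hypothetical: sending completes synchronously and the broker ack is simulated with a future):

```python
import asyncio
from dataclasses import dataclass

@dataclass
class MessageReceipt:
    # hypothetical: known as soon as the send happens
    delivery_tag: int
    confirmation: "asyncio.Future"  # resolves when the broker acks

class SketchExchange:
    # toy model: publish() is synchronous and returns a receipt right away
    def __init__(self):
        self._tag = 0

    def publish(self, body):
        loop = asyncio.get_running_loop()
        self._tag += 1
        confirmation = loop.create_future()
        loop.call_soon(confirmation.set_result, "ack")  # simulated broker ack
        return MessageReceipt(self._tag, confirmation)

async def main():
    exchange = SketchExchange()
    receipt = exchange.publish(b"hi")   # delivery tag is known immediately
    ack = await receipt.confirmation    # await only the confirmation
    return receipt.delivery_tag, ack

if __name__ == "__main__":
    print(asyncio.run(main()))  # (1, 'ack')
```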

mosquito (Owner) commented Jun 6, 2023

@MaPePeR this example is a simple way to get what you really want, I guess, but I really don't know why it would be useful in real life.

import asyncio

from aio_pika import Message, connect


def get_messages_to_publish():
    for i in range(1000):
        yield f"Hello World {i}!".encode()


async def main() -> None:
    # Perform connection
    async with await connect("amqp://guest:guest@localhost/") as connection:

        # Creating a channel
        channel = await connection.channel()

        # Declaring queue
        queue = await channel.declare_queue("hello")

        # List for async tasks
        tasks = []

        async def publisher(body):
            print("Start sending:", body)
            result = await channel.default_exchange.publish(
                Message(body),
                routing_key=queue.name,
                timeout=5,
            )
            print("Body sent:", body)
            return result

        # Sending the messages
        for msg in get_messages_to_publish():
            task = asyncio.create_task(publisher(msg))
            tasks.append(task)

            # enforce context switch and tasks will go on
            await asyncio.sleep(0)

        results = await asyncio.gather(*tasks, return_exceptions=True)
        print("Last delivery tag is:", results[-1].delivery_tag)
        # Last delivery tag is: 1000


if __name__ == "__main__":
    asyncio.run(main())

Usually in real practice you either need to send a huge batch of messages, in which case you don't need publisher confirms at all and should use transactions instead.

Or there are many coroutines running in the application, sending messages independently; then this is useful, and each of them can verify the status of its particular delivery.

Everything you described above is fine in some general, bookish case and "should probably" work, but the world is not perfect, and if you try to implement your idea on a message with a body of a couple of hundred megabytes, everything will break, believe me, I've already checked it.

btw: If you look at the commits at the link, you'll see that your idea is not new; I gave it up back in 2017.

MaPePeR force-pushed the publisher_confirms_tutorial branch from cf58e37 to 2813b50 on June 7, 2023 11:28
MaPePeR (Contributor, Author) commented Jun 7, 2023

I tried to write the tutorial so that it stays true to the intent of the original RabbitMQ tutorial I was copying. That's the reason.

I changed the examples now and thought that would be the final version, but I just noticed that when I run the "asynchronous" (Strategy 3) version with 100,000 messages, it starts to time out messages after ~5,200. Probably because the event loop prefers creating new tasks over actually sending or receiving messages. 😕

mosquito merged commit 05d3820 into mosquito:master on Jun 7, 2023
9 checks passed
mosquito (Owner) commented Jun 7, 2023

@MaPePeR nice work. Thank you.

MaPePeR deleted the publisher_confirms_tutorial branch on June 7, 2023 15:24