
making publish asynchronous with a batching mechanism #32

Merged: 3 commits merged into qweeze:master from implement_async_publish on Mar 7, 2023

Conversation

@DanielePalaia (Collaborator) commented Feb 15, 2023

This PR makes it possible to run publish() asynchronously through a batching mechanism, similar to what we do in our Go/.NET clients.
publish() now takes an additional input parameter, "send_batch_enabled". When it is set to True, _get_or_create_publisher creates an asyncio task that sends the buffered messages in batches via publish_batch after a given interval of time. publish() in turn puts messages into a buffer instead of sending them directly (again, only when send_batch_enabled is set to True).
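For illustration only, a usage sketch (the flag is the one proposed in this PR; the rest mirrors the existing publish call):

for msg in messages:
    # buffer the message instead of sending it immediately;
    # a background asyncio task flushes the buffer via publish_batch
    await producer.publish(stream, msg, send_batch_enabled=True)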

@qweeze (Owner) commented Feb 16, 2023

Thanks @DanielePalaia
I like the idea of adding this buffering mechanism, but I have a couple of thoughts about the implementation.

My main concern is that the new flag send_batch_enabled completely changes the publish method's behavior and makes it considerably more complex.
We can't return a publishing_id with send_batch_enabled, which somewhat breaks the semantics of this method. Returning either an id or 0 depending on whether the flag is enabled feels a little confusing.
There's also possible confusion between publish_batch(...) and publish(..., send_batch_enabled=True).

I think a better way might be to add a separate method for publishing with buffering.
Keeping this async batch-publishing mechanism separate from regular publishing should let us provide a clearer API and a simpler internal implementation. For example (just an idea):

async with producer.background_sender(stream="mystream", interval=0.2) as sender:
    # ...
    for msg in messages:
        sender.send(msg)  # no need for await here

This way the Producer class won't need all the extra attributes (_buffered_messages, task, ...), and the context manager makes managing the background task more explicit. Also, if we pass stream and publisher_name at initialization time, we won't have to call _get_or_create_publisher and acquire a lock on each message, so the send method can just do self.buffer.append(message)

Or, going further, we could even make a separate public class that wraps the producer:

sender = BackgroundSender(producer, stream)
await sender.start()
async for msg in messages:
    sender.send(msg)
await sender.stop()
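A very rough sketch of such a wrapper, only to illustrate the shape of it (all names and details are placeholders; publish_batch stands for the existing batch call, and async-with support could be added via __aenter__/__aexit__):

import asyncio

class BackgroundSender:
    def __init__(self, producer, stream: str, interval: float = 0.2):
        self._producer = producer
        self._stream = stream
        self._interval = interval
        self._buffer: list = []
        self._task: asyncio.Task | None = None

    async def start(self) -> None:
        # start the periodic flush task
        self._task = asyncio.create_task(self._flush_loop())

    def send(self, message) -> None:
        # no await needed: just buffer the message
        self._buffer.append(message)

    async def stop(self) -> None:
        if self._task is not None:
            self._task.cancel()
        await self._flush()  # flush whatever is left

    async def _flush_loop(self) -> None:
        while True:
            await asyncio.sleep(self._interval)
            await self._flush()

    async def _flush(self) -> None:
        if self._buffer:
            batch, self._buffer = self._buffer, []
            await self._producer.publish_batch(self._stream, batch)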

@DanielePalaia (Collaborator, Author)

Hi @qweeze, yes, I agree that it is a good idea to separate the two versions, but the main idea is to have publish() run asynchronously by default and leave only batch_send() running synchronously, in line with the other clients.

For the moment I just left the send_batch_enabled parameter in order to be able to do some tests/performance profiling comparing the older and newer versions of publish(), but I was planning to remove the synchronous part eventually.

Another part of the task will be not to let publish_batch block on the confirmation, but to handle the confirmation on a separate thread on the client side.

Also, after discussing with @Gsantomaggio: while it is a good idea to rely on a context manager to handle a few things, we prefer the API to be similar to the other clients (having both background_sender and send here may be too different from the other clients' APIs).

@qweeze (Owner) commented Feb 25, 2023

> have publish() run asynchronously by default and leave only batch_send() running synchronously, in line with the other clients

Oh I see. As a reference, aiokafka provides two methods - send and send_and_wait, maybe we can do something similar?
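For reference, aiokafka usage looks roughly like this (just to illustrate the split between fire-and-forget and waiting for the acknowledgement):

from aiokafka import AIOKafkaProducer

async def example() -> None:
    producer = AIOKafkaProducer(bootstrap_servers="localhost:9092")
    await producer.start()
    try:
        # fire-and-forget: returns a future, delivery happens in the background
        await producer.send("my-topic", b"message 1")
        # waits until the broker acknowledges the record
        record_metadata = await producer.send_and_wait("my-topic", b"message 2")
        print(record_metadata.offset)
    finally:
        await producer.stop()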

> not to let publish_batch block on the confirmation, but to handle the confirmation on a separate thread on the client side

I'm not sure I fully understand this, could you please explain a bit more? Or maybe provide a code snippet

@Gsantomaggio (Collaborator)

> I'm not sure I fully understand this, could you please explain a bit more? Or maybe provide a code snippet

By protocol, the publish confirmation is asynchronous; see for example the DotNet client, where the confirmation is handled by a separate thread. This improves performance because producing messages is not blocked waiting for confirmations.

DotNet (like the other clients) implements a backpressure pattern to avoid flooding the server; see the MaxInFlight parameter in:
https://rabbitmq.github.io/rabbitmq-stream-dotnet-client/stable/htmlsingle/index.html#_creating_a_producer

We'd like to implement this client in the same way, avoiding blocking the publisher while waiting for confirmations.

So this client would have:

  • send: asynchronous; the client automatically buffers and sends the messages

  • batch_send: synchronous; the user buffers the messages and sends them

  • publish_confirm: a callback where the user receives the confirmation ids on a separate thread

In this way the Python client will have the same behaviour as the other clients.
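Roughly, from the user's point of view it could look like this (only a sketch; method and parameter names are illustrative, not a final API):

confirmed: list[int] = []

def on_publish_confirm(publishing_ids: list[int]) -> None:
    # invoked by the client, separately from the send path, when the server confirms
    confirmed.extend(publishing_ids)

async def produce(producer, stream: str) -> None:
    # send: asynchronous, the client buffers and flushes in the background
    for i in range(1000):
        await producer.send(stream, f"msg {i}".encode(), on_publish_confirm=on_publish_confirm)

    # batch_send: synchronous, the user builds the batch explicitly
    batch = [f"batch {i}".encode() for i in range(100)]
    await producer.batch_send(stream, batch, on_publish_confirm=on_publish_confirm)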

Let us know, so we can go ahead.

thank you

@qweeze (Owner) commented Feb 27, 2023

Thanks for clarifying @Gsantomaggio
I'm totally fine with your plan, just a few comments to make sure there's no misunderstanding:

> the publish confirmation is asynchronous and is handled by a separate thread

Currently we also have asynchronous confirmation handling, done in a separate coroutine (Client._listener), so the difference is that you want users to be able to pass their own handler function, right?

> This improves performance because producing messages is not blocked waiting for confirmations

We currently have a sync flag; with sync=False, publish() doesn't wait for confirmation:

for msg in messages:
    await producer.publish(stream, msg, sync=False)

(But in terms of performance it still waits for each message to be sent; that's why batching is faster than consecutive calls to publish().)

1. With automatic buffering, do you want to support the publisher_name parameter? If yes, how will the buffering work in such a case?

2. Another thing I just want to point out: if we allow handling confirmations only in a callback, then the scenario where a user wants to publish a message and make sure it is delivered becomes much harder to implement. So maybe we can have both sync and async methods, like send and send_and_wait / send(wait=True)?

@Gsantomaggio (Collaborator)

> With automatic buffering, do you want to support the publisher_name parameter? If yes, how will the buffering work in such a case?

publisher_name (a.k.a. Reference) is needed only for deduplication; by default it should not be used.

Java and DotNet handle it in two different ways. For example, in DotNet there is a specific class for that, where you explicitly pass the id. The idea would be to have something similar here.

> send and send_and_wait / send(wait=True)?

OK, so:

1. send is asynchronous: the client buffers and sends the messages.
2. send_batch is synchronous.
3. send_wait is synchronous (it must wait for the confirmation).

1 and 2 use the user callback for confirmation.

1. Easy to use and fast for throughput.
2. Useful for latency; there is an interesting thread about that (Go client: batch vs send batch).
3. Covers use cases where the user doesn't need performance but wants more control over confirmation. But here we'd need to introduce some logic, for example:

  • What happens if the client does not receive the confirmation within X seconds? Otherwise the wait is not helpful and is unpredictable (one possible approach is sketched below).
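For instance, a send_wait with a timeout could be built around asyncio.wait_for; this is only a sketch and assumes a hypothetical call that returns a future resolved by the confirmation handler (all names are illustrative):

import asyncio

async def send_wait(producer, stream: str, message: bytes, timeout: float = 5.0) -> int:
    # hypothetical: returns an asyncio.Future resolved with the publishing id on confirmation
    confirmation = await producer.publish_with_future(stream, message)
    try:
        # wait for the server confirmation, but no longer than `timeout` seconds
        return await asyncio.wait_for(confirmation, timeout=timeout)
    except asyncio.TimeoutError:
        raise TimeoutError(f"no confirmation received within {timeout} seconds")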

@Gsantomaggio mentioned this pull request on Feb 28, 2023
@DanielePalaia force-pushed the implement_async_publish branch 6 times, most recently from e2c4845 to d979f15 on February 28, 2023 14:36
@DanielePalaia (Collaborator, Author)

Hi @Gsantomaggio and @qweeze, I updated the PR with the API we discussed! Feel free to have a look at it. For the moment I updated the tests to use send_wait and just added one test with send (asynchronous). I will add new ones afterwards.

@Gsantomaggio (Collaborator)

OK, great @DanielePalaia, this way we have the same send API as the other clients. You should also update the README with a basic example.

We will write the documentation covering all the other methods.

@Gsantomaggio marked this pull request as ready for review on March 1, 2023 08:18
@Gsantomaggio (Collaborator) left a comment

@DanielePalaia Can you please update the README?

@DanielePalaia force-pushed the implement_async_publish branch 5 times, most recently from c1a4127 to 6b9b2bc on March 1, 2023 08:44
@DanielePalaia (Collaborator, Author)

@Gsantomaggio Done!

@DanielePalaia force-pushed the implement_async_publish branch 2 times, most recently from 1e12e18 to d86b7fe on March 1, 2023 13:35
@DanielePalaia (Collaborator, Author)

Added a few more tests for send() as well.

@qweeze (Owner) left a comment

@DanielePalaia I added some comments, mostly minor issues (code style, etc.)

rstream/producer.py: several review threads (outdated, resolved).
async with self._buffered_messages_lock:
    self._buffered_messages[stream].append(message)

await asyncio.sleep(0)
@qweeze (Owner):

Hmm, do we really need it here? This statement only makes sense if there are no other await statements in a function and we want to force a context switch anyway.

@DanielePalaia (Collaborator, Author) commented Mar 2, 2023

@qweeze, I would like to discuss this point because I think it's the most important one. I agree with you, but in the tests I made it appears that if I don't force the context switch there, all the CPU is taken by the calling coroutine, which is continuously calling send(), and the background task never even gets activated. I'm not really an asyncio expert, but I think it is related to what is explained in this article: https://towardsdatascience.com/asyncio-is-not-parallelism-70bfed470489.
Also, from what I have seen, using asyncio.sleep(0) is not considered a bad practice with this library after all (https://superfastpython.com/asyncio-sleep/).
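As a standalone illustration of the behaviour I am seeing (not our actual code, just a minimal example):

import asyncio

messages_buffered = 0

async def background_flusher() -> None:
    # stands in for the periodic batch-sending task
    while True:
        print(f"flushing, {messages_buffered} messages buffered so far")
        await asyncio.sleep(0.1)

async def busy_sender() -> None:
    global messages_buffered
    for _ in range(100_000):
        messages_buffered += 1   # the buffer append itself is synchronous
        await asyncio.sleep(0)   # remove this line and the flusher never runs

async def main() -> None:
    task = asyncio.create_task(background_flusher())
    await busy_sender()
    task.cancel()

asyncio.run(main())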

@DanielePalaia (Collaborator, Author):

I was wondering whether starting the task before we start looping over send() might improve the situation without forcing us to use the sleep. I will do a few tests.

@DanielePalaia (Collaborator, Author):

No, I see the same behaviour even when starting the task before looping over send(); it seems like the sleep() is necessary.

@qweeze (Owner):

Yes, that's the expected behavior, because asyncio's concurrency model is different from threading (cooperative vs preemptive multitasking): the event loop can only switch to another coroutine when the current coroutine is suspended waiting for a future result. That's why any long-running sync code effectively blocks the event loop and is considered a bad thing when writing asyncio apps.

In general, as library authors we can't prevent users from blocking the event loop; for example, someone can just put time.sleep(5) in their code and it will block our background _timer task. But a programmer is expected to be aware that they should avoid blocking code.

In this specific case with the send method, I think the problem is that the method is async but it actually awaits I/O only on the first call, when self._publishers and self._clients are not yet initialized. On subsequent calls all the operations are sync, which makes the call effectively blocking. So I agree that we can use asyncio.sleep(0) here for consistency.

Alternatively, we could use asyncio.Queue instead of a list for buffering messages; that way we would have a "natural" context switch and also a way to limit the number of messages in the buffer (and _buffered_messages_lock would become unnecessary).
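Roughly something like this (just a sketch, not tied to the actual Producer internals):

import asyncio

class MessageBuffer:
    def __init__(self, max_size: int = 10_000):
        # bounded queue: put() suspends when the buffer is full,
        # which gives us backpressure for free
        self._queue: asyncio.Queue = asyncio.Queue(maxsize=max_size)

    async def put(self, message: bytes) -> None:
        # awaiting put() yields to the event loop,
        # so the background sender task gets a chance to run
        await self._queue.put(message)

    async def get_batch(self, max_batch: int = 300) -> list[bytes]:
        # wait for at least one message, then drain whatever else is available
        batch = [await self._queue.get()]
        while len(batch) < max_batch and not self._queue.empty():
            batch.append(self._queue.get_nowait())
        return batch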

@Gsantomaggio (Collaborator):

Thanks for the feedback.
OK, great, we will try with asyncio.Queue. If there are problems with asyncio.Queue we will merge the current version, provided there is no other feedback.

@DanielePalaia (Collaborator, Author):

I made a few tests trying to use asyncio.Queue, but it seems like performance deteriorates even after removing the old lock we used for the list. Also, send internally uses send_batch, which takes a normal list as input, so some sort of conversion is needed. I think for the moment we can keep the sleep(0) and open a new issue to investigate the usage of asyncio.Queue further.

rstream/producer.py: two more review threads (outdated, resolved).
@DanielePalaia force-pushed the implement_async_publish branch 2 times, most recently from 6c28751 to 9bc4bce on March 1, 2023 17:07
@DanielePalaia (Collaborator, Author)

Hi @qweeze, thanks a lot for your feedback. Yes, I agree that we should definitely go with PEP 8. I have already fixed a few straightforward suggestions and will mark them as resolved. I will come back to you tomorrow on the ones that may require some discussion.

@DanielePalaia force-pushed the implement_async_publish branch 2 times, most recently from 9284c39 to 0e1741b on March 2, 2023 13:33
@DanielePalaia (Collaborator, Author)

Hi @qweeze, I revised the PR based on the input you provided, except for the two open points that may require more discussion!

@DanielePalaia (Collaborator, Author)

Hi, I made the last fixes as suggested. Sending 1 million messages to a stream with the old send (non-blocking) was taking around a minute, while now with the buffering mechanism we are at around 38 seconds. From here I will open new issues regarding the on_publish_confirmation callback we discussed earlier, the usage of asyncio.Queue, and possibly improving the locking mechanisms.

@DanielePalaia merged commit 1896383 into qweeze:master on Mar 7, 2023
@DanielePalaia deleted the implement_async_publish branch on March 7, 2023 08:16