-
Notifications
You must be signed in to change notification settings - Fork 24
Add an asynchronous bytes provider #320
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
00a1103 to
cca9900
Compare
This adds a class that allows bytes to be asynchronously exchanged. This primarily serves the purpose of enabling event streaming. Event streams will create a provider under the hood that will be written to. That provider will then be passed as the trasnport request body, which will read it as an async bytes iterable.
cca9900 to
30a3382
Compare
| """ | ||
|
|
||
| def __init__( | ||
| self, intial_data: bytes | None = None, max_buffered_chunks: int = 16 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just curious why/how we arrived at 16 for buffering? Do we have something informing that or is it just an arbitrary default?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's arbitrary. Ultimately there's only one coroutine reading, and N coroutines writing. If N is greater than 1, the buffer will grow. That said, the reader is going to work very quickly since it just dumps the data into a BytesIO immediately. Writers will likely be slower to push data. And I don't expect there to be that many writers to a single stream anyhow.
Depending on how real world usage ends up, we might be able to improve performance by having separate Conditions for readers and writers that share a lock. That may result in the side with fewer participants cycling through their queue faster. But I'm not certain about that, and it seems premature. I'd rather wait until we have actual benchmarks on a live service before adding that extra complication.
Co-authored-by: Nate Prewitt <nate.prewitt@gmail.com>
nateprewitt
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
once tests pass.
This adds a class that allows bytes to be asynchronously exchanged. This primarily serves the purpose of enabling event streaming. Event streams will create a provider under the hood that will be written to. That provider will then be passed as the trasnport request body, which will read it as an async bytes iterable.
This will also fail until the protocol test suppression is merged.
By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.