Implement stream backpressure #323

Open
rohiievych opened this issue Jul 13, 2022 · 4 comments

Comments

@rohiievych
Member

The task

Metacom streaming consists of a data source on one end and a reader on the other. These endpoints are independent in terms of data flow speed, which means stream overflow is possible. To prevent the data source from producing chunks faster than the reader can consume them, we can implement backpressure control over the stream.

How to implement

  1. Extend the stream protocol with messages to pause and resume the remote endpoint.
  2. Add a streaming queue threshold at which the consumer can pause the producer to free the stream channel.
  3. When the stream queue can hold the corresponding amount of data again, notify the producer to resume its data flow.
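A minimal consumer-side sketch of this idea, assuming a transport object with a send() method; the class name, the message shapes ({ stream, pause } / { stream, resume }), and the default threshold are illustrative, not Metacom's actual protocol:

```js
class BackpressureQueue {
  constructor(transport, streamId, { highWaterMark = 32 } = {}) {
    this.transport = transport; // assumed: object with a send(message) method
    this.streamId = streamId;
    this.highWaterMark = highWaterMark;
    this.queue = [];
    this.paused = false;
  }

  // Called when a chunk arrives from the remote producer
  push(chunk) {
    this.queue.push(chunk);
    if (!this.paused && this.queue.length >= this.highWaterMark) {
      this.paused = true;
      this.transport.send({ stream: this.streamId, pause: true });
    }
  }

  // Called by the local reader when it consumes a chunk
  pull() {
    const chunk = this.queue.shift();
    if (this.paused && this.queue.length < this.highWaterMark) {
      this.paused = false;
      this.transport.send({ stream: this.streamId, resume: true });
    }
    return chunk;
  }
}
```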

Create a PR to the streams branch.

@pgnedoy
Contributor

pgnedoy commented Jul 27, 2022

When the pause event is sent, MetacomWritable stops sending data and waits for the resume event. During this time, should the data piped into MetacomWritable be stored in MetacomWritable's internal buffer, or should MetacomWritable.write return false and emit a drain event when resumed?

cc @tshemsedinov @rohiievych

@rohiievych
Member Author

> When the pause event is sent, MetacomWritable stops sending data and waits for the resume event. During this time, should the data piped into MetacomWritable be stored in MetacomWritable's internal buffer, or should MetacomWritable.write return false and emit a drain event when resumed?
>
> cc @tshemsedinov @rohiievych

The readable should pause the writable if its internal queue is being emptied too slowly, which would otherwise lead to stream overflow. The main goal of this mechanism is to prevent such overflow, so the data sent between pause and resume is collected in the readable's internal queue.
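A minimal producer-side sketch of that behavior, assuming pause/resume protocol messages like the ones above; the names here (RemoteGate, onMessage, wait) are hypothetical, not Metacom internals. The writable simply stops transmitting while paused, so it needs no large buffer of its own:

```js
const { EventEmitter, once } = require('node:events');

class RemoteGate extends EventEmitter {
  constructor() {
    super();
    this.paused = false;
  }

  // Called when a pause/resume protocol message arrives from the readable side
  onMessage(message) {
    if (message.pause) this.paused = true;
    if (message.resume) {
      this.paused = false;
      this.emit('resume');
    }
  }

  // The sending loop awaits this before transmitting each chunk
  async wait() {
    if (this.paused) await once(this, 'resume');
  }
}
```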

@pgnedoy
Contributor

pgnedoy commented Jul 27, 2022

> When the pause event is sent, MetacomWritable stops sending data and waits for the resume event. During this time, should the data piped into MetacomWritable be stored in MetacomWritable's internal buffer, or should MetacomWritable.write return false and emit a drain event when resumed?
> cc @tshemsedinov @rohiievych
>
> The readable should pause the writable if its internal queue is being emptied too slowly, which would otherwise lead to stream overflow. The main goal of this mechanism is to prevent such overflow, so the data sent between pause and resume is collected in the readable's internal queue.

That's clear. I am asking about something else. We have the following flow: readable.pipe(MetacomWritable) -> (Metacom) -> (MetacomReadable).
The readable pauses the writable, but at the same time data is still being piped into MetacomWritable from another source. How should we handle that? Should we make MetacomWritable unable to process incoming chunks, i.e., as I mentioned above, make MetacomWritable.write return false so that pipe waits for the drain event before sending further chunks?
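For reference, this is the native Node.js backpressure contract that pipe() already follows: write() returns false once the internal buffer passes highWaterMark, and the source waits for 'drain' before writing more. A short illustration (slowSink and writeAll are just example names):

```js
const { Writable } = require('node:stream');
const { once } = require('node:events');

// A writable that consumes slowly, so its internal buffer fills up quickly
const slowSink = new Writable({
  highWaterMark: 4,
  write(chunk, encoding, callback) {
    setTimeout(callback, 100); // simulate a slow consumer
  },
});

const writeAll = async (chunks) => {
  for (const chunk of chunks) {
    const ok = slowSink.write(chunk);
    if (!ok) await once(slowSink, 'drain'); // backpressure toward the source
  }
  slowSink.end();
};

writeAll(['a', 'b', 'c', 'd', 'e', 'f'].map((s) => Buffer.from(s)));
```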

@rohiievych
Member Author

@pgnedoy
Yes, we can make write() return false and wait for a drain event, which is what native Node.js streams do. But I suppose we can make write() async so that it waits for drain internally, and the writing source simply awaits the writable's availability.
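A rough sketch of that alternative (the class and method names are hypothetical, not the current Metacom API): write() resolves only when the writable can accept more data, so the upstream source just awaits it instead of checking a boolean and listening for 'drain' itself.

```js
const { EventEmitter, once } = require('node:events');

class AsyncWritableSketch extends EventEmitter {
  constructor({ highWaterMark = 16 } = {}) {
    super();
    this.highWaterMark = highWaterMark;
    this.buffer = [];
  }

  // Resolves only when the writable can accept the chunk
  async write(chunk) {
    if (this.buffer.length >= this.highWaterMark) {
      await once(this, 'drain'); // wait internally instead of returning false
    }
    this.buffer.push(chunk);
  }

  // Called by the sending loop after chunks are flushed to the transport
  flushed(count) {
    this.buffer.splice(0, count);
    if (this.buffer.length < this.highWaterMark) this.emit('drain');
  }
}
```

With this contract, the writing source becomes a plain loop that awaits each write call, and backpressure is handled entirely inside the writable.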

rohiievych assigned rohiievych and pgnedoy and unassigned rohiievych Jul 29, 2022