
[Feature Request] Pull through streaming API to prevent back-pressure #542

Closed
jasperblues opened this issue Mar 15, 2020 · 22 comments · Fixed by #831 or #882
Labels
5.0 (PR which should be merged to the 5.0 version), feature request

Comments

@jasperblues

I have written a Neo4j client library that uses this driver under the hood. (https://drivine.org/).

One of the use-cases is streaming without back-pressure. Back-pressure arises when the source produces data faster than the sink can process it, for example writing too quickly into a file-stream, say if we were producing a report that presents COVID-19 cases for a region.

The library includes an API as follows:

openCursor<T>(spec: QuerySpecification<T>): Promise<Cursor<T>>;

. . whereby you can open a Cursor<T> representing a set of results. This cursor is AsyncIterable (supports `for await (const item of cursor)`) and, more importantly, can turn itself into a Node.JS Readable stream.

When these streams are piped together, it is the sink stream that pulls from the source, at the rate the sink can handle. So there are no back-pressure problems, no excessive memory usage, and no going over a high-water mark. Good for many use-cases. We can easily create an Observable from a stream too.

How it is currently implemented:

The way this is implemented is to fetch data from Neo4j in batches using SKIP and LIMIT.
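A hedged sketch of that batching approach. `runQuery(cypher)` is a hypothetical stand-in that resolves to an array of records; a real implementation would run the Cypher against a session. Each batch is only fetched when the consumer iterates far enough to need it:

```javascript
// SKIP/LIMIT paging as an async generator: a pull-through cursor built
// on repeated queries. `runQuery` is a hypothetical query function.
async function* openCursor (runQuery, baseQuery, batchSize = 1000) {
  for (let skip = 0; ; skip += batchSize) {
    const batch = await runQuery(`${baseQuery} SKIP ${skip} LIMIT ${batchSize}`)
    yield* batch
    if (batch.length < batchSize) return // upstream depleted
  }
}
```

The trade-off discussed later in this thread applies: each batch re-evaluates the query, so this trades efficiency for flow control.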

I wanted to see if I could replace this using the driver's streaming capabilities; however, from what I understand, RxJS has no capability for handling back-pressure. It is a push-through library. Right?

For pull-through we should use the companion IxJS (https://github.com/ReactiveX/IxJS) instead. Pulling through shifts the onus of handling back-pressure onto the source, which would need to hold the entire result set and emit items when requested. This should not be a problem, as it needs to hold the results in memory for a period of time in any case.

So how about supporting pull-through streaming at the driver level? Either as Readable streams or with IxJS. (Or apparently there is a way to do it in RxJS: https://itnext.io/lossless-backpressure-in-rxjs-b6de30a1b6d4 <--- I'm still digesting this article).

@zhenlineo
Contributor

Hi @jasperblues,

We implemented back-pressure with a buffer inside this driver with RxJS.

The idea is described in our upgrade guide.
The implementation of this buffer is in stream-observers.

@jasperblues
Author

Hi @zhenlineo ,

From what I can see, back-pressure is handled correctly inside the driver using the buffering approach; however, for the consumer/user I don't see a facility for the memory-constrained use-case. We can handle back-pressure as follows:

  • Sampling (this is lossy, so not suitable for our case, but I mention as it is a strategy).
  • Buffering. <--- Requires memory.
  • Pausing, until ready for more. <-- This case appears not covered for user/consumer of the driver?

On pausing:

If the upstream value is already calculated, then this shifts the onus of buffering upstream.

If it is computed on demand, then we can defer computation. In our case it is pre-calculated, but held in memory by Neo4j after query execution anyway. So it is "free".

Arguably we should let the consumer digest it at the speed they're able to. But from what I can see, with RxJS, we have some future value, and it will be pushed upon the user (onNext <-- deal with it). And this is why there is a companion library such as IxJS (https://github.com/ReactiveX/IxJS), which uses a pull-through model.

- [x] Use-case: Query will return a large result set. I would like to subscribe as soon as the first result is available. If the results come hard and fast, I'm OK to buffer this.
  ^-- We're good.
- [ ] Use-case: This will return a large result set. I have limited memory. I would like to process iteratively, and request more results as I'm able. This way I can process large data on limited resources over time.
  ^-- Not yet covered, as far as I know?

Perhaps I have misunderstood though. 

@jasperblues
Author

jasperblues commented Mar 16, 2020

Apologies if I have misunderstood. If that is the case, it would be good to contribute to the documentation to describe:

  • How to consume results incrementally given limited resources. (onNext seems to push more results, ready or not.)
  • Consequently, how to be confident the driver is not using too much memory.

_above message typed while eating dinner 😊_

@zhenlineo
Contributor

Hi @jasperblues,

When the local buffer is full, the driver will pause auto-pulling new batches from the server. That's the design of this driver. Please let us know if the implementation differs from what we've designed.

We currently fix the local buffer size to be the same as the fetch size of each batch. This means that if a query returns 10000 records, by default the driver pulls 1000 records each time. If the user cannot consume them at that moment, then we will not pull more until the user has consumed 700 records. So we use both buffering and pausing to achieve back-pressure.

The user of this driver can set the fetch size (a.k.a. buffer size) to "control" the driver's back-pressure.
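The scheme described here can be sketched as a watermark buffer. This is an illustration only, not the driver's actual code; the 1000/700 figures above map to a high watermark of 1000 and a low watermark of 300 in this sketch:

```javascript
// Illustrative watermark buffer: stop pulling when the buffer reaches
// the high watermark; resume once consumption drains it to the low one.
class WatermarkBuffer {
  constructor (highWatermark = 1000, lowWatermark = 300) {
    this.high = highWatermark
    this.low = lowWatermark
    this.buffer = []
    this.paused = false
  }

  push (record) { // called as batches arrive from the server
    this.buffer.push(record)
    if (this.buffer.length >= this.high) this.paused = true // stop pulling
  }

  shift () { // called as the consumer takes records
    const record = this.buffer.shift()
    if (this.buffer.length <= this.low) this.paused = false // resume pulling
    return record
  }
}
```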

@jasperblues
Author

jasperblues commented Mar 16, 2020

Hello @zhenlineo This is clear now! Thank you very much for the explanation.

(If you like I can send a PR to clarify what you have explained to me in the README)

@jasperblues
Author

If the user cannot consume them at that moment, then we will not pull more until the user has consumed 700 records.

Actually, sorry, it is still not clear. What is the mechanism by which the user signifies that it is unable to consume more? The API provides onNext and the user must accept this. How can the user control the speed at which records are pulled through?

If this option to pause on the user side does not exist, the only remaining option is buffering.

@zhenlineo
Contributor

@jasperblues ,

You have a point. We also have a reactive API provided via RxResult; would that make more sense to you? We might not actually make proper use of pause with the current API in the javascript driver.

@jasperblues
Author

Hello @zhenlineo , sorry for the slow reply - juggling some research and billable work.

Using RxResult would work if it was augmented with something like the following:

declare interface RxResult {
  keys(): Observable<string[]>

  records(): Observable<Record>

  summary(): Observable<ResultSummary>

  pause(): void;

  resume(): void;
}

If the pause() and resume() methods were added it would be fully possible to do each of the following:

  • Implement AsyncIterable, allowing the use of `for await (const item of cursor)`, where results are pulled in batches.
  • Convert the RxResult to a node.js stream including memory efficient back-pressure.
  • Control the RxJS subscription rate so that there's no memory pressure.

Without some way to control the flow rate, the only option for the end-user is to buffer. This is not always feasible.

@zhenlineo
Contributor

Hi @jasperblues,

Thanks for the feedback on the js driver API.

When implementing reactive, we found there are two levels of API that one lib can provide:
Level 1: a raw API such as reactive-streams, where you can control the flow (pause/resume by controlling the pace of calling Subscription#request).
Level 2: a high-level API such as ReactiveX. With this high-level API, the buffering and the management of pause and restart are all done by the lib.

I feel what you are looking for is the Level 1 API, where you want to control the flow. Currently only our Java driver exposes this API:

public interface RxResult {
  Publisher<List<String>> keys();
  Publisher<Record> records();
  Publisher<ResultSummary> consume();
}

The full API doc.

For most end users of reactive, we do not expect them to directly consume this low-level API. When using the Java driver API, one should mostly choose either project-reactor or RxJava, as it is not easy to get pause and resume correct.

The js driver decided to expose a Level 2 API (RxJS) directly, with the promise that we will buffer, pause, and resume for the end user properly. We would very much like to fix any bug if we have failed to provide smart buffering and fetching in this driver. But we will not give back control of explicit pause and resume with this API.

We could of course look into exposing the Level 1 API directly. However, most users may have trouble directly consuming such a raw API to implement their own flow control and back-pressure.

Feel free to have a call directly if you still have questions :)

@jasperblues
Author

When using the Java driver API, one should mostly choose either project-reactor or RxJava, as it is not easy to get pause and resume correct.

@zhenlineo It can be very simple for the end-user in Node.js ! Here's how it is in Drivine:

Cursor implements AsyncIterable

While looping over the result set, new results will be pulled as needed, until the upstream is depleted. Consumption controls the flow.

const cursor = await repo.asyncRoutesBetween('Cavite Island', 'NYC');
for await (const item of cursor) {
  // process each item; further results are pulled only as needed
}

Cursor can pose as a Node.js Stream

Streams can be piped together without fear of back-pressure because the sink stream controls the flow. (Don't push grass into the cow. Milk the cow. It will eat when it gets hungry).

cursor.asStream().pipe(fileStream);
await StreamUtils.untilClosed(fileStream);

Neither of the above will result in memory pressure, and the only advice needed for the end user is that the second approach should be used if the results will be fed into another sink that has a limitation on flow-rate (like a file stream).

Most User Scenarios

The js driver decided to expose a Level 2 API (RxJS) directly, with the promise that we will buffer, pause, and resume for the end user properly.

I'm not sure that it is possible for a driver to know the correct speed to push onto the consumer, because the usage scenarios will vary.

The inspiration for the above features in Drivine was based on production usage. One example is a report that was resulting in an out-of-memory error. When writing those results into a file-stream, it would be necessary to ensure we do not go over the high-water mark.

The driver might pause or buffer internally; however, it is pushing the results onto the subscriber, which then has no alternative except to buffer. Out-of-memory errors will happen. We should be able to demonstrate that with a test case.

With Drivine, there is a fully-fledged implementation for AgensGraph. The Cursor<T> is both AsyncIterable and a Readable stream, thus it can also be an Rx Observable.

I have provided the same for Neo4j, but only by using SKIP and LIMIT, as I can't see an alternative way to do it using this driver. This means that the query needs to be evaluated again, which is not very efficient.

I agree that we can easily turn an AsyncIterable or Node.js Readable into an RxJS Observable, but going the other way is complicated, because RxJS pushes.

However, once again, I'm puzzled, and I don't think the driver can correctly buffer and pause, because it will not know the correct speed for end-user scenarios. Even though it might be efficient internally, if it pushes too quickly onto the consumer, the consumer is now saddled with the same problem. So it is not a solution.

So for me personally, I'd really like to see those Level 1 features. They don't have to be pretty, because I can provide a beautiful level of abstraction for business users, as outlined above.

@technige
Contributor

We have added this to our future roadmap for later discussion.

@CarsonF
Contributor

CarsonF commented Feb 24, 2021

I'm also in the same boat. I'm puzzled how the driver can know when I'm ready to consume more records.

Every database I've worked with has the pull-based approach where I can ask for more records when I'm ready. Push-based doesn't make any sense to me, it's almost the same as just grabbing the entire array and then iterating through each record from that array.

Looking at the Java docs, it looks like that driver has a pull-based approach as well, where StatementResult implements an Iterator with a next() function to fetch the next record, etc.

AsyncIterator is the lowest level interface to do this in javascript. AsyncIterable is sugar on top of this that makes it easier to consume.
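That layering can be shown in a few lines. `recordCursor` here is a made-up example, not driver API: the bare AsyncIterator is just `next()`, and adding `Symbol.asyncIterator` is the sugar that lets `for await` consume it:

```javascript
// A bare AsyncIterator exposes next(); the [Symbol.asyncIterator] method
// is what makes the object AsyncIterable so for-await-of can consume it.
function recordCursor (records) {
  let i = 0
  return {
    async next () { // pull exactly one record, on demand
      return i < records.length
        ? { value: records[i++], done: false }
        : { value: undefined, done: true }
    },
    [Symbol.asyncIterator] () { return this } // the "sugar" hook
  }
}
```

Both consumption styles work on the same object: call `next()` by hand for fine-grained control, or hand it to `for await` and let the loop drive the pulls.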

@benstevens48

This functionality is important to me as well. I'm trying to write the results of a large query to a file without consuming massive memory. Currently neo4j reads the data faster than the file can be written.

I agree that adding pause and resume to RxResult and Result seems like the simplest solution.

@bigmontz
Contributor

bigmontz commented Jul 1, 2021

We are planning to provide an AsyncIterator for consuming results in JavaScript. We don't have the final shape of the solution yet, but I will publish it here as soon as I have it.

@jasperblues
Author

jasperblues commented Jul 1, 2021

[image: screenshot from the https://drivine.org docs]

Hi @bigmontz !!

In Drivine I provide a Cursor abstraction, where cursor can be:

  • An AsyncIterator
  • A readable stream (pure NodeJS stream, which can easily be converted to RxJS or IxJS)

The reason for the latter is that while AsyncIterator is very handy, if you were to iterate and feed results into another stream you could overload that stream (back-pressure), resulting in messy fiddling with the high-water mark or pausing.

Whereas with a stream and pipes, the results will be pulled through at the correct speed. And you can of course have whatever number of transformations are required between the source and the sink.


tldr; I suggest providing both AsyncIterator and Stream abstractions (at the pure JS level) and instructions (or a helper) for how to compose an RxJS Observable from the raw materials.

I don't like to mention other graph DBs, but when using Drivine with AgensGraph the streaming works flawlessly, whereas with Neo4j the only way to control consumption speed from the client side is to fake it with SKIP and LIMIT.
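As a sketch of the suggested helper direction (pull to push is the easy way around): the `subscribe` object below merely imitates an Observable's shape to show the composition; it is not RxJS, just an illustration of how a push API can be layered on the pull-based raw material:

```javascript
// Compose a push-style (Observable-like) subscribe API from any
// AsyncIterable. The pull loop drives the pushes, so the source is
// never read faster than `next` handlers are invoked.
function toObservable (asyncIterable) {
  return {
    subscribe ({ next, complete, error }) {
      ;(async () => {
        try {
          for await (const item of asyncIterable) next(item)
          complete()
        } catch (e) { error(e) }
      })()
    }
  }
}
```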

@bigmontz
Contributor

Hi @jasperblues,

It could be a pretty good idea to have the readable stream as part of the API too. I will discuss internally to see at which point in the timeline it will be addressed.

Thanks.

@jasperblues
Author

Welcome @bigmontz , glad the feedback was useful. Feel free to take a look at Drivine code for inspiration.

My 2c: I think the plain NodeJS stream abstraction is the most flexible starting point. From this we can compose (perhaps with a helper method) RxJS (push) or IxJS (pull) streams.

@bigmontz bigmontz linked a pull request Jan 12, 2022 that will close this issue
@bigmontz
Contributor

Since we have to support older LTS versions of NodeJS and the browser, we decided to go with a more standard language interface using async iterators.

Usage example:

const session = driver.session()
try {
  const result = session.run('UNWIND RANGE(0, 10) AS x RETURN x')
  const xs = []
  for await (const record of result) {
    xs.push(record.get('x').toInt())
  }
  expect(xs).toEqual([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
} finally {
  await session.close()
}

This API correctly implements watermarks using the fetch size, and pauses (does not request more records) when iteration is suspended.

The next step will be to integrate it with the existing RxJS interface to fix its current errant behaviour.
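The "pauses when iteration is suspended" property falls out of generator semantics: a generator body only runs between next() calls. A minimal demonstration of the principle (not driver code; `producer` and `takeThree` are illustrative names):

```javascript
// An async generator body is suspended between next() calls, so a
// batch-fetching producer written this way cannot run ahead of its consumer.
let produced = 0

async function* producer () {
  for (let i = 0; i < 100; i++) {
    produced++ // a real driver would request the next batch here
    yield i
  }
}

async function takeThree (iterable) {
  const out = []
  for await (const v of iterable) {
    out.push(v)
    if (out.length === 3) break // stop consuming; producer stays suspended
  }
  return out
}
```

After `takeThree(producer())`, only three values have ever been produced, even though the generator could yield a hundred.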

@bigmontz bigmontz linked a pull request Feb 28, 2022 that will close this issue
@bigmontz
Contributor

bigmontz commented Mar 3, 2022

The changes related to back-pressure were released in 5.0.0-alpha01.

⚠️ This is an experimental release. It may completely change in the future. It does not contain any connectivity to Neo4j 5.0. In other words, it is equivalent to the 4.4.x JavaScript driver with a few changes detailed in the changelog:
https://github.com/neo4j/neo4j-javascript-driver/wiki/5.0-changelog#500-alpha01

@bigmontz bigmontz added the 5.0 PR which should be merged to the 5.0 version label Mar 3, 2022
@jasperblues
Author

W00t! Way to go @bigmontz !!

@bigmontz
Contributor

Neo4j Driver 5.0 has been released, so I will close this issue. Any feedback is welcome.

@jasperblues
Author

Great work again @bigmontz 🙏🙏🙏🙏🙏 This was a complex feature!!!!
