
Stream speed management using async/await (cursor) #1

Closed
Blitzsturm opened this issue Dec 26, 2019 · 18 comments

@Blitzsturm

Blitzsturm commented Dec 26, 2019

The current stream mechanics make it easy to process rows as they are read from the query rather than returning them in a single block; however, with ultra-large result sets it's possible that rows will be read faster than they can be processed. See this Reddit thread for more comprehensive specifics. An example use case would be executing a query that yields 1 million+ rows, then sending them in blocks of 200 to an external web service that takes several seconds to process each request. In this case it could be possible to send hundreds or thousands of concurrent outbound requests, overwhelming heap space and the external service.

There are two potential fixes:

  1. Allow iterator functions to return a promise which is then awaited, to ensure more data is not sent to the iterator function than it can handle. The following is a hypothetical use case that buffers up to 200 rows at a time then sends them to an external service with limited speed:
	var rowBuffer = [];
	await sql`select created_at, name from events`.stream(async row => {
		if (rowBuffer.push(row) >= 200) await transformAndSend(rowBuffer.splice(0,200));
	});
	if (rowBuffer.length > 0) await transformAndSend(rowBuffer.splice(0,200));

From a syntax standpoint, all that would change from existing functionality is the addition of the optional async. On the backend, the return value of the iterator callback would need to be checked and, if it is a promise, awaited. This is an overly simplified example:

	while (moreRows) {
		var row = await getRow();
		var P = StreamIterator(row);
		if (P instanceof Promise) await P;
	}

I'm not sure how the rows are received, whether each is actively read or they arrive as events which can be paused. If they are read on demand this would be pretty easy: you'd just await the callback and then get the next row. If they arrive as events you'd need to buffer them before passing them to the callback and pause the incoming results if that buffer gets too large (a rough sketch of that approach is at the end of this comment).

*Ideally, the promise resolving to true or to a specified enum value should stop the query from completing. So if, for example, the user executes a query that botches a join and results in 10 billion rows of nonsense, or the destination service won't accept the results in the callback, it would be nice to have a means to gracefully but forcefully stop it from reading more rows.

  2. Alternatively (or additionally), making use of Symbol.asyncIterator would allow a standard "for await" loop as a means to asynchronously process data from an async source. This would be (very) easy to add after altering the existing stream functionality to await returned promises, and could be officially exposed on the query result as an .iterate() method that returns an object implementing Symbol.asyncIterator to manage this flow for the end user. That would look something like the following in practice:
	var rowBuffer = [];
	for await (let row of sql`select created_at, name from events`.iterate()) {
		if (rowBuffer.push(row) >= 200) await transformAndSend(rowBuffer.splice(0,200));
	}
	if (rowBuffer.length > 0) await transformAndSend(rowBuffer.splice(0,200));

I'm at your disposal if you need assistance implementing this feature or testing it once complete. If you can make efficiently iterating through large queries as easy as a "for await" loop, you'll have introduced a quality-of-life improvement well ahead of the curve in terms of future proofing.
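To make the first option concrete, here is a rough sketch of how an event-based source could be throttled and cancelled. The connection object with pause()/resume()/cancelQuery(), the "row" event and the STOP sentinel are all hypothetical stand-ins, not part of this library:

	// Hypothetical internals: an event-based connection that can be paused and resumed.
	const STOP = Symbol("stop"); // sentinel the callback could resolve to in order to end the query early
	const pending = [];          // rows received while a previous callback is still being awaited
	let processing = false;

	connection.on("row", async row => {
		pending.push(row);
		if (processing) return;    // a drain loop is already running; the row just queues up
		processing = true;
		connection.pause();        // backpressure: stop new rows while the buffer drains
		while (pending.length > 0) {
			const result = StreamIterator(pending.shift());
			if (result instanceof Promise && (await result) === STOP) {
				return connection.cancelQuery(); // hypothetical graceful stop
			}
		}
		processing = false;
		connection.resume();       // ready for more rows
	});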

@porsager
Owner

This is great! It should be fairly straightforward to get in, so I'll look at it right after I'm done with benchmarks ;)

@Blitzsturm
Author

You could also look at using streams in object mode, which afford some cool tricks like piping through a stream Transform, and I believe Node's stream objects naturally allow "for await" loops. A package for mssql can do this, however it doesn't respect the high water marks set by users, so even if you pause the incoming stream the stream buffer just fills up. I've had it consume gigabytes of RAM during testing with hundreds of thousands of rows when I had specified a maximum of 200.
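For illustration, a minimal object-mode Readable with a small highWaterMark would look roughly like this; fetchNextRow() is just a placeholder for whatever actually pulls a row from the database, not part of this library:

const { Readable } = require("stream");

// Sketch only: fetchNextRow() resolves with the next row, or undefined when the query is done.
function rowStream(fetchNextRow)
{
	return new Readable({
		objectMode: true,
		highWaterMark: 200, // buffer at most ~200 rows before applying backpressure
		async read()
		{
			try
			{
				const row = await fetchNextRow();
				this.push(row === undefined ? null : row); // push(null) ends the stream
			}
			catch (err)
			{
				this.destroy(err);
			}
		}
	});
}

// Node object streams are async iterable, so consumers can simply do:
// for await (const row of rowStream(fetchNextRow)) { /* handle row */ }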

However, if this looks like a fast/easy fix then I'd deem it a much higher priority.

@porsager
Owner

porsager commented Dec 30, 2019

Hi @Blitzsturm ..

I'm not sure I see the benefit of using streams in object mode and for await, but that might come down to a taste/coding preference thing.

I've added support for this by adding a new .cursor() method. I think that is more in line with postgres terminology too, and it won't change or complicate the .stream() surface API.

Would you like to check it out and let me know what you think? You can do npm install porsager/postgres#cursor.

Docs are here: https://github.com/porsager/postgres/tree/cursor#cursor-sql-cursorrows--1-fn---promise
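For anyone following along, usage on that branch looks roughly like this (going by the linked docs; transformAndSend is just a placeholder for the caller's own async work, and the promise returned from the callback is awaited before the next row is fetched):

await sql`select created_at, name from events`.cursor(async row => {
  await transformAndSend(row) // the next row is only fetched once this resolves
})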

@porsager porsager added the enhancement New feature or request label Dec 30, 2019
@porsager porsager self-assigned this Dec 30, 2019
@porsager porsager changed the title Stream speed management using async/await Stream speed management using async/await (cursor) Dec 30, 2019
@tracker1

First, really nice interfaces on the library all around... I came mostly to say the same regarding async streams... If you do open up an object stream interface, that would allow for await...of[1] syntax with current Node releases, which I find very intuitive to use. Maybe an asReadStream() method?

I also feel that awaiting any promise returned from the per-row function would be very useful, as mentioned in the issue; it's imho the only real shortcoming in this library, and something I've come across syncing data to DynamoDB and similar (grouping records), though slightly more complex in terms of sizing.

[1] https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Statements/for-await...of

@Blitzsturm
Author

Blitzsturm commented Dec 30, 2019

Brilliant, it works as advertised, though I do have one last suggestion. The test below demonstrates that the cursor flow does throttle the rows and prevents asynchronous row handling from stepping on itself; however, there is a pretty significant performance hit between the two approaches. That would make sense if it's reading one row at a time. To counter this you may want to look at buffering some rows: perhaps the initial request retrieves 1000 rows, then on each await, if that buffer is below 900, fetch 100 more until the query is out of rows, then let the async handler drain the buffer until it is empty and resolve the overall query-cursor promise. That would likely give it a significant performance jump.
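Something like the following is the kind of buffering I mean; it is purely illustrative, and fetchRows(n) is a made-up internal that fetches the next n rows from the open cursor:

// Illustrative prefetch loop: keep roughly 900-1000 rows buffered while the
// user's async callback drains them one at a time.
async function drainWithPrefetch(fetchRows, handleRow)
{
	var buffer = await fetchRows(1000);   // initial block
	var exhausted = buffer.length < 1000;

	while (buffer.length > 0)
	{
		await handleRow(buffer.shift());
		if (!exhausted && buffer.length < 900)
		{
			// A real implementation would fetch the next block in the background
			// rather than awaiting it inline between rows.
			var more = await fetchRows(100);
			exhausted = more.length < 100;
			buffer = buffer.concat(more);
		}
	}
}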

Ideally the semantics of how postgres handles the connection and cursor mechanics should be invisible to the end user, so doing something like this would make it look and feel the same but much faster; and while it does indeed work for its intended purpose, poor streaming performance could dissuade power users. pg-cursor allows more control over cursor behavior but can be somewhat cumbersome in code. With that written, I could easily write a wrapper for the "for await" behavior if I really want it, or maybe add it in later if there is demand.

As previously mentioned, implementing a well-tuned object stream would manage this buffering process for you and allow users to asynchronously iterate directly from it with a "for await" loop, even allowing power users to adjust buffering behavior by manually adjusting watermarks.

Overall, it's looking great. Some fine tuning for performance and I'd call it ready for production use.

Edit: also, the documentation example is missing the "async": ".cursor((row, cancel)" should be ".cursor(async (row, cancel)".

"use strict";
const postgres = require("postgres");
Main().catch(console.error);

async function Main()
{
	var streamResults = await StreamTest();

	console.log("waiting 10 seconds between connections in tests...");
	await new Promise(r => setTimeout(r, 10000));

	var cursorResults = await CursorTest();

	console.log(`Stream Order (${streamResults.order.length}): ${streamResults.order.join(",")}`);
	console.log(`Cursor Order (${cursorResults.order.length}): ${cursorResults.order.join(",")}`);
	console.log(`Stream Speed: ${streamResults.speed} ms`);
	console.log(`Cursor Speed: ${cursorResults.speed} ms`);
}

async function StreamTest()
{
	const sql = postgres(process.env.DATABASE_URL);

	console.log("================================\r\nStream Ordering");
	var streamOrder = [];
	await sql`SELECT * FROM generate_series(1,20)`.stream(async (row) =>
	{
		await new Promise(r => setTimeout(r, Math.floor(Math.random()*1000)));
		streamOrder.push(row.generate_series);
		console.log(`Stream Row: ${row.generate_series}`);
	});

	await new Promise(r => setTimeout(r, 1000)); // (Ensure all have completed)

	console.log("================================\r\nStream Speed");
	var streamStart = new Date().getTime();
	await sql`SELECT * FROM generate_series(1,1000)`.stream(async (row) =>
	{
		console.log(`Stream Row: ${row.generate_series}`);
	});
	var streamEnd = new Date().getTime();

	await sql.end();

	return {order: streamOrder, speed: streamEnd - streamStart};
}


async function CursorTest()
{
	const sql = postgres(process.env.DATABASE_URL);
	
	console.log("================================\r\nCursor Ordering");
	var cursorOrder = [];
	await sql`SELECT * FROM generate_series(1,20)`.cursor(async (row, cancel) =>
	{
		await new Promise(r => setTimeout(r, Math.floor(Math.random()*1000)));
		cursorOrder.push(row.generate_series);
		console.log(`Cursor Row: ${row.generate_series}`);
	});

	await new Promise(r => setTimeout(r, 1000)); // (Ensure all have completed)

	console.log("================================\r\nCursor Speed");
	var cursorStart = new Date().getTime();
	await sql`SELECT * FROM generate_series(1,1000)`.cursor(async (row, cancel) =>
	{
		console.log(`Cursor Row: ${row.generate_series}`);
	});
	var cursorEnd = new Date().getTime();

	await sql.end();

	return {order: cursorOrder, speed: cursorEnd - cursorStart};
}

@porsager
Owner

> First, really nice interfaces on the library all around... I came mostly to say the same regarding async streams... If you do open up an object stream interface, that would allow for await...of[1] syntax with current Node releases, which I find very intuitive to use. Maybe an asReadStream() method?
>
> I also feel that awaiting any promise returned from the per-row function would be very useful, as mentioned in the issue; it's imho the only real shortcoming in this library, and something I've come across syncing data to DynamoDB and similar (grouping records), though slightly more complex in terms of sizing.
>
> [1] https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Statements/for-await...of

Thanks a lot @tracker1 .. Did you check out the branch with the first stab at an implementation of cursors? https://github.com/porsager/postgres/tree/cursor#cursor-sql-cursorrows--1-fn---promise

I suppose sql` `.cursor() without a callback function could return an async iterable..

So usage would look like this:

const cursor = sql`select * from generate_series(1, 1000)`.cursor()

for await (const row of cursor) {
  // do something with row
}

Is it something like that you imagined?

@porsager
Owner

porsager commented Dec 30, 2019

@Blitzsturm sorry afk until later, but did you see the option of returning more than one row per iteration?

What if there was also an option to control the number of rows fetched next by returning a special token with a rows value from the callback? Eg.

sql` `.cursor(() => sql.ROWS(75))

Also see my iterable suggestion above.

@Blitzsturm
Author

Blitzsturm commented Dec 30, 2019

@porsager I apologize, I somehow completely missed that. Using it makes things MUCH faster, though slightly less clean in use. Ideally I'd like it to still iterate only one row at a time while pulling blocks of a specified size under the hood, something like the following. However, its current state is still cleaner than pg-cursor. So, well done.

await sql`SELECT * FROM generate_series(1,1000)`.cursor(100, async (rows, cancel) =>
{
	for (let row of rows)
	{
		console.log(`Cursor Row: ${row.generate_series}`);
	}
});

Alternatively, I suppose if you need to send blocks of x rows to an external service this is indeed more ideal. My original use case would be better off, as it wouldn't have to group rows itself.

await sql`SELECT * FROM generate_series(1,1000)`.cursor(100, async (rows, cancel) =>
{
	await transformAndSend(rows);
});

Perhaps this could be handled as you mentioned to @tracker1; I can't imagine a cleaner, faster way to do it syntax-wise.

for await (let row of sql`SELECT * FROM generate_series(1,1000)`.cursor(100))
{
	console.log(`Cursor Row: ${row.generate_series}`);
}

@porsager
Owner

Ah good to hear :)

I'm not sure the expanded API surface of opening up singular iteration while fetching large blocks behind the scenes is worth it compared to simply handling it in userland. It's not that big of an addition for the user, and it is clearer to see what's actually going on. There are also concerns like cancellation that might be non-obvious (eg. if you cancel halfway through a group of rows iterated behind the scenes per row, it's not obvious that the last half was still fetched from the db).

I'm still thinking the async iterable could be ok, but I don't see what the benefit is over the callback.

await sql`
  SELECT * FROM generate_series(1,1000)
`.cursor(async row => {
  // await do something with row
})

vs

for await (let row of sql`
  SELECT * FROM generate_series(1,1000)
`.cursor()) {
  // await do something with row
}

@Blitzsturm
Author

Blitzsturm commented Dec 30, 2019

From a use perspective, not a ton of difference, other than it's "the new hotness" in JavaScript and considered the standard means of asynchronous iteration. Worst case scenario, 5 years from now someone looks at it and says "why'd they do it that way? I mean, it works, but it's non-standard", in the same way many legacy libraries don't make use of async/await.

It's really a matter of individual preference, but if it's easy to build in I don't see a reason not to sprinkle in some syntactic sugar. If it were somehow monstrously difficult I think it could be skipped.

Ultimately, in its current state with cursor support, it's an outstanding package that I prefer over pg+pg-cursor; it should satisfy any major need I can foresee, and I can't complain about anything outside of minor nitpicks. But if you find the time to throw in an extra feature, I'd be slightly happier.

Take that for what it's worth; but overall I think you've done a great job. I'll likely be using it and will let you know if I encounter any bugs or performance issues in the future.

My biggest complaints about some packages relate to excessive, unnecessary and bloated dependencies consuming memory and degrading performance, as well as convoluted interfaces that needlessly require a lot more code to accomplish an otherwise simple task. I see none of that here.

@porsager
Owner

Thanks a lot for the kind words, and for the help thinking this through. It has been a really nice project to do so far.

I'm leaning towards adding support for async iterables too now, but I realize it isn't identical to the current implementation, in that the query might not run immediately; it has to wait until iteration starts. That's currently implicit with the callback implementation. A hack would be to run the first query and keep the rows in memory until iteration starts, but I don't want to do that since it's basically wrong :P

It also has to return an end method on the iterator to support ending early, but that should be fine.

I think, since adding async iterator support later won't be a breaking change, I might just split this up into two releases.

> My biggest complaints about some packages relate to excessive, unnecessary and bloated dependencies consuming memory and degrading performance, as well as convoluted interfaces that needlessly require a lot more code to accomplish an otherwise simple task. I see none of that here.

Yeah, that was also one of the reasons I got started with this in the first place, so it's really nice to hear you see it that way! I'm almost done with the benchmarks too, so it's gonna be interesting to see everything side by side.. Currently I've got pg, pg-native, pg-promise, pg-promise-native and slonik. Do you know of others that would be interesting to compare?

I know pg-promise and slonik are using pg under the hood currently, but it was interesting to find that pg-promise is faster than raw pg and slonik is like 50x - 100x slower than mine 🤨

@Blitzsturm
Author

Blitzsturm commented Dec 30, 2019

The only other popular database interface I can think of is knex, which just sits on top of pg but does have some nice quality-of-life features. On that topic, I'd love to see the inclusion of "insert" and "upsert" methods that handle common CRUD operations with ease.

I'd imagine something like this:

var rows =
[
    {name: "John Doe", email: "john.doe@gmail.com", age: 46},
    {name: "Jane Doe", email: "jane.doe@gmail.com", age: 44},
    {name: "Herp Derp", email: "herp.derp@outlook.com", age: 23}
];

var res = await sql.insert("users", ["name", "email", "age"], rows);
/*
    INSERT INTO users
        (name, email, age)
    VALUES
        ('John Doe', 'john.doe@gmail.com', 46),
        ('Jane Doe', 'jane.doe@gmail.com', 44),
        ('Herp Derp', 'herp.derp@outlook.com', 23)
    ON CONFLICT DO NOTHING;
*/

var res = await sql.upsert("users", ["name", "email", "age"], ["email"], rows);
/*
    INSERT INTO users
        (name, email, age)
    VALUES
        ('John Doe', 'john.doe@gmail.com', 46),
        ('Jane Doe', 'jane.doe@gmail.com', 44),
        ('Herp Derp', 'herp.derp@outlook.com', 23)
    ON CONFLICT (email) DO UPDATE SET
        name = EXCLUDED.name,
        email = EXCLUDED.email,
        age = EXCLUDED.age
*/

In addition, it would be nice to make column names optional, retrieving them from the data provided when omitted, either by just checking the first row or with something more akin to:

function UniqueKeys(rows)
{
	var K = {};
	rows.forEach(r=>Object.keys(r).forEach(c=>K[c]=null));
	return Object.keys(K);
	// this could probably be done better with a set or indexOf()
}

(Where rows missing a given column would have it set to null.)
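For completeness, the Set-based version of that helper would look like this:

function UniqueKeys(rows)
{
	var keys = new Set();
	rows.forEach(row => Object.keys(row).forEach(key => keys.add(key)));
	return Array.from(keys);
}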

This would allow for extremely rapid/easy data updates. Imagine a use case where you have a table of user addresses and you want to update them to store latitude and longitude using Google Maps' geocoding API. The code to do so would look something like this:

await sql`
    SELECT
        id,
        address,
        lat,
        lng
    FROM
        users
    WHERE
        address IS NOT NULL
        AND lat IS NULL
        AND lng IS NULL
`.cursor(100, async (rows, cancel) =>
{
    for (let row of rows)
    {
        var geo = await googleMapsGeoCode(row.address);
        row.lat = geo.lat;
        row.lng = geo.lng;
    }

    var upsRes = await sql.upsert("users", ["id"], rows);
    if (!upsRes.success) throw new Error(upsRes.errorMessage); 
    // do thrown exceptions automatically call cancel?
});

Inserts/updates are especially common yet cumbersome, so something like this baked in would be super nice. I was planning on doing something similar in my own code; it makes sense to be able to put data back in the same way you get it out.

@porsager
Owner

porsager commented Dec 31, 2019

Did you check out the helpers section of the docs? 😊 https://github.com/porsager/postgres

Your insert example would look like this ;)

await sql`insert into users ${ sql(rows, 'name', 'email', 'age') }`

It's a bit harder for the update case. I wouldn't go all the way and make it as short as yours, as I feel that is going too far. Currently I don't feel I want to include something that "builds" sql with special keywords the user doesn't know about (eg. in this case I think it's important users understand the implications of on conflict).

Maybe there would be a way to do something like this (note: doesn't work currently):

await sql`
  insert into users ${ sql(rows, 'name', 'email', 'age') }
  on conflict(email) do update set
  ${ sql({ excluded: ['name', 'email', 'age'] }) }
`

@Blitzsturm
Author

Blitzsturm commented Dec 31, 2019

The helpers go a long way but still require writing out some extra code manually to get what you want. What I was building was something that works a bit like knex but allows for some more niche behavior. Virtually every database flavor has a different way to "upsert", which I often have need of, so I was working on a means to make it as easy as providing JSON rows to a handler.

I suppose if I want to be clever about it I could just make a function that effectively does this

await sql`
  insert into users ${sql.apply(sql, [rows].concat(Object.keys(rows[0])))}
  on conflict(email) do update set
  ${Object.keys(rows[0]).map(injectionProtection).map(o=>`${o} = EXCLUDED.${o}`).join(', ') }
`; // Not so sure this last bit would work...

Maybe have some form of "on conflict" helper based on field names?
query text ${sql.conflictIgnore}
query text ${sql.conflictUpdate}

Is there a way to quickly insert programmatically generated components such as dynamic field name selections into a query or would that just be using "unsafe"?

@porsager
Owner

porsager commented Jan 1, 2020

I started my reply and realized we're off topic, so I thought it was better in its own issue, which could double as an improvement for the documentation. Let's continue here: #4 😉

@porsager
Owner

porsager commented Feb 3, 2020

I've added cursor support with the callback style in 4156c3f.

I'll work on adding for await of support later. The reason I'm not adding it now is that it requires some changes to the inner workings, because it's lazy by default, and I'd like to do it properly, so some more thinking around it is needed.

@porsager porsager closed this as completed Feb 3, 2020
@Blitzsturm
Author

Thanks! This change makes reading (very) large data sets much more scalable. An object stream would automatically handle buffering data to optimize speed while still working with "for await" syntax. Also, I can provide an example of how to implement it in something of a wrapper if that would help.

For now I'll see if I can use it in some personal/work projects and raise a ticket if anything goes wrong.
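For reference, the wrapper I have in mind would be along these lines. It is only a sketch built on the callback-style cursor that just shipped, assuming (as in the tests above) that the promise returned from the cursor callback is awaited before the next row is fetched; the queue/handshake is my own plumbing, and early termination (break) is left out for brevity:

// Bridge the callback-style cursor to an async iterator: each callback parks its row
// and waits until the consumer has pulled it, so backpressure flows from the
// "for await" loop back into the cursor.
function iterateCursor(query)
{
	var queue = [];
	var notify = null;
	var finished = false;
	var error = null;

	query.cursor(row => new Promise(resolve =>
	{
		queue.push({row, resolve}); // resolve() lets the cursor fetch the next row
		if (notify) { notify(); notify = null; }
	})).then(
		() => { finished = true; },
		err => { finished = true; error = err; }
	).then(() => { if (notify) { notify(); notify = null; } });

	return {
		async *[Symbol.asyncIterator]()
		{
			for (;;)
			{
				while (queue.length > 0)
				{
					var {row, resolve} = queue.shift();
					yield row; // suspends until the consumer asks for the next row
					resolve();
				}
				if (finished)
				{
					if (error) throw error;
					return;
				}
				await new Promise(r => notify = r);
			}
		}
	};
}

// for await (let row of iterateCursor(sql`SELECT * FROM generate_series(1,1000)`))
// {
// 	console.log(`Cursor Row: ${row.generate_series}`);
// }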

@porsager
Owner

porsager commented Feb 4, 2020

I know how an object stream would work, and there's currently no upside to using that over simply implementing and returning { [Symbol.asyncIterator]: ... }.

The reason I'm not doing it right away is that I would need to not send the query immediately, because I would have to wait for the first call to next. I don't want to hack it in now because that would mean fetching the first chunk unnecessarily before calling next, and worse, it would reserve a connection to the db unnecessarily. Making it work requires a change to the current query flow which I'd like to do properly :)
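To illustrate the shape I'm after: an async generator gives that laziness for free, since nothing runs until the first call to next(), and breaking out of a for await lands in the generator's finally block, which is where the connection/portal could be released. Roughly like this, where openPortal, fetch and close are hypothetical stand-ins rather than the actual internals:

// Sketch of a lazy cursor: the query is only sent on the first next() call,
// and `break` in the consuming loop ends up in the finally block.
async function* lazyCursor(openPortal) {
  const portal = await openPortal() // nothing is sent until iteration actually starts
  try {
    let rows
    while ((rows = await portal.fetch(100)).length > 0)
      yield* rows
  } finally {
    await portal.close() // runs on normal completion, break or throw
  }
}

// for await (const row of lazyCursor(openPortal)) {
//   if (shouldStop(row)) break // ends early; portal.close() still runs
// }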
