Skip to content

Commit

Permalink
[CONJS-168] correct stream backpressure
Browse files Browse the repository at this point in the history
add a stream.close() method to permit closing the stream, so the connection does not hang when using pipeline.
  • Loading branch information
rusher committed Jun 11, 2021
1 parent a9631a3 commit 594592a
Show file tree
Hide file tree
Showing 5 changed files with 94 additions and 10 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Change Log

## [3.0.0-beta](https://github.com/mariadb-corporation/mariadb-connector-nodejs/tree/3.0.0-beta) (08 Jun 2021)
## [3.0.0-beta](https://github.com/mariadb-corporation/mariadb-connector-nodejs/tree/3.0.0-beta) (11 Jun 2021)
[Full Changelog](https://github.com/mariadb-corporation/mariadb-connector-nodejs/compare/2.5.4...3.0.0-beta)

Migrating from 2.x or mysql/mysql2 driver have some breaking changes, see [dedicated part](https://github.com/mariadb-corporation/mariadb-connector-nodejs/blob/maintenance/3.x/documentation/promise-api.md#migrating-from-2x-or-mysqlmysql2-to-3x) documentation.
Expand All @@ -9,6 +9,7 @@ Migrating from 2.x or mysql/mysql2 driver have some breaking changes, see [dedic
* [CONJS-165] Adding initial message error value on Error object
* [CONJS-166] Restrict authentication plugin list
* [CONJS-167] Permit custom logger configuration
* [CONJS-168] correct stream backpressure

New Connection options

Expand Down
11 changes: 9 additions & 2 deletions documentation/promise-api.md
Original file line number Diff line number Diff line change
Expand Up @@ -788,11 +788,18 @@ connection
> * columns : Emits when column metadata from the result-set are received (the parameter is an array of [Metadata](#metadata-field) fields).
> * data : Emits each time a row is received (parameter is a row).
> * end : Emits when the query ends (no parameter).
> a method: close() : permits closing the stream (since 3.0)
>
When using the `query()` method, documented above, the Connector returns the entire result-set with all its data in a single call. While this is fine for queries that return small result-sets, it can grow unmanageable in cases of huge result-sets. Instead of retrieving all of the data into memory, you can use the `queryStream()` method, which uses an event-driven architecture to process rows one by one, which allows you to avoid putting too much strain on memory.

Query times and result handlers take the same amount of time, but you may want to consider updating the [`net_read_timeout`](https://mariadb.com/kb/en/library/server-system-variables/#net_read_timeout) server system variable. The query must be totally received before this timeout, which defaults to 30 seconds.

!! Warning !!
QueryStream handles backpressure, meaning that if data handling takes some amount of time, the socket is paused to avoid having the node socket buffer grow indefinitely.
When using pipeline, if data handling throws an error, the user must explicitly close the queryStream to avoid leaving the connection hanging.




There are 2 different methods to implement streaming:

Expand Down Expand Up @@ -836,7 +843,7 @@ const transformStream = new stream.Transform({

const queryStream = connection.queryStream("SELECT * FROM mysql.user");

stream.pipeline(queryStream, transformStream, someWriterStream);
stream.pipeline(queryStream, transformStream, someWriterStream, (err) => { queryStream.close(); });

```
## `connection.prepare(sql) → Promise`
Expand Down
29 changes: 27 additions & 2 deletions lib/cmd/stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ class Stream extends Query {
this.socket = socket;
this.inStream = new Readable({
objectMode: true,
read: () => {}
read: () => {
this.socket.resume();
}
});

this.on('fields', function (meta) {
Expand All @@ -31,14 +33,37 @@ class Stream extends Query {
this.inStream.emit('error', err);
});

this.on('close', function (err) {
this.inStream.emit('error', err);
});

this.on('destroy', function () {
console.log('destroy');
});
this.on('pause', function () {
console.log('pause');
});

this.on('end', function (err) {
if (err) this.inStream.emit('error', err);
this.socket.resume();
this.inStream.push(null);
});

this.inStream.close = function () {
this.handleNewRows = () => {};
this.socket.resume();
}.bind(this);
}

handleNewRows(row) {
this.inStream.push(row);
try {
if (!this.inStream.push(row)) {
this.socket.pause();
}
} catch (err) {
this.socket.resume();
}
}
}

Expand Down
2 changes: 1 addition & 1 deletion test/integration/test-pool-callback.js
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ describe('Pool callback', () => {
});
pool.getConnection((err, conn) => {
conn.query('SELECT SLEEP(1)', () => {
assert(Date.now() - initTime >= 1999, 'expected > 2s, but was ' + (Date.now() - initTime));
assert(Date.now() - initTime >= 1985, 'expected > 2s, but was ' + (Date.now() - initTime));
conn.release();
pool.end((err) => {
done();
Expand Down
59 changes: 55 additions & 4 deletions test/integration/test-pool.js
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ describe('Pool', () => {
.query('SELECT SLEEP(1)')
.then(() => {
assert(
Date.now() - initTime >= 1999,
Date.now() - initTime >= 1985,
'expected > 2s, but was ' + (Date.now() - initTime)
);
conn.release();
Expand Down Expand Up @@ -1096,8 +1096,6 @@ describe('Pool', () => {
pool
.getConnection()
.then((conn) => {
const someWriterStream = fs.createWriteStream(fileName);

let received = 0;
const transformStream = new stream.Transform({
objectMode: true,
Expand All @@ -1115,8 +1113,10 @@ describe('Pool', () => {
const queryStream = conn.queryStream(
"SELECT seq ,REPEAT('a', 100) as val FROM seq_1_to_10000"
);
const someWriterStream = fs.createWriteStream(fileName);

stream.pipeline(queryStream, transformStream, someWriterStream, () => {
stream.pipeline(queryStream, transformStream, someWriterStream, (err) => {
if (err) queryStream.close();
assert.isTrue(received >= 0 && received < 10000, 'received ' + received + ' results');
conn.query('SELECT 1').then((res) => {
conn.end();
Expand All @@ -1130,6 +1130,57 @@ describe('Pool', () => {
.catch(done);
});

// Regression test for CONJS-168: destroying the writable side of a
// pipeline mid-stream must not leave the pooled connection hanging.
// The queryStream is explicitly closed when the writer closes, then the
// connection is proven usable again with a follow-up query.
it("ensure pipe ending doesn't stall connection promise", async function () {
  if (
    process.env.srv === 'maxscale' ||
    process.env.srv === 'skysql' ||
    process.env.srv === 'skysql-ha'
  )
    this.skip();
  //sequence engine only exist in MariaDB
  if (!shareConn.info.isMariaDB()) this.skip();
  const ver = process.version.substring(1).split('.');
  //promise pipeline doesn't exist before node.js 15.0
  if (parseInt(ver[0], 10) < 15) this.skip();

  this.timeout(10000);
  const pool = base.createPool({ connectionLimit: 1 });

  const conn = await pool.getConnection();
  let received = 0;
  const transformStream = new stream.Transform({
    objectMode: true,
    transform: function transformer(chunk, encoding, callback) {
      callback(
        null,
        // BigInt values are not JSON-serializable; stringify them
        JSON.stringify(chunk, (key, value) =>
          typeof value === 'bigint' ? value.toString() : value
        )
      );
      received++;
    }
  });

  const queryStream = conn.queryStream("SELECT seq ,REPEAT('a', 100) as val FROM seq_1_to_10000");
  const someWriterStream = fs.createWriteStream(fileName);
  // once the writer is gone, release the query stream so the
  // connection drains the remaining result-set instead of hanging
  someWriterStream.on('close', () => {
    queryStream.close();
  });

  // kill the writable end almost immediately to abort the pipeline
  setTimeout(someWriterStream.destroy.bind(someWriterStream), 2);
  try {
    const { pipeline } = require('stream/promises');
    await pipeline(queryStream, transformStream, someWriterStream);
    throw new Error('Error must have been thrown');
  } catch (e) {
    // eat expected pipeline error (premature close)
  }
  assert.isTrue(received >= 0 && received < 10000, 'received ' + received + ' results');
  // the connection must still be usable after the aborted pipeline
  await conn.query('SELECT 1');
  // await teardown so the test cannot report success before the pool
  // has actually closed
  await conn.end();
  await pool.end();
});

it('test minimum idle decrease', function (done) {
if (
process.env.srv === 'maxscale' ||
Expand Down

0 comments on commit 594592a

Please sign in to comment.