Skip to content

Commit

Permalink
[CONJS-251] permit Piping results with Streams when using callback im…
Browse files Browse the repository at this point in the history
…plementation
  • Loading branch information
rusher committed May 10, 2023
1 parent dd9cacc commit 3393c1f
Show file tree
Hide file tree
Showing 4 changed files with 158 additions and 1 deletion.
19 changes: 19 additions & 0 deletions documentation/callback-api.md
Original file line number Diff line number Diff line change
Expand Up @@ -560,6 +560,25 @@ connection.query("SELECT * FROM mysql.user")
});
```
#### Piping
piping can be use using .stream() function on query, that returns a Readable object, that will emit rows objects.
```
const logRes = new Writable({
objectMode: true,
decodeStrings: false,
write: (row, encoding, callback) => {
console.log(row);
callback();
}
});

connection.query("SELECT * FROM mysql.user")
.stream()
.pipe(logRes);
```
## `connection.batch(sql, values [, callback])`
> * `sql`: *string | JSON* SQL string value or JSON object to supersede default connections options. JSON objects must have an `"sql"` property. For instance, `{ dateStrings: true, sql: 'SELECT now()' }`
Expand Down
43 changes: 42 additions & 1 deletion lib/cmd/query.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ const Parser = require('./parser');
const Errors = require('../misc/errors');
const Parse = require('../misc/parse');
const TextEncoder = require('./encoder/text-encoder');

const { Readable } = require('stream');
const QUOTE = 0x27;

/**
Expand Down Expand Up @@ -223,6 +223,47 @@ class Query extends Parser {
}
}
}

_stream(socket, options) {
this.socket = socket;
options = options || {};
options.objectMode = true;
options.read = () => {
this.socket.resume();
};
this.inStream = new Readable(options);

this.on('fields', function (meta) {
this.inStream.emit('fields', meta);
});

this.on('error', function (err) {
this.inStream.emit('error', err);
});

this.on('close', function (err) {
this.inStream.emit('error', err);
});

this.on('end', function (err) {
if (err) this.inStream.emit('error', err);
this.socket.resume();
this.inStream.push(null);
});

this.inStream.close = function () {
this.handleNewRows = () => {};
this.socket.resume();
}.bind(this);

this.handleNewRows = function (row) {
if (!this.inStream.push(row)) {
this.socket.pause();
}
};

return this.inStream;
}
}

module.exports = Query;
1 change: 1 addition & 0 deletions lib/connection-callback.js
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ class ConnectionCallback {
};

conn.addCommand(cmd);
cmd.stream = (opt) => cmd._stream(conn.socket, opt);
return cmd;
}

Expand Down
96 changes: 96 additions & 0 deletions test/integration/test-resultset-streaming.js
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,20 @@ describe('results-set streaming', () => {
assert.equal(10000, currRow);
});

it('Streaming result-set for-await-of callback', async function () {
let currRow = 0;
const conn = base.createCallbackConnection();
conn.connect(async (err) => {
const cmd = conn.query('SELECT * FROM testStreamResult');
const stream = cmd.stream();
for await (const row of stream) {
assert.equal(currRow++, row.v);
}
assert.equal(10000, currRow);
conn.end();
});
});

it('Streaming execute result-set for-await-of', async function () {
let currRow = 0;
const prepare = await shareConn.prepare('SELECT * FROM testStreamResult');
Expand Down Expand Up @@ -77,6 +91,59 @@ describe('results-set streaming', () => {
stream.close();
});

it('Streaming result-set close callback', function (done) {
let currRow = 0;
let metaReceived = false;
const conn = base.createCallbackConnection();
conn.connect(async (err) => {
const stream = conn.query('SELECT * FROM testStreamResult').stream();
stream
.on('error', (err) => {
done(new Error('must not have thrown any error !'));
})
.on('fields', (meta) => {
assert.equal(meta.length, 1);
metaReceived = true;
})
.on('data', (row) => {
assert.equal(currRow++, row.v);
})
.on('end', () => {
assert.equal(0, currRow);
assert.isOk(metaReceived);
conn.end();
done();
});
stream.close();
});
});

it('Streaming result-set callback', function (done) {
let currRow = 0;
let metaReceived = false;
const conn = base.createCallbackConnection();
conn.connect(async (err) => {
const stream = conn.query('SELECT * FROM testStreamResult').stream();
stream
.on('error', (err) => {
done(new Error('must not have thrown any error !'));
})
.on('fields', (meta) => {
assert.equal(meta.length, 1);
metaReceived = true;
})
.on('data', (row) => {
assert.equal(currRow++, row.v);
})
.on('end', () => {
assert.equal(10000, currRow);
assert.isOk(metaReceived);
conn.end();
done();
});
});
});

it('Streaming result-set with promise implementation', function (done) {
let currRow = 0;
let metaReceived = false;
Expand Down Expand Up @@ -220,6 +287,35 @@ describe('results-set streaming', () => {
shareConn.queryStream('SELECT * FROM testStreamResult').pipe(writableStream);
});

it('Streaming result-set callback pipe', function (done) {
let currRow = 0;
const writableStream = new Writable({
objectMode: true,
decodeStrings: false,
write: (row, encoding, callback) => {
assert.equal(currRow++, row.v);
callback();
if (process.versions.node.startsWith('6.') && currRow === 10000) {
//final was implemented in v8
done();
}
},
writev: (rows, callback) => {
for (let i = 0; i < rows.length; i++) {
assert.equal(++currRow, row.v);
}
callback();
},
final: () => {
assert.equal(10000, currRow);
conn.end();
done();
}
});
const conn = base.createCallbackConnection();
conn.query('SELECT * FROM testStreamResult').stream({ highWaterMark: 10 }).pipe(writableStream);
});

it('Streaming error handling', function (done) {
shareConn.queryStream('SELECT * FROM UnknownTable').on('error', (err) => {
assert.equal(err.errno, 1146);
Expand Down

0 comments on commit 3393c1f

Please sign in to comment.