Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make the stream catch errors in the query #2638

Merged
merged 9 commits into from
Jun 27, 2018
5 changes: 4 additions & 1 deletion src/dialects/maria/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,10 @@ assign(Client_MariaSQL.prototype, {
connection.query(sql.sql, sql.bindings)
.on('result', function(res) {
res
.on('error', rejecter)
.on('error', (err) => {
rejecter(err)
stream.emit('error', err)
})
.on('end', function() {
resolver(res.info);
})
Expand Down
9 changes: 8 additions & 1 deletion src/dialects/mysql/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,14 @@ assign(Client_MySQL.prototype, {
return new Promise((resolver, rejecter) => {
stream.on('error', rejecter)
stream.on('end', resolver)
connection.query(queryOptions, obj.bindings).stream(options).pipe(stream)
const queryStream = connection.query(queryOptions, obj.bindings).stream(options)

queryStream.on('error', (err) => {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it only relevant for MySQL?

Copy link
Contributor Author

@fcmatteo fcmatteo May 31, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure, I could reproduce the bug only in MySQL because that's the database I am using. The other bug, which I fixed editing runner.js, probably happens in other platforms too.

rejecter(err);
stream.emit('error', err)
})

queryStream.pipe(stream)
})
},

Expand Down
4 changes: 4 additions & 0 deletions src/dialects/oracle/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,10 @@ assign(Client_Oracle.prototype, {
stream.on('end', resolver);
const queryStream = connection.queryStream(obj.sql, obj.bindings, options);
queryStream.pipe(stream);
queryStream.on('error', function(error) {
rejecter(error);
stream.emit('error', error);
});
})
},

Expand Down
10 changes: 5 additions & 5 deletions src/dialects/postgres/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -199,15 +199,15 @@ assign(Client_PG.prototype, {
_stream(connection, obj, stream, options) {
const PGQueryStream = process.browser ? undefined : require('pg-query-stream');
const sql = obj.sql;

return new Promise(function(resolver, rejecter) {
const queryStream = connection.query(new PGQueryStream(sql, obj.bindings, options));
queryStream.on('error', function(error) { stream.emit('error', error); });
// 'error' is not propagated by .pipe, but it breaks the pipe
stream.on('error', function(error) {
// Ensure the queryStream is closed so the connection can be released.
queryStream.close();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why was any of this changed? This query still needs to be closed on error or it will continue to execute on the server. See #1935

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch! @fcmatteo Any idea why queryStream.close() was dropped?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ouch. Can we get a test for this?..

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Test case for original fix would have catch this that is the reason I'm so insistent that all the fixes has test included. So when this is fixed again, a test would be great to have.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@kibertoad you were faster 👍

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe by mocking queryStream.close method or by checking that stream.on('error'... event handler is set.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or with integration test which queries for open connections from database.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't remember. Following the thread of the PR, there were two failing tests in PostgreSQL that apparently were fixed by that.
But testing on local with the previous chunk of code seems to work too.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, better to just put it back then and add new test

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Followed up with #2773


queryStream.on('error', function(error) {
rejecter(error);
stream.emit('error', error);
});

// 'end' IS propagated by .pipe, by default
stream.on('end', resolver);
queryStream.pipe(stream);
Expand Down
16 changes: 10 additions & 6 deletions src/runner.js
Original file line number Diff line number Diff line change
Expand Up @@ -84,13 +84,17 @@ assign(Runner.prototype, {
const promise = Promise.using(this.ensureConnection(), function(connection) {
hasConnection = true;
runner.connection = connection;
const sql = runner.builder.toSQL()
const err = new Error('The stream may only be used with a single query statement.');
if (isArray(sql)) {
if (hasHandler) throw err;
stream.emit('error', err);
try {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change needs to be checked carefully...

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Which part looks dangerous to you? The fact that thrown exception is now being caught?

Copy link
Collaborator

@kibertoad kibertoad Jun 4, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@elhigu The only difference I see now (after latest change) is that it will now emit an error even if hasHandler is truthy. Would you consider that an unwelcome change?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it was just a note for my futureself who has time to check this out better.

const sql = runner.builder.toSQL()

if (isArray(sql) && hasHandler) {
throw new Error('The stream may only be used with a single query statement.');
}

return runner.client.stream(runner.connection, sql, stream, options)
} catch (e) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@fcmatteo Is it intended that we are catching the error that we itself throw?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's the only way I found to emit the error to the stream. We could throw the error again in the catch, after emitting to the stream...

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We probably should rethrow it to preserve the original logic, unless there is a good reason not to interrupt execution here.

stream.emit('error', e)
}
return runner.client.stream(runner.connection, sql, stream, options);
})

// If a function is passed to handle the stream, send the stream
Expand Down
38 changes: 27 additions & 11 deletions test/integration/builder/selects.js
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
/*global describe, expect, it, testPromise, d*/
'use strict';

var _ = require('lodash')
var assert = require('assert')
var Promise = testPromise;
var Runner = require('../../../lib/runner');
const _ = require('lodash')
const assert = require('assert')
const Promise = testPromise;
const Runner = require('../../../lib/runner');

module.exports = function(knex) {

Expand Down Expand Up @@ -212,7 +212,7 @@ module.exports = function(knex) {
return knex('accounts')
.options({
typeCast (field, next) {
var val
let val
if (field.type === 'VAR_STRING') {
val = field.string()
return val == null ? val : val.toUpperCase()
Expand All @@ -235,22 +235,22 @@ module.exports = function(knex) {
})

it('emits error on the stream, if not passed a function, and connecting fails', function() {
var expected = new Error();
var original = Runner.prototype.ensureConnection;
const expected = new Error();
const original = Runner.prototype.ensureConnection;
Runner.prototype.ensureConnection = function() {
return Promise.reject(expected);
};

var restore = () => {
const restore = () => {
Runner.prototype.ensureConnection = original;
};

var promise = new Promise((resolve, reject) => {
var timeout = setTimeout(() => {
const promise = new Promise((resolve, reject) => {
const timeout = setTimeout(() => {
reject(new Error('Timeout'));
}, 5000);

var stream = knex('accounts').stream();
const stream = knex('accounts').stream();
stream.on('error', function(actual) {
clearTimeout(timeout);

Expand All @@ -266,6 +266,22 @@ module.exports = function(knex) {
return promise;
});

it('emits error on the stream, if not passed a function, and query fails', function(done) {
const stream = knex('accounts').select('invalid_field').stream()
stream.on('error', function(err) {
assert(err instanceof Error)
done()
})
})

it('emits error if not passed a function and the query has wrong bindings', function(done) {
const stream = knex('accounts').whereRaw('id = ? and first_name = ?', ['2']).stream()
stream.on('error', function(err) {
assert(err instanceof Error)
done()
})
})

it('properly escapes postgres queries on streaming', function() {
let count = 0;
return knex('accounts').where('id', 1).stream(function(rowStream) {
Expand Down