Skip to content

Commit

Permalink
mysql stream should support query options
Browse files Browse the repository at this point in the history
  • Loading branch information
Jonathan Stewmon committed Nov 1, 2017
1 parent 74f2a03 commit 3f4f69a
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 5 deletions.
13 changes: 8 additions & 5 deletions src/dialects/mysql/index.js
Expand Up @@ -97,10 +97,11 @@ assign(Client_MySQL.prototype, {
// and pass that through to the stream we've sent back to the client.
_stream(connection, obj, stream, options) {
options = options || {}
const queryOptions = assign({sql: obj.sql}, obj.options)
return new Promise((resolver, rejecter) => {
stream.on('error', rejecter)
stream.on('end', resolver)
connection.query(obj.sql, obj.bindings).stream(options).pipe(stream)
connection.query(queryOptions, obj.bindings).stream(options).pipe(stream)
})
},

Expand All @@ -109,10 +110,12 @@ assign(Client_MySQL.prototype, {
_query(connection, obj) {
if (!obj || typeof obj === 'string') obj = {sql: obj}
return new Promise(function(resolver, rejecter) {
let { sql } = obj
if (!sql) return resolver()
if (obj.options) sql = assign({sql}, obj.options)
connection.query(sql, obj.bindings, function(err, rows, fields) {
if (!obj.sql) {
resolver()
return
}
const queryOptions = assign({sql: obj.sql}, obj.options)
connection.query(queryOptions, obj.bindings, function(err, rows, fields) {
if (err) return rejecter(err)
obj.response = [rows, fields]
resolver(obj)
Expand Down
29 changes: 29 additions & 0 deletions test/integration/builder/selects.js
Expand Up @@ -179,6 +179,35 @@ module.exports = function(knex) {
});
});

it('allows you to stream with mysql dialect options', function() {
if (!['mysql', 'mysql2'].includes(knex.client.dialect)) {
return
}
const rows = []
return knex('accounts')
.options({
typeCast (field, next) {
if (field.type === 'VAR_STRING') {
const val = field.string()
return val == null ? val : val.toUpperCase()
}
return next()
}
})
.stream(function(rowStream) {
rowStream.on('data', function(row) {
rows.push(row)
});
}).then(function() {
expect(rows).to.have.lengthOf(6)
rows.forEach(row => {
['first_name', 'last_name', 'email'].forEach(
field => expect(row[field]).to.equal(row[field].toUpperCase())
)
})
});
})

it('emits error on the stream, if not passed a function, and connecting fails', function() {
var expected = new Error();
var original = Runner.prototype.ensureConnection;
Expand Down

0 comments on commit 3f4f69a

Please sign in to comment.