Skip to content

Commit

Permalink
Fix #1694
Browse files Browse the repository at this point in the history
  • Loading branch information
tgriesser committed Oct 9, 2016
1 parent 9d67985 commit 3a3cad9
Show file tree
Hide file tree
Showing 10 changed files with 77 additions and 83 deletions.
35 changes: 20 additions & 15 deletions src/client.js
Expand Up @@ -218,26 +218,31 @@ assign(Client.prototype, {

// Acquire a connection from the pool.
acquireConnection() {
let request = null
const completed = new Promise((resolver, rejecter) => {
return new Promise((resolver, rejecter) => {
if (!this.pool) {
return rejecter(new Error('Unable to acquire a connection'))
}
request = this.pool.acquire(function(err, connection) {
if (err) return rejecter(err)
debug('acquired connection from pool: %s', connection.__knexUid)
resolver(connection)
let wasRejected = false
const t = setTimeout(() => {
wasRejected = true
rejecter(new Promise.TimeoutError(
'Knex: Timeout acquiring a connection. The pool is probably full. ' +
'Are you missing a .transacting(trx) call?'
))
}, this.config.acquireConnectionTimeout || 60000)
this.pool.acquire((err, connection) => {
if (err) {
return rejecter(err)
}
clearTimeout(t)
if (wasRejected) {
this.pool.release(connection)
} else {
debug('acquired connection from pool: %s', connection.__knexUid)
resolver(connection)
}
})
})
const abort = function(reason) {
if (request && !request.fulfilled) {
request.abort(reason)
}
}
return {
completed: completed,
abort: abort
}
},

// Releases a connection back to the connection pool,
Expand Down
2 changes: 1 addition & 1 deletion src/dialects/mssql/transaction.js
Expand Up @@ -50,7 +50,7 @@ export default class Transaction_MSSQL extends Transaction {
return Promise.try(() => {
return (t.outerTx ? t.outerTx.conn : null) ||
configConnection ||
t.client.acquireConnection().completed;
t.client.acquireConnection();
}).tap(function(conn) {
if (!t.outerTx) {
t.conn = conn
Expand Down
2 changes: 1 addition & 1 deletion src/dialects/mysql/index.js
Expand Up @@ -145,7 +145,7 @@ assign(Client_MySQL.prototype, {
canCancelQuery: true,

cancelQuery(connectionToKill) {
const acquiringConn = this.acquireConnection().completed
const acquiringConn = this.acquireConnection()

// Error out if we can't acquire connection in time.
// Purposely not putting timeout on `KILL QUERY` execution because erroring
Expand Down
2 changes: 1 addition & 1 deletion src/dialects/oracle/transaction.js
Expand Up @@ -32,7 +32,7 @@ export default class Oracle_Transaction extends Transaction {
acquireConnection(config) {
const t = this
return Promise.try(() =>
config.connection || t.client.acquireConnection().completed
config.connection || t.client.acquireConnection()
).tap(connection => {
if (!t.outerTx) {
connection.setAutoCommit(false)
Expand Down
2 changes: 1 addition & 1 deletion src/dialects/oracledb/transaction.js
Expand Up @@ -34,7 +34,7 @@ export default class Oracle_Transaction extends Transaction {
acquireConnection(config) {
const t = this;
return Promise.try(function() {
return t.client.acquireConnection().completed.then(function(cnx) {
return t.client.acquireConnection().then(function(cnx) {
cnx.isTransaction = true;
return cnx;
});
Expand Down
18 changes: 5 additions & 13 deletions src/dialects/websql/index.js
Expand Up @@ -28,12 +28,11 @@ assign(Client_WebSQL.prototype, {

// Get a raw connection from the database, returning a promise with the connection object.
acquireConnection() {
const client = this;
const completed = new Promise(function(resolve, reject) {
return new Promise((resolve, reject) => {
try {
/*jslint browser: true*/
const db = openDatabase(
client.name, client.version, client.displayName, client.estimatedSize
this.name, this.version, this.displayName, this.estimatedSize
);
db.transaction(function(t) {
t.__knexUid = uniqueId('__knexUid');
Expand All @@ -43,13 +42,6 @@ assign(Client_WebSQL.prototype, {
reject(e);
}
});
const abort = function(reason) {
// Do nothing, websql has no pool
}
return {
completed: completed,
abort: abort
};
},

// Used to explicitly close a connection, called internally by the pool
Expand All @@ -61,12 +53,12 @@ assign(Client_WebSQL.prototype, {
// Runs the query on the specified connection,
// providing the bindings and any other necessary prep work.
_query(connection, obj) {
return new Promise(function(resolver, rejecter) {
return new Promise((resolver, rejecter) => {
if (!connection) return rejecter(new Error('No connection provided.'));
connection.executeSql(obj.sql, obj.bindings, function(trx, response) {
connection.executeSql(obj.sql, obj.bindings, (trx, response) => {
obj.response = response;
return resolver(obj);
}, function(trx, err) {
}, (trx, err) => {
rejecter(err);
});
});
Expand Down
35 changes: 8 additions & 27 deletions src/runner.js
Expand Up @@ -181,40 +181,21 @@ assign(Runner.prototype, {

// Check whether there's a transaction flag, and that it has a connection.
ensureConnection() {
const runner = this
const acquireConnectionTimeout = runner.client.config.acquireConnectionTimeout || 60000;
return Promise.try(() => {
return runner.connection || new Promise((resolver, rejecter) => {
const acquireConnection = runner.client.acquireConnection();

acquireConnection.completed
.timeout(acquireConnectionTimeout)
return this.connection || new Promise((resolver, rejecter) => {
this.client.acquireConnection()
.then(resolver)
.catch(Promise.TimeoutError, (error) => {
const timeoutError = new Error(
'Knex: Timeout acquiring a connection. The pool is probably full. ' +
'Are you missing a .transacting(trx) call?'
);
const additionalErrorInformation = {
timeoutStack: error.stack
}

if(runner.builder) {
additionalErrorInformation.sql = runner.builder.sql;
additionalErrorInformation.bindings = runner.builder.bindings;
if (this.builder) {
error.sql = this.builder.sql;
error.bindings = this.builder.bindings;
}

assign(timeoutError, additionalErrorInformation)

// Let the pool know that this request for a connection timed out
acquireConnection.abort('Knex: Timeout acquiring a connection.')

rejecter(timeoutError)
throw error
})
.catch(rejecter)
})
}).disposer(function() {
runner.client.releaseConnection(runner.connection)
}).disposer(() => {
this.client.releaseConnection(this.connection)
})
}

Expand Down
9 changes: 3 additions & 6 deletions src/transaction.js
Expand Up @@ -141,7 +141,7 @@ export default class Transaction extends EventEmitter {
// the original promise is marked completed.
acquireConnection(client, config, txid) {
const configConnection = config && config.connection
return Promise.try(() => configConnection || client.acquireConnection().completed)
return Promise.try(() => configConnection || client.acquireConnection())
.disposer(function(connection) {
if (!configConnection) {
debug('%s: releasing connection', txid)
Expand Down Expand Up @@ -225,11 +225,8 @@ function makeTxClient(trx, client, connection) {
return _stream.call(trxClient, conn, obj, stream, options)
})
}
trxClient.acquireConnection = function () {
return {
completed: trx._previousSibling.reflect().then(() => connection),
abort: noop
}
trxClient.acquireConnection = function() {
return Promise.resolve(connection)
}
trxClient.releaseConnection = function() {
return Promise.resolve()
Expand Down
53 changes: 36 additions & 17 deletions test/integration/builder/transaction.js
Expand Up @@ -287,25 +287,44 @@ module.exports = function(knex) {
var knexDb = new Knex(knexConfig);

//Create a transaction that will occupy the only available connection, and avoid trx.commit.
return knexDb.transaction(function(trx) {
trx.raw('SELECT 1 = 1').then(function () {
//No connection is available, so try issuing a query without transaction.
//Since there is no available connection, it should throw a timeout error based on `aquireConnectionTimeout` from the knex config.
return knexDb.raw('select * FROM accounts WHERE username = ?', ['Test'])
})
.then(function () {
//Should never reach this point
expect(false).to.be.ok();
}).catch(function (error) {
expect(error.bindings).to.be.an('array');
expect(error.bindings[0]).to.equal('Test');
expect(error.sql).to.equal('select * FROM accounts WHERE username = ?');
expect(error.message).to.equal('Knex: Timeout acquiring a connection. The pool is probably full. Are you missing a .transacting(trx) call?');
trx.commit();//Test done
});
return knexDb.transaction(function(trx) {
trx.raw('SELECT 1 = 1').then(function () {
//No connection is available, so try issuing a query without transaction.
//Since there is no available connection, it should throw a timeout error based on `aquireConnectionTimeout` from the knex config.
return knexDb.raw('select * FROM accounts WHERE username = ?', ['Test'])
})
.then(function () {
//Should never reach this point
expect(false).to.be.ok();
})
.catch(function (error) {
expect(error.bindings).to.be.an('array');
expect(error.bindings[0]).to.equal('Test');
expect(error.sql).to.equal('select * FROM accounts WHERE username = ?');
expect(error.message).to.equal('Knex: Timeout acquiring a connection. The pool is probably full. Are you missing a .transacting(trx) call?');
trx.commit();//Test done
});
});
});

});
it('#1694, #1703 it should return connections to pool if acquireConnectionTimeout is triggered', () => {
const db = Knex({
client: knex.client.driverName,
pool: {
min: 0,
max: 1
},
connection: knex.client.connectionSettings,
acquireConnectionTimeout: 300
})
return db.transaction(function() {
return db.transaction(function() {})
}).then(function () {
throw new Error('should not get here')

This comment has been minimized.

Copy link
@koskimas

koskimas Oct 9, 2016

Collaborator

You catch the error thrown here. This test will never fail

}).catch((error) => {
// Should get here after 300 ms.
})
})

});
};
2 changes: 1 addition & 1 deletion test/tape/knex.js
Expand Up @@ -43,7 +43,7 @@ test('it should allow to use proprietary dialect', function(t) {
knexObj.destroy()
})

test('it should use knex suppoted dialect', function(t) {
test('it should use knex supported dialect', function(t) {
t.plan(1)
var knexObj = knex({
client: 'postgres',
Expand Down

0 comments on commit 3a3cad9

Please sign in to comment.