Skip to content

Commit

Permalink
Use tarn as pool (#2450)
Browse files Browse the repository at this point in the history
* replace generic-pool with Tarn

* fix bug where mariadb client didn't propagate connection errors to pool

* destroy connection if a query times out (and cancellation is not enabled)

Before this change, when a query timeout happened without { cancel: true }
the connection was immediately released back to the pool. If the query
timed out because of a time consuming query, the released connection would
still execute the query to its end and the connection would be useless until
that time. This commit marks the connection as disposed so that the pool never
again gives that connection to anyone, and destorys it in the near future.

* fix a weird database version dependent casing bug in a tests

* fix an unstable test that failed randomly because the query order was not guaranteed
  • Loading branch information
koskimas authored and elhigu committed Feb 7, 2018
1 parent 053736f commit 8771bd4
Show file tree
Hide file tree
Showing 11 changed files with 188 additions and 161 deletions.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
"chalk": "2.3.0",
"commander": "^2.13.0",
"debug": "3.1.0",
"generic-pool": "^3.4.0",
"inherits": "~2.0.3",
"interpret": "^1.1.0",
"liftoff": "2.5.0",
Expand All @@ -19,6 +18,7 @@
"pg-connection-string": "2.0.0",
"readable-stream": "2.3.3",
"safe-buffer": "^5.1.1",
"tarn": "^1.1.2",
"tildify": "1.2.0",
"uuid": "^3.2.1",
"v8flags": "^3.0.1"
Expand Down
175 changes: 90 additions & 85 deletions src/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,12 @@ import TableCompiler from './schema/tablecompiler';
import ColumnBuilder from './schema/columnbuilder';
import ColumnCompiler from './schema/columncompiler';

import * as genericPool from 'generic-pool';
import * as genericPoolErrors from 'generic-pool/lib/errors'
import { Pool, TimeoutError } from 'tarn';
import inherits from 'inherits';
import { EventEmitter } from 'events';

import { makeEscape } from './query/string'
import { assign, uniqueId, cloneDeep, defaults, get } from 'lodash'
import { assign, uniqueId, cloneDeep, defaults } from 'lodash'

const debug = require('debug')('knex:client')
const debugQuery = require('debug')('knex:query')
Expand Down Expand Up @@ -194,83 +193,75 @@ assign(Client.prototype, {
},

poolDefaults() {
return {min: 2, max: 10, testOnBorrow: true, Promise}
return {min: 2, max: 10, propagateCreateError: true}
},

getPoolSettings(poolConfig) {
poolConfig = defaults({}, poolConfig, this.poolDefaults());
const timeoutValidator = (config, path) => {
let timeout = get(config, path)
if (timeout !== undefined) {
timeout = parseInt(timeout, 10)
if (isNaN(timeout) || timeout <= 0) {
throw new Error(`${path} must be a positive int`)
}

[
'maxWaitingClients',
'testOnBorrow',
'fifo',
'priorityRange',
'autostart',
'evictionRunIntervalMillis',
'numTestsPerRun',
'softIdleTimeoutMillis',
'Promise'
].forEach(option => {
if (option in poolConfig) {
helpers.warn([
`Pool config option "${option}" is no longer supported.`,
`See https://github.com/Vincit/tarn.js for possible pool config options.`
].join(' '))
}
return timeout
}
})

const timeouts = [
this.config.acquireConnectionTimeout || 60000,
poolConfig.acquireTimeoutMillis
].filter(timeout => timeout !== undefined);

// acquire connection timeout can be set on config or config.pool
// choose the smallest, positive timeout setting and set on poolConfig
const timeouts = [
timeoutValidator(this.config, 'acquireConnectionTimeout') || 60000,
timeoutValidator({pool: poolConfig}, 'pool.acquireTimeoutMillis')
].filter(timeout => timeout !== undefined)
poolConfig.acquireTimeoutMillis = Math.min(...timeouts);

return {
config: poolConfig,
factory: {
create: () => {
return this.acquireRawConnection()
.tap(function(connection) {
connection.__knexUid = uniqueId('__knexUid')
if (poolConfig.afterCreate) {
return Promise.promisify(poolConfig.afterCreate)(connection)
}
})
.catch(err => {
// Acquire connection must never reject, because generic-pool
// will retry trying to get connection until acquireConnectionTimeout is
// reached. acquireConnectionTimeout should trigger in knex only
// in that case if aquiring connection waits because pool is full
// https://github.com/coopernurse/node-pool/pull/184
// https://github.com/tgriesser/knex/issues/2325
return {
genericPoolMissingRetryCountHack: true,
__knex__disposed: err,
query: () => {
throw err; // pass error to query
}
};
});
},
destroy: (connection) => {
if (connection.genericPoolMissingRetryCountHack) {
return;
}
if (poolConfig.beforeDestroy) {
helpers.warn(`
beforeDestroy is deprecated, please open an issue if you use this
to discuss alternative apis
`)
poolConfig.beforeDestroy(connection, function() {})
}
if (connection !== void 0) {
return this.destroyRawConnection(connection)
}
return Object.assign(poolConfig, {
create: () => {
return this.acquireRawConnection().tap(connection => {
connection.__knexUid = uniqueId('__knexUid')

return Promise.resolve();
},
validate: (connection) => {
if (connection.__knex__disposed) {
helpers.warn(`Connection Error: ${connection.__knex__disposed}`)
return Promise.resolve(false);
if (poolConfig.afterCreate) {
return Promise.promisify(poolConfig.afterCreate)(connection)
}
return this.validateConnection(connection)
});
},

destroy: (connection) => {
if (poolConfig.beforeDestroy) {
helpers.warn(`
beforeDestroy is deprecated, please open an issue if you use this
to discuss alternative apis
`)

poolConfig.beforeDestroy(connection, function() {})
}

if (connection !== void 0) {
return this.destroyRawConnection(connection)
}
},
}

validate: (connection) => {
if (connection.__knex__disposed) {
helpers.warn(`Connection Error: ${connection.__knex__disposed}`)
return false
}

return this.validateConnection(connection)
}
})
},

initializePool(config) {
Expand All @@ -279,25 +270,25 @@ assign(Client.prototype, {
return
}

const poolSettings = this.getPoolSettings(config.pool);

this.pool = genericPool.createPool(poolSettings.factory, poolSettings.config)
this.pool = new Pool(this.getPoolSettings(config.pool))
},

validateConnection(connection) {
return Promise.resolve(true);
return true
},

// Acquire a connection from the pool.
acquireConnection() {
if (!this.pool) {
return Promise.reject(new Error('Unable to acquire a connection'))
}
return this.pool.acquire()

return Promise
.try(() => this.pool.acquire().promise)
.tap(connection => {
debug('acquired connection from pool: %s', connection.__knexUid)
})
.catch(genericPoolErrors.TimeoutError, () => {
.catch(TimeoutError, () => {
throw new Promise.TimeoutError(
'Knex: Timeout acquiring a connection. The pool is probably full. ' +
'Are you missing a .transacting(trx) call?'
Expand All @@ -309,24 +300,38 @@ assign(Client.prototype, {
// returning a promise resolved when the connection is released.
releaseConnection(connection) {
debug('releasing connection to pool: %s', connection.__knexUid)
return this.pool.release(connection).catch(() => {
const didRelease = this.pool.release(connection)

if (!didRelease) {
debug('pool refused connection: %s', connection.__knexUid)
})
}

return Promise.resolve()
},

// Destroy the current connection pool for the client.
destroy(callback) {
return Promise.resolve(
this.pool &&
this.pool.drain()
.then(() => this.pool.clear())
.then(() => {
this.pool = void 0
if(typeof callback === 'function') {
callback();
}
})
);
let promise = null

if (this.pool) {
promise = this.pool.destroy()
} else {
promise = Promise.resolve()
}

return promise.then(() => {
this.pool = void 0

if (typeof callback === 'function') {
callback()
}
}).catch(err => {
if (typeof callback === 'function') {
callback(err)
}

return Promise.reject(err)
})
},

// Return the database being used by this client.
Expand Down
13 changes: 8 additions & 5 deletions src/dialects/maria/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -36,18 +36,21 @@ assign(Client_MariaSQL.prototype, {
connection.connect(assign({metadata: true}, this.connectionSettings))
connection
.on('ready', function() {
connection.removeAllListeners('error');
resolver(connection);
})
.on('error', rejecter);
.on('error', err => {
connection.__knex__disposed = err
rejecter(err)
});
})
},

validateConnection(connection) {
if(connection.connected === true) {
return Promise.resolve(true);
if (connection.connected === true) {
return true
}
return Promise.resolve(false);

return false
},

// Used to explicitly close a connection, called internally by the pool
Expand Down
7 changes: 4 additions & 3 deletions src/dialects/mssql/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -88,10 +88,11 @@ assign(Client_MSSQL.prototype, {
},

validateConnection(connection) {
if(connection.connected === true) {
return Promise.resolve(true);
if (connection.connected === true) {
return true
}
return Promise.resolve(false);

return false
},

// Used to explicitly close a connection, called internally by the pool
Expand Down
7 changes: 4 additions & 3 deletions src/dialects/mysql/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -87,10 +87,11 @@ assign(Client_MySQL.prototype, {
},

validateConnection(connection) {
if(connection.state === 'connected' || connection.state === 'authenticated') {
return Promise.resolve(true);
if (connection.state === 'connected' || connection.state === 'authenticated') {
return true
}
return Promise.resolve(false);

return false
},

// Grab a connection, run the query via the MySQL streaming interface,
Expand Down
7 changes: 4 additions & 3 deletions src/dialects/mysql2/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,11 @@ assign(Client_MySQL2.prototype, {
},

validateConnection(connection) {
if(connection._fatalError) {
return Promise.resolve(false);
if (connection._fatalError) {
return false
}
return Promise.resolve(true);

return true
},

// Get a raw connection, called by the `pool` whenever a new
Expand Down
11 changes: 11 additions & 0 deletions src/runner.js
Original file line number Diff line number Diff line change
Expand Up @@ -158,11 +158,22 @@ assign(Runner.prototype, {
if (obj.cancelOnTimeout) {
cancelQuery = this.client.cancelQuery(this.connection);
} else {
// If we don't cancel the query, we need to mark the connection as disposed so that
// it gets destroyed by the pool and is never used again. If we don't do this and
// return the connection to the pool, it will be useless until the current operation
// that timed out, finally finishes.
this.connection.__knex__disposed = error
cancelQuery = Promise.resolve();
}

return cancelQuery
.catch((cancelError) => {
// If the cancellation failed, we need to mark the connection as disposed so that
// it gets destroyed by the pool and is never used again. If we don't do this and
// return the connection to the pool, it will be useless until the current operation
// that timed out, finally finishes.
this.connection.__knex__disposed = error

// cancellation failed
throw assign(cancelError, {
message: `After query timeout of ${timeout}ms exceeded, cancelling of query failed.`,
Expand Down
5 changes: 1 addition & 4 deletions test/docker/reconnect.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ module.exports = function(config, knex) {
var dockerConf = config.docker;
var ContainerClass = require(dockerConf.factory);

var EVICTION_RUN_INTERVAL_MILLIS = 15 * 1000;
var IDLE_TIMEOUT_MILLIS = 20 * 1000;
var ACQUIRE_CONNECTION_TIMEOUT = 10 * 1000;
var ACQUIRE_TIMEOUT_MILLIS = 10 * 1000;
Expand Down Expand Up @@ -118,9 +117,7 @@ module.exports = function(config, knex) {
min: 7,
max: 7,
idleTimeoutMillis: IDLE_TIMEOUT_MILLIS,
acquireTimeoutMillis: ACQUIRE_TIMEOUT_MILLIS,
evictionRunIntervalMillis: EVICTION_RUN_INTERVAL_MILLIS,
testOnBorrow: true
acquireTimeoutMillis: ACQUIRE_TIMEOUT_MILLIS
},
connection: {
database: dockerConf.database,
Expand Down
2 changes: 1 addition & 1 deletion test/integration/suite.js
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ module.exports = function(knex) {

describe('knex.destroy', function() {
it('should allow destroying the pool with knex.destroy', function() {
var spy = sinon.spy(knex.client.pool, 'clear');
var spy = sinon.spy(knex.client.pool, 'destroy');
return knex.destroy().then(function() {
expect(spy).to.have.callCount(1);
expect(knex.client.pool).to.equal(undefined);
Expand Down
Loading

0 comments on commit 8771bd4

Please sign in to comment.