Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use tarn as pool #2450

Merged
merged 5 commits into from Feb 7, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion package.json
Expand Up @@ -9,7 +9,6 @@
"chalk": "2.3.0",
"commander": "^2.13.0",
"debug": "3.1.0",
"generic-pool": "^3.4.0",
"inherits": "~2.0.3",
"interpret": "^1.1.0",
"liftoff": "2.5.0",
Expand All @@ -19,6 +18,7 @@
"pg-connection-string": "2.0.0",
"readable-stream": "2.3.3",
"safe-buffer": "^5.1.1",
"tarn": "^1.1.2",
"tildify": "1.2.0",
"uuid": "^3.2.1",
"v8flags": "^3.0.1"
Expand Down
175 changes: 90 additions & 85 deletions src/client.js
Expand Up @@ -16,13 +16,12 @@ import TableCompiler from './schema/tablecompiler';
import ColumnBuilder from './schema/columnbuilder';
import ColumnCompiler from './schema/columncompiler';

import * as genericPool from 'generic-pool';
import * as genericPoolErrors from 'generic-pool/lib/errors'
import { Pool, TimeoutError } from 'tarn';
import inherits from 'inherits';
import { EventEmitter } from 'events';

import { makeEscape } from './query/string'
import { assign, uniqueId, cloneDeep, defaults, get } from 'lodash'
import { assign, uniqueId, cloneDeep, defaults } from 'lodash'

const debug = require('debug')('knex:client')
const debugQuery = require('debug')('knex:query')
Expand Down Expand Up @@ -194,83 +193,75 @@ assign(Client.prototype, {
},

poolDefaults() {
return {min: 2, max: 10, testOnBorrow: true, Promise}
return {min: 2, max: 10, propagateCreateError: true}
},

getPoolSettings(poolConfig) {
poolConfig = defaults({}, poolConfig, this.poolDefaults());
const timeoutValidator = (config, path) => {
let timeout = get(config, path)
if (timeout !== undefined) {
timeout = parseInt(timeout, 10)
if (isNaN(timeout) || timeout <= 0) {
throw new Error(`${path} must be a positive int`)
}

[
'maxWaitingClients',
'testOnBorrow',
'fifo',
'priorityRange',
'autostart',
'evictionRunIntervalMillis',
'numTestsPerRun',
'softIdleTimeoutMillis',
'Promise'
].forEach(option => {
if (option in poolConfig) {
helpers.warn([
`Pool config option "${option}" is no longer supported.`,
`See https://github.com/Vincit/tarn.js for possible pool config options.`
].join(' '))
}
return timeout
}
})

const timeouts = [
this.config.acquireConnectionTimeout || 60000,
poolConfig.acquireTimeoutMillis
].filter(timeout => timeout !== undefined);

// acquire connection timeout can be set on config or config.pool
// choose the smallest, positive timeout setting and set on poolConfig
const timeouts = [
timeoutValidator(this.config, 'acquireConnectionTimeout') || 60000,
timeoutValidator({pool: poolConfig}, 'pool.acquireTimeoutMillis')
].filter(timeout => timeout !== undefined)
poolConfig.acquireTimeoutMillis = Math.min(...timeouts);

return {
config: poolConfig,
factory: {
create: () => {
return this.acquireRawConnection()
.tap(function(connection) {
connection.__knexUid = uniqueId('__knexUid')
if (poolConfig.afterCreate) {
return Promise.promisify(poolConfig.afterCreate)(connection)
}
})
.catch(err => {
// Acquire connection must never reject, because generic-pool
// will retry trying to get connection until acquireConnectionTimeout is
// reached. acquireConnectionTimeout should trigger in knex only
// in that case if aquiring connection waits because pool is full
// https://github.com/coopernurse/node-pool/pull/184
// https://github.com/tgriesser/knex/issues/2325
return {
genericPoolMissingRetryCountHack: true,
__knex__disposed: err,
query: () => {
throw err; // pass error to query
}
};
});
},
destroy: (connection) => {
if (connection.genericPoolMissingRetryCountHack) {
return;
}
if (poolConfig.beforeDestroy) {
helpers.warn(`
beforeDestroy is deprecated, please open an issue if you use this
to discuss alternative apis
`)
poolConfig.beforeDestroy(connection, function() {})
}
if (connection !== void 0) {
return this.destroyRawConnection(connection)
}
return Object.assign(poolConfig, {
create: () => {
return this.acquireRawConnection().tap(connection => {
connection.__knexUid = uniqueId('__knexUid')

return Promise.resolve();
},
validate: (connection) => {
if (connection.__knex__disposed) {
helpers.warn(`Connection Error: ${connection.__knex__disposed}`)
return Promise.resolve(false);
if (poolConfig.afterCreate) {
return Promise.promisify(poolConfig.afterCreate)(connection)
}
return this.validateConnection(connection)
});
},

destroy: (connection) => {
if (poolConfig.beforeDestroy) {
helpers.warn(`
beforeDestroy is deprecated, please open an issue if you use this
to discuss alternative apis
`)

poolConfig.beforeDestroy(connection, function() {})
}

if (connection !== void 0) {
return this.destroyRawConnection(connection)
}
},
}

validate: (connection) => {
if (connection.__knex__disposed) {
helpers.warn(`Connection Error: ${connection.__knex__disposed}`)
return false
}

return this.validateConnection(connection)
}
})
},

initializePool(config) {
Expand All @@ -279,25 +270,25 @@ assign(Client.prototype, {
return
}

const poolSettings = this.getPoolSettings(config.pool);

this.pool = genericPool.createPool(poolSettings.factory, poolSettings.config)
this.pool = new Pool(this.getPoolSettings(config.pool))
},

validateConnection(connection) {
return Promise.resolve(true);
return true
},

// Acquire a connection from the pool.
acquireConnection() {
if (!this.pool) {
return Promise.reject(new Error('Unable to acquire a connection'))
}
return this.pool.acquire()

return Promise
.try(() => this.pool.acquire().promise)
.tap(connection => {
debug('acquired connection from pool: %s', connection.__knexUid)
})
.catch(genericPoolErrors.TimeoutError, () => {
.catch(TimeoutError, () => {
throw new Promise.TimeoutError(
'Knex: Timeout acquiring a connection. The pool is probably full. ' +
'Are you missing a .transacting(trx) call?'
Expand All @@ -309,24 +300,38 @@ assign(Client.prototype, {
// returning a promise resolved when the connection is released.
releaseConnection(connection) {
debug('releasing connection to pool: %s', connection.__knexUid)
return this.pool.release(connection).catch(() => {
const didRelease = this.pool.release(connection)

if (!didRelease) {
debug('pool refused connection: %s', connection.__knexUid)
})
}

return Promise.resolve()
},

// Destroy the current connection pool for the client.
destroy(callback) {
return Promise.resolve(
this.pool &&
this.pool.drain()
.then(() => this.pool.clear())
.then(() => {
this.pool = void 0
if(typeof callback === 'function') {
callback();
}
})
);
let promise = null

if (this.pool) {
promise = this.pool.destroy()
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

destroy drains and clears the pool just like drain + clear with the old pool.

} else {
promise = Promise.resolve()
}

return promise.then(() => {
this.pool = void 0

if (typeof callback === 'function') {
callback()
}
}).catch(err => {
if (typeof callback === 'function') {
callback(err)
}

return Promise.reject(err)
})
},

// Return the database being used by this client.
Expand Down
13 changes: 8 additions & 5 deletions src/dialects/maria/index.js
Expand Up @@ -36,18 +36,21 @@ assign(Client_MariaSQL.prototype, {
connection.connect(assign({metadata: true}, this.connectionSettings))
connection
.on('ready', function() {
connection.removeAllListeners('error');
resolver(connection);
})
.on('error', rejecter);
.on('error', err => {
connection.__knex__disposed = err
rejecter(err)
});
})
},

validateConnection(connection) {
if(connection.connected === true) {
return Promise.resolve(true);
if (connection.connected === true) {
return true
}
return Promise.resolve(false);

return false
},

// Used to explicitly close a connection, called internally by the pool
Expand Down
7 changes: 4 additions & 3 deletions src/dialects/mssql/index.js
Expand Up @@ -88,10 +88,11 @@ assign(Client_MSSQL.prototype, {
},

validateConnection(connection) {
if(connection.connected === true) {
return Promise.resolve(true);
if (connection.connected === true) {
return true
}
return Promise.resolve(false);

return false
},

// Used to explicitly close a connection, called internally by the pool
Expand Down
7 changes: 4 additions & 3 deletions src/dialects/mysql/index.js
Expand Up @@ -87,10 +87,11 @@ assign(Client_MySQL.prototype, {
},

validateConnection(connection) {
if(connection.state === 'connected' || connection.state === 'authenticated') {
return Promise.resolve(true);
if (connection.state === 'connected' || connection.state === 'authenticated') {
return true
}
return Promise.resolve(false);

return false
},

// Grab a connection, run the query via the MySQL streaming interface,
Expand Down
7 changes: 4 additions & 3 deletions src/dialects/mysql2/index.js
Expand Up @@ -30,10 +30,11 @@ assign(Client_MySQL2.prototype, {
},

validateConnection(connection) {
if(connection._fatalError) {
return Promise.resolve(false);
if (connection._fatalError) {
return false
}
return Promise.resolve(true);

return true
},

// Get a raw connection, called by the `pool` whenever a new
Expand Down
11 changes: 11 additions & 0 deletions src/runner.js
Expand Up @@ -158,11 +158,22 @@ assign(Runner.prototype, {
if (obj.cancelOnTimeout) {
cancelQuery = this.client.cancelQuery(this.connection);
} else {
// If we don't cancel the query, we need to mark the connection as disposed so that
// it gets destroyed by the pool and is never used again. If we don't do this and
// return the connection to the pool, it will be useless until the current operation
// that timed out, finally finishes.
this.connection.__knex__disposed = error
cancelQuery = Promise.resolve();
}

return cancelQuery
.catch((cancelError) => {
// If the cancellation failed, we need to mark the connection as disposed so that
// it gets destroyed by the pool and is never used again. If we don't do this and
// return the connection to the pool, it will be useless until the current operation
// that timed out, finally finishes.
this.connection.__knex__disposed = error

// cancellation failed
throw assign(cancelError, {
message: `After query timeout of ${timeout}ms exceeded, cancelling of query failed.`,
Expand Down
5 changes: 1 addition & 4 deletions test/docker/reconnect.js
Expand Up @@ -9,7 +9,6 @@ module.exports = function(config, knex) {
var dockerConf = config.docker;
var ContainerClass = require(dockerConf.factory);

var EVICTION_RUN_INTERVAL_MILLIS = 15 * 1000;
var IDLE_TIMEOUT_MILLIS = 20 * 1000;
var ACQUIRE_CONNECTION_TIMEOUT = 10 * 1000;
var ACQUIRE_TIMEOUT_MILLIS = 10 * 1000;
Expand Down Expand Up @@ -118,9 +117,7 @@ module.exports = function(config, knex) {
min: 7,
max: 7,
idleTimeoutMillis: IDLE_TIMEOUT_MILLIS,
acquireTimeoutMillis: ACQUIRE_TIMEOUT_MILLIS,
evictionRunIntervalMillis: EVICTION_RUN_INTERVAL_MILLIS,
testOnBorrow: true
acquireTimeoutMillis: ACQUIRE_TIMEOUT_MILLIS
},
connection: {
database: dockerConf.database,
Expand Down
2 changes: 1 addition & 1 deletion test/integration/suite.js
Expand Up @@ -30,7 +30,7 @@ module.exports = function(knex) {

describe('knex.destroy', function() {
it('should allow destroying the pool with knex.destroy', function() {
var spy = sinon.spy(knex.client.pool, 'clear');
var spy = sinon.spy(knex.client.pool, 'destroy');
return knex.destroy().then(function() {
expect(spy).to.have.callCount(1);
expect(knex.client.pool).to.equal(undefined);
Expand Down