Skip to content

Commit

Permalink
refactor: use sequelize-pool for pooling (#10051)
Browse files Browse the repository at this point in the history
  • Loading branch information
sushantdhiman committed Oct 28, 2018
1 parent 88a340d commit 567c019
Show file tree
Hide file tree
Showing 9 changed files with 80 additions and 137 deletions.
1 change: 1 addition & 0 deletions .npmignore
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ changelog.md
Makefile
coverage*
.github
.vscode

appveyor-setup.ps1
appveyor.yml
Expand Down
152 changes: 53 additions & 99 deletions lib/dialects/abstract/connection-manager.js
Original file line number Diff line number Diff line change
@@ -1,26 +1,18 @@
'use strict';

const Pooling = require('generic-pool');
const { Pool } = require('sequelize-pool');
const _ = require('lodash');
const semver = require('semver');
const Promise = require('../../promise');
const errors = require('../../errors');
const logger = require('../../utils/logger');
const debug = logger.getLogger().debugContext('pool');

const defaultPoolingConfig = {
max: 5,
min: 0,
idle: 10000,
acquire: 60000,
evict: 1000
};

/**
* Abstract Connection Manager
*
* Connection manager which handles pool, replication and determining database version
* Works with generic-pool to maintain connection pool
* Connection manager which handles pooling & replication.
* Uses sequelize-pool for pooling
*
* @private
*/
Expand All @@ -38,9 +30,13 @@ class ConnectionManager {
throw new Error('Support for pool:false was removed in v4.0');
}

config.pool = _.defaults(config.pool || {}, defaultPoolingConfig, {
validate: this._validate.bind(this),
Promise
config.pool = _.defaults(config.pool || {}, {
max: 5,
min: 0,
idle: 10000,
acquire: 60000,
evict: 1000,
validate: this._validate.bind(this)
});

this.initPools();
Expand Down Expand Up @@ -102,7 +98,7 @@ class ConnectionManager {

return this.pool.drain().then(() => {
debug('connection drain due to process exit');
return this.pool.clear();
return this.pool.destroyAllNow();
});
}

Expand All @@ -128,35 +124,26 @@ class ConnectionManager {
const config = this.config;

if (!config.replication) {
this.pool = Pooling.createPool({
create: () => this._connect(config).catch(err => err),
destroy: mayBeConnection => {
if (mayBeConnection instanceof Error) {
return Promise.resolve();
}

return this._disconnect(mayBeConnection)
this.pool = new Pool({
name: 'sequelize',
create: () => this._connect(config),
destroy: connection => {
return this._disconnect(connection)
.tap(() => { debug('connection destroy'); });
},
validate: config.pool.validate
}, {
Promise: config.pool.Promise,
testOnBorrow: true,
autostart: false,
validate: config.pool.validate,
max: config.pool.max,
min: config.pool.min,
acquireTimeoutMillis: config.pool.acquire,
idleTimeoutMillis: config.pool.idle,
evictionRunIntervalMillis: config.pool.evict
reapIntervalMillis: config.pool.evict
});

debug(`pool created with max/min: ${config.pool.max}/${config.pool.min}, no replication`);

return;
}

let reads = 0;

if (!Array.isArray(config.replication.read)) {
config.replication.read = [config.replication.read];
}
Expand All @@ -170,84 +157,70 @@ class ConnectionManager {
);

// custom pooling for replication (original author @janmeier)
let reads = 0;
this.pool = {
release: client => {
if (client.queryType === 'read') {
return this.pool.read.release(client);
this.pool.read.release(client);
} else {
return this.pool.write.release(client);
this.pool.write.release(client);
}
},
acquire: (priority, queryType, useMaster) => {
acquire: (queryType, useMaster) => {
useMaster = _.isUndefined(useMaster) ? false : useMaster;
if (queryType === 'SELECT' && !useMaster) {
return this.pool.read.acquire(priority);
return this.pool.read.acquire();
} else {
return this.pool.write.acquire(priority);
return this.pool.write.acquire();
}
},
destroy: mayBeConnection => {
if (mayBeConnection instanceof Error) {
return Promise.resolve();
}

return this.pool[mayBeConnection.queryType].destroy(mayBeConnection)
.tap(() => { debug('connection destroy'); });
destroy: connection => {
this.pool[connection.queryType].destroy(connection);
debug('connection destroy');
},
clear: () => {
destroyAllNow: () => {
return Promise.join(
this.pool.read.clear(),
this.pool.write.clear()
).tap(() => { debug('all connection clear'); });
this.pool.read.destroyAllNow(),
this.pool.write.destroyAllNow()
).tap(() => { debug('all connections destroyed'); });
},
drain: () => {
return Promise.join(
this.pool.write.drain(),
this.pool.read.drain()
);
},
read: Pooling.createPool({
read: new Pool({
name: 'sequelize:read',
create: () => {
const nextRead = reads++ % config.replication.read.length; // round robin config
return this
._connect(config.replication.read[nextRead])
.tap(connection => {
connection.queryType = 'read';
})
.catch(err => err);
// round robin config
const nextRead = reads++ % config.replication.read.length;
return this._connect(config.replication.read[nextRead]).tap(connection => {
connection.queryType = 'read';
});
},
destroy: connection => this._disconnect(connection),
validate: config.pool.validate
}, {
Promise: config.pool.Promise,
testOnBorrow: true,
autostart: false,
validate: config.pool.validate,
max: config.pool.max,
min: config.pool.min,
acquireTimeoutMillis: config.pool.acquire,
idleTimeoutMillis: config.pool.idle,
evictionRunIntervalMillis: config.pool.evict
reapIntervalMillis: config.pool.evict
}),
write: Pooling.createPool({
write: new Pool({
name: 'sequelize:write',
create: () => {
return this
._connect(config.replication.write)
.tap(connection => {
connection.queryType = 'write';
})
.catch(err => err);
return this._connect(config.replication.write).tap(connection => {
connection.queryType = 'write';
});
},
destroy: connection => this._disconnect(connection),
validate: config.pool.validate
}, {
Promise: config.pool.Promise,
testOnBorrow: true,
autostart: false,
validate: config.pool.validate,
max: config.pool.max,
min: config.pool.min,
acquireTimeoutMillis: config.pool.acquire,
idleTimeoutMillis: config.pool.idle,
evictionRunIntervalMillis: config.pool.evict
reapIntervalMillis: config.pool.evict
})
};

Expand All @@ -259,7 +232,6 @@ class ConnectionManager {
* Call pool.acquire to get a connection
*
* @param {Object} [options] Pool options
* @param {Integer} [options.priority] Set priority for this call. Read more at https://github.com/coopernurse/node-pool#priority-queueing
* @param {string} [options.type] Set which replica to use. Available options are `read` and `write`
* @param {boolean} [options.useMaster=false] Force master or write replica to get connection from
*
Expand Down Expand Up @@ -297,9 +269,8 @@ class ConnectionManager {
}

return promise.then(() => {
return this.pool.acquire(options.priority, options.type, options.useMaster)
.then(mayBeConnection => this._determineConnection(mayBeConnection))
.catch({name: 'TimeoutError'}, err => { throw new errors.ConnectionAcquireTimeoutError(err); })
return this.pool.acquire(options.type, options.useMaster)
.catch(Promise.TimeoutError, err => { throw new errors.ConnectionAcquireTimeoutError(err); })
.tap(() => { debug('connection acquired'); });
});
}
Expand All @@ -312,27 +283,10 @@ class ConnectionManager {
* @returns {Promise}
*/
releaseConnection(connection) {
return this.pool.release(connection)
.tap(() => { debug('connection released'); })
.catch(/Resource not currently part of this pool/, () => {});
}

/**
* Check if something acquired by pool is indeed a connection but not an Error instance
* Why we need to do this https://github.com/sequelize/sequelize/pull/8330
*
* @param {Object|Error} mayBeConnection Object which can be either connection or error
*
* @returns {Promise<Connection>}
*/
_determineConnection(mayBeConnection) {
if (mayBeConnection instanceof Error) {
return Promise.resolve(this.pool.destroy(mayBeConnection))
.catch(/Resource not currently part of this pool/, () => {})
.then(() => { throw mayBeConnection; });
}

return Promise.resolve(mayBeConnection);
return Promise.try(() => {
this.pool.release(connection);
debug('connection released');
});
}

/**
Expand Down
3 changes: 1 addition & 2 deletions lib/dialects/mssql/connection-manager.js
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,7 @@ class ConnectionManager extends AbstractConnectionManager {
switch (error.code) {
case 'ESOCKET':
case 'ECONNRESET':
this.pool.destroy(resourceLock)
.catch(/Resource not currently part of this pool/, () => {});
this.pool.destroy(resourceLock);
}
});
}).catch(error => {
Expand Down
22 changes: 6 additions & 16 deletions lib/dialects/mysql/connection-manager.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

const AbstractConnectionManager = require('../abstract/connection-manager');
const SequelizeErrors = require('../../errors');
const Utils = require('../../utils');
const Promise = require('../../promise');
const logger = require('../../utils/logger');
const DataTypes = require('../../data-types').mysql;
const momentTz = require('moment-timezone');
Expand Down Expand Up @@ -72,7 +72,7 @@ class ConnectionManager extends AbstractConnectionManager {
}
}

return new Utils.Promise((resolve, reject) => {
return new Promise((resolve, reject) => {
const connection = this.lib.createConnection(connectionConfig);

const errorHandler = e => {
Expand Down Expand Up @@ -103,12 +103,11 @@ class ConnectionManager extends AbstractConnectionManager {
case 'ECONNRESET':
case 'EPIPE':
case 'PROTOCOL_CONNECTION_LOST':
this.pool.destroy(connection)
.catch(/Resource not currently part of this pool/, () => {});
this.pool.destroy(connection);
}
});

return new Utils.Promise((resolve, reject) => {
return new Promise((resolve, reject) => {
if (!this.sequelize.config.keepDefaultTimezone) {
// set timezone for this connection
// but named timezone are not directly supported in mysql, so get its offset first
Expand Down Expand Up @@ -145,19 +144,10 @@ class ConnectionManager extends AbstractConnectionManager {
// Don't disconnect connections with CLOSED state
if (connection._closing) {
debug('connection tried to disconnect but was already at CLOSED state');
return Utils.Promise.resolve();
return Promise.resolve();
}

return new Utils.Promise((resolve, reject) => {
connection.end(err => {
if (err) {
reject(new SequelizeErrors.ConnectionError(err));
} else {
debug('connection disconnected');
resolve();
}
});
});
return Promise.fromCallback(callback => connection.end(callback));
}

validate(connection) {
Expand Down
11 changes: 7 additions & 4 deletions lib/dialects/postgres/connection-manager.js
Original file line number Diff line number Diff line change
Expand Up @@ -176,15 +176,18 @@ class ConnectionManager extends AbstractConnectionManager {
// Don't let a Postgres restart (or error) to take down the whole app
connection.on('error', error => {
connection._invalid = true;
debug(`connection error ${error.code}`);

this.pool.destroy(connection)
.catch(/Resource not currently part of this pool/, () => {});
debug(`connection error ${error.code || error.message}`);
this.pool.destroy(connection);
});
});
}

disconnect(connection) {
if (connection._ending) {
debug('connection tried to disconnect but was already at ENDING state');
return Promise.resolve();
}

return Promise.fromCallback(callback => connection.end(callback));
}

Expand Down
5 changes: 2 additions & 3 deletions lib/sequelize.js
Original file line number Diff line number Diff line change
Expand Up @@ -78,10 +78,9 @@ class Sequelize {
* @param {Object} [options.pool] sequelize connection pool configuration
* @param {number} [options.pool.max=5] Maximum number of connection in pool
* @param {number} [options.pool.min=0] Minimum number of connection in pool
* @param {number} [options.pool.idle=10000] The maximum time, in milliseconds, that a connection can be idle before being released. Use with combination of evict for proper working, for more details read https://github.com/coopernurse/node-pool/issues/178#issuecomment-327110870
* @param {number} [options.pool.idle=10000] The maximum time, in milliseconds, that a connection can be idle before being released.
* @param {number} [options.pool.acquire=60000] The maximum time, in milliseconds, that pool will try to get connection before throwing error
* @param {number} [options.pool.evict=1000] The time interval, in milliseconds, for evicting stale connections. Set it to 0 to disable this feature.
* @param {boolean} [options.pool.handleDisconnects=true] Controls if pool should handle connection disconnect automatically without throwing errors
* @param {number} [options.pool.evict=1000] The time interval, in milliseconds, after which sequelize-pool will remove idle connections.
* @param {Function} [options.pool.validate] A function that validates a connection. Called with client. The default function checks that client is an object, and that its state is not disconnected
* @param {boolean} [options.quoteIdentifiers=true] Set to `false` to make table names and attributes case-insensitive on Postgres and skip double quoting of them. WARNING: Setting this to false may expose vulnerabilities and is not recommended!
* @param {string} [options.transactionType='DEFERRED'] Set the default transaction type. See `Sequelize.Transaction.TYPES` for possible options. Sqlite only.
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,13 @@
"debug": "^4.1.0",
"depd": "^2.0.0",
"dottie": "^2.0.0",
"generic-pool": "^3.4.0",
"inflection": "1.12.0",
"lodash": "^4.17.11",
"moment": "^2.22.2",
"moment-timezone": "^0.5.21",
"retry-as-promised": "^3.1.0",
"semver": "^5.6.0",
"sequelize-pool": "^1.0.0",
"toposort-class": "^1.0.1",
"uuid": "^3.2.1",
"validator": "^10.4.0",
Expand Down
Loading

0 comments on commit 567c019

Please sign in to comment.