Skip to content

Commit

Permalink
Perform clean shutdown when PooledConnection.shutdown() is called
Browse files Browse the repository at this point in the history
  • Loading branch information
ctavan committed Jan 30, 2012
1 parent 77c9e88 commit d5c7427
Showing 1 changed file with 84 additions and 13 deletions.
97 changes: 84 additions & 13 deletions lib/driver.js
Expand Up @@ -331,10 +331,33 @@ Connection.prototype.connect = function(callback) {
timeoutId = setTimeout(connectTimeout, this.timeout);
};

Connection.prototype.close = function() {
this.con.end();
this.con = null;
this.client = null;
/**
* Closes the current connection
*
* `this.con` is a socket connection. For failed socket connections
* `this.con.end()` may not trigger a `close` event. So in the cases where we
* are experiencing problems with the connection, we can just `end()` it
* without waiting for the `close` event.
*
* Note that the callback is only called, if the `close` event is fired. Also
* we only wait for the `close` event if a callback is given.
*
* @param {function} callback
*/
Connection.prototype.close = function(callback) {
var self = this;
if (!callback) {
self.con.end();
self.con = null;
self.client = null;
return;
}
self.con.on('close', function(err) {
self.con = null;
self.client = null;
callback();
});
self.con.end();
};

/**
Expand Down Expand Up @@ -493,7 +516,13 @@ var PooledConnection = module.exports.PooledConnection = function(config) {
this.use_bigints = config.use_bigints ? true : false;
this.timeout = config.timeout || DEFAULT_CONNECTION_TIMEOUT;
this.log_time = config.log_time || false;


// Number of currently running queries
this.running = 0;

// Shutdown mode
this.shuttingDown = false;

// Construct a list of nodes from hosts in <host>:<port> form
for (var i = 0; i < config.hosts.length; i++) {
var hostSpec = config.hosts[i];
Expand Down Expand Up @@ -562,8 +591,23 @@ PooledConnection.prototype._incr = function() {
* UPDATE : callback(err)
* DELETE : callback(err)
*/
PooledConnection.prototype.execute = function(query, args, callback) {
PooledConnection.prototype.execute = function(query, args, executeCallback) {
var self = this;

if (self.shuttingDown) {
executeCallback(new Error('Unable to execute query, connection pool is shutting down.'));
return;
}

self.running++;
var callback = function() {
self.running--;
if (self.running === 0) {
self.emit('drain');
}
executeCallback.apply(self, arguments);
};

self._getNextCon(function(err, con) {
if (err) {
callback(err, null);
Expand Down Expand Up @@ -631,6 +675,7 @@ PooledConnection.prototype._getNextCon = function(callback) {
} else if (c.unhealthyAt > 0) {
callback();
} else if (!c.connected) {
c.taken = true;
c.connect(function(err) {
if (c.connected) {
con = c;
Expand Down Expand Up @@ -665,14 +710,40 @@ PooledConnection.prototype._getNextCon = function(callback) {
* @param callback called when the pool is fully shutdown
*/
PooledConnection.prototype.shutdown = function(callback) {
// todo: we need to be able to let pending execute()s finish and block executes from happening while shutting down.
this.connections.forEach(function(con) {
if (con.connected) {
con.close();
}
var self = this;

// Start shutdown mode, causes no new execute()'s to be accepted
if (self.shuttingDown) {
return;
}
self.shuttingDown = true;

callback = callback || function() {};

// Close all open connections as soon as the pool has drained
self.on('drain', function() {
self._closeConnections(callback);
});
if (callback) {
callback();

// If no queries were running, emit the drain event immediately
if (self.running === 0) {
self.emit('drain');
}
};

/**
* Close all connected connections.
*
* @param {function} closeCallback that is fired once all connections are closed
*/
PooledConnection.prototype._closeConnections = function(closeCallback) {
async.forEach(this.connections, function(con, cb) {
if (con.connected) {
con.close(cb);
} else {
cb(null);
}
}, function(err) {
closeCallback(err);
});
};

0 comments on commit d5c7427

Please sign in to comment.