Skip to content

Commit

Permalink
feat(cluster): add lazy connect support (#352)
Browse files Browse the repository at this point in the history
* add lazyConnect to Redis.Cluster

* use _.noop

* fix: handle disconnection with lazyConnect

* remove unnecessary setImmediate

* add comments to setImmediate
  • Loading branch information
shaharmor authored and luin committed Jul 28, 2016
1 parent 9d70cc7 commit f1cadff
Show file tree
Hide file tree
Showing 5 changed files with 110 additions and 29 deletions.
2 changes: 1 addition & 1 deletion lib/cluster/connection_pool.js
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ ConnectionPool.prototype.findOrCreate = function (node, readOnly) {
redis = this.nodes.all[node.key];
if (redis.options.readOnly !== readOnly) {
redis.options.readOnly = readOnly;
redis[readOnly ? 'readonly' : 'readwrite']().catch(function () {});
redis[readOnly ? 'readonly' : 'readwrite']().catch(_.noop);
if (readOnly) {
delete this.nodes.master[node.key];
this.nodes.slave[node.key] = redis;
Expand Down
77 changes: 53 additions & 24 deletions lib/cluster/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,11 @@ function Cluster(startupNodes, options) {

this.subscriber = null;

this.connect().catch(noop);
if (this.options.lazyConnect) {
this.setStatus('wait');
} else {
this.connect().catch(_.noop);
}
}

/**
Expand Down Expand Up @@ -161,24 +165,7 @@ Cluster.prototype.connect = function () {

this.once('refresh', refreshListener);
this.once('close', closeListener);

this.once('close', function () {
var retryDelay;
if (!this.manuallyClosing && typeof this.options.clusterRetryStrategy === 'function') {
retryDelay = this.options.clusterRetryStrategy.call(this, ++this.retryAttempts);
}
if (typeof retryDelay === 'number') {
this.setStatus('reconnecting');
this.reconnectTimeout = setTimeout(function () {
this.reconnectTimeout = null;
debug('Cluster is disconnected. Retrying after %dms', retryDelay);
this.connect().catch(noop);
}.bind(this), retryDelay);
} else {
this.setStatus('end');
this.flushQueue(new Error('None of startup nodes is available'));
}
});
this.once('close', this._handleCloseEvent.bind(this));

this.refreshSlotsCache(function (err) {
if (err && err.message === 'Failed to refresh slots cache.') {
Expand All @@ -190,12 +177,34 @@ Cluster.prototype.connect = function () {
}.bind(this));
};

/**
* Called when closed to check whether a reconnection should be made
*/
Cluster.prototype._handleCloseEvent = function () {
var retryDelay;
if (!this.manuallyClosing && typeof this.options.clusterRetryStrategy === 'function') {
retryDelay = this.options.clusterRetryStrategy.call(this, ++this.retryAttempts);
}
if (typeof retryDelay === 'number') {
this.setStatus('reconnecting');
this.reconnectTimeout = setTimeout(function () {
this.reconnectTimeout = null;
debug('Cluster is disconnected. Retrying after %dms', retryDelay);
this.connect().catch(_.noop);
}.bind(this), retryDelay);
} else {
this.setStatus('end');
this.flushQueue(new Error('None of startup nodes is available'));
}
};

/**
* Disconnect from every node in the cluster.
*
* @public
*/
Cluster.prototype.disconnect = function (reconnect) {
var status = this.status;
this.setStatus('disconnecting');

if (!reconnect) {
Expand All @@ -205,7 +214,13 @@ Cluster.prototype.disconnect = function (reconnect) {
clearTimeout(this.reconnectTimeout);
this.reconnectTimeout = null;
}
this.connectionPool.reset([]);

if (status === 'wait') {
this.setStatus('close');
this._handleCloseEvent();
} else {
this.connectionPool.reset([]);
}
};

/**
Expand All @@ -216,6 +231,7 @@ Cluster.prototype.disconnect = function (reconnect) {
* @public
*/
Cluster.prototype.quit = function (callback) {
var status = this.status;
this.setStatus('disconnecting');

this.manuallyClosing = true;
Expand All @@ -224,6 +240,18 @@ Cluster.prototype.quit = function (callback) {
clearTimeout(this.reconnectTimeout);
this.reconnectTimeout = null;
}
if (status === 'wait') {
var ret = Promise.resolve('OK').nodeify(callback);

// use setImmediate to make sure "close" event
// being emitted after quit() is returned
setImmediate(function () {
this.setStatus('close');
this._handleCloseEvent();
}.bind(this));

return ret;
}
return Promise.all(this.nodes().map(function (node) {
return node.quit();
})).then(function () {
Expand Down Expand Up @@ -277,12 +305,12 @@ Cluster.prototype.selectSubscriber = function () {
if (!--pending) {
_this.lastActiveSubscriber = _this.subscriber;
}
}).catch(noop);
}).catch(_.noop);
}
});
} else {
if (this.subscriber.status === 'wait') {
this.subscriber.connect().catch(noop);
this.subscriber.connect().catch(_.noop);
}
this.lastActiveSubscriber = this.subscriber;
}
Expand Down Expand Up @@ -389,6 +417,9 @@ Cluster.prototype.executeOfflineCommands = function () {
};

Cluster.prototype.sendCommand = function (command, stream, node) {
if (this.status === 'wait') {
this.connect().catch(_.noop);
}
if (this.status === 'end') {
command.reject(new Error(utils.CONNECTION_CLOSED_ERROR_MSG));
return command.promise;
Expand Down Expand Up @@ -627,6 +658,4 @@ Cluster.prototype._readyCheck = function (callback) {

require('../transaction').addTransactionSupport(Cluster.prototype);

function noop() {}

module.exports = Cluster;
4 changes: 2 additions & 2 deletions lib/redis.js
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ function Redis() {
if (this.options.lazyConnect) {
this.setStatus('wait');
} else {
this.connect().catch(function () {});
this.connect().catch(_.noop);
}
}

Expand Down Expand Up @@ -544,7 +544,7 @@ require('./transaction').addTransactionSupport(Redis.prototype);
*/
Redis.prototype.sendCommand = function (command, stream) {
if (this.status === 'wait') {
this.connect().catch(function () {});
this.connect().catch(_.noop);
}
if (this.status === 'end') {
command.reject(new Error(utils.CONNECTION_CLOSED_ERROR_MSG));
Expand Down
5 changes: 3 additions & 2 deletions lib/redis/event_handler.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
var debug = require('debug')('ioredis:connection');
var Command = require('../command');
var utils = require('../utils');
var _ = require('lodash');

exports.connectHandler = function (self) {
return function () {
Expand Down Expand Up @@ -91,7 +92,7 @@ exports.closeHandler = function (self) {
self.setStatus('reconnecting', retryDelay);
self.reconnectTimeout = setTimeout(function () {
self.reconnectTimeout = null;
self.connect().catch(function () {});
self.connect().catch(_.noop);
}, retryDelay);
};

Expand Down Expand Up @@ -145,7 +146,7 @@ exports.readyHandler = function (self) {

if (self.options.readOnly) {
debug('set the connection to readonly mode');
self.readonly().catch(function () {});
self.readonly().catch(_.noop);
}

if (self.prevCondition) {
Expand Down
51 changes: 51 additions & 0 deletions test/functional/lazy_connect.js
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,55 @@ describe('lazy connect', function () {
});
redis.disconnect();
});

describe('Cluster', function () {
it('should not call `connect` when init', function () {
stub(Redis.Cluster.prototype, 'connect').throws(new Error('`connect` should not be called'));
new Redis.Cluster([], { lazyConnect: true });
Redis.Cluster.prototype.connect.restore();
});

it('should quit before "close" being emited', function (done) {
stub(Redis.Cluster.prototype, 'connect').throws(new Error('`connect` should not be called'));
var cluster = new Redis.Cluster([], { lazyConnect: true });
cluster.quit(function () {
cluster.once('close', function () {
cluster.once('end', function () {
Redis.Cluster.prototype.connect.restore();
done();
});
});
});
});

it('should disconnect before "close" being emited', function (done) {
stub(Redis.Cluster.prototype, 'connect').throws(new Error('`connect` should not be called'));
var cluster = new Redis.Cluster([], { lazyConnect: true });
cluster.disconnect();
cluster.once('close', function () {
cluster.once('end', function () {
Redis.Cluster.prototype.connect.restore();
done();
});
});
});

it('should support disconnecting with reconnect', function (done) {
stub(Redis.Cluster.prototype, 'connect').throws(new Error('`connect` should not be called'));
var cluster = new Redis.Cluster([], {
lazyConnect: true,
clusterRetryStrategy: function () {
return 1;
}
});
cluster.disconnect(true);
cluster.once('close', function () {
Redis.Cluster.prototype.connect.restore();
stub(Redis.Cluster.prototype, 'connect', function () {
Redis.Cluster.prototype.connect.restore();
done();
});
});
});
});
});

0 comments on commit f1cadff

Please sign in to comment.