Skip to content

Commit

Permalink
Merge pull request #202 from shaharmor/cluster-deque
Browse files Browse the repository at this point in the history
add queue support for cluster failover & cluster down handling
  • Loading branch information
luin committed Dec 2, 2015
2 parents cf13fc6 + dd7d468 commit eb2ba96
Showing 1 changed file with 62 additions and 16 deletions.
78 changes: 62 additions & 16 deletions lib/cluster.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
'use strict';

var Promise = require('bluebird');
var Deque = require('double-ended-queue');
var Redis = require('./redis');
var utils = require('./utils');
var util = require('util');
Expand Down Expand Up @@ -58,10 +59,13 @@ function Cluster(startupNodes, options) {
this.nodes = {};
this.masterNodes = {};
this.slots = [];
this.connections = {};

this.retryAttempts = 0;
this.options = _.defaults(options || {}, this.options || {}, Cluster.defaultOptions);
this.offlineQueue = [];

this.resetOfflineQueue();
this.resetFailoverQueue();
this.resetClusterDownQueue();

this.subscriber = null;

Expand All @@ -87,6 +91,18 @@ Cluster.defaultOptions = _.assign({}, Redis.defaultOptions, {
util.inherits(Cluster, EventEmitter);
_.assign(Cluster.prototype, Commander.prototype);

Cluster.prototype.resetOfflineQueue = function() {
this.offlineQueue = new Deque();
};

Cluster.prototype.resetFailoverQueue = function() {
this.failoverQueue = new Deque();
};

Cluster.prototype.resetClusterDownQueue = function() {
this.clusterDownQueue = new Deque();
};

Cluster.prototype.connect = function () {
return new Promise(function (resolve, reject) {
if (this.status === 'connecting' || this.status === 'connect' || this.status === 'ready') {
Expand Down Expand Up @@ -289,9 +305,7 @@ Cluster.prototype.setStatus = function (status) {
Cluster.prototype.refreshSlotsCache = function (callback) {
if (this.isRefreshing) {
if (typeof callback === 'function') {
process.nextTick(function () {
callback();
});
process.nextTick(callback);
}
return;
}
Expand Down Expand Up @@ -352,14 +366,38 @@ Cluster.prototype.executeOfflineCommands = function () {
if (this.offlineQueue.length) {
debug('send %d commands in offline queue', this.offlineQueue.length);
var offlineQueue = this.offlineQueue;
this.offlineQueue = [];
this.resetOfflineQueue();
while (offlineQueue.length > 0) {
var item = offlineQueue.shift();
this.sendCommand(item.command, item.stream, item.node);
}
}
};

Cluster.prototype.executeFailoverCommands = function () {
if (this.failoverQueue.length) {
debug('send %d commands in failover queue', this.failoverQueue.length);
var failoverQueue = this.failoverQueue;
this.resetFailoverQueue();
while (failoverQueue.length > 0) {
var item = failoverQueue.shift();
item();
}
}
};

Cluster.prototype.executeClusterDownCommands = function () {
if (this.clusterDownQueue.length) {
debug('send %d commands in cluster down queue', this.clusterDownQueue.length);
var clusterDownQueue = this.clusterDownQueue;
this.resetClusterDownQueue();
while (clusterDownQueue.length > 0) {
var item = clusterDownQueue.shift();
item();
}
}
};

Cluster.prototype.sendCommand = function (command, stream, node) {
if (this.status === 'end') {
command.reject(new Error('Connection is closed.'));
Expand Down Expand Up @@ -463,17 +501,25 @@ Cluster.prototype.handleError = function (error, ttl, handlers) {
handlers.ask(node, errv[1], hostPort);
}
} else if (errv[0] === 'CLUSTERDOWN' && this.options.retryDelayOnClusterDown > 0) {
setTimeout(function () {
_this.refreshSlotsCache(function () {
handlers.clusterDown();
});
}, this.options.retryDelayOnClusterDown);
this.clusterDownQueue.push(handlers.clusterDown);
if (!this.clusterDownTimeout) {
this.clusterDownTimeout = setTimeout(function () {
_this.refreshSlotsCache(function () {
_this.clusterDownTimeout = null;
_this.executeClusterDownCommands();
});
}, this.options.retryDelayOnClusterDown);
}
} else if (error.message === 'Connection is closed.' && this.options.retryDelayOnFailover > 0) {
setTimeout(function () {
_this.refreshSlotsCache(function () {
handlers.connectionClosed();
});
}, this.options.retryDelayOnFailover);
this.failoverQueue.push(handlers.connectionClosed);
if (!this.failoverTimeout) {
this.failoverTimeout = setTimeout(function () {
_this.refreshSlotsCache(function () {
_this.failOverTimeout = null;
_this.executeFailoverCommands();
});
}, this.options.retryDelayOnFailover);
}
} else {
handlers.defaults();
}
Expand Down

0 comments on commit eb2ba96

Please sign in to comment.