Skip to content

Commit

Permalink
feat(cluster): redirect on TRYAGAIN error
Browse files Browse the repository at this point in the history
  • Loading branch information
luin committed Mar 7, 2016
1 parent c4fee4f commit b1a4b62
Show file tree
Hide file tree
Showing 7 changed files with 109 additions and 96 deletions.
1 change: 1 addition & 0 deletions API.md
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,7 @@ Creates a Redis Cluster instance
| [options.maxRedirections] | <code>number</code> | <code>16</code> | When a MOVED or ASK error is received, client will redirect the command to another node. This option limits the max redirections allowed to send a command. |
| [options.retryDelayOnFailover] | <code>number</code> | <code>100</code> | When an error is received when sending a command(e.g. "Connection is closed." when the target Redis node is down), |
| [options.retryDelayOnClusterDown] | <code>number</code> | <code>100</code> | When a CLUSTERDOWN error is received, client will retry if `retryDelayOnClusterDown` is valid delay time. |
| [options.retryDelayOnTryAgain] | <code>number</code> | <code>100</code> | When a TRYAGAIN error is received, client will retry if `retryDelayOnTryAgain` is valid delay time. |
| [options.redisOptions] | <code>Object</code> | | Passed to the constructor of `Redis`. |

<a name="Cluster+connect"></a>
Expand Down
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -678,6 +678,8 @@ but a few so that if one is unreachable the client will try the next one, and th
to insure that no command will fail during a failover.
* `retryDelayOnClusterDown`: When a cluster is down, all commands will be rejected with the error of `CLUSTERDOWN`. If this option is a number (by default, it is `100`), the client
will resend the commands after the specified time (in ms).
* `retryDelayOnTryAgain`: If this option is a number (by default, it is `100`), the client
will resend the commands rejected with `TRYAGAIN` error after the specified time (in ms).
* `redisOptions`: Default options passed to the constructor of `Redis` when connecting to a node.

### Read-write splitting
Expand Down
48 changes: 48 additions & 0 deletions lib/cluster/delay_queue.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
'use strict';

var Deque = require('double-ended-queue');
var debug = require('debug')('ioredis:delayqueue');

function DelayQueue() {
this.queues = {};
this.timeouts = {};
}

DelayQueue.prototype.push = function (bucket, item, options) {
var callback = options.callback || process.nextTick;
if (!this.queues[bucket]) {
this.queues[bucket] = new Deque();
}

var queue = this.queues[bucket];
queue.push(item);

if (!this.timeouts[bucket]) {
var _this = this;
this.timeouts[bucket] = setTimeout(function () {
callback(function () {
_this.timeouts[bucket] = null;
_this._execute(bucket);
});
}, options.timeout);
}
};

DelayQueue.prototype._execute = function (bucket) {
var queue = this.queues[bucket];
if (!queue) {
return;
}
var length = queue.length;
if (!length) {
return;
}
debug('send %d commands in %s queue', length, bucket);

this.queues[bucket] = null;
while (queue.length > 0) {
queue.shift()();
}
};

module.exports = DelayQueue;
73 changes: 19 additions & 54 deletions lib/cluster/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ var Commander = require('../commander');
var Command = require('../command');
var commands = require('redis-commands');
var ConnectionPool = require('./connection_pool');
var DelayQueue = require('./delay_queue');

/**
* Creates a Redis Cluster instance
Expand All @@ -32,6 +33,8 @@ var ConnectionPool = require('./connection_pool');
* "Connection is closed." when the target Redis node is down),
* @param {number} [options.retryDelayOnClusterDown=100] - When a CLUSTERDOWN error is received, client will retry
* if `retryDelayOnClusterDown` is valid delay time.
* @param {number} [options.retryDelayOnTryAgain=100] - When a TRYAGAIN error is received, client will retry
* if `retryDelayOnTryAgain` is valid delay time.
* @param {Object} [options.redisOptions] - Passed to the constructor of `Redis`.
* @extends [EventEmitter](http://nodejs.org/api/events.html#events_class_events_eventemitter)
* @extends Commander
Expand Down Expand Up @@ -70,8 +73,7 @@ function Cluster(startupNodes, options) {
this.retryAttempts = 0;

this.resetOfflineQueue();
this.resetFailoverQueue();
this.resetClusterDownQueue();
this.delayQueue = new DelayQueue();

this.subscriber = null;

Expand All @@ -93,7 +95,8 @@ Cluster.defaultOptions = {
scaleReads: 'master',
maxRedirections: 16,
retryDelayOnFailover: 100,
retryDelayOnClusterDown: 100
retryDelayOnClusterDown: 100,
retryDelayOnTryAgain: 100
};

util.inherits(Cluster, EventEmitter);
Expand All @@ -103,14 +106,6 @@ Cluster.prototype.resetOfflineQueue = function () {
this.offlineQueue = new Deque();
};

Cluster.prototype.resetFailoverQueue = function () {
this.failoverQueue = new Deque();
};

Cluster.prototype.resetClusterDownQueue = function () {
this.clusterDownQueue = new Deque();
};

/**
* Connect to a cluster
*
Expand Down Expand Up @@ -365,30 +360,6 @@ Cluster.prototype.executeOfflineCommands = function () {
}
};

Cluster.prototype.executeFailoverCommands = function () {
if (this.failoverQueue.length) {
debug('send %d commands in failover queue', this.failoverQueue.length);
var failoverQueue = this.failoverQueue;
this.resetFailoverQueue();
while (failoverQueue.length > 0) {
var item = failoverQueue.shift();
item();
}
}
};

Cluster.prototype.executeClusterDownCommands = function () {
if (this.clusterDownQueue.length) {
debug('send %d commands in cluster down queue', this.clusterDownQueue.length);
var clusterDownQueue = this.clusterDownQueue;
this.resetClusterDownQueue();
while (clusterDownQueue.length > 0) {
var item = clusterDownQueue.shift();
item();
}
}
};

Cluster.prototype.sendCommand = function (command, stream, node) {
if (this.status === 'end') {
command.reject(new Error('Connection is closed.'));
Expand Down Expand Up @@ -427,6 +398,7 @@ Cluster.prototype.sendCommand = function (command, stream, node) {
debug('command %s is required to ask %s:%s', command.name, key);
tryConnection(false, key);
},
tryagain: partialTry,
clusterDown: partialTry,
connectionClosed: partialTry,
maxRedirections: function (redirectionError) {
Expand Down Expand Up @@ -511,7 +483,6 @@ Cluster.prototype.sendCommand = function (command, stream, node) {
};

Cluster.prototype.handleError = function (error, ttl, handlers) {
var _this = this;
if (typeof ttl.value === 'undefined') {
ttl.value = this.options.maxRedirections;
} else {
Expand All @@ -524,26 +495,20 @@ Cluster.prototype.handleError = function (error, ttl, handlers) {
var errv = error.message.split(' ');
if (errv[0] === 'MOVED' || errv[0] === 'ASK') {
handlers[errv[0] === 'MOVED' ? 'moved' : 'ask'](errv[1], errv[2]);
} else if (errv[0] === 'TRYAGAIN') {
this.delayQueue.push('tryagain', handlers.tryagain, {
timeout: this.options.retryDelayOnTryAgain
});
} else if (errv[0] === 'CLUSTERDOWN' && this.options.retryDelayOnClusterDown > 0) {
this.clusterDownQueue.push(handlers.clusterDown);
if (!this.clusterDownTimeout) {
this.clusterDownTimeout = setTimeout(function () {
_this.refreshSlotsCache(function () {
_this.clusterDownTimeout = null;
_this.executeClusterDownCommands();
});
}, this.options.retryDelayOnClusterDown);
}
this.delayQueue.push('clusterdown', handlers.connectionClosed, {
timeout: this.options.retryDelayOnClusterDown,
callback: this.refreshSlotsCache.bind(this)
});
} else if (error.message === 'Connection is closed.' && this.options.retryDelayOnFailover > 0) {
this.failoverQueue.push(handlers.connectionClosed);
if (!this.failoverTimeout) {
this.failoverTimeout = setTimeout(function () {
_this.refreshSlotsCache(function () {
_this.failoverTimeout = null;
_this.executeFailoverCommands();
});
}, this.options.retryDelayOnFailover);
}
this.delayQueue.push('failover', handlers.connectionClosed, {
timeout: this.options.retryDelayOnFailover,
callback: this.refreshSlotsCache.bind(this)
});
} else {
handlers.defaults();
}
Expand Down
12 changes: 6 additions & 6 deletions lib/pipeline.js
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,9 @@ Pipeline.prototype.fillResult = function (value, position) {
if (typeof this.leftRedirections === 'undefined') {
this.leftRedirections = {};
}
var exec = function () {
_this.exec();
};
this.redis.handleError(commonError, this.leftRedirections, {
moved: function (slot, key) {
_this.preferKey = key;
Expand All @@ -113,12 +116,9 @@ Pipeline.prototype.fillResult = function (value, position) {
_this.preferKey = key;
_this.exec();
},
clusterDown: function () {
_this.exec();
},
connectionClosed: function () {
_this.exec();
},
tryagain: exec,
clusterDown: exec,
connectionClosed: exec,
maxRedirections: function () {
matched = false;
},
Expand Down
35 changes: 33 additions & 2 deletions test/functional/cluster.js
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,7 @@ describe('cluster', function () {

var cluster = new Redis.Cluster([
{ host: '127.0.0.1', port: '30001' }
], { lazyConnect: false });
]);
cluster.get('foo', function () {
cluster.get('foo');
});
Expand Down Expand Up @@ -361,7 +361,7 @@ describe('cluster', function () {
});
var cluster = new Redis.Cluster([
{ host: '127.0.0.1', port: '30001' }
], { lazyConnect: false });
], { retryDelayOnFailover: 1 });
cluster.get('foo', function (err, res) {
expect(res).to.eql('bar');
cluster.disconnect();
Expand Down Expand Up @@ -453,6 +453,37 @@ describe('cluster', function () {
});
});

describe('TRYAGAIN', function () {
it('should retry the command', function (done) {
var times = 0;
var slotTable = [
[0, 16383, ['127.0.0.1', 30001]]
];
var server = new MockServer(30001, function (argv) {
if (argv[0] === 'cluster' && argv[1] === 'slots') {
return slotTable;
}
if (argv[0] === 'get' && argv[1] === 'foo') {
if (times++ === 1) {
process.nextTick(function () {
cluster.disconnect();
disconnect([server], done);
});
} else {
return new Error('TRYAGAIN Multiple keys request during rehashing of slot');
}
}
});

var cluster = new Redis.Cluster([
{ host: '127.0.0.1', port: '30001' }
], { retryDelayOnTryAgain: 1 });
cluster.get('foo', function () {
cluster.get('foo');
});
});
});

describe('CLUSTERDOWN', function () {
it('should redirect the command to a random node', function (done) {
var slotTable = [
Expand Down
34 changes: 0 additions & 34 deletions test/unit/cluster.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,38 +21,4 @@ describe('cluster', function () {
expect(cluster.options).to.have.property('showFriendlyErrorStack', false);
expect(cluster.options).to.have.property('scaleReads', 'master');
});

describe('#executeFailoverCommands', function () {
it('should execute the commands', function (done) {
var cluster = {
resetFailoverQueue: function () {
this.failoverQueue = [];
},
failoverQueue: []
};

cluster.failoverQueue.push(function () {
expect(this.failoverQueue).to.have.length(0);
done();
}.bind(cluster));
Cluster.prototype.executeFailoverCommands.call(cluster);
});
});

describe('#executeClusterDownCommands', function () {
it('should execute the commands', function (done) {
var cluster = {
resetClusterDownQueue: function () {
this.clusterDownQueue = [];
},
clusterDownQueue: []
};

cluster.clusterDownQueue.push(function () {
expect(this.clusterDownQueue).to.have.length(0);
done();
}.bind(cluster));
Cluster.prototype.executeClusterDownCommands.call(cluster);
});
});
});

0 comments on commit b1a4b62

Please sign in to comment.