Permalink
Browse files

Remove autoReconnect option (just set maxReconn...=0). Dealing with u…

…ncertain number of commands processed when Redis is brought down and we try to reconnect and resend anything queued. Works fine still when Redis is up of course.
  • Loading branch information...
1 parent 57dfb00 commit 6e5ae8c77817d29885e7dc8e65879aa37b2f9f0c @fictorial fictorial committed Apr 20, 2010
Showing with 99 additions and 32 deletions.
  1. +45 −32 lib/redis-client.js
  2. +54 −0 test/test_shutdown_reconnect.js
View
77 lib/redis-client.js
@@ -257,11 +257,12 @@ ReplyParser.prototype.feed = function (inbound) {
*
* - 'connected' when connected (or on a reconnection, reconnected).
* - 'reconnecting' when about to retry to connect to Redis.
+ * - 'reconnected' when connected after a reconnection was established.
* - 'noconnection' when a connection (or reconnection) cannot be established.
+ * - 'drained' when no submitted commands are expecting a reply from Redis.
*
* Options:
*
- * - autoReconnect (default: true)
* - maxReconnectionAttempts (default: 10)
*/
@@ -276,15 +277,13 @@ function Client(stream, options) {
this.requestBuffer = new Buffer(512);
this.replyParser = new ReplyParser(this.onReply_, this);
this.reconnectionTimer = null;
- this.autoReconnect = true;
this.maxReconnectionAttempts = 10;
this.reconnectionAttempts = 0;
this.reconnectionDelay = 500; // doubles, so starts at 1s delay
+ this.connectionsMade = 0;
- if (options !== undefined) {
- this.autoReconnect = !!options.autoReconnect;
+ if (options !== undefined)
this.maxReconnectionAttempts = Math.abs(options.maxReconnectionAttempts || 10);
- }
var client = this;
@@ -301,25 +300,11 @@ function Client(stream, options) {
clearTimeout(client.reconnectionTimer);
client.reconnectionTimer = null;
}
+
+ client.originalCommands = [];
+ client.flushQueuedCommands();
- // Send any commands that were queued while we were not connected.
-
- if (exports.debugMode) {
- sys.debug("[DEQUEUE] Sending " +
- client.queuedRequestBuffers.length +
- " queued request buffers.");
- }
-
- for (var i=0, n=client.queuedRequestBuffers.length; i<n; ++i) {
- client.stream.write(client.queuedRequestBuffers[i], 'binary');
- client.originalCommands.push(client.queuedOriginalCommands.shift());
-
- if (exports.debugMode)
- sys.debug("[SEND] " + debugFilter(client.queuedRequestBuffers[i]));
- }
- client.queuedRequestBuffers = [];
-
- client.emit('connected', client);
+ client.emit(client.connectionsMade++ == 0 ? 'connected' : 'reconnected', client);
});
stream.addListener("data", function (buffer) {
@@ -330,20 +315,20 @@ function Client(stream, options) {
});
stream.addListener("end", function () {
- if (exports.debugMode) {
- sys.debug("Connection to redis closed. There are " +
- client.originalCommands.length +
- " commands pending replies.");
+ if (exports.debugMode && client.originalCommands.length > 0) {
+ sys.debug("Connection to redis closed with " +
+ client.originalCommands.length +
+ " commands pending replies that will never arrive!");
}
stream.end();
});
stream.addListener("close", function (hadError) {
if (exports.debugMode)
- sys.debug("[DISCONNECT]");
+ sys.debug("[NO CONNECTION]");
- if (hadError && client.autoReconnect) {
+ if (hadError && client.maxReconnectionAttempts > 0) {
if (client.reconnectionAttempts++ >= client.maxReconnectionAttempts) {
client.emit('noconnection', client);
return;
@@ -352,8 +337,9 @@ function Client(stream, options) {
client.reconnectionDelay *= 2;
if (exports.debugMode) {
- sys.debug("[RECONNECT " + client.reconnectionAttempts + "/" +
+ sys.debug("[RECONNECTING " + client.reconnectionAttempts + "/" +
client.maxReconnectionAttempts + "]");
+
sys.debug("[WAIT " + client.reconnectionDelay + " ms]");
}
@@ -386,6 +372,8 @@ Client.prototype.close = function () {
};
Client.prototype.onReply_ = function (reply) {
+ this.flushQueuedCommands();
+
if (this.handlePublishedMessage_(reply))
return;
@@ -401,6 +389,9 @@ Client.prototype.onReply_ = function (reply) {
callback(null, maybeConvertReplyValue(originalCommand[0], reply));
}
}
+
+ if (this.originalCommands.length == 0)
+ this.emit('drained', this);
};
Client.prototype.handlePublishedMessage_ = function (reply) {
@@ -653,6 +644,8 @@ var commands = [
// arguments[N-1] = callback function
Client.prototype.sendCommand = function () {
+ this.flushQueuedCommands();
+
var commandName = arguments[0].toLowerCase();
var originalCommand = Array.prototype.slice.call(arguments);
@@ -720,15 +713,16 @@ Client.prototype.sendCommand = function () {
}
}
- // If the stream is writable, write the command. Else queue the command
+ // If the stream is writable, write the command. Else enqueue the command
// for when we first establish a connection or reconnect.
if (this.stream.writable) {
this.originalCommands.push(originalCommand);
this.stream.write(this.requestBuffer.slice(0, offset), 'binary');
if (exports.debugMode)
- sys.debug("[SEND] " + debugFilter(this.requestBuffer, offset));
+ sys.debug("[SEND] " + debugFilter(this.requestBuffer, offset) +
+ " originalCommands = " + this.originalCommands.length + " COUNT = " + COUNT);
} else {
var toEnqueue = new Buffer(offset);
this.requestBuffer.copy(toEnqueue, 0, 0, offset); // dst, dstStart, srcStart, srcEnd
@@ -750,6 +744,25 @@ commands.forEach(function (commandName) {
};
});
+// Send any commands that were queued while we were not connected.
+
+Client.prototype.flushQueuedCommands = function () {
+ if (exports.debugMode && this.queuedRequestBuffers.length > 0)
+ sys.debug("[FLUSH QUEUE] " + this.queuedRequestBuffers.length +
+ " queued request buffers.");
+
+ for (var i=0; i<this.queuedRequestBuffers.length && this.stream.writable; ++i) {
+ var buffer = this.queuedRequestBuffers.shift();
+ this.stream.write(buffer, 'binary');
+ this.originalCommands.push(this.queuedOriginalCommands.shift());
+
+ if (exports.debugMode)
+ sys.debug("[DEQUEUE/SEND] " + debugFilter(buffer) +
+ ". queued buffers remaining = " +
+ this.queuedRequestBuffers.length);
+ }
+};
+
// Wraps 'subscribe' and 'psubscribe' methods to manage a single
// callback function per subscribed channel name/pattern.
//
View
54 test/test_shutdown_reconnect.js
@@ -0,0 +1,54 @@
+#!/usr/bin/env node
+
+// This is a test of robustly handling reconnection to Redis when Redis is
+// brought down temporarily and then restarted.
+//
+// The client will queue commands and retry to connect with exponential
+// backoff.
+//
+// Load up a test Redis instance in the foreground (daemonize off). Then in
+// another terminal run test/test_shutdown_reconnect.js This will issue 50,000
+// GET commands. While the output is scrolling by, kill Redis in the first
+// terminal (^C should do fine). You should see the client tell you it is
+// queueing commands. Then, restart Redis. Watch as the client submits the
+// queued commands after the current reconnection timeout expires. Don't
+// wait too long; the delay is exponential remember.
+//
+
+// -------------
+// This has uncovered a rather deep issue. We may submit requests to Redis,
+// and then kill Redis, waiting for replies for submitted commands that will
+// never come. Fine, replay the commands that didn't get a reply, right?
+// No. Redis might have started processing some of the commands...
+// See http://gist.github.com/372038
+// -------------
+
+var
+ sys = require('sys'),
+ redis = require('../lib/redis-client');
+
+redis.debugMode = true;
+
+client = redis.createClient();
+
+var rem = 50000;
+
+// We do not have to, but let the client stream establish
+// a connection to Redis before sending commands.
+
+client.addListener("connected", function () {
+ client.del("foo");
+
+ for (var i=0; i<50000; ++i)
+ client.incr("foo", function () {
+ if (--rem <= 0) {
+ sys.puts("all 50000 callbacks called");
+ process.exit(0);
+ }
+ });
+});
+
+setInterval(function () {
+ sys.puts("test is waiting for " + rem + " callbacks.");
+}, 1000);
+

0 comments on commit 6e5ae8c

Please sign in to comment.