Permalink
Browse files

Reconnection handling. Client will now always callback when it cannot…

… connect or reconnect.

Node.js fails to emit 'error' when Redis is killed.  Thus, we use `.expectClose` on the client as a workaround.

A test script:

    var sys = require('sys');
    var redis = require("../lib/redis-client");
    redis.debugMode = true;
    var client = redis.createClient(redis.DEFAULT_PORT, redis.DEFAULT_HOST, { maxReconnectionAttempts: 2 });
    client.set('test:foo', 'bar', function (err, data){
      if (err) sys.log("set test:foo: " + err);
      else     sys.log("set test:foo: " + data);
    });
    setTimeout(function() {
      sys.log("timeout fired");
      client.get('test:foo', function (err, data) {
          if (err) sys.log("get test:foo: " + err);
          else     sys.log("get test:foo: " + data);

          sys.log("closing client");
          client.close();
      })}, 5000);

When Redis is down the entire time:

    DEBUG: [ENQUEUE] Not connected. Request queued. There are 1 requests queued.
    DEBUG: [NO CONNECTION]
    21 Apr 17:40:22 - set test:foo: Error: failed to establish a connection to Redis
    21 Apr 17:40:27 - timeout fired
    21 Apr 17:40:27 - get test:foo: Error: failed to establish a connection to Redis
    21 Apr 17:40:27 - closing client

When Redis is up the entire time:

    ~/projects/redis-node-client(master) ⚡ node test/phillip.js
    DEBUG: [ENQUEUE] Not connected. Request queued. There are 1 requests queued.
    DEBUG: [CONNECT]
    DEBUG: [FLUSH QUEUE] 1 queued request buffers.
    DEBUG: [DEQUEUE/SEND] *3<CRLF>$3<CRLF>set<CRLF>$8<CRLF>test:foo<CRLF>$3<CRLF>bar<CRLF>. queued buffers remaining = 0
    DEBUG: [RECV] +OK<CRLF>
    21 Apr 17:42:40 - set test:foo: true
    21 Apr 17:42:43 - timeout fired
    DEBUG: [SEND] *2<CRLF>$3<CRLF>get<CRLF>$8<CRLF>test:foo<CRLF> originalCommands = 1
    DEBUG: [RECV] $3<CRLF>bar<CRLF>
    21 Apr 17:42:43 - get test:foo: bar
    21 Apr 17:42:43 - closing client
    DEBUG: [NO CONNECTION]

When Redis is up initially, then killed.

    ~/projects/redis-node-client(master) ⚡ node test/phillip.js
    DEBUG: [ENQUEUE] Not connected. Request queued. There are 1 requests queued.
    DEBUG: [CONNECT]
    DEBUG: [FLUSH QUEUE] 1 queued request buffers.
    DEBUG: [DEQUEUE/SEND] *3<CRLF>$3<CRLF>set<CRLF>$8<CRLF>test:foo<CRLF>$3<CRLF>bar<CRLF>. queued buffers remaining = 0
    ^[[CDEBUG: [RECV] +OK<CRLF>
    21 Apr 17:37:10 - set test:foo: true
    DEBUG: [NO CONNECTION]
    DEBUG: [RECONNECTING 1/2]
    DEBUG: [WAIT 1000 ms]
    DEBUG: [NO CONNECTION]
    DEBUG: [RECONNECTING 2/2]
    DEBUG: [WAIT 2000 ms]
    DEBUG: [NO CONNECTION]
    21 Apr 17:37:14 - timeout fired
    21 Apr 17:37:14 - get test:foo: Error: failed to establish a connection to Redis
    21 Apr 17:37:14 - closing client

When Redis is up initially, then killed, then brought back up immediately.

    ~/projects/redis-node-client(master) ⚡ node test/phillip.js
    DEBUG: [ENQUEUE] Not connected. Request queued. There are 1 requests queued.
    DEBUG: [CONNECT]
    DEBUG: [FLUSH QUEUE] 1 queued request buffers.
    DEBUG: [DEQUEUE/SEND] *3<CRLF>$3<CRLF>set<CRLF>$8<CRLF>test:foo<CRLF>$3<CRLF>bar<CRLF>. queued buffers remaining = 0
    DEBUG: [RECV] +OK<CRLF>
    21 Apr 17:43:17 - set test:foo: true
    DEBUG: [NO CONNECTION]
    DEBUG: [RECONNECTING 1/2]
    DEBUG: [WAIT 1000 ms]
    DEBUG: [CONNECT]
    21 Apr 17:43:22 - timeout fired
    DEBUG: [SEND] *2<CRLF>$3<CRLF>get<CRLF>$8<CRLF>test:foo<CRLF> originalCommands = 1
    DEBUG: [RECV] $3<CRLF>bar<CRLF>
    21 Apr 17:43:22 - get test:foo: bar
    21 Apr 17:43:22 - closing client
    DEBUG: [NO CONNECTION]
  • Loading branch information...
1 parent 06c4348 commit 9c5ea3c9b5e92b31a5fc1b74efb4d3bebc1a0097 @fictorial fictorial committed Apr 21, 2010
Showing with 84 additions and 44 deletions.
  1. +83 −43 lib/redis-client.js
  2. +1 −1 seed.yml
View
@@ -58,6 +58,7 @@ exports.DEFAULT_HOST = '127.0.0.1';
exports.DEFAULT_PORT = 6379;
exports.COMMAND_ORPHANED_ERROR = "connection lost before reply received";
+exports.NO_CONNECTION_ERROR = "failed to establish a connection to Redis";
function debugFilter(buffer, len) {
// Redis is binary-safe but assume for debug display that
@@ -311,6 +312,7 @@ function Client(stream, options) {
: 'reconnected';
client.connectionsMade++;
+ client.expectingClose = false;
// If this a reconnection and there were commands submitted, then they
// are gone! We cannot say with any confidence which were processed by
@@ -329,16 +331,7 @@ function Client(stream, options) {
client.originalCommands.length + "). notifying...");
}
- for (var i=0, n=client.originalCommands.length; i<n; ++i) {
- var cmd = client.originalCommands[i];
- var cb = cmd[cmd.length - 1];
-
- if (typeof cb == "function") {
- var err = new Error(exports.COMMAND_ORPHANED_ERROR);
- err.originalCommand = cmd;
- cb(err); // err, reply (undefined)
- }
- }
+ client.callbackOrphanedCommandsWithError();
}
client.originalCommands = [];
@@ -368,35 +361,7 @@ function Client(stream, options) {
if (exports.debugMode)
sys.debug("[NO CONNECTION]");
- if (hadError) {
- if (client.connectionsMade == 0) {
- // Do not reconnect on first connection failure.
-
- client.noConnection = true;
- client.emit('noconnection', client);
- } else if (client.maxReconnectionAttempts > 0) {
- // Else try to reconnect if we're asked to.
-
- if (client.reconnectionAttempts++ >= client.maxReconnectionAttempts) {
- client.noConnection = true;
- client.emit('noconnection', client);
- } else {
- client.reconnectionDelay *= 2;
-
- if (exports.debugMode) {
- sys.debug("[RECONNECTING " + client.reconnectionAttempts + "/" +
- client.maxReconnectionAttempts + "]");
-
- sys.debug("[WAIT " + client.reconnectionDelay + " ms]");
- }
-
- client.reconnectionTimer = setTimeout(function () {
- client.emit('reconnecting', client);
- stream.connect(client.port, client.host);
- }, client.reconnectionDelay);
- }
- }
- }
+ client.maybeReconnect();
});
}
@@ -408,7 +373,7 @@ exports.createClient = function (port, host, options) {
var port = port || exports.DEFAULT_PORT;
var host = host || exports.DEFAULT_HOST;
- var client = new Client(net.createConnection(port, host, options));
+ var client = new Client(net.createConnection(port, host), options);
client.port = port;
client.host = host;
@@ -417,6 +382,7 @@ exports.createClient = function (port, host, options) {
};
Client.prototype.close = function () {
+ this.expectingClose = true;
this.stream.end();
};
@@ -693,13 +659,20 @@ var commands = [
// arguments[N-1] = callback function
Client.prototype.sendCommand = function () {
- if (this.noConnection && this.connectionsMade == 0)
- throw new Error("Cannot establish connection to Redis.");
+ var originalCommand = Array.prototype.slice.call(arguments);
+
+ // If this client has given up trying to connect/reconnect to Redis,
+ // just call the errback (if any). Regardless, don't enqueue the command.
+
+ if (this.noConnection) {
+ if (arguments.length > 0 && typeof arguments[arguments.length - 1] == 'function')
+ arguments[arguments.length - 1](this.makeErrorForCommand(originalCommand, exports.NO_CONNECTION_ERROR));
+ return;
+ }
this.flushQueuedCommands();
var commandName = arguments[0].toLowerCase();
- var originalCommand = Array.prototype.slice.call(arguments);
// Invariant: number of queued callbacks == number of commands sent to
// Redis whose replies have not yet been received and processed. Thus,
@@ -815,6 +788,73 @@ Client.prototype.flushQueuedCommands = function () {
}
};
+Client.prototype.makeErrorForCommand = function (command, errorMessage) {
+ var err = new Error(errorMessage);
+ err.originalCommand = command;
+ return err;
+};
+
+Client.prototype.callbackCommandWithError = function (command, errorMessage) {
+ var callback = command[command.length - 1];
+ if (typeof callback == "function")
+ callback(this.makeErrorForCommand(command, errorMessage));
+};
+
+Client.prototype.callbackOrphanedCommandsWithError = function () {
+ for (var i=0, n=this.originalCommands.length; i<n; ++i)
+ this.callbackCommandWithError(this.originalCommands[i], exports.COMMAND_ORPHANED_ERROR);
+ this.originalCommands = [];
+};
+
+Client.prototype.callbackQueuedCommandsWithError = function () {
+ for (var i=0, n=this.queuedOriginalCommands.length; i<n; ++i)
+ this.callbackCommandWithError(this.queuedOriginalCommands[i], exports.NO_CONNECTION_ERROR);
+ this.queuedOriginalCommands = [];
+ this.queuedRequestBuffers = [];
+};
+
+Client.prototype.giveupConnectionAttempts = function () {
+ this.callbackOrphanedCommandsWithError();
+ this.callbackQueuedCommandsWithError();
+ this.noConnection = true;
+ this.emit('noconnection', this);
+};
+
+Client.prototype.maybeReconnect = function () {
+ if (this.stream.writable && this.stream.readable)
+ return;
+
+ if (this.expectingClose)
+ return;
+
+ // Do not reconnect on first connection failure.
+ // Else try to reconnect if we're asked to.
+
+ if (this.connectionsMade == 0) {
+ this.giveupConnectionAttempts();
+ } else if (this.maxReconnectionAttempts > 0) {
+ if (this.reconnectionAttempts++ >= this.maxReconnectionAttempts) {
+ this.giveupConnectionAttempts();
+ } else {
+ this.reconnectionDelay *= 2;
+
+ if (exports.debugMode) {
+ sys.debug("[RECONNECTING " + this.reconnectionAttempts + "/" +
+ this.maxReconnectionAttempts + "]");
+
+ sys.debug("[WAIT " + this.reconnectionDelay + " ms]");
+ }
+
+ var self = this;
+
+ this.reconnectionTimer = setTimeout(function () {
+ self.emit('reconnecting', self);
+ self.stream.connect(self.port, self.host);
+ }, this.reconnectionDelay);
+ }
+ }
+};
+
// Wraps 'subscribe' and 'psubscribe' methods to manage a single
// callback function per subscribed channel name/pattern.
//
View
@@ -2,5 +2,5 @@
name: redis-client
description: A Redis client
tags: redis
- version: 0.2.5
+ version: 0.3.0

0 comments on commit 9c5ea3c

Please sign in to comment.