Permalink
Browse files

Queue commands when not connected; auto-reconnect works; emits events…

… 'connected', 'reconnecting', and 'noconnection' now; (thanks to pilif for motivating me to fix these)
  • Loading branch information...
1 parent 4ff4561 commit 1f9a3c7ef7aec14e47bb759c5490c203ead7375b @fictorial fictorial committed Apr 20, 2010
Showing with 137 additions and 47 deletions.
  1. +12 −16 README.md
  2. +5 −0 TODO.md
  3. +7 −0 examples/redis-version.js
  4. +113 −31 lib/redis-client.js
View
@@ -5,24 +5,22 @@
- Talk to Redis from Node.js
- Fully asynchronous; your code is called back when an operation completes
- [Binary-safe](http://github.com/fictorial/redis-node-client/blob/master/test/test.js#L353-363); uses Node.js Buffer objects for request serialization and reply parsing
+ - e.g. store a PNG in Redis if you'd like
- Client API directly follows Redis' [command specification](http://code.google.com/p/redis/wiki/CommandReference)
- *You have to understand how Redis works and the semantics of its command set to most effectively use this client*
-- Supports Redis' new exciting PUBSUB commands
-
-Recent changes completely break backwards compatibility. Sorry, it was time.
+- Supports Redis' new exciting [PUBSUB](http://code.google.com/p/redis/wiki/PublishSubscribe) commands
+- Automatically reconnects to Redis (doesn't drop commands sent while waiting to reconnect either) using [exponential backoff](http://en.wikipedia.org/wiki/Exponential_backoff)
## Synopsis
When working from a git clone:
- var client = require("./lib/redis-client").createClient();
var sys = require("sys");
- client.stream.addListener("connect", function () {
- client.info(function (err, info) {
- if (err) throw new Error(err);
- sys.puts("Redis Version is: " + info.redis_version);
- client.close();
- });
+ var client = require("../lib/redis-client").createClient();
+ client.info(function (err, info) {
+ if (err) throw new Error(err);
+ sys.puts("Redis Version is: " + info.redis_version);
+ client.close();
});
When working with a Kiwi-based installation:
@@ -33,12 +31,10 @@ When working with a Kiwi-based installation:
kiwi = require("kiwi"),
client = kiwi.require("redis-client").createClient();
- client.stream.addListener("connect", function () {
- client.info(function (err, info) {
- if (err) throw new Error(err);
- sys.puts("Redis Version is: " + info.redis_version);
- client.close();
- });
+ client.info(function (err, info) {
+ if (err) throw new Error(err);
+ sys.puts("Redis Version is: " + info.redis_version);
+ client.close();
});
- Refer to the many tests in `test/test.js` for many usage examples.
View
@@ -8,6 +8,11 @@
reply, does not handle multi-bulk replies inside multi-bulk replies.
This is required for MULTI/EXEC.
+- Fix reconnection logic.
+ - Queue all commands.
+ - If stream is writable, write.
+ - On reconnection established, send
+
## Later
- Provide wrapper around the pretty-raw sort method?
@@ -0,0 +1,7 @@
+var sys = require("sys");
+var client = require("../lib/redis-client").createClient();
+client.info(function (err, info) {
+ if (err) throw new Error(err);
+ sys.puts("Redis Version is: " + info.redis_version);
+ client.close();
+});
View
@@ -34,10 +34,10 @@ exports.debugMode = false;
var net = require("net"),
sys = require("sys"),
Buffer = require('buffer').Buffer,
+ events = require('events'),
CRLF = "\r\n",
CRLF_LEN = 2,
- MAX_RECONNECTION_ATTEMPTS = 10,
PLUS = exports.PLUS = 0x2B, // +
MINUS = exports.MINUS = 0x2D, // -
@@ -58,7 +58,7 @@ function debugFilter(buffer, len) {
// Redis is binary-safe but assume for debug display that
// the encoding of textual data is UTF-8.
- var filtered = buffer.utf8Slice(0, len);
+ var filtered = buffer.utf8Slice(0, len || buffer.length);
filtered = filtered.replace(/\r\n/g, '<CRLF>');
filtered = filtered.replace(/\r/g, '<CR>');
@@ -252,55 +252,128 @@ ReplyParser.prototype.feed = function (inbound) {
}
};
-function Client(stream) {
+/**
+ * Emits:
+ *
+ * - 'connected' when connected (or on a reconnection, reconnected).
+ * - 'reconnecting' when about to retry to connect to Redis.
+ * - 'noconnection' when a connection (or reconnection) cannot be established.
+ *
+ * Options:
+ *
+ * - autoReconnect (default: true)
+ * - maxReconnectionAttempts (default: 10)
+ */
+
+function Client(stream, options) {
+ events.EventEmitter.call(this);
+
+ this.stream = stream;
this.originalCommands = [];
+ this.queuedOriginalCommands = [];
+ this.queuedRequestBuffers = [];
this.channelCallbacks = {};
this.requestBuffer = new Buffer(512);
this.replyParser = new ReplyParser(this.onReply_, this);
+ this.reconnectionTimer = null;
+ this.autoReconnect = true;
+ this.maxReconnectionAttempts = 10;
+ this.reconnectionAttempts = 0;
+ this.reconnectionDelay = 500; // doubles, so starts at 1s delay
+
+ if (options !== undefined) {
+ this.autoReconnect = !!options.autoReconnect;
+ this.maxReconnectionAttempts = Math.abs(options.maxReconnectionAttempts || 10);
+ }
+
var client = this;
- this.stream = stream;
- this.stream.addListener("connect", function () {
+ stream.addListener("connect", function () {
if (exports.debugMode)
- sys.debug("[CONNECTED]");
+ sys.debug("[CONNECT]");
- this.setNoDelay();
- this.setTimeout(0);
+ stream.setNoDelay();
+ stream.setTimeout(0);
- client.noReconnect = false;
client.reconnectionAttempts = 0;
+ client.reconnectionDelay = 500;
+ if (client.reconnectionTimer) {
+ clearTimeout(client.reconnectionTimer);
+ client.reconnectionTimer = null;
+ }
+
+ // Send any commands that were queued while we were not connected.
+
+ if (exports.debugMode) {
+ sys.debug("[DEQUEUE] Sending " +
+ client.queuedRequestBuffers.length +
+ " queued request buffers.");
+ }
+
+ for (var i=0, n=client.queuedRequestBuffers.length; i<n; ++i) {
+ client.stream.write(client.queuedRequestBuffers[i], 'binary');
+ client.originalCommands.push(client.queuedOriginalCommands.shift());
+
+ if (exports.debugMode)
+ sys.debug("[SEND] " + debugFilter(client.queuedRequestBuffers[i]));
+ }
+ client.queuedRequestBuffers = [];
+
+ client.emit('connected', client);
});
- this.stream.addListener("data", function (buffer) {
+ stream.addListener("data", function (buffer) {
if (exports.debugMode)
- sys.debug("[RECV] " + debugFilter(buffer, buffer.length));
+ sys.debug("[RECV] " + debugFilter(buffer));
client.replyParser.feed(buffer);
});
- this.stream.addListener("end", function () {
- this.end();
+ stream.addListener("end", function () {
+ if (exports.debugMode) {
+ sys.debug("Connection to redis closed. There are " +
+ client.originalCommands.length +
+ " commands pending replies.");
+ }
+
+ stream.end();
});
- stream.addListener("close", function (inError) {
+ stream.addListener("close", function (hadError) {
if (exports.debugMode)
- sys.debug("[DISCONNECTED]");
+ sys.debug("[DISCONNECT]");
+
+ if (hadError && client.autoReconnect) {
+ if (client.reconnectionAttempts++ >= client.maxReconnectionAttempts) {
+ client.emit('noconnection', client);
+ return;
+ }
+
+ client.reconnectionDelay *= 2;
+
+ if (exports.debugMode) {
+ sys.debug("[RECONNECT " + client.reconnectionAttempts + "/" +
+ client.maxReconnectionAttempts + "]");
+ sys.debug("[WAIT " + client.reconnectionDelay + " ms]");
+ }
- if (!client.noReconnect &&
- client.reconnectionAttempts++ < MAX_RECONNECTION_ATTEMPTS) {
- this.setTimeout(30);
- this.connect(this.port, this.host);
+ client.reconnectionTimer = setTimeout(function () {
+ client.emit('reconnecting', client);
+ stream.connect(client.port, client.host);
+ }, client.reconnectionDelay);
}
});
}
+sys.inherits(Client, events.EventEmitter);
+
exports.Client = Client;
-exports.createClient = function (port, host) {
+exports.createClient = function (port, host, options) {
var port = port || 6379;
var host = host || '127.0.0.1';
- var client = new Client(new net.createConnection(port, host));
+ var client = new Client(net.createConnection(port, host, options));
client.port = port;
client.host = host;
@@ -309,7 +382,6 @@ exports.createClient = function (port, host) {
};
Client.prototype.close = function () {
- this.noReconnect = true;
this.stream.end();
};
@@ -648,20 +720,30 @@ Client.prototype.sendCommand = function () {
}
}
- this.originalCommands.push(originalCommand);
- this.stream.write(this.requestBuffer.slice(0, offset));
+ // If the stream is writable, write the command. Else queue the command
+ // for when we first establish a connection or reconnect.
+
+ if (this.stream.writable) {
+ this.originalCommands.push(originalCommand);
+ this.stream.write(this.requestBuffer.slice(0, offset), 'binary');
+
+ if (exports.debugMode)
+ sys.debug("[SEND] " + debugFilter(this.requestBuffer, offset));
+ } else {
+ var toEnqueue = new Buffer(offset);
+ this.requestBuffer.copy(toEnqueue, 0, 0, offset); // dst, dstStart, srcStart, srcEnd
+ this.queuedRequestBuffers.push(toEnqueue);
+ this.queuedOriginalCommands.push(originalCommand);
- if (exports.debugMode)
- sys.debug("[SEND] " + debugFilter(this.requestBuffer, offset));
+ if (exports.debugMode) {
+ sys.debug("[ENQUEUE] Not connected. Request queued. There are " +
+ this.queuedRequestBuffers.length + " requests queued.");
+ }
+ }
};
commands.forEach(function (commandName) {
Client.prototype[commandName] = function () {
- if (this.stream.readyState != "open")
- throw new Error("Sorry, the command cannot be sent to Redis. " +
- "The connection state is '" +
- this.stream.readyState + "'.");
-
var args = Array.prototype.slice.call(arguments);
args.unshift(commandName);
this.sendCommand.apply(this, args);

0 comments on commit 1f9a3c7

Please sign in to comment.