Skip to content

Commit

Permalink
Re-wrote transaction code. All tests pass. Added README
Browse files Browse the repository at this point in the history
  • Loading branch information
bnoguchi committed Aug 27, 2010
1 parent 06ca4e3 commit 45f9271
Show file tree
Hide file tree
Showing 18 changed files with 293 additions and 2,026 deletions.
31 changes: 31 additions & 0 deletions README.md
@@ -0,0 +1,31 @@
## redis-node - Complete Redis Client for Node.js
---

See this [blog post](http://ngchi.wordpress.com/2010/08/23/towards-auto-sharding-in-your-node-js-app/) for more information.

### Dependencies
The Vows Testing Framework:
From git:
git clone http://github.com/cloudhead/vows.git
Or from npm:
npm install vows

### Installation
From git:
git clone http://github.com/bnoguchi/redis-node.git
Npm installation coming...

### Example
See test/ for examples.

### Test Coverage
See [./test/](http://github.com/bnoguchi/redis-node) for the list of tests.
To run the tests from the command line.
vows test/*.vows.js

### License
MIT License

---
### Author
Brian Noguchi
265 changes: 174 additions & 91 deletions lib/client.js
Expand Up @@ -51,7 +51,9 @@ var Client = exports.Client = function Client (port, host, options) {
this.channelCallbacks = {};

// State specifying if we're in the middle of a transaction or not.
this.doConsiderCommandsTransactional = false;
this.isTransacting = false;
this.cmdsToRunAfterTxn = [];
this.queuedTransactionBlocks = []; // If more than 1 transaction is called near the same time

// For storing queued commands that build up when there isn't a connection
Expand All @@ -61,8 +63,6 @@ var Client = exports.Client = function Client (port, host, options) {

this.connectionsMade = 0;

var client = this; // For closures

// Re-usable parser used to interpret the leading line of every reply
this._firstLineParser = new FirstLine();

Expand All @@ -72,72 +72,10 @@ var Client = exports.Client = function Client (port, host, options) {
// Setup the TCP connection
var stream = this.stream = net.createConnection(this.port = port, this.host = host);

var replyStream = new ReplyStream(stream, client);
replyStream.on("reply", function (reply) {
// sys.log(sys.inspect(reply)); // Uncomment this to see the reply
/* Handle special case of PubSub */
var pubSubCallback, replyValue;
if (reply.isMessage || reply.isPMessage) {
replyValue = reply.replyValue;
pubSubCallback = client.channelCallbacks[replyValue.channelPattern || replyValue.channelName];
pubSubCallback(replyValue.channelName, replyValue.message, replyValue.channelPattern)
return;
}

// Now handle all other replies

// 1. Find the command name corresponding to the reply
// 2. Find or define a callback (needed for ALL reply types)
var commandForReply = client.commandHistory.shift(),
commandName = commandForReply.commandName,
commandCallback = commandForReply.commandCallback;

// sys.log(sys.inspect(commandForReply)); // Uncomment this to see which command corresponds to this

/* Handle Errors */
if (reply.replyType === ERROR) {
commandCallback(new Error(reply.replyValue), null);
return;
}

/* Handle Non-errors */

// If we switched database numbers, then save that
// number in this.currDB, so that we can switch back
// to the same DB on reconnection. We do this because
// reconnecting to Redis as a client connects you to
// database number 1 always.
if (commandName === "select") {
client.currDB = commandForReply.commandArgs[0];
}

// Handle specific command result typecasting
if (commandName === "info") {
var info = {};
reply.replyValue.replace(/\r\n$/, '').split("\r\n").forEach( function (line) {
var parts = line.split(":");
info[parts[0]] = parts[1];
});
reply.replyValue = info;
} else if (commandName === "exists") {
reply.replyValue = (reply.replyValue === 1) ? true : false;
} else if ((commandName === "zrange" || commandName === "zrevrange" || commandName === "zrangebyscore") && commandForReply.commandArgs[commandForReply.commandArgs.length-1] === "withscores" && reply.replyValue) {
var arr = reply.replyValue, hash, currKey, newArr = [];
for (var i = 0, len = arr.length; i < len; i++) {
if ((i % 2) === 0) {
currKey = arr[i];
} else {
hash = {};
hash[currKey] = arr[i];
newArr.push(hash);
}
}
reply.replyValue = newArr;
}

commandCallback(null, reply.replyValue);
});
var replyStream = new ReplyStream(stream, this);
replyStream.on("reply", this.handleReply);

var client = this; // For closures
stream.on("connect", function () {
var eventName = client.connectionsMade === 0
? "connected"
Expand Down Expand Up @@ -196,6 +134,80 @@ Client.prototype.DEFAULT_OPTIONS = {
reconnectionTimer: null
};

Client.prototype.handleReply = function (reply, client, isParsingExecReply) {
// sys.log(sys.inspect(reply)); // Uncomment this to see the reply
/* Handle special case of PubSub */
var pubSubCallback, replyValue;
if (reply.isMessage || reply.isPMessage) {
replyValue = reply.replyValue;
pubSubCallback = client.channelCallbacks[replyValue.channelPattern || replyValue.channelName];
pubSubCallback(replyValue.channelName, replyValue.message, replyValue.channelPattern)
return;
}

// Now handle all other replies

// 1. Find the command name corresponding to the reply
// 2. Find or define a callback (needed for ALL reply types)
var commandForReply, txnCommand, commandName, commandCallback;
if (isParsingExecReply) {
txnCommand = client.currTxnCommands.shift();
commandName = txnCommand.commandName;
commandCallback = txnCommand.callback;
replyValue = reply;
} else {
commandForReply = client.commandHistory.shift();
commandName = commandForReply.commandName;
commandCallback = commandForReply.commandCallback;
replyValue = reply.replyValue;
}

// sys.log(sys.inspect(commandForReply)); // Uncomment this to see which command corresponds to this

/* Handle Errors */
if (reply.replyType === ERROR) {
commandCallback(new Error(reply.replyValue), null);
return;
}

/* Handle Non-errors */

// If we switched database numbers, then save that
// number in this.currDB, so that we can switch back
// to the same DB on reconnection. We do this because
// reconnecting to Redis as a client connects you to
// database number 1 always.
if (commandName === "select") {
client.currDB = commandForReply.commandArgs[0];
}

// Handle specific command result typecasting
if (commandName === "info") {
var info = {};
replyValue.replace(/\r\n$/, '').split("\r\n").forEach( function (line) {
var parts = line.split(":");
info[parts[0]] = parts[1];
});
replyValue = info;
} else if (commandName === "exists") {
replyValue = (replyValue === 1) ? true : false;
} else if ((commandName === "zrange" || commandName === "zrevrange" || commandName === "zrangebyscore") && commandForReply.commandArgs[commandForReply.commandArgs.length-1] === "withscores" && replyValue) {
var arr = replyValue, hash, currKey, newArr = [];
for (var i = 0, len = arr.length; i < len; i++) {
if ((i % 2) === 0) {
currKey = arr[i];
} else {
hash = {};
hash[currKey] = arr[i];
newArr.push(hash);
}
}
replyValue = newArr;
}

commandCallback(null, replyValue);
};

Client.prototype.close = function () {
this.expectingClose = true;
this.stream.end();
Expand Down Expand Up @@ -259,9 +271,7 @@ var commands = require("./commandList");
commands.forEach( function (commandName) {
Client.prototype[commandName] = function () {
var args = Array.prototype.slice.call(arguments);
if (!(args[0] instanceof Command)) {
args.unshift(commandName);
}
args.unshift(commandName);
this.sendCommand.apply(this, args);
};
});
Expand All @@ -272,29 +282,59 @@ commands.forEach( function (commandName) {
// 1. Start transaction via MULTI
// 2. Send commands (returns immediately)
// 3. Send exec
Client.prototype.sendCommandInsideTransaction = function () {
var args = Array.prototype.slice.call(arguments);
args.push(true); // pass a flag to #sendCommand to indicate we're in a transaction
this.sendCommand.apply(this, args);
};
// TODO Remove this
//Client.prototype.sendCommandInsideTransaction = function () {
// var args = Array.prototype.slice.call(arguments);
// args.push(true); // pass a flag to #sendCommand to indicate we're in a transaction
// this.sendCommand.apply(this, args);
//};

// sendCommand(commandName[, arg1[, arg2[,...[, callback]]]])
Client.prototype.sendCommand = function () {
var stream = this.stream,
client = this,
args = Array.prototype.slice.call(arguments),
command,
insideTransaction = (args[args.length-1] === true) ?
args.pop() : // Remove insideTransaction flag from args, so it isn't sent to Redis
false;

if (args[0] instanceof Command) {
command = args[0];
insideTransaction = true;
} else {
command = new Command(args, this);
command;
if (args[0] !== "discard" && !this.doConsiderCommandsTransactional && this.isTransacting) {// && (this.numUnackedTxnCmds > 0 || this.cmdsToRunAfterTxn.length > 0)) {
this.cmdsToRunAfterTxn.push(args);
return;
}
command = new Command(args, this);

if (this.doConsiderCommandsTransactional) { // && command.commandName !== "exec") {
this.numUnackedTxnCmds++;
// command.transformCuzPartOfTransaction();
var intendedCommandCallback = command.commandCallback;
var currTxnCommands = this.currTxnCommands = this.currTxnCommands || [];
var ackCallback = function (err, reply) {
if (!err && reply !== "QUEUED") {
err = command.commandName + " was not queued in the transaction.";
}
if (err) { // If there was an error in the syntax of this command
// Remove the transaction commands still ahead of me:
for (var i = 0, len = this.numUnackedTxnCmds; i < len; i++); {
client.commandHistory.shift();
}
// Tell the Redis server to cancel the transaction,
// so it doesn't block other clients' commands
client.isTransacting = false;
client.discard( function (errDiscard, reply) {
client.currTxnCommands = [];
client.runPostTxnCommands();
});
// TODO How do I inform the user that the transaction was rolled back?
// throw err;
} else {
client.numUnackedTxnCmds--;
if (client.didRegisterAllCommands && client.numUnackedTxnCmds === 0) {
client.sendExecToServer();
}
}
}
command.commandCallback = ackCallback;
currTxnCommands.push({commandName: command.commandName, callback: intendedCommandCallback});
}
if (!stream.writable || (this.isTransacting && !insideTransaction)) {
if (!stream.writable) { // TODO Analyze this condition with transaction scenario
this.queuedCommandHistory.push(command);
// this.queuedCommandBuffers.push(command.toBuffer());
} else {
Expand Down Expand Up @@ -324,11 +364,54 @@ Client.prototype.transaction = function (doStuffInsideTransaction) {
// client.transaction (function (t2) {
// // ... Do stuff here
// }
if (this.isTransacting) { // Handles consecutive
this.queuedTransactionBlocks.push(doStuffInsideTransaction);
if (this.doConsiderCommandsTransactional) {
doStuffInsideTransaction();
} else if (!this.isTransacting) {
this.sendMultiToServer();
this.isTransacting = true;
this.doConsiderCommandsTransactional = true;
this.didRegisterAllCommands = false;
this.numUnackedTxnCmds = 0;
doStuffInsideTransaction();
this.didRegisterAllCommands = true;
this.doConsiderCommandsTransactional = false;
if (this.numUnackedTxnCmds === 0) this.sendExecToServer();
} else {
var txn = this.txn = new Transaction(this);
txn.execute(doStuffInsideTransaction);
this.cmdsToRunAfterTxn.push(doStuffInsideTransaction);
}
// TODO Remove this.queuedTransactionBlocks
// TODO Remove Transaction
};

Client.prototype.sendMultiToServer = function () {
this.multi( function (err, reply) {
if (err) throw err;
if (reply !== true) throw new Error("Expected 'OK'. Reply is " + sys.inspect(reply));
});
};

Client.prototype.sendExecToServer = function () {
var client = this;
this.isTransacting = false;
this.exec( function (err, replies) {
if (err) throw err;
var reply;
while (reply = replies.shift()) {
client.handleReply(reply, client, true);
}
});
this.runPostTxnCommands();
};

Client.prototype.runPostTxnCommands = function () {
var nextCmds = this.cmdsToRunAfterTxn, nextCmdAsArray;
while (nextCmdAsArray = nextCmds.shift()) {
if (typeof nextCmdAsArray === "function") {
this.transaction(nextCmdAsArray);
break;
} else {
this.sendCommand.apply(this, nextCmdAsArray);
}
}
};

Expand Down Expand Up @@ -428,4 +511,4 @@ Client.prototype.unsubscribeFrom = function (nameOrPattern) {
if (callback) commandArgs.push(callback);
this.sendCommand.apply(this, commandArgs);
};
});
});
23 changes: 23 additions & 0 deletions lib/command.js
Expand Up @@ -73,5 +73,28 @@ _maybeResizeRequestBuffer: function (atLeast, offset) {
this.client.requestBuffer.copy(newBuffer, 0, 0, offset);
this.client.requestBuffer = newBuffer;
}
},
_transactionAckCallback: function (err, reply) {
var client = this.client;
if (!err && reply !== "QUEUED") {
err = this.commandName + " was not queued in the transaction.";
}
if (err) { // If there was an error in the syntax of this command
// Remove the transaction commands still ahead of me:
for (var i = 0; i < txn.numUnackedCommands; i++); {
client.commandHistory.shift();
}
// Tell the Redis server to cancel the transaction,
// so it doesn't block other clients' commands
client.sendCommandInsideTransaction("discard", function (errDiscard, reply) {
txn.emit("exit");
});
// throw err;
} else {
txn.numUnackedCommands--;
if (txn.didRegisterAllCommands && (txn.numUnackedCommands === 0)) {
txn.appendExec();
}
}
}
};

0 comments on commit 45f9271

Please sign in to comment.