Skip to content
This repository has been archived by the owner on Jan 31, 2018. It is now read-only.

Commit

Permalink
Arrayification of TCP transports. Removed the corruption checks becau…
Browse files Browse the repository at this point in the history
…se the simplified format has no safeguards against it -- relying on TCP for integrity.
  • Loading branch information
David Ellis committed Mar 18, 2013
1 parent bb54184 commit 5731cdc
Show file tree
Hide file tree
Showing 10 changed files with 194 additions and 165 deletions.
4 changes: 2 additions & 2 deletions .travis.yml
@@ -1,11 +1,11 @@
language: node_js
node_js:
- 0.8
- 0.9
- 0.10

notifications:
email:
recipients:
- dispatch@uber.com
on_success: change
on_failure: change
on_failure: change
41 changes: 19 additions & 22 deletions lib/transports/client/tcp.js
Expand Up @@ -3,24 +3,12 @@ var util = require('util');
var EventEmitter = require('events').EventEmitter;
var shared = require('../shared/tcp');

require('buffertools');

// Client Transport's data handling function, bound to the TcpTransport
// instance when attached to the data event handler
function onData(data) {
// Since this transport is assumed to be used for JSON-RPC, all data
// is assumed to be UTF8.
this.buffer = this.buffer.concat(data);
// The core of the algorithm is to find null terminals in the data stream
// that demark different response messages. This is done with a while loop
// because a single chunk of data could conceivably contain more than one
// message, or it could contain only part of one message
var result, obj;
while ((result = shared.parseBuffer(this.buffer, this))) {
this.buffer = result[0]; obj = result[1];
this.emit('message', obj);
if(this.requests[obj.id]) this.requests[obj.id].callback(obj);
delete this.requests[obj.id];
function onDataCallback(message) {
if(this.requests[message.id]) {
this.requests[message.id].callback(message);
delete this.requests[message.id];
}
}

Expand All @@ -33,7 +21,6 @@ function onEnd() {
this.emit('retry');
// When reconnecting, all previous buffered data is invalid, so wipe
// it out, and then increment the retry flag
this.buffer = new Buffer('');
this.retry++;
// At the interval specified by the user, attempt to reestablish the
// connection
Expand Down Expand Up @@ -61,11 +48,17 @@ function onEnd() {
}.bind(this));
}.bind(this));
// Reconnect the data and end event handlers to the new connection object
this.con.on('data', onData.bind(this));
this.con.on('data', shared.createDataHandler(this, onDataCallback.bind(this)));
this.con.on('end', onEnd.bind(this));
this.con.on('error', function(e) {
this.con.destroy();
}.bind(this));
this.on('error', function(e) {
// Shared TCP code failed to parse a message, which means corrupt data from the server
// Emit the 'babel' event and how many requests are being retried
this.emit('babel', this.requests.length);
this.con.destroy();
}.bind(this));
}
// If this is the first try, attempt to reconnect immediately
if(this.retry === 1) reconnect.call(this);
Expand Down Expand Up @@ -98,7 +91,6 @@ function TcpTransport(server, port, config) {
// and build the buffer
this.timeout = config.timeout || 2*60*1000;
this.sweepInterval = setInterval(this.sweep.bind(this), this.timeout);
this.buffer = new Buffer('');

// Establish a connection to the server
this.con = net.connect({
Expand All @@ -107,11 +99,17 @@ function TcpTransport(server, port, config) {
});

// And handle incoming data and connection closing
this.con.on('data', onData.bind(this));
this.con.on('data', shared.createDataHandler(this, onDataCallback.bind(this)));
this.con.on('end', onEnd.bind(this));
this.con.on('error', function(e) {
this.con.destroy();
}.bind(this));
this.on('error', function(e) {
// Shared TCP code failed to parse a message, which means corrupt data from the server
// Emit the 'babel' event and how many requests are being retried
this.emit('babel', this.requests.length);
this.con.destroy();
}.bind(this));

return this;
}
Expand All @@ -131,8 +129,7 @@ TcpTransport.prototype.request = function request(body, callback) {
body: body,
timestamp: new Date().getTime()
};
var message = JSON.stringify(body);
if(this.con) this.con.write(Buffer.byteLength(message) + '\0' + message + '\0');
if(this.con) this.con.write(shared.formatMessage(body));
};

// The sweep function looks at the timestamps for each request, and any
Expand Down
21 changes: 5 additions & 16 deletions lib/transports/server/tcp.js
Expand Up @@ -3,8 +3,6 @@ var util = require('util');
var EventEmitter = require('events').EventEmitter;
var shared = require('../shared/tcp');

require('buffertools');

// The Server TCP Transport constructor function
function TcpTransport(port, config) {
// Initialize the EventEmitter for this object
Expand All @@ -26,17 +24,9 @@ function TcpTransport(port, config) {

this.server = net.createServer(function(con) {
this.emit('connection', con);
// For each connection establish a buffer to put UTF8 text into
var buffer = new Buffer('');
con.on('data', function(data) {
buffer = buffer.concat(data);
var result, obj;
while ((result = shared.parseBuffer(buffer, this))) {
buffer = result[0]; obj = result[1];
this.emit('message', obj);
this.handler(obj, this.handlerCallback.bind(this, con));
}
}.bind(this));
con.on('data', shared.createDataHandler(this, function(message) {
this.handler(message, this.handlerCallback.bind(this, con));
}.bind(this)));
con.on('end', function() {
this.emit('closedConnection', con);
// When the connection for a client dies, make sure the handlerCallbacks don't try to use it
Expand Down Expand Up @@ -88,10 +78,9 @@ function TcpTransport(port, config) {
// Attach the EventEmitter prototype into the prototype chain
util.inherits(TcpTransport, EventEmitter);

// An almost ridiculously simple callback handler, whenever the return object comes in, stringify it and send it down the line (along with a message delimiter
// An almost ridiculously simple callback handler, whenever the return object comes in, stringify it and send it down the line (along with a message length prefix)
TcpTransport.prototype.handlerCallback = function handlerCallback(con, retObj) {
var retStr = '' + JSON.stringify(retObj);
if(con) con.write(Buffer.byteLength(retStr) + '\0' + retStr + '\0');
if(con) con.write(shared.formatMessage(retObj));
};

// When asked to shutdown the server, shut it down
Expand Down
136 changes: 106 additions & 30 deletions lib/transports/shared/tcp.js
@@ -1,44 +1,120 @@
// Take a JSON object and transform it into a [Pascal string](http://en.wikipedia.org/wiki/String_%28computer_science%29#Length-prefixed) stored in a buffer.
// The length prefix is big-endian because DEATH TO THE LITTLE ENDIAN LILLIPUTIANS!
function formatMessage(obj) {
var str = JSON.stringify(obj);
return Buffer.byteLength(str) + '\0' + str + '\0';
var strlen = Buffer.byteLength(str);
var buf = new Buffer(4 + strlen);
buf.writeUInt32BE(strlen, 0);
buf.write(str, 4, strlen, 'utf8');
return buf;
}

function containsCompleteMessage(str) {
return str.split('\0').length > 2;
// Since all messages start with a length prefix and the "current" message is the first in the buffers array,
// we can determine the message length just by the first buffer in the array. This technically assumes that
// a buffer is at least 4 bytes large, but that should be a safe assumption.
function getMessageLen(buffers) {
if(buffers[0] && buffers[0].length >= 4) {
return buffers[0].readUInt32BE(0);
} else {
return 0;
}
}

// Simple helper function that returns the minimum value from all values passed into it
function min() {
return Array.prototype.reduce.call(arguments, function(curr, val) {
return (val < curr) ? val : curr;
}, Infinity);
}

function parseBuffer(buffer, eventEmitter) {
var nullSpot = buffer.indexOf('\0');
if (nullSpot !== -1) {
var messageSizeStr = buffer.toString('utf8', 0, nullSpot);
var messageSizePrefixBytes = Buffer.byteLength(messageSizeStr);
var messageSize = Number(messageSizeStr);
if (isNaN(messageSize) || messageSize < 0) {
eventEmitter.emit('error', new Error('Invalid message format: Not a valid message length: "' + messageSizeStr + '"'));
return parseBuffer(buffer.slice(nullSpot + 1), eventEmitter);
// Given an array of buffers, the message length, and the eventEmitter object (in case of error)
// try to parse the message and return the object it contains
function parseBuffer(buffers, messageLen, eventEmitter) {

// Allocate a new buffer the size of the message to copy the buffers into
// and keep track of how many bytes have been copied and what buffer we're currently on
var buf = new Buffer(messageLen);
var bytesCopied = 0;
var currBuffer = 0;

// Continue copying until we've hit the message size
while (bytesCopied < messageLen) {

// bytesToCopy contains how much of the buffer we'll copy, either the
// "whole thing" or "the rest of the message".
var bytesToCopy = 0;

// Since the first buffer contains the message length itself, it's special-cased
// to skip those 4 bytes
if (currBuffer === 0) {
bytesToCopy = min(messageLen, buffers[0].length-4);
buffers[0].copy(buf, bytesCopied, 4, bytesToCopy+4);
} else {
bytesToCopy = min(messageLen-bytesCopied, buffers[currBuffer].length);
buffers[currBuffer].copy(buf, bytesCopied, 0, bytesToCopy);
}

// Increment the number of bytes copied by how many were copied
bytesCopied += bytesToCopy;

// If we're done, we have some cleanup to do; either appending the final chunk of the buffer
// to the next buffer, or making sure that the array slice after the while loop is done
// appropriately
if (bytesCopied === messageLen) {
if(currBuffer === 0) bytesToCopy += 4;
if(buffers[currBuffer].length != bytesToCopy) {
buffers[currBuffer] = buffers[currBuffer].slice(bytesToCopy);
if (buffers[currBuffer].length < 4 && buffers[currBuffer+1]) {
buffers[currBuffer+1] = Buffer.concat([buffers[currBuffer], buffers[currBuffer+1]]);
} else {
currBuffer--; // Counter the increment below
}
}
}
var totalMessageLength = 2 + messageSizePrefixBytes + messageSize;
if (buffer[totalMessageLength - 1] === undefined) {
// Return when we do not have the full contents of the message in the buffer
return;
} else if (buffer[totalMessageLength - 1] !== 0) {
// There is no message delimiter where we expect one - we assume the buffer is
// corrupt and try to recover by advancing to the next delimiter
eventEmitter.emit('error', new Error('Invalid message format: No message delimiter as position ' + (totalMessageLength - 1) + ' for message "' + messageSizeStr + '"'));
return parseBuffer(buffer.slice(nullSpot + 1), eventEmitter);

// Move on to the next buffer in the array
currBuffer++;
}

// Trim the buffers array to the next message
buffers = buffers.slice(currBuffer);

// Parse the buffer we created into a string and then a JSON object, or emit the parsing error
var obj;
try {
obj = JSON.parse(buf.toString());
} catch (e) {
eventEmitter.emit('error', e);
}
return [buffers, obj];
}


function createDataHandler(self, callback) {
var buffers = [], bufferLen = 0, messageLen = 0;
return function dataHandler(data) {
if(buffers[buffers.length-1] && buffers[buffers.length-1].length < 4) {
buffers[buffers.length-1] = Buffer.concat([buffers[buffers.length-1], data], buffers[buffers.length-1].length + data.length);
} else {
buffers.push(data);
}
var message = buffer.toString('utf8', messageSizePrefixBytes + 1, messageSizePrefixBytes + 1 + messageSize);
buffer = buffer.slice(totalMessageLength);
var obj;
try {
obj = JSON.parse(message);
} catch(e) {
eventEmitter.emit('error', e);
bufferLen += data.length;
if(!messageLen) messageLen = getMessageLen(buffers);
if(bufferLen - 4 >= messageLen) {
var result, obj;
while (messageLen && bufferLen - 4 >= messageLen && (result = parseBuffer(buffers, messageLen, self))) {
buffers = result[0]; obj = result[1];
this.emit('message', obj);
callback(obj);
bufferLen = bufferLen - (messageLen + 4);
messageLen = getMessageLen(buffers);
}
}
return [buffer, obj];
}
}

// Export the public methods
module.exports.formatMessage = formatMessage;
module.exports.getMessageLen = getMessageLen;
module.exports.parseBuffer = parseBuffer;
module.exports.containsCompleteMessage = containsCompleteMessage;
module.exports.createDataHandler = createDataHandler;
3 changes: 1 addition & 2 deletions package.json
Expand Up @@ -17,8 +17,7 @@
"main": "lib/index.js",
"dependencies": {
"queue-flow": "*",
"lambda-js": "*",
"buffertools": "*"
"lambda-js": "*"
},
"devDependencies": {
"nodeunit": "*",
Expand Down
24 changes: 15 additions & 9 deletions test/client-tcp.js
Expand Up @@ -5,10 +5,12 @@ var net = require('net');
exports.loopback = function(test) {
test.expect(1);
var server = net.createServer(function(con) {
var buffer = '';
var buffer = new Buffer('');
var messageLen = 0;
con.on('data', function(data) {
buffer += data.toString();
if(shared.containsCompleteMessage(buffer)) {
buffer = Buffer.concat([buffer, data]);
if(messageLen === 0) messageLen = shared.getMessageLen([data]);
if(buffer.length === messageLen + 4) {
con.write(buffer);
con.end();
}
Expand All @@ -27,10 +29,12 @@ exports.loopback = function(test) {
exports.sweep = function(test) {
test.expect(2);
var server = net.createServer(function(con) {
var buffer = '';
var buffer = new Buffer('');
var messageLen = 0;
con.on('data', function(data) {
buffer += data.toString();
if(shared.containsCompleteMessage(buffer)) {
buffer = Buffer.concat([buffer, data]);
if(messageLen === 0) messageLen = shared.getMessageLen([data]);
if(buffer.length === messageLen + 4) {
setTimeout(function() {
con.write(buffer);
con.end();
Expand All @@ -57,10 +61,12 @@ exports.glitchedConnection = function(test) {
var con;
var serverFunc = function(c) {
con = c;
var buffer = '';
var buffer = new Buffer('');
var messageLen = 0;
c.on('data', function(data) {
buffer += data.toString();
if(shared.containsCompleteMessage(buffer)) {
buffer = Buffer.concat([buffer, data]);
if(messageLen === 0) messageLen = shared.getMessageLen([data]);
if(buffer.length === messageLen + 4) {
setTimeout(function() {
if(con) {
con.write(buffer);
Expand Down
30 changes: 21 additions & 9 deletions test/client.js
Expand Up @@ -40,17 +40,29 @@ exports.loopbackHttp = function(test) {
exports.failureTcp = function(test) {
test.expect(2);
var server = net.createServer(function(con) {
var buffer = new Buffer('');
var buffers = [];
var bufferLen = 0;
var messageLen = 0;
con.on('data', function(data) {
buffer = buffer.concat(data);
buffers.push(data);
bufferLen += data.length;
if(messageLen === 0) messageLen = shared.getMessageLen(buffers);
var res, obj;
while ((res = shared.parseBuffer(buffer))) {
buffer = res[0];
obj = res[1];
con.write(shared.formatMessage({
id: obj && obj.id,
error: "I have no idea what I'm doing."
}));
if(bufferLen - 4 >= messageLen) {
while (messageLen && bufferLen - 4 >= messageLen && (res = shared.parseBuffer(buffers, messageLen))) {
buffers = res[0];
obj = res[1];
con.write(shared.formatMessage({
id: obj && obj.id,
error: "I have no idea what I'm doing."
}));
bufferLen = buffers.map(function(buffer) {
return buffer.length;
}).reduce(function(fullLen, currLen) {
return fullLen + currLen;
}, 0);
messageLen = shared.getMessageLen(buffers);
}
}
});
});
Expand Down

0 comments on commit 5731cdc

Please sign in to comment.