Permalink
Browse files

added binary support

  • Loading branch information...
1 parent e119f8c commit 18137165b14cc7f311a1b973e8454c28b4b43f2d @kilianc committed Jul 6, 2012
Showing with 103 additions and 54 deletions.
  1. +83 −36 lib/nssocket.js
  2. +3 −3 package.json
  3. +1 −1 test/tcp-reconnect-test.js
  4. +8 −7 test/tcp-test.js
  5. +8 −7 test/tls-test.js
View
119 lib/nssocket.js
@@ -9,7 +9,7 @@ var net = require('net'),
tls = require('tls'),
util = require('util'),
events2 = require('eventemitter2'),
- Lazy = require('lazy'),
+ BufferJoiner = require('bufferjoiner'),
common = require('./common');
//
@@ -77,6 +77,12 @@ var NsSocket = exports.NsSocket = function (socket, options) {
maxListeners: options.maxListeners || 10
});
+ // Initializing parsing holders
+ this._eventLength = -1;
+ this._messageLength = -1;
+ this._messagetype = 0;
+ this._bufferJoiner = new BufferJoiner();
+
this._setup();
};
@@ -124,30 +130,42 @@ exports.createServer = function createServer(options, connectionListener) {
//
NsSocket.prototype.send = function send(event, data, callback) {
var dataType = typeof data,
+ header = new Buffer(9),
message;
+ // if we aren't connected/socketed, then error
+ if (!this.socket || !this.connected) {
+ return this.emit('error', new Error('NsSocket: sending on a bad socket'));
+ }
+
// rebinds
if (typeof event === 'string') {
event = event.split(this._delimiter);
}
-
+
+ event = Buffer(JSON.stringify(event));
+
if (dataType === 'undefined' || dataType === 'function') {
callback = data;
data = null;
}
- // if we aren't connected/socketed, then error
- if (!this.socket || !this.connected) {
- return this.emit('error', new Error('NsSocket: sending on a bad socket'));
+ if (Buffer.isBuffer(data)) {
+ header.writeInt8(1, 8);
+ } else {
+ data = Buffer(JSON.stringify(data));
+ header.writeInt8(0, 8);
}
-
- message = Buffer(JSON.stringify(event.concat(data)) + '\n');
+ header.writeUInt32BE(event.length, 0);
+ header.writeUInt32BE(data.length, 4);
+
+ message = Buffer.concat([header, event, data], 9 + event.length + data.length);
+
+ // now actually write to the socket
if (this.socket.cleartext) {
this.socket.cleartext.write(message, callback);
- }
- else {
- // now actually write to the socket
+ } else {
this.socket.write(message, callback);
}
};
@@ -364,11 +382,8 @@ NsSocket.prototype._setup = function () {
//
if (this._type === 'tcp4') {
startName = 'connect';
-
- Lazy(this.socket)
- .lines
- .map(String)
- .forEach(this._onData.bind(this));
+
+ this.socket.on('data', this._onData.bind(this));
// create a stub for the setKeepAlive functionality
this.setKeepAlive = function () {
@@ -378,12 +393,9 @@ NsSocket.prototype._setup = function () {
else if (this._type === 'tls') {
startName = 'secureConnection';
this.socket.once('connect', function () {
- Lazy(self.socket.cleartext)
- .lines
- .map(String)
- .forEach(self._onData.bind(self));
+ self.socket.cleartext.on('data', self._onData.bind(self));
});
-
+
// create a stub for the setKeepAlive functionality
this.setKeepAlive = function () {
self.socket.socket.setKeepAlive.apply(self.socket.socket, arguments);
@@ -422,26 +434,61 @@ NsSocket.prototype._onStart = function _onStart() {
//
// ### @private function _onData (message)
-// #### @message {String} literal message from the data event of the socket
-// Messages are assumed to be delimited properly (if using nssocket to send)
-// otherwise the delimiter should exist at the end of every message
+// #### @chunk {Buffer} binary chunk from the data event of the socket
+// Messages are assumed to be formatted properly (if using nssocket to send)
// We assume messages arrive in order.
//
-NsSocket.prototype._onData = function _onData(message) {
- var parsed,
- data;
-
- try {
- parsed = JSON.parse(message);
- data = parsed.pop();
+NsSocket.prototype._onData = function _onData(chunk) {
+ ~this._incomingMessageLength ? this._fetchHeader(chunk) : this._fetchBody(chunk);
+};
+
+//
+// ### @private function _fetchHeader (chunk)
+// #### @chunk {Buffer} binary chunk from the data event of the socket
+// Buffers and parse the header of the message
+//
+NsSocket.prototype._fetchHeader = function _fetchHeader(chunk) {
+ var header
+
+ if (this._bufferJoiner.length + chunk.length >= 9) {
+ header = this._bufferJoiner.add(chunk).join();
+ this._eventLength = header.readUInt32BE(0);
+ this._messageLength = header.readUInt32BE(4);
+ this._messagetype = header.readInt8(8);
+ this._fetchBody(chunk.slice(9));
+ } else {
+ this._bufferJoiner.add(chunk);
}
- catch (err) {
- //
- // Don't do anything, assume that the message is only partially
- // received.
- //
+};
+
+//
+// ### @private function _fetchBody (chunk)
+// #### @chunk {Buffer} binary chunk from the data event of the socket
+// Buffers and parse the body of the message, when ready
+// emits a data::* event
+//
+NsSocket.prototype._fetchBody = function _fetchBody(chunk) {
+ var raw, event, data;
+ var chunkLength = chunk.length;
+ var bytesLeft = (this._eventLength + this._messageLength) - this._bufferJoiner.length;
+
+ if (chunkLength >= bytesLeft) {
+ raw = this._bufferJoiner.add(chunk.slice(0, bytesLeft)).join();
+ event = JSON.parse(raw.slice(0, this._eventLength));
+ data = this._messagetype ? raw.slice(this._eventLength) : JSON.parse(raw.slice(this._eventLength).toString());
+
+ this._eventLength = -1;
+ this._messageLength = -1;
+ this.emit(['data'].concat(event), data);
+
+ if (chunkLength - bytesLeft) {
+ process.nextTick(this._fetchHeader.bind(this, chunk.slice(bytesLeft)));
+ }
+
+ return;
}
- this.emit(['data'].concat(parsed), data);
+
+ this._bufferJoiner.add(chunk);
};
//
View
6 package.json
@@ -12,15 +12,15 @@
"url": "http://github.com/nodejitsu/nssocket.git"
},
"dependencies": {
- "eventemitter2": "0.4.x",
- "lazy": "1.0.x"
+ "bufferjoiner": "0.1.x",
+ "eventemitter2": "0.4.x"
},
"devDependencies": {
"vows": "0.6.x"
},
"main": "./lib/nssocket",
"engines": {
- "node": ">= 0.4.x"
+ "node": ">= 0.7.x"
},
"scripts": {
"test": "vows test/*-test.js --spec"
View
2 test/tcp-reconnect-test.js
@@ -58,7 +58,7 @@ vows.describe('nssocket/tcp/reconnect').addBatch({
"the on() method": {
topic: function (outbound, inbound) {
outbound.on('data.}here.}is', this.callback.bind(outbound, null));
- inbound.write(JSON.stringify(['here', 'is', 'something.']) + '\n');
+ inbound.write(Buffer('0000000d0000000c005b2268657265222c226973225d22736f6d657468696e672e22', 'hex'));
},
"should handle namespaced events": function (_, data) {
assert.isArray(this.event);
View
15 test/tcp-test.js
@@ -47,7 +47,7 @@ vows.describe('nssocket/tcp').addBatch({
"the on() method": {
topic: function (outbound, inbound) {
outbound.on('data.}here.}is', this.callback.bind(outbound, null));
- inbound.write(JSON.stringify(['here', 'is', 'something.']) + '\n');
+ inbound.write(Buffer('0000000d0000000c005b2268657265222c226973225d22736f6d657468696e672e22', 'hex'));
},
"should handle namespaced events": function (_, data) {
assert.isArray(this.event);
@@ -72,15 +72,16 @@ vows.describe('nssocket/tcp').addBatch({
"the send() method": {
topic: function (outbound, inbound) {
inbound.on('data', this.callback.bind(null, null, outbound, inbound));
- outbound.send(['hello','world'], { some: "json", data: 123 });
+ outbound.send(['hello','world'], Buffer('foo::bar'));
},
"we should see it on the other end": function (_, outbound, wraped, data) {
assert.isObject(data);
- arr = JSON.parse(data.toString());
- assert.lengthOf(arr, 3);
- assert.equal(arr[0], 'hello');
- assert.equal(arr[1], 'world');
- assert.deepEqual(arr[2], { some: "json", data: 123 });
+ event = JSON.parse(data.slice(9, 26).toString());
+ data = data.slice(26).toString();
+ assert.lengthOf(event, 2);
+ assert.equal(event[0], 'hello');
+ assert.equal(event[1], 'world');
+ assert.deepEqual(data, 'foo::bar');
},
"the end() method": {
topic: function (outbound, inbound) {
View
15 test/tls-test.js
@@ -57,7 +57,7 @@ vows.describe('nssocket/tls').addBatch({
"the on() method": {
topic: function (outbound, inbound) {
outbound.on(['data', 'here', 'is'], this.callback.bind(outbound, null));
- inbound.write(JSON.stringify(['here', 'is', 'something']) + '\n');
+ inbound.write(Buffer('0000000d0000000b005b2268657265222c226973225d22736f6d657468696e6722', 'hex'));
},
"should handle namespaced events": function (_, data) {
assert.isString(data);
@@ -79,15 +79,16 @@ vows.describe('nssocket/tls').addBatch({
"the send() method": {
topic: function (outbound, inbound) {
inbound.on('data', this.callback.bind(null, null, outbound, inbound));
- outbound.send(['hello','world'], { some: "json", data: 123 });
+ outbound.send(['hello','world'], Buffer('foo::bar'));
},
"we should see it on the other end": function (_, outbound, inbound, data) {
assert.isObject(data);
- arr = JSON.parse(data.toString());
- assert.lengthOf(arr, 3);
- assert.equal(arr[0], 'hello');
- assert.equal(arr[1], 'world');
- assert.deepEqual(arr[2], { some: "json", data: 123 });
+ event = JSON.parse(data.slice(9, 26).toString());
+ data = data.slice(26).toString();
+ assert.lengthOf(event, 2);
+ assert.equal(event[0], 'hello');
+ assert.equal(event[1], 'world');
+ assert.deepEqual(data, 'foo::bar');
},
"the end() method": {
topic: function (outbound, inbound) {

0 comments on commit 1813716

Please sign in to comment.