Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Create msgpack.Stream abstraction; split C++/JS.

- Provide the external API to the addon as lib/msgpack.js, which builds
  off of the rudimentary facilities provided by the bindings. This
  requires installation to install both build/default/mpBindings.node
  and lib/msgpack.js.
- Create msgpack.Stream abstraction which wraps a net.Stream and emits
  'msg' events when a full message is received. Provides a 'sendmsg'
  method to transparently pack JavaScript objects.
- Add tests for msgpack.Stream.
  • Loading branch information...
commit 976b5126d0f326c1ca5083b25d127a9cfdf2e92b 1 parent f22f530
@pgriess pgriess authored
View
71 lib/msgpack.js
@@ -0,0 +1,71 @@
+// Wrap a nicer JavaScript API that wraps the direct MessagePack bindings.
+
+var buffer = require('buffer');
+var events = require('events');
+var mpBindings = require('mpBindings');
+var sys = require('sys');
+
+var debug = ('NODE_DEBUG' in process.env) ?
+ function(m) { sys.error.apply(this, arguments); } :
+ function() { };
+
+var pack = mpBindings.pack;
+var unpack = mpBindings.unpack;
+
+exports.pack = pack;
+exports.unpack = unpack;
+
+var Stream = function(s) {
+ var self = this;
+
+ events.EventEmitter.call(self);
+
+ // Buffer of incomplete stream data
+ self.buf = null;
+
+ // Send a message down the stream
+ self.send = function(m) {
+ s.write(pack(m));
+ };
+
+ // Listen for data from the underlying stream, consuming it and emitting
+ // 'msg' events as we find whole messages.
+ s.addListener('data', function(d) {
+ debug('received ' + d.length + ' bytes');
+
+ // Make sure that self.buf reflects the entirety of the unread stream
+ // of bytes; it needs to be a single buffer
+ if (self.buf) {
+ var b = new buffer.Buffer(self.buf.length + d.length);
+ self.buf.copy(b, 0, 0, self.buf.length);
+ d.copy(b, self.buf.length, 0, d.length);
+
+ self.buf = b;
+ } else {
+ self.buf = d;
+ }
+
+ // Consume messages from the stream, one by one
+ while (self.buf && self.buf.length > 0) {
+ var msg = unpack(self.buf);
+ if (!msg) {
+ break;
+ }
+
+ self.emit('msg', msg);
+ if (unpack.bytes_remaining > 0) {
+ self.buf = self.buf.slice(
+ self.buf.length - unpack.bytes_remaining,
+ self.buf.length
+ );
+ } else {
+ self.buf = null;
+ }
+ }
+
+ debug(((self.buf) ? self.buf.length : 0) + ' bytes remaining');
+ });
+};
+
+sys.inherits(Stream, events.EventEmitter);
+exports.Stream = Stream;
View
2  src/wscript
@@ -11,7 +11,7 @@ def configure(ctx):
def build(ctx):
t = ctx.new_task_gen('cxx', 'shlib', 'node_addon')
- t.target = 'msgpack'
+ t.target = 'mpBindings'
t.source = 'msgpack.cc'
t.includes = ['../deps/msgpack/dist/include']
t.cxxflags = ['-g', '-Wall']
View
66 test/test-stream-disjoint.js
@@ -0,0 +1,66 @@
+// Verify that a msgpack.Stream can handle partial messages arriving.
+
+var assert = require('assert');
+var buffer = require('buffer');
+var msgpack = require('msgpack');
+var net = require('net');
+var netBindings = process.binding('net');
+var sys = require('sys');
+
+var MSGS = [
+ [1, 2, 3],
+ {'a' : 1, 'b' : 2},
+ {'test' : [1, 'a', 3]}
+];
+
+// Write a buffer to a stream with a delay
+var writemsg = function(s, buf, delay) {
+ setTimeout(function() {
+ sys.debug(sys.inspect(buf));
+ s.write(buf);
+ },
+ delay
+ );
+};
+
+var fds = netBindings.socketpair();
+
+var is = new net.Stream(fds[0]);
+var ims = new msgpack.Stream(is);
+var os = new net.Stream(fds[1]);
+
+var msgsReceived = 0;
+ims.addListener('msg', function(m) {
+ sys.debug('received msg: ' + sys.inspect(m));
+
+ assert.deepEqual(m, MSGS[msgsReceived]);
+
+ if (++msgsReceived == MSGS.length) {
+ is.end();
+ os.end();
+ }
+});
+is.resume();
+
+// Serialize the messages into a single large buffer
+var buf = new buffer.Buffer(0);
+MSGS.forEach(function (m) {
+ var b = msgpack.pack(m);
+ var bb = new buffer.Buffer(buf.length + b.length);
+ buf.copy(bb, 0, 0, buf.length);
+ b.copy(bb, buf.length, 0, b.length);
+
+ buf = bb;
+});
+
+// Slice our single large buffer into 3 pieces and send them all off
+// separately.
+var nwrites = 3;
+var sz = Math.ceil(buf.length / nwrites);
+for (var i = 0; i < nwrites; i++) {
+ writemsg(
+ os,
+ buf.slice(i * sz, Math.min((i + 1) * sz, buf.length)),
+ (i + 1) * 1000
+ );
+}
View
34 test/test-stream.js
@@ -0,0 +1,34 @@
+// Verify that msgpack.Stream can pass multiple messages around as expected.
+
+var assert = require('assert');
+var msgpack = require('msgpack');
+var net = require('net');
+var netBindings = process.binding('net');
+var sys = require('sys');
+
+var MSGS = [
+ [1, 2, 3],
+ {'a' : 1, 'b' : 2},
+ {'test' : [1, 'a', 3]}
+];
+var fds = netBindings.socketpair();
+
+var is = new net.Stream(fds[0]);
+var ims = new msgpack.Stream(is);
+var os = new net.Stream(fds[1]);
+var oms = new msgpack.Stream(os);
+
+var msgsReceived = 0;
+ims.addListener('msg', function(m) {
+ assert.deepEqual(m, MSGS[msgsReceived]);
+
+ if (++msgsReceived == MSGS.length) {
+ is.end();
+ os.end();
+ }
+});
+is.resume();
+
+MSGS.forEach(function (m) {
+ oms.send(m);
+});
Please sign in to comment.
Something went wrong with that request. Please try again.