Permalink
Browse files

Add Stream

  • Loading branch information...
1 parent bfc0c2c commit 0ce9f66e02570ed3ae2be3529fb3745305fafbf4 @josh josh committed with Sep 28, 2010
Showing with 78 additions and 0 deletions.
  1. +43 −0 lib/ns.js
  2. +35 −0 test/test.js
View
@@ -2,6 +2,8 @@
var assert = require('assert');
var Buffer = require('buffer').Buffer;
+var events = require('events');
+var sys = require('sys');
// Get the length of the netstring payload (i.e. excluding header and footer)
// pointed to by Buffer or String 'buf'. Returns -1 if the buffer is
@@ -172,3 +174,44 @@ var nsWrite = function(pay, payStart, payEnd, buf, bufOff) {
}
};
exports.nsWrite = nsWrite;
+
+var Stream = function(s) {
+ var self = this;
+
+ events.EventEmitter.call(self);
+
+ self.buf = null;
+
+ s.addListener('data', function(d) {
+ if (self.buf) {
+ var b = new Buffer(self.buf.length + d.length);
+ self.buf.copy(b, 0, 0, self.buf.length);
+ d.copy(b, self.buf.length, 0, d.length);
+
+ self.buf = b;
+ } else {
+ self.buf = d;
+ }
+
+ while (self.buf && self.buf.length > 0) {
+ try {
+ pay = nsPayload(self.buf);
+
+ if (pay == -1) {
+ break;
+ }
+
+ var nsLen = nsWriteLength(pay.length);
+ self.buf = self.buf.slice(nsLen, self.buf.length);
+
+ self.emit('data', pay);
+ } catch (exception) {
+ self.emit('error', exception);
+ break;
+ }
+ }
+ });
+}
+
+sys.inherits(Stream, events.EventEmitter);
+exports.Stream = Stream;
View
@@ -2,6 +2,7 @@
var at = require('./async_testing');
var Buffer = require('buffer').Buffer;
+var events = require('events');
var ns = require('../lib/ns');
(function() {
@@ -158,3 +159,37 @@ var ns = require('../lib/ns');
ts.runTests();
})();
+
+(function() {
+ var ts = new at.TestSuite('Stream');
+
+ ts.addTests({
+ 'simple' : function(as) {
+ var is = new events.EventEmitter();
+ var ins = new ns.Stream(is);
+
+ var MSGS = [
+ "abc",
+ "hello world!",
+ "café",
+ "a",
+ "b",
+ "c"
+ ];
+
+ var msgsReceived = 0;
+ ins.addListener('data', function(d) {
+ as.equal(d.toString(), MSGS[msgsReceived]);
+ msgsReceived++;
+ });
+
+ is.emit('data', new Buffer("3:abc,"));
+ is.emit('data', new Buffer("12:hello"));
+ is.emit('data', new Buffer(" world!,"));
+ is.emit('data', new Buffer("5:café,"));
+ is.emit('data', new Buffer("1:a,1:b,1:c,"));
+ }
+ });
+
+ ts.runTests();
+})();

0 comments on commit 0ce9f66

Please sign in to comment.