Skip to content

Commit

Permalink
change to emit framed buffer instead of arguments
Browse files Browse the repository at this point in the history
this takes a 50% hit on performance.. but much more elegant in terms
of not having a stupid object Stream, making mux/demux easier as well,
and now you can just do new Message(buf) to unpack. im sure we can
improve the throughput here some more
  • Loading branch information
tj committed Oct 30, 2013
1 parent 19a6998 commit 4859687
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 18 deletions.
21 changes: 10 additions & 11 deletions lib/stream.js
Expand Up @@ -15,8 +15,6 @@ module.exports = Parser;
/**
* Initialize parser.
*
* TODO: remove recursion
*
* @param {Options} [opts]
* @api public
*/
Expand Down Expand Up @@ -45,7 +43,8 @@ Parser.prototype._write = function(chunk, encoding, fn){
this.version = meta >> 4;
this.argv = meta & 0xf;
this.state = 'arglen';
this._args = [];
this._bufs = [new Buffer([meta])];
this._nargs = 0;
this._leni = 0;
break;

Expand All @@ -55,8 +54,11 @@ Parser.prototype._write = function(chunk, encoding, fn){
// done
if (2 == this._leni) {
this._arglen = this._lenbuf.readUInt16BE();
var buf = new Buffer(2);
buf[0] = this._lenbuf[0];
buf[1] = this._lenbuf[1];
this._bufs.push(buf);
this._argcur = 0;
this._argbuf = [];
this.state = 'arg';
}
break;
Expand All @@ -72,22 +74,19 @@ Parser.prototype._write = function(chunk, encoding, fn){

// slice arg chunk
var part = chunk.slice(i, pos);
this._argbuf.push(part);
this._bufs.push(part);

// check if we have the complete arg
this._argcur += pos - i;
var done = this._argcur == this._arglen;
i = pos - 1;

if (done) {
var arg = Buffer.concat(this._argbuf);
this._args.push(arg);
}
if (done) this._nargs++;

// no more args
if (this._args.length == this.argv) {
if (this._nargs == this.argv) {
this.state = 'message';
this.emit('data', this._args);
this.emit('data', Buffer.concat(this._bufs));
break;
}

Expand Down
16 changes: 9 additions & 7 deletions test/stream.js
Expand Up @@ -11,24 +11,26 @@ describe('amp.Stream', function(){

var n = 0;

stream.on('data', function(msg){
stream.on('data', function(buf){
var msg = amp.decode(buf).map(function(b){ return b.toString(); });

switch (n++) {
case 0:
msg.should.have.length(1);
msg[0].toString().should.equal('tobi');
msg[0].should.equal('tobi');
break;

case 1:
msg.should.have.length(2);
msg[0].toString().should.equal('loki');
msg[1].toString().should.equal('abby');
msg[0].should.equal('loki');
msg[1].should.equal('abby');
break;

case 2:
msg.should.have.length(3);
msg[0].toString().should.equal('manny');
msg[1].toString().should.equal('luna');
msg[2].toString().should.equal('ewald');
msg[0].should.equal('manny');
msg[1].should.equal('luna');
msg[2].should.equal('ewald');
done();
break;
}
Expand Down

0 comments on commit 4859687

Please sign in to comment.