Skip to content
This repository has been archived by the owner on May 3, 2022. It is now read-only.

Commit

Permalink
Convert Reader to bufrw
Browse files Browse the repository at this point in the history
  • Loading branch information
Joshua T Corbin committed Mar 2, 2015
1 parent 3a56083 commit 58a4444
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 60 deletions.
55 changes: 16 additions & 39 deletions node/test/v2/test_frame.js
Expand Up @@ -20,57 +20,34 @@

'use strict';

var TypedError = require("error/typed");
var read = require('../../lib/read.js');
var write = require('../../lib/write.js');

var SizeMismatchError = TypedError({
type: 'test-frame.size-mismatch',
message: 'size ({size}) mismatches buffer length ({bufferLength})',
size: null,
bufferLength: null
});
var bufrw = require('bufrw');

module.exports = TestFrame;

// size:1 payload~1

function TestFrame(payload) {
if (!(this instanceof TestFrame)) {
return new TestFrame(payload);
}
var self = this;
self.size = 0;
self.payload = payload;
}

TestFrame.read = read.chained(read.series([
read.UInt8, // size:1
read.buf1 // payload~1
]), function(results, buffer, offset) {
var size = results[0];
var payload = results[1];
if (size !== buffer.length) {
// parser shouldn't let this happen
return [SizeMismatchError({
size: size,
bufferLength: buffer.length
}), offset, null];
}
var body = new TestFrame(payload);
return [null, offset, body];
});

TestFrame.prototype.write = function writeTestBody() {
var self = this;
var payload = write.buf1(self.payload);
var size = write.UInt8(1 + payload.length);
return write.series([
size, // size:1
payload // payload~1
]);
};
// size:1 payload~1
TestFrame.struct = bufrw.Struct(TestFrame, [
{call: {
byteLength: function byteLength(frame) {
var res = bufrw.buf1.byteLength(frame.payload);
if (res.err) return res;
frame.size = 1 + res.length;
return bufrw.LengthResult.just(0);
}
}},
{name: 'size', rw: bufrw.UInt8}, // size:1
{name: 'payload', rw: bufrw.buf1} // payload~1
]);

TestFrame.prototype.toBuffer = function toBuffer() {
var self = this;
return self.write().create();
return bufrw.toBuffer(TestFrame.struct, self);
};
56 changes: 35 additions & 21 deletions node/v2/reader.js
Expand Up @@ -24,17 +24,19 @@ var TypedError = require('error/typed');
var inherits = require('util').inherits;
var Transform = require('stream').Transform;
var ParseBuffer = require('./parse_buffer');
var bufrw = require('bufrw');

var BrokenReaderStateError = TypedError({
type: 'tchannel.broken-reader-state',
message: 'reader in invalid state {state}',
state: null
var ZeroLengthFrameError = TypedError({
type: 'tchannel.zero-length-frame',
message: 'zero length frame encountered'
});

var ShortChunkRead = TypedError({
type: 'tchannel.short-chunk-read',
message: "didn't consume {remaining} bytes from incomnig chunk buffer",
remaining: null
var BrokenReaderStateError = TypedError({
type: 'tchannel.broken-reader-state',
message: 'reader in invalid state {state} expecting {expecting} avail {aval}',
state: null,
expecting: null,
avail: null
});

var TruncatedReadError = TypedError({
Expand Down Expand Up @@ -93,16 +95,35 @@ ChunkReader.prototype._transform = function _transform(chunk, encoding, callback
switch (self.state) {
case States.PendingLength:
self.expecting = self._readLength();
self.state = States.Seeking;
if (!self.expecting) {
self.emit('error', ZeroLengthFrameError());
self.buffer.shift(self.frameLengthSize);
self.expecting = self.frameLengthSize;
self.state = States.PendingLength;
} else {
self.state = States.Seeking;
}
break;
case States.Seeking:
var frameChunk = self.buffer.shift(self.expecting);
if (!frameChunk.length) {
callback(BrokenReaderStateError({
state: self.state,
expecting: self.expecting,
avail: self.buffer.avail()
}));
return;
}
self.handleFrame(frameChunk);
self.expecting = self.frameLengthSize;
self.state = States.PendingLength;
break;
default:
callback(BrokenReaderStateError({state: self.state}));
callback(BrokenReaderStateError({
state: self.state,
expecting: self.expecting,
avail: self.buffer.avail()
}));
return;
}
}
Expand Down Expand Up @@ -135,18 +156,11 @@ ChunkReader.prototype.handleFrame = function handleFrame(chunk, callback) {
if (!callback) {
callback = emitFrame;
}
var res = self.FrameType.read(chunk, 0);
var err = res[0];
var end = res[1];
var frame = res[2];
if (!err && end < chunk.length) {
// NOTE redundant with check in Frame.read
err = ShortChunkRead({remaining: chunk.length - end});
}
var tup = bufrw.fromBufferTuple(self.FrameType.struct, chunk);
var err = tup[0];
var frame = tup[1];
if (err) {
err.offset = end;
err.buffer = chunk;
callback(err);
callback(err, frame);
} else {
callback(null, frame);
}
Expand Down

0 comments on commit 58a4444

Please sign in to comment.