Skip to content

Commit

Permalink
Remove frame decoder and encoder.
Browse files Browse the repository at this point in the history
Streams aren't usable in the browser (which would have been half the
use-cases for this). Removing them for now (to not have to support them
all through 2.0), there might be a more uniform way to implement this
functionality.
  • Loading branch information
mtth committed Oct 13, 2015
1 parent 26e42da commit 1c65ae4
Show file tree
Hide file tree
Showing 4 changed files with 7 additions and 310 deletions.
8 changes: 4 additions & 4 deletions README.md
Expand Up @@ -28,8 +28,8 @@ Libraries compared:

These rates are for decoding a [realistic record schema][coupon-schema],
modeled after a popular open-source API. Encoding rates are slightly lower but
relative rates across libraries remain similar. You can find the raw numbers
and more details on the [benchmarks page][benchmarks].
ratios across libraries are similar. You can find the raw numbers and more
details on the [benchmarks page][benchmarks].


## Installation
Expand All @@ -44,7 +44,7 @@ including `0.11`.

## Documentation

+ [Quickstart](https://github.com/mtth/avsc/wiki/Quickstart)
+ [Overview](https://github.com/mtth/avsc/wiki)
+ [API](https://github.com/mtth/avsc/wiki/API)
+ [Advanced usage](https://github.com/mtth/avsc/wiki/Advanced-usage)

Expand All @@ -62,7 +62,7 @@ var avsc = require('avsc');
```javascript
var type = avsc.parse('Person.avsc'); // Load schema from a file.
var buf = type.toBuffer({name: 'Ann', age: 25}); // Serialize an object.
var obj = type.fromBuffer(buf); // == {name: 'Ann', age: 25}
var obj = type.fromBuffer(buf); // {name: 'Ann', age: 25}
```

+ Generate random instances from a schema:
Expand Down
2 changes: 1 addition & 1 deletion doc
Submodule doc updated from 479ba8 to 46e86d
159 changes: 2 additions & 157 deletions lib/streams.js
@@ -1,5 +1,7 @@
/* jshint node: true */

// TODO: Add `readerType` option for `RawDecoder` and `BlockDecoder`.

'use strict';


Expand Down Expand Up @@ -46,11 +48,6 @@ var f = util.format;
/**
* Duplex stream for decoding fragments.
*
* @param opts {Object}
*
* + type
* + decode
*
*/
function RawDecoder(type, opts) {
checkIsType(type);
Expand Down Expand Up @@ -103,100 +100,9 @@ RawDecoder.prototype._read = function () {
};


/**
 * Duplex stream which decodes framed messages back into objects.
 *
 * @param type {Type} Writer's type, used to deserialize each frame's payload.
 * @param opts {Object} Options:
 *
 *  + decode: When true (the default), emit decoded objects; otherwise emit
 *    each frame's raw payload buffer.
 *
 */
function FrameDecoder(type, opts) {
  checkIsType(type);
  var options = opts || {};

  // Decode unless explicitly disabled.
  var shouldDecode = options.decode === undefined || !!options.decode;
  stream.Duplex.call(this, {
    readableObjectMode: shouldDecode,
    allowHalfOpen: false
  });

  this._type = type;
  this._tap = new Tap(new Buffer(0));
  this._decode = shouldDecode;
  this._needPush = false;
  this._frameLength = -1; // Negative means no frame length prefix read yet.
  this._finished = false;

  var self = this;
  this.on('finish', function () {
    // Writable side is done; let the next read flush remaining data.
    self._finished = true;
    self._read();
  });
}
util.inherits(FrameDecoder, stream.Duplex);

/**
 * Buffer an incoming chunk, keeping any not-yet-consumed bytes.
 *
 * If a previous `_read` stalled waiting for more data, it is re-triggered
 * here so decoding can resume.
 */
FrameDecoder.prototype._write = function (chunk, encoding, cb) {
  var tap = this._tap;
  // Discard already-consumed bytes and append the new chunk.
  var leftover = tap.buf.slice(tap.pos);
  tap.buf = Buffer.concat([leftover, chunk]);
  tap.pos = 0;

  var stalled = this._needPush;
  this._needPush = false;
  if (stalled) {
    this._read();
  }
  cb();
};

/**
 * Read one frame's worth of data off the internal buffer.
 *
 * Frames are length-prefixed: 4 big-endian bytes giving the payload size,
 * followed by that many bytes of payload. A zero-length frame marks the end
 * of the stream. When not enough bytes are buffered yet, `_needPush` is set
 * so that the next `_write` re-triggers this read.
 */
FrameDecoder.prototype._read = function () {
  var tap = this._tap;
  var buf = tap.buf;
  var pos = tap.pos;
  var len = buf.length;

  // No frame in progress: try to read the next 4-byte length prefix.
  if (this._frameLength < 0) {
    if (this._finished) {
      // Writable side ended with no pending frame; end the readable side.
      this.push(null);
      return;
    }
    if (pos + 4 > len) {
      // Length prefix is incomplete; wait for more data.
      this._needPush = true;
      return;
    }
    tap.pos += 4;
    this._frameLength = buf.readUIntBE(pos, 4);
  }

  if (!this._frameLength) {
    // Zero-length frame: end-of-stream marker.
    this.push(null); // This will close the writable side as well.
    return;
  }

  if (tap.pos + this._frameLength > len) {
    // Frame payload is incomplete; wait for more data.
    this._needPush = true;
    return;
  }

  if (this._decode) {
    // The type's reader consumes the payload bytes and advances the tap.
    this._frameLength = -1;
    this.push(this._type._read(tap));
  } else {
    // Raw mode: emit the payload bytes untouched.
    pos = tap.pos;
    tap.pos += this._frameLength;
    this._frameLength = -1;
    this.push(buf.slice(pos, tap.pos));
  }
};


/**
* Duplex stream for decoding object container files.
*
* @param opts {Object}
*
* + typeOpts
* + decode
* + codecs
*
*/
function BlockDecoder(opts) {
opts = opts || {};
Expand Down Expand Up @@ -348,12 +254,6 @@ BlockDecoder.prototype._read = function () {
/**
* Duplex stream for encoding.
*
* @param type
* @param opts {Object}
*
* + batchSize
* + noCheck
*
*/
function RawEncoder(type, opts) {
checkIsType(type);
Expand Down Expand Up @@ -409,58 +309,6 @@ RawEncoder.prototype._flush = function (cb) {
};


/**
 * Transform stream which encodes and frames each written object.
 *
 * Useful in particular for Avro protocol messages:
 * http://avro.apache.org/docs/current/spec.html#Message+Framing
 *
 * Each frame currently holds exactly one encoded object.
 *
 * @param type {Type} Type used to serialize written objects.
 * @param opts {Object} Options:
 *
 *  + noCheck: Skip validity checks on written objects.
 *  + frameSize: Initial encoding buffer size (grown automatically).
 *
 */
function FrameEncoder(type, opts) {
  checkIsType(type);
  var options = opts || {};

  stream.Transform.call(this, {
    writableObjectMode: true,
    allowHalfOpen: false
  });

  this._type = type;
  this._frameSize = options.frameSize || 1024;
  this._noCheck = options.noCheck || false;
  this._tap = new Tap(new Buffer(this._frameSize));
}
util.inherits(FrameEncoder, stream.Transform);

/**
 * Encode a single object into one length-prefixed frame.
 *
 * The frame layout is a 4-byte big-endian payload length followed by the
 * Avro-encoded payload. If the object overflows the current encoding
 * buffer, the buffer is doubled (relative to the overflowed position) and
 * the write retried once.
 */
FrameEncoder.prototype._transform = function (obj, encoding, cb) {
  if (!this._noCheck && !this._type.isValid(obj)) {
    cb(new Error(f('invalid object: %j', obj)));
    return;
  }

  var tap = this._tap;
  tap.pos = 4; // Reserve 4 bytes for the frame's length prefix.
  this._type._write(tap, obj);
  if (!tap.isValid()) {
    // Encoded object overflowed the tap's buffer; grow it and re-encode.
    this._frameSize = 2 * tap.pos;
    tap.buf = new Buffer(this._frameSize);
    tap.pos = 4;
    this._type._write(tap, obj);
  }
  tap.buf.writeUIntBE(tap.pos - 4, 0, 4); // Fill in the payload length.
  cb(null, copyBuffer(tap.buf, 0, tap.pos));
};

/**
 * Emit the terminating frame: a zero length prefix with no payload.
 */
FrameEncoder.prototype._flush = function (cb) {
  this.push(new Buffer([0, 0, 0, 0]));
  cb();
};


/**
* Duplex stream to write object container files.
*
Expand Down Expand Up @@ -611,7 +459,6 @@ BlockEncoder.prototype._createBlockCallback = function () {
};



// Helpers.

/**
Expand Down Expand Up @@ -692,10 +539,8 @@ module.exports = {
MAGIC_BYTES: MAGIC_BYTES,
streams: {
RawDecoder: RawDecoder,
FrameDecoder: FrameDecoder,
BlockDecoder: BlockDecoder,
RawEncoder: RawEncoder,
FrameEncoder: FrameEncoder,
BlockEncoder: BlockEncoder
}
};
148 changes: 0 additions & 148 deletions test/test_streams.js
Expand Up @@ -176,154 +176,6 @@ suite('streams', function () {

});

// Tests for the frame encoder: framing layout, buffer resizing, argument
// validation, and object validity checks.
suite('FrameEncoder', function () {

  var FrameEncoder = streams.FrameEncoder;
  // Terminating frame: a zero length prefix and no payload.
  var END = new Buffer([0, 0, 0, 0]);

  // A single int is emitted as one frame followed by the end marker.
  test('write once', function (cb) {
    var t = fromSchema('int');
    var bufs = [];
    var encoder = new FrameEncoder(t)
      .on('data', function (chunk) {
        bufs.push(chunk);
      })
      .on('end', function () {
        // Frame: 4-byte big-endian length (1) + zig-zag encoded 1 (0x02).
        assert.deepEqual(bufs, [new Buffer([0, 0, 0, 1, 2]), END]);
        cb();
      });
    encoder.end(1);
  });

  // Each written object gets its own frame.
  test('write multiple', function (cb) {
    var t = fromSchema('int');
    var bufs = [];
    var encoder = new FrameEncoder(t)
      .on('data', function (chunk) {
        bufs.push(chunk);
      })
      .on('end', function () {
        assert.deepEqual(
          bufs,
          [
            new Buffer([0, 0, 0, 1, 2]),
            new Buffer([0, 0, 0, 2, 128, 1]),
            END
          ]
        );
        cb();
      });
    encoder.write(1);
    encoder.end(64);
  });

  // A frame size smaller than the encoded object forces a buffer resize.
  test('resize', function (cb) {
    var t = fromSchema({type: 'fixed', name: 'A', size: 2});
    var data = new Buffer([48, 18]);
    var bufs = [];
    var encoder = new FrameEncoder(t, {frameSize: 1})
      .on('data', function (chunk) {
        bufs.push(chunk);
      })
      .on('end', function () {
        assert.deepEqual(bufs, [new Buffer([0, 0, 0, 2, 48, 18]), END]);
        cb();
      });
    encoder.write(data);
    encoder.end();
  });

  // Ending with no writes still emits the terminating frame.
  test('empty', function (cb) {
    var t = fromSchema('int');
    var chunks = [];
    var encoder = new FrameEncoder(t, {batchSize: 2})
      .on('data', function (chunk) { chunks.push(chunk); })
      .on('end', function () {
        assert.deepEqual(chunks, [END]);
        cb();
      });
    encoder.end();
  });

  // Constructing an encoder without a type must throw.
  test('missing writer type', function () {
    assert.throws(function () { new FrameEncoder(); });
  });

  // Writing an object that fails the type's validity check emits an error.
  test('invalid object', function (cb) {
    var t = fromSchema('int');
    var encoder = new FrameEncoder(t)
      .on('error', function () { cb(); });
    encoder.write('hi');
  });

});

// Tests for the frame decoder: single/multiple frames, raw (non-decoding)
// mode, partial writes straddling frame boundaries, and argument validation.
suite('FrameDecoder', function () {

  var FrameDecoder = streams.FrameDecoder;

  // One frame plus the zero-length end marker decodes to one object.
  test('single item', function (cb) {
    var t = fromSchema('int');
    var objs = [];
    var decoder = new FrameDecoder(t)
      .on('data', function (obj) { objs.push(obj); })
      .on('end', function () {
        assert.deepEqual(objs, [0]);
        cb();
      });
    decoder.end(new Buffer([0, 0, 0, 1, 0, 0, 0, 0, 0]));
  });

  // Constructing a decoder without a type must throw.
  test('no writer type', function () {
    assert.throws(function () { new FrameDecoder(); });
  });

  // Input ending without the zero-length end marker still decodes fully.
  test('decoding no trailing', function (cb) {
    var t = fromSchema('int');
    var objs = [];
    var decoder = new FrameDecoder(t)
      .on('data', function (obj) { objs.push(obj); })
      .on('end', function () {
        assert.deepEqual(objs, [1, 2]);
        cb();
      });
    decoder.write(new Buffer([0, 0, 0, 1, 2]));
    decoder.end(new Buffer([0, 0, 0, 1, 4]));
  });

  // With `decode: false`, raw payload buffers are emitted unchanged.
  test('no decoding', function (cb) {
    var t = fromSchema('int');
    var bufs = [new Buffer([3]), new Buffer([124, 5])];
    var objs = [];
    var decoder = new FrameDecoder(t, {decode: false})
      .on('data', function (obj) { objs.push(obj); })
      .on('end', function () {
        assert.deepEqual(objs, bufs);
        cb();
      });
    decoder.write(new Buffer([0, 0, 0, 1]));
    decoder.write(bufs[0]);
    decoder.write(new Buffer([0, 0, 0, 2]));
    decoder.end(bufs[1]);
  });

  // Chunks split mid-prefix and mid-payload are reassembled correctly.
  test('write partial', function (cb) {
    var t = fromSchema('bytes');
    var objs = [];
    var decoder = new FrameDecoder(t)
      .on('data', function (obj) { objs.push(obj); })
      .on('end', function () {
        assert.deepEqual(objs, [new Buffer([1])]);
        cb();
      });
    decoder.write(new Buffer([0, 0]));
    decoder.write(new Buffer([0, 2, 2]));
    // Let the first read go through (and return null).
    process.nextTick(function () { decoder.end(new Buffer([1])); });
  });

});

suite('BlockEncoder', function () {

var BlockEncoder = streams.BlockEncoder;
Expand Down

0 comments on commit 1c65ae4

Please sign in to comment.