Skip to content

Commit

Permalink
Add reader schema option to block decoder
Browse files Browse the repository at this point in the history
  • Loading branch information
mtth committed Nov 23, 2017
1 parent 102e78c commit a1a4748
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 8 deletions.
20 changes: 13 additions & 7 deletions lib/containers.js
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,10 @@ function BlockDecoder(opts) {
readableObjectMode: !noDecode
});

this._type = null;
this._rType = opts.readerSchema !== undefined ?
types.Type.forSchema(opts.readerSchema) :
undefined;
this._wType = null;
this._codecs = opts.codecs;
this._codec = undefined;
this._parseHook = opts.parseHook;
Expand Down Expand Up @@ -187,15 +190,15 @@ BlockDecoder.prototype._decodeHeader = function () {
if (this._parseHook) {
schema = this._parseHook(schema);
}
this._type = types.Type.forSchema(schema);
this._wType = types.Type.forSchema(schema);
} catch (err) {
this.emit('error', err);
return;
}

this._readValue = createReader(this._noDecode, this._type);
this._readValue = createReader(this._noDecode, this._wType, this._rType);
this._syncMarker = header.sync;
this.emit('metadata', this._type, this._codec, header);
this.emit('metadata', this._wType, this._codec, header);
return true;
};

Expand Down Expand Up @@ -555,17 +558,20 @@ function tryReadBlock(tap) {
}

/** Create bytes consumer, either reading or skipping records. */
function createReader(noDecode, type) {
function createReader(noDecode, writerType, readerType) {
if (noDecode) {
return (function (skipper) {
return function (tap) {
var pos = tap.pos;
skipper(tap);
return tap.buf.slice(pos, tap.pos);
};
})(type._skip);
})(writerType._skip);
} else if (readerType) {
var resolver = readerType.createResolver(writerType);
return function (tap) { return resolver._read(tap); };
} else {
return function (tap) { return type._read(tap); };
return function (tap) { return writerType._read(tap); };
}
}

Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "avsc",
"version": "5.1.0",
"version": "5.1.1",
"description": "Avro for JavaScript",
"homepage": "https://github.com/mtth/avsc",
"keywords": [
Expand Down
42 changes: 42 additions & 0 deletions test/test_containers.js
Original file line number Diff line number Diff line change
Expand Up @@ -603,6 +603,48 @@ suite('containers', function () {
}
});

test('reader type', function (cb) {
var t1 = Type.forSchema({
name: 'Person',
type: 'record',
fields: [
{name: 'name', type: 'string'},
]
});
var t2 = Type.forSchema({
name: 'Person',
type: 'record',
fields: [
{name: 'name', type: 'string'},
{name: 'fullName', aliases: ['name'], type: ['null', 'string']},
{name: 'age', type: ['null', 'int'], 'default': null}
]
});
var persons = [];
var encoder = new streams.BlockEncoder(t1);
var decoder = new streams.BlockDecoder({readerSchema: t2})
.on('data', function (val) { persons.push(val); })
.on('end', function () {
assert.deepEqual(
persons,
[
{name: 'Ann', fullName: 'Ann', age: null},
{name: 'Jane', fullName: 'Jane', age: null}
]
);
cb();
});
encoder.pipe(decoder);
encoder.write({name: 'Ann'})
encoder.write({name: 'Jane'})
encoder.end();

function parseHook(schema) {
assert.deepEqual(schema, t1.getSchema());
return t2;
}
});

});

});

0 comments on commit a1a4748

Please sign in to comment.