Skip to content

Commit

Permalink
Add encoder stream event for type errors
Browse files Browse the repository at this point in the history
  • Loading branch information
mtth committed Dec 13, 2019
1 parent 30fb2d8 commit e1e1a56
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 4 deletions.
9 changes: 6 additions & 3 deletions lib/containers.js
Expand Up @@ -327,10 +327,12 @@ function RawEncoder(schema, opts) {
try {
this._type._write(tap, val);
} catch (err) {
this.emit('error', err);
this.emit('typeError', err, val, this._type);
}
};
this._tap = new Tap(utils.newBuffer(opts.batchSize || 65536));

this.on('typeError', function (err) { this.emit('error', err); });
}
util.inherits(RawEncoder, stream.Transform);

Expand Down Expand Up @@ -404,7 +406,7 @@ function BlockEncoder(schema, opts) {
try {
this._type._write(tap, val);
} catch (err) {
this.emit('error', err);
this.emit('typeError', err, val, this._type);
return false;
}
return true;
Expand Down Expand Up @@ -457,6 +459,8 @@ function BlockEncoder(schema, opts) {
this.push(null);
}
});

this.on('typeError', function (err) { this.emit('error', err); });
}
util.inherits(BlockEncoder, stream.Duplex);

Expand Down Expand Up @@ -494,7 +498,6 @@ BlockEncoder.prototype._write = function (val, encoding, cb) {
var flushing = false;

if (this._writeValue(tap, val)) {
// The value was valid
if (!tap.isValid()) {
if (pos) {
this._flushChunk(pos, cb);
Expand Down
2 changes: 1 addition & 1 deletion package.json
@@ -1,6 +1,6 @@
{
"name": "avsc",
"version": "5.4.17",
"version": "5.4.18",
"description": "Avro for JavaScript",
"homepage": "https://github.com/mtth/avsc",
"keywords": [
Expand Down
21 changes: 21 additions & 0 deletions test/test_containers.js
Expand Up @@ -711,6 +711,27 @@ suite('containers', function () {
encoder.end();
});

test('custom type error handler', function (cb) {
var okVals = [];
var badVals = [];
var encoder = new streams.BlockEncoder('int')
.removeAllListeners('typeError')
.on('typeError', function (err, val) { badVals.push(val); });
var decoder = new streams.BlockDecoder()
.on('data', function (val) { okVals.push(val); })
.on('end', function () {
assert.deepEqual(okVals, [1, 2]);
assert.deepEqual(badVals, ['foo', 5.4]);
cb();
});
encoder.pipe(decoder);
encoder.write('foo');
encoder.write(1);
encoder.write(2);
encoder.write(5.4);
encoder.end();
});

test('metadata', function (cb) {
var t = Type.forSchema('string');
var buf = t.toBuffer('hello');
Expand Down

0 comments on commit e1e1a56

Please sign in to comment.