Skip to content

Commit

Permalink
New feature: an error indicator. Fixes #136. Thx, AK!
Browse files Browse the repository at this point in the history
  • Loading branch information
uhop committed May 30, 2023
1 parent ac8c975 commit 2854b16
Show file tree
Hide file tree
Showing 2 changed files with 105 additions and 0 deletions.
45 changes: 45 additions & 0 deletions jsonl/Parser.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,29 @@ class JsonlParser extends Utf8Stream {
return new JsonlParser(options);
}

static checkedParse(input, reviver, errorIndicator) {
try {
return JSON.parse(input, reviver);
} catch (error) {
if (typeof errorIndicator == 'function') return errorIndicator(error);
}
return errorIndicator;
}

constructor(options) {
super(Object.assign({}, options, {readableObjectMode: true}));
this._rest = '';
this._counter = 0;
this._reviver = options && options.reviver;
this._errorIndicator = options && options.errorIndicator;
if (options && options.checkErrors) {
this._processBuffer = this._checked_processBuffer;
this._flush = this._checked_flush;
}
if (options && 'errorIndicator' in options) {
this._processBuffer = this._suppressed_processBuffer;
this._flush = this._suppressed_flush;
}
}

_processBuffer(callback) {
Expand Down Expand Up @@ -43,6 +57,37 @@ class JsonlParser extends Utf8Stream {
});
}

_suppressed_processBuffer(callback) {
const lines = this._buffer.split('\n');
this._rest += lines[0];
if (lines.length > 1) {
if (this._rest) {
const value = JsonlParser.checkedParse(this._rest, this._reviver, this._errorIndicator);
value !== undefined && this.push({key: this._counter++, value});
}
this._rest = lines.pop();
for (let i = 1; i < lines.length; ++i) {
if (!lines[i]) continue;
const value = JsonlParser.checkedParse(lines[i], this._reviver, this._errorIndicator);
value !== undefined && this.push({key: this._counter++, value});
}
}
this._buffer = '';
callback(null);
}

_suppressed_flush(callback) {
super._flush(error => {
if (error) return callback(error);
if (this._rest) {
const value = JsonlParser.checkedParse(this._rest, this._reviver, this._errorIndicator);
value !== undefined && this.push({key: this._counter++, value});
this._rest = '';
}
callback(null);
});
}

_checked_processBuffer(callback) {
const lines = this._buffer.split('\n');
this._rest += lines[0];
Expand Down
60 changes: 60 additions & 0 deletions tests/test_jsonl.js
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,9 @@ unit.add(module, [
})
);
},

// check errors

function test_jsonl_invalid_json_end_fail(t) {
const async = t.startAsync('test_jsonl_invalid_json_end_fail');

Expand Down Expand Up @@ -236,4 +239,61 @@ unit.add(module, [

readString('{}\n]\n1').pipe(stream);
},

// suppress errors

function test_jsonl_skip_errors(t) {
const async = t.startAsync('test_jsonl_skip_errors');

const stream = parser({errorIndicator: undefined}),
result = [];

stream.on('data', data => result.push(data));
stream.on('error', err => {
eval(t.TEST("!'We shouldn't be here.'"));
async.done();
});
stream.on('end', value => {
eval(t.TEST("t.unify(result, [{key: 0, value: 1}, {key: 1, value: 2}, {key:2, value: 3}])"));
async.done();
});

readString('{\n1\n]\n2\n3').pipe(stream);
},
function test_jsonl_replace_errors(t) {
const async = t.startAsync('test_jsonl_replace_errors');

const stream = parser({errorIndicator: null}),
result = [];

stream.on('data', data => result.push(data));
stream.on('error', err => {
eval(t.TEST("!'We shouldn't be here.'"));
async.done();
});
stream.on('end', value => {
eval(t.TEST("t.unify(result, [{key: 0, value: null}, {key: 1, value: 1}, {key: 2, value: null}, {key: 3, value: 2}, {key:4, value: 3}])"));
async.done();
});

readString('{\n1\n]\n2\n3').pipe(stream);
},
function test_jsonl_transform_errors(t) {
const async = t.startAsync('test_jsonl_transform_errors');

const stream = parser({errorIndicator: error => error.name}),
result = [];

stream.on('data', data => result.push(data));
stream.on('error', err => {
eval(t.TEST("!'We shouldn't be here.'"));
async.done();
});
stream.on('end', value => {
eval(t.TEST("t.unify(result, [{key: 0, value: 'SyntaxError'}, {key: 1, value: 1}, {key: 2, value: 'SyntaxError'}, {key: 3, value: 2}, {key:4, value: 3}])"));
async.done();
});

readString('{\n1\n]\n2\n3').pipe(stream);
}
]);

1 comment on commit 2854b16

@ak--47
Copy link
Contributor

@ak--47 ak--47 commented on 2854b16 Jun 15, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

epic! <3

Please sign in to comment.