Skip to content

Commit

Permalink
Merge branch 'master' of github.com:tilfin/kinesis-stream-lambda
Browse files Browse the repository at this point in the history
  • Loading branch information
tilfin committed Feb 20, 2018
2 parents 24ef32e + 9e66814 commit 05c3291
Show file tree
Hide file tree
Showing 6 changed files with 80 additions and 12 deletions.
4 changes: 2 additions & 2 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
language: node_js
node_js:
- "4.3"
- "6.0"
- "6.10"
- "8"
script: 'make test-cov'
after_success: 'make coveralls; make clean'
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ kinesis-stream-lambda
=====================

[![NPM Version][npm-image]][npm-url]
[![Node](https://img.shields.io/node/v/kinesis-stream-lambda.svg)]()
[![Build Status](https://travis-ci.org/tilfin/kinesis-stream-lambda.svg?branch=master)](https://travis-ci.org/tilfin/kinesis-stream-lambda)
[![Coverage Status](https://coveralls.io/repos/github/tilfin/kinesis-stream-lambda/badge.svg?branch=master)](https://coveralls.io/github/tilfin/kinesis-stream-lambda?branch=master)

Expand All @@ -10,7 +11,7 @@ kinesis-stream-lambda
* Easily reads a Lambda event of Kinesis Stream as a stream handling the chunk as Buffer
* Supports KPL aggregation (set opts.isAgg true)
* Provides KSL.parseJSON transform to handle items expanded array data in one record (set opts.expandArray true)
* Node.js 4.3 or Later
* Node.js 6.10 or Later

## How to install

Expand Down
59 changes: 51 additions & 8 deletions lib/json_transform.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

const Transform = require('stream').Transform;

const JSONUnexpectedTokenRe = /^Unexpected token (.) in JSON at position (\d+)$/;

/**
* JSONTransform
*/
Expand All @@ -26,19 +28,27 @@ class JSONTransform extends Transform {
this._countBy = countBy;
this._expanding = expanding;
this._items = [];
this._buf = '';
}

_transform(chunk, encoding, cb) {
this._buf += chunk.toString();

try {
const data = JSON.parse(chunk);
if (this._expanding && (data instanceof Array)) {
this._items = this._items.concat(data);
} else {
this._items.push(data);
}
const data = JSON.parse(this._buf);
this._pushItemsFromData(data);
this._buf = '';
} catch (err) {
cb(err, null);
return;
if (!(err instanceof SyntaxError)) {
return cb(err);
}

const r = this._rescueJSONError(err);
if (r === 'next') {
return cb();
} else if (r === 'error') {
return cb(err);
}
}

while (this._items.length >= this._countBy) {
Expand All @@ -47,6 +57,39 @@ class JSONTransform extends Transform {
cb();
}

_rescueJSONError(err) {
const errMsg = err.message;
if (errMsg === 'Unexpected end of JSON input') {
// JSON unfinished
return 'next';
}

const md = JSONUnexpectedTokenRe.exec(errMsg);
if (md) {
const pos = Number(md[2]);
if (md[1] === '{') {
// another JSON follows
const data = JSON.parse(this._buf.substr(0, pos));
this._pushItemsFromData(data);
this._buf = this._buf.substr(pos);
return 'continue';
} else if (this._buf.substr(pos - 1, 1) === '"') {
// JSON unfinished
return 'next';
}
}

return 'error';
}

_pushItemsFromData(data) {
if (this._expanding && (data instanceof Array)) {
this._items = this._items.concat(data);
} else {
this._items.push(data);
}
}

_flush(cb) {
const items = this._items;

Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "kinesis-stream-lambda",
"version": "0.7.2",
"version": "0.8.0",
"description": "Readable stream in Lambda for Kinesis Stream",
"main": "lib/index.js",
"scripts": {
Expand Down
3 changes: 3 additions & 0 deletions test/fixtures/data/multiline_json.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{"color":"red","value":"#f00"}
{"color":"green","value":"#0f0"}
{"color":"blue","value":"#00f"}
21 changes: 21 additions & 0 deletions test/json_stream.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,27 @@ describe('JSONTransform', () => {
});
});

context('passed multiline JSON', () => {
const sharedExamples = function(highWaterMark, errMsg) {
it(`flush valid items through '${errMsg}'`, (done) => {
const readStream = fs.createReadStream(__dirname + '/fixtures/data/multiline_json.txt', { highWaterMark });
const jsonStream = new JSONTransform()
const writeStream = es.writeArray(function (err, array) {
assert.deepEqual(array[0], { color: "red", value: "#f00" });
assert.deepEqual(array[1], { color: "green", value: "#0f0" });
assert.deepEqual(array[2], { color: "blue", value: "#00f" });
done();
});

readStream.pipe(jsonStream).pipe(writeStream);
});
}

sharedExamples(4, 'Unexpected token A in JSON at position');
sharedExamples(25, 'Unexpected end of JSON input');
sharedExamples(36, 'Unexpected token { in JSON at position');
});

context('passed invalid JSON', () => {
it('raises error event', (done) => {
const readStream = fs.createReadStream(__dirname + '/fixtures/data/invalid_json.txt');
Expand Down

0 comments on commit 05c3291

Please sign in to comment.