Skip to content
Browse files

[pull #39, issue #34] Merge streaming from fkuo (with some improvements)

  • Loading branch information...
2 parents 1f1c9be + c3e5f9e commit 61b1839136e8cade1af9dec6909f283d07628b9c @trentm committed Oct 16, 2012
View
297 lib/jsontool.js
@@ -12,6 +12,7 @@ var pathlib = require('path');
var vm = require('vm');
var fs = require('fs');
var warn = console.warn;
+var EventEmitter = require('events').EventEmitter;
@@ -395,6 +396,206 @@ function parseArgv(argv) {
}
+
+/**
+ * Streams chunks from given file paths or stdin.
+ *
+ * @param opts {Object} Parsed options.
+ * @returns {Object} An emitter that emits 'chunk', 'error', and 'end'.
+ * - `emit('chunk', chunk, [obj])` where chunk is a complete block of JSON
+ * ready to parse. If `obj` is provided, it is the already parsed
+ * JSON.
+ * - `emit('error', error)` when an underlying stream emits an error
+ * - `emit('end')` when all streams are done
+ */
+function chunkEmitter(opts) {
+ var emitter = new EventEmitter();
+ var streaming = true;
+ var chunks = [];
+ var leftover = '';
+ var finishedHeaders = false;
+
+ function stripHeaders(s) {
+ // Take off a leading HTTP header if any and pass it through.
+ while (true) {
+ if (s.slice(0,5) === "HTTP/") {
+ var index = s.indexOf('\r\n\r\n');
+ var sepLen = 4;
+ if (index == -1) {
+ index = s.indexOf('\n\n');
+ sepLen = 2;
+ }
+ if (index != -1) {
+ if (! opts.dropHeaders) {
+ emit(s.slice(0, index+sepLen));
+ }
+ var is100Continue = (s.slice(0, 21) === "HTTP/1.1 100 Continue");
+ s = s.slice(index+sepLen);
+ if (is100Continue) {
+ continue;
+ }
+ finishedHeaders = true;
+ }
+ } else {
+ finishedHeaders = true;
+ }
+ break;
+ }
+ //console.warn("XXX stripHeaders done, finishedHeaders=%s", finishedHeaders)
+ return s;
+ }
+
+ function emitChunks(block, emitter) {
+ //console.warn("XXX emitChunks start: block='%s'", block)
+ var splitter = /(})(\s*\n\s*)?({\s*")/;
+ var leftTrimmedBlock = block.trimLeft();
+ if (leftTrimmedBlock && leftTrimmedBlock[0] !== '{') {
+ // Currently (at least), only support streaming consecutive *objects*.
+ streaming = false;
+ chunks.push(block);
+ return '';
+ }
+ /* Example:
+ * > '{"a":"b"}\n{"a":"b"}\n{"a":"b"}'.split(/(})(\s*\n\s*)?({\s*")/)
+ * [ '{"a":"b"',
+ * '}',
+ * '\n',
+ * '{"',
+ * 'a":"b"',
+ * '}',
+ * '\n',
+ * '{"',
+ * 'a":"b"}' ]
+ */
+ var bits = block.split(splitter);
+ //console.warn("XXX emitChunks: bits (length %d): %j", bits.length, bits);
+ if (bits.length === 1) {
+ /*
+ * An unwanted side-effect of using a regex to find newline-separated
+     * objects is that we are looking for the end of one
+     * object leading into the start of another. That means that we
+ * can end up buffering a complete object until a subsequent one
+ * comes in. If the input stream has large delays between objects, then
+ * this is unwanted buffering.
+ *
+ * One solution would be full stream parsing of objects a la
+ * <https://github.com/creationix/jsonparse>. This would nicely also
+     * remove the arbitrary requirement that the input stream be newline
+ * separated. jsonparse apparently has some issues tho, so I don't
+ * want to use it right now. It also isn't *small* so not sure I
+ * want to inline it (`json` doesn't have external deps).
+ *
+ * An alternative: The block we have so far one of:
+ * 1. some JSON that we don't support grouping (e.g. a stream of
+ * non-objects),
+ * 2. a JSON object fragment, or
+ * 3. a complete JSON object (with a possible trailing '{')
+ *
+ * If #3, then we can just emit this as a chunk right now.
+ *
+ * TODO(PERF): Try out avoiding the first more complete regex split
+ * for a presumed common case of single-line newline-separated JSON
+ * objects (e.g. a bunyan log).
+ */
+ // An object must end with '}'. This is an early out to avoid
+      // `JSON.parse` which I *presume* is slower.
+ var trimmed = block.split(/\s*\r?\n/)[0];
+ //console.warn("XXX trimmed: '%s'", trimmed);
+ if (trimmed[trimmed.length - 1] === '}') {
+ var obj;
+ try {
+ obj = JSON.parse(block);
+ } catch (e) {
+ /* pass through */
+ }
+ if (obj !== undefined) {
+ // Emit the parsed `obj` to avoid re-parsing it later.
+ emitter.emit('chunk', block, obj);
+ block = '';
+ }
+ }
+ return block;
+ } else {
+ var n = bits.length - 2;
+ emitter.emit('chunk', bits[0] + bits[1]);
+ for (var i = 3; i < n; i += 4) {
+ emitter.emit('chunk', bits[i] + bits[i+1] + bits[i+2]);
+ }
+ return bits[n] + bits[n+1];
+ }
+ }
+
+ function addDataListener(stream) {
+ stream.on('data', function (chunk) {
+ var s = leftover + chunk;
+ if (!finishedHeaders) {
+ s = stripHeaders(s);
+ }
+ if (!finishedHeaders) {
+ leftover = s;
+ } else {
+ if (!streaming) {
+ chunks.push(chunk);
+ return;
+ }
+ leftover = emitChunks(s, emitter);
+ //console.warn("XXX leftover: '%s'", leftover)
+ }
+ });
+ }
+
+ if (opts.inputFiles.length > 0) {
+ // Stream each file in order.
+ var i = 0;
+ function addErrorListener(file) {
+ file.on('error', function (err) {
+ emitter.emit(
+ 'error',
+        format('could not read "%s": %s', opts.inputFiles[i], err)
+ );
+ });
+ }
+ function addEndListener(file) {
+ file.on('end', function () {
+ if (i < opts.inputFiles.length) {
+ var next = opts.inputFiles[i++];
+ var nextFile = fs.createReadStream(next, {encoding: 'utf8'});
+ addErrorListener(nextFile);
+ addEndListener(nextFile);
+ addDataListener(nextFile);
+ } else {
+ if (!streaming) {
+ emitter.emit('chunk', chunks.join(''));
+ } else if (leftover) {
+ leftover = emitChunks(leftover, emitter);
+ emitter.emit('chunk', leftover);
+ }
+ emitter.emit('end');
+ }
+ });
+ }
+ var first = fs.createReadStream(opts.inputFiles[i++], {encoding: 'utf8'});
+ addErrorListener(first);
+ addEndListener(first);
+ addDataListener(first);
+ } else {
+ // Streaming from stdin.
+ var stdin = process.openStdin();
+ stdin.setEncoding('utf8');
+ addDataListener(stdin);
+ stdin.on('end', function () {
+ if (!streaming) {
+ emitter.emit('chunk', chunks.join(''));
+ } else if (leftover) {
+ leftover = emitChunks(leftover, emitter);
+ emitter.emit('chunk', leftover);
+ }
+ emitter.emit('end');
+ });
+ }
+ return emitter;
+}
+
/**
* Get input from either given file paths or stdin.
*
@@ -560,14 +761,18 @@ function parseLookup(lookup, lookupDelim) {
* }
*
* @param buffer {String} The text to parse as JSON.
+ * @param obj {Object} Optional. Some streaming code paths will provide
+ * this, an already parsed JSON object. Use this to avoid reparsing.
* @param group {Boolean} Default false. If true, then non-JSON input
* will be attempted to be "arrayified" (see inline comment).
* @param merge {Boolean} Default null. Can be "shallow" or "deep". An
* attempt will be made to interpret the input as adjacent objects to
* be merged, last key wins. See inline comment for limitations.
*/
-function parseInput(buffer, group, merge) {
- if (group) {
+function parseInput(buffer, obj, group, merge) {
+ if (obj) {
+ return {datum: obj};
+ } else if (group) {
// Special case: Grouping (previously called auto-arrayification)
// of unjoined list of objects:
// {"one": 1}{"two": 2}
@@ -769,7 +974,8 @@ function emit(s) {
process.stdout.on("error", function (err) {
if (err.code === "EPIPE") {
- // Pass. See <https://github.com/trentm/json/issues/9>.
+ // See <https://github.com/trentm/json/issues/9>.
+ drainStdoutAndExit(0);
} else {
warn(err)
drainStdoutAndExit(1);
@@ -820,55 +1026,76 @@ function main(argv) {
}
var lookupStrs = opts.args;
- getInput(opts, function (err, buffer) {
- if (err) {
- warn("json: error: %s", err)
- return drainStdoutAndExit(1);
- }
-
- // Take off a leading HTTP header if any and pass it through.
- while (true) {
- if (buffer.slice(0,5) === "HTTP/") {
- var index = buffer.indexOf('\r\n\r\n');
- var sepLen = 4;
- if (index == -1) {
- index = buffer.indexOf('\n\n');
- sepLen = 2;
- }
- if (index != -1) {
- if (! opts.dropHeaders) {
- emit(buffer.slice(0, index+sepLen));
+ if (opts.group && opts.array && opts.outputMode !== OM_JSON) {
+ // streaming
+ var chunker = chunkEmitter(opts);
+    chunker.on('error', function(error) {
+      warn("json: error: %s", error);
+ return drainStdoutAndExit(1);
+ });
+ chunker.on('chunk', parseChunk);
+ } else {
+ // not streaming
+ getInput(opts, function (err, buffer) {
+ if (err) {
+ warn("json: error: %s", err)
+ return drainStdoutAndExit(1);
+ }
+ // Take off a leading HTTP header if any and pass it through.
+ while (true) {
+ if (buffer.slice(0,5) === "HTTP/") {
+ var index = buffer.indexOf('\r\n\r\n');
+ var sepLen = 4;
+ if (index == -1) {
+ index = buffer.indexOf('\n\n');
+ sepLen = 2;
}
- var is100Continue = (buffer.slice(0, 21) === "HTTP/1.1 100 Continue");
- buffer = buffer.slice(index+sepLen);
- if (is100Continue) {
- continue;
+ if (index != -1) {
+ if (! opts.dropHeaders) {
+ emit(buffer.slice(0, index+sepLen));
+ }
+ var is100Continue = (buffer.slice(0, 21) === "HTTP/1.1 100 Continue");
+ buffer = buffer.slice(index+sepLen);
+ if (is100Continue) {
+ continue;
+ }
}
}
+ break;
}
- break;
- }
+ parseChunk(buffer);
+ });
+ }
- // Expect the remainder to be JSON.
- if (! buffer.length) {
+ /**
+ * Parse a single chunk of JSON. When not streaming, this will just be
+ * called once.
+ *
+ * @param chunk {String} The JSON-encoded string.
+ * @param obj {Object} Optional. For some code paths while streaming `obj`
+ * will be provided. This is an already parsed JSON object.
+ */
+ function parseChunk(chunk, obj) {
+ // Expect the chunk to be JSON.
+ if (! chunk.length) {
return;
}
// parseInput() -> {datum: <input object>, error: <error object>}
- var input = parseInput(buffer, opts.group, opts.merge);
+ var input = parseInput(chunk, obj, opts.group, opts.merge);
if (input.error) {
// Doesn't look like JSON. Just print it out and move on.
if (! opts.quiet) {
// Use JSON-js' "json_parse" parser to get more detail on the
// syntax error.
var details = "";
- var normBuffer = buffer.replace(/\r\n|\n|\r/, '\n');
+ var normBuffer = chunk.replace(/\r\n|\n|\r/, '\n');
try {
json_parse(normBuffer);
details = input.error;
} catch(err) {
// err.at has the position. Get line/column from that.
var at = err.at - 1; // `err.at` looks to be 1-based.
- var lines = buffer.split('\n');
+ var lines = chunk.split('\n');
var line, col, pos = 0;
for (line = 0; line < lines.length; line++) {
pos += lines[line].length + 1;
@@ -887,8 +1114,8 @@ function main(argv) {
warn("json: error: input is not JSON: %s", details);
}
if (!opts.validate) {
- emit(buffer);
- if (buffer.length && buffer[buffer.length-1] !== "\n") {
+ emit(chunk);
+ if (chunk.length && chunk[chunk.length-1] !== "\n") {
emit('\n');
}
}
@@ -1078,7 +1305,7 @@ function main(argv) {
// Output `data` as is.
printDatum(data, opts, '\n', false);
}
- });
+ }
}
if (require.main === module) {
View
52 test/docstream.js
@@ -1,11 +1,49 @@
+#!/usr/bin/env node
+/*
+ * Stream out JSON objects.
+ *
+ * Usage:
+ * docstream.js [TIMES] [PERIOD] [RANDOM-CHUNKS]
+ *
+ * where:
+ * TIMES is an integer number of times to emit the JSON object. Default 10.
+ * PERIOD is the number of ms between emitted chunks. Default 1000.
+ * RANDOM-CHUNKS, if present, will result in non-complete JSON objects
+ * being emitted. Default *not* randomized.
+ */
-var sentinel = Number(process.argv[2]);
-if (isNaN(sentinel))
- sentinel = 5;
+
+function randint(min, max) {
+ if (max === undefined) {
+ max = min;
+ min = 0;
+ }
+ return (Math.floor(Math.random() * (max - min + 1)) + min);
+}
+
+
+//---- mainline
+
+var times = Number(process.argv[2]);
+if (isNaN(times))
+ times = 5;
+var period = Number(process.argv[3]);
+if (isNaN(period))
+ period = 1000;
+var randomize = (process.argv[4] !== undefined); // i.e. don't emit whole JSON objects.
+
+var leftover = '';
var interval = setInterval(function () {
- sentinel--;
- if (sentinel <= 0) {
+ times--;
+ var s = leftover + JSON.stringify({foo: "bar"}) + '\n';
+ var len = s.length; // emit the whole thing
+ if (times <= 0) {
clearInterval(interval);
+ } else if (randomize) {
+ len = randint(0, s.length);
}
- console.log(JSON.stringify({foo: "bar"}))
-}, 1000);
+ //console.warn("XXX docstream.js [%d]: emit %d chars: '%s'", times, len,
+ // s.slice(0, len));
+ process.stdout.write(s.slice(0, len));
+ leftover = s.slice(len);
+}, period);
View
1 test/stream-100-continue/cmd
@@ -0,0 +1 @@
+cat input | ../../lib/jsontool.js -ga
View
1 test/stream-100-continue/expected.exitCode
@@ -0,0 +1 @@
+0
View
13 test/stream-100-continue/expected.stdout
@@ -0,0 +1,13 @@
+HTTP/1.1 100 Continue
+
+HTTP/1.1 201 Created
+Server: nginx/0.8.53
+Date: Thu, 14 Apr 2011 09:50:35 GMT
+Content-Type: application/json
+Connection: keep-alive
+Status: 201 Created
+Content-Length: 14
+
+{
+ "foo": "bar"
+}
View
11 test/stream-100-continue/input
@@ -0,0 +1,11 @@
+HTTP/1.1 100 Continue
+
+HTTP/1.1 201 Created
+Server: nginx/0.8.53
+Date: Thu, 14 Apr 2011 09:50:35 GMT
+Content-Type: application/json
+Connection: keep-alive
+Status: 201 Created
+Content-Length: 14
+
+{"foo": "bar"}
View
12 test/stream-flat-array-input/README.md
@@ -0,0 +1,12 @@
+Test handling of a flat array of objects as input:
+
+ $ echo '{"one": 1}{"two": 1}' | json
+ [
+ {
+ "one": 1
+ },
+ {
+ "two": 1
+ }
+ ]
+
View
6 test/stream-flat-array-input/arrays-complex.input
@@ -0,0 +1,6 @@
+ ["one", 1]
+["two", 2]
+["three", 3]
+ ["four", 4]
+
+["five", 5]
View
1 test/stream-flat-array-input/arrays-on-same-line-fail.input
@@ -0,0 +1 @@
+["a", "b"] ["c", "d"]
View
1 test/stream-flat-array-input/arrays-on-same-line.input
@@ -0,0 +1 @@
+["a", "b"]["c", "d"]
View
2 test/stream-flat-array-input/arrays.input
@@ -0,0 +1,2 @@
+["a", "b"]
+["c", "d"]
View
48 test/stream-flat-array-input/cmd
@@ -0,0 +1,48 @@
+JSON="node ../../lib/jsontool.js -q --group --array"
+
+echo "# simple"
+cat objects.input | $JSON
+echo ""
+echo "# simple (-f)"
+$JSON -f objects.input
+
+echo ""
+echo "# separate"
+cat one.input two.input | $JSON
+echo ""
+echo "# separate (-f)"
+$JSON -f one.input -f two.input
+
+echo ""
+echo "# on same line"
+cat objects-on-same-line.input | $JSON
+
+echo ""
+echo "# more complex"
+cat objects-complex.input | $JSON
+
+echo ""
+echo "# simple arrays"
+cat arrays.input | $JSON
+
+echo ""
+echo "# on same line"
+cat arrays-on-same-line.input | $JSON
+
+echo ""
+echo "# more complex"
+cat arrays-complex.input | $JSON
+
+echo ""
+echo "# cat json files"
+cat dir/*.json | $JSON
+
+echo ""
+echo "# objects on same line (space FAIL)"
+cat objects-on-same-line-fail.input | $JSON
+echo ""
+echo "# arrays on same line (space FAIL)"
+cat arrays-on-same-line-fail.input | $JSON
+echo ""
+echo "# mixed objects and arrays FAIL"
+cat mixed-objects-and-arrays.input | $JSON
View
1 test/stream-flat-array-input/dir/a.json
@@ -0,0 +1 @@
+{"a": "A"}
View
1 test/stream-flat-array-input/dir/b.json
@@ -0,0 +1 @@
+{"b": "B"}
View
1 test/stream-flat-array-input/dir/c.json
@@ -0,0 +1 @@
+{"c": "C"}
View
100 test/stream-flat-array-input/expected.stdout
@@ -0,0 +1,100 @@
+# simple
+{
+ "one": 1
+}
+{
+ "two": 2
+}
+
+# simple (-f)
+{
+ "one": 1
+}
+{
+ "two": 2
+}
+
+# separate
+{
+ "one": 1
+}
+{
+ "two": 2
+}
+
+# separate (-f)
+{
+ "one": 1
+}
+{
+ "two": 2
+}
+
+# on same line
+{
+ "one": 1
+}
+{
+ "two": 2
+}
+
+# more complex
+{
+ "one": 1
+}
+{
+ "two": 2
+}
+{
+ "three": 3
+}
+{
+ "four": 4
+}
+{
+ "five": 5
+}
+
+# simple arrays
+a
+b
+c
+d
+
+# on same line
+a
+b
+c
+d
+
+# more complex
+one
+1
+two
+2
+three
+3
+four
+4
+five
+5
+
+# cat json files
+{
+ "a": "A"
+}
+{
+ "b": "B"
+}
+{
+ "c": "C"
+}
+
+# objects on same line (space FAIL)
+{"one": 1} {"two": 2}
+
+# arrays on same line (space FAIL)
+["a", "b"] ["c", "d"]
+
+# mixed objects and arrays FAIL
+{"one": 1} ["a", "b"] {"two": 2}
View
1 test/stream-flat-array-input/mixed-objects-and-arrays.input
@@ -0,0 +1 @@
+{"one": 1} ["a", "b"] {"two": 2}
View
6 test/stream-flat-array-input/objects-complex.input
@@ -0,0 +1,6 @@
+ {"one": 1}
+{"two": 2}
+{"three": 3}
+ {"four": 4}
+
+{"five": 5}
View
1 test/stream-flat-array-input/objects-on-same-line-fail.input
@@ -0,0 +1 @@
+{"one": 1} {"two": 2}
View
1 test/stream-flat-array-input/objects-on-same-line.input
@@ -0,0 +1 @@
+{"one": 1}{"two": 2}
View
2 test/stream-flat-array-input/objects.input
@@ -0,0 +1,2 @@
+{"one": 1}
+{"two": 2}
View
1 test/stream-flat-array-input/one.input
@@ -0,0 +1 @@
+{"one": 1}
View
1 test/stream-flat-array-input/two.input
@@ -0,0 +1 @@
+{"two": 2}
View
3 test/stream/README.md
@@ -0,0 +1,3 @@
+Want this to stream output:
+
+ <stream-of-json-docs> | json -ga
View
11 test/stream/cmd
@@ -0,0 +1,11 @@
+JSON=../../lib/jsontool.js
+
+node ../docstream.js 3 | $JSON -ga >stream.log &
+sleep 2.5
+cat stream.log
+
+
+echo ""
+# With some randomized emits.
+node ../docstream.js 10 100 randomize | $JSON -ga foo
+
View
17 test/stream/expected.stdout
@@ -0,0 +1,17 @@
+{
+ "foo": "bar"
+}
+{
+ "foo": "bar"
+}
+
+bar
+bar
+bar
+bar
+bar
+bar
+bar
+bar
+bar
+bar

0 comments on commit 61b1839

Please sign in to comment.
Something went wrong with that request. Please try again.