Skip to content
This repository
Browse code

first stab at streaming for issue #34

  • Loading branch information...
commit 6912b25272312d0f13ad32e98ebc1b4deb6334cc 1 parent e5e5446
Trent Mick authored

Showing 1 changed file with 132 additions and 49 deletions. Show diff stats Hide diff stats

  1. 181  lib/jsontool.js
181  lib/jsontool.js
@@ -399,36 +399,140 @@ function parseArgv(argv) {
399 399
  * Get input from either given file paths or stdin.
400 400
  *
401 401
  * @param opts {Object} Parsed options.
402  
- * @param callback {Function} `function (err, callback)` where err is an
403  
- *    error string if there was a problem.
  402
+ * @param callback {Function} `function (err, chunk)` where err is an
  403
+ *    error string if there was a problem. This is called once for each
  404
+ *    "chunk". XXX START HERE: explain chunks
404 405
  */
405  
-function getInput(opts, callback) {
406  
-  if (opts.inputFiles.length === 0) {
407  
-    // Read from stdin.
408  
-    var chunks = [];
409  
-
410  
-    var stdin = process.openStdin();
411  
-    stdin.setEncoding('utf8');
412  
-    stdin.on('data', function (chunk) {
413  
-      chunks.push(chunk);
414  
-    });
  406
+function jsonChunksFromInput(opts, callback) {
  407
+  // If returned from `stripHeaders()` and `finishedHeaders` is still false,
  408
+  // then we've processed a chunk with an incomplete set of headers:
  409
+  // `stripHeaders()` should be called again with the next chunk.
  410
+  var finishedHeaders = false;
  411
+  function stripHeaders(s) {
  412
+    // Take off a leading HTTP header if any and pass it through.
  413
+    while (true) {
  414
+      if (s.slice(0,5) === "HTTP/") {
  415
+        var index = s.indexOf('\r\n\r\n');
  416
+        var sepLen = 4;
  417
+        if (index == -1) {
  418
+          index = s.indexOf('\n\n');
  419
+          sepLen = 2;
  420
+        }
  421
+        if (index != -1) {
  422
+          if (! opts.dropHeaders) {
  423
+            emit(s.slice(0, index+sepLen));
  424
+          }
  425
+          var is100Continue = (s.slice(0, 21) === "HTTP/1.1 100 Continue");
  426
+          s = s.slice(index+sepLen);
  427
+          if (is100Continue) {
  428
+            continue;
  429
+          }
  430
+          finishedHeaders = true;
  431
+        }
  432
+      } else {
  433
+        finishedHeaders = true;
  434
+      }
  435
+      break;
  436
+    }
  437
+    //console.warn("XXX stripHeaders done, finishedHeaders=%s", finishedHeaders)
  438
+    return s;
  439
+  }
415 440
 
416  
-    stdin.on('end', function () {
417  
-      callback(null, chunks.join(''));
418  
-    });
419  
-  } else {
  441
+  if (opts.inputFiles.length > 0) {
420 442
     // Read input files.
  443
+    // TODO: Improve streaming here: read files async in chunks and stream
  444
+    // as in the stdin branch below if `-ga`.
421 445
     var i = 0;
422 446
     var chunks = [];
423 447
     try {
424  
-      for (; i < opts.inputFiles.length; i++) {
425  
-        chunks.push(fs.readFileSync(opts.inputFiles[i], 'utf8'));
  448
+      var first = fs.readFileSync(opts.inputFiles[i], 'utf8');
  449
+      first = stripHeaders(first);
  450
+      callback(null, first);
  451
+      for (i++; i < opts.inputFiles.length; i++) {
  452
+        callback(null, fs.readFileSync(opts.inputFiles[i], 'utf8'));
426 453
       }
427 454
     } catch (e) {
428 455
       return callback(
429 456
         format('could not read "%s": %s', opts.inputFiles[i], e));
430 457
     }
431  
-    callback(null, chunks.join(''));
  458
+  } else if (opts.group && opts.array && opts.outputMode !== OM_JSON) {
  459
+    // Streaming from stdin.
  460
+    //console.warn("XXX streaming");
  461
+    var streaming = true;
  462
+    var leftover = '';
  463
+    var chunks = [];
  464
+    var splitter = /(})(\s*\n\s*)?({\s*")/;
  465
+    function callbackJsonChunks(chunk) {
  466
+      if (chunk[0] !== '{') {  // Only support streaming consecutive *objects*.
  467
+        streaming = false;
  468
+        chunks.push(chunk);
  469
+        return;
  470
+      }
  471
+      /* Example:
  472
+       * > '{"a":"b"}\n{"a":"b"}\n{"a":"b"}'.split(/(})(\s*\n\s*)?({\s*")/)
  473
+       * [ '{"a":"b"',
  474
+       *   '}',
  475
+       *   '\n',
  476
+       *   '{"',
  477
+       *   'a":"b"',
  478
+       *   '}',
  479
+       *   '\n',
  480
+       *   '{"',
  481
+       *   'a":"b"}' ]
  482
+       */
  483
+      var bits = chunk.split(splitter);
  484
+      //console.warn("XXX bits: ", bits)
  485
+      if (bits.length === 1) {
  486
+        leftover = chunk;
  487
+      } else {
  488
+        var n = bits.length - 2;
  489
+        callback(null, bits[0] + bits[1]);
  490
+        for (var i = 3; i < n; i += 4) {
  491
+          callback(null, bits[i] + bits[i+1] + bits[i+2]);
  492
+        }
  493
+        leftover = bits[n] + bits[n+1];
  494
+      }
  495
+    }
  496
+
  497
+    var stdin = process.openStdin();
  498
+    stdin.setEncoding('utf8');
  499
+    stdin.on('data', function (chunk) {
  500
+      //console.warn("XXX process chunk: %s", JSON.stringify(chunk))
  501
+      if (!streaming) {
  502
+        chunks.push(chunk);
  503
+        return;
  504
+      }
  505
+      var s = leftover + chunk;
  506
+      if (!finishedHeaders) {
  507
+        s = stripHeaders(s);
  508
+      }
  509
+      if (!finishedHeaders) {
  510
+        leftover = s;
  511
+      } else {
  512
+        callbackJsonChunks(s);
  513
+      }
  514
+    });
  515
+    stdin.on('end', function () {
  516
+      if (!streaming) {
  517
+        callback(null, chunks.join(''));
  518
+      } else if (leftover) {
  519
+        callbackJsonChunks(leftover);
  520
+        callback(null, leftover);
  521
+      }
  522
+    });
  523
+  } else {
  524
+    // Read stdin in one big chunk.
  525
+    var stdin = process.openStdin();
  526
+    stdin.setEncoding('utf8');
  527
+    var chunks = [];
  528
+    stdin.on('data', function (chunk) {
  529
+      chunks.push(chunk);
  530
+    });
  531
+    stdin.on('end', function () {
  532
+      var chunk = chunks.join('');
  533
+      chunk = stripHeaders(chunk);
  534
+      callback(null, chunks.join(''));
  535
+    });
432 536
   }
433 537
 }
434 538
 
@@ -590,6 +694,7 @@ function parseInput(buffer, group, merge) {
590 694
     //   This condition should be fine for typical use cases and ensures
591 695
     //   no false matches inside JS strings.
592 696
     var newBuffer = buffer;
  697
+    //XXX START HERE
593 698
     [/(})\s*\n\s*({)/g, /(})({")/g].forEach(function (pat) {
594 699
       newBuffer = newBuffer.replace(pat, "$1,\n$2");
595 700
     });
@@ -820,55 +925,33 @@ function main(argv) {
820 925
   }
821 926
   var lookupStrs = opts.args;
822 927
 
823  
-  getInput(opts, function (err, buffer) {
  928
+  jsonChunksFromInput(opts, function (err, chunk) {
  929
+    //console.warn("XXX chunk: '%s'", chunk)
824 930
     if (err) {
825 931
       warn("json: error: %s", err)
826 932
       return drainStdoutAndExit(1);
827 933
     }
828 934
 
829  
-    // Take off a leading HTTP header if any and pass it through.
830  
-    while (true) {
831  
-      if (buffer.slice(0,5) === "HTTP/") {
832  
-        var index = buffer.indexOf('\r\n\r\n');
833  
-        var sepLen = 4;
834  
-        if (index == -1) {
835  
-          index = buffer.indexOf('\n\n');
836  
-          sepLen = 2;
837  
-        }
838  
-        if (index != -1) {
839  
-          if (! opts.dropHeaders) {
840  
-            emit(buffer.slice(0, index+sepLen));
841  
-          }
842  
-          var is100Continue = (buffer.slice(0, 21) === "HTTP/1.1 100 Continue");
843  
-          buffer = buffer.slice(index+sepLen);
844  
-          if (is100Continue) {
845  
-            continue;
846  
-          }
847  
-        }
848  
-      }
849  
-      break;
850  
-    }
851  
-
852 935
     // Expect the remainder to be JSON.
853  
-    if (! buffer.length) {
  936
+    if (! chunk.length) {
854 937
       return;
855 938
     }
856 939
     // parseInput() -> {datum: <input object>, error: <error object>}
857  
-    var input = parseInput(buffer, opts.group, opts.merge);
  940
+    var input = parseInput(chunk, opts.group, opts.merge);
858 941
     if (input.error) {
859 942
       // Doesn't look like JSON. Just print it out and move on.
860 943
       if (! opts.quiet) {
861 944
         // Use JSON-js' "json_parse" parser to get more detail on the
862 945
         // syntax error.
863 946
         var details = "";
864  
-        var normBuffer = buffer.replace(/\r\n|\n|\r/, '\n');
  947
+        var normBuffer = chunk.replace(/\r\n|\n|\r/, '\n');
865 948
         try {
866 949
           json_parse(normBuffer);
867 950
           details = input.error;
868 951
         } catch(err) {
869 952
           // err.at has the position. Get line/column from that.
870 953
           var at = err.at - 1;  // `err.at` looks to be 1-based.
871  
-          var lines = buffer.split('\n');
  954
+          var lines = chunk.split('\n');
872 955
           var line, col, pos = 0;
873 956
           for (line = 0; line < lines.length; line++) {
874 957
             pos += lines[line].length + 1;
@@ -887,8 +970,8 @@ function main(argv) {
887 970
         warn("json: error: input is not JSON: %s", details);
888 971
       }
889 972
       if (!opts.validate) {
890  
-        emit(buffer);
891  
-        if (buffer.length && buffer[buffer.length-1] !== "\n") {
  973
+        emit(chunk);
  974
+        if (chunk.length && chunk[chunk.length-1] !== "\n") {
892 975
           emit('\n');
893 976
         }
894 977
       }

0 notes on commit 6912b25

Please sign in to comment.
Something went wrong with that request. Please try again.