Skip to content
Eugene Lazutkin edited this page Jun 7, 2026 · 2 revisions

These are the Concepts as runnable pipelines — the quickest way to see how stream-json feels. Each example is complete; drop it into an ESM module and adjust the paths. Install with npm install --save stream-json (it brings in stream-chain, which supplies chain()).

Pipelines are built with chain() from stream-chain: an array of stages — a source, a parser(), optional filters, a streamer, and your own functions — wired into one stream.

Pick an array out of a big file

The everyday case. A JSON document is usually an envelope: some metadata plus the real payload under a property like data. Pick that property, stream its elements one at a time, and tally them — in constant memory, whatever the file size:

import chain from 'stream-chain';
import {parser} from 'stream-json';
import {pick} from 'stream-json/filters/pick.js';
import {streamArray} from 'stream-json/streamers/stream-array.js';
import fs from 'node:fs';

// data.json: { "meta": {...}, "data": [ {"department": "..."}, ... ] }
const pipeline = chain([
  fs.createReadStream('data.json'),
  parser(),
  pick({filter: 'data'}),
  streamArray()
]);

const byDepartment = {};
pipeline.on('data', ({value}) => {
  byDepartment[value.department] = (byDepartment[value.department] ?? 0) + 1;
});
pipeline.on('end', () => console.log(byDepartment));

Each element arrives as {key, value}: key is the array index, value is the element. If the top level is already the array you want, drop the pick. For a big top-level object, use streamObject instead — it hands you {key: propertyName, value} per property.

Straight from a file, without createReadStream

When the source is a local file, parseFile reads and parses in one stage, so you can drop createReadStream. Drive the pipeline with pipe / drain (generic drivers from stream-chain) and a terminal function instead of event handlers — the same tally, read straight from the file:

import {parseFile} from 'stream-json/file/parser.js';
import {pick} from 'stream-json/filters/pick.js';
import {streamArray} from 'stream-json/streamers/stream-array.js';
import {pipe} from 'stream-chain/utils/pipe.js';
import {drain} from 'stream-chain/utils/drain.js';

const byDepartment = {};
await drain(
  pipe(
    parseFile(),
    pick({filter: 'data'}),
    streamArray(),
    ({value}) => {
      byDepartment[value.department] = (byDepartment[value.department] ?? 0) + 1;
    }
  )('data.json')
);
console.log(byDepartment);

parseFile() takes the path as the pipeline's input; the terminal function does the work; drain runs it to completion. It is also faster — the file edges fuse the whole pipeline into one pass (see Performance).

JSON Streaming and JSONL

Several JSON values in one stream — concatenated, or one per line as JSONL. Turn on jsonStreaming and take each value with streamValues:

import chain from 'stream-chain';
import {parser} from 'stream-json';
import {streamValues} from 'stream-json/streamers/stream-values.js';
import fs from 'node:fs';

const pipeline = chain([
  fs.createReadStream('events.jsonl'),
  parser({jsonStreaming: true}),
  streamValues()
]);
pipeline.on('data', ({value}) => console.log(value));

For strict line-delimited JSONL, a dedicated splitter is much faster than tokenizing — use stream-chain's JSONL directly.

The whole value, when it fits: Assembler

When the document (or a picked subobject) fits in memory, skip the streamer and rebuild it as one value with the Assembler — a streaming JSON.parse:

import chain from 'stream-chain';
import {parser} from 'stream-json';
import Assembler from 'stream-json/assembler.js';
import fs from 'node:fs';

const pipeline = chain([fs.createReadStream('config.json'), parser()]);
Assembler.connectTo(pipeline, {onDone: asm => console.log(asm.current)});

asm.current is the finished object. FlexAssembler assembles into custom containers (a Map or Set) at chosen paths.

Skip objects you don't want: objectFilter

Every streamer takes an objectFilter, checked during assembly, so rejected objects are dropped before they are fully built. Keep only the orders:

chain([
  fs.createReadStream('data.json'),
  parser(),
  pick({filter: 'data'}),
  streamArray({
    objectFilter: asm =>
      asm.current?.type === undefined ? undefined : asm.current.type === 'order'
  })
]);

Return undefined to stay undecided until the deciding field arrives, false to reject, true to accept. See StreamBase.

Edit a document and write it back

Read a huge file, drop a subobject, and write the rest back out — never holding the whole thing. With the file edges at both ends it is one fused pass:

import {parseFile} from 'stream-json/file/parser.js';
import {ignore} from 'stream-json/filters/ignore.js';
import {stringerToFile} from 'stream-json/file/stringer.js';
import {pipe} from 'stream-chain/utils/pipe.js';
import {drain} from 'stream-chain/utils/drain.js';

await drain(
  pipe(parseFile(), ignore({filter: 'meta'}), stringerToFile('cleaned.json'))('data.json')
);

Stringer turns the token stream back into JSON text; Disassembler goes the other way, turning JavaScript values into tokens to feed such a pipeline.

Token events

Need a callback per token rather than assembled objects? emit() attaches token events to any pipeline:

import chain from 'stream-chain';
import {parser} from 'stream-json';
import emit from 'stream-json/utils/emit.js';
import fs from 'node:fs';

const pipeline = chain([fs.createReadStream('data.json'), parser()]);
emit(pipeline);

let depth = 0, max = 0;
pipeline.on('startObject', () => (max = Math.max(max, ++depth)));
pipeline.on('endObject', () => --depth);
pipeline.on('end', () => console.log('max nesting:', max));

Emitter is the standalone-component version. For hot paths a terminal function is leaner than events.

Where to go next

Clone this wiki locally