Intro
These are the Concepts as runnable pipelines — the quickest way to see how stream-json feels. Each example is complete; drop it into an ESM module and adjust the paths. Install with npm install --save stream-json (it brings in stream-chain, which supplies chain()).
Pipelines are built with chain() from stream-chain: an array of stages — a source, a parser(), optional filters, a streamer, and your own functions — wired into one stream.
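The idea of stages can be modelled in a few lines of plain JavaScript. The sketch below is a hypothetical toy, not stream-chain's implementation. The names miniChain and DROP are invented here, and it runs synchronously over an in-memory array, whereas chain() wires real Node streams. It shows only the shape of a pipeline: every item flows through the stages in order, and each stage can either transform it or discard it.

```javascript
// Toy model of a staged pipeline: NOT stream-chain's chain(), only the idea.
// Convention of this sketch: a stage returns DROP to discard the current item.
const DROP = Symbol('drop');

function* miniChain(source, stages) {
  nextItem: for (const item of source) {
    let value = item;
    for (const stage of stages) {
      value = stage(value);
      if (value === DROP) continue nextItem; // skip the remaining stages
    }
    yield value;
  }
}

const results = [
  ...miniChain(
    [1, 2, 3, 4, 5],
    [
      x => x * 10,              // transform
      x => (x > 20 ? x : DROP), // filter: discard small values
      x => x + 1                // transform again
    ]
  )
];
console.log(results); // [ 31, 41, 51 ]
```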
The everyday case. A JSON document is usually an envelope: some metadata plus the real payload under a property like data. Pick that property, stream its elements one at a time, and tally them — in constant memory, whatever the file size:
import chain from 'stream-chain';
import {parser} from 'stream-json';
import {pick} from 'stream-json/filters/pick.js';
import {streamArray} from 'stream-json/streamers/stream-array.js';
import fs from 'node:fs';
// data.json: { "meta": {...}, "data": [ {"department": "..."}, ... ] }
const pipeline = chain([fs.createReadStream('data.json'), parser(), pick({filter: 'data'}), streamArray()]);
const byDepartment = {};
pipeline.on('data', ({value}) => {
  byDepartment[value.department] = (byDepartment[value.department] ?? 0) + 1;
});
pipeline.on('end', () => console.log(byDepartment));
Each element arrives as {key, value}: key is the array index and value is the element. If the top level is already the array you want, drop the pick() stage. For a big top-level object, use streamObject instead; it hands you {key: propertyName, value} for each property.
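The {key, value} shapes can be mimicked in memory, which is handy for unit-testing the tally callback without a file. The helpers arrayItems and objectItems below are hypothetical stand-ins: they only reproduce the item shapes described above (index keys for streamArray, property-name keys for streamObject) and do not stream anything.

```javascript
// Hypothetical stand-ins that reproduce the item shapes described above.
// streamArray-style items: key is the array index.
const arrayItems = array => array.map((value, index) => ({key: index, value}));
// streamObject-style items: key is the property name.
const objectItems = object => Object.entries(object).map(([key, value]) => ({key, value}));

// The same tally logic as the pipeline's 'data' handler, as a reusable reducer.
const tally = (counts, {value}) => {
  counts[value.department] = (counts[value.department] ?? 0) + 1;
  return counts;
};

const employees = [{department: 'sales'}, {department: 'ops'}, {department: 'sales'}];
const byDepartment = arrayItems(employees).reduce(tally, {});
console.log(byDepartment); // { sales: 2, ops: 1 }
console.log(objectItems({a: 1, b: 2})); // [ { key: 'a', value: 1 }, { key: 'b', value: 2 } ]
```

Keeping the callback as a plain function means the same code can be passed to the real pipeline and exercised in tests.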
When the source is a local file, parseFile reads and parses in one stage, so you can drop createReadStream. Drive the pipeline with pipe / drain (generic drivers from stream-chain) and a terminal function instead of event handlers — the same tally, read straight from the file:
import {parseFile} from 'stream-json/file/parser.js';
import {pick} from 'stream-json/filters/pick.js';
import {streamArray} from 'stream-json/streamers/stream-array.js';
import {pipe} from 'stream-chain/utils/pipe.js';
import {drain} from 'stream-chain/utils/drain.js';
const byDepartment = {};
await drain(
  pipe(parseFile(), pick({filter: 'data'}), streamArray(), ({value}) => {
    byDepartment[value.department] = (byDepartment[value.department] ?? 0) + 1;
  })('data.json')
);
console.log(byDepartment);
parseFile() takes the path as the pipeline's input; the terminal function does the work; drain runs it to completion. This version is also faster: the file edges fuse the whole pipeline into one pass (see Performance).
Several JSON values in one stream — concatenated, or one per line as JSONL. Turn on jsonStreaming and take each value with streamValues:
import chain from 'stream-chain';
import {parser} from 'stream-json';
import {streamValues} from 'stream-json/streamers/stream-values.js';
import fs from 'node:fs';
const pipeline = chain([fs.createReadStream('events.jsonl'), parser({jsonStreaming: true}), streamValues()]);
pipeline.on('data', ({value}) => console.log(value));
For strict line-delimited JSONL, a dedicated line splitter is much faster than tokenizing every character; use stream-chain's JSONL support directly.
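Why a splitter can win for strict JSONL: every record ends at a newline, so it only has to find line breaks and hand each complete line to JSON.parse, carrying a partial line over to the next chunk. createJsonlSplitter below is a hypothetical, minimal illustration of that idea, not stream-chain's JSONL implementation, and it assumes chunks have already been decoded to strings.

```javascript
// Hypothetical minimal JSONL splitter (not stream-chain's implementation).
// Assumes chunks are already-decoded strings; a record may span chunk boundaries.
function createJsonlSplitter() {
  let buffer = '';
  const parseLines = lines =>
    lines
      .map(line => line.trim()) // also strips a trailing \r from CRLF input
      .filter(line => line.length > 0) // tolerate blank lines
      .map(line => JSON.parse(line));
  return {
    // Returns every value completed by this chunk.
    push(chunk) {
      buffer += chunk;
      const lines = buffer.split('\n');
      buffer = lines.pop(); // the last piece may be an incomplete record
      return parseLines(lines);
    },
    // Flushes a final record that has no trailing newline.
    end() {
      const rest = buffer;
      buffer = '';
      return parseLines([rest]);
    }
  };
}

const splitter = createJsonlSplitter();
const values = [
  ...splitter.push('{"id":1}\n{"i'), // second record is cut mid-way
  ...splitter.push('d":2}\n'),
  ...splitter.push('{"id":3}'),
  ...splitter.end()
];
console.log(values.map(v => v.id)); // [ 1, 2, 3 ]
```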
When the document (or a picked subobject) fits in memory, skip the streamer and rebuild it as one value with the Assembler — a streaming JSON.parse:
import chain from 'stream-chain';
import {parser} from 'stream-json';
import Assembler from 'stream-json/assembler.js';
import fs from 'node:fs';
const pipeline = chain([fs.createReadStream('config.json'), parser()]);
Assembler.connectTo(pipeline, {onDone: asm => console.log(asm.current)});
asm.current is the finished object. FlexAssembler assembles into custom containers, such as a Map or Set, at chosen paths.
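To see what rebuilding a value from tokens involves, here is a simplified assembler over a hand-written token list. It assumes packed tokens with a name and, where relevant, a value: startObject and endObject are the names used with emit(), while keyValue, stringValue, numberValue (with the number as a string), trueValue, falseValue, nullValue, startArray and endArray are assumptions to check against the Parser page. miniAssemble is a hypothetical sketch, not the library's Assembler, and it ignores any non-packed tokens.

```javascript
// Hypothetical simplified assembler (not stream-json's Assembler).
// Consumes packed tokens and rebuilds the value they describe.
function miniAssemble(tokens) {
  const stack = []; // open containers, innermost last: {container, key}
  let result;
  const add = value => {
    const top = stack[stack.length - 1];
    if (!top) result = value; // a top-level value
    else if (Array.isArray(top.container)) top.container.push(value);
    else top.container[top.key] = value; // key set by the preceding keyValue
  };
  for (const {name, value} of tokens) {
    switch (name) {
      case 'startObject':
      case 'startArray': {
        const container = name === 'startObject' ? {} : [];
        add(container); // attach to the parent before filling it
        stack.push({container, key: null});
        break;
      }
      case 'endObject':
      case 'endArray': stack.pop(); break;
      case 'keyValue': stack[stack.length - 1].key = value; break;
      case 'stringValue': add(value); break;
      case 'numberValue': add(Number(value)); break; // assumed: numbers arrive as strings
      case 'trueValue': add(true); break;
      case 'falseValue': add(false); break;
      case 'nullValue': add(null); break;
      default: break; // other token kinds are ignored in this sketch
    }
  }
  return result;
}

// Tokens for {"name":"app","port":8080,"tags":["a",true],"extra":null}
const tokens = [
  {name: 'startObject'},
  {name: 'keyValue', value: 'name'}, {name: 'stringValue', value: 'app'},
  {name: 'keyValue', value: 'port'}, {name: 'numberValue', value: '8080'},
  {name: 'keyValue', value: 'tags'}, {name: 'startArray'},
  {name: 'stringValue', value: 'a'}, {name: 'trueValue', value: true},
  {name: 'endArray'},
  {name: 'keyValue', value: 'extra'}, {name: 'nullValue', value: null},
  {name: 'endObject'}
];
console.log(miniAssemble(tokens)); // { name: 'app', port: 8080, tags: [ 'a', true ], extra: null }
```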
Every streamer takes an objectFilter, checked during assembly, so rejected objects are dropped before they are fully built. Keep only the orders:
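import chain from 'stream-chain';
import {parser} from 'stream-json';
import {pick} from 'stream-json/filters/pick.js';
import {streamArray} from 'stream-json/streamers/stream-array.js';
import fs from 'node:fs';
const pipeline = chain([
  fs.createReadStream('data.json'),
  parser(),
  pick({filter: 'data'}),
  streamArray({
    // undefined = undecided (no "type" yet), false = reject, true = accept
    objectFilter: asm => (asm.current?.type === undefined ? undefined : asm.current.type === 'order')
  })
]);
pipeline.on('data', ({value}) => console.log(value)); // rejected elements never arrive here
Return undefined to stay undecided until the deciding field arrives, false to reject, true to accept. See StreamBase.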
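The three-way return value can be modelled without the library: add an object's properties one at a time, re-run the filter after each, and stop as soon as it returns a boolean. decideIncrementally is a hypothetical helper, and the {current} holder stands in for the real Assembler. Treating an object that never gets a decision as rejected is this sketch's own choice; check StreamBase for the library's actual rule.

```javascript
// Hypothetical model of a tri-state objectFilter (not stream-json's StreamBase).
// The filter sees a partially built object via asm.current, as in the example above.
function decideIncrementally(entries, objectFilter) {
  const asm = {current: {}}; // stand-in for the real Assembler
  let checks = 0;
  for (const [key, value] of entries) {
    asm.current[key] = value; // one more property has "arrived"
    checks++;
    const verdict = objectFilter(asm);
    if (verdict !== undefined) return {accepted: verdict, checks}; // decided: stop building
  }
  return {accepted: false, checks}; // sketch's choice: never decided means rejected
}

const isOrder = asm => (asm.current?.type === undefined ? undefined : asm.current.type === 'order');

// "type" arrives second: one undecided check, then a decision.
console.log(decideIncrementally([['id', 7], ['type', 'order'], ['total', 12]], isOrder));
// { accepted: true, checks: 2 }
// "type" arrives first: rejected immediately, "id" is never added.
console.log(decideIncrementally([['type', 'refund'], ['id', 8]], isOrder));
// { accepted: false, checks: 1 }
```

The second call shows why early rejection saves work: once the filter says false, the remaining properties never need to be assembled.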
Read a huge file, drop a subobject, and write the rest back out — never holding the whole thing. With the file edges at both ends it is one fused pass:
import {parseFile} from 'stream-json/file/parser.js';
import {ignore} from 'stream-json/filters/ignore.js';
import {stringerToFile} from 'stream-json/file/stringer.js';
import {pipe} from 'stream-chain/utils/pipe.js';
import {drain} from 'stream-chain/utils/drain.js';
await drain(pipe(parseFile(), ignore({filter: 'meta'}), stringerToFile('cleaned.json'))('data.json'));
Stringer turns the token stream back into JSON text; Disassembler goes the other way, turning JavaScript values into tokens to feed such a pipeline.
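A filter like ignore works on tokens, not on built objects: once it meets the key to drop, it skips that key and every token of its value, tracking nesting so that a container value is skipped whole. dropTopLevelKey is a hypothetical sketch that handles only one top-level key, whereas the real ignore() matches paths through its filter option. The token names (startObject, keyValue, numberValue and so on) are assumed to follow stream-json's packed tokens.

```javascript
// Hypothetical token-level filter (not stream-json's ignore()): removes one
// top-level property, key and value, from a sequence of packed tokens.
function* dropTopLevelKey(tokens, keyToDrop) {
  let depth = 0; // container depth of the tokens being kept
  let skipNext = false; // true right after the dropped key: next token starts its value
  let inner = 0; // open containers inside the value being skipped
  for (const token of tokens) {
    const opens = token.name === 'startObject' || token.name === 'startArray';
    const closes = token.name === 'endObject' || token.name === 'endArray';
    if (inner > 0) {
      // Inside a skipped container: only track nesting until it closes.
      if (opens) inner++;
      if (closes) inner--;
      continue;
    }
    if (skipNext) {
      skipNext = false;
      if (opens) inner = 1; // container value: skip through its matching close
      continue; // scalar value: this single token is the whole value
    }
    if (depth === 1 && token.name === 'keyValue' && token.value === keyToDrop) {
      skipNext = true; // drop the key token itself
      continue;
    }
    if (opens) depth++;
    if (closes) depth--;
    yield token;
  }
}

// Tokens for {"meta":{"v":[1]},"data":[2]}
const tokens = [
  {name: 'startObject'},
  {name: 'keyValue', value: 'meta'},
  {name: 'startObject'},
  {name: 'keyValue', value: 'v'},
  {name: 'startArray'},
  {name: 'numberValue', value: '1'},
  {name: 'endArray'},
  {name: 'endObject'},
  {name: 'keyValue', value: 'data'},
  {name: 'startArray'},
  {name: 'numberValue', value: '2'},
  {name: 'endArray'},
  {name: 'endObject'}
];
const kept = [...dropTopLevelKey(tokens, 'meta')];
console.log(kept.map(t => (t.value === undefined ? t.name : `${t.name}:${t.value}`)).join(' '));
// startObject keyValue:data startArray numberValue:2 endArray endObject
```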
Need a callback per token rather than assembled objects? emit() attaches token events to any pipeline:
import chain from 'stream-chain';
import {parser} from 'stream-json';
import emit from 'stream-json/utils/emit.js';
import fs from 'node:fs';
const pipeline = chain([fs.createReadStream('data.json'), parser()]);
emit(pipeline);
// Counts object nesting only; arrays are not tracked.
let depth = 0,
  max = 0;
pipeline.on('startObject', () => (max = Math.max(max, ++depth)));
pipeline.on('endObject', () => --depth);
pipeline.on('end', () => console.log('max nesting:', max));
Emitter is the standalone-component version. For hot paths, a terminal function is leaner than events.
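As a rough illustration of the terminal-function alternative, the same depth count can be written as one function that receives every token, here also counting arrays, which the event version does not. Token objects with a name property are assumed to match stream-json's token format, and makeDepthTracker is a hypothetical name; in a real pipeline the onToken function would be the last stage rather than being called in a loop.

```javascript
// Hypothetical terminal function: one call per token, no event dispatch.
// Unlike the event example, it counts arrays as nesting levels too.
function makeDepthTracker() {
  let depth = 0;
  let max = 0;
  const onToken = ({name}) => {
    if (name === 'startObject' || name === 'startArray') max = Math.max(max, ++depth);
    else if (name === 'endObject' || name === 'endArray') --depth;
  };
  return {onToken, getMax: () => max};
}

// Tokens for {"a":[{"b":1}]} (token shapes assumed as described above).
const tokens = [
  {name: 'startObject'},
  {name: 'keyValue', value: 'a'},
  {name: 'startArray'},
  {name: 'startObject'},
  {name: 'keyValue', value: 'b'},
  {name: 'numberValue', value: '1'},
  {name: 'endObject'},
  {name: 'endArray'},
  {name: 'endObject'}
];
const tracker = makeDepthTracker();
tokens.forEach(tracker.onToken);
console.log('max nesting:', tracker.getMax()); // max nesting: 3
```

For the same input, the objects-only event version would report 2, since the array level is not counted.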
- Concepts — the model these examples follow.
- Recipes — task-oriented how-tos.
- Performance — keeping pipelines fast.
- Component reference: Parser, Pick, StreamArray, Assembler, and the rest from Home.