Performance
stream-chain is built for huge data, so performance is a first-class concern. Concepts — How performance shaped this explains why the toolkit has the shape it does; this page is the practical companion: how to write pipelines that stay on the fast path, how to package reusable components so they fuse instead of fighting each other, and how to pick a substrate. None of this is micro-optimisation for its own sake — on a stream of billions of records, a small per-record cost compounds into hours.
Performance here is empirical, not folklore. The repository ships a benchmark suite built on nano-benchmark; every suggestion on this page is backed by a file in bench/, and the deeper investigations record their method and caveats (see bench/json-exec/RESULTS.md). Two rules before you trust any number:
- Match the benchmark to your real shape. A sync-heavy pipeline and an async-saturated one behave completely differently — the same change can be a large win on one and a wash on the other. Benchmark the workload you actually run.
- Synthetic numbers over-state real gains. An isolated micro-benchmark removes the parsing, allocation, and I/O that share wall-time in a real pipeline. Treat the bench/ figures as directional and measure end-to-end before you rely on them.
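For the end-to-end check, even a crude wall-clock timer around your real pipeline is more trustworthy than a synthetic figure. The sketch below is plain JavaScript using the global performance.now() (available in modern Node and browsers). timeRun() is a hypothetical helper, not part of stream-chain or nano-benchmark. It reports the median of a few runs and does no statistical analysis, so use the bench/ suite for real comparisons.

```js
// Hypothetical helper: time a whole run (e.g. a complete pipeline) end to end.
// A sanity check, not a benchmark harness: no warm-up control, no statistics.
async function timeRun(run, repeats = 5) {
  const times = [];
  for (let i = 0; i < repeats; ++i) {
    const start = performance.now();
    await run(); // run() may be sync or return a promise (e.g. a pipeline finishing)
    times.push(performance.now() - start);
  }
  times.sort((a, b) => a - b);
  return times[Math.floor(times.length / 2)]; // median wall-time in milliseconds
}

// Usage: wrap the real workload, not an isolated stage.
timeRun(async () => {
  // replace with your actual pipeline, fed with realistic input
  let sum = 0;
  for (let i = 0; i < 1e5; ++i) sum += i;
  return sum;
}).then(ms => console.log(`median: ${ms.toFixed(2)} ms`));
```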
A stage can be a plain function or a generator (see Concepts). They are not equal in cost:
- A plain function, even an async one, is the fast path.
- A generator pays the async-iteration protocol (a promise per .next(), a for await step) on every item, whether or not it fans out. That cost buys laziness and composability as an async iterable; it does not buy throughput.
So write stages as plain functions by default, and keep them plain even when they need to drop or multiply records: return chain.none to drop and chain.many([...]) to fan out, rather than switching the stage to a generator. One limit to respect: chain.many() collects its outputs into an array held in memory, so it is strictly for bounded fan-out, meaning a handful of values per input that you know stays small. It is not a stream and must never be treated as one: if a single input can fan out to a large or unbounded number of outputs, many() balloons memory and can grow until the process runs out of it. Use a generator stage there instead; it yields outputs one at a time and never accumulates, which is exactly when a generator earns its async-iteration cost. Evidence: bench/gen-fun.js (generator vs function, no streams) and bench/gen-fun-stream.js.
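Both differences can be seen in plain JavaScript, without stream-chain. In the sketch below, double, doubleGen, pairUp, countUpFrom and take() are illustrative names, not library APIs. The async generator's .next() always returns a promise even when the work is synchronous; pairUp returns a small array, which is the kind of bounded output chain.many() holds; and countUpFrom fans out without bound but is consumed lazily, one value at a time.

```js
// Function stage: a direct call per record, result available immediately.
const double = x => 2 * x;

// The same logic as an async generator stage: every record goes through
// the async-iteration protocol (a promise per .next()).
async function* doubleGen(source) {
  for await (const x of source) yield double(x);
}

const step = doubleGen([21]).next();
console.log(step instanceof Promise); // true, even though no I/O happens

// Bounded fan-out: a small array per input is fine (like chain.many([...])).
const pairUp = x => [x, -x];

// Unbounded fan-out: a generator yields lazily and never accumulates.
function* countUpFrom(x) {
  for (let i = x; ; ++i) yield i;
}

// Hypothetical helper: pull only the first n values from an iterable.
function take(iterable, n) {
  const out = [];
  if (n <= 0) return out;
  for (const value of iterable) {
    out.push(value);
    if (out.length >= n) break; // stops the generator; nothing else is produced
  }
  return out;
}

console.log(pairUp(3)); // [ 3, -3 ]
console.log(take(countUpFrom(5), 3)); // [ 5, 6, 7 ]
// Array.from(countUpFrom(5)) would never finish: collecting unbounded output is the many() trap.
```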
await is a per-item tax. stream-chain's executor is sync-when-possible — a stage that returns a plain value never touches the promise machinery — but it can only stay synchronous if your function does. Two habits keep the hot path sync:
- Don't mark a stage async unless it actually awaits. An async function that merely returns a value still wraps it in a promise, which the executor must then await, taking the stage off the synchronous path.
- Hoist async work out of the per-record path. If every record does the same lookup, do it once upstream (in bulk) and pass the result down, instead of await-ing per record.
The cost is real but bounded: when a stage genuinely awaits, the sync-when-possible advantage shrinks rather than reverses — bench/json-exec/node-async.js shows "faster or even, never a meaningful regression."
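Both habits in a plain-JavaScript sketch. loadRates() is a hypothetical bulk lookup standing in for a real database or HTTP call; the point is that it runs once, upstream, and the per-record stage built by makeEnrich() (also hypothetical) stays synchronous, so a sync-when-possible executor can keep it on the fast path.

```js
// An async function that never awaits still returns a promise...
const tagAsync = async record => ({...record, tagged: true});
// ...while the plain version returns the value directly.
const tagSync = record => ({...record, tagged: true});

console.log(tagAsync({id: 1}) instanceof Promise); // true
console.log(tagSync({id: 1}));                     // { id: 1, tagged: true }

// Hypothetical bulk lookup: one async round trip for all ids, done upstream.
async function loadRates(ids) {
  return new Map(ids.map(id => [id, id * 10])); // stand-in data
}

// Per-record stage: synchronous, closes over the preloaded map.
const makeEnrich = rates => record => ({...record, rate: rates.get(record.id)});

loadRates([1, 2, 3]).then(rates => {
  const enrich = makeEnrich(rates);
  console.log([{id: 1}, {id: 3}].map(enrich)); // [ { id: 1, rate: 10 }, { id: 3, rate: 30 } ]
});
```

In a real pipeline, enrich is what you would pass as a stage to chain([...]); the single await happens before the chain starts.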
chain() groups runs of consecutive functions into a single fused gen() segment, so a block of function stages runs with no stream machinery between them. Every stream you interleave breaks that run and forces a substrate boundary — real, measurable overhead (bench/chain-1-stage.js vs bench/chain-2-stage.js isolates exactly one extra boundary).
Two practical consequences:
- Don't gratuitously wrap a function in asStream(). A bare function in the array fuses; the same function wrapped as a stream becomes its own boundary.
- Keep your function stages contiguous. Order the pipeline so the per-record transforms sit together and fuse, with streams confined to the ends (see "Streams at the edges" below).
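What fusion buys can be modelled in a few lines. The sketch below is a toy, not stream-chain's executor: NONE and many() are hypothetical stand-ins for chain.none and chain.many(), and fuse() turns a run of function stages into one function that maps a record to its outputs, with no stream between the stages.

```js
// Toy model of a fused run of function stages; NOT stream-chain's executor.
// NONE and many() are hypothetical stand-ins for chain.none and chain.many().
const NONE = Symbol('none');
class Many {
  constructor(values) {
    this.values = values;
  }
}
const many = values => new Many(values);

// Turn an array of stages into ONE function: record -> array of outputs.
// Values pass from stage to stage through plain arrays, with no stream in between.
function fuse(stages) {
  return record => {
    let current = [record];
    for (const stage of stages) {
      const next = [];
      for (const value of current) {
        const result = stage(value);
        if (result === NONE) continue; // dropped
        if (result instanceof Many) next.push(...result.values); // bounded fan-out
        else next.push(result);
      }
      current = next;
    }
    return current;
  };
}

const fused = fuse([
  x => 2 * x,              // map
  x => (x > 2 ? x : NONE), // filter
  x => many([x, x + 1])    // fan out
]);

console.log([1, 2, 3].map(fused)); // [ [], [ 4, 5 ], [ 6, 7 ] ]
```

A real executor also has to handle promises, generators and stop, and it streams results rather than returning arrays.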
Fuse trivial functions yourself. chain() removes the stream boundaries between functions, but each function is still its own call. The lowest-hanging fruit is to merge trivial transforms by hand: [x => 2 * x, x => x + 1] does the same work as the faster [x => 2 * x + 1]. The saved call is a small but real per-record cost.
This is a judgement call, not a rule. Keep stages as small, separate blocks — and skip the hand-fusion — when:
- they are canned, well-debugged functions you would rather not rewrite (and risk a bug in) for a micro-gain;
- a stage is inherently slow (a long await, heavy computation), so the call overhead is noise beside it and fusing buys nothing;
- you get reuse or clarity from keeping them apart, which is worth more than a saved call in most code.
Hand-fuse the hot, trivial stages; leave the rest readable and let chain() handle the stream-level fusion.
When you build a reusable, multi-stage component, the highest-leverage performance decision is how you package it — because it decides whether the component fuses into a consumer's pipeline or imposes a boundary they cannot remove.
Package it as a functional pipeline — a gen() (or fun()) over its internal stages — not as a pre-materialised stream. Every gen() / fun() result carries its flat function list (its fList), so when a consumer drops your component into a chain(), chain() pulls that list out and merges your component's internal stages and the consumer's own functions into one fused segment. Your component's internals never impose a stream boundary on the consumer's code. (The mechanism is described in Concepts; as a component author you just build with gen(), and fusion happens for free.)
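The fList idea itself is simple enough to model. In this sketch, withFList() is a hypothetical stand-in for gen() / fun(), synchronous and without none/many handling: every result carries its flat function list, and wrapping a component inlines that list instead of nesting a call.

```js
// Hypothetical stand-in for gen()/fun(): builds a runner that carries .fList.
function withFList(stages) {
  // Inline any stage that already carries a flat list; keep plain functions as-is.
  const flat = stages.flatMap(stage => (Array.isArray(stage.fList) ? stage.fList : [stage]));
  const run = value => flat.reduce((acc, fn) => fn(acc), value);
  run.fList = flat;
  return run;
}

// A packaged, reusable component with two internal stages.
const component = withFList([x => x + 1, x => x * 10]);

// A consumer's pipeline that drops the component in next to its own stage.
const consumer = withFList([component, x => x - 3]);

console.log(consumer.fList.length); // 3: the component's stages were inlined, not nested
console.log(consumer(1));           // 17, i.e. (1 + 1) * 10 - 3
```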
Reference: stream-json's JSON parser. Shown as an illustration of the technique — not a dependency you need. Its core is, in essence:
```js
import {gen} from 'stream-chain/core';

// Simplified: fixUtf8Stream() and jsonParser() are stream-json's internal stages.
const parser = options => gen(fixUtf8Stream(), jsonParser(options));
```

The parser packs a UTF-8 fixer and the JSON tokenizer/assembler into one gen(). A consumer then writes:
```js
import chain from 'stream-chain';

// parser() is the stream-json component above; lookup() is your own synchronous lookup.
const pipeline = chain([
  parser(), // tokenise + assemble
  record => (record.active ? record : chain.none), // your filter
  record => ({...record, rate: lookup(record.id)}) // your enrich
]);
```

and the parser's internal stages fuse together with the filter and the enrich into a single segment: there is no stream boundary between the tokenizer and your filter. Packaging the parser this way is a large part of what makes JSON processing fast (stream-json/src/core/parser.js).
Offer a materialised variant too. Consumers who want a standalone, stream-to-stream component — to .pipe() between two Node streams — still need one. Ship both: the functional pipeline as the default, plus an .asStream / .asWebStream form, exactly as stream-json does (parserStream = parser.asStream):
```js
import {gen} from 'stream-chain/core';
import {asStream} from 'stream-chain';

// decode(), tokenize() and assemble() stand for your component's own stages.

// mergeable form (the default): fuses into a consumer's chain
export const parseRecords = opts => gen(decode(opts), tokenize(opts), assemble(opts));

// materialised form: for stream-to-stream use
parseRecords.asStream = opts => asStream(parseRecords(opts), {objectMode: true});
```

Caveat: fusion only crosses gen() / fun() / chain() wrappers, the fList carriers. A raw Transform stream dropped into the middle of a chain is an opaque boundary that cannot be fused, whatever surrounds it. If you want a component to fuse, build it from functions and generators, not from Transform subclasses. The function-list inlining itself is benchmarked in bench/gen-opt.js / bench/fun-opt.js: a flat list and a fused nested list match, while a non-inlined nested list is measurably slower.
Streams earn their cost at two jobs: interfacing with byte/text I/O (files, sockets) and carrying backpressure (see Concepts — Why streams?). The per-record transforms in between need not be streams — and, per the sections above, run fastest when they are not. So the shape to aim for is:
source stream --> [ one fused block of functions ] --> sink stream
The JSONL file-edge composites are this pattern made concrete: parseFile() and stringerToFile() fold the file handles into the pipeline as the only I/O edges and fuse everything else into one executor (bench/jsonl-file.js, the roundtrip: case; see JSONL).
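The same shape can be sketched with nothing but Node's own stream tooling. Here a single async generator function stands in for the fused block that chain() would provide; the records, the fused() transform and the in-memory sink are illustrative assumptions. All per-record work sits in one hand-fused function, so the only real stream edges are the source and the sink.

```js
import {Readable, Writable} from 'node:stream';
import {pipeline} from 'node:stream/promises';

// Hand-fused per-record work: filter + enrich in one function (null means "drop").
const fused = record => (record.active ? {...record, seen: true} : null);

const results = [];

await pipeline(
  // source edge: any readable stream (a file, a socket, ...)
  Readable.from([{id: 1, active: true}, {id: 2, active: false}, {id: 3, active: true}]),
  // middle: ONE stage for the whole block (stand-in for chain()'s fused executor)
  async function* (source) {
    for await (const record of source) {
      const out = fused(record);
      if (out !== null) yield out;
    }
  },
  // sink edge: any writable stream; objectMode because we pass objects
  new Writable({
    objectMode: true,
    write(chunk, _encoding, callback) {
      results.push(chunk);
      callback();
    }
  })
);

console.log(results.map(r => r.id)); // [ 1, 3 ]
```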
The same pipeline runs on three substrates (Concepts); pick the least machinery your environment needs:
- stream-chain/core: no stream machinery at all, just async iterables. The leanest option when you do not need a stream interface (you are embedding, or consuming with for await). bench/core-chain.js measures its only overhead, the outer iteration loop.
- stream-chain / stream-chain/node: Node streams; the fastest stream substrate today for server-side work.
- stream-chain/web: Web Streams; portable and browser-safe, but currently slower, because a default highWaterMark of 1 makes every output enqueue hit backpressure (bench/raw-streams.js establishes the substrate floor; the web vs node benches show the gap).
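The effect of a highWaterMark of 1 is easy to observe with the standard Web Streams API (ReadableStream and CountQueuingStrategy are globals in modern browsers and in Node 18+). This sketches the substrate behaviour only, not stream-chain/web itself; desiredSizeAfterOneEnqueue() is a hypothetical helper. A desiredSize of 0 or less is the stream's backpressure signal.

```js
// Enqueue a single chunk with no reader attached and report how much room is left.
function desiredSizeAfterOneEnqueue(highWaterMark) {
  let desiredSize;
  const strategy =
    highWaterMark === undefined ? undefined : new CountQueuingStrategy({highWaterMark});
  new ReadableStream(
    {
      start(controller) { // called synchronously by the constructor
        controller.enqueue('record');
        desiredSize = controller.desiredSize; // highWaterMark minus queued chunks
      }
    },
    strategy
  );
  return desiredSize;
}

console.log(desiredSizeAfterOneEnqueue());   // 0: default highWaterMark is 1, already full
console.log(desiredSizeAfterOneEnqueue(16)); // 15: room for more before backpressure
```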
/node and /web are interchangeable in concept, not by a rename — and that interchangeability is deliberately engineered at the chain() level. Everything you express inside chain([...]) — the same functions, none / many / stop, generators, the same property surface — ports unchanged. What does not port is the wiring around the chain: the two hand back genuinely different stream types, with different APIs and execution models — a Node Duplex (.pipe(), EventEmitter) versus a Web {readable, writable} pair (.pipeThrough() / .pipeTo(), readers and writers). Feeding the chain and, especially, consuming it — reacting to completion, errors, the next chunk — is substrate-specific and becomes your job when you switch. The way out is to keep that work inside chain() too: consume through the chain (for instance, a terminal function as the last stage) and you stay on the portable side. So budget for a real port of the surrounding stream code, not a find-and-replace.
Consuming a chain: prefer the portable forms. Reading a chain's output is the most common place substrate-specificity leaks in. A terminal function as the last stage, or async iteration (for await...of), reads the same on every substrate; .on('data') is Node-only — Web Streams have no data event, so there you use a reader, pipeTo / pipeThrough, or webStreamPuller (Node also has streamPuller). Prefer the portable forms unless you specifically need a substrate's native interface. Whether a terminal function is also faster than event-based consumption we have not benchmarked — treat this as a portability choice, not a performance one (yet).
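The portability of async iteration can be checked directly: in current Node, both a Node Readable and a Web ReadableStream are async iterables, so one consumer function reads either. Async iteration of ReadableStream is more recent and less uniformly supported in browsers, so check your targets. This is plain Node, not stream-chain; collect() is a hypothetical helper.

```js
import {Readable} from 'node:stream';

// One consumer for any async iterable: no 'data' events, no reader API.
async function collect(iterable) {
  const out = [];
  for await (const item of iterable) out.push(item);
  return out;
}

// A Node stream...
const nodeStream = Readable.from(['a', 'b', 'c']);

// ...and a Web stream carrying the same items.
const webStream = new ReadableStream({
  start(controller) {
    for (const item of ['a', 'b', 'c']) controller.enqueue(item);
    controller.close();
  }
});

const [fromNode, fromWeb] = await Promise.all([collect(nodeStream), collect(webStream)]);
console.log(fromNode, fromWeb); // [ 'a', 'b', 'c' ] [ 'a', 'b', 'c' ]
```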
Backpressure is what keeps memory flat; do not trade it away for throughput. The one knob is highWaterMark: a larger value lets more records sit in flight (more throughput, more memory), a smaller one tightens memory at the cost of more frequent suspension. The default is chosen to keep memory bounded on unbounded input. Tune it against a real workload, and never disable backpressure to chase a number — the bounded-queue guarantee is a correctness property, not a tuning preference.
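The bounded-queue contract is visible in Node's own stream API: once the buffer reaches highWaterMark, push() returns false, which is the producer's signal to stop. The sketch below uses a bare node:stream Readable with an illustrative highWaterMark of 2 in objectMode (where it counts objects, not bytes); it does not show stream-chain's own highWaterMark option. Note that push() only signals: ignoring the signal keeps growing the buffer, which is exactly the memory growth backpressure exists to prevent.

```js
import {Readable} from 'node:stream';

// A readable with no consumer attached: whatever is pushed stays buffered.
const readable = new Readable({
  objectMode: true,
  highWaterMark: 2, // in objectMode this counts objects
  read() {}         // no-op: this sketch pushes manually instead of on demand
});

const accepted = [readable.push({n: 1}), readable.push({n: 2}), readable.push({n: 3})];
console.log(readable.readableHighWaterMark); // 2
console.log(accepted);                       // [ true, false, false ]
console.log(readable.readableLength);        // 3: push() signals, it does not drop
```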
stream-chain predates Node's own stream-composition tooling and is not a reimplementation of it — the two arrived independently. Node now ships stream.pipeline(), compose(), and finished(); where they overlap with chain(), they interoperate cleanly rather than compete. A chain() result is a standard Duplex, so it drops straight into pipeline() as one stage, and an older .pipe() arrangement can be rewritten with pipeline() for its error-and-cleanup handling:
```js
import {pipeline} from 'node:stream/promises';
import chain from 'stream-chain';

// source and destination are any Node streams; stages is your array of functions/streams
await pipeline(source, chain(stages), destination);
```

Use them together where it helps. The reason to reach for chain() over composing the same stages as separate streams with compose() is the fusion this page is about: chain() runs a run of functions as one segment, whereas compose() keeps a stream per stage (it builds a Duplex over stream.pipeline() and wraps even a plain-function stage into its own stream via Duplex.from), so adjacent stages are never fused. (A head-to-head benchmark of the two is tracked as future work.)
A stability caveat, current as of this writing (Node 26): of the three, compose() is experimental — Node's stability index level 1, defined as "not subject to semantic versioning rules… Use of the feature is not recommended in production environments." pipeline() and finished() are stable (level 2). So reach for pipeline() / finished() freely; treat compose() as provisional until Node promotes it.
- Concepts — the model and why performance shaped it.
- Intro by examples — the same ideas as runnable pipelines.
- Benchmarks — the full benchmark catalogue and how to run it.
- chain() — the factory and every return-value rule.
- gen() / fun() — the functional pipelines you package components with.
- highWaterMark — tuning flow and backpressure.
- JSONL — fused file-edge composites.