
Concepts

Eugene Lazutkin edited this page Jun 6, 2026 · 6 revisions

This page explains the ideas behind stream-chain — the problem it solves, how it solves it, and the handful of concepts everything else builds on. If you would rather learn by reading code, Intro by examples shows the same ideas as runnable pipelines.

The problem

stream-chain processes streams of objects — records, not raw bytes — and is built for the big cases: multi-gigabyte database dumps, append-only logs, message queues, and other feeds that are generated continuously or are simply too large to hold in memory. You want to process them one record at a time: transform, filter, and enrich as they flow.

That sounds easy — chain a few functions and you are done. Real pipelines are harder, for four reasons:

  • Filtering — a stage may want to drop a record (produce nothing).
  • Fan-out — a stage may want to turn one record into several.
  • Asynchronous work — stages talk to databases, networks, and files, so they have to await.
  • Backpressure — stages run at different speeds. If a fast producer outruns a slow consumer, records pile up in memory until the process dies. Keeping memory flat means making the fast side wait for the slow side — the essence of backpressure.

Node.js streams solve backpressure, but wiring a pipeline out of them by hand is verbose and easy to get wrong.

The idea

Write each stage as an ordinary function (or generator). stream-chain wires your functions — alongside any streams you already have — into a single pipeline that handles the streaming, the backpressure, and the lifecycle for you. The result is a standard Duplex stream (or a Web Streams pair, or a bare async iterable — your choice of substrate), so it drops into pipelines you already have.

You think about what each stage does to one record. The library handles how the records flow.

import chain from 'stream-chain';

// lookup() and summarize() stand for your own helpers; they are not part of stream-chain
const pipeline = chain([
  record => (record.active ? record : chain.none), // filter
  async record => ({...record, rate: await lookup(record.id)}), // async enrich
  record => chain.many([record, summarize(record)]) // fan-out
]);

The model: one input, zero-to-many outputs

The one idea to internalize: a stage maps each input to any number of outputs — zero, one, or many. This is the generator model, and it is what makes pipelines expressive enough for real work.

A generator stage expresses it directly and lazily:

// audit() stands for your own helper that builds an audit entry
function* activeWithAudit(record) {
  if (!record.active) return; // zero outputs: drop it
  yield record; // one output...
  if (record.flagged) yield audit(record); // ...or more
}

A generator yields values one at a time and never accumulates, so memory stays flat no matter how much a stage fans out.
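
To see the zero-to-many model with no library in the way, here is a minimal sketch in plain JavaScript. The pipe() helper and both sample stages are hypothetical names invented for illustration; this is not how stream-chain is implemented, only the shape of the idea. It uses synchronous generators to stay short.

```javascript
// Hypothetical helper: feed each output of one generator stage into the next.
function* pipe(stages, input) {
  if (stages.length === 0) {
    yield input; // no stages left: the value is an output
    return;
  }
  const [first, ...rest] = stages;
  for (const value of first(input)) {
    yield* pipe(rest, value); // lazily push each output downstream
  }
}

// Stage 1: drop inactive records (zero outputs) or pass them on (one output).
function* onlyActive(record) {
  if (record.active) yield record;
}

// Stage 2: fan out into the record itself plus an audit entry when flagged.
function* withAudit(record) {
  yield record;
  if (record.flagged) yield {auditOf: record.id};
}

const records = [
  {id: 1, active: true, flagged: false},
  {id: 2, active: false, flagged: true},
  {id: 3, active: true, flagged: true}
];

const outputs = [];
for (const record of records) {
  for (const out of pipe([onlyActive, withAudit], record)) outputs.push(out);
}
console.log(outputs.length); // 3: record 1, record 3, and the audit entry for record 3
```

Nothing in pipe() builds an intermediate array: each output travels all the way down before the next one is produced, which is the property the paragraph above relies on.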

A plain function can only return a single value, so stream-chain gives it a small vocabulary to express the rest:

  • Return a value — pass it on.
  • Return chain.none — produce nothing (drop the record).
  • Return chain.many([...]) — produce several values at once.

chain.none and chain.many are the transport that lets a plain function say what a generator says natively. many() collects its values in an array, so it holds them all in memory — fine for small, bounded fan-out, but for large or unbounded fan-out a generator stage is leaner. (many([]) can express "no values", but chain.none is the cheaper, clearer way to say it — which is why it exists.)
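
One way to picture that "transport" is as an adapter that reads a plain function's return value and replays it as generator output. The sketch below assumes nothing about the library's internals: NONE, Many, many(), and toGenerator() are hypothetical stand-ins, not stream-chain's own code.

```javascript
// Hypothetical stand-ins for chain.none and chain.many(); not the library's code.
const NONE = Symbol('none');
class Many {
  constructor(values) {
    this.values = values;
  }
}
const many = values => new Many(values);

// Adapt a plain function into a generator stage by interpreting its return value.
const toGenerator = fn =>
  function* (input) {
    const result = fn(input);
    if (result === NONE) return; // zero outputs
    if (result instanceof Many) {
      yield* result.values; // several outputs, already held in an array
      return;
    }
    yield result; // exactly one output
  };

const dropOdd = toGenerator(x => (x % 2 ? NONE : x));
const twice = toGenerator(x => many([x, x]));

const dropped = [...dropOdd(3)]; // []
const kept = [...dropOdd(4)]; // [4]
const doubled = [...twice(7)]; // [7, 7]
console.log(dropped, kept, doubled);
```

The Many wrapper makes the memory trade-off visible: the array exists in full before the first value is yielded, whereas a generator stage never has to hold more than one.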

Two more, for advanced control (they work only inside gen() / fun() segments, not raw streams):

  • chain.finalValue(x) — use x as the chain's final value; it is not passed to the rest of the chain.
  • chain.stop — stop the pipeline early (useful for terminating otherwise-infinite sources).

Any stage may also be async (return a Promise) — the pipeline waits for it, holding backpressure while it does.
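
As an illustration of why an early-stop signal matters, here is a small sketch that drives an async stage over an infinite source and halts when the stage asks to. STOP and collectUntilStop() are hypothetical stand-ins written for this example; they only mimic the behaviour described above and are not the library's API.

```javascript
// Hypothetical stand-in for chain.stop; not the library's implementation.
const STOP = Symbol('stop');

// An infinite source: it never ends on its own.
function* naturals() {
  for (let n = 0; ; ++n) yield n;
}

// Drive one async stage over the source until the stage asks to stop.
async function collectUntilStop(source, stage) {
  const results = [];
  for (const item of source) {
    const value = await stage(item); // wait for async work before pulling more
    if (value === STOP) break; // leaving the loop also closes the source
    results.push(value);
  }
  return results;
}

const squares = collectUntilStop(naturals(), async n => (n < 5 ? n * n : STOP));
squares.then(values => console.log(values)); // [0, 1, 4, 9, 16]
```

Because the loop awaits each stage result before pulling the next item, the infinite source never runs ahead of the async work.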

A note on null / undefined

At the stream boundary — inside chain() and asStream() — returning null or undefined also drops the record, because Node streams reserve those values for end-of-stream signalling. But the lower-level compositors gen() and fun() pass null/undefined through as ordinary values. So chain.none is the one signal that means "drop" everywhere — prefer it over returning null when you want the same behaviour in every context.

Backpressure, handled

Backpressure is what keeps memory flat on a huge or never-ending stream. When a downstream stage is slow, stream-chain propagates the pressure back up the chain, so upstream stages — including async ones mid-await — pause until there is room. You never buffer the whole stream; only a small, bounded number of records are in flight at once.

This is why the library earns its keep on big data specifically: the same pipeline that processes a 10 KB file processes a 10 TB one in constant memory. The native Web Streams adapter (asWebStream()) extends this to per-item backpressure. The mechanics and the tuning knobs, including the highWaterMark setting, are covered on the highWaterMark page.
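
The effect can be observed with nothing more than async iterables. The sketch below is a pull-based illustration of backpressure in general, not a picture of stream-chain's internal buffering: the counters, source(), and run() are names made up for the example, and the 2 ms delay is an arbitrary stand-in for slow downstream work.

```javascript
// Counters used only to observe the flow.
let produced = 0;
let consumed = 0;
let maxInFlight = 0;

// A fast source: it could generate records instantly, but it only runs when pulled.
async function* source(limit) {
  for (let i = 0; i < limit; ++i) {
    ++produced;
    maxInFlight = Math.max(maxInFlight, produced - consumed);
    yield {id: i};
  }
}

const sleep = ms => new Promise(resolve => setTimeout(resolve, ms));

// A slow consumer: each record takes a few milliseconds to handle.
async function run() {
  for await (const record of source(20)) {
    await sleep(2); // simulate slow downstream work
    ++consumed;
  }
  return {produced, consumed, maxInFlight};
}

const finished = run();
finished.then(stats => console.log(stats)); // maxInFlight stays at 1
```

The source never gets more than one record ahead, because it is resumed only when the consumer asks for the next value. Stream substrates add a bounded buffer on top of the same principle.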

Building blocks: transducers and adapters

Beneath chain() are two kinds of primitive.

Transducers combine a list of functions into a single function, with no stream substrate involved:

  • gen() — composes the functions into an async generator: feed it one input, iterate zero-to-many outputs, lazily. It is the memory-flat default, and it is what chain() groups your consecutive functions into for speed.
  • fun() — composes them into a plain function that returns the whole batch of outputs for one input (collected in a many()). It predates async generators and is slightly faster on purely synchronous pipelines, but it holds every output per input in memory — so it is an explicit import, not a default. Reach for it only when per-input output is small and bounded.
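
The memory difference between the two styles is easy to demonstrate in isolation. This sketch does not call gen() or fun(); lazy() and eager() are hypothetical helpers that imitate only the "iterate lazily" versus "return the whole batch" distinction, using synchronous generators for brevity.

```javascript
// One stage that fans a single input out into `count` outputs.
let created = 0;
function* expand(count) {
  for (let i = 0; i < count; ++i) {
    ++created; // count how many outputs were actually materialized
    yield i;
  }
}

// Lazy style: hand the generator to the caller, who pulls what it needs.
const lazy = count => expand(count);

// Eager style: collect every output into an array before returning.
const eager = count => [...expand(count)];

// Pull only the first output from the lazy version.
created = 0;
const firstLazy = lazy(100000).next().value;
const createdLazily = created; // 1

// Ask the eager version for the same first output.
created = 0;
const firstEager = eager(100000)[0];
const createdEagerly = created; // 100000

console.log({firstLazy, createdLazily, firstEager, createdEagerly});
```

Both calls return the same first value, but the eager one had to build all 100000 outputs to get there, which is why the batch style suits only small, bounded fan-out.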

Adapters lift a single function onto a stream substrate, when you need a standalone, reusable stream component:

  • asStream() — wraps a function as a Node Duplex.
  • asWebStream() — wraps a function as a Web Streams {readable, writable} pair, with per-item backpressure.

chain() ties it together: it takes an array of functions, arrays, and streams, groups the runs of functions into a gen() for efficiency, and returns one Duplex you compose with .pipe() like any other stream.

Three substrates: node, web, core

The same composition model runs on three substrates. The code you write is the same; pick by environment:

  • stream-chain / stream-chain/node (default) — Node.js streams. The right choice for server-side work.
  • stream-chain/web — native Web Streams, no Node-stream dependency. Browser-safe. (/node and /web are interchangeable — swap if your performance landscape shifts.)
  • stream-chain/core — substrate-free async iterables: no node:stream, no Web Streams, just for await...of. The leanest base for embedding.

See the subpath split for the details and migration notes.

Getting objects in and out

stream-chain works in object mode: it moves records, not raw text or bytes. Turning bytes into objects, and objects back into bytes, happens at the edges of the pipeline. A source or sink can be anything that yields or accepts objects — a database cursor, an HTTP response, stream-json assembling tokens into objects, or your own generator.

For the most common on-disk case, JSONL (JSON Lines) ships in the box: a parser and a stringer, plus fused file-edge composites parseFile() / stringerToFile() that read and write JSONL files as a single pipeline stage. JSONL is the batteries-included I/O for object streams — its page covers the full surface.
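
For a sense of what happens at those edges, here is a bare-bones sketch of JSONL parsing and stringifying in plain JavaScript. It is not the bundled parser or stringer, and parseJsonl() / stringifyJsonl() are names invented for this example; it shows only the core idea that text chunks can split a line anywhere, so the unfinished tail must be carried over.

```javascript
// Turn an iterable of text chunks into parsed objects, one per JSONL line.
// Chunks may split a line anywhere, so keep the unfinished tail between chunks.
function* parseJsonl(chunks) {
  let tail = '';
  for (const chunk of chunks) {
    const lines = (tail + chunk).split('\n');
    tail = lines.pop(); // the last piece may be an incomplete line
    for (const line of lines) {
      if (line.trim()) yield JSON.parse(line); // skip blank lines
    }
  }
  if (tail.trim()) yield JSON.parse(tail); // final line without a trailing newline
}

// Turn objects back into JSONL text, one line per object.
function* stringifyJsonl(objects) {
  for (const object of objects) yield JSON.stringify(object) + '\n';
}

// The second record is split across two chunks on purpose.
const chunks = ['{"id":1}\n{"i', 'd":2}\n', '{"id":3}'];
const parsed = [...parseJsonl(chunks)];
const text = [...stringifyJsonl(parsed)].join('');

console.log(parsed.length); // 3
console.log(JSON.stringify(text)); // "{\"id\":1}\n{\"id\":2}\n{\"id\":3}\n"
```

Both functions are generators, so they fit the same one-input, zero-to-many-outputs model as every other stage: one chunk may complete several records or none.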

Where to go next

  • Intro by examples — these concepts as runnable pipelines.
  • chain() — the full factory API and every return-value rule.
  • JSONL — reading and writing object streams from files.
  • highWaterMark — tuning the flow and backpressure.
  • Benchmarks — how performance is measured.
