Skip to content
Eugene Lazutkin edited this page May 19, 2026 · 2 revisions

fork(outputs[, options])

The default export. require('stream-fork') returns this function.

Broadcast Writable. Every incoming chunk is forwarded to every live downstream concurrently via Promise.all over the per-output write callbacks. The Writable's own _write callback fires only when every receiving downstream has acknowledged consumption, so the slowest downstream gates upstream backpressure.

Signature

declare function fork(outputs: Writable[], options?: fork.ForkOptions): fork.ForkWritable;

Parameters

outputs

Array of downstream Writable streams. May be empty (isEmpty() will report true and the fork becomes a sink). Throws TypeError if outputs is not an array.

options

Optional. Passed through to the inner Writable super-constructor (see Node's WritableOptions) plus the custom property below.

The default is {objectMode: true}. Pass an explicit empty {} to opt out into chunk mode (Buffer/string), or set {objectMode: false} explicitly.

ignoreErrors

When truthy, downstream errors are silently swallowed and the failing stream is dropped from the live outputs view. When falsy (default), the first downstream error per write round is forwarded to the fork's own write-callback (surfacing as 'error'); pre-write errors re-emit on the fork directly.

Return value

A Writable with two additional read-only public-API members:

.outputs

Read-only snapshot of the currently-live downstream Writables. Recomputed on each access — dead downstreams (errored, marked by the internal pusher) are filtered out.

.isEmpty()

Returns true when every downstream has failed (so the broadcast is effectively a sink). Equivalent to outputs.length === 0 on the live snapshot.

Backpressure

Promise.all over the live pushers' push(chunk, encoding) calls gates the fork's own _write callback. The slowest of the live downstreams determines when the next chunk is accepted. Dead downstreams are filtered out at the top of each _write, so they never gate.

_final follows the same shape: end() is called on every pusher; the fork's _final callback fires when all have acknowledged.

Error handling

Default mode (!ignoreErrors)

Errors can arrive through three channels, all unified by the internal stream pusher:

  1. Downstream emits 'error'. The pusher's 'error' listener captures the error and marks the pusher dead. Pre-write only (before any _write has been called), the fork's own listener re-emits the error on the fork.
  2. Downstream's write callback reports an error. The per-push promise resolves with the error; _write forwards the first error in its round to its own callback, which surfaces as 'error' on the fork via the Writable contract.
  3. stream.write or stream.end throws synchronously. Same as (2) — the pusher catches and resolves with the thrown error.

The failing downstream is marked dead; subsequent writes filter live pushers and skip dead ones. 'error' fires once on the fork per write round (first error wins).

Ignore mode (ignoreErrors: true)

The pre-write re-emission listener is not installed. When a per-push promise resolves with an error, _write calls cb(null) unconditionally — the error is swallowed. The failing downstream is filtered out of subsequent writes. 'error' never fires on the fork in this mode.

Examples

Fan out to multiple sinks

const fork = require('stream-fork');

source.pipe(fork([sinkA, sinkB, sinkC]));

Tee to gzip + stdout

const fork = require('stream-fork');
const fs = require('node:fs');
const zlib = require('node:zlib');

const gzip = zlib.createGzip();
gzip.pipe(fs.createWriteStream('log.txt.gz'));

source.pipe(fork([gzip, process.stdout]));

Tolerate flaky downstreams

const fork = require('stream-fork');

source.pipe(fork([primarySink, flakyMetricsSink], {ignoreErrors: true}));
// If flakyMetricsSink errors, it is dropped from the live set and
// 'error' is not emitted on the fork. primarySink keeps receiving.

Chunk mode (Buffer/string)

const fork = require('stream-fork');

// Empty options literal opts out of the objectMode default.
source.pipe(fork([sinkA, sinkB], {}));

Composition with stream-chain

fork() returns a plain Writable, so it slots naturally as the last item in a stream-chain pipeline:

const chain = require('stream-chain');
const fork = require('stream-fork');

chain([source, transform, fork([sinkA, sinkB])]);

Clone this wiki locally