-
Notifications
You must be signed in to change notification settings - Fork 2
fork
The default export.
import fork from 'stream-fork'(Node Streams) orimport fork from 'stream-fork/web'(Web Streams) returns this function.
Broadcast sink. Every incoming chunk is forwarded to every live downstream concurrently via Promise.all over the per-output writes. The sink's own write callback fires only when every receiving downstream has acknowledged consumption, so the slowest downstream gates upstream backpressure.
The Web flavor is a backpressure-preserving generalization of ReadableStream.tee() to N outputs — unlike tee, it does not buffer per branch.
declare function fork(outputs: Writable[], options?: fork.ForkOptions): fork.ForkWritable;declare function fork(outputs: WritableStream[], options?: fork.ForkOptions): fork.ForkWritable;Array of downstream sinks (Writable[] for Node, WritableStream[] for Web). May be empty (isEmpty() will report true and the fork becomes a sink). Throws TypeError if outputs is not an array.
Optional.
Node: passed through to the inner Writable super-constructor (see Node's WritableOptions) plus the custom properties below. The default is {objectMode: true}. Pass an explicit empty {} to opt out into chunk mode (Buffer/string), or set {objectMode: false} explicitly.
Web: accepts {ignoreErrors, queuingStrategy}. queuingStrategy is forwarded to the underlying WritableStream constructor; Web Streams accept any value, so there is no objectMode analog.
When truthy, downstream errors are silently swallowed and the failing stream is dropped from the live outputs view. When falsy (default), the first downstream error per write round is surfaced upstream — on Node, via the fork's own 'error' event; on Web, by rejecting the upstream pipeTo promise. On Node, pre-write errors also re-emit on the fork directly.
A sink (Writable for Node, WritableStream for Web) with two additional read-only public-API members:
Read-only snapshot of the currently-live downstream sinks. Recomputed on each access — dead downstreams (errored, marked by the internal pusher) are filtered out.
Returns true when every downstream has failed (so the broadcast is effectively a sink). Equivalent to outputs.length === 0 on the live snapshot.
Promise.all over the live pushers' push(chunk) calls gates the fork's own write callback. The slowest of the live downstreams determines when the next chunk is accepted. Dead downstreams are filtered out at the top of each write, so they never gate.
End-of-stream follows the same shape: end() (Node) / close() (Web) is called on every pusher; the fork's final callback fires when all have acknowledged.
Node: errors can arrive through three channels, all unified by the internal stream pusher:
-
Downstream emits
'error'. The pusher's'error'listener captures the error and marks the pusher dead. Pre-write only (before any_writehas been called), the fork's own listener re-emits the error on the fork. -
Downstream's
writecallback reports an error. The per-push promise resolves with the error;_writeforwards the first error in its round to its own callback, which surfaces as'error'on the fork via the Writable contract. -
stream.writeorstream.endthrows synchronously. Same as (2) — the pusher catches and resolves with the thrown error.
The failing downstream is marked dead; subsequent writes filter live pushers and skip dead ones. 'error' fires once on the fork per write round (first error wins).
Web: the pusher's writer.closed.catch handler marks dead on async errors; writer.write(chunk) rejections resolve to the error on the per-push promise; the first error in the round is rethrown from the underlying sink's write callback, which rejects upstream's pipeTo promise (the Web Streams analog of 'error').
The pre-write re-emission listener is not installed (Node — Web has no event-emitter analog). The failing downstream is filtered out of subsequent writes. Upstream sees only successful writes.
import fork from 'stream-fork';
source.pipe(fork([sinkA, sinkB, sinkC]));import fork from 'stream-fork';
import fs from 'node:fs';
import zlib from 'node:zlib';
const gzip = zlib.createGzip();
gzip.pipe(fs.createWriteStream('log.txt.gz'));
source.pipe(fork([gzip, process.stdout]));import fork from 'stream-fork';
source.pipe(fork([primarySink, flakyMetricsSink], {ignoreErrors: true}));
// If flakyMetricsSink errors, it is dropped from the live set and
// 'error' is not emitted on the fork. primarySink keeps receiving.import fork from 'stream-fork';
// Empty options literal opts out of the objectMode default.
source.pipe(fork([sinkA, sinkB], {}));import fork from 'stream-fork/web';
// readableSource is a ReadableStream; sinkA / sinkB are WritableStreams.
await readableSource.pipeTo(fork([sinkA, sinkB]));
// Unlike `readableSource.tee().map(r => r.pipeTo(s))`, this does NOT
// buffer per branch — a slow sinkA slows upstream, instead of growing
// an unbounded internal queue on sinkB's branch.fork() returns a plain Writable, so it slots naturally as the last item in a stream-chain pipeline:
import chain from 'stream-chain';
import fork from 'stream-fork';
chain([source, transform, fork([sinkA, sinkB])]);