-
Notifications
You must be signed in to change notification settings - Fork 2
route
Per-chunk single-target dispatch. For each incoming chunk, options.pick(chunk[, encoding]) returns the index of the output to forward to; that output's write gates upstream. Returning a non-index value (or pointing at a dead slot) drops the chunk silently and unblocks upstream immediately.
declare function route(outputs: Writable[], options: route.RouteOptions): route.RouteWritable;declare function route(outputs: WritableStream[], options: route.RouteOptions): route.RouteWritable;Non-empty array of downstream sinks. Throws TypeError if missing or empty.
Required. On Node, passed through to the inner Writable super-constructor plus the custom properties below. On Web, accepts {pick, ignoreErrors, queuingStrategy}.
Required. Picker called once per incoming chunk.
- Return an integer in
[0, outputs.length)to forward the chunk to that output. The picked output's write callback gates upstream. - Return any non-index value —
undefined,null,NaN, a negative number, a non-integer, an out-of-range index, or an index pointing at a dead slot — to drop the chunk silently. Upstream is unblocked immediately for the next chunk.
The encoding argument is passed only on the Node side (Web Streams don't carry an encoding hint).
Throws TypeError at construction if options.pick is not a function.
Same semantics as in fork. When truthy, downstream errors are silently swallowed and the failing stream is dropped from the live outputs view.
A sink with the same .outputs and .isEmpty() surface as fork. .outputs reflects the live set; .isEmpty() returns true when every downstream has failed (so every subsequent chunk would be dropped).
Only the picked output gates upstream. The other outputs are inert for this round. If the picker returns a non-index value (drop), no downstream is written to; the route's write callback fires immediately.
End-of-stream ends every downstream regardless of which ones the picker ever selected, and waits for all of them to acknowledge before signalling end-of-write upstream.
Identical to fork, with one wrinkle: in default mode, only the picked output's error can surface per round (since it's the only one written to). On the Node side, pre-write listeners on the other outputs are still installed in default mode, so a downstream that errors before any write happens will still re-emit on the route. On the Web side, there is no pre-write re-emission — async errors surface when the round that picks that downstream attempts a write.
import route from 'stream-fork/route.js';
source.pipe(
route([evenSink, oddSink], {
pick: chunk => (chunk % 2 === 0 ? 0 : 1)
})
);import route from 'stream-fork/web/route.js';
await source.pipeTo(
route([evenSink, oddSink], {
pick: chunk => (chunk % 2 === 0 ? 0 : 1)
})
);import route from 'stream-fork/route.js';
// Anything that isn't 'a', 'b', or 'c' is silently dropped.
source.pipe(
route([sinkA, sinkB, sinkC], {
pick: chunk => {
if (chunk.tag === 'a') return 0;
if (chunk.tag === 'b') return 1;
if (chunk.tag === 'c') return 2;
return undefined;
}
})
);import route from 'stream-fork/route.js';
import pickRoundRobin from 'stream-fork/utils/pick-round-robin.js';
source.pipe(route([worker1, worker2, worker3], {pick: pickRoundRobin(3)}));import route from 'stream-fork/route.js';
import pickByHash from 'stream-fork/utils/pick-by-hash.js';
source.pipe(
route([shard0, shard1, shard2, shard3], {
pick: pickByHash(record => record.userId, 4)
})
);import route from 'stream-fork/route.js';
import pickByKey from 'stream-fork/utils/pick-by-key.js';
source.pipe(
route([goldSink, silverSink, bronzeSink], {
pick: pickByKey(c => c.tier, {gold: 0, silver: 1, bronze: 2})
})
);import route from 'stream-fork/route.js';
import pickFirstMatch from 'stream-fork/utils/pick-first-match.js';
source.pipe(
route([errorSink, warnSink, defaultSink], {
pick: pickFirstMatch([
log => log.level === 'error',
log => log.level === 'warn',
() => true // catch-all
])
})
);import chain from 'stream-chain';
import route from 'stream-fork/route.js';
chain([source, normalize, route([sinkA, sinkB], {pick: byTag})]);