Eugene Lazutkin edited this page May 19, 2026 · 2 revisions

route(outputs, options)

Per-chunk single-target dispatch. For each incoming chunk, options.pick(chunk, encoding) returns the index of the output to forward to; that output's write callback gates upstream. Returning a non-index value (or pointing at a dead slot) drops the chunk silently and unblocks upstream immediately.

Signature

declare function route(outputs: Writable[], options: route.RouteOptions): route.RouteWritable;

Parameters

outputs

Non-empty array of downstream Writable streams. Throws TypeError if missing or empty.

options

Required. Passed through to the inner Writable super-constructor plus the custom properties below.

The default Writable mode is objectMode: true unless overridden.

pick(chunk, encoding) => number | undefined | null

Required. Picker called once per incoming chunk.

  • Return an integer in [0, outputs.length) to forward the chunk to that output. The picked output's write callback gates upstream.
  • Return any non-index value (undefined, null, NaN, a negative number, a non-integer, an out-of-range index, or an index pointing at a dead slot) to drop the chunk silently. Upstream is unblocked immediately for the next chunk.

Throws TypeError at construction if options.pick is not a function.
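
The forwarding rule above can be restated as a predicate. This is a minimal sketch, not code from stream-fork: isForwardable is a hypothetical helper, and modeling a dead slot as null is an assumption made only for illustration.

```javascript
// Hypothetical helper that mirrors the documented rule; not part of stream-fork.
// Assumption: a "dead slot" is modeled here as a null entry in the array.
function isForwardable(value, outputs) {
  return (
    Number.isInteger(value) && // rejects undefined, null, NaN and non-integers
    value >= 0 && // rejects negative numbers
    value < outputs.length && // rejects out-of-range indices
    outputs[value] != null // rejects a dead slot (assumed representation)
  );
}

const slots = ['sinkA', null, 'sinkC'];
console.log(isForwardable(0, slots)); // slot 0 is live
console.log(isForwardable(1, slots)); // slot 1 is dead in this model
```

Anything for which such a check fails is treated as a drop, so a picker can return undefined to skip a chunk.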

ignoreErrors

Same semantics as in fork. When truthy, downstream errors are silently swallowed and the failing stream is dropped from the live outputs view.

Return value

A Writable with the same .outputs and .isEmpty() surface as fork. .outputs reflects the live set; .isEmpty() returns true when every downstream has failed (so every subsequent chunk would be dropped).

Backpressure

Only the picked output gates upstream. The other outputs are inert for this round. If the picker returns a non-index value (drop), no downstream is written to; the route's _write callback fires immediately.

_final ends every downstream regardless of which ones the picker ever selected, and waits for all of them to acknowledge before signalling end-of-write upstream.
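
The gating described in this section can be approximated with a plain Node.js Writable. The sketch below is a simplified stand-in, not the library's source: sketchRoute and makeSink are hypothetical names, and error handling, dead slots, and the .outputs / .isEmpty() surface are left out.

```javascript
const {Writable} = require('node:stream');

// Hypothetical stand-in for route(); illustrates the gating only.
function sketchRoute(outputs, pick) {
  return new Writable({
    objectMode: true,
    write(chunk, encoding, callback) {
      const index = pick(chunk, encoding);
      if (!Number.isInteger(index) || index < 0 || index >= outputs.length) {
        callback(); // drop: nothing is written, upstream is unblocked at once
        return;
      }
      // Forward: upstream resumes only when the picked output's
      // write callback fires.
      outputs[index].write(chunk, encoding, callback);
    },
    final(callback) {
      // End every output, picked or not, and wait for all of them to finish.
      let pending = outputs.length;
      if (pending === 0) return callback();
      for (const output of outputs) {
        output.once('finish', () => {
          if (--pending === 0) callback();
        });
        output.end();
      }
    }
  });
}

// Hypothetical in-memory sink that records what it receives.
const makeSink = store =>
  new Writable({
    objectMode: true,
    write(chunk, _encoding, callback) {
      store.push(chunk);
      callback();
    }
  });

const evens = [];
const odds = [];
const router = sketchRoute([makeSink(evens), makeSink(odds)], n =>
  Number.isInteger(n) ? Math.abs(n) % 2 : undefined
);

router.write(1.5); // non-integer: the picker returns undefined, chunk is dropped
router.write(4); // forwarded to the first sink
router.end(); // ends both sinks
```

Passing the route's own callback straight to the picked output's write() is what makes that one output the only gate for the round.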

Error handling

Identical to fork, with one wrinkle: in default mode, only the picked output's error can surface per round, since it is the only one written to. Pre-write listeners are still installed on the other outputs, so a downstream that errors before any write happens still re-emits on the route.

Examples

Partition by predicate (evens / odds)

const route = require('stream-fork/route.js');

source.pipe(
  route([evenSink, oddSink], {
    pick: chunk => (chunk % 2 === 0 ? 0 : 1)
  })
);

Drop unrouted chunks

const route = require('stream-fork/route.js');

// Anything that isn't 'a', 'b', or 'c' is silently dropped.
source.pipe(
  route([sinkA, sinkB, sinkC], {
    pick: chunk => {
      if (chunk.tag === 'a') return 0;
      if (chunk.tag === 'b') return 1;
      if (chunk.tag === 'c') return 2;
      return undefined;
    }
  })
);

Round-robin load balance

const route = require('stream-fork/route.js');
const pickRoundRobin = require('stream-fork/utils/pick-round-robin.js');

source.pipe(route([worker1, worker2, worker3], {pick: pickRoundRobin(3)}));
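
The internals of pickRoundRobin are not shown on this page. A picker with the same observable effect could look like the sketch below, where makeRoundRobin is a hypothetical name and not the utility's actual source.

```javascript
// Hypothetical round-robin picker: cycles through 0 .. count - 1.
function makeRoundRobin(count) {
  let next = 0;
  return () => {
    const index = next;
    next = (next + 1) % count; // wrap around after the last output
    return index;
  };
}

const pickNext = makeRoundRobin(3);
const firstFour = [pickNext(), pickNext(), pickNext(), pickNext()];
console.log(firstFour); // → [0, 1, 2, 0]
```

The picker ignores the chunk entirely, so the distribution depends only on arrival order.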

Stable sharding by key

const route = require('stream-fork/route.js');
const pickByHash = require('stream-fork/utils/pick-by-hash.js');

source.pipe(
  route([shard0, shard1, shard2, shard3], {
    pick: pickByHash(record => record.userId, 4)
  })
);
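
Stable sharding relies on the same key always producing the same index. The sketch below shows one way to get that property, assuming a 32-bit FNV-1a hash over the key's UTF-16 code units. The hash that pickByHash actually uses is not documented here, and makeHashPicker and fnv1a are hypothetical names.

```javascript
// 32-bit FNV-1a over UTF-16 code units (an illustrative choice of hash).
function fnv1a(text) {
  let hash = 0x811c9dc5; // FNV offset basis
  for (let i = 0; i < text.length; i++) {
    hash ^= text.charCodeAt(i);
    hash = Math.imul(hash, 0x01000193) >>> 0; // multiply by FNV prime, keep unsigned
  }
  return hash;
}

// Hypothetical picker: hashes the extracted key and reduces it modulo count.
function makeHashPicker(keyFn, count) {
  return chunk => fnv1a(String(keyFn(chunk))) % count;
}

const pickShard = makeHashPicker(record => record.userId, 4);
const shardA = pickShard({userId: 'alice'});
const shardB = pickShard({userId: 'alice', extra: true});
console.log(shardA === shardB); // → true
```

Changing the number of shards remaps most keys, which is a general property of modulo-based sharding.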

Explicit key→index routing

const route = require('stream-fork/route.js');
const pickByKey = require('stream-fork/utils/pick-by-key.js');

source.pipe(
  route([goldSink, silverSink, bronzeSink], {
    pick: pickByKey(c => c.tier, {gold: 0, silver: 1, bronze: 2})
  })
);
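
A table-driven picker can be sketched in a few lines. makeKeyPicker is a hypothetical name, and returning undefined for an unlisted key (so that route drops the chunk) is an assumption about the desired behavior, not a statement about pickByKey.

```javascript
// Hypothetical table-driven picker. A Map is used so that inherited
// object properties such as 'toString' are never mistaken for keys.
function makeKeyPicker(keyFn, table) {
  const indexByKey = new Map(Object.entries(table));
  return chunk => indexByKey.get(keyFn(chunk)); // undefined for unknown keys
}

const pickTier = makeKeyPicker(c => c.tier, {gold: 0, silver: 1, bronze: 2});
console.log(pickTier({tier: 'silver'})); // → 1
console.log(pickTier({tier: 'platinum'})); // → undefined
```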

Priority routing with catch-all

const route = require('stream-fork/route.js');
const pickFirstMatch = require('stream-fork/utils/pick-first-match.js');

source.pipe(
  route([errorSink, warnSink, defaultSink], {
    pick: pickFirstMatch([
      log => log.level === 'error',
      log => log.level === 'warn',
      () => true // catch-all
    ])
  })
);
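
First-match routing amounts to finding the first predicate that accepts the chunk and using its position as the output index. The sketch below assumes that a chunk with no matching predicate should be dropped. makeFirstMatch is a hypothetical name, not the source of pickFirstMatch.

```javascript
// Hypothetical first-match picker: the index of the first truthy predicate.
function makeFirstMatch(predicates) {
  return (chunk, encoding) => {
    const index = predicates.findIndex(test => test(chunk, encoding));
    return index < 0 ? undefined : index; // no match: let route drop the chunk
  };
}

const pickLevel = makeFirstMatch([
  log => log.level === 'error',
  log => log.level === 'warn',
  () => true // catch-all
]);
console.log(pickLevel({level: 'warn'})); // → 1
console.log(pickLevel({level: 'info'})); // → 2
```

Order matters: a catch-all placed first would capture every chunk.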

Composition with stream-chain

const chain = require('stream-chain');
const route = require('stream-fork/route.js');

chain([source, normalize, route([sinkA, sinkB], {pick: byTag})]);
