Skip to content
Eugene Lazutkin edited this page May 22, 2026 · 2 revisions

Helpers (stream-fork/utils/)

The helpers are picker factories that compose with route to express common dispatch shapes. They live under src/utils/ in the source layout and import as stream-fork/utils/<name>.js. Shared between the Node and Web trees — pure functions, no runtime imports, identical behavior in Node, Bun, Deno, and browsers.

All helpers return a function suitable for route's options.pick(chunk[, encoding]) => number | undefined.

pickRoundRobin

import pickRoundRobin from 'stream-fork/utils/pick-round-robin.js';
declare function pickRoundRobin(count: number): () => number;

Stateful counter. Returns a picker that cycles 0, 1, …, count-1 on each call. Pair with route for load-balancing across count parallel workers.

Throws TypeError if count is not a positive integer.

Stateful — do not share across multiple routes. Each route should have its own picker; sharing one counter across two routes would interleave their state.

Example

import route from 'stream-fork/route.js';
import pickRoundRobin from 'stream-fork/utils/pick-round-robin.js';

source.pipe(route([worker1, worker2, worker3], {pick: pickRoundRobin(3)}));
// chunks go to worker1, worker2, worker3, worker1, worker2, ... in order.

pickByHash

import pickByHash from 'stream-fork/utils/pick-by-hash.js';
declare function pickByHash<T>(keyFn: (chunk: T) => unknown, count: number): (chunk: T) => number;

Hash-partition factory. Returns a stateless picker that maps each chunk to a stable index via hash(keyFn(chunk)) % count — the same key always lands on the same output. Useful for sharded downstreams where stateful consumers need all records with the same key.

Numeric keys are used directly modulo count; everything else is stringified and hashed with djb2.

Throws TypeError if keyFn is not a function or count is not a positive integer.

Example

import route from 'stream-fork/route.js';
import pickByHash from 'stream-fork/utils/pick-by-hash.js';

source.pipe(
  route([shard0, shard1, shard2, shard3], {
    pick: pickByHash(record => record.userId, 4)
  })
);
// All records with the same userId always land on the same shard.

Resharding caveat

hash(key) % count is not consistent-hashing; changing count will reshuffle most of the keys. For stable sharding across topology changes, route to a fixed count of virtual shards larger than the worker pool and assign virtual-shard → worker externally.

pickByKey

import pickByKey from 'stream-fork/utils/pick-by-key.js';
declare function pickByKey<T>(
  keyFn: (chunk: T) => string | number | symbol,
  table: Record<string, number> | ReadonlyMap<unknown, number>
): (chunk: T) => number | undefined;

Explicit-table factory. Returns a stateless picker that looks up keyFn(chunk) in table and returns the mapped output index, or undefined to drop the chunk if the key is missing.

table may be a plain object (lookup by table[key]) or a Map (lookup by table.get(key)). Use the Map form when keys are not strings (e.g. numbers, symbols, objects).

Throws TypeError if keyFn is not a function or table is not an object/Map.

Example: plain object

import route from 'stream-fork/route.js';
import pickByKey from 'stream-fork/utils/pick-by-key.js';

source.pipe(
  route([goldSink, silverSink, bronzeSink], {
    pick: pickByKey(c => c.tier, {gold: 0, silver: 1, bronze: 2})
  })
);
// {tier: 'gold'}    → goldSink
// {tier: 'unknown'} → dropped (no key in table)

Example: Map

const lookup = new Map([
  [1, 0], // tenant 1 → shard 0
  [2, 1], // tenant 2 → shard 1
  [3, 0]  // tenant 3 → shard 0
]);

source.pipe(
  route([shardA, shardB], {pick: pickByKey(c => c.tenantId, lookup)})
);

pickFirstMatch

import pickFirstMatch from 'stream-fork/utils/pick-first-match.js';
declare function pickFirstMatch<T>(
  predicates: Array<(chunk: T, encoding?: BufferEncoding) => boolean>
): (chunk: T, encoding?: BufferEncoding) => number | undefined;

Priority-routing factory. Returns a stateless picker that tries each predicate in order and returns the index of the first one to match, or undefined to drop the chunk if none match. Append () => true to make a catch-all output.

Throws TypeError if predicates is empty or any entry is not a function.

Example: severity-based routing with catch-all

import route from 'stream-fork/route.js';
import pickFirstMatch from 'stream-fork/utils/pick-first-match.js';

source.pipe(
  route([errorSink, warnSink, defaultSink], {
    pick: pickFirstMatch([
      log => log.level === 'error',
      log => log.level === 'warn',
      () => true // catch-all → defaultSink
    ])
  })
);

pickFirstMatch for route vs filter for the same shape

pickFirstMatch is the right partner with route when each chunk should land in exactly one output (the first matching predicate wins, others are skipped even if they also match). When chunks should land in all matching outputs (overlapping subsets), reach for filter directly with predicates of equivalent shape — no need for a picker layer.

What's NOT in stream-fork/utils/

The following are out of scope for stream-fork:

  • Per-output transforms. Compose them externally: fork([chain([t1, sink1]), chain([t2, sink2])]) using stream-chain.
  • Speculative race-fork (cancel-rest semantics). Node streams aren't cancellable; on Web, partial-write abort during a round would leave the dispatch inconsistent. The primitive would not be implementable cleanly.
  • Lossy fork (slow outputs drop). Inverts the project's whole thesis (backpressure preservation). Build a non-blocking sink wrapper externally if needed.
  • N→1 combinators. That's stream-join's territory.
  • 1→1 stream operations. That's stream-chain's territory.

Clone this wiki locally