Eugene Lazutkin edited this page May 22, 2026 · 3 revisions

Helpers (stream-join/utils/)

The helpers compose with select to express common merge patterns. They live under src/utils/ in the source layout and import as stream-join/utils/<name>.

All helpers operate on item values (not slot objects): each helper unwraps slot.item internally, so the same comparator can be reused across helpers.

pickFirst

import pickFirst from 'stream-join/utils/pick-first.js';

Signature

declare const pickFirst: () => number;

Parameters

(none)

Return value

The integer 0, regardless of input. Constant-time, no allocations.

Pair with sortedInsert — when the buffer is maintained in sorted order, the slot to emit is always at index 0, so the picker is O(1) with zero comparisons.

Example

See mergeSorted, which is select + pickFirst + sortedInsert glued together.

pickMin

import pickMin from 'stream-join/utils/pick-min.js';

Signature

declare function pickMin<T>(lessFn: (a: T, b: T) => boolean): (items: readonly Slot<T>[]) => number;

Parameters

  • lessFn(a, b) — comparator on item values. Returns true if a should come before b (i.e., a < b). Must be a strict-weak-order predicate.

Return value

A picker function suitable for select's pick option. The returned function takes:

  • items — read-only buffer of Slot<T> ({item, index}) shown to the picker.

…and returns the index of the slot whose item is the smallest per lessFn. Ties resolve to the first occurrence.

The picker is O(items.length) per call: a linear scan over the buffer with no allocations and predictable branches. For a typical number of input streams N (2–20) and windowSize (1–4), that is at most a few dozen comparisons per emit, which is negligible next to the I/O cost of an await on a Readable.
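The scan described above can be sketched as follows. This is a hypothetical reimplementation (pickMinSketch is an illustrative name, not the library's source); it shows why a strict lessFn makes ties resolve to the first occurrence: an equal item later in the buffer never replaces the current best.

```javascript
// Hypothetical sketch of a pickMin-style picker, not the library's source.
// lessFn compares item values; each slot in the buffer is {item, index}.
function pickMinSketch(lessFn) {
  return items => {
    let best = 0;
    for (let i = 1; i < items.length; ++i) {
      // Strict comparison: an equal item never displaces the current best,
      // so ties resolve to the first occurrence.
      if (lessFn(items[i].item, items[best].item)) best = i;
    }
    return best;
  };
}

const pick = pickMinSketch((a, b) => a.priority < b.priority);
const buffer = [
  {item: {priority: 3, name: 'c'}, index: 0},
  {item: {priority: 1, name: 'a'}, index: 1},
  {item: {priority: 1, name: 'b'}, index: 2}
];
console.log(pick(buffer)); // 1 (the first of the two priority-1 items)
```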

Example

import select from 'stream-join/select.js';
import pickMin from 'stream-join/utils/pick-min.js';

// Priority-queue merge: each round, emit the available item with the smallest priority.
select([s1, s2, s3], {pick: pickMin((a, b) => a.priority < b.priority)});

sortedInsert

import sortedInsert from 'stream-join/utils/sorted-insert.js';

Signature

declare function sortedInsert<T>(
  lessFn: (a: T, b: T) => boolean
): (items: Slot<T>[], newSlot: Slot<T>, lastPos?: number) => void;

Parameters

  • lessFn(a, b) — comparator on item values. Returns true if a should come before b. Must be a strict-weak-order predicate.

Return value

An insert callback suitable for select's insert option. The returned function takes:

  • items — the mutable slot buffer maintained in sorted order.
  • newSlot — the freshly-pulled slot to insert.
  • lastPos — undefined during initial fill (grow the buffer; the helper splices the slot in at its sorted position), or the index of the slot just emitted (refill in place; length unchanged).

…and returns nothing (mutates items in place).

Smart-replace optimization. On post-pick refill, the helper detects when the new slot belongs at the same logical position as the just-removed one — in which case it does an in-place replace (one assignment) instead of two splice calls. This is the common case for sorted streams: a sorted stream's next item is usually "close to" the previous one in global key order, so the new slot's position often matches lastPos.

Built on nano-binary-search for the insertion-point lookup.
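A sketch of the insert callback, including the smart-replace check, might look like this. It is an illustration under stated assumptions, not the library's code: sortedInsertSketch and upperBound are hypothetical names, and upperBound is a hand-written binary search standing in for nano-binary-search (whose API is not shown here). Equal items are placed after existing ones.

```javascript
// Hypothetical stand-in for nano-binary-search: returns the first index whose
// item is strictly greater than value, so equal items keep arrival order.
function upperBound(items, value, lessFn) {
  let lo = 0, hi = items.length;
  while (lo < hi) {
    const mid = (lo + hi) >>> 1;
    if (lessFn(value, items[mid].item)) hi = mid;
    else lo = mid + 1;
  }
  return lo;
}

// Hypothetical sketch of a sortedInsert-style callback, not the library's source.
function sortedInsertSketch(lessFn) {
  return (items, newSlot, lastPos) => {
    const value = newSlot.item;
    if (lastPos !== undefined) {
      // Smart replace: if the new slot fits between the neighbours of the
      // emitted slot, one assignment keeps the buffer sorted.
      const fitsLeft = lastPos === 0 || !lessFn(value, items[lastPos - 1].item);
      const fitsRight = lastPos === items.length - 1 || !lessFn(items[lastPos + 1].item, value);
      if (fitsLeft && fitsRight) {
        items[lastPos] = newSlot;
        return;
      }
      // Otherwise: first splice removes the emitted slot...
      items.splice(lastPos, 1);
    }
    // ...and this splice inserts the new slot at its sorted position
    // (also the only step during initial fill, when lastPos is undefined).
    items.splice(upperBound(items, value, lessFn), 0, newSlot);
  };
}

const insert = sortedInsertSketch((a, b) => a < b);
const buf = [];
for (const [item, index] of [[4, 1], [1, 0], [7, 2]]) insert(buf, {item, index}); // initial fill
insert(buf, {item: 2, index: 0}, 0); // 2 fits at position 0: in-place replace
insert(buf, {item: 9, index: 0}, 0); // 9 does not fit: splice out, splice in at the end
console.log(buf.map(s => s.item)); // [4, 7, 9]
```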

Example

import {Readable} from 'node:stream';
import select from 'stream-join/select.js';
import pickFirst from 'stream-join/utils/pick-first.js';
import sortedInsert from 'stream-join/utils/sorted-insert.js';

const less = (a, b) => a < b;

select([Readable.from([1, 4, 7]), Readable.from([2, 5, 8]), Readable.from([3, 6, 9])], {
  pick: pickFirst,
  insert: sortedInsert(less)
}).on('data', x => console.log(x));
// 1, 2, 3, 4, 5, 6, 7, 8, 9

Note on pickMin + sortedInsert

Don't pair them. If sortedInsert keeps the buffer sorted, the smallest slot is always at index 0, so pickMin's linear scan only rediscovers what the sort already guarantees. Use pickFirst instead: constant time, zero comparisons. Correctness rests on the sortedInsert invariant.

pickMin is the right partner when the buffer is not maintained sorted (e.g., with the default replace-in-place insert).
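A quick check of this note, using minimal stand-ins (pickMinLike and pickFirstLike are hypothetical names, not the library's exports): on a sorted buffer both pickers agree on index 0, while on an unsorted buffer, such as one kept by replace-in-place refills, only the scanning picker finds the minimum.

```javascript
const less = (a, b) => a < b;
// Hypothetical stand-ins with the same shapes as pickMin(less) and pickFirst.
const pickMinLike = items =>
  items.reduce((best, slot, i) => (less(slot.item, items[best].item) ? i : best), 0);
const pickFirstLike = () => 0;

const sorted = [1, 4, 7].map((item, index) => ({item, index}));
const unsorted = [7, 1, 4].map((item, index) => ({item, index}));
console.log(pickMinLike(sorted), pickFirstLike(sorted)); // 0 0
console.log(pickMinLike(unsorted), pickFirstLike(unsorted)); // 1 0 (pickFirst would emit 7 first)
```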

mergeSorted

import mergeSorted from 'stream-join/utils/merge-sorted.js';

Signature

declare function mergeSorted<T>(
  streams: readonly Readable[],
  lessFn: (a: T, b: T) => boolean,
  options?: MergeSortedOptions
): TypedReadable<T>;

interface MergeSortedOptions extends ReadableOptions {
  windowSize?: number; // default 1
}

Parameters

  • streams — non-empty array of object-mode Readable streams. Each is expected to be sorted per lessFn (or locally disordered within windowSize).
  • lessFn(a, b) — comparator on item values. Returns true if a should come before b. Same shape and convention as pickMin / sortedInsert.
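  • options — optional. ReadableOptions plus windowSize (default 1; positive integer; larger values tolerate more local disorder per stream, e.g. windowSize: 4 covers items that are out of order by up to 3 positions, as in the drift-tolerant example below).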

Return value

An object-mode Readable that emits the merged sequence in sorted order per lessFn. Propagates 'error' events from any input stream with the original value preserved; ends when every input stream has ended.

Equivalent to:

select(streams, {...options, pick: pickFirst, insert: sortedInsert(lessFn)});
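To make this equivalence concrete, here is a synchronous model over plain arrays. It is a sketch, not the library: selectArrays, pickFirstModel, sortedInsertModel and mergeArrays are hypothetical names, and the real select works on async Readable streams. The model assumes that Slot.index is the position of the source stream, that the initial fill pulls up to windowSize items per source, that each emit is followed by a refill from the same source, and that an exhausted source's slot is simply removed.

```javascript
// Hypothetical synchronous model of select over in-memory arrays. NOT the
// library's implementation; it only mirrors the pick/insert contract.
function selectArrays(sources, {pick, insert, windowSize = 1}) {
  const cursors = sources.map(() => 0);
  // Next item of one source as a slot, or null when that source is exhausted.
  const pull = index =>
    cursors[index] < sources[index].length ? {item: sources[index][cursors[index]++], index} : null;

  const items = [];
  // Initial fill: up to windowSize slots per source, lastPos undefined (grow).
  for (let w = 0; w < windowSize; ++w) {
    for (let index = 0; index < sources.length; ++index) {
      const slot = pull(index);
      if (slot) insert(items, slot);
    }
  }

  const out = [];
  while (items.length > 0) {
    const pos = pick(items);
    const {item, index} = items[pos];
    out.push(item);
    const next = pull(index);
    if (next) insert(items, next, pos); // refill from the same source, length unchanged
    else items.splice(pos, 1); // source exhausted: the buffer shrinks
  }
  return out;
}

// Simplified stand-ins for pickFirst and sortedInsert: no smart replace, and a
// linear insertion-point scan instead of a binary search.
const pickFirstModel = () => 0;
const sortedInsertModel = lessFn => (items, newSlot, lastPos) => {
  if (lastPos !== undefined) items.splice(lastPos, 1); // drop the emitted slot
  let i = 0;
  while (i < items.length && !lessFn(newSlot.item, items[i].item)) ++i;
  items.splice(i, 0, newSlot);
};

const less = (a, b) => a < b;
const mergeArrays = (sources, windowSize) =>
  selectArrays(sources, {pick: pickFirstModel, insert: sortedInsertModel(less), windowSize});

console.log(mergeArrays([[1, 4, 7], [2, 5, 8], [3, 6, 9]])); // [1, 2, 3, 4, 5, 6, 7, 8, 9]
console.log(mergeArrays([[1, 10, 100], [2], [3, 30]])); // [1, 2, 3, 10, 30, 100]
// Source 0 is locally disordered by one position: windowSize 1 is not enough...
console.log(mergeArrays([[2, 1, 4, 3], [0, 5]], 1)); // [0, 2, 1, 4, 3, 5]
// ...but windowSize 2 recovers global order.
console.log(mergeArrays([[2, 1, 4, 3], [0, 5]], 2)); // [0, 1, 2, 3, 4, 5]
```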

Examples

K-way merge of sorted streams

import mergeSorted from 'stream-join/utils/merge-sorted.js';
import {Readable} from 'node:stream';

mergeSorted(
  [Readable.from([1, 4, 7]), Readable.from([2, 5, 8]), Readable.from([3, 6, 9])],
  (a, b) => a < b
).on('data', x => console.log(x));
// 1, 2, 3, 4, 5, 6, 7, 8, 9

Mismatched stream lengths

mergeSorted(
  [Readable.from([1, 10, 100]), Readable.from([2]), Readable.from([3, 30])],
  (a, b) => a < b
).on('data', x => console.log(x));
// 1, 2, 3, 10, 30, 100

Drift-tolerant merge

// Stream 1's timestamps may be locally out of order by up to 3 positions;
// windowSize: 4 gives the picker enough lookahead to recover global order.
mergeSorted([s1, s2], (a, b) => a.timestamp < b.timestamp, {windowSize: 4});
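One way to choose windowSize for a drifting stream, assuming "out of order by up to k positions" means that no item sits more than k positions away from its place in the stream's own sorted order: measure that maximum displacement and add one. Under that reading a window of k + 1 per stream is enough, because the smallest not-yet-emitted item of the stream has always already been pulled into the buffer. maxDisplacement is a hypothetical helper, not part of stream-join, and it needs a whole sample in memory, so it suits offline checks of representative data.

```javascript
// Hypothetical helper: the largest distance between an item's position and
// its rank in a stable sort of the same sample (per lessFn).
function maxDisplacement(items, lessFn) {
  const order = items
    .map((_, i) => i)
    .sort((i, j) => (lessFn(items[i], items[j]) ? -1 : lessFn(items[j], items[i]) ? 1 : 0));
  // order[rank] is the original position of the item with that sorted rank.
  let max = 0;
  order.forEach((pos, rank) => {
    max = Math.max(max, Math.abs(pos - rank));
  });
  return max;
}

const sample = [10, 13, 11, 12, 14, 17, 15, 16].map(timestamp => ({timestamp}));
const d = maxDisplacement(sample, (a, b) => a.timestamp < b.timestamp);
console.log(d, d + 1); // 2 3 (so windowSize: 3 would suffice for this sample)
```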

What's NOT in stream-join/utils/

The following sorted-stream operations are out of scope for stream-join — they require or preserve sortedness in ways that fit better in a dedicated package. They will live in stream-sorting (forthcoming):

  • mergeJoin / joinBy — key-based SQL-style join of sorted streams. Pre-condition: inputs sorted by the join key.
  • union — sorted merge with duplicate elimination across streams. Distinct from mergeSorted (which keeps duplicates).
  • intersection — emit values present in all input streams.
  • difference — emit values in stream 0 that are not present in any of streams 1..N-1.

stream-join itself stays order-agnostic: it doesn't know what "sorted" means. The primitives and helpers here give stream-sorting what it needs to build the operations above on top of select + sortedInsert.
