Eugene Lazutkin edited this page May 19, 2026 · 2 revisions

filter(outputs, options)

Per-chunk subset broadcast. options.predicates is an array of predicates — one per output, same length as outputs. For each incoming chunk, every output whose predicate returns truthy receives the chunk; the slowest of the selected subset gates upstream. An all-true predicate set is equivalent to fork; an all-false predicate set drops the chunk.

filter is the most general of the three primitives: it generalizes both fork (all-true mask) and route (exactly-one mask).
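
The mask semantics can be sketched without any streams. The helper below, `selectedIndices`, is a hypothetical name used only for illustration (it is not part of stream-fork); it returns the indices of the outputs that would receive a chunk.

```javascript
// Hypothetical helper, not part of stream-fork: returns the indices of the
// outputs that would receive `chunk` under the documented mask semantics.
function selectedIndices(predicates, chunk, encoding) {
  const selected = [];
  predicates.forEach((predicate, index) => {
    if (predicate(chunk, encoding)) selected.push(index);
  });
  return selected;
}

const always = () => true;
const never = () => false;
const isEven = n => n % 2 === 0;
const isOdd = n => n % 2 !== 0;

const forkLike = selectedIndices([always, always], 7); // every output, like fork
const routeLike = selectedIndices([isEven, isOdd], 7); // exactly one output, like route
const dropped = selectedIndices([never, never], 7); // no output: the chunk is dropped
```

Here `forkLike` lists both indices, `routeLike` lists only the index of the odd predicate, and `dropped` is empty.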

Signature

declare function filter(outputs: Writable[], options: filter.FilterOptions): filter.FilterWritable;

Parameters

outputs

Non-empty array of downstream Writable streams. Throws TypeError if missing or empty.

options

Required. The object is passed through to the inner Writable super-constructor and also carries the custom properties below.

The default Writable mode is objectMode: true unless overridden.

predicates: Array<(chunk, encoding) => boolean>

Required. One predicate per output, in positional order — predicate at index i decides whether outputs[i] receives the chunk. Length must match outputs.length.

Throws TypeError at construction if predicates is missing, mis-sized, or contains non-functions.
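
The construction-time checks described above can be approximated as follows. This is a minimal sketch: `checkFilterArguments`, `rejects`, and the error messages are assumptions made for illustration, not stream-fork's actual code.

```javascript
// Hypothetical validator mirroring the documented construction-time checks.
// It is an illustration, not stream-fork's actual code or error messages.
function checkFilterArguments(outputs, options) {
  if (!Array.isArray(outputs) || outputs.length === 0) {
    throw new TypeError('outputs must be a non-empty array');
  }
  const predicates = options && options.predicates;
  if (!Array.isArray(predicates)) {
    throw new TypeError('options.predicates is required');
  }
  if (predicates.length !== outputs.length) {
    throw new TypeError('predicates must have one entry per output');
  }
  if (!predicates.every(p => typeof p === 'function')) {
    throw new TypeError('every predicate must be a function');
  }
}

// Returns true when the call is rejected with a TypeError.
function rejects(outputs, options) {
  try {
    checkFilterArguments(outputs, options);
    return false;
  } catch (error) {
    return error instanceof TypeError;
  }
}

const sinks = [{}, {}]; // stand-ins for two Writable streams
const results = {
  missing: rejects(sinks, {}),
  misSized: rejects(sinks, {predicates: [() => true]}),
  nonFunction: rejects(sinks, {predicates: [() => true, 'yes']}),
  valid: rejects(sinks, {predicates: [() => true, () => false]})
};
```

Only the last call, with one function per output, passes the checks.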

ignoreErrors

Same semantics as in fork. When truthy, downstream errors are silently swallowed and the failing stream is dropped from the live outputs view.

Return value

A Writable with the same .outputs and .isEmpty() surface as fork. .outputs reflects the live set; .isEmpty() returns true when every downstream has failed.

Backpressure

Only the selected subset gates upstream. The slowest output among those whose predicate matched determines when the next chunk is accepted. If zero predicates match, the chunk is dropped and upstream is unblocked immediately. Dead downstreams are excluded from the subset before predicates are evaluated, so they never gate.
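
The gating rule can be illustrated with a small sketch. `writeToSelected` and `makeStub` are hypothetical names, and the stubs stand in for real Writable streams, which expose the same `write(chunk, encoding, callback)` method. Error handling and dead-downstream exclusion are left out, and stream-fork's internals may differ.

```javascript
// Hypothetical sketch of subset gating; stream-fork's internals may differ.
// `outputs` only need a write(chunk, encoding, callback) method, which real
// Writable streams provide.
function writeToSelected(outputs, predicates, chunk, encoding, done) {
  const selected = outputs.filter((_, i) => predicates[i](chunk, encoding));
  if (selected.length === 0) return done(); // dropped: upstream is unblocked at once
  let pending = selected.length;
  for (const output of selected) {
    output.write(chunk, encoding, () => {
      if (--pending === 0) done(); // the slowest selected output ends the round
    });
  }
}

// Stub output that holds its write callback until release() is called.
function makeStub() {
  const stub = {
    received: [],
    callbacks: [],
    write(chunk, encoding, callback) {
      stub.received.push(chunk);
      stub.callbacks.push(callback);
    },
    release() {
      stub.callbacks.shift()();
    }
  };
  return stub;
}

const fast = makeStub();
const slow = makeStub();
const unselected = makeStub();
const events = [];

writeToSelected(
  [fast, slow, unselected],
  [() => true, () => true, () => false],
  'chunk',
  'utf8',
  () => events.push('accepted')
);

fast.release();
const acceptedAfterFast = events.length; // still waiting on `slow`
slow.release();
const acceptedAfterSlow = events.length; // the round is complete
```

The unselected stub never sees the chunk and never delays the round; the round completes only once `slow` acknowledges its write.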

_final ends every downstream regardless of which predicates ever matched, and waits for all to acknowledge.

Error handling

Identical to fork, scoped to the selected subset: in each round, only outputs whose predicate matched can surface an error through the write-callback path. In default mode, pre-write 'error' re-emission still applies to all live outputs.

Examples

Multi-channel mask

const filter = require('stream-fork/filter.js');

source.pipe(
  filter([auditSink, errorSink, allSink], {
    predicates: [
      log => log.audit,
      log => log.level === 'error',
      () => true // mirror everything to allSink
    ]
  })
);

A single error log with audit: true reaches all three sinks; a non-audit info log reaches only allSink.

Multiplex by tag (overlapping subsets)

const filter = require('stream-fork/filter.js');

source.pipe(
  filter([apiSink, dbSink, slowSink], {
    predicates: [
      span => span.kind === 'http',
      span => span.kind === 'db',
      span => span.durationMs > 1000 // overlap: a slow db span hits both dbSink and slowSink
    ]
  })
);

fork-equivalent (all-true mask)

const filter = require('stream-fork/filter.js');

// Functionally identical to fork([sinkA, sinkB]).
source.pipe(filter([sinkA, sinkB], {predicates: [() => true, () => true]}));

In practice, prefer fork directly — the predicate-array overhead is small but unnecessary when the dispatch shape is "every output, always".

route-equivalent (exactly-one mask)

const filter = require('stream-fork/filter.js');

// Mutually-exclusive predicates → equivalent to a route with a chunk%2-based pick.
source.pipe(
  filter([evenSink, oddSink], {
    predicates: [chunk => chunk % 2 === 0, chunk => chunk % 2 !== 0]
  })
);

In practice, prefer route when the dispatch shape is "exactly one" — the picker returns one index, which is cheaper than evaluating N predicates and is the more self-documenting choice for the reader.
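
The cost difference can be seen by expressing a picker as predicates. The sketch below assumes a route-style picker of the form `chunk => index`; that signature and the `predicatesFromPicker` helper are assumptions for illustration, not part of stream-fork.

```javascript
// Hypothetical adapter: turns a route-style picker (chunk => index) into
// mutually exclusive predicates. The picker signature is an assumption.
function predicatesFromPicker(picker, count) {
  return Array.from({length: count}, (_, index) => chunk => picker(chunk) === index);
}

const picker = chunk => chunk % 2; // 0 for even, 1 for odd (non-negative integers)
const predicates = predicatesFromPicker(picker, 2);

// Evaluates every predicate for one chunk, as a predicate mask would.
const maskFor = chunk => predicates.map(predicate => predicate(chunk));

const maskForFour = maskFor(4); // only the first output is selected
const maskForSeven = maskFor(7); // only the second output is selected
```

Each mask has exactly one true entry, but building it calls the picker once per output, whereas a picker-based dispatch would call it once per chunk.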

Composition with stream-chain

const chain = require('stream-chain');
const filter = require('stream-fork/filter.js');

chain([source, normalize, filter([sinkA, sinkB], {predicates: [pA, pB]})]);
