-
Notifications
You must be signed in to change notification settings - Fork 2
filter
Per-chunk subset broadcast. options.predicates is an array of predicates — one per output, same length as outputs. For each incoming chunk, every output whose predicate returns truthy receives the chunk; the slowest of the selected subset gates upstream. An all-true predicate set is equivalent to fork; an all-false predicate set drops the chunk.
filter is the most general of the three primitives: it generalizes both fork (all-true mask) and route (exactly-one mask).
declare function filter(outputs: Writable[], options: filter.FilterOptions): filter.FilterWritable;declare function filter(outputs: WritableStream[], options: filter.FilterOptions): filter.FilterWritable;Non-empty array of downstream sinks. Throws TypeError if missing or empty.
Required. On Node, passed through to the inner Writable super-constructor plus the custom properties below. On Web, accepts {predicates, ignoreErrors, queuingStrategy}.
Required. One predicate per output, in positional order — predicate at index i decides whether outputs[i] receives the chunk. Length must match outputs.length.
The encoding argument is passed only on the Node side (Web Streams don't carry an encoding hint).
Throws TypeError at construction if predicates is missing, mis-sized, or contains non-functions.
Same semantics as in fork. When truthy, downstream errors are silently swallowed and the failing stream is dropped from the live outputs view.
A sink with the same .outputs and .isEmpty() surface as fork. .outputs reflects the live set; .isEmpty() returns true when every downstream has failed.
Only the selected subset gates upstream. The slowest output among those whose predicate matched determines when the next chunk is accepted. If zero predicates match, the chunk is dropped and upstream is unblocked immediately. Dead downstreams are excluded from the subset before predicates are evaluated, so they never gate.
End-of-stream ends every downstream regardless of which predicates ever matched, and waits for all to acknowledge.
Identical to fork, scoped to the selected subset: only an output whose predicate matched can surface an error per round (via the write-rejection path). On Node, pre-write 'error' re-emission still applies to all live outputs in default mode.
import filter from 'stream-fork/filter.js';
source.pipe(
filter([auditSink, errorSink, allSink], {
predicates: [
log => log.audit,
log => log.level === 'error',
() => true // mirror everything to allSink
]
})
);A single error log with audit: true reaches all three sinks; a non-audit info log reaches only allSink.
import filter from 'stream-fork/web/filter.js';
await source.pipeTo(
filter([auditSink, errorSink, allSink], {
predicates: [
log => log.audit,
log => log.level === 'error',
() => true
]
})
);import filter from 'stream-fork/filter.js';
source.pipe(
filter([apiSink, dbSink, slowSink], {
predicates: [
span => span.kind === 'http',
span => span.kind === 'db',
span => span.durationMs > 1000 // overlap: a slow db span hits both dbSink and slowSink
]
})
);import filter from 'stream-fork/filter.js';
// Functionally identical to fork([sinkA, sinkB]).
source.pipe(filter([sinkA, sinkB], {predicates: [() => true, () => true]}));In practice, prefer fork directly — the predicate-array overhead is small but unnecessary when the dispatch shape is "every output, always".
import filter from 'stream-fork/filter.js';
// Mutually-exclusive predicates → equivalent to a route with a chunk%2-based pick.
source.pipe(
filter([evenSink, oddSink], {
predicates: [chunk => chunk % 2 === 0, chunk => chunk % 2 !== 0]
})
);In practice, prefer route when the dispatch shape is "exactly one" — the picker returns one index, which is cheaper than evaluating N predicates and is the more self-documenting choice for the reader.
import chain from 'stream-chain';
import filter from 'stream-fork/filter.js';
chain([source, normalize, filter([sinkA, sinkB], {predicates: [pA, pB]})]);