streamPuller
makeStreamPuller() wraps a Node Readable as a non-destructive async iterator. It's a thin helper over Node's built-in stream.iterator({destroyOnReturn: false}) — the convenience is in picking the right options for downstream consumers (stream-join, stream-sorting) that need original-error preservation and the ability to break out of iteration without destroying the source.
Example of usage:
import makeStreamPuller from 'stream-chain/utils/streamPuller.js';
// const {makeStreamPuller} = require('stream-chain/utils/streamPuller.js');
The function accepts the following argument:
- stream: a Node Readable (any object mode). Listeners are attached immediately on call.
The function returns an AsyncIterableIterator<T> that implements the full async-iterator protocol:
- next(): returns a Promise<{value, done}>. Resolves with {value: chunk, done: false} while items are flowing, and with {value: undefined, done: true} once the stream ends. Rejects with the original 'error' value (verbatim, no AbortError wrapping) if the stream errors, or with Error('Premature close') if the stream is destroyed without first emitting 'end' or 'error'.
- return(value?): finalizes iteration without destroying the source stream. Returns Promise<{value, done: true}>. Automatically invoked by for await on break, throw, or early return. The source remains usable afterwards; the caller controls its lifecycle.
- [Symbol.asyncIterator](): returns this. Lets the puller be used directly in for await (const v of puller).
The contract is exactly that of Node's stream.iterator({destroyOnReturn: false}); the wrapper exists so consumers don't have to remember the option name and so the API matches makeWebStreamPuller.
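Because the contract is described as matching Node's built-in iterator, the protocol can be tried out without the library at all. The following is a minimal sketch under that assumption: `pullerSketch` and `protocolDemo` are hypothetical names used only for illustration, and the stand-in delegates to `readable.iterator({destroyOnReturn: false})` rather than to the real makeStreamPuller().

```javascript
const {Readable} = require('node:stream');

// Hypothetical stand-in for makeStreamPuller(), built on Node's readable.iterator().
// This is an assumption for illustration, not the library's actual source.
const pullerSketch = readable => readable.iterator({destroyOnReturn: false});

async function protocolDemo() {
  const source = Readable.from(['a', 'b', 'c']);
  const puller = pullerSketch(source);

  // [Symbol.asyncIterator]() hands back the same object.
  const sameObject = puller[Symbol.asyncIterator]() === puller;

  // next() resolves with {value, done: false} while items are flowing.
  const first = await puller.next();

  // return(value) finalizes iteration early and echoes the value back.
  const finished = await puller.return('stopped');

  // The source was not destroyed by return().
  return {sameObject, first, finished, destroyed: source.destroyed};
}
```

Calling `protocolDemo()` resolves with the three protocol results plus the source's `destroyed` flag, which stays `false` because `return()` only detaches the iterator.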
By default, breaking out of for await (const v of readable) calls iter.return(), which destroys the source stream. That's the right behavior for one-shot iteration. But for stream merging / sorting use cases, the consumer often wants to:
- Read partial input, decide to abandon early, and let the caller continue using the source for something else.
- Switch between multiple readers without tearing down the underlying stream.
- Re-acquire a new iterator over the same source later in the pipeline.
destroyOnReturn: false keeps the source alive; makeStreamPuller is the no-typo-required form.
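The difference between the two modes can be seen with Node's built-in iterator alone. The sketch below is illustrative only: `compareBreakBehavior` is a hypothetical helper name, and it calls `readable.iterator({destroyOnReturn: false})` directly where real code would call makeStreamPuller().

```javascript
const {Readable} = require('node:stream');

async function compareBreakBehavior() {
  // Default for-await: break calls return(), which destroys the source.
  const oneShot = Readable.from([1, 2, 3, 4, 5]);
  for await (const _value of oneShot) {
    break;
  }

  // destroyOnReturn: false: break leaves the source alive.
  const shared = Readable.from([1, 2, 3, 4, 5]);
  const head = [];
  for await (const value of shared.iterator({destroyOnReturn: false})) {
    head.push(value);
    if (head.length === 2) break;
  }
  const aliveAfterBreak = !shared.destroyed;

  // A new iterator over the same source picks up the remaining items.
  const tail = [];
  for await (const value of shared.iterator({destroyOnReturn: false})) {
    tail.push(value);
  }

  return {oneShotDestroyed: oneShot.destroyed, aliveAfterBreak, head, tail};
}
```

The first source ends up destroyed after a single `break`, while the second one is read in two separate passes: `head` receives the first two items and `tail` receives the rest.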
import makeStreamPuller from 'stream-chain/utils/streamPuller.js';

// for-await idiom -- break leaves the source alive
const puller = makeStreamPuller(readable);
for await (const chunk of puller) {
  if (shouldStop(chunk)) break;
  process(chunk);
}
console.log(readable.destroyed); // false

// manual driving -- explicit next() / return() calls
const iter = makeStreamPuller(readable);
const first = await iter.next();
if (!first.done) {
  process(first.value);
  // ... decide to stop ...
  await iter.return();
}

import makeStreamPuller from 'stream-chain/utils/streamPuller.js';
const original = new Error('source failure');
readable.destroy(original);
const puller = makeStreamPuller(readable);
try {
  for await (const v of puller) {
    /* ... */
  }
} catch (e) {
  // e === original (no AbortError wrapping)
}
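The other rejection case described for next(), a source destroyed without 'end' or 'error', can be sketched the same way. This example assumes the documented equivalence with Node's built-in iterator and uses `readable.iterator({destroyOnReturn: false})` directly; `prematureCloseDemo` is a hypothetical name, and the error shown is the one Node itself produces, which may differ in class from what the library throws.

```javascript
const {Readable} = require('node:stream');

async function prematureCloseDemo() {
  // A source that never produces data and never ends on its own.
  const source = new Readable({objectMode: true, read() {}});
  const puller = source.iterator({destroyOnReturn: false});

  const pending = puller.next(); // starts waiting for the first item
  source.destroy();              // destroyed with no error and no 'end'

  try {
    await pending;
    return null; // not expected: the stream closed before ending
  } catch (error) {
    return error; // the rejection reason of the pending next()
  }
}
```

With Node's iterator the returned error carries the message 'Premature close' and the code 'ERR_STREAM_PREMATURE_CLOSE', so a consumer can tell an abandoned source apart from one that failed with its own error.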
- asStream(): the inverse, wraps a function as a Duplex.
- makeWebStreamPuller(): Web Streams counterpart.
- highWaterMark: backpressure background.