-
-
Notifications
You must be signed in to change notification settings - Fork 14
webStreamPuller
makeWebStreamPuller() wraps a Web Streams ReadableStream as a non-destructive async iterator with an explicit cancel(reason) extension. It's a thin wrapper over the native stream[Symbol.asyncIterator]({preventCancel: true}) — the wrapper adds cancel(reason) because the iterator-protocol return() can't carry a cancel reason cleanly (its argument is the iterator's completion value, not a cancel reason).
The example of usage:
import makeWebStreamPuller from 'stream-chain/utils/webStreamPuller.js';
// const {makeWebStreamPuller} = require('stream-chain/utils/webStreamPuller.js');The function accepts the following argument:
-
stream— a Web StreamsReadableStream. A reader is acquired immediately, locking the stream.
The function returns a WebStreamPuller<T> — an async iterator with one extra method:
-
next()— returns aPromise<{value, done}>. Resolves with{value: chunk, done: false}while items are flowing,{value: undefined, done: true}once the stream closes. Rejects with the stream's error reason if the stream errors. -
return(value?)— releases the reader lock without canceling the underlying stream. ReturnsPromise<{value, done: true}>. Automatically invoked byfor awaitonbreak,throw, or earlyreturn. The source stream remains uncanceled. -
cancel(reason?)— releases the lock and cancels the underlying stream withreason(which the source'scancelalgorithm receives). ReturnsPromise<unknown>(whatever the stream's cancel resolves to). Idempotent — a second call resolves toundefined. -
[Symbol.asyncIterator]()— returnsthis. Lets the puller be used infor await (const v of puller).
By default, breaking out of for await (const v of webReadable) calls iter.return() and triggers cancel on the underlying stream. For stream-merging / multi-pass use cases the consumer often wants to release the lock without telling the upstream "I'm done forever" — leaving the source available for another consumer downstream of the chain.
The native stream[Symbol.asyncIterator]({preventCancel: true}) already provides this. makeWebStreamPuller is the no-typo-required form plus the cancel(reason) convenience.
Two distinct intents:
-
return()— "I'm done iterating; release the lock; leave the source alive for someone else." The protocol-standard cleanup hook. Fired automatically byfor await. -
cancel(reason)— "I want the source to know consumers gave up; tell it to cancel with this reason." Not part of the iterator protocol — exposed as a separate method becausereturn(value)'s argument is the completion value, not a cancel reason. Equivalent to callingiter.return()thenstream.cancel(reason)manually.
import makeWebStreamPuller from 'stream-chain/utils/webStreamPuller.js';
// for-await — break releases the lock but doesn't cancel
const puller = makeWebStreamPuller(readable);
for await (const chunk of puller) {
if (shouldStop(chunk)) break;
process(chunk);
}
console.log(readable.locked); // false (lock released by return())
// readable is still in its pre-iteration state — caller can do more with it
// Explicit cancel with a reason
const puller2 = makeWebStreamPuller(readable);
await puller2.next();
await puller2.cancel(new Error('user aborted'));
// underlying source's `cancel` callback received the Error-
asWebStream()— the inverse: wraps a function as a Web Streams duplex pair. -
makeStreamPuller()— Node Streams counterpart. -
highWaterMark— backpressure background.
Start here
API
Transducers
Adapters
I/O & helpers
Tuning & internals
Reference
stream-chain 2.x (legacy)