diff --git a/lib/internal/streams/iter/push.js b/lib/internal/streams/iter/push.js index e531e65428ef6d..69c79448f2d74d 100644 --- a/lib/internal/streams/iter/push.js +++ b/lib/internal/streams/iter/push.js @@ -12,6 +12,7 @@ const { PromiseReject, PromiseResolve, PromiseWithResolvers, + Symbol, SymbolAsyncDispose, SymbolAsyncIterator, SymbolDispose, @@ -55,6 +56,8 @@ const { RingBuffer, } = require('internal/streams/iter/ringbuffer'); +const kNoFailReason = Symbol('kNoFailReason'); + // ============================================================================= // PushQueue - Internal Queue with Chunk-Based Backpressure // ============================================================================= @@ -317,14 +320,16 @@ class PushQueue { * No-op if errored or closed (fully drained). * If closing (draining), short-circuits the drain. */ - fail(reason) { + fail(reason = kNoFailReason) { if (this.#writerState === 'errored' || this.#writerState === 'closed') { return; } const wasClosing = this.#writerState === 'closing'; this.#writerState = 'errored'; - this.#error = reason ?? new ERR_INVALID_STATE('Failed'); + this.#error = reason === kNoFailReason ? + new ERR_INVALID_STATE('Failed') : + reason; this.#cleanup(); this.#rejectPendingReads(this.#error); this.#rejectPendingDrains(this.#error); @@ -413,7 +418,7 @@ class PushQueue { return { __proto__: null, value: undefined, done: true }; } - if (this.#writerState === 'errored' && this.#error) { + if (this.#writerState === 'errored') { throw this.#error; } @@ -482,7 +487,7 @@ class PushQueue { } else if (this.#writerState === 'closed') { const pending = this.#pendingReads.shift(); pending.resolve({ __proto__: null, value: undefined, done: true }); - } else if (this.#writerState === 'errored' && this.#error) { + } else if (this.#writerState === 'errored') { const pending = this.#pendingReads.shift(); pending.reject(this.#error); } else { @@ -659,7 +664,7 @@ class PushWriter { } fail(reason) { - this.#queue.fail(reason); + this.#queue.fail(arguments.length === 0 ? kNoFailReason : reason); } [SymbolAsyncDispose]() { diff --git a/test/parallel/test-stream-iter-push-writer.js b/test/parallel/test-stream-iter-push-writer.js index acac652b23941b..1bdbad091c80ab 100644 --- a/test/parallel/test-stream-iter-push-writer.js +++ b/test/parallel/test-stream-iter-push-writer.js @@ -323,6 +323,39 @@ async function testFailRejectsPendingRead() { ); } +async function testFailRejectsFutureReadWithFalsyReason() { + for (const reason of [0, null]) { + const { writer, readable } = push(); + + writer.fail(reason); + + const iter = readable[Symbol.asyncIterator](); + await iter.next().then( + common.mustNotCall(), + common.mustCall((rejection) => { + assert.strictEqual(rejection, reason); + }), + ); + } +} + +async function testFailRejectsPendingReadWithFalsyReason() { + const { writer, readable } = push(); + + const iter = readable[Symbol.asyncIterator](); + const readPromise = iter.next(); + + await new Promise(setImmediate); + + writer.fail(false); + await readPromise.then( + common.mustNotCall(), + common.mustCall((reason) => { + assert.strictEqual(reason, false); + }), + ); +} + // end() while writes are pending rejects those writes async function testEndRejectsPendingWrites() { const { writer, readable } = push({ highWaterMark: 1, backpressure: 'block' }); @@ -435,6 +468,8 @@ Promise.all([ testConsumerThrowRejectsWrites(), testEndResolvesPendingRead(), testFailRejectsPendingRead(), + testFailRejectsFutureReadWithFalsyReason(), + testFailRejectsPendingReadWithFalsyReason(), testEndRejectsPendingWrites(), testEndIdempotentWhenClosed(), testEndRejectsWhenErrored(),