diff --git a/reference-implementation/lib/transform-stream.js b/reference-implementation/lib/transform-stream.js index 26370769e..aa53894a3 100644 --- a/reference-implementation/lib/transform-stream.js +++ b/reference-implementation/lib/transform-stream.js @@ -13,9 +13,6 @@ class TransformStream { constructor(transformer = {}, writableStrategy = undefined, readableStrategy = undefined) { this._transformer = transformer; - this._errored = false; - this._storedError = undefined; - this._writableController = undefined; this._readableController = undefined; this._transformStreamController = undefined; @@ -48,13 +45,6 @@ class TransformStream { const startResult = InvokeOrNoop(transformer, 'start', [transformStream._transformStreamController]); startPromise_resolve(startResult); - startPromise.catch(e => { - // The underlyingSink and underlyingSource will error the readable and writable ends on their own. - if (transformStream._errored === false) { - transformStream._errored = true; - transformStream._storedError = e; - } - }); } get readable() { @@ -91,19 +81,14 @@ function IsTransformStream(x) { function TransformStreamCloseReadable(transformStream) { // console.log('TransformStreamCloseReadable()'); - if (transformStream._errored === true) { - throw new TypeError('TransformStream is already errored'); - } - if (ReadableStreamDefaultControllerCanCloseOrEnqueue(transformStream._readableController) === false) { - throw new TypeError('Readable side is already closed'); + throw new TypeError('Readable side is not in a state that can be closed'); } TransformStreamCloseReadableInternal(transformStream); } function TransformStreamCloseReadableInternal(transformStream) { - assert(transformStream._errored === false); assert(ReadableStreamDefaultControllerCanCloseOrEnqueue(transformStream._readableController) === true); try { @@ -122,12 +107,8 @@ function TransformStreamDefaultTransform(chunk, transformStreamController) { function TransformStreamEnqueueToReadable(transformStream, chunk) { // console.log('TransformStreamEnqueueToReadable()'); - if (transformStream._errored === true) { - throw new TypeError('TransformStream is already errored'); - } - if (ReadableStreamDefaultControllerCanCloseOrEnqueue(transformStream._readableController) === false) { - throw new TypeError('Readable side is already closed'); + throw new TypeError('Readable side is not in a state that permits enqueue'); } // We throttle transformer.transform invocation based on the backpressure of the ReadableStream, but we still @@ -139,9 +120,9 @@ function TransformStreamEnqueueToReadable(transformStream, chunk) { ReadableStreamDefaultControllerEnqueue(controller, chunk); } catch (e) { // This happens when readableStrategy.size() throws. - TransformStreamErrorIfNeeded(transformStream, e); + TransformStreamError(transformStream, e); - throw transformStream._storedError; + throw transformStream._readable._storedError; } const backpressure = ReadableStreamDefaultControllerHasBackpressure(controller); @@ -150,27 +131,9 @@ function TransformStreamEnqueueToReadable(transformStream, chunk) { } } +// This is a no-op if both sides are already errored. function TransformStreamError(transformStream, e) { - if (transformStream._errored === true) { - throw new TypeError('TransformStream is already errored'); - } - - TransformStreamErrorInternal(transformStream, e); -} - -function TransformStreamErrorIfNeeded(transformStream, e) { - if (transformStream._errored === false) { - TransformStreamErrorInternal(transformStream, e); - } -} - -function TransformStreamErrorInternal(transformStream, e) { - // console.log('TransformStreamErrorInternal()'); - - assert(transformStream._errored === false); - - transformStream._errored = true; - transformStream._storedError = e; + // console.log('TransformStreamError()'); WritableStreamDefaultControllerErrorIfNeeded(transformStream._writableController, e); if (transformStream._readable._state === 'readable') { @@ -211,7 +174,7 @@ function TransformStreamSetBackpressure(transformStream, backpressure) { function TransformStreamTransform(transformStream, chunk) { // console.log('TransformStreamTransform()'); - assert(transformStream._errored === false); + assert(transformStream._readable._state !== 'errored'); assert(transformStream._backpressure === false); const transformer = transformStream._transformer; @@ -223,7 +186,7 @@ function TransformStreamTransform(transformStream, chunk) { return transformPromise.then( undefined, e => { - TransformStreamErrorIfNeeded(transformStream, e); + TransformStreamError(transformStream, e); return Promise.reject(e); }); } @@ -277,7 +240,11 @@ class TransformStreamDefaultController { throw defaultControllerBrandCheckException('error'); } - TransformStreamError(this._controlledTransformStream, reason); + if (this._controlledTransformStream._readable._state !== 'readable') { + throw new TypeError('TransformStream is not in a state that can be errored'); + } + + TransformStreamDefaultControllerError(this, reason); } } @@ -295,6 +262,14 @@ function IsTransformStreamDefaultController(x) { return true; } +function TransformStreamDefaultControllerError(controller, e) { + const transformStream = controller._controlledTransformStream; + + assert(transformStream._readable._state === 'readable', 'stream.[[readable]].[[state]] is "readable"'); + + TransformStreamError(transformStream, e); +} + // Class TransformStreamDefaultSink class TransformStreamDefaultSink { @@ -328,7 +303,7 @@ class TransformStreamDefaultSink { const transformStream = this._transformStream; // abort() is not called synchronously, so it is possible for abort() to be called when the stream is already // errored. - TransformStreamErrorIfNeeded(transformStream, new TypeError('Writable side aborted')); + TransformStreamError(transformStream, new TypeError('Writable side aborted')); } close() { @@ -340,16 +315,16 @@ class TransformStreamDefaultSink { 'flush', [transformStream._transformStreamController]); // Return a promise that is fulfilled with undefined on success. return flushPromise.then(() => { - if (transformStream._errored === true) { - return Promise.reject(transformStream._storedError); + if (transformStream._readable._state === 'errored') { + return Promise.reject(transformStream._readable._storedError); } if (ReadableStreamDefaultControllerCanCloseOrEnqueue(transformStream._readableController) === true) { TransformStreamCloseReadableInternal(transformStream); } return Promise.resolve(); }).catch(r => { - TransformStreamErrorIfNeeded(transformStream, r); - return Promise.reject(transformStream._storedError); + TransformStreamError(transformStream, r); + return Promise.reject(transformStream._readable._storedError); }); } } @@ -389,7 +364,7 @@ class TransformStreamDefaultSource { cancel(reason) { const transformStream = this._transformStream; - TransformStreamErrorInternal(transformStream, reason); + TransformStreamError(transformStream, reason); } }