diff --git a/reference-implementation/lib/transform-stream.js b/reference-implementation/lib/transform-stream.js index 708f44e16..26370769e 100644 --- a/reference-implementation/lib/transform-stream.js +++ b/reference-implementation/lib/transform-stream.js @@ -5,7 +5,7 @@ const { ReadableStream, ReadableStreamDefaultControllerClose, ReadableStreamDefa ReadableStreamDefaultControllerError, ReadableStreamDefaultControllerGetDesiredSize, ReadableStreamDefaultControllerHasBackpressure, ReadableStreamDefaultControllerCanCloseOrEnqueue } = require('./readable-stream.js'); -const { WritableStream, WritableStreamDefaultControllerError } = require('./writable-stream.js'); +const { WritableStream, WritableStreamDefaultControllerErrorIfNeeded } = require('./writable-stream.js'); // Class TransformStream @@ -20,8 +20,6 @@ class TransformStream { this._readableController = undefined; this._transformStreamController = undefined; - this._writableDone = false; - this._backpressure = undefined; this._backpressureChangePromise = undefined; this._backpressureChangePromise_resolve = undefined; @@ -174,9 +172,7 @@ function TransformStreamErrorInternal(transformStream, e) { transformStream._errored = true; transformStream._storedError = e; - if (transformStream._writableDone === false) { - WritableStreamDefaultControllerError(transformStream._writableController, e); - } + WritableStreamDefaultControllerErrorIfNeeded(transformStream._writableController, e); if (transformStream._readable._state === 'readable') { ReadableStreamDefaultControllerError(transformStream._readableController, e); } @@ -330,8 +326,9 @@ class TransformStreamDefaultSink { abort() { const transformStream = this._transformStream; - transformStream._writableDone = true; - TransformStreamErrorInternal(transformStream, new TypeError('Writable side aborted')); + // abort() is not called synchronously, so it is possible for abort() to be called when the stream is already + // errored. + TransformStreamErrorIfNeeded(transformStream, new TypeError('Writable side aborted')); } close() { @@ -339,8 +336,6 @@ class TransformStreamDefaultSink { const transformStream = this._transformStream; - transformStream._writableDone = true; - const flushPromise = PromiseInvokeOrNoop(transformStream._transformer, 'flush', [transformStream._transformStreamController]); // Return a promise that is fulfilled with undefined on success. diff --git a/reference-implementation/lib/writable-stream.js b/reference-implementation/lib/writable-stream.js index 5124c4d73..d8bed6494 100644 --- a/reference-implementation/lib/writable-stream.js +++ b/reference-implementation/lib/writable-stream.js @@ -90,7 +90,7 @@ module.exports = { IsWritableStreamLocked, WritableStream, WritableStreamAbort, - WritableStreamDefaultControllerError, + WritableStreamDefaultControllerErrorIfNeeded, WritableStreamDefaultWriterCloseWithErrorPropagation, WritableStreamDefaultWriterRelease, WritableStreamDefaultWriterWrite, diff --git a/reference-implementation/to-upstream-wpts/transform-streams/errors.js b/reference-implementation/to-upstream-wpts/transform-streams/errors.js index c865a686d..5df3b2de2 100644 --- a/reference-implementation/to-upstream-wpts/transform-streams/errors.js +++ b/reference-implementation/to-upstream-wpts/transform-streams/errors.js @@ -195,4 +195,41 @@ promise_test(t => { ]); }, 'an exception from transform() should error the stream if close has been requested but not completed'); +promise_test(t => { + const ts = new TransformStream(); + const writer = ts.writable.getWriter(); + // The microtask following transformer.start() hasn't completed yet, so the abort is queued and not notified to the + // TransformStream yet. + const abortPromise = writer.abort(thrownError); + const cancelPromise = ts.readable.cancel(new Error('cancel reason')); + return Promise.all([ + abortPromise, + cancelPromise, + promise_rejects(t, new TypeError(), writer.closed, 'writer.closed should reject with a TypeError')]); +}, 'abort should set the close reason for the writable when it happens first during start'); + +promise_test(t => { + let resolveTransform; + const transformPromise = new Promise(resolve => { + resolveTransform = resolve; + }); + const ts = new TransformStream({ + transform() { + return transformPromise; + } + }, { highWaterMark: 2 }); + const writer = ts.writable.getWriter(); + return delay(0).then(() => { + const writePromise = writer.write(); + const abortPromise = writer.abort(thrownError); + const cancelPromise = ts.readable.cancel(new Error('cancel reason')); + resolveTransform(); + return Promise.all([ + writePromise, + abortPromise, + cancelPromise, + promise_rejects(t, new TypeError(), writer.closed, 'writer.closed should reject with a TypeError')]); + }); +}, 'abort should set the close reason for the writable when it happens first during underlying sink write'); + done();