From 07e31941a10e98b067a60cfe09ee0db6fdad1c1e Mon Sep 17 00:00:00 2001 From: Adam Rice Date: Tue, 29 Aug 2017 22:54:53 +0900 Subject: [PATCH 1/2] Remove TransformStream _readableClosed member Instead inspect the state of the ReadableStream directly using a new abstract operation, ReadableStreamDefaultControllerCanCloseOrEnqueue operation. This new operation is also used by the ReadableStream implementation in place of the equivalent comparisons. This avoids redundant state tracking. Also allow erroring a TransformStream after close is requested Previously, TransformStream would not permit erroring the readable side after controller.close() was called. Change it to be consistent with ReadableStream so that erroring is still possible as long as all enqueued chunks have not yet been read. Also add tests for the new behaviour. --- index.bs | 28 ++++++------ .../lib/readable-stream.js | 43 ++++++++----------- .../lib/transform-stream.js | 26 +++++------ .../transform-streams/errors.js | 36 +++++++++++++++- 4 files changed, 79 insertions(+), 54 deletions(-) diff --git a/index.bs b/index.bs index ecf780da4..d7b270d4d 100644 --- a/index.bs +++ b/index.bs @@ -1622,8 +1622,7 @@ desiredSize 1. If ! IsReadableStreamDefaultController(*this*) is *false*, throw a *TypeError* exception. - 1. If *this*.[[closeRequested]] is *true*, throw a *TypeError* exception. - 1. If *this*.[[controlledReadableStream]].[[state]] is not `"readable"`, throw a *TypeError* exception. + 1. If ! ReadableStreamDefaultControllerCanCloseOrEnqueue(*this*) is *false*, throw a *TypeError* exception. 1. Perform ! ReadableStreamDefaultControllerClose(*this*). @@ -1635,8 +1634,7 @@ desiredSize 1. If ! IsReadableStreamDefaultController(*this*) is *false*, throw a *TypeError* exception. - 1. If *this*.[[closeRequested]] is *true*, throw a *TypeError* exception. - 1. If *this*.[[controlledReadableStream]].[[state]] is not `"readable"`, throw a *TypeError* exception. + 1. If ! ReadableStreamDefaultControllerCanCloseOrEnqueue(*this*) is *false*, throw a *TypeError* exception. 1. Return ? ReadableStreamDefaultControllerEnqueue(*this*, _chunk_). @@ -1717,8 +1715,7 @@ nothrow>ReadableStreamDefaultControllerShouldCallPull ( controller )< 1. Let _stream_ be _controller_.[[controlledReadableStream]]. - 1. If _stream_.[[state]] is `"closed"` or _stream_.[[state]] is `"errored"`, return *false*. - 1. If _controller_.[[closeRequested]] is *true*, return *false*. + 1. If ! ReadableStreamDefaultControllerCanCloseOrEnqueue(_controller_) is *false*, return *false*. 1. If _controller_.[[started]] is *false*, return *false*. 1. If ! IsReadableStreamLocked(_stream_) is *true* and ! ReadableStreamGetNumReadRequests(_stream_) > *0*, return *true*. @@ -1736,8 +1733,7 @@ this to streams they did not create, and must ensure they have obeyed the precon 1. Let _stream_ be _controller_.[[controlledReadableStream]]. - 1. Assert: _controller_.[[closeRequested]] is *false*. - 1. Assert: _stream_.[[state]] is `"readable"`. + 1. Assert: ! ReadableStreamDefaultControllerCanCloseOrEnqueue(_controller_) is *true*. 1. Set _controller_.[[closeRequested]] to *true*. 1. If _controller_.[[queue]] is empty, perform ! ReadableStreamClose(_stream_). @@ -1752,8 +1748,7 @@ asserts). 1. Let _stream_ be _controller_.[[controlledReadableStream]]. - 1. Assert: _controller_.[[closeRequested]] is *false*. - 1. Assert: _stream_.[[state]] is `"readable"`. + 1. Assert: ! ReadableStreamDefaultControllerCanCloseOrEnqueue(_controller_) is *true*. 1. If ! IsReadableStreamLocked(_stream_) is *true* and ! ReadableStreamGetNumReadRequests(_stream_) > *0*, perform ! ReadableStreamFulfillReadRequest(_stream_, _chunk_, *false*). 1. Otherwise, @@ -1821,15 +1816,22 @@ Specifications should not use this on streams they did not create.

ReadableStreamDefaultControllerHasBackpressure ( controller )

-
- This method is used in the implementation of TransformStream. -
+This abstract operation is used in the implementation of TransformStream. 1. If ! ReadableStreamDefaultControllerShouldCallPull(_controller_) is *true*, return *false*. 1. Otherwise, return *true*. +

ReadableStreamDefaultControllerCanCloseOrEnqueue ( controller )

+ + + 1. Let _state_ be _controller_.[[controlledReadableStream]].[[state]]. + 1. If _controller_.[[closeRequested]] is *false* and _state_ is `"readable"`, return *true*. + 1. Otherwise, return *false*. + +

Class ReadableByteStreamController

diff --git a/reference-implementation/lib/readable-stream.js b/reference-implementation/lib/readable-stream.js index e2d2a013d..8040b57f0 100644 --- a/reference-implementation/lib/readable-stream.js +++ b/reference-implementation/lib/readable-stream.js @@ -275,7 +275,8 @@ module.exports = { ReadableStreamDefaultControllerEnqueue, ReadableStreamDefaultControllerError, ReadableStreamDefaultControllerGetDesiredSize, - ReadableStreamDefaultControllerHasBackpressure + ReadableStreamDefaultControllerHasBackpressure, + ReadableStreamDefaultControllerCanCloseOrEnqueue }; // Abstract operations for the ReadableStream. @@ -919,13 +920,8 @@ class ReadableStreamDefaultController { throw defaultControllerBrandCheckException('close'); } - if (this._closeRequested === true) { - throw new TypeError('The stream has already been closed; do not close it again!'); - } - - const state = this._controlledReadableStream._state; - if (state !== 'readable') { - throw new TypeError(`The stream (in ${state} state) is not in the readable state and cannot be closed`); + if (ReadableStreamDefaultControllerCanCloseOrEnqueue(this) === false) { + throw new TypeError('The stream is not in a state that can be closed'); } ReadableStreamDefaultControllerClose(this); @@ -936,13 +932,8 @@ class ReadableStreamDefaultController { throw defaultControllerBrandCheckException('enqueue'); } - if (this._closeRequested === true) { - throw new TypeError('stream is closed or draining'); - } - - const state = this._controlledReadableStream._state; - if (state !== 'readable') { - throw new TypeError(`The stream (in ${state} state) is not in the readable state and cannot be enqueued to`); + if (ReadableStreamDefaultControllerCanCloseOrEnqueue(this) === false) { + throw new TypeError('The stream is not in a state that permits enqueue'); } return ReadableStreamDefaultControllerEnqueue(this, chunk); @@ -1039,11 +1030,7 @@ function ReadableStreamDefaultControllerCallPullIfNeeded(controller) { function ReadableStreamDefaultControllerShouldCallPull(controller) { const stream = controller._controlledReadableStream; - if (stream._state === 'closed' || stream._state === 'errored') { - return false; - } - - if (controller._closeRequested === true) { + if (ReadableStreamDefaultControllerCanCloseOrEnqueue(controller) === false) { return false; } @@ -1068,8 +1055,7 @@ function ReadableStreamDefaultControllerShouldCallPull(controller) { function ReadableStreamDefaultControllerClose(controller) { const stream = controller._controlledReadableStream; - assert(controller._closeRequested === false); - assert(stream._state === 'readable'); + assert(ReadableStreamDefaultControllerCanCloseOrEnqueue(controller) === true); controller._closeRequested = true; @@ -1081,8 +1067,7 @@ function ReadableStreamDefaultControllerClose(controller) { function ReadableStreamDefaultControllerEnqueue(controller, chunk) { const stream = controller._controlledReadableStream; - assert(controller._closeRequested === false); - assert(stream._state === 'readable'); + assert(ReadableStreamDefaultControllerCanCloseOrEnqueue(controller) === true); if (IsReadableStreamLocked(stream) === true && ReadableStreamGetNumReadRequests(stream) > 0) { ReadableStreamFulfillReadRequest(stream, chunk, false); @@ -1151,6 +1136,16 @@ function ReadableStreamDefaultControllerHasBackpressure(controller) { return true; } +function ReadableStreamDefaultControllerCanCloseOrEnqueue(controller) { + const state = controller._controlledReadableStream._state; + + if (controller._closeRequested === false && state === 'readable') { + return true; + } + + return false; +} + class ReadableStreamBYOBRequest { constructor(controller, view) { if (IsReadableByteStreamController(controller) === false) { diff --git a/reference-implementation/lib/transform-stream.js b/reference-implementation/lib/transform-stream.js index a1d7dfa85..708f44e16 100644 --- a/reference-implementation/lib/transform-stream.js +++ b/reference-implementation/lib/transform-stream.js @@ -1,10 +1,10 @@ 'use strict'; const assert = require('assert'); const { InvokeOrNoop, PromiseInvokeOrPerformFallback, PromiseInvokeOrNoop, typeIsObject } = require('./helpers.js'); -const { ReadableStream, ReadableStreamDefaultControllerClose, - ReadableStreamDefaultControllerEnqueue, ReadableStreamDefaultControllerError, - ReadableStreamDefaultControllerGetDesiredSize, - ReadableStreamDefaultControllerHasBackpressure } = require('./readable-stream.js'); +const { ReadableStream, ReadableStreamDefaultControllerClose, ReadableStreamDefaultControllerEnqueue, + ReadableStreamDefaultControllerError, ReadableStreamDefaultControllerGetDesiredSize, + ReadableStreamDefaultControllerHasBackpressure, + ReadableStreamDefaultControllerCanCloseOrEnqueue } = require('./readable-stream.js'); const { WritableStream, WritableStreamDefaultControllerError } = require('./writable-stream.js'); // Class TransformStream @@ -21,7 +21,6 @@ class TransformStream { this._transformStreamController = undefined; this._writableDone = false; - this._readableClosed = false; this._backpressure = undefined; this._backpressureChangePromise = undefined; @@ -98,7 +97,7 @@ function TransformStreamCloseReadable(transformStream) { throw new TypeError('TransformStream is already errored'); } - if (transformStream._readableClosed === true) { + if (ReadableStreamDefaultControllerCanCloseOrEnqueue(transformStream._readableController) === false) { throw new TypeError('Readable side is already closed'); } @@ -107,15 +106,13 @@ function TransformStreamCloseReadable(transformStream) { function TransformStreamCloseReadableInternal(transformStream) { assert(transformStream._errored === false); - assert(transformStream._readableClosed === false); + assert(ReadableStreamDefaultControllerCanCloseOrEnqueue(transformStream._readableController) === true); try { ReadableStreamDefaultControllerClose(transformStream._readableController); } catch (e) { assert(false); } - - transformStream._readableClosed = true; } function TransformStreamDefaultTransform(chunk, transformStreamController) { @@ -131,7 +128,7 @@ function TransformStreamEnqueueToReadable(transformStream, chunk) { throw new TypeError('TransformStream is already errored'); } - if (transformStream._readableClosed === true) { + if (ReadableStreamDefaultControllerCanCloseOrEnqueue(transformStream._readableController) === false) { throw new TypeError('Readable side is already closed'); } @@ -144,8 +141,6 @@ function TransformStreamEnqueueToReadable(transformStream, chunk) { ReadableStreamDefaultControllerEnqueue(controller, chunk); } catch (e) { // This happens when readableStrategy.size() throws. - // The ReadableStream has already errored itself. - transformStream._readableClosed = true; TransformStreamErrorIfNeeded(transformStream, e); throw transformStream._storedError; @@ -182,7 +177,7 @@ function TransformStreamErrorInternal(transformStream, e) { if (transformStream._writableDone === false) { WritableStreamDefaultControllerError(transformStream._writableController, e); } - if (transformStream._readableClosed === false) { + if (transformStream._readable._state === 'readable') { ReadableStreamDefaultControllerError(transformStream._readableController, e); } if (transformStream._backpressure === true) { @@ -290,7 +285,7 @@ class TransformStreamDefaultController { } } -// Writable Stream Default Controller Abstract Operations +// Transform Stream Default Controller Abstract Operations function IsTransformStreamDefaultController(x) { if (!typeIsObject(x)) { @@ -353,7 +348,7 @@ class TransformStreamDefaultSink { if (transformStream._errored === true) { return Promise.reject(transformStream._storedError); } - if (transformStream._readableClosed === false) { + if (ReadableStreamDefaultControllerCanCloseOrEnqueue(transformStream._readableController) === true) { TransformStreamCloseReadableInternal(transformStream); } return Promise.resolve(); @@ -399,7 +394,6 @@ class TransformStreamDefaultSource { cancel(reason) { const transformStream = this._transformStream; - transformStream._readableClosed = true; TransformStreamErrorInternal(transformStream, reason); } } diff --git a/reference-implementation/to-upstream-wpts/transform-streams/errors.js b/reference-implementation/to-upstream-wpts/transform-streams/errors.js index f208da51c..c865a686d 100644 --- a/reference-implementation/to-upstream-wpts/transform-streams/errors.js +++ b/reference-implementation/to-upstream-wpts/transform-streams/errors.js @@ -151,7 +151,7 @@ test(() => { }, undefined, strategy), 'the first error should be thrown'); }, 'when strategy.size calls controller.error() then throws, the constructor should throw the first error'); -test(t => { +promise_test(t => { const ts = new TransformStream(); const writer = ts.writable.getWriter(); const closedPromise = writer.closed; @@ -161,4 +161,38 @@ test(t => { ]); }, 'cancelling the readable side should error the writable'); +promise_test(t => { + let controller; + const ts = new TransformStream({ + start(c) { + controller = c; + } + }); + const writer = ts.writable.getWriter(); + const reader = ts.readable.getReader(); + const writePromise = writer.write('a'); + const closePromise = writer.close(); + controller.error(thrownError); + return Promise.all([ + promise_rejects(t, thrownError, reader.closed, 'reader.closed should reject'), + promise_rejects(t, thrownError, writePromise, 'writePromise should reject'), + promise_rejects(t, thrownError, closePromise, 'closePromise should reject')]); +}, 'it should be possible to error the readable between close requested and complete'); + +promise_test(t => { + const ts = new TransformStream({ + transform(chunk, controller) { + controller.enqueue(chunk); + controller.close(); + throw thrownError; + } + }); + const writePromise = ts.writable.getWriter().write('a'); + const closedPromise = ts.readable.getReader().closed; + return Promise.all([ + promise_rejects(t, thrownError, writePromise, 'write() should reject'), + promise_rejects(t, thrownError, closedPromise, 'reader.closed should reject') + ]); +}, 'an exception from transform() should error the stream if close has been requested but not completed'); + done(); From 9864d13f17e51cd737519185b6022c3c7d8bd686 Mon Sep 17 00:00:00 2001 From: Adam Rice Date: Thu, 7 Sep 2017 20:11:51 +0900 Subject: [PATCH 2/2] Fix exception message --- reference-implementation/lib/readable-stream.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/reference-implementation/lib/readable-stream.js b/reference-implementation/lib/readable-stream.js index 8040b57f0..6377e14f8 100644 --- a/reference-implementation/lib/readable-stream.js +++ b/reference-implementation/lib/readable-stream.js @@ -921,7 +921,7 @@ class ReadableStreamDefaultController { } if (ReadableStreamDefaultControllerCanCloseOrEnqueue(this) === false) { - throw new TypeError('The stream is not in a state that can be closed'); + throw new TypeError('The stream is not in a state that permits close'); } ReadableStreamDefaultControllerClose(this);