From 8a1ce3965f26e5863575540596e14a3b8f63cafd Mon Sep 17 00:00:00 2001 From: Domenic Denicola Date: Thu, 6 Apr 2017 17:38:55 +0900 Subject: [PATCH 1/4] Fix pipeTo() to ensure all read chunks are written Previously, the spec wording allowed for immediate shutdown if the readable stream became closed, even if there were outstanding writes which did not yet get written. This fixes that, by requiring any already-read chunks to be written when possible. The reference implementation mostly did not suffer from this, but did in the case where a read completes while a write is ongoing. This was fixed by making its "wait for writes to finish" functionality more robust. Fixes #644. --- index.bs | 19 +++++++++++++------ .../lib/readable-stream.js | 14 ++++++++++++-- 2 files changed, 25 insertions(+), 8 deletions(-) diff --git a/index.bs b/index.bs index 3b92c5b98..84dbba38b 100644 --- a/index.bs +++ b/index.bs @@ -649,10 +649,10 @@ ReadableStream(underlyingSource = {}, { size, highWat complete before proceeding to the next read/write operation violates this recommendation. In doing so, such an implementation makes the internal queue of _dest_ useless, as it ensures _dest_ always contains at most one queued chunk.

- * Shutdown must stop all activity: if _shuttingDown_ becomes *true*, the user agent must not - initiate further reads from _reader_ or writes to _writer_. (Ongoing reads and writes may finish.) In particular, - the user agent must check the below conditions on *this*.[[state]] and _dest_.[[state]] before performing any - reads or writes, since they might lead to immediate shutdown. + * Shutdown must stop activity: if _shuttingDown_ becomes *true*, the user agent must not + initiate further reads from _reader_, and must only perform writes of already-read chunks, as described + below. In particular, the user agent must check the below conditions before performing any reads or writes, + since they might lead to immediate shutdown. * Errors must be propagated forward: if *this*.[[state]] is or becomes `"errored"`, then 1. If _preventAbort_ is *false*, shutdown with an action of ! WritableStreamAbort(_dest_, *this*.[[storedError]]) and with *this*.[[storedError]]. @@ -667,15 +667,19 @@ ReadableStream(underlyingSource = {}, { size, highWat 1. Otherwise, shutdown. * Closing must be propagated backward: if ! WritableStreamCloseQueuedOrInFlight(_dest_) is *true* or _dest_.[[state]] is `"closed"`, then + 1. Assert: no chunks have been read or written. 1. Let _destClosed_ be a new *TypeError*. 1. If _preventCancel_ is *false*, shutdown with an action of ! ReadableStreamCancel(*this*, _destClosed_) and with _destClosed_. 1. Otherwise, shutdown with _destClosed_. * Shutdown with an action: if any of the above requirements ask to shutdown with an action _action_, optionally with an error _originalError_, then: + 1. If _dest_.[[state]] is `"writable"` and ! WritableStreamCloseQueuedOrInFlight(dest) is *false*, + 1. If any chunks have been read but not yet written, write them to _dest_. + 1. Wait until every chunk that has been read has been written (i.e. the corresponding promises have + settled). 1. If _shuttingDown_ is *true*, abort these substeps. 1. Set _shuttingDown_ to *true*. - 1. Wait until any ongoing write finishes (i.e. the corresponding promises settle). 1. Let _p_ be the result of performing _action_. 1. Upon fulfillment of _p_, finalize, passing along _originalError_ if it was given. @@ -683,9 +687,12 @@ ReadableStream(underlyingSource = {}, { size, highWat _newError_. * Shutdown: if any of the above requirements or steps ask to shutdown, optionally with an error _error_, then: + 1. If _dest_.[[state]] is `"writable"` and ! WritableStreamCloseQueuedOrInFlight(dest) is *false*, + 1. If any chunks have been read but not yet written, write them to _dest_. + 1. Wait until every chunk that has been read has been written (i.e. the corresponding promises have + settled). 1. If _shuttingDown_ is *true*, abort these substeps. 1. Set _shuttingDown_ to *true*. - 1. Wait until any ongoing write finishes (i.e. the corresponding promises settle). 1. Finalize, passing along _error_ if it was given. * Finalize: both forms of shutdown will eventually ask to finalize, optionally with an error _error_, which means to perform the following steps: diff --git a/reference-implementation/lib/readable-stream.js b/reference-implementation/lib/readable-stream.js index b82f64eaf..89075318a 100644 --- a/reference-implementation/lib/readable-stream.js +++ b/reference-implementation/lib/readable-stream.js @@ -187,6 +187,16 @@ class ReadableStream { rethrowAssertionErrorRejection(err); }); + function waitForWritesToFinish() { + if (dest._state === 'writable' && WritableStreamCloseQueuedOrInFlight(dest) === false) { + // Another write may have started while we were waiting on this currentWrite, so we have to be sure to wait + // for that too. + const before = currentWrite; + return currentWrite.then(() => before !== currentWrite ? waitForWritesToFinish() : undefined); + } + return Promise.resolve(); + } + function isOrBecomesErrored(stream, promise, action) { if (stream._state === 'errored') { action(stream._storedError); @@ -209,7 +219,7 @@ class ReadableStream { } shuttingDown = true; - currentWrite.then(() => { + waitForWritesToFinish().then(() => { return action().then( () => finalize(originalIsError, originalError), newError => finalize(true, newError) @@ -224,7 +234,7 @@ class ReadableStream { } shuttingDown = true; - currentWrite.then(() => { + waitForWritesToFinish().then(() => { finalize(isError, error); }) .catch(rethrowAssertionErrorRejection); From 003b35e71137adf30c8c8bac93a2d30734db2c84 Mon Sep 17 00:00:00 2001 From: Domenic Denicola Date: Thu, 6 Apr 2017 20:16:45 +0900 Subject: [PATCH 2/4] Make variable name better --- reference-implementation/lib/readable-stream.js | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/reference-implementation/lib/readable-stream.js b/reference-implementation/lib/readable-stream.js index 89075318a..d0ff220b2 100644 --- a/reference-implementation/lib/readable-stream.js +++ b/reference-implementation/lib/readable-stream.js @@ -191,8 +191,8 @@ class ReadableStream { if (dest._state === 'writable' && WritableStreamCloseQueuedOrInFlight(dest) === false) { // Another write may have started while we were waiting on this currentWrite, so we have to be sure to wait // for that too. - const before = currentWrite; - return currentWrite.then(() => before !== currentWrite ? waitForWritesToFinish() : undefined); + const oldCurrentWrite = currentWrite; + return currentWrite.then(() => oldCurrentWrite !== currentWrite ? waitForWritesToFinish() : undefined); } return Promise.resolve(); } From e22b3d81aebc721b72ce27f65f6607f9a8f97b06 Mon Sep 17 00:00:00 2001 From: Domenic Denicola Date: Thu, 6 Apr 2017 21:13:07 +0900 Subject: [PATCH 3/4] Move the ref impl checking logic out so that finalization runs sync Per https://github.com/whatwg/streams/issues/727. --- .../lib/readable-stream.js | 34 +++++++++++-------- 1 file changed, 19 insertions(+), 15 deletions(-) diff --git a/reference-implementation/lib/readable-stream.js b/reference-implementation/lib/readable-stream.js index d0ff220b2..5b28ebb24 100644 --- a/reference-implementation/lib/readable-stream.js +++ b/reference-implementation/lib/readable-stream.js @@ -188,13 +188,10 @@ class ReadableStream { }); function waitForWritesToFinish() { - if (dest._state === 'writable' && WritableStreamCloseQueuedOrInFlight(dest) === false) { - // Another write may have started while we were waiting on this currentWrite, so we have to be sure to wait - // for that too. - const oldCurrentWrite = currentWrite; - return currentWrite.then(() => oldCurrentWrite !== currentWrite ? waitForWritesToFinish() : undefined); - } - return Promise.resolve(); + // Another write may have started while we were waiting on this currentWrite, so we have to be sure to wait + // for that too. + const oldCurrentWrite = currentWrite; + return currentWrite.then(() => oldCurrentWrite !== currentWrite ? waitForWritesToFinish() : undefined); } function isOrBecomesErrored(stream, promise, action) { @@ -219,13 +216,19 @@ class ReadableStream { } shuttingDown = true; - waitForWritesToFinish().then(() => { - return action().then( + if (dest._state === 'writable' && WritableStreamCloseQueuedOrInFlight(dest) === false) { + waitForWritesToFinish().then(doTheRest); + } else { + doTheRest(); + } + + function doTheRest() { + action().then( () => finalize(originalIsError, originalError), newError => finalize(true, newError) - ); - }) - .catch(rethrowAssertionErrorRejection); + ) + .catch(rethrowAssertionErrorRejection); + } } function shutdown(isError, error) { @@ -234,10 +237,11 @@ class ReadableStream { } shuttingDown = true; - waitForWritesToFinish().then(() => { + if (dest._state === 'writable' && WritableStreamCloseQueuedOrInFlight(dest) === false) { + waitForWritesToFinish().then(() => finalize(isError, error)).catch(rethrowAssertionErrorRejection); + } else { finalize(isError, error); - }) - .catch(rethrowAssertionErrorRejection); + } } function finalize(isError, error) { From 62a5c60a3d82de48432b0664dd8b23a1d247b047 Mon Sep 17 00:00:00 2001 From: Domenic Denicola Date: Thu, 6 Apr 2017 22:18:48 +0900 Subject: [PATCH 4/4] Roll WPT --- reference-implementation/web-platform-tests | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/reference-implementation/web-platform-tests b/reference-implementation/web-platform-tests index 0444344a4..1dedb2af8 160000 --- a/reference-implementation/web-platform-tests +++ b/reference-implementation/web-platform-tests @@ -1 +1 @@ -Subproject commit 0444344a4ace9dba478899c92de5a1d6d561cc54 +Subproject commit 1dedb2af8a1485a9338cbf85fc34048cf0a441bd