Skip to content

Commit

Permalink
Fix interaction between writable.abort() and readable.cancel()
Browse files Browse the repository at this point in the history
If writable.abort() is called when the WritableStream is waiting for
another operation to complete, and then readable.cancel() is called
before the queued abort is processed, an assert() would happen due to
trying to error the TransformStream twice.

Change TransformStreamDefaultSink's abort() method to use
TransformStreamErrorIfNeeded to correctly handle this case. Also add
tests for this case, and to confirm that it is the call to abort() that
errors the writable in this case.

Also remove the [[writableDone]] slot. It was only used to avoid calling
WritableStreamDefaultControllerError when the stream wasn't
writable. This is exactly the semantics that
WritableStreamDefaultControllerErrorIfNeeded has, so use that instead.
  • Loading branch information
ricea committed Sep 8, 2017
1 parent be948b6 commit 8a39c6a
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 11 deletions.
15 changes: 5 additions & 10 deletions reference-implementation/lib/transform-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ const { ReadableStream, ReadableStreamDefaultControllerClose, ReadableStreamDefa
ReadableStreamDefaultControllerError, ReadableStreamDefaultControllerGetDesiredSize,
ReadableStreamDefaultControllerHasBackpressure,
ReadableStreamDefaultControllerCanCloseOrEnqueue } = require('./readable-stream.js');
const { WritableStream, WritableStreamDefaultControllerError } = require('./writable-stream.js');
const { WritableStream, WritableStreamDefaultControllerErrorIfNeeded } = require('./writable-stream.js');

// Class TransformStream

Expand All @@ -20,8 +20,6 @@ class TransformStream {
this._readableController = undefined;
this._transformStreamController = undefined;

this._writableDone = false;

this._backpressure = undefined;
this._backpressureChangePromise = undefined;
this._backpressureChangePromise_resolve = undefined;
Expand Down Expand Up @@ -174,9 +172,7 @@ function TransformStreamErrorInternal(transformStream, e) {
transformStream._errored = true;
transformStream._storedError = e;

if (transformStream._writableDone === false) {
WritableStreamDefaultControllerError(transformStream._writableController, e);
}
WritableStreamDefaultControllerErrorIfNeeded(transformStream._writableController, e);
if (transformStream._readable._state === 'readable') {
ReadableStreamDefaultControllerError(transformStream._readableController, e);
}
Expand Down Expand Up @@ -330,17 +326,16 @@ class TransformStreamDefaultSink {

abort() {
const transformStream = this._transformStream;
transformStream._writableDone = true;
TransformStreamErrorInternal(transformStream, new TypeError('Writable side aborted'));
// abort() is not called synchronously, so it is possible for abort() to be called when the stream is already
// errored.
TransformStreamErrorIfNeeded(transformStream, new TypeError('Writable side aborted'));
}

close() {
// console.log('TransformStreamDefaultSink.close()');

const transformStream = this._transformStream;

transformStream._writableDone = true;

const flushPromise = PromiseInvokeOrNoop(transformStream._transformer,
'flush', [transformStream._transformStreamController]);
// Return a promise that is fulfilled with undefined on success.
Expand Down
2 changes: 1 addition & 1 deletion reference-implementation/lib/writable-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ module.exports = {
IsWritableStreamLocked,
WritableStream,
WritableStreamAbort,
WritableStreamDefaultControllerError,
WritableStreamDefaultControllerErrorIfNeeded,
WritableStreamDefaultWriterCloseWithErrorPropagation,
WritableStreamDefaultWriterRelease,
WritableStreamDefaultWriterWrite,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,4 +195,41 @@ promise_test(t => {
]);
}, 'an exception from transform() should error the stream if close has been requested but not completed');

promise_test(t => {
const ts = new TransformStream();
const writer = ts.writable.getWriter();
// The microtask following transformer.start() hasn't completed yet, so the abort is queued and not notified to the
// TransformStream yet.
const abortPromise = writer.abort(thrownError);
const cancelPromise = ts.readable.cancel(new Error('cancel reason'));
return Promise.all([
abortPromise,
cancelPromise,
promise_rejects(t, new TypeError(), writer.closed, 'writer.closed should reject with a TypeError')]);
}, 'abort should set the close reason for the writable when it happens first during start');

promise_test(t => {
let resolveTransform;
const transformPromise = new Promise(resolve => {
resolveTransform = resolve;
});
const ts = new TransformStream({
transform() {
return transformPromise;
}
}, { highWaterMark: 2 });
const writer = ts.writable.getWriter();
return delay(0).then(() => {
const writePromise = writer.write();
const abortPromise = writer.abort(thrownError);
const cancelPromise = ts.readable.cancel(new Error('cancel reason'));
resolveTransform();
return Promise.all([
writePromise,
abortPromise,
cancelPromise,
promise_rejects(t, new TypeError(), writer.closed, 'writer.closed should reject with a TypeError')]);
});
}, 'abort should set the close reason for the writable when it happens first during underlying sink write');

done();

0 comments on commit 8a39c6a

Please sign in to comment.