Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Factor out rejection handling logic from WritableStreamDefaultControllerProcessClose() #640

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
157 changes: 88 additions & 69 deletions reference-implementation/lib/writable-stream.js
Expand Up @@ -109,18 +109,14 @@ function IsWritableStreamLocked(stream) {
return true;
}

function WritableStreamEnsureReadyPromiseRejectedIfAny(stream, oldState) {
function WritableStreamEnsureReadyPromiseRejectedWith(stream, error, isPending) {
const writer = stream._writer;
if (writer === undefined) {
return;
}
assert(writer !== undefined);

const storedError = stream._storedError;
if (oldState === 'writable' &&
WritableStreamDefaultControllerGetBackpressure(stream._writableStreamController) === true) {
defaultWriterReadyPromiseReject(writer, storedError);
if (isPending === true) {
defaultWriterReadyPromiseReject(writer, error);
} else {
defaultWriterReadyPromiseResetToRejected(writer, storedError);
defaultWriterReadyPromiseResetToRejected(writer, error);
}
writer._readyPromise.catch(() => {});
}
Expand All @@ -144,15 +140,19 @@ function WritableStreamAbort(stream, reason) {
return Promise.reject(new TypeError('Aborted'));
}

assert(state === 'writable' || state === 'closing');
assert(state === 'writable' || state === 'closing', 'state must be writable or closing');

const controller = stream._writableStreamController;
assert(controller !== undefined);
assert(controller !== undefined, 'controller must not be undefined');

WritableStreamEnsureReadyPromiseRejectedIfAny(stream, state);
const readyPromiseIsPending = state === 'writable' &&
WritableStreamDefaultControllerGetBackpressure(stream._writableStreamController) === true;

if (controller._writing === false && controller._inClose === false) {
WritableStreamFinishAbort(stream);
if (stream._writer !== undefined) {
WritableStreamEnsureReadyPromiseRejectedWith(stream, new TypeError('Aborted'), readyPromiseIsPending);
}
return WritableStreamDefaultControllerAbort(controller, reason);
}

Expand All @@ -164,6 +164,10 @@ function WritableStreamAbort(stream, reason) {
};
});

if (stream._writer !== undefined) {
WritableStreamEnsureReadyPromiseRejectedWith(stream, new TypeError('Abort requested'), readyPromiseIsPending);
}

return promise;
}

Expand All @@ -186,8 +190,8 @@ function WritableStreamAddWriteRequest(stream) {
}

function WritableStreamMarkFirstWriteRequestPending(stream) {
assert(stream._pendingWriteRequest === undefined);
assert(stream._writeRequests.length !== 0);
assert(stream._pendingWriteRequest === undefined, 'there must be no pending write request');
assert(stream._writeRequests.length !== 0, 'writeRequests must not be empty');
stream._pendingWriteRequest = stream._writeRequests.shift();
}

Expand All @@ -198,12 +202,12 @@ function WritableStreamFinishPendingWrite(stream) {

const state = stream._state;

const wasAborted = stream._pendingAbortRequest !== undefined;

if (state === 'errored') {
if (stream._pendingAbortRequest !== undefined) {
if (wasAborted) {
stream._pendingAbortRequest._reject(reason);
stream._pendingAbortRequest = undefined;
} else {
WritableStreamEnsureReadyPromiseRejectedIfAny(stream, state);
}

WritableStreamRejectPromisesInReactionToError(stream);
Expand All @@ -213,11 +217,11 @@ function WritableStreamFinishPendingWrite(stream) {

const controller = stream._writableStreamController;

if (stream._pendingAbortRequest === undefined) {
if (wasAborted === false) {
return;
}

WritableStreamFinishAbort(stream);
WritableStreamFinishAbort(stream, state);

const abortRequest = stream._pendingAbortRequest;
stream._pendingAbortRequest = undefined;
Expand All @@ -232,16 +236,18 @@ function WritableStreamFinishPendingWriteWithError(stream, reason) {
stream._pendingWriteRequest._reject(reason);
stream._pendingWriteRequest = undefined;

const oldState = stream._state;
const state = stream._state;

const wasAborted = stream._pendingAbortRequest !== undefined;
const readyPromiseIsPending = state === 'writable' && wasAborted === false &&
WritableStreamDefaultControllerGetBackpressure(stream._writableStreamController) === true;

if (wasAborted) {
stream._pendingAbortRequest._reject(reason);
stream._pendingAbortRequest = undefined;
}

if (oldState === 'errored') {
if (state === 'errored') {
WritableStreamRejectPromisesInReactionToError(stream);

return;
Expand All @@ -250,27 +256,26 @@ function WritableStreamFinishPendingWriteWithError(stream, reason) {
stream._state = 'errored';
stream._storedError = reason;

if (!wasAborted) {
console.error('shshs ' + oldState);
WritableStreamEnsureReadyPromiseRejectedIfAny(stream, oldState);
if (wasAborted === false && stream._writer !== undefined) {
WritableStreamEnsureReadyPromiseRejectedWith(stream, reason, readyPromiseIsPending);
}

WritableStreamRejectPromisesInReactionToError(stream);
}

function WritableStreamFinishClose(stream) {
function WritableStreamFinishPendingClose(stream) {
assert(stream._pendingCloseRequest !== undefined);
stream._pendingCloseRequest._resolve(undefined);
stream._pendingCloseRequest = undefined;

const state = stream._state;

const wasAborted = stream._pendingAbortRequest !== undefined;

if (state === 'errored') {
if (stream._pendingAbortRequest !== undefined) {
stream._pendingAbortRequest._reject();
if (wasAborted) {
stream._pendingAbortRequest._reject(stream._storedError);
stream._pendingAbortRequest = undefined;
} else {
WritableStreamEnsureReadyPromiseRejectedIfAny(stream, state);
}

WritableStreamRejectClosedPromiseIfAny(stream);
Expand All @@ -280,45 +285,53 @@ function WritableStreamFinishClose(stream) {

assert(state === 'closing');

if (stream._pendingAbortRequest !== undefined) {
stream._pendingAbortRequest._resolve();
stream._pendingAbortRequest = undefined;
if (wasAborted === false) {
const writer = stream._writer;
if (writer !== undefined) {
defaultWriterClosedPromiseResolve(writer);
}
stream._state = 'closed';
return;
}

const writer = stream._writer;
if (writer !== undefined) {
defaultWriterClosedPromiseResolve(writer);
}
stream._state = 'closed';
stream._pendingAbortRequest._resolve();
stream._pendingAbortRequest = undefined;

stream._state = 'errored';
stream._storedError = new TypeError('Closed');

WritableStreamRejectClosedPromiseIfAny(stream);
}

function WritableStreamFinishCloseWithError(stream, reason) {
function WritableStreamFinishPendingCloseWithError(stream, reason) {
assert(stream._pendingCloseRequest !== undefined);
stream._pendingCloseRequest._reject(reason);
stream._pendingCloseRequest = undefined;

const oldState = stream._state;
const state = stream._state;

const wasAborted = stream._pendingAbortRequest !== undefined;
const readyPromiseIsPending = state === 'writable' && wasAborted === false &&
WritableStreamDefaultControllerGetBackpressure(stream._writableStreamController) === true;

if (wasAborted) {
stream._pendingAbortRequest._reject(reason);
stream._pendingAbortRequest = undefined;
}

if (oldState === 'errored') {
if (state === 'errored') {
WritableStreamRejectClosedPromiseIfAny(stream);

return;
}

assert(oldState === 'closing');
assert(state === 'closing');

stream._state = 'errored';
stream._storedError = reason;

if (!wasAborted) {
WritableStreamEnsureReadyPromiseRejectedIfAny(stream, oldState);
if (wasAborted === false && stream._writer !== undefined) {
WritableStreamEnsureReadyPromiseRejectedWith(stream, reason, readyPromiseIsPending);
}

WritableStreamRejectClosedPromiseIfAny(stream);
Expand Down Expand Up @@ -596,20 +609,16 @@ function WritableStreamDefaultWriterRelease(writer) {
'Writer was released and can no longer be used to monitor the stream\'s closedness');
const state = stream._state;

if (state === 'writable' || state === 'closing' || stream._pendingAbortRequest !== undefined) {
if (state === 'writable' || state === 'closing') {
defaultWriterClosedPromiseReject(writer, releasedError);
} else {
defaultWriterClosedPromiseResetToRejected(writer, releasedError);
}
writer._closedPromise.catch(() => {});

if (state === 'writable' &&
WritableStreamDefaultControllerGetBackpressure(stream._writableStreamController) === true) {
defaultWriterReadyPromiseReject(writer, releasedError);
} else {
defaultWriterReadyPromiseResetToRejected(writer, releasedError);
}
writer._readyPromise.catch(() => {});
const readyPromiseIsPending = state === 'writable' && stream._pendingAbortRequest === undefined &&
WritableStreamDefaultControllerGetBackpressure(stream._writableStreamController) === true;
WritableStreamEnsureReadyPromiseRejectedWith(stream, releasedError, readyPromiseIsPending);

stream._writer = undefined;
writer._ownerWritableStream = undefined;
Expand Down Expand Up @@ -716,6 +725,18 @@ function WritableStreamDefaultControllerGetDesiredSize(controller) {
return controller._strategyHWM - queueSize;
}

function WritableStreamDefaultControllerUpdateBackpressureIfNeeded(controller, oldBackpressure) {
const stream = controller._controlledWritableStream;
if (stream._state !== 'writable') {
return;
}

const backpressure = WritableStreamDefaultControllerGetBackpressure(controller);
if (oldBackpressure !== backpressure) {
WritableStreamUpdateBackpressure(stream, backpressure);
}
}

function WritableStreamDefaultControllerWrite(controller, chunk) {
const stream = controller._controlledWritableStream;

Expand All @@ -736,7 +757,7 @@ function WritableStreamDefaultControllerWrite(controller, chunk) {

const writeRecord = { chunk };

const lastBackpressure = WritableStreamDefaultControllerGetBackpressure(controller);
const oldBackpressure = WritableStreamDefaultControllerGetBackpressure(controller);

try {
EnqueueValueWithSize(controller._queue, writeRecord, chunkSize);
Expand All @@ -745,12 +766,7 @@ function WritableStreamDefaultControllerWrite(controller, chunk) {
return;
}

if (stream._state === 'writable') {
const backpressure = WritableStreamDefaultControllerGetBackpressure(controller);
if (lastBackpressure !== backpressure) {
WritableStreamUpdateBackpressure(stream, backpressure);
}
}
WritableStreamDefaultControllerUpdateBackpressureIfNeeded(controller, oldBackpressure);

WritableStreamDefaultControllerAdvanceQueueIfNeeded(controller);
}
Expand Down Expand Up @@ -818,7 +834,7 @@ function WritableStreamDefaultControllerProcessClose(controller) {
controller._inClose = false;
assert(stream._state === 'closing' || stream._state === 'errored');

WritableStreamFinishClose(stream);
WritableStreamFinishPendingClose(stream);
},
reason => {
assert(controller._inClose === true);
Expand All @@ -828,7 +844,7 @@ function WritableStreamDefaultControllerProcessClose(controller) {
controller._queue = [];
}

WritableStreamFinishCloseWithError(stream, reason);
WritableStreamFinishPendingCloseWithError(stream, reason);
}
)
.catch(rethrowAssertionErrorRejection);
Expand All @@ -849,18 +865,16 @@ function WritableStreamDefaultControllerProcessWrite(controller, chunk) {

WritableStreamFinishPendingWrite(stream);

if (stream._state === 'errored') {
const state = stream._state;
if (state === 'errored') {
return;
}

const lastBackpressure = WritableStreamDefaultControllerGetBackpressure(controller);
assert(state === 'closing' || state === 'writable');

const oldBackpressure = WritableStreamDefaultControllerGetBackpressure(controller);
DequeueValue(controller._queue);
if (stream._state === 'writable') {
const backpressure = WritableStreamDefaultControllerGetBackpressure(controller);
if (lastBackpressure !== backpressure) {
WritableStreamUpdateBackpressure(stream, backpressure);
}
}
WritableStreamDefaultControllerUpdateBackpressureIfNeeded(controller, oldBackpressure);

WritableStreamDefaultControllerAdvanceQueueIfNeeded(controller);
},
Expand All @@ -872,6 +886,7 @@ function WritableStreamDefaultControllerProcessWrite(controller, chunk) {

WritableStreamFinishPendingWriteWithError(stream, reason);

assert(stream._state === 'errored');
if (!wasErrored) {
controller._queue = [];
}
Expand All @@ -897,7 +912,11 @@ function WritableStreamDefaultControllerError(controller, e) {
stream._state = 'errored';
stream._storedError = e;

WritableStreamEnsureReadyPromiseRejectedIfAny(stream, oldState);
if (stream._pendingAbortRequest === undefined && stream._writer !== undefined) {
const readyPromiseIsPending = oldState === 'writable' &&
WritableStreamDefaultControllerGetBackpressure(stream._writableStreamController) === true;
WritableStreamEnsureReadyPromiseRejectedWith(stream, e, readyPromiseIsPending);
}

controller._queue = [];

Expand Down
Expand Up @@ -32,7 +32,7 @@ promise_test(t => {
promise_rejects(t, new TypeError(), readyPromise, 'the ready promise should reject with a TypeError'),
promise_rejects(t, new TypeError(), writePromise, 'the write() promise should reject with a TypeError')
]);
}, 'Aborting a WritableStream should cause the writer\'s unsettled ready promise to reject');
}, 'Aborting a WritableStream before it starts should cause the writer\'s unsettled ready promise to reject');

promise_test(t => {
const ws = new WritableStream();
Expand Down Expand Up @@ -231,10 +231,11 @@ promise_test(t => {
const closePromise = writer.close();

return delay(0).then(() => {
writer.abort(error1);
const abortPromise = writer.abort(error1);
resolveClose();
return Promise.all([
promise_rejects(t, new TypeError(), writer.closed, 'closed should reject with a TypeError'),
abortPromise,
closePromise
]);
});
Expand Down