Skip to content

Commit

Permalink
Version 2: run cancel before cancelling
Browse files Browse the repository at this point in the history
  • Loading branch information
lucacasonato committed Jun 12, 2023
1 parent 2e503c6 commit 5b00d08
Show file tree
Hide file tree
Showing 3 changed files with 99 additions and 46 deletions.
67 changes: 42 additions & 25 deletions index.bs
Original file line number Diff line number Diff line change
Expand Up @@ -5751,6 +5751,12 @@ the following table:
<td><dfn>\[[cancelAlgorithm]]</dfn>
<td class="non-normative">A promise-returning algorithm, taking one argument (the [=reason=] for
cancellation), which communicates a requested cancellation to the [=transformer=]
<tr>
<td><dfn>\[[finishPromise]]</dfn>
<td class="non-normative">A promise which resolves on completion of either the
[=TransformStreamDefaultController/[[cancelAlgorithm]]=] or the
[=TransformStreamDefaultController/[[flushAlgorithm]]=]. If this field is unpopulated (that is,
undefined), then neither of those algorithms have been [=invoked=] yet
<tr>
<td><dfn>\[[flushAlgorithm]]</dfn>
<td class="non-normative">A promise-returning algorithm which communicates a requested close to
Expand Down Expand Up @@ -6061,36 +6067,45 @@ side=] of [=transform streams=].
id="transform-stream-default-sink-abort-algorithm">TransformStreamDefaultSinkAbortAlgorithm(|stream|,
|reason|)</dfn> performs the following steps:

1. Let |readable| be |stream|.[=TransformStream/[[readable]]=].
1. [$ReadableStreamDefaultControllerError$](|readable|.[=ReadableStream/[[controller]]=], |reason|).
1. Let |controller| be |stream|.[=TransformStream/[[controller]]=].
1. If |controller|.[=TransformStreamDefaultController/[[finishPromise]]=] is not undefined, return
|controller|.[=TransformStreamDefaultController/[[finishPromise]]=].
1. Let |readable| be |stream|.[=TransformStream/[[readable]]=].
1. Let |controller|.[=TransformStreamDefaultController/[[finishPromise]]=] be a new promise.
1. Let |cancelPromise| be the result of performing
|controller|.[=TransformStreamDefaultController/[[cancelAlgorithm]]=], passing |reason|.
1. Perform ! [$TransformStreamDefaultControllerClearAlgorithms$](|controller|).
1. Return |cancelPromise|.
1. [=React=] to |cancelPromise|:
1. If |cancelPromise| was fulfilled, then:
1. Perform ! [$ReadableStreamDefaultControllerError$](|readable|.[=ReadableStream/[[controller]]=], |reason|).
1. [=Resolve=] |controller|.[=TransformStreamDefaultController/[[finishPromise]]=] with undefined.
1. If |cancelPromise| was rejected with reason |r|, then:
1. Perform ! [$ReadableStreamDefaultControllerError$](|readable|.[=ReadableStream/[[controller]]=], |r|).
1. [=Reject=] |controller|.[=TransformStreamDefaultController/[[finishPromise]]=] with |r|.
1. Return |controller|.[=TransformStreamDefaultController/[[finishPromise]]=].
</div>

<div algorithm>
<dfn abstract-op lt="TransformStreamDefaultSinkCloseAlgorithm"
id="transform-stream-default-sink-close-algorithm">TransformStreamDefaultSinkCloseAlgorithm(|stream|)</dfn>
performs the following steps:

1. Let |readable| be |stream|.[=TransformStream/[[readable]]=].
1. If |readable|.[=ReadableStream/[[state]]=] is not "`readable`", return
[=a promise resolved with=] undefined.
1. Let |controller| be |stream|.[=TransformStream/[[controller]]=].
1. If |controller|.[=TransformStreamDefaultController/[[finishPromise]]=] is not undefined, return
|controller|.[=TransformStreamDefaultController/[[finishPromise]]=].
1. Let |readable| be |stream|.[=TransformStream/[[readable]]=].
1. Let |controller|.[=TransformStreamDefaultController/[[finishPromise]]=] be a new promise.
1. Let |flushPromise| be the result of performing
|controller|.[=TransformStreamDefaultController/[[flushAlgorithm]]=].
1. Perform ! [$TransformStreamDefaultControllerClearAlgorithms$](|controller|).
1. Return the result of [=reacting=] to |flushPromise|:
1. [=React=] to |flushPromise|:
1. If |flushPromise| was fulfilled, then:
1. If |readable|.[=ReadableStream/[[state]]=] is "`errored`", throw
|readable|.[=ReadableStream/[[storedError]]=].
1. Perform !
[$ReadableStreamDefaultControllerClose$](|readable|.[=ReadableStream/[[controller]]=]).
1. Perform ! [$ReadableStreamDefaultControllerClose$](|readable|.[=ReadableStream/[[controller]]=]).
1. [=Resolve=] |controller|.[=TransformStreamDefaultController/[[finishPromise]]=] with undefined.
1. If |flushPromise| was rejected with reason |r|, then:
1. Perform ! [$TransformStreamError$](|stream|, |r|).
1. Throw |readable|.[=ReadableStream/[[storedError]]=].
1. Perform ! [$ReadableStreamDefaultControllerError$](|readable|.[=ReadableStream/[[controller]]=], |r|).
1. [=Reject=] |controller|.[=TransformStreamDefaultController/[[finishPromise]]=] with |r|.
1. Return |controller|.[=TransformStreamDefaultController/[[finishPromise]]=].
</div>

<h4 id="ts-default-source-abstract-ops">Default sources</h4>
Expand All @@ -6115,22 +6130,24 @@ side=] of [=transform streams=].
id="transform-stream-default-source-cancel">TransformStreamDefaultSourceCancelAlgorithm(|reason|,
|stream|)</dfn> performs the following steps:

1. Let |writable| be |stream|.[=TransformStream/[[writable]]=].
1. If |writable|.[=WritableStream/[[state]]=] is not "`writable`", return
[=a promise resolved with=] undefined.
1. Perform !
[$WritableStreamDefaultControllerErrorIfNeeded$](|writable|.[=WritableStream/[[controller]]=], |reason|).
1. Perform ! [$TransformStreamUnblockWrite$](|stream|).
1. Let |controller| be |stream|.[=TransformStream/[[controller]]=].
1. If |controller|.[=TransformStreamDefaultController/[[finishPromise]]=] is not undefined, return
|controller|.[=TransformStreamDefaultController/[[finishPromise]]=].
1. Let |writable| be |stream|.[=TransformStream/[[writable]]=].
1. Let |controller|.[=TransformStreamDefaultController/[[finishPromise]]=] be a new promise.
1. Let |cancelPromise| be the result of performing
|controller|.[=TransformStreamDefaultController/[[cancelAlgorithm]]=], passing |reason|.
1. Perform ! [$TransformStreamDefaultControllerClearAlgorithms$](|controller|).
1. Return |cancelPromise|.

<p class="note">The early return prevents the cancellation algorithm from being called if the
writable side is already (in the process of being) closed. This is important, because the
cancellation algorithm <span class=allow-2119>must</span> not run if the flush algorithm has
already been run.
1. [=React=] to |cancelPromise|:
1. If |cancelPromise| was fulfilled, then:
1. Perform ! [$WritableStreamDefaultControllerErrorIfNeeded$](|writable|.[=WritableStream/[[controller]]=], |reason|).
1. Perform ! [$TransformStreamUnblockWrite$](|stream|).
1. [=Resolve=] |controller|.[=TransformStreamDefaultController/[[finishPromise]]=] with undefined.
1. If |cancelPromise| was rejected with reason |r|, then:
1. Perform ! [$WritableStreamDefaultControllerErrorIfNeeded$](|writable|.[=WritableStream/[[controller]]=], |r|).
1. Perform ! [$TransformStreamUnblockWrite$](|stream|).
1. [=Reject=] |controller|.[=TransformStreamDefaultController/[[finishPromise]]=] with |r|.
1. Return |controller|.[=TransformStreamDefaultController/[[finishPromise]]=].
</div>

<h2 id="qs">Queuing strategies</h2>
Expand Down
76 changes: 56 additions & 20 deletions reference-implementation/lib/abstract-ops/transform-streams.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@
const assert = require('assert');
const verbose = require('debug')('streams:transform-stream:verbose');

const { promiseResolvedWith, promiseRejectedWith, newPromise, resolvePromise, transformPromiseWith } =
require('../helpers/webidl.js');
const { promiseResolvedWith, promiseRejectedWith, newPromise, resolvePromise, rejectPromise, uponPromise,
transformPromiseWith } = require('../helpers/webidl.js');
const { CreateReadableStream, ReadableStreamDefaultControllerClose, ReadableStreamDefaultControllerEnqueue,
ReadableStreamDefaultControllerError, ReadableStreamDefaultControllerHasBackpressure,
ReadableStreamDefaultControllerCanCloseOrEnqueue } = require('./readable-streams.js');
Expand Down Expand Up @@ -233,37 +233,59 @@ function TransformStreamDefaultSinkWriteAlgorithm(stream, chunk) {
function TransformStreamDefaultSinkAbortAlgorithm(stream, reason) {
verbose('TransformStreamDefaultSinkAbortAlgorithm()');

const controller = stream._controller;
if (controller._finishPromise) {
return controller._finishPromise;
}

// stream._readable cannot change after construction, so caching it across a call to user code is safe.
const readable = stream._readable;
ReadableStreamDefaultControllerError(readable._controller, reason);

const controller = stream._controller;
// Assign the _finishPromise now so that if _cancelAlgorithm calls readable.cancel() internally,
// we don't run the _cancelAlgorithm again.
controller._finishPromise = newPromise();

const cancelPromise = controller._cancelAlgorithm(reason);
TransformStreamDefaultControllerClearAlgorithms(controller);

return cancelPromise;
uponPromise(cancelPromise, () => {
ReadableStreamDefaultControllerError(readable._controller, reason);
resolvePromise(controller._finishPromise);
}, r => {
ReadableStreamDefaultControllerError(readable._controller, r);
rejectPromise(controller._finishPromise, r);
});

return controller._finishPromise;
}

function TransformStreamDefaultSinkCloseAlgorithm(stream) {
verbose('TransformStreamDefaultSinkCloseAlgorithm()');

const controller = stream._controller;
if (controller._finishPromise) {
return controller._finishPromise;
}

// stream._readable cannot change after construction, so caching it across a call to user code is safe.
const readable = stream._readable;

const controller = stream._controller;
// Assign the _finishPromise now so that if _flushAlgorithm calls readable.cancel() internally,
// we don't also run the _cancelAlgorithm.
controller._finishPromise = newPromise();

const flushPromise = controller._flushAlgorithm();
TransformStreamDefaultControllerClearAlgorithms(controller);

// Return a promise that is fulfilled with undefined on success.
return transformPromiseWith(flushPromise, () => {
if (readable._state === 'errored') {
throw readable._storedError;
}
uponPromise(flushPromise, () => {
ReadableStreamDefaultControllerClose(readable._controller);
resolvePromise(controller._finishPromise);
}, r => {
TransformStreamError(stream, r);
throw readable._storedError;
ReadableStreamDefaultControllerError(readable._controller, r);
rejectPromise(controller._finishPromise, r);
});

return controller._finishPromise;
}

// Default sources
Expand All @@ -285,17 +307,31 @@ function TransformStreamDefaultSourcePullAlgorithm(stream) {
function TransformStreamDefaultSourceCancelAlgorithm(stream, reason) {
verbose('TransformStreamDefaultSourceCancelAlgorithm()');

const controller = stream._controller;
if (controller._finishPromise) {
return controller._finishPromise;
}

// stream._writable cannot change after construction, so caching it across a call to user code is safe.
const writable = stream._writable;
if (writable._state !== 'writable') {
return promiseResolvedWith(undefined);
}
WritableStreamDefaultControllerErrorIfNeeded(writable._controller, reason);
TransformStreamUnblockWrite(stream);

const controller = stream._controller;
// Assign the _finishPromise now so that if _flushAlgorithm calls writable.abort() or
// writable.cancel() internally, we don't run the _cancelAlgorithm again, or also run the
// _flushAlgorithm.
controller._finishPromise = newPromise();

const cancelPromise = controller._cancelAlgorithm(reason);
TransformStreamDefaultControllerClearAlgorithms(controller);

return cancelPromise;
uponPromise(cancelPromise, () => {
WritableStreamDefaultControllerErrorIfNeeded(writable._controller, reason);
TransformStreamUnblockWrite(stream);
resolvePromise(controller._finishPromise);
}, r => {
WritableStreamDefaultControllerErrorIfNeeded(writable._controller, r);
TransformStreamUnblockWrite(stream);
rejectPromise(controller._finishPromise, r);
});

return controller._finishPromise;
}

0 comments on commit 5b00d08

Please sign in to comment.