Skip to content

Commit

Permalink
TransformStream cleanup using "Transformer.cancel"
Browse files Browse the repository at this point in the history
This commit adds a "cancel" hook to "Transformer". This allows users to
perform resource cleanup when the readable side of the TransformStream
is cancelled, or the writable side is aborted.

To preserve existing behavior, when the readable side is cancelled with
a reason, the writable side is always immediately aborted with that same
reason. The same is true in the reverse case. This means that the
status of both sides is always either "closed", "erroring", or
"erroring" when the "cancel" hook is called.

"flush" and "cancel" are never both called. As per existing behaviour,
when the writable side is closed the "flush" hook is called. If the
readable side is cancelled while a promise returned from "flush" is
still pending, "cancel" is not called. In this scenario the readable
side ends up in the "errored" state, while the writable side ends up in
the "closed" state.
  • Loading branch information
lucacasonato committed Jun 8, 2023
1 parent 8d7a0bf commit 147faa3
Show file tree
Hide file tree
Showing 3 changed files with 134 additions and 22 deletions.
101 changes: 88 additions & 13 deletions index.bs
Original file line number Diff line number Diff line change
Expand Up @@ -5489,13 +5489,15 @@ dictionary Transformer {
TransformerStartCallback start;
TransformerTransformCallback transform;
TransformerFlushCallback flush;
TransformerCancelCallback cancel;
any readableType;
any writableType;
};

callback TransformerStartCallback = any (TransformStreamDefaultController controller);
callback TransformerFlushCallback = Promise<undefined> (TransformStreamDefaultController controller);
callback TransformerTransformCallback = Promise<undefined> (any chunk, TransformStreamDefaultController controller);
callback TransformerCancelCallback = Promise<undefined> (any reason, TransformStreamDefaultController controller);
</xmp>

<dl>
Expand Down Expand Up @@ -5558,6 +5560,25 @@ callback TransformerTransformCallback = Promise<undefined> (any chunk, Transform
{{Transformer/flush|flush()}}; the stream is already in the process of successfully closing down,
and terminating it would be counterproductive.)

<dt><dfn dict-member for="Transformer" lt="cancel">cancel(<var ignore>reason</var>, <var ignore>controller</var>)</dfn></dt>
<dd>
<p>A function called when the [=writable side=] is aborted, or when the [=readable side=] is
cancelled.

<p>Typically this is used to clean up underlying transformer resources when the stream is
aborted or cancelled.

<p>If the cancellation process is asynchronous, the function can return a promise to signal
success or failure; the result will be communicated to the caller of
{{WritableStream/abort()|stream.writable.abort()}} or
{{ReadableStream/cancel()|stream.readable.cancel()}}. Throwing an exception is treated the same
as returning a rejected promise.

<p>(Note that there is no need to call
{{TransformStreamDefaultController/terminate()|controller.terminate()}} inside
{{Transformer/cancel|cancel()}}; the stream is already in the process of cancelling/aborting, and
terminating it would be counterproductive.)

<dt><dfn dict-member for="Transformer">readableType</dfn></dt>
<dd>
<p>This property is reserved for future use, so any attempts to supply a value will throw an
Expand All @@ -5570,9 +5591,9 @@ callback TransformerTransformCallback = Promise<undefined> (any chunk, Transform
</dl>

The <code>controller</code> object passed to {{Transformer/start|start()}},
{{Transformer/transform|transform()}}, and {{Transformer/flush|flush()}} is an instance of
{{TransformStreamDefaultController}}, and has the ability to enqueue [=chunks=] to the [=readable
side=], or to terminate or error the stream.
{{Transformer/transform|transform()}}, {{Transformer/flush|flush()}}, and
{{Transformer/cancel|cancel()}} is an instance of {{TransformStreamDefaultController}}, and has the
ability to enqueue [=chunks=] to the [=readable side=], or to terminate or error the stream.

<h4 id="ts-prototype">Constructor and properties</h4>

Expand Down Expand Up @@ -5726,6 +5747,10 @@ the following table:
<th>Internal Slot</th>
<th>Description (<em>non-normative</em>)</th>
<tbody>
<tr>
<td><dfn>\[[cancelAlgorithm]]</dfn>
<td class="non-normative">A promise-returning algorithm, taking one argument (the [=reason=] for
cancellation), which communicates a requested cancellation to the [=transformer=]
<tr>
<td><dfn>\[[flushAlgorithm]]</dfn>
<td class="non-normative">A promise-returning algorithm which communicates a requested close to
Expand Down Expand Up @@ -5819,8 +5844,7 @@ The following abstract operations operate on {{TransformStream}} instances at a
1. Let |pullAlgorithm| be the following steps:
1. Return ! [$TransformStreamDefaultSourcePullAlgorithm$](|stream|).
1. Let |cancelAlgorithm| be the following steps, taking a |reason| argument:
1. Perform ! [$TransformStreamErrorWritableAndUnblockWrite$](|stream|, |reason|).
1. Return [=a promise resolved with=] undefined.
1. Return ! [$TransformStreamDefaultSourceCancelAlgorithm$](|stream|, |reason|).
1. Set |stream|.[=TransformStream/[[readable]]=] to ! [$CreateReadableStream$](|startAlgorithm|,
|pullAlgorithm|, |cancelAlgorithm|, |readableHighWaterMark|, |readableSizeAlgorithm|).
1. Set |stream|.[=TransformStream/[[backpressure]]=] and
Expand Down Expand Up @@ -5854,6 +5878,14 @@ The following abstract operations operate on {{TransformStream}} instances at a
1. Perform ! [$TransformStreamDefaultControllerClearAlgorithms$](|stream|.[=TransformStream/[[controller]]=]).
1. Perform !
[$WritableStreamDefaultControllerErrorIfNeeded$](|stream|.[=TransformStream/[[writable]]=].[=WritableStream/[[controller]]=], |e|).
1. Perform ! [$TransformStreamUnblockWrite$](|stream|).
</div>

<div algorithm>
<dfn abstract-op lt="TransformStreamUnblockWrite"
id="transform-stream-unblock-write">TransformStreamUnblockWrite(|stream|)</dfn> performs the
following steps:

1. If |stream|.[=TransformStream/[[backpressure]]=] is true, perform ! [$TransformStreamSetBackpressure$](|stream|,
false).

Expand Down Expand Up @@ -5882,7 +5914,8 @@ The following abstract operations support the implementaiton of the
<div algorithm>
<dfn abstract-op lt="SetUpTransformStreamDefaultController"
id="set-up-transform-stream-default-controller">SetUpTransformStreamDefaultController(|stream|,
|controller|, |transformAlgorithm|, |flushAlgorithm|)</dfn> performs the following steps:
|controller|, |transformAlgorithm|, |flushAlgorithm|, |cancelAlgorithm|)</dfn> performs the
following steps:

1. Assert: |stream| [=implements=] {{TransformStream}}.
1. Assert: |stream|.[=TransformStream/[[controller]]=] is undefined.
Expand All @@ -5891,6 +5924,7 @@ The following abstract operations support the implementaiton of the
1. Set |controller|.[=TransformStreamDefaultController/[[transformAlgorithm]]=] to
|transformAlgorithm|.
1. Set |controller|.[=TransformStreamDefaultController/[[flushAlgorithm]]=] to |flushAlgorithm|.
1. Set |controller|.[=TransformStreamDefaultController/[[cancelAlgorithm]]=] to |cancelAlgorithm|.
</div>

<div algorithm>
Expand All @@ -5904,15 +5938,20 @@ The following abstract operations support the implementaiton of the
1. If |result| is an abrupt completion, return [=a promise rejected with=] |result|.\[[Value]].
1. Otherwise, return [=a promise resolved with=] undefined.
1. Let |flushAlgorithm| be an algorithm which returns [=a promise resolved with=] undefined.
1. Let |cancelAlgorithm| be an algorithm which returns [=a promise resolved with=] undefined.
1. If |transformerDict|["{{Transformer/transform}}"] [=map/exists=], set |transformAlgorithm| to an
algorithm which takes an argument |chunk| and returns the result of [=invoking=]
|transformerDict|["{{Transformer/transform}}"] with argument list «&nbsp;|chunk|,
|controller|&nbsp;» and [=callback this value=] |transformer|.
1. If |transformerDict|["{{Transformer/flush}}"] [=map/exists=], set |flushAlgorithm| to an
algorithm which returns the result of [=invoking=] |transformerDict|["{{Transformer/flush}}"]
with argument list «&nbsp;|controller|&nbsp;» and [=callback this value=] |transformer|.
1. If |transformerDict|["{{Transformer/cancel}}"] [=map/exists=], set |cancelAlgorithm| to an
algorithm which takes an argument |reason| and returns the result of [=invoking=]
|transformerDict|["{{Transformer/cancel}}"] with argument list «&nbsp;|reason|,
|controller|&nbsp;» and [=callback this value=] |transformer|.
1. Perform ! [$SetUpTransformStreamDefaultController$](|stream|, |controller|,
|transformAlgorithm|, |flushAlgorithm|).
|transformAlgorithm|, |flushAlgorithm|, |cancelAlgorithm|).
</div>

<div algorithm>
Expand All @@ -5931,6 +5970,7 @@ The following abstract operations support the implementaiton of the

1. Set |controller|.[=TransformStreamDefaultController/[[transformAlgorithm]]=] to undefined.
1. Set |controller|.[=TransformStreamDefaultController/[[flushAlgorithm]]=] to undefined.
1. Set |controller|.[=TransformStreamDefaultController/[[cancelAlgorithm]]=] to undefined.
</div>

<div algorithm>
Expand Down Expand Up @@ -6021,8 +6061,13 @@ side=] of [=transform streams=].
id="transform-stream-default-sink-abort-algorithm">TransformStreamDefaultSinkAbortAlgorithm(|stream|,
|reason|)</dfn> performs the following steps:

1. Perform ! [$TransformStreamError$](|stream|, |reason|).
1. Return [=a promise resolved with=] undefined.
1. Let |readable| be |stream|.[=TransformStream/[[readable]]=].
1. [$ReadableStreamDefaultControllerError$](|readable|.[=ReadableStream/[[controller]]=], |reason|).
1. Let |controller| be |stream|.[=TransformStream/[[controller]]=].
1. Let |cancelPromise| be the result of performing
|controller|.[=TransformStreamDefaultController/[[cancelAlgorithm]]=], passing |reason|.
1. Perform ! [$TransformStreamDefaultControllerClearAlgorithms$](|controller|).
1. Return |cancelPromise|.
</div>

<div algorithm>
Expand Down Expand Up @@ -6062,6 +6107,30 @@ side=] of [=transform streams=].
1. Return |stream|.[=TransformStream/[[backpressureChangePromise]]=].
</div>


<div algorithm>
<dfn abstract-op lt="TransformStreamDefaultSourceCancelAlgorithm"
id="transform-stream-default-source-cancel">TransformStreamDefaultSourceCancelAlgorithm(|reason|,
|stream|)</dfn> performs the following steps:

1. Let |writable| be |stream|.[=TransformStream/[[writable]]=].
1. If |writable|.[=WritableStream/[[state]]=] is not "`writable`", return
[=a promise resolved with=] undefined.
1. Perform !
[$WritableStreamDefaultControllerErrorIfNeeded$](|writable|.[=WritableStream/[[controller]]=], |reason|).
1. Perform ! [$TransformStreamUnblockWrite$](|stream|).
1. Let |controller| be |stream|.[=TransformStream/[[controller]]=].
1. Let |cancelPromise| be the result of performing
|controller|.[=TransformStreamDefaultController/[[cancelAlgorithm]]=], passing |reason|.
1. Perform ! [$TransformStreamDefaultControllerClearAlgorithms$](|controller|).
1. Return |cancelPromise|.

<p class="note">The early return prevents the cancellation algorithm from being called if the
writable side is already (in the process of being) closed. This is important, because the
cancellation algorithm <span class=allow-2119>must</span> not run if the flush algorithm has
already been run.
</div>

<h2 id="qs">Queuing strategies</h2>

<h3 id="qs-api">The queuing strategy API</h3>
Expand Down Expand Up @@ -7106,9 +7175,11 @@ reason.
<div algorithm="create a TransformStream">
To <dfn export for="TransformStream" lt="set up|setting up">set up</dfn> a
newly-[=new|created-via-Web IDL=] {{TransformStream}} |stream| given an algorithm <dfn export
for="TransformStream/set up"><var>transformAlgorithm</var></dfn> and an optional algorithm <dfn
export for="TransformStream/set up"><var>flushAlgorithm</var></dfn>, perform the following steps.
|transformAlgorithm| and, if given, |flushAlgorithm|, may return a promise.
for="TransformStream/set up"><var>transformAlgorithm</var></dfn>, an optional algorithm <dfn
export for="TransformStream/set up"><var>flushAlgorithm</var></dfn>, and an optional algorithm <dfn
export for="TransformStream/set up"><var>cancelAlgorithm</var></dfn>, perform the following steps.
|transformAlgorithm|, if given, |flushAlgorithm|, and, if given, |cancelAlgorithm|, may return a
promise.

1. Let |writableHighWaterMark| be 1.
1. Let |writableSizeAlgorithm| be an algorithm that returns 1.
Expand All @@ -7124,12 +7195,16 @@ reason.
null otherwise. If this throws an exception |e|, return [=a promise rejected with=] |e|.
1. If |result| is a {{Promise}}, then return |result|.
1. Return [=a promise resolved with=] undefined.
1. Let |cancelAlgorithmWrapper| be an algorithm that runs these steps given a value |reason|:
1. Let |result| be the result of running |cancelAlgorithm| given |reason|, if |cancelAlgorithm|
was given, or null otherwise. If this throws an exception |e|, return
[=a promise rejected with=] |e|.
1. Let |startPromise| be [=a promise resolved with=] undefined.
1. Perform ! [$InitializeTransformStream$](|stream|, |startPromise|, |writableHighWaterMark|,
|writableSizeAlgorithm|, |readableHighWaterMark|, |readableSizeAlgorithm|).
1. Let |controller| be a [=new=] {{TransformStreamDefaultController}}.
1. Perform ! [$SetUpTransformStreamDefaultController$](|stream|, |controller|,
|transformAlgorithmWrapper|, |flushAlgorithmWrapper|).
|transformAlgorithmWrapper|, |flushAlgorithmWrapper|, |cancelAlgorithmWrapper|).

Other specifications should be careful when constructing their
<i>[=TransformStream/set up/transformAlgorithm=]</i> to avoid [=in parallel=] reads from the given
Expand Down
2 changes: 2 additions & 0 deletions reference-implementation/lib/Transformer.webidl
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@ dictionary Transformer {
TransformerStartCallback start;
TransformerTransformCallback transform;
TransformerFlushCallback flush;
TransformerCancelCallback cancel;
any readableType;
any writableType;
};

callback TransformerStartCallback = any (TransformStreamDefaultController controller);
callback TransformerFlushCallback = Promise<undefined> (TransformStreamDefaultController controller);
callback TransformerTransformCallback = Promise<undefined> (any chunk, TransformStreamDefaultController controller);
callback TransformerCancelCallback = Promise<undefined> (any reason, TransformStreamDefaultController controller);
53 changes: 44 additions & 9 deletions reference-implementation/lib/abstract-ops/transform-streams.js
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,7 @@ function InitializeTransformStream(
}

function cancelAlgorithm(reason) {
TransformStreamErrorWritableAndUnblockWrite(stream, reason);
return promiseResolvedWith(undefined);
return TransformStreamDefaultSourceCancelAlgorithm(stream, reason);
}

stream._readable = CreateReadableStream(
Expand All @@ -77,6 +76,10 @@ function TransformStreamError(stream, e) {
function TransformStreamErrorWritableAndUnblockWrite(stream, e) {
TransformStreamDefaultControllerClearAlgorithms(stream._controller);
WritableStreamDefaultControllerErrorIfNeeded(stream._writable._controller, e);
TransformStreamUnblockWrite(stream);
}

function TransformStreamUnblockWrite(stream) {
if (stream._backpressure === true) {
// Pretend that pull() was called to permit any pending write() calls to complete. TransformStreamSetBackpressure()
// cannot be called from enqueue() or pull() once the ReadableStream is errored, so this will will be the final time
Expand All @@ -102,7 +105,8 @@ function TransformStreamSetBackpressure(stream, backpressure) {

// Default controllers

function SetUpTransformStreamDefaultController(stream, controller, transformAlgorithm, flushAlgorithm) {
function SetUpTransformStreamDefaultController(stream, controller, transformAlgorithm, flushAlgorithm,
cancelAlgorithm) {
assert(TransformStream.isImpl(stream));
assert(stream._controller === undefined);

Expand All @@ -111,6 +115,7 @@ function SetUpTransformStreamDefaultController(stream, controller, transformAlgo

controller._transformAlgorithm = transformAlgorithm;
controller._flushAlgorithm = flushAlgorithm;
controller._cancelAlgorithm = cancelAlgorithm;
}

function SetUpTransformStreamDefaultControllerFromTransformer(stream, transformer, transformerDict) {
Expand All @@ -125,21 +130,26 @@ function SetUpTransformStreamDefaultControllerFromTransformer(stream, transforme
}
};

let flushAlgorithm = () => promiseResolvedWith(undefined);
let flushAlgorithm = () => promiseResolvedWith(undefined);

Check failure on line 133 in reference-implementation/lib/abstract-ops/transform-streams.js

View workflow job for this annotation

GitHub Actions / Test

Trailing spaces not allowed
let cancelAlgorithm = () => promiseResolvedWith(undefined);

if ('transform' in transformerDict) {
transformAlgorithm = chunk => transformerDict.transform.call(transformer, chunk, controller);
}
if ('flush' in transformerDict) {
flushAlgorithm = () => transformerDict.flush.call(transformer, controller);
}
if ('cancel' in transformerDict) {
cancelAlgorithm = reason => transformerDict.cancel.call(transformer, reason, controller);
}

SetUpTransformStreamDefaultController(stream, controller, transformAlgorithm, flushAlgorithm);
SetUpTransformStreamDefaultController(stream, controller, transformAlgorithm, flushAlgorithm, cancelAlgorithm);
}

function TransformStreamDefaultControllerClearAlgorithms(controller) {
controller._transformAlgorithm = undefined;
controller._flushAlgorithm = undefined;
controller._cancelAlgorithm = undefined;
}

function TransformStreamDefaultControllerEnqueue(controller, chunk) {
Expand Down Expand Up @@ -221,10 +231,17 @@ function TransformStreamDefaultSinkWriteAlgorithm(stream, chunk) {
}

function TransformStreamDefaultSinkAbortAlgorithm(stream, reason) {
// abort() is not called synchronously, so it is possible for abort() to be called when the stream is already
// errored.
TransformStreamError(stream, reason);
return promiseResolvedWith(undefined);
verbose('TransformStreamDefaultSinkAbortAlgorithm()');

// stream._readable cannot change after construction, so caching it across a call to user code is safe.
const readable = stream._readable;
ReadableStreamDefaultControllerError(readable._controller, reason);

const controller = stream._controller;
const cancelPromise = controller._cancelAlgorithm(reason);
TransformStreamDefaultControllerClearAlgorithms(controller);

return cancelPromise;
}

function TransformStreamDefaultSinkCloseAlgorithm(stream) {
Expand Down Expand Up @@ -264,3 +281,21 @@ function TransformStreamDefaultSourcePullAlgorithm(stream) {
// Prevent the next pull() call until there is backpressure.
return stream._backpressureChangePromise;
}

function TransformStreamDefaultSourceCancelAlgorithm(stream, reason) {
verbose('TransformStreamDefaultSourceCancelAlgorithm()');

// stream._writable cannot change after construction, so caching it across a call to user code is safe.
const writable = stream._writable;
if (writable._state !== 'writable') {
return promiseResolvedWith(undefined);
}
WritableStreamDefaultControllerErrorIfNeeded(writable._controller, reason);
TransformStreamUnblockWrite(stream);

const controller = stream._controller;
const cancelPromise = controller._cancelAlgorithm(reason);
TransformStreamDefaultControllerClearAlgorithms(controller);

return cancelPromise;
}

0 comments on commit 147faa3

Please sign in to comment.