Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TransformStream cleanup using "Transformer.cancel" #1283

Merged
merged 15 commits into from
Sep 30, 2023
101 changes: 88 additions & 13 deletions index.bs
Original file line number Diff line number Diff line change
Expand Up @@ -5489,13 +5489,15 @@ dictionary Transformer {
TransformerStartCallback start;
TransformerTransformCallback transform;
TransformerFlushCallback flush;
TransformerCancelCallback cancel;
any readableType;
any writableType;
};

callback TransformerStartCallback = any (TransformStreamDefaultController controller);
callback TransformerFlushCallback = Promise<undefined> (TransformStreamDefaultController controller);
callback TransformerTransformCallback = Promise<undefined> (any chunk, TransformStreamDefaultController controller);
callback TransformerCancelCallback = Promise<undefined> (any reason, TransformStreamDefaultController controller);
lucacasonato marked this conversation as resolved.
Show resolved Hide resolved
</xmp>

<dl>
Expand Down Expand Up @@ -5558,6 +5560,25 @@ callback TransformerTransformCallback = Promise<undefined> (any chunk, Transform
{{Transformer/flush|flush()}}; the stream is already in the process of successfully closing down,
and terminating it would be counterproductive.)

<dt><dfn dict-member for="Transformer" lt="cancel">cancel(<var ignore>reason</var>, <var ignore>controller</var>)</dfn></dt>
<dd>
<p>A function called when the [=writable side=] is aborted, or when the [=readable side=] is
lucacasonato marked this conversation as resolved.
Show resolved Hide resolved
cancelled.

<p>Typically this is used to clean up underlying transformer resources when the stream is
lucacasonato marked this conversation as resolved.
Show resolved Hide resolved
aborted or cancelled.

<p>If the cancellation process is asynchronous, the function can return a promise to signal
success or failure; the result will be communicated to the caller of
{{WritableStream/abort()|stream.writable.abort()}} or
{{ReadableStream/cancel()|stream.readable.cancel()}}. Throwing an exception is treated the same
as returning a rejected promise.

<p>(Note that there is no need to call
{{TransformStreamDefaultController/terminate()|controller.terminate()}} inside
{{Transformer/cancel|cancel()}}; the stream is already in the process of cancelling/aborting, and
terminating it would be counterproductive.)

Comment on lines +5577 to +5581
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
<p>(Note that there is no need to call
{{TransformStreamDefaultController/terminate()|controller.terminate()}} inside
{{Transformer/cancel|cancel()}}; the stream is already in the process of cancelling/aborting, and
terminating it would be counterproductive.)

This part is redundant after 1c65d61.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My train of thought is that you could extract the controller in start and assign it to a local variable.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, hmm, okay then. Maybe also worth having tests for that btw.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Previously it was impossible because of the immediate close semantics. I just pushed the new semantics, and will add tests for this case.

<dt><dfn dict-member for="Transformer">readableType</dfn></dt>
<dd>
<p>This property is reserved for future use, so any attempts to supply a value will throw an
Expand All @@ -5570,9 +5591,9 @@ callback TransformerTransformCallback = Promise<undefined> (any chunk, Transform
</dl>

The <code>controller</code> object passed to {{Transformer/start|start()}},
{{Transformer/transform|transform()}}, and {{Transformer/flush|flush()}} is an instance of
{{TransformStreamDefaultController}}, and has the ability to enqueue [=chunks=] to the [=readable
side=], or to terminate or error the stream.
{{Transformer/transform|transform()}}, {{Transformer/flush|flush()}}, and
{{Transformer/cancel|cancel()}} is an instance of {{TransformStreamDefaultController}}, and has the
ability to enqueue [=chunks=] to the [=readable side=], or to terminate or error the stream.
lucacasonato marked this conversation as resolved.
Show resolved Hide resolved

<h4 id="ts-prototype">Constructor and properties</h4>

Expand Down Expand Up @@ -5726,6 +5747,10 @@ the following table:
<th>Internal Slot</th>
<th>Description (<em>non-normative</em>)</th>
<tbody>
<tr>
<td><dfn>\[[cancelAlgorithm]]</dfn>
<td class="non-normative">A promise-returning algorithm, taking one argument (the [=reason=] for
lucacasonato marked this conversation as resolved.
Show resolved Hide resolved
cancellation), which communicates a requested cancellation to the [=transformer=]
<tr>
<td><dfn>\[[flushAlgorithm]]</dfn>
<td class="non-normative">A promise-returning algorithm which communicates a requested close to
Expand Down Expand Up @@ -5819,8 +5844,7 @@ The following abstract operations operate on {{TransformStream}} instances at a
1. Let |pullAlgorithm| be the following steps:
1. Return ! [$TransformStreamDefaultSourcePullAlgorithm$](|stream|).
1. Let |cancelAlgorithm| be the following steps, taking a |reason| argument:
1. Perform ! [$TransformStreamErrorWritableAndUnblockWrite$](|stream|, |reason|).
1. Return [=a promise resolved with=] undefined.
1. Return ! [$TransformStreamDefaultSourceCancelAlgorithm$](|stream|, |reason|).
1. Set |stream|.[=TransformStream/[[readable]]=] to ! [$CreateReadableStream$](|startAlgorithm|,
|pullAlgorithm|, |cancelAlgorithm|, |readableHighWaterMark|, |readableSizeAlgorithm|).
1. Set |stream|.[=TransformStream/[[backpressure]]=] and
Expand Down Expand Up @@ -5854,6 +5878,14 @@ The following abstract operations operate on {{TransformStream}} instances at a
1. Perform ! [$TransformStreamDefaultControllerClearAlgorithms$](|stream|.[=TransformStream/[[controller]]=]).
1. Perform !
[$WritableStreamDefaultControllerErrorIfNeeded$](|stream|.[=TransformStream/[[writable]]=].[=WritableStream/[[controller]]=], |e|).
1. Perform ! [$TransformStreamUnblockWrite$](|stream|).
</div>

<div algorithm>
<dfn abstract-op lt="TransformStreamUnblockWrite"
lucacasonato marked this conversation as resolved.
Show resolved Hide resolved
id="transform-stream-unblock-write">TransformStreamUnblockWrite(|stream|)</dfn> performs the
following steps:

1. If |stream|.[=TransformStream/[[backpressure]]=] is true, perform ! [$TransformStreamSetBackpressure$](|stream|,
false).

Expand Down Expand Up @@ -5882,7 +5914,8 @@ The following abstract operations support the implementaiton of the
<div algorithm>
<dfn abstract-op lt="SetUpTransformStreamDefaultController"
id="set-up-transform-stream-default-controller">SetUpTransformStreamDefaultController(|stream|,
|controller|, |transformAlgorithm|, |flushAlgorithm|)</dfn> performs the following steps:
|controller|, |transformAlgorithm|, |flushAlgorithm|, |cancelAlgorithm|)</dfn> performs the
following steps:

1. Assert: |stream| [=implements=] {{TransformStream}}.
1. Assert: |stream|.[=TransformStream/[[controller]]=] is undefined.
Expand All @@ -5891,6 +5924,7 @@ The following abstract operations support the implementaiton of the
1. Set |controller|.[=TransformStreamDefaultController/[[transformAlgorithm]]=] to
|transformAlgorithm|.
1. Set |controller|.[=TransformStreamDefaultController/[[flushAlgorithm]]=] to |flushAlgorithm|.
1. Set |controller|.[=TransformStreamDefaultController/[[cancelAlgorithm]]=] to |cancelAlgorithm|.
</div>

<div algorithm>
Expand All @@ -5904,15 +5938,20 @@ The following abstract operations support the implementaiton of the
1. If |result| is an abrupt completion, return [=a promise rejected with=] |result|.\[[Value]].
1. Otherwise, return [=a promise resolved with=] undefined.
1. Let |flushAlgorithm| be an algorithm which returns [=a promise resolved with=] undefined.
1. Let |cancelAlgorithm| be an algorithm which returns [=a promise resolved with=] undefined.
1. If |transformerDict|["{{Transformer/transform}}"] [=map/exists=], set |transformAlgorithm| to an
algorithm which takes an argument |chunk| and returns the result of [=invoking=]
|transformerDict|["{{Transformer/transform}}"] with argument list «&nbsp;|chunk|,
|controller|&nbsp;» and [=callback this value=] |transformer|.
1. If |transformerDict|["{{Transformer/flush}}"] [=map/exists=], set |flushAlgorithm| to an
algorithm which returns the result of [=invoking=] |transformerDict|["{{Transformer/flush}}"]
with argument list «&nbsp;|controller|&nbsp;» and [=callback this value=] |transformer|.
1. If |transformerDict|["{{Transformer/cancel}}"] [=map/exists=], set |cancelAlgorithm| to an
algorithm which takes an argument |reason| and returns the result of [=invoking=]
|transformerDict|["{{Transformer/cancel}}"] with argument list «&nbsp;|reason|,
|controller|&nbsp;» and [=callback this value=] |transformer|.
1. Perform ! [$SetUpTransformStreamDefaultController$](|stream|, |controller|,
|transformAlgorithm|, |flushAlgorithm|).
|transformAlgorithm|, |flushAlgorithm|, |cancelAlgorithm|).
</div>

<div algorithm>
Expand All @@ -5931,6 +5970,7 @@ The following abstract operations support the implementaiton of the

1. Set |controller|.[=TransformStreamDefaultController/[[transformAlgorithm]]=] to undefined.
1. Set |controller|.[=TransformStreamDefaultController/[[flushAlgorithm]]=] to undefined.
1. Set |controller|.[=TransformStreamDefaultController/[[cancelAlgorithm]]=] to undefined.
</div>

<div algorithm>
Expand Down Expand Up @@ -6021,8 +6061,13 @@ side=] of [=transform streams=].
id="transform-stream-default-sink-abort-algorithm">TransformStreamDefaultSinkAbortAlgorithm(|stream|,
|reason|)</dfn> performs the following steps:

1. Perform ! [$TransformStreamError$](|stream|, |reason|).
1. Return [=a promise resolved with=] undefined.
1. Let |readable| be |stream|.[=TransformStream/[[readable]]=].
1. [$ReadableStreamDefaultControllerError$](|readable|.[=ReadableStream/[[controller]]=], |reason|).
1. Let |controller| be |stream|.[=TransformStream/[[controller]]=].
1. Let |cancelPromise| be the result of performing
|controller|.[=TransformStreamDefaultController/[[cancelAlgorithm]]=], passing |reason|.
1. Perform ! [$TransformStreamDefaultControllerClearAlgorithms$](|controller|).
1. Return |cancelPromise|.
</div>

<div algorithm>
Expand Down Expand Up @@ -6062,6 +6107,30 @@ side=] of [=transform streams=].
1. Return |stream|.[=TransformStream/[[backpressureChangePromise]]=].
</div>


<div algorithm>
<dfn abstract-op lt="TransformStreamDefaultSourceCancelAlgorithm"
id="transform-stream-default-source-cancel">TransformStreamDefaultSourceCancelAlgorithm(|reason|,
|stream|)</dfn> performs the following steps:

1. Let |writable| be |stream|.[=TransformStream/[[writable]]=].
1. If |writable|.[=WritableStream/[[state]]=] is not "`writable`", return
[=a promise resolved with=] undefined.
1. Perform !
[$WritableStreamDefaultControllerErrorIfNeeded$](|writable|.[=WritableStream/[[controller]]=], |reason|).
1. Perform ! [$TransformStreamUnblockWrite$](|stream|).
1. Let |controller| be |stream|.[=TransformStream/[[controller]]=].
1. Let |cancelPromise| be the result of performing
|controller|.[=TransformStreamDefaultController/[[cancelAlgorithm]]=], passing |reason|.
1. Perform ! [$TransformStreamDefaultControllerClearAlgorithms$](|controller|).
1. Return |cancelPromise|.

<p class="note">The early return prevents the cancellation algorithm from being called if the
writable side is already (in the process of being) closed. This is important, because the
cancellation algorithm <span class=allow-2119>must</span> not run if the flush algorithm has
already been run.
</div>

<h2 id="qs">Queuing strategies</h2>

<h3 id="qs-api">The queuing strategy API</h3>
Expand Down Expand Up @@ -7106,9 +7175,11 @@ reason.
<div algorithm="create a TransformStream">
To <dfn export for="TransformStream" lt="set up|setting up">set up</dfn> a
newly-[=new|created-via-Web IDL=] {{TransformStream}} |stream| given an algorithm <dfn export
for="TransformStream/set up"><var>transformAlgorithm</var></dfn> and an optional algorithm <dfn
export for="TransformStream/set up"><var>flushAlgorithm</var></dfn>, perform the following steps.
|transformAlgorithm| and, if given, |flushAlgorithm|, may return a promise.
for="TransformStream/set up"><var>transformAlgorithm</var></dfn>, an optional algorithm <dfn
export for="TransformStream/set up"><var>flushAlgorithm</var></dfn>, and an optional algorithm <dfn
export for="TransformStream/set up"><var>cancelAlgorithm</var></dfn>, perform the following steps.
|transformAlgorithm|, if given, |flushAlgorithm|, and, if given, |cancelAlgorithm|, may return a
lucacasonato marked this conversation as resolved.
Show resolved Hide resolved
promise.

1. Let |writableHighWaterMark| be 1.
1. Let |writableSizeAlgorithm| be an algorithm that returns 1.
Expand All @@ -7124,12 +7195,16 @@ reason.
null otherwise. If this throws an exception |e|, return [=a promise rejected with=] |e|.
1. If |result| is a {{Promise}}, then return |result|.
1. Return [=a promise resolved with=] undefined.
1. Let |cancelAlgorithmWrapper| be an algorithm that runs these steps given a value |reason|:
1. Let |result| be the result of running |cancelAlgorithm| given |reason|, if |cancelAlgorithm|
was given, or null otherwise. If this throws an exception |e|, return
[=a promise rejected with=] |e|.
lucacasonato marked this conversation as resolved.
Show resolved Hide resolved
1. Let |startPromise| be [=a promise resolved with=] undefined.
1. Perform ! [$InitializeTransformStream$](|stream|, |startPromise|, |writableHighWaterMark|,
|writableSizeAlgorithm|, |readableHighWaterMark|, |readableSizeAlgorithm|).
1. Let |controller| be a [=new=] {{TransformStreamDefaultController}}.
1. Perform ! [$SetUpTransformStreamDefaultController$](|stream|, |controller|,
|transformAlgorithmWrapper|, |flushAlgorithmWrapper|).
|transformAlgorithmWrapper|, |flushAlgorithmWrapper|, |cancelAlgorithmWrapper|).

Other specifications should be careful when constructing their
<i>[=TransformStream/set up/transformAlgorithm=]</i> to avoid [=in parallel=] reads from the given
Expand Down
2 changes: 2 additions & 0 deletions reference-implementation/lib/Transformer.webidl
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@ dictionary Transformer {
TransformerStartCallback start;
TransformerTransformCallback transform;
TransformerFlushCallback flush;
TransformerCancelCallback cancel;
any readableType;
any writableType;
};

callback TransformerStartCallback = any (TransformStreamDefaultController controller);
callback TransformerFlushCallback = Promise<undefined> (TransformStreamDefaultController controller);
callback TransformerTransformCallback = Promise<undefined> (any chunk, TransformStreamDefaultController controller);
callback TransformerCancelCallback = Promise<undefined> (any reason, TransformStreamDefaultController controller);
51 changes: 43 additions & 8 deletions reference-implementation/lib/abstract-ops/transform-streams.js
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,7 @@ function InitializeTransformStream(
}

function cancelAlgorithm(reason) {
TransformStreamErrorWritableAndUnblockWrite(stream, reason);
return promiseResolvedWith(undefined);
return TransformStreamDefaultSourceCancelAlgorithm(stream, reason);
}

stream._readable = CreateReadableStream(
Expand All @@ -77,6 +76,10 @@ function TransformStreamError(stream, e) {
function TransformStreamErrorWritableAndUnblockWrite(stream, e) {
TransformStreamDefaultControllerClearAlgorithms(stream._controller);
WritableStreamDefaultControllerErrorIfNeeded(stream._writable._controller, e);
TransformStreamUnblockWrite(stream);
}

function TransformStreamUnblockWrite(stream) {
if (stream._backpressure === true) {
// Pretend that pull() was called to permit any pending write() calls to complete. TransformStreamSetBackpressure()
// cannot be called from enqueue() or pull() once the ReadableStream is errored, so this will will be the final time
Expand All @@ -102,7 +105,8 @@ function TransformStreamSetBackpressure(stream, backpressure) {

// Default controllers

function SetUpTransformStreamDefaultController(stream, controller, transformAlgorithm, flushAlgorithm) {
function SetUpTransformStreamDefaultController(stream, controller, transformAlgorithm, flushAlgorithm,
cancelAlgorithm) {
assert(TransformStream.isImpl(stream));
assert(stream._controller === undefined);

Expand All @@ -111,6 +115,7 @@ function SetUpTransformStreamDefaultController(stream, controller, transformAlgo

controller._transformAlgorithm = transformAlgorithm;
controller._flushAlgorithm = flushAlgorithm;
controller._cancelAlgorithm = cancelAlgorithm;
}

function SetUpTransformStreamDefaultControllerFromTransformer(stream, transformer, transformerDict) {
Expand All @@ -126,20 +131,25 @@ function SetUpTransformStreamDefaultControllerFromTransformer(stream, transforme
};

let flushAlgorithm = () => promiseResolvedWith(undefined);
let cancelAlgorithm = () => promiseResolvedWith(undefined);

if ('transform' in transformerDict) {
transformAlgorithm = chunk => transformerDict.transform.call(transformer, chunk, controller);
}
if ('flush' in transformerDict) {
flushAlgorithm = () => transformerDict.flush.call(transformer, controller);
}
if ('cancel' in transformerDict) {
cancelAlgorithm = reason => transformerDict.cancel.call(transformer, reason, controller);
}

SetUpTransformStreamDefaultController(stream, controller, transformAlgorithm, flushAlgorithm);
SetUpTransformStreamDefaultController(stream, controller, transformAlgorithm, flushAlgorithm, cancelAlgorithm);
}

function TransformStreamDefaultControllerClearAlgorithms(controller) {
controller._transformAlgorithm = undefined;
controller._flushAlgorithm = undefined;
controller._cancelAlgorithm = undefined;
}

function TransformStreamDefaultControllerEnqueue(controller, chunk) {
Expand Down Expand Up @@ -221,10 +231,17 @@ function TransformStreamDefaultSinkWriteAlgorithm(stream, chunk) {
}

function TransformStreamDefaultSinkAbortAlgorithm(stream, reason) {
// abort() is not called synchronously, so it is possible for abort() to be called when the stream is already
// errored.
TransformStreamError(stream, reason);
return promiseResolvedWith(undefined);
verbose('TransformStreamDefaultSinkAbortAlgorithm()');

// stream._readable cannot change after construction, so caching it across a call to user code is safe.
const readable = stream._readable;
ReadableStreamDefaultControllerError(readable._controller, reason);

const controller = stream._controller;
const cancelPromise = controller._cancelAlgorithm(reason);
TransformStreamDefaultControllerClearAlgorithms(controller);

return cancelPromise;
}

function TransformStreamDefaultSinkCloseAlgorithm(stream) {
Expand Down Expand Up @@ -264,3 +281,21 @@ function TransformStreamDefaultSourcePullAlgorithm(stream) {
// Prevent the next pull() call until there is backpressure.
return stream._backpressureChangePromise;
}

function TransformStreamDefaultSourceCancelAlgorithm(stream, reason) {
verbose('TransformStreamDefaultSourceCancelAlgorithm()');

// stream._writable cannot change after construction, so caching it across a call to user code is safe.
const writable = stream._writable;
if (writable._state !== 'writable') {
return promiseResolvedWith(undefined);
}
WritableStreamDefaultControllerErrorIfNeeded(writable._controller, reason);
TransformStreamUnblockWrite(stream);

const controller = stream._controller;
const cancelPromise = controller._cancelAlgorithm(reason);
TransformStreamDefaultControllerClearAlgorithms(controller);

return cancelPromise;
}
Loading