Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for ReadableStream "owning" type #1271

Open
wants to merge 38 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 33 commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
9d67644
Add support for a new ReadableStream "owning" type.
youennf Apr 9, 2023
e05ba57
Update index.bs
youennf Apr 14, 2023
6ef5db9
Use _isOwning private slot
youennf Apr 14, 2023
13da733
Ensure transferList is empty for non owning readable streams
youennf Apr 17, 2023
65b6f2a
Update ReadableStreamDefaultController enqueue non normative description
youennf Apr 18, 2023
38d0bc0
Update index.bs
youennf Apr 20, 2023
b8ee345
Update index.bs
youennf Apr 20, 2023
8057ea1
fix StructuredTransferOrClone
youennf Apr 20, 2023
8633ef6
Add Assert to ReadableStreamDefaultControllerEnqueue
youennf Apr 20, 2023
2f924f4
Use Symbol for closing steps
youennf Apr 20, 2023
48ebb43
Update index.bs
youennf Apr 20, 2023
611aae5
Update index.bs
youennf Apr 20, 2023
6d3284f
Update index.bs
youennf Apr 20, 2023
24698dd
Update index.bs
youennf Apr 20, 2023
f5d32b5
Update index.bs
youennf Apr 20, 2023
d4b8b84
Update index.bs
youennf Apr 20, 2023
52a0116
Update index.bs
youennf Apr 20, 2023
21237a6
Update index.bs
youennf Apr 20, 2023
9f0bb24
Update index.bs
youennf Apr 20, 2023
9db7cc2
Update reference-implementation/lib/ReadableStreamDefaultController-i…
youennf Apr 20, 2023
cc6a6b7
Update reference-implementation/lib/abstract-ops/transform-streams.js
youennf Apr 20, 2023
ddbed60
Update reference-implementation/lib/abstract-ops/readable-streams.js
youennf Apr 20, 2023
0ab0d72
Update reference-implementation/lib/abstract-ops/readable-streams.js
youennf Apr 20, 2023
77450a8
Rename closing steps to dispose steps
youennf Apr 20, 2023
46718bb
Update index.bs
youennf May 10, 2023
db7658b
Update index.bs
youennf May 10, 2023
94ebacf
Update index.bs
youennf May 10, 2023
bf7404b
Update index.bs
youennf May 10, 2023
657e85f
Update index.bs
youennf May 10, 2023
a1c6573
Update index.bs
youennf May 10, 2023
4c80708
Update reference-implementation/lib/ReadableStreamDefaultController.w…
youennf May 10, 2023
9146111
Update according review
youennf May 10, 2023
002587a
Reverting WPT module change
youennf Jun 7, 2023
7679251
Update according saschanaz review
youennf Jun 12, 2023
0380d03
Merge branch 'main' into add-support-for-stream-transfer-type
youennf Jun 12, 2023
fa981ae
Move isOwning tab entry
youennf Jun 12, 2023
cbd64e1
Merge branch 'main' into add-support-for-stream-transfer-type
MattiasBuelens May 3, 2024
81411c5
Update Enqueue() call inside ReadableStreamFromIterable
MattiasBuelens May 3, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ jobs:
- uses: actions/checkout@v2
with:
submodules: true
- uses: actions/setup-node@v1
- uses: actions/setup-node@v3
with:
node-version: 18
- run: npm install
Expand Down
109 changes: 77 additions & 32 deletions index.bs

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion reference-implementation/lib/ReadableStream-impl.js
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ exports.implementation = class ReadableStreamImpl {
this, underlyingSource, underlyingSourceDict, highWaterMark
);
} else {
assert(!('type' in underlyingSourceDict));
assert(!('type' in underlyingSourceDict) || underlyingSourceDict.type === 'owning');
const sizeAlgorithm = ExtractSizeAlgorithm(strategy);
const highWaterMark = ExtractHighWaterMark(strategy, 1);
aos.SetUpReadableStreamDefaultControllerFromUnderlyingSource(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,16 @@ exports.implementation = class ReadableStreamDefaultControllerImpl {
aos.ReadableStreamDefaultControllerClose(this);
}

enqueue(chunk) {
enqueue(chunk, options) {
const transferList = options.transfer;
if (transferList.length && !this._isOwning) {
throw new TypeError('The stream is not an owning stream and cannot make use of options');
}
if (aos.ReadableStreamDefaultControllerCanCloseOrEnqueue(this) === false) {
throw new TypeError('The stream is not in a state that permits enqueue');
}

return aos.ReadableStreamDefaultControllerEnqueue(this, chunk);
return aos.ReadableStreamDefaultControllerEnqueue(this, chunk, transferList);
}

error(e) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
dictionary StructuredSerializeOptions {
sequence<object> transfer = [];
};

[Exposed=(Window,Worker,Worklet)]
interface ReadableStreamDefaultController {
readonly attribute unrestricted double? desiredSize;

undefined close();
undefined enqueue(optional any chunk);
undefined enqueue(optional any chunk, optional StructuredSerializeOptions options = {});
undefined error(optional any e);
};
2 changes: 1 addition & 1 deletion reference-implementation/lib/UnderlyingSource.webidl
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,4 @@ callback UnderlyingSourceStartCallback = any (ReadableStreamController controlle
callback UnderlyingSourcePullCallback = Promise<undefined> (ReadableStreamController controller);
callback UnderlyingSourceCancelCallback = Promise<undefined> (optional any reason);

enum ReadableStreamType { "bytes" };
enum ReadableStreamType { "bytes", "owning" };
4 changes: 4 additions & 0 deletions reference-implementation/lib/abstract-ops/miscellaneous.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,7 @@ exports.CloneAsUint8Array = O => {
const buffer = O.buffer.slice(O.byteOffset, O.byteOffset + O.byteLength);
return new Uint8Array(buffer);
};

exports.StructuredTransferOrClone = (value, transferList) => {
return globalThis.structuredClone(value, { transfer: transferList });
};
22 changes: 19 additions & 3 deletions reference-implementation/lib/abstract-ops/queue-with-sizes.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
'use strict';
const assert = require('assert');
const { IsNonNegativeNumber } = require('./miscellaneous.js');
const { IsNonNegativeNumber, StructuredTransferOrClone } = require('./miscellaneous.js');

exports.DequeueValue = container => {
assert('_queue' in container && '_queueTotalSize' in container);
Expand All @@ -15,7 +15,7 @@ exports.DequeueValue = container => {
return pair.value;
};

exports.EnqueueValueWithSize = (container, value, size) => {
exports.EnqueueValueWithSize = (container, value, size, transferList) => {
assert('_queue' in container && '_queueTotalSize' in container);

if (!IsNonNegativeNumber(size)) {
Expand All @@ -24,7 +24,9 @@ exports.EnqueueValueWithSize = (container, value, size) => {
if (size === Infinity) {
throw new RangeError('Size must be a finite, non-NaN, non-negative number.');
}

if (container._isOwning) {
value = StructuredTransferOrClone(value, transferList);
}
container._queue.push({ value, size });
container._queueTotalSize += size;
};
Expand All @@ -37,9 +39,23 @@ exports.PeekQueueValue = container => {
return pair.value;
};

const disposeStepsSymbol = Symbol('dispose-steps');

exports.ResetQueue = container => {
assert('_queue' in container && '_queueTotalSize' in container);

if (container._isOwning) {
while (container._queue.length > 0) {
const value = exports.DequeueValue(container);
if (typeof value[disposeStepsSymbol] === 'function') {
try {
value[disposeStepsSymbol]();
} catch (closeException) {
// Nothing to do.
}
}
}
}
container._queue = [];
container._queueTotalSize = 0;
};
32 changes: 21 additions & 11 deletions reference-implementation/lib/abstract-ops/readable-streams.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ const { promiseResolvedWith, promiseRejectedWith, newPromise, resolvePromise, re
require('../helpers/webidl.js');
const { CanTransferArrayBuffer, CopyDataBlockBytes, CreateArrayFromList, IsDetachedBuffer, TransferArrayBuffer } =
require('./ecmascript.js');
const { CloneAsUint8Array, IsNonNegativeNumber } = require('./miscellaneous.js');
const { CloneAsUint8Array, IsNonNegativeNumber, StructuredTransferOrClone } = require('./miscellaneous.js');
const { EnqueueValueWithSize, ResetQueue } = require('./queue-with-sizes.js');
const { AcquireWritableStreamDefaultWriter, IsWritableStreamLocked, WritableStreamAbort,
WritableStreamDefaultWriterCloseWithErrorPropagation, WritableStreamDefaultWriterRelease,
Expand Down Expand Up @@ -89,7 +89,7 @@ function CreateReadableStream(startAlgorithm, pullAlgorithm, cancelAlgorithm, hi

const controller = ReadableStreamDefaultController.new(globalThis);
SetUpReadableStreamDefaultController(
stream, controller, startAlgorithm, pullAlgorithm, cancelAlgorithm, highWaterMark, sizeAlgorithm
stream, controller, startAlgorithm, pullAlgorithm, cancelAlgorithm, highWaterMark, sizeAlgorithm, false
);

return stream;
Expand Down Expand Up @@ -340,7 +340,7 @@ function ReadableStreamTee(stream, cloneForBranch2) {
if (ReadableByteStreamController.isImpl(stream._controller)) {
return ReadableByteStreamTee(stream);
}
return ReadableStreamDefaultTee(stream, cloneForBranch2);
return ReadableStreamDefaultTee(stream, stream._controller._isOwning ? true : cloneForBranch2);
}

function ReadableStreamDefaultTee(stream, cloneForBranch2) {
Expand Down Expand Up @@ -392,10 +392,10 @@ function ReadableStreamDefaultTee(stream, cloneForBranch2) {
// }

if (canceled1 === false) {
ReadableStreamDefaultControllerEnqueue(branch1._controller, chunk1);
ReadableStreamDefaultControllerEnqueue(branch1._controller, chunk1, []);
}
if (canceled2 === false) {
ReadableStreamDefaultControllerEnqueue(branch2._controller, chunk2);
ReadableStreamDefaultControllerEnqueue(branch2._controller, chunk2, []);
}

reading = false;
Expand Down Expand Up @@ -1074,14 +1074,22 @@ function ReadableStreamDefaultControllerClose(controller) {
}
}

function ReadableStreamDefaultControllerEnqueue(controller, chunk) {
function ReadableStreamDefaultControllerEnqueue(controller, chunk, transferList) {
if (ReadableStreamDefaultControllerCanCloseOrEnqueue(controller) === false) {
return;
}

const stream = controller._stream;

if (IsReadableStreamLocked(stream) === true && ReadableStreamGetNumReadRequests(stream) > 0) {
if (controller._isOwning) {
try {
chunk = StructuredTransferOrClone(chunk, transferList);
} catch (chunkCloneError) {
ReadableStreamDefaultControllerError(controller, chunkCloneError);
throw chunkCloneError;
}
}
ReadableStreamFulfillReadRequest(stream, chunk, false);
} else {
let chunkSize;
Expand All @@ -1093,7 +1101,7 @@ function ReadableStreamDefaultControllerEnqueue(controller, chunk) {
}

try {
EnqueueValueWithSize(controller, chunk, chunkSize);
EnqueueValueWithSize(controller, chunk, chunkSize, transferList);
} catch (enqueueE) {
ReadableStreamDefaultControllerError(controller, enqueueE);
throw enqueueE;
Expand Down Expand Up @@ -1148,7 +1156,7 @@ function ReadableStreamDefaultControllerCanCloseOrEnqueue(controller) {
}

function SetUpReadableStreamDefaultController(
stream, controller, startAlgorithm, pullAlgorithm, cancelAlgorithm, highWaterMark, sizeAlgorithm) {
stream, controller, startAlgorithm, pullAlgorithm, cancelAlgorithm, highWaterMark, sizeAlgorithm, isOwning) {
assert(stream._controller === undefined);

controller._stream = stream;
Expand All @@ -1169,6 +1177,8 @@ function SetUpReadableStreamDefaultController(
controller._pullAlgorithm = pullAlgorithm;
controller._cancelAlgorithm = cancelAlgorithm;

controller._isOwning = isOwning;

stream._controller = controller;

const startResult = startAlgorithm();
Expand All @@ -1195,7 +1205,7 @@ function SetUpReadableStreamDefaultControllerFromUnderlyingSource(
let startAlgorithm = () => undefined;
let pullAlgorithm = () => promiseResolvedWith(undefined);
let cancelAlgorithm = () => promiseResolvedWith(undefined);

const isOwning = underlyingSourceDict.type === 'owning';
if ('start' in underlyingSourceDict) {
startAlgorithm = () => underlyingSourceDict.start.call(underlyingSource, controller);
}
Expand All @@ -1207,8 +1217,8 @@ function SetUpReadableStreamDefaultControllerFromUnderlyingSource(
}

SetUpReadableStreamDefaultController(
stream, controller, startAlgorithm, pullAlgorithm, cancelAlgorithm, highWaterMark, sizeAlgorithm
);
stream, controller, startAlgorithm, pullAlgorithm, cancelAlgorithm, highWaterMark, sizeAlgorithm,
isOwning);
}

// Byte stream controllers
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ function TransformStreamDefaultControllerEnqueue(controller, chunk) {
// accept TransformStreamDefaultControllerEnqueue() calls.

try {
ReadableStreamDefaultControllerEnqueue(readableController, chunk);
ReadableStreamDefaultControllerEnqueue(readableController, chunk, []);
} catch (e) {
// This happens when readableStrategy.size() throws.
TransformStreamErrorWritableAndUnblockWrite(stream, e);
Expand Down
4 changes: 2 additions & 2 deletions reference-implementation/lib/abstract-ops/writable-streams.js
Original file line number Diff line number Diff line change
Expand Up @@ -637,7 +637,7 @@ function WritableStreamDefaultControllerClearAlgorithms(controller) {
}

function WritableStreamDefaultControllerClose(controller) {
EnqueueValueWithSize(controller, closeSentinel, 0);
EnqueueValueWithSize(controller, closeSentinel, 0, undefined);
WritableStreamDefaultControllerAdvanceQueueIfNeeded(controller);
}

Expand Down Expand Up @@ -729,7 +729,7 @@ function WritableStreamDefaultControllerProcessWrite(controller, chunk) {

function WritableStreamDefaultControllerWrite(controller, chunk, chunkSize) {
try {
EnqueueValueWithSize(controller, chunk, chunkSize);
EnqueueValueWithSize(controller, chunk, chunkSize, undefined);
} catch (enqueueE) {
WritableStreamDefaultControllerErrorIfNeeded(controller, enqueueE);
return;
Expand Down
2 changes: 1 addition & 1 deletion reference-implementation/run-web-platform-tests.js
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ async function main() {
'readable-byte-streams/non-transferable-buffers.any.html',
'readable-streams/owning-type-message-port.any.html', // disabled due to MessagePort use.
'readable-streams/owning-type-video-frame.any.html', // disabled due to VideoFrame use.
'readable-streams/owning-type.any.html', // FIXME: reenable this test once owning type PR lands.
'transferable/transform-stream-members.any.html' // FIXME: reenable if structuredClone is aligned.
];
const anyTestPattern = /\.any\.html$/;
Expand All @@ -64,6 +63,7 @@ async function main() {
}
};
};
window.structuredClone = globalThis.structuredClone;
window.eval(bundledJS);
},
filter(testPath) {
Expand Down