Skip to content

Commit

Permalink
Add support for a new ReadableStream "owning" type.
Browse files Browse the repository at this point in the history
  • Loading branch information
youennf committed Apr 14, 2023
1 parent 2942e89 commit 841d2fd
Show file tree
Hide file tree
Showing 13 changed files with 132 additions and 55 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,6 @@ jobs:
submodules: true
- uses: actions/setup-node@v1
with:
node-version: 14
node-version: 19
- run: npm install
- run: npm test
101 changes: 71 additions & 30 deletions index.bs

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion reference-implementation/lib/ReadableStream-impl.js
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ exports.implementation = class ReadableStreamImpl {
this, underlyingSource, underlyingSourceDict, highWaterMark
);
} else {
assert(!('type' in underlyingSourceDict));
assert(!('type' in underlyingSourceDict) || underlyingSourceDict.type === 'owning');
const sizeAlgorithm = ExtractSizeAlgorithm(strategy);
const highWaterMark = ExtractHighWaterMark(strategy, 1);
aos.SetUpReadableStreamDefaultControllerFromUnderlyingSource(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,13 @@ exports.implementation = class ReadableStreamDefaultControllerImpl {
aos.ReadableStreamDefaultControllerClose(this);
}

enqueue(chunk) {
enqueue(chunk, options) {
const transferList = options ? options.transfer : undefined;
if (aos.ReadableStreamDefaultControllerCanCloseOrEnqueue(this) === false) {
throw new TypeError('The stream is not in a state that permits enqueue');
}

return aos.ReadableStreamDefaultControllerEnqueue(this, chunk);
return aos.ReadableStreamDefaultControllerEnqueue(this, chunk, transferList);
}

error(e) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
dictionary StructuredSerializeOptions {
sequence<object> transfer = [];
};

[Exposed=(Window,Worker,Worklet)]
interface ReadableStreamDefaultController {
readonly attribute unrestricted double? desiredSize;

undefined close();
undefined enqueue(optional any chunk);
undefined enqueue(optional any chunk, optional StructuredSerializeOptions options = { });
undefined error(optional any e);
};
2 changes: 1 addition & 1 deletion reference-implementation/lib/UnderlyingSource.webidl
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,4 @@ callback UnderlyingSourceStartCallback = any (ReadableStreamController controlle
callback UnderlyingSourcePullCallback = Promise<undefined> (ReadableStreamController controller);
callback UnderlyingSourceCancelCallback = Promise<undefined> (optional any reason);

enum ReadableStreamType { "bytes" };
enum ReadableStreamType { "bytes", "owning" };
4 changes: 4 additions & 0 deletions reference-implementation/lib/abstract-ops/miscellaneous.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,7 @@ exports.CloneAsUint8Array = O => {
const buffer = O.buffer.slice(O.byteOffset, O.byteOffset + O.byteLength);
return new Uint8Array(buffer);
};

exports.StructuredTransferOrClone = (value, transferList) => {
return globalThis.structuredClone(value, { transfer: transferList });
};
20 changes: 17 additions & 3 deletions reference-implementation/lib/abstract-ops/queue-with-sizes.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
'use strict';
const assert = require('assert');
const { IsNonNegativeNumber } = require('./miscellaneous.js');
const { IsNonNegativeNumber, StructuredTransferOrClone } = require('./miscellaneous.js');

exports.DequeueValue = container => {
assert('_queue' in container && '_queueTotalSize' in container);
Expand All @@ -15,7 +15,7 @@ exports.DequeueValue = container => {
return pair.value;
};

exports.EnqueueValueWithSize = (container, value, size) => {
exports.EnqueueValueWithSize = (container, value, size, transferList) => {
assert('_queue' in container && '_queueTotalSize' in container);

if (!IsNonNegativeNumber(size)) {
Expand All @@ -24,7 +24,9 @@ exports.EnqueueValueWithSize = (container, value, size) => {
if (size === Infinity) {
throw new RangeError('Size must be a finite, non-NaN, non-negative number.');
}

if (container.isOwning) {
value = StructuredTransferOrClone(value, transferList);
}
container._queue.push({ value, size });
container._queueTotalSize += size;
};
Expand All @@ -40,6 +42,18 @@ exports.PeekQueueValue = container => {
exports.ResetQueue = container => {
assert('_queue' in container && '_queueTotalSize' in container);

if (container.isOwning) {
while (container._queue.length > 0) {
const value = exports.DequeueValue(container);
if (typeof value.close === 'function') {
try {
value.close();
} catch (closeException) {
// Nothing to do.
}
}
}
}
container._queue = [];
container._queueTotalSize = 0;
};
32 changes: 21 additions & 11 deletions reference-implementation/lib/abstract-ops/readable-streams.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ const { promiseResolvedWith, promiseRejectedWith, newPromise, resolvePromise, re
require('../helpers/webidl.js');
const { CanTransferArrayBuffer, CopyDataBlockBytes, CreateArrayFromList, IsDetachedBuffer, TransferArrayBuffer } =
require('./ecmascript.js');
const { CloneAsUint8Array, IsNonNegativeNumber } = require('./miscellaneous.js');
const { CloneAsUint8Array, IsNonNegativeNumber, StructuredTransferOrClone } = require('./miscellaneous.js');
const { EnqueueValueWithSize, ResetQueue } = require('./queue-with-sizes.js');
const { AcquireWritableStreamDefaultWriter, IsWritableStreamLocked, WritableStreamAbort,
WritableStreamDefaultWriterCloseWithErrorPropagation, WritableStreamDefaultWriterRelease,
Expand Down Expand Up @@ -89,7 +89,7 @@ function CreateReadableStream(startAlgorithm, pullAlgorithm, cancelAlgorithm, hi

const controller = ReadableStreamDefaultController.new(globalThis);
SetUpReadableStreamDefaultController(
stream, controller, startAlgorithm, pullAlgorithm, cancelAlgorithm, highWaterMark, sizeAlgorithm
stream, controller, startAlgorithm, pullAlgorithm, cancelAlgorithm, highWaterMark, sizeAlgorithm, false
);

return stream;
Expand Down Expand Up @@ -340,7 +340,7 @@ function ReadableStreamTee(stream, cloneForBranch2) {
if (ReadableByteStreamController.isImpl(stream._controller)) {
return ReadableByteStreamTee(stream);
}
return ReadableStreamDefaultTee(stream, cloneForBranch2);
return ReadableStreamDefaultTee(stream, stream._controller.isOwning ? true : cloneForBranch2);
}

function ReadableStreamDefaultTee(stream, cloneForBranch2) {
Expand Down Expand Up @@ -392,10 +392,10 @@ function ReadableStreamDefaultTee(stream, cloneForBranch2) {
// }

if (canceled1 === false) {
ReadableStreamDefaultControllerEnqueue(branch1._controller, chunk1);
ReadableStreamDefaultControllerEnqueue(branch1._controller, chunk1, undefined);
}
if (canceled2 === false) {
ReadableStreamDefaultControllerEnqueue(branch2._controller, chunk2);
ReadableStreamDefaultControllerEnqueue(branch2._controller, chunk2, undefined);
}

reading = false;
Expand Down Expand Up @@ -1074,14 +1074,22 @@ function ReadableStreamDefaultControllerClose(controller) {
}
}

function ReadableStreamDefaultControllerEnqueue(controller, chunk) {
function ReadableStreamDefaultControllerEnqueue(controller, chunk, transferList) {
if (ReadableStreamDefaultControllerCanCloseOrEnqueue(controller) === false) {
return;
}

const stream = controller._stream;

if (IsReadableStreamLocked(stream) === true && ReadableStreamGetNumReadRequests(stream) > 0) {
if (controller.isOwning) {
try {
chunk = StructuredTransferOrClone(chunk, transferList);
} catch (chunkCloneError) {
ReadableStreamDefaultControllerError(controller, chunkCloneError);
throw chunkCloneError;
}
}
ReadableStreamFulfillReadRequest(stream, chunk, false);
} else {
let chunkSize;
Expand All @@ -1093,7 +1101,7 @@ function ReadableStreamDefaultControllerEnqueue(controller, chunk) {
}

try {
EnqueueValueWithSize(controller, chunk, chunkSize);
EnqueueValueWithSize(controller, chunk, chunkSize, transferList);
} catch (enqueueE) {
ReadableStreamDefaultControllerError(controller, enqueueE);
throw enqueueE;
Expand Down Expand Up @@ -1148,7 +1156,7 @@ function ReadableStreamDefaultControllerCanCloseOrEnqueue(controller) {
}

function SetUpReadableStreamDefaultController(
stream, controller, startAlgorithm, pullAlgorithm, cancelAlgorithm, highWaterMark, sizeAlgorithm) {
stream, controller, startAlgorithm, pullAlgorithm, cancelAlgorithm, highWaterMark, sizeAlgorithm, isOwning) {
assert(stream._controller === undefined);

controller._stream = stream;
Expand All @@ -1169,6 +1177,8 @@ function SetUpReadableStreamDefaultController(
controller._pullAlgorithm = pullAlgorithm;
controller._cancelAlgorithm = cancelAlgorithm;

controller.isOwning = isOwning;

stream._controller = controller;

const startResult = startAlgorithm();
Expand All @@ -1195,7 +1205,7 @@ function SetUpReadableStreamDefaultControllerFromUnderlyingSource(
let startAlgorithm = () => undefined;
let pullAlgorithm = () => promiseResolvedWith(undefined);
let cancelAlgorithm = () => promiseResolvedWith(undefined);

const isOwning = underlyingSourceDict.type === 'owning';
if ('start' in underlyingSourceDict) {
startAlgorithm = () => underlyingSourceDict.start.call(underlyingSource, controller);
}
Expand All @@ -1207,8 +1217,8 @@ function SetUpReadableStreamDefaultControllerFromUnderlyingSource(
}

SetUpReadableStreamDefaultController(
stream, controller, startAlgorithm, pullAlgorithm, cancelAlgorithm, highWaterMark, sizeAlgorithm
);
stream, controller, startAlgorithm, pullAlgorithm, cancelAlgorithm, highWaterMark, sizeAlgorithm,
isOwning);
}

// Byte stream controllers
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ function TransformStreamDefaultControllerEnqueue(controller, chunk) {
// accept TransformStreamDefaultControllerEnqueue() calls.

try {
ReadableStreamDefaultControllerEnqueue(readableController, chunk);
ReadableStreamDefaultControllerEnqueue(readableController, chunk, undefined);
} catch (e) {
// This happens when readableStrategy.size() throws.
TransformStreamErrorWritableAndUnblockWrite(stream, e);
Expand Down
4 changes: 2 additions & 2 deletions reference-implementation/lib/abstract-ops/writable-streams.js
Original file line number Diff line number Diff line change
Expand Up @@ -637,7 +637,7 @@ function WritableStreamDefaultControllerClearAlgorithms(controller) {
}

function WritableStreamDefaultControllerClose(controller) {
EnqueueValueWithSize(controller, closeSentinel, 0);
EnqueueValueWithSize(controller, closeSentinel, 0, undefined);
WritableStreamDefaultControllerAdvanceQueueIfNeeded(controller);
}

Expand Down Expand Up @@ -729,7 +729,7 @@ function WritableStreamDefaultControllerProcessWrite(controller, chunk) {

function WritableStreamDefaultControllerWrite(controller, chunk, chunkSize) {
try {
EnqueueValueWithSize(controller, chunk, chunkSize);
EnqueueValueWithSize(controller, chunk, chunkSize, undefined);
} catch (enqueueE) {
WritableStreamDefaultControllerErrorIfNeeded(controller, enqueueE);
return;
Expand Down
5 changes: 4 additions & 1 deletion reference-implementation/run-web-platform-tests.js
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,9 @@ async function main() {
const excludeGlobs = [
// These tests use ArrayBuffers backed by WebAssembly.Memory objects, which *should* be non-transferable.
// However, our TransferArrayBuffer implementation cannot detect these, and will incorrectly "transfer" them anyway.
'readable-byte-streams/non-transferable-buffers.any.html'
'readable-byte-streams/non-transferable-buffers.any.html',
'readable-streams/owning-type-message-port.any.html', // MessagePort is not defined.
'readable-streams/owning-type-video-frame.any.html' // VideoFrame is not defined.
];
const anyTestPattern = /\.any\.html$/;

Expand All @@ -58,6 +60,7 @@ async function main() {
}
};
};
window.structuredClone = globalThis.structuredClone;
window.eval(bundledJS);
},
filter(testPath) {
Expand Down

0 comments on commit 841d2fd

Please sign in to comment.