Skip to content

Commit

Permalink
update implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
youennf committed Apr 13, 2023
1 parent 38f415f commit 340375a
Show file tree
Hide file tree
Showing 9 changed files with 61 additions and 22 deletions.
2 changes: 1 addition & 1 deletion reference-implementation/lib/ReadableStream-impl.js
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ exports.implementation = class ReadableStreamImpl {
this, underlyingSource, underlyingSourceDict, highWaterMark
);
} else {
assert(!('type' in underlyingSourceDict));
assert(!('type' in underlyingSourceDict) || underlyingSourceDict.type === 'transfer');
const sizeAlgorithm = ExtractSizeAlgorithm(strategy);
const highWaterMark = ExtractHighWaterMark(strategy, 1);
aos.SetUpReadableStreamDefaultControllerFromUnderlyingSource(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,14 @@ exports.implementation = class ReadableStreamDefaultControllerImpl {
aos.ReadableStreamDefaultControllerClose(this);
}

enqueue(chunk) {
enqueue(chunk, optionsOrTransfer) {
const hasTransfer = optionsOrTransfer && !Array.isArray(optionsOrTransfer);
const transfer = hasTransfer ? optionsOrTransfer.transfer : optionsOrTransfer;
if (aos.ReadableStreamDefaultControllerCanCloseOrEnqueue(this) === false) {
throw new TypeError('The stream is not in a state that permits enqueue');
}

return aos.ReadableStreamDefaultControllerEnqueue(this, chunk);
return aos.ReadableStreamDefaultControllerEnqueue(this, chunk, transfer);
}

error(e) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,13 @@
dictionary StructuredSerializeOptions {
sequence<object> transfer = [];
};

[Exposed=(Window,Worker,Worklet)]
interface ReadableStreamDefaultController {
readonly attribute unrestricted double? desiredSize;

undefined close();
undefined enqueue(optional any chunk);
undefined enqueue(any chunk, sequence<object> transfer);
undefined enqueue(optional any chunk, optional StructuredSerializeOptions options = { });
undefined error(optional any e);
};
2 changes: 1 addition & 1 deletion reference-implementation/lib/UnderlyingSource.webidl
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,4 @@ callback UnderlyingSourceStartCallback = any (ReadableStreamController controlle
callback UnderlyingSourcePullCallback = Promise<undefined> (ReadableStreamController controller);
callback UnderlyingSourceCancelCallback = Promise<undefined> (optional any reason);

enum ReadableStreamType { "bytes" };
enum ReadableStreamType { "bytes", "transfer" };
8 changes: 8 additions & 0 deletions reference-implementation/lib/abstract-ops/miscellaneous.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,11 @@ exports.CloneAsUint8Array = O => {
const buffer = O.buffer.slice(O.byteOffset, O.byteOffset + O.byteLength);
return new Uint8Array(buffer);
};

exports.StructuredTransferOrClone = (value, transfer) => {
// FIXME: We should check whether value is an array buffer or is transferable and update transfer accordingly.
if (structuredClone)
return structuredClone(value, transfer);

return JSON.parse(JSON.stringify(value));
};
20 changes: 17 additions & 3 deletions reference-implementation/lib/abstract-ops/queue-with-sizes.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
'use strict';
const assert = require('assert');
const { IsNonNegativeNumber } = require('./miscellaneous.js');
const { IsNonNegativeNumber, StructuredTransferOrClone } = require('./miscellaneous.js');

exports.DequeueValue = container => {
assert('_queue' in container && '_queueTotalSize' in container);
Expand All @@ -15,7 +15,7 @@ exports.DequeueValue = container => {
return pair.value;
};

exports.EnqueueValueWithSize = (container, value, size) => {
exports.EnqueueValueWithSize = (container, value, size, transfer) => {
assert('_queue' in container && '_queueTotalSize' in container);

if (!IsNonNegativeNumber(size)) {
Expand All @@ -24,7 +24,9 @@ exports.EnqueueValueWithSize = (container, value, size) => {
if (size === Infinity) {
throw new RangeError('Size must be a finite, non-NaN, non-negative number.');
}

if (container.isTransferring) {
value = StructuredTransferOrClone(value, transfer);
}
container._queue.push({ value, size });
container._queueTotalSize += size;
};
Expand All @@ -40,6 +42,18 @@ exports.PeekQueueValue = container => {
exports.ResetQueue = container => {
assert('_queue' in container && '_queueTotalSize' in container);

if (container.isTransferring) {
while (container._queue.length > 0) {
const value = exports.DequeueValue(container);
if (typeof value.close === 'function') {
try {
value.close();
} catch (closeException) {
// Nothing to do.
}
}
}
}
container._queue = [];
container._queueTotalSize = 0;
};
32 changes: 21 additions & 11 deletions reference-implementation/lib/abstract-ops/readable-streams.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ const { promiseResolvedWith, promiseRejectedWith, newPromise, resolvePromise, re
require('../helpers/webidl.js');
const { CanTransferArrayBuffer, CopyDataBlockBytes, CreateArrayFromList, IsDetachedBuffer, TransferArrayBuffer } =
require('./ecmascript.js');
const { CloneAsUint8Array, IsNonNegativeNumber } = require('./miscellaneous.js');
const { CloneAsUint8Array, IsNonNegativeNumber, StructuredTransferOrClone } = require('./miscellaneous.js');
const { EnqueueValueWithSize, ResetQueue } = require('./queue-with-sizes.js');
const { AcquireWritableStreamDefaultWriter, IsWritableStreamLocked, WritableStreamAbort,
WritableStreamDefaultWriterCloseWithErrorPropagation, WritableStreamDefaultWriterRelease,
Expand Down Expand Up @@ -89,7 +89,7 @@ function CreateReadableStream(startAlgorithm, pullAlgorithm, cancelAlgorithm, hi

const controller = ReadableStreamDefaultController.new(globalThis);
SetUpReadableStreamDefaultController(
stream, controller, startAlgorithm, pullAlgorithm, cancelAlgorithm, highWaterMark, sizeAlgorithm
stream, controller, startAlgorithm, pullAlgorithm, cancelAlgorithm, highWaterMark, sizeAlgorithm, false
);

return stream;
Expand Down Expand Up @@ -340,7 +340,7 @@ function ReadableStreamTee(stream, cloneForBranch2) {
if (ReadableByteStreamController.isImpl(stream._controller)) {
return ReadableByteStreamTee(stream);
}
return ReadableStreamDefaultTee(stream, cloneForBranch2);
return ReadableStreamDefaultTee(stream, stream._controller.isTransferring ? true : cloneForBranch2);
}

function ReadableStreamDefaultTee(stream, cloneForBranch2) {
Expand Down Expand Up @@ -392,10 +392,10 @@ function ReadableStreamDefaultTee(stream, cloneForBranch2) {
// }

if (canceled1 === false) {
ReadableStreamDefaultControllerEnqueue(branch1._controller, chunk1);
ReadableStreamDefaultControllerEnqueue(branch1._controller, chunk1, undefined);
}
if (canceled2 === false) {
ReadableStreamDefaultControllerEnqueue(branch2._controller, chunk2);
ReadableStreamDefaultControllerEnqueue(branch2._controller, chunk2, undefined);
}

reading = false;
Expand Down Expand Up @@ -1074,14 +1074,22 @@ function ReadableStreamDefaultControllerClose(controller) {
}
}

function ReadableStreamDefaultControllerEnqueue(controller, chunk) {
function ReadableStreamDefaultControllerEnqueue(controller, chunk, transfer) {
if (ReadableStreamDefaultControllerCanCloseOrEnqueue(controller) === false) {
return;
}

const stream = controller._stream;

if (IsReadableStreamLocked(stream) === true && ReadableStreamGetNumReadRequests(stream) > 0) {
if (controller.isTransferring) {
try {
chunk = StructuredTransferOrClone(chunk, transfer);
} catch (chunkCloneError) {
ReadableStreamDefaultControllerError(controller, chunkCloneError);
throw chunkCloneError;
}
}
ReadableStreamFulfillReadRequest(stream, chunk, false);
} else {
let chunkSize;
Expand All @@ -1093,7 +1101,7 @@ function ReadableStreamDefaultControllerEnqueue(controller, chunk) {
}

try {
EnqueueValueWithSize(controller, chunk, chunkSize);
EnqueueValueWithSize(controller, chunk, chunkSize, transfer);
} catch (enqueueE) {
ReadableStreamDefaultControllerError(controller, enqueueE);
throw enqueueE;
Expand Down Expand Up @@ -1148,7 +1156,7 @@ function ReadableStreamDefaultControllerCanCloseOrEnqueue(controller) {
}

function SetUpReadableStreamDefaultController(
stream, controller, startAlgorithm, pullAlgorithm, cancelAlgorithm, highWaterMark, sizeAlgorithm) {
stream, controller, startAlgorithm, pullAlgorithm, cancelAlgorithm, highWaterMark, sizeAlgorithm, isTransferring) {
assert(stream._controller === undefined);

controller._stream = stream;
Expand All @@ -1169,6 +1177,8 @@ function SetUpReadableStreamDefaultController(
controller._pullAlgorithm = pullAlgorithm;
controller._cancelAlgorithm = cancelAlgorithm;

controller.isTransferring = isTransferring;

stream._controller = controller;

const startResult = startAlgorithm();
Expand All @@ -1195,7 +1205,7 @@ function SetUpReadableStreamDefaultControllerFromUnderlyingSource(
let startAlgorithm = () => undefined;
let pullAlgorithm = () => promiseResolvedWith(undefined);
let cancelAlgorithm = () => promiseResolvedWith(undefined);

const isTransferring = underlyingSourceDict.type === 'transfer';
if ('start' in underlyingSourceDict) {
startAlgorithm = () => underlyingSourceDict.start.call(underlyingSource, controller);
}
Expand All @@ -1207,8 +1217,8 @@ function SetUpReadableStreamDefaultControllerFromUnderlyingSource(
}

SetUpReadableStreamDefaultController(
stream, controller, startAlgorithm, pullAlgorithm, cancelAlgorithm, highWaterMark, sizeAlgorithm
);
stream, controller, startAlgorithm, pullAlgorithm, cancelAlgorithm, highWaterMark, sizeAlgorithm,
isTransferring);
}

// Byte stream controllers
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ function TransformStreamDefaultControllerEnqueue(controller, chunk) {
// accept TransformStreamDefaultControllerEnqueue() calls.

try {
ReadableStreamDefaultControllerEnqueue(readableController, chunk);
ReadableStreamDefaultControllerEnqueue(readableController, chunk, undefined);
} catch (e) {
// This happens when readableStrategy.size() throws.
TransformStreamErrorWritableAndUnblockWrite(stream, e);
Expand Down
4 changes: 2 additions & 2 deletions reference-implementation/lib/abstract-ops/writable-streams.js
Original file line number Diff line number Diff line change
Expand Up @@ -637,7 +637,7 @@ function WritableStreamDefaultControllerClearAlgorithms(controller) {
}

function WritableStreamDefaultControllerClose(controller) {
EnqueueValueWithSize(controller, closeSentinel, 0);
EnqueueValueWithSize(controller, closeSentinel, 0, undefined);
WritableStreamDefaultControllerAdvanceQueueIfNeeded(controller);
}

Expand Down Expand Up @@ -729,7 +729,7 @@ function WritableStreamDefaultControllerProcessWrite(controller, chunk) {

function WritableStreamDefaultControllerWrite(controller, chunk, chunkSize) {
try {
EnqueueValueWithSize(controller, chunk, chunkSize);
EnqueueValueWithSize(controller, chunk, chunkSize, undefined);
} catch (enqueueE) {
WritableStreamDefaultControllerErrorIfNeeded(controller, enqueueE);
return;
Expand Down

0 comments on commit 340375a

Please sign in to comment.