Skip to content

Commit

Permalink
Update for changes to HTML's structured cloning/transferring
Browse files Browse the repository at this point in the history
Closes #701.
  • Loading branch information
domenic committed Apr 5, 2017
1 parent 3c38f0e commit 480b2a3
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 27 deletions.
48 changes: 30 additions & 18 deletions index.bs
Expand Up @@ -331,7 +331,7 @@ copies.

An important thing to note here is that the final <code>buffer</code> value is different from the
<code>startingAB</code>, but it (and all intermediate buffers) shares the same backing memory allocation. At each
step, the buffer is <a abstract-op lt="Transfer">transferred</a> to a new {{ArrayBuffer}} object. The
step, the buffer is <a href="#transfer-array-buffer">transferred</a> to a new {{ArrayBuffer}} object. The
<code>newView</code> is a new {{Uint8Array}}, with that {{ArrayBuffer}} object as its <code>buffer</code> property,
the offset that bytes were written to as its <code>byteOffset</code> property, and the number of bytes that were
written as its <code>byteLength</code> property.
Expand Down Expand Up @@ -706,9 +706,7 @@ ReadableStream(<var>underlyingSource</var> = {}, { <var>size</var>, <var>highWat
cancellation reason will then be propagated to the stream's <a>underlying source</a>.

Note that the <a>chunks</a> seen in each branch will be the same object. If the chunks are not immutable, this could
allow interference between the two branches. (<a href="https://github.com/whatwg/streams/issues/new">Let us know</a>
if you think we ought to add an option to <code>tee</code> that creates <a abstract-op lt="StructuredClone">structured
clones</a> of the chunks for each branch.)
allow interference between the two branches.
</div>

<emu-alg>
Expand Down Expand Up @@ -797,11 +795,14 @@ readable stream is <a>locked to a reader</a>.
This abstract operation is meant to be called from other specifications that may wish to <a lt="tee a readable
stream">tee</a> a given readable stream.

The second argument, <var>cloneForBranch2</var>, governs whether or not the data from the original stream will be <a
abstract-op lt="StructuredClone">structured cloned</a> before appearing in the second of the returned branches. This is
The second argument, <var>cloneForBranch2</var>, governs whether or not the data from the original stream will be cloned
(using HTML's <a>serializable objects</a> framework) before appearing in the second of the returned branches. This is
useful for scenarios where both branches are to be consumed in such a way that they might otherwise interfere with each
other, such as by <a abstract-op lt="Transfer">transfering</a> their <a>chunks</a>. However, it does introduce a
noticable asymmetry between the two branches. [[!HTML]]
other, such as by <a lt="transferable objects">transferring</a> their <a>chunks</a>. However, it does introduce a
noticable asymmetry between the two branches, and limits the possible <a>chunks</a> to serializable ones. [[!HTML]]

<p class="note">In this standard ReadableStreamTee is always called with <var>cloneForBranch2</var> set to
<emu-val>false</emu-val>; other specifications pass <emu-val>true</emu-val>.</p>

<emu-alg>
1. Assert: ! IsReadableStream(_stream_) is *true*.
Expand Down Expand Up @@ -857,7 +858,8 @@ a ReadableStreamTee pull function <var>F</var> is called, it performs the follow
1. If _teeState_.[[closedOrErrored]] is *true*, return.
1. Let _value1_ and _value2_ be _value_.
1. If _teeState_.[[canceled2]] is *false* and _cloneForBranch2_ is *true*, set _value2_ to ? <a
abstract-op>StructuredClone</a>(_value2_).
abstract-op>StructuredDeserialize</a>(<a abstract-op>StructuredSerialize</a>(_value2_), the current Realm
Record).
1. If _teeState_.[[canceled1]] is *false*, perform ? ReadableStreamDefaultControllerEnqueue(_branch1_, _value1_).
1. If _teeState_.[[canceled2]] is *false*, perform ? ReadableStreamDefaultControllerEnqueue(_branch2_, _value2_).
</emu-alg>
Expand Down Expand Up @@ -2201,7 +2203,7 @@ nothrow>ReadableByteStreamControllerEnqueue ( <var>controller</var>, <var>chunk<
1. Let _buffer_ be _chunk_.[[ViewedArrayBuffer]].
1. Let _byteOffset_ be _chunk_.[[ByteOffset]].
1. Let _byteLength_ be _chunk_.[[ByteLength]].
1. Let _transferredBuffer_ be ! <a abstract-op>Transfer</a>(_buffer_, the current Realm Record).
1. Let _transferredBuffer_ be ! TransferArrayBuffer(_buffer_).
1. If ! ReadableStreamHasDefaultReader(_stream_) is *true*
1. If ! ReadableStreamGetNumReadRequests(_stream_) is *0*,
1. Perform ! ReadableByteStreamControllerEnqueueChunkToQueue(_controller_, _transferredBuffer_, _byteOffset_,
Expand Down Expand Up @@ -2360,8 +2362,7 @@ nothrow>ReadableByteStreamControllerPullInto ( <var>controller</var>, <var>view<
_view_.[[ByteOffset]], [[byteLength]]: _view_.[[ByteLength]], [[bytesFilled]]: *0*, [[elementSize]]: _elementSize_,
[[ctor]]: _ctor_, [[readerType]]: `"byob"`}.
1. If _controller_.[[pendingPullIntos]] is not empty,
1. Set _pullIntoDescriptor_.[[buffer]] to ! <a abstract-op>Transfer</a>(_pullIntoDescriptor_.[[buffer]], the current
Realm Record).
1. Set _pullIntoDescriptor_.[[buffer]] to ! TransferArrayBuffer(_pullIntoDescriptor_.[[buffer]]).
1. Append _pullIntoDescriptor_ as the last element of _controller_.[[pendingPullIntos]].
1. Return ! ReadableStreamAddReadIntoRequest(_stream_).
1. If _stream_.[[state]] is `"closed"`,
Expand All @@ -2376,8 +2377,7 @@ nothrow>ReadableByteStreamControllerPullInto ( <var>controller</var>, <var>view<
1. Let _e_ be a *TypeError* exception.
1. Perform ! ReadableByteStreamControllerError(_controller_, _e_).
1. Return <a>a promise rejected with</a> _e_.
1. Set _pullIntoDescriptor_.[[buffer]] to ! <a abstract-op>Transfer</a>(_pullIntoDescriptor_.[[buffer]], the current
Realm Record).
1. Set _pullIntoDescriptor_.[[buffer]] to ! TransferArrayBuffer(_pullIntoDescriptor_.[[buffer]]).
1. Append _pullIntoDescriptor_ as the last element of _controller_.[[pendingPullIntos]].
1. Let _promise_ be ! ReadableStreamAddReadIntoRequest(_stream_).
1. Perform ! ReadableByteStreamControllerCallPullIfNeeded(_controller_).
Expand All @@ -2399,8 +2399,7 @@ throws>ReadableByteStreamControllerRespond ( <var>controller</var>, <var>bytesWr
nothrow>ReadableByteStreamControllerRespondInClosedState ( <var>controller</var>, <var>firstDescriptor</var> )</h4>

<emu-alg>
1. Set _firstDescriptor_.[[buffer]] to ! <a abstract-op>Transfer</a>(_firstDescriptor_.[[buffer]], the current Realm
Record).
1. Set _firstDescriptor_.[[buffer]] to ! TransferArrayBuffer(_firstDescriptor_.[[buffer]]).
1. Assert: _firstDescriptor_.[[bytesFilled]] is *0*.
1. Let _stream_ be _controller_.[[controlledReadableStream]].
1. Repeat the following steps while ! ReadableStreamGetNumReadIntoRequests(_stream_) > *0*,
Expand All @@ -2426,8 +2425,7 @@ aoid="ReadableByteStreamControllerRespondInReadableState" throws>ReadableByteStr
%ArrayBuffer%).
1. Perform ! ReadableByteStreamControllerEnqueueChunkToQueue(_controller_, _remainder_, *0*,
_remainder_.[[ByteLength]]).
1. Set _pullIntoDescriptor_.[[buffer]] to ! <a abstract-op>Transfer</a>(_pullIntoDescriptor_.[[buffer]], the current
Realm Record).
1. Set _pullIntoDescriptor_.[[buffer]] to ! TransferArrayBuffer(_pullIntoDescriptor_.[[buffer]]).
1. Set _pullIntoDescriptor_.[[bytesFilled]] to _pullIntoDescriptor_.[[bytesFilled]] − _remainderSize_.
1. Perform ! ReadableByteStreamControllerCommitPullIntoDescriptor(_controller_.[[controlledReadableStream]],
_pullIntoDescriptor_).
Expand Down Expand Up @@ -3924,6 +3922,20 @@ A few abstract operations are used in this specification for utility purposes. W
1. Otherwise, return <a>a promise resolved with</a> _returnValue_.[[Value]].
</emu-alg>

<h4 id="transfer-array-buffer" aoid="TransferArrayBuffer" nothrow>TransferArrayBuffer ( <var>O</var> )</h4>

<emu-alg>
1. Assert: Type(_O_) is Object.
1. Assert: _O_ has an [[ArrayBufferData]] internal slot.
1. Assert: ! IsDetachedBuffer(_O_) is *false*.
1. Let _arrayBufferData_ be _O_.[[ArrayBufferData]].
1. Let _arrayBufferByteLength_ be _O_.[[ArrayBufferByteLength]].
1. Perform ! DetachArrayBuffer(_O_).
1. Return a new <a interface><code>ArrayBuffer</code></a> object (created in the current Realm Record) whose
[[ArrayBufferData]] internal slot value is _arrayBufferData_ and whose [[ArrayBufferByteLength]] internal slot
value is _arrayBufferByteLength_.
</emu-alg>

<h4 id="validate-and-normalize-high-water-mark" aoid="ValidateAndNormalizeHighWaterMark"
throws>ValidateAndNormalizeHighWaterMark ( <var>highWaterMark</var> )</h4>

Expand Down
2 changes: 1 addition & 1 deletion reference-implementation/lib/helpers.js
Expand Up @@ -101,7 +101,7 @@ exports.PromiseInvokeOrPerformFallback = (O, P, args, F, argsF) => {
};

// Not implemented correctly
exports.SameRealmTransfer = O => O;
exports.TransferArrayBuffer = O => O.slice();

exports.ValidateAndNormalizeHighWaterMark = highWaterMark => {
highWaterMark = Number(highWaterMark);
Expand Down
16 changes: 8 additions & 8 deletions reference-implementation/lib/readable-stream.js
@@ -1,7 +1,7 @@
'use strict';
const assert = require('assert');
const { ArrayBufferCopy, CreateIterResultObject, IsFiniteNonNegativeNumber, InvokeOrNoop, PromiseInvokeOrNoop,
SameRealmTransfer, ValidateAndNormalizeQueuingStrategy, ValidateAndNormalizeHighWaterMark } =
TransferArrayBuffer, ValidateAndNormalizeQueuingStrategy, ValidateAndNormalizeHighWaterMark } =
require('./helpers.js');
const { createArrayFromList, createDataProperty, typeIsObject } = require('./helpers.js');
const { rethrowAssertionErrorRejection } = require('./utils.js');
Expand Down Expand Up @@ -383,9 +383,9 @@ function create_ReadableStreamTeePullFunction() {
const value2 = value;

// There is no way to access the cloning code right now in the reference implementation.
// If we add one then we'll need an implementation for StructuredClone.
// If we add one then we'll need an implementation for serializable objects.
// if (teeState.canceled2 === false && cloneForBranch2 === true) {
// value2 = StructuredClone(value2);
// value2 = StructuredDeserialize(StructuredSerialize(value2));
// }

if (teeState.canceled1 === false) {
Expand Down Expand Up @@ -1581,7 +1581,7 @@ function ReadableByteStreamControllerPullInto(controller, view) {
};

if (controller._pendingPullIntos.length > 0) {
pullIntoDescriptor.buffer = SameRealmTransfer(pullIntoDescriptor.buffer);
pullIntoDescriptor.buffer = TransferArrayBuffer(pullIntoDescriptor.buffer);
controller._pendingPullIntos.push(pullIntoDescriptor);

// No ReadableByteStreamControllerCallPullIfNeeded() call since:
Expand Down Expand Up @@ -1613,7 +1613,7 @@ function ReadableByteStreamControllerPullInto(controller, view) {
}
}

pullIntoDescriptor.buffer = SameRealmTransfer(pullIntoDescriptor.buffer);
pullIntoDescriptor.buffer = TransferArrayBuffer(pullIntoDescriptor.buffer);
controller._pendingPullIntos.push(pullIntoDescriptor);

const promise = ReadableStreamAddReadIntoRequest(stream);
Expand All @@ -1624,7 +1624,7 @@ function ReadableByteStreamControllerPullInto(controller, view) {
}

function ReadableByteStreamControllerRespondInClosedState(controller, firstDescriptor) {
firstDescriptor.buffer = SameRealmTransfer(firstDescriptor.buffer);
firstDescriptor.buffer = TransferArrayBuffer(firstDescriptor.buffer);

assert(firstDescriptor.bytesFilled === 0, 'bytesFilled must be 0');

Expand Down Expand Up @@ -1658,7 +1658,7 @@ function ReadableByteStreamControllerRespondInReadableState(controller, bytesWri
ReadableByteStreamControllerEnqueueChunkToQueue(controller, remainder, 0, remainder.byteLength);
}

pullIntoDescriptor.buffer = SameRealmTransfer(pullIntoDescriptor.buffer);
pullIntoDescriptor.buffer = TransferArrayBuffer(pullIntoDescriptor.buffer);
pullIntoDescriptor.bytesFilled -= remainderSize;
ReadableByteStreamControllerCommitPullIntoDescriptor(controller._controlledReadableStream, pullIntoDescriptor);

Expand Down Expand Up @@ -1755,7 +1755,7 @@ function ReadableByteStreamControllerEnqueue(controller, chunk) {
const buffer = chunk.buffer;
const byteOffset = chunk.byteOffset;
const byteLength = chunk.byteLength;
const transferredBuffer = SameRealmTransfer(buffer);
const transferredBuffer = TransferArrayBuffer(buffer);

if (ReadableStreamHasDefaultReader(stream) === true) {
if (ReadableStreamGetNumReadRequests(stream) === 0) {
Expand Down

0 comments on commit 480b2a3

Please sign in to comment.