diff --git a/index.bs b/index.bs index c27dbe2c4..2c6449bd6 100644 --- a/index.bs +++ b/index.bs @@ -375,6 +375,10 @@ Instances of {{ReadableStream}} are created with the internal slots described in Description (non-normative) + + {{[[Detached]]}} + A boolean flag set to true when the stream has been transferred away to another realm + \[[disturbed]] A boolean flag set to true when the stream has been read from or canceled @@ -448,6 +452,7 @@ ReadableStream(underlyingSource = {}, { size, highWat 1. Set *this*.[[state]] to `"readable"`. 1. Set *this*.[[reader]] and *this*.[[storedError]] to *undefined*. + 1. Set *this*.[[Detached]] to *false*. 1. Set *this*.[[disturbed]] to *false*. 1. Set *this*.[[readableStreamController]] to *undefined*. 1. Let _type_ be ? GetV(_underlyingSource_, `"type"`). @@ -456,10 +461,10 @@ ReadableStream(underlyingSource = {}, { size, highWat 1. If _highWaterMark_ is *undefined*, let _highWaterMark_ be *0*. 1. Set *this*.[[readableStreamController]] to ? Construct(`ReadableByteStreamController`, « *this*, _underlyingSource_, _highWaterMark_ »). - 1. Otherwise, if _type_ is *undefined*, + 1. Otherwise, if _type_ is *undefined* or _type_ is `"cloning"`, 1. If _highWaterMark_ is *undefined*, let _highWaterMark_ be *1*. 1. Set *this*.[[readableStreamController]] to ? Construct(`ReadableStreamDefaultController`, « *this*, - _underlyingSource_, _size_, _highWaterMark_ »). + _underlyingSource_, _size_, _highWaterMark_, _type_ »). 1. Otherwise, throw a *RangeError* exception. @@ -733,6 +738,30 @@ ReadableStream(underlyingSource = {}, { size, highWat +

Readable Stream Internal Methods

+ +The following internal method is implemented by each {{ReadableStream}} instance. + +
\[[Transfer]](targetRealm)
+ + + 1. If ! IsReadableStreamLocked(*this*) is *true*, throw a *TypeError* exception. + 1. If *this*.[[state]] is `"errored"`, throw a *TypeError* exception. + 1. Let _controller_ be *this*.[[readableStreamController]]. + 1. If _controller_.[[targetRealm]] is *undefined*, throw a *TypeError* exception. + 1. Let _that_ be a new instance of ReadableStream in _targetRealm_. + 1. Set _that_.[[state]] to *this*.[[state]]. + 1. Set _that_.[[disturbed]] to *this*.[[disturbed]]. + 1. Set _controller_.[[controlledReadableStream]] to _that_. + 1. Set _that_.[[readableStreamController]] to _controller_. + 1. Let _queue_ be _controller_.[[queue]]. + 1. Repeat for each Record {[[value]], [[size]]} _pair_ that is an element of _queue_, + 1. Set _pair_.[[value]] to ! StructuredClone(_pair_.[[value]], _targetRealm_). + 1. Set _controller_.[[targetRealm]] to _targetRealm_. + 1. Set _this_.[[Detached]] to *true*. + 1. Return _that_. + +

General Readable Stream Abstract Operations

The following abstract operations, unlike most in this specification, are meant to be generally useful by other @@ -781,10 +810,11 @@ readable stream has ever been read from or canceled. stream ) This abstract operation is meant to be called from other specifications that may wish to query whether or not a -readable stream is locked to a reader. +readable stream is locked to a reader or has been transferred away to another realm. 1. Assert: ! IsReadableStream(_stream_) is *true*. + 1. If _stream_.[[Detached]] is *true*, return *true*. 1. If _stream_.[[reader]] is *undefined*, return *false*. 1. Return *true*. @@ -940,6 +970,7 @@ nothrow>ReadableStreamAddReadIntoRequest ( stream ) reason ) + 1. Assert: _stream_.[[Detached]] is *false*. 1. Set _stream_.[[disturbed]] to *true*. 1. If _stream_.[[state]] is `"closed"`, return a promise resolved with *undefined*. 1. If _stream_.[[state]] is `"errored"`, return a promise rejected with _stream_.[[storedError]]. @@ -1446,6 +1477,12 @@ Instances of {{ReadableStreamDefaultController}} are created with the internal s Description (non-normative) + + \[[targetRealm]] + Either *undefined*, or a Realm Record. If set to a Realm Record, ReadableStreamDefaultControllerEnqueue + will perform the structured clone algorithm on passed chunks, cloning them into the targetRealm + and enqueueing the result. + \[[closeRequested]] A boolean flag indicating whether the stream has been closed by its underlying source, but still has @@ -1494,7 +1531,7 @@ Instances of {{ReadableStreamDefaultController}} are created with the internal s

new ReadableStreamDefaultController(stream, underlyingSource, size, -highWaterMark)

+highWaterMark, type)
The ReadableStreamDefaultController constructor cannot be used directly; it only works on a @@ -1508,6 +1545,8 @@ ReadableStreamDefaultController(stream, underlyingSource, 1. Set *this*.[[underlyingSource]] to _underlyingSource_. 1. Set *this*.[[queue]] to a new empty List. 1. Set *this*.[[started]], *this*.[[closeRequested]], *this*.[[pullAgain]], and *this*.[[pulling]] to *false*. + 1. Set *this*.[[targetRealm]] to *undefined*. + 1. If _type_ is `"cloning"`, set *this*.[[targetRealm]] to the current Realm Record. 1. Let _normalizedStrategy_ be ? ValidateAndNormalizeQueuingStrategy(_size_, _highWaterMark_). 1. Set *this*.[[strategySize]] to _normalizedStrategy_.[[size]] and *this*.[[strategyHWM]] to _normalizedStrategy_.[[highWaterMark]]. @@ -1681,8 +1720,14 @@ asserts). 1. Let _stream_ be _controller_.[[controlledReadableStream]]. 1. Assert: _controller_.[[closeRequested]] is *false*. 1. Assert: _stream_.[[state]] is `"readable"`. - 1. If ! IsReadableStreamLocked(_stream_) is *true* and ! ReadableStreamGetNumReadRequests(_stream_) > *0*, perform - ! ReadableStreamFulfillReadRequest(_stream_, _chunk_, *false*). + 1. If ! IsReadableStreamLocked(_stream_) is *true* and ! ReadableStreamGetNumReadRequests(_stream_) > *0*, + 1. If _controller_.[[targetRealm]] is not *undefined*, + 1. Let _chunk_ be StructuredClone(_chunk_, _controller_.[[targetRealm]]). + 1. If _chunk_ is an abrupt completion, + 1. Perform ! ReadableStreamDefaultControllerErrorIfNeeded(_controller_, _chunk_.[[Value]]). + 1. Return _chunk_. + 1. Let _chunk_ be _chunk_.[[Value]]. + 1. Perform ! ReadableStreamFulfillReadRequest(_stream_, _chunk_, *false*). 1. Otherwise, 1. Let _chunkSize_ be *1*. 1. If _controller_.[[strategySize]] is not *undefined*, @@ -3589,11 +3634,13 @@ throughout the rest of this standard.

EnqueueValueWithSize ( queue, -value, size )

+value, size, targetRealm ) + 1. If _targetRealm_ was not passed, let _targetRealm_ be *undefined*. 1. Let _size_ be ? ToNumber(_size_). 1. If ! IsFiniteNonNegativeNumber(_size_) is *false*, throw a *RangeError* exception. + 1. If _targetRealm_ is not *undefined*, let _value_ be the result of ? StructuredClone(_value_, _targetRealm_). 1. Append Record {[[value]]: _value_, [[size]]: _size_} as the last element of _queue_. diff --git a/reference-implementation/lib/queue-with-sizes.js b/reference-implementation/lib/queue-with-sizes.js index 9ded23094..cedc1f959 100644 --- a/reference-implementation/lib/queue-with-sizes.js +++ b/reference-implementation/lib/queue-with-sizes.js @@ -1,5 +1,7 @@ 'use strict'; const assert = require('assert'); +/* structured clone is impossible to truly polyfill, but closest match */ +const StructuredClone = require('realistic-structured-clone'); const { IsFiniteNonNegativeNumber } = require('./helpers.js'); exports.DequeueValue = queue => { @@ -11,12 +13,16 @@ exports.DequeueValue = queue => { return pair.value; }; -exports.EnqueueValueWithSize = (queue, value, size) => { +exports.EnqueueValueWithSize = (queue, value, size, targetRealm) => { size = Number(size); if (!IsFiniteNonNegativeNumber(size)) { throw new RangeError('Size must be a finite, non-NaN, non-negative number.'); } + if (targetRealm !== undefined) { + value = StructuredClone(value/* , targetRealm*/); + } + queue.push({ value, size }); if (queue._totalSize === undefined) { diff --git a/reference-implementation/lib/readable-stream.js b/reference-implementation/lib/readable-stream.js index 086659b50..68a52b333 100644 --- a/reference-implementation/lib/readable-stream.js +++ b/reference-implementation/lib/readable-stream.js @@ -1,5 +1,7 @@ 'use strict'; const assert = require('assert'); +/* structured clone is impossible to truly polyfill, but closest match */ +const StructuredClone = require('realistic-structured-clone'); const { ArrayBufferCopy, CreateIterResultObject, IsFiniteNonNegativeNumber, InvokeOrNoop, PromiseInvokeOrNoop, SameRealmTransfer, ValidateAndNormalizeQueuingStrategy, ValidateAndNormalizeHighWaterMark } = require('./helpers.js'); @@ -12,6 +14,7 @@ const { AcquireWritableStreamDefaultWriter, IsWritableStream, IsWritableStreamLo const InternalCancel = Symbol('[[Cancel]]'); const InternalPull = Symbol('[[Pull]]'); +const InternalTransfer = Symbol('[[Transfer]]'); class ReadableStream { constructor(underlyingSource = {}, { size, highWaterMark } = {}) { @@ -21,6 +24,7 @@ class ReadableStream { this._reader = undefined; this._storedError = undefined; + this._Detached = false; this._disturbed = false; // Initialize to undefined first because the constructor of the controller checks this @@ -33,11 +37,13 @@ class ReadableStream { highWaterMark = 0; } this._readableStreamController = new ReadableByteStreamController(this, underlyingSource, highWaterMark); - } else if (type === undefined) { + } else if (type === undefined || type === 'cloning') { if (highWaterMark === undefined) { highWaterMark = 1; } - this._readableStreamController = new ReadableStreamDefaultController(this, underlyingSource, size, highWaterMark); + + this._readableStreamController = + new ReadableStreamDefaultController(this, underlyingSource, size, highWaterMark, type); } else { throw new RangeError('Invalid type is specified'); } @@ -251,6 +257,33 @@ class ReadableStream { const branches = ReadableStreamTee(this, false); return createArrayFromList(branches); } + + [InternalTransfer](targetRealm) { + if (IsReadableStreamLocked(this) === true) { + throw new TypeError('Cannot transfer a locked stream'); + } + if (this._state === 'errored') { + throw new TypeError('Cannot transfer an errored stream'); + } + const controller = this._readableStreamController; + if (controller._targetRealm === undefined) { + throw new TypeError('Only cloning streams are transferable'); + } + /* at least approximate realm-transfer */ + const that = new targetRealm.ReadableStream(); + that._state = this._state; + that._disturbed = this._disturbed; + + controller._controlledReadableStream = that; + that._readableStreamController = controller; + for (const pair of controller._queue) { + pair.value = StructuredClone(pair.value/* , targetRealm*/); + } + controller._targetRealm = targetRealm; + this._Detached = true; + + return that; + } } module.exports = { @@ -293,6 +326,9 @@ function IsReadableStreamDisturbed(stream) { function IsReadableStreamLocked(stream) { assert(IsReadableStream(stream) === true, 'IsReadableStreamLocked should only be used on known readable streams'); + if (stream._Detached === true) { + return true; + } if (stream._reader === undefined) { return false; } @@ -469,6 +505,8 @@ function ReadableStreamAddReadRequest(stream) { } function ReadableStreamCancel(stream, reason) { + assert(stream._Detached === false); + stream._disturbed = true; if (stream._state === 'closed') { @@ -839,7 +877,7 @@ function ReadableStreamDefaultReaderRead(reader) { // Controllers class ReadableStreamDefaultController { - constructor(stream, underlyingSource, size, highWaterMark) { + constructor(stream, underlyingSource, size, highWaterMark, type) { if (IsReadableStream(stream) === false) { throw new TypeError('ReadableStreamDefaultController can only be constructed with a ReadableStream instance'); } @@ -858,6 +896,12 @@ class ReadableStreamDefaultController { this._closeRequested = false; this._pullAgain = false; this._pulling = false; + this._targetRealm = undefined; + + if (type === 'cloning') { + /* can't access self/window/worker settings object from inside node module */ + this._targetRealm = global; // set this.[[targetRealm]] to current Realm Record + } const normalizedStrategy = ValidateAndNormalizeQueuingStrategy(size, highWaterMark); this._strategySize = normalizedStrategy.size; @@ -1062,6 +1106,14 @@ function ReadableStreamDefaultControllerEnqueue(controller, chunk) { assert(stream._state === 'readable'); if (IsReadableStreamLocked(stream) === true && ReadableStreamGetNumReadRequests(stream) > 0) { + if (controller._targetRealm !== undefined) { + try { + chunk = StructuredClone(chunk/* , controller._targetRealm */); + } catch (cloneE) { + ReadableStreamDefaultControllerErrorIfNeeded(controller, cloneE); + throw cloneE; + } + } ReadableStreamFulfillReadRequest(stream, chunk, false); } else { let chunkSize = 1; @@ -1076,7 +1128,7 @@ function ReadableStreamDefaultControllerEnqueue(controller, chunk) { } try { - EnqueueValueWithSize(controller._queue, chunk, chunkSize); + EnqueueValueWithSize(controller._queue, chunk, chunkSize, controller._targetRealm); } catch (enqueueE) { ReadableStreamDefaultControllerErrorIfNeeded(controller, enqueueE); throw enqueueE; diff --git a/reference-implementation/package.json b/reference-implementation/package.json index 1a203210f..60b9993b4 100644 --- a/reference-implementation/package.json +++ b/reference-implementation/package.json @@ -15,6 +15,9 @@ "Takeshi Yoshino " ], "license": "(CC0-1.0 OR MIT)", + "dependencies": { + "realistic-structured-clone": "^0.0.3" + }, "devDependencies": { "eslint": "^3.2.2", "glob": "^7.0.3",