diff --git a/reference-implementation/lib/transform-stream.js b/reference-implementation/lib/transform-stream.js index 425ee15be..1c628f673 100644 --- a/reference-implementation/lib/transform-stream.js +++ b/reference-implementation/lib/transform-stream.js @@ -37,8 +37,7 @@ class TransformStream { TransformStreamSetBackpressure(this, true); const stream = this; - const startResult = InvokeOrNoop(transformer, 'start', - [stream._transformStreamController]); + const startResult = InvokeOrNoop(transformer, 'start', [stream._transformStreamController]); startPromise_resolve(startResult); } @@ -73,56 +72,6 @@ function IsTransformStream(x) { return true; } -function TransformStreamCloseReadable(stream) { - // console.log('TransformStreamCloseReadable()'); - - if (ReadableStreamDefaultControllerCanCloseOrEnqueue(stream._readable._readableStreamController) === false) { - throw new TypeError('Readable side is not in a state that can be closed'); - } - - TransformStreamCloseReadableInternal(stream); -} - -function TransformStreamCloseReadableInternal(stream) { - assert(ReadableStreamDefaultControllerCanCloseOrEnqueue(stream._readable._readableStreamController) === true); - - ReadableStreamDefaultControllerClose(stream._readable._readableStreamController); -} - -function TransformStreamDefaultTransform(chunk, controller) { - const stream = controller._controlledTransformStream; - TransformStreamEnqueueToReadable(stream, chunk); - return Promise.resolve(); -} - -function TransformStreamEnqueueToReadable(stream, chunk) { - // console.log('TransformStreamEnqueueToReadable()'); - - if (ReadableStreamDefaultControllerCanCloseOrEnqueue(stream._readable._readableStreamController) === false) { - throw new TypeError('Readable side is not in a state that permits enqueue'); - } - - // We throttle transformer.transform invocation based on the backpressure of the ReadableStream, but we still - // accept TransformStreamEnqueueToReadable() calls. - - const controller = stream._readable._readableStreamController; - - try { - ReadableStreamDefaultControllerEnqueue(controller, chunk); - } catch (e) { - // This happens when readableStrategy.size() throws. - TransformStreamError(stream, e); - - throw stream._readable._storedError; - } - - const backpressure = ReadableStreamDefaultControllerHasBackpressure(controller); - if (backpressure !== stream._backpressure) { - assert(backpressure === true, 'backpressure is *true*'); - TransformStreamSetBackpressure(stream, true); - } -} - // This is a no-op if both sides are already errored. function TransformStreamError(stream, e) { // console.log('TransformStreamError()'); @@ -163,26 +112,6 @@ function TransformStreamSetBackpressure(stream, backpressure) { stream._backpressure = backpressure; } -function TransformStreamTransform(stream, chunk) { - // console.log('TransformStreamTransform()'); - - assert(stream._readable._state !== 'errored'); - assert(stream._backpressure === false); - - const transformer = stream._transformer; - const controller = stream._transformStreamController; - - const transformPromise = PromiseInvokeOrPerformFallback(transformer, 'transform', [chunk, controller], - TransformStreamDefaultTransform, [chunk, controller]); - - return transformPromise.then( - undefined, - e => { - TransformStreamError(stream, e); - return Promise.reject(e); - }); -} - // Class TransformStreamDefaultController class TransformStreamDefaultController { @@ -216,7 +145,7 @@ class TransformStreamDefaultController { throw defaultControllerBrandCheckException('enqueue'); } - TransformStreamEnqueueToReadable(this._controlledTransformStream, chunk); + TransformStreamDefaultControllerEnqueue(this, chunk); } close() { @@ -224,7 +153,7 @@ class TransformStreamDefaultController { throw defaultControllerBrandCheckException('close'); } - TransformStreamCloseReadable(this._controlledTransformStream); + TransformStreamDefaultControllerClose(this); } error(reason) { @@ -254,6 +183,46 @@ function IsTransformStreamDefaultController(x) { return true; } +function TransformStreamDefaultControllerClose(controller) { + // console.log('TransformStreamDefaultControllerClose()'); + + const stream = controller._controlledTransformStream; + if (ReadableStreamDefaultControllerCanCloseOrEnqueue(stream._readable._readableStreamController) === false) { + throw new TypeError('Readable side is not in a state that can be closed'); + } + + ReadableStreamDefaultControllerClose(stream._readable._readableStreamController); +} + +function TransformStreamDefaultControllerEnqueue(controller, chunk) { + // console.log('TransformStreamDefaultControllerEnqueue()'); + + const stream = controller._controlledTransformStream; + if (ReadableStreamDefaultControllerCanCloseOrEnqueue(stream._readable._readableStreamController) === false) { + throw new TypeError('Readable side is not in a state that permits enqueue'); + } + + // We throttle transformer.transform invocations based on the backpressure of the ReadableStream, but we still + // accept TransformStreamDefaultControllerEnqueue() calls. + + const readableController = stream._readable._readableStreamController; + + try { + ReadableStreamDefaultControllerEnqueue(readableController, chunk); + } catch (e) { + // This happens when readableStrategy.size() throws. + TransformStreamError(stream, e); + + throw stream._readable._storedError; + } + + const backpressure = ReadableStreamDefaultControllerHasBackpressure(readableController); + if (backpressure !== stream._backpressure) { + assert(backpressure === true, 'backpressure is *true*'); + TransformStreamSetBackpressure(stream, true); + } +} + function TransformStreamDefaultControllerError(controller, e) { const stream = controller._controlledTransformStream; @@ -291,11 +260,11 @@ class TransformStreamDefaultSink { return Promise.reject(writable._storedError); } assert(state === 'writable', 'state is `"writable"`'); - return TransformStreamTransform(stream, chunk); + return TransformStreamDefaultSinkTransform(this, chunk); }); } - return TransformStreamTransform(stream, chunk); + return TransformStreamDefaultSinkTransform(this, chunk); } abort() { @@ -310,15 +279,14 @@ class TransformStreamDefaultSink { const stream = this._ownerTransformStream; - const flushPromise = PromiseInvokeOrNoop(stream._transformer, - 'flush', [stream._transformStreamController]); + const flushPromise = PromiseInvokeOrNoop(stream._transformer, 'flush', [stream._transformStreamController]); // Return a promise that is fulfilled with undefined on success. return flushPromise.then(() => { if (stream._readable._state === 'errored') { return Promise.reject(stream._readable._storedError); } if (ReadableStreamDefaultControllerCanCloseOrEnqueue(stream._readable._readableStreamController) === true) { - TransformStreamCloseReadableInternal(stream); + ReadableStreamDefaultControllerClose(stream._readable._readableStreamController); } return Promise.resolve(); }).catch(r => { @@ -328,6 +296,33 @@ class TransformStreamDefaultSink { } } +function TransformStreamDefaultSinkDefaultTransform(chunk, controller) { + TransformStreamDefaultControllerEnqueue(controller, chunk); + return Promise.resolve(); +} + +function TransformStreamDefaultSinkTransform(sink, chunk) { + // console.log('TransformStreamDefaultSinkTransform()'); + + const stream = sink._ownerTransformStream; + + assert(stream._readable._state !== 'errored'); + assert(stream._backpressure === false); + + const transformer = stream._transformer; + const controller = stream._transformStreamController; + + const transformPromise = PromiseInvokeOrPerformFallback(transformer, 'transform', [chunk, controller], + TransformStreamDefaultSinkDefaultTransform, [chunk, controller]); + + return transformPromise.then( + undefined, + e => { + TransformStreamError(stream, e); + return Promise.reject(e); + }); +} + // Class TransformStreamDefaultSource class TransformStreamDefaultSource {