Skip to content

Commit

Permalink
Rename "transformStream" variables to "stream"
Browse files Browse the repository at this point in the history
For consistency with Readable- and WritableStream, and brevity, rename
all "transformStream" parameters and local variables to just "stream".

Rename _transformStream members in Source and Sink to
_ownerTransformStream which is hopefully slightly clearer.

No functional changes.
  • Loading branch information
ricea committed Sep 22, 2017
1 parent 569aef4 commit 548b405
Showing 1 changed file with 77 additions and 77 deletions.
154 changes: 77 additions & 77 deletions reference-implementation/lib/transform-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,9 @@ class TransformStream {

TransformStreamSetBackpressure(this, true);

const transformStream = this;
const stream = this;
const startResult = InvokeOrNoop(transformer, 'start',
[transformStream._transformStreamController]);
[stream._transformStreamController]);
startPromise_resolve(startResult);
}

Expand Down Expand Up @@ -78,139 +78,139 @@ function IsTransformStream(x) {
return true;
}

function TransformStreamCloseReadable(transformStream) {
function TransformStreamCloseReadable(stream) {
// console.log('TransformStreamCloseReadable()');

if (ReadableStreamDefaultControllerCanCloseOrEnqueue(transformStream._readableController) === false) {
if (ReadableStreamDefaultControllerCanCloseOrEnqueue(stream._readableController) === false) {
throw new TypeError('Readable side is not in a state that can be closed');
}

TransformStreamCloseReadableInternal(transformStream);
TransformStreamCloseReadableInternal(stream);
}

function TransformStreamCloseReadableInternal(transformStream) {
assert(ReadableStreamDefaultControllerCanCloseOrEnqueue(transformStream._readableController) === true);
function TransformStreamCloseReadableInternal(stream) {
assert(ReadableStreamDefaultControllerCanCloseOrEnqueue(stream._readableController) === true);

ReadableStreamDefaultControllerClose(transformStream._readableController);
ReadableStreamDefaultControllerClose(stream._readableController);
}

function TransformStreamDefaultTransform(chunk, transformStreamController) {
const transformStream = transformStreamController._controlledTransformStream;
TransformStreamEnqueueToReadable(transformStream, chunk);
const stream = transformStreamController._controlledTransformStream;
TransformStreamEnqueueToReadable(stream, chunk);
return Promise.resolve();
}

function TransformStreamEnqueueToReadable(transformStream, chunk) {
function TransformStreamEnqueueToReadable(stream, chunk) {
// console.log('TransformStreamEnqueueToReadable()');

if (ReadableStreamDefaultControllerCanCloseOrEnqueue(transformStream._readableController) === false) {
if (ReadableStreamDefaultControllerCanCloseOrEnqueue(stream._readableController) === false) {
throw new TypeError('Readable side is not in a state that permits enqueue');
}

// We throttle transformer.transform invocation based on the backpressure of the ReadableStream, but we still
// accept TransformStreamEnqueueToReadable() calls.

const controller = transformStream._readableController;
const controller = stream._readableController;

try {
ReadableStreamDefaultControllerEnqueue(controller, chunk);
} catch (e) {
// This happens when readableStrategy.size() throws.
TransformStreamError(transformStream, e);
TransformStreamError(stream, e);

throw transformStream._readable._storedError;
throw stream._readable._storedError;
}

const backpressure = ReadableStreamDefaultControllerHasBackpressure(controller);
if (backpressure !== transformStream._backpressure) {
TransformStreamSetBackpressure(transformStream, backpressure);
if (backpressure !== stream._backpressure) {
TransformStreamSetBackpressure(stream, backpressure);
}
}

// This is a no-op if both sides are already errored.
function TransformStreamError(transformStream, e) {
function TransformStreamError(stream, e) {
// console.log('TransformStreamError()');

WritableStreamDefaultControllerErrorIfNeeded(transformStream._writableController, e);
if (transformStream._readable._state === 'readable') {
ReadableStreamDefaultControllerError(transformStream._readableController, e);
WritableStreamDefaultControllerErrorIfNeeded(stream._writableController, e);
if (stream._readable._state === 'readable') {
ReadableStreamDefaultControllerError(stream._readableController, e);
}
if (transformStream._backpressure === true) {
if (stream._backpressure === true) {
// Pretend that pull() was called to permit any pending write() or start() calls to complete.
// TransformStreamSetBackpressure() cannot be called from enqueue() or pull() once the ReadableStream is errored,
// so this will will be the final time _backpressure is set.
TransformStreamSetBackpressure(transformStream, false);
TransformStreamSetBackpressure(stream, false);
}
}

function TransformStreamSetBackpressure(transformStream, backpressure) {
function TransformStreamSetBackpressure(stream, backpressure) {
// console.log(`TransformStreamSetBackpressure(${backpressure})`);

// Passes also when called during construction.
assert(transformStream._backpressure !== backpressure,
assert(stream._backpressure !== backpressure,
'TransformStreamSetBackpressure() should be called only when backpressure is changed');

if (transformStream._backpressureChangePromise !== undefined) {
if (stream._backpressureChangePromise !== undefined) {
// The fulfillment value is just for a sanity check.
transformStream._backpressureChangePromise_resolve(backpressure);
stream._backpressureChangePromise_resolve(backpressure);
}

transformStream._backpressureChangePromise = new Promise(resolve => {
transformStream._backpressureChangePromise_resolve = resolve;
stream._backpressureChangePromise = new Promise(resolve => {
stream._backpressureChangePromise_resolve = resolve;
});

transformStream._backpressureChangePromise.then(resolution => {
stream._backpressureChangePromise.then(resolution => {
assert(resolution !== backpressure,
'_backpressureChangePromise should be fulfilled only when backpressure is changed');
});

transformStream._backpressure = backpressure;
stream._backpressure = backpressure;
}

function TransformStreamTransform(transformStream, chunk) {
function TransformStreamTransform(stream, chunk) {
// console.log('TransformStreamTransform()');

assert(transformStream._readable._state !== 'errored');
assert(transformStream._backpressure === false);
assert(stream._readable._state !== 'errored');
assert(stream._backpressure === false);

const transformer = transformStream._transformer;
const controller = transformStream._transformStreamController;
const transformer = stream._transformer;
const controller = stream._transformStreamController;

const transformPromise = PromiseInvokeOrPerformFallback(transformer, 'transform', [chunk, controller],
TransformStreamDefaultTransform, [chunk, controller]);

return transformPromise.then(
undefined,
e => {
TransformStreamError(transformStream, e);
TransformStreamError(stream, e);
return Promise.reject(e);
});
}

// Class TransformStreamDefaultController

class TransformStreamDefaultController {
constructor(transformStream) {
if (IsTransformStream(transformStream) === false) {
constructor(stream) {
if (IsTransformStream(stream) === false) {
throw new TypeError('TransformStreamDefaultController can only be ' +
'constructed with a TransformStream instance');
}

if (transformStream._transformStreamController !== undefined) {
if (stream._transformStreamController !== undefined) {
throw new TypeError('TransformStreamDefaultController instances can ' +
'only be created by the TransformStream constructor');
}

this._controlledTransformStream = transformStream;
this._controlledTransformStream = stream;
}

get desiredSize() {
if (IsTransformStreamDefaultController(this) === false) {
throw defaultControllerBrandCheckException('desiredSize');
}

const transformStream = this._controlledTransformStream;
const readableController = transformStream._readableController;
const stream = this._controlledTransformStream;
const readableController = stream._readableController;

return ReadableStreamDefaultControllerGetDesiredSize(readableController);
}
Expand Down Expand Up @@ -259,108 +259,108 @@ function IsTransformStreamDefaultController(x) {
}

function TransformStreamDefaultControllerError(controller, e) {
const transformStream = controller._controlledTransformStream;
const stream = controller._controlledTransformStream;

assert(transformStream._readable._state === 'readable', 'stream.[[readable]].[[state]] is "readable"');
assert(stream._readable._state === 'readable', 'stream.[[readable]].[[state]] is "readable"');

TransformStreamError(transformStream, e);
TransformStreamError(stream, e);
}

// Class TransformStreamDefaultSink

class TransformStreamDefaultSink {
constructor(transformStream, startPromise) {
this._transformStream = transformStream;
constructor(stream, startPromise) {
this._ownerTransformStream = stream;
this._startPromise = startPromise;
}

start(c) {
const transformStream = this._transformStream;
const stream = this._ownerTransformStream;

transformStream._writableController = c;
stream._writableController = c;

return this._startPromise;
}

write(chunk) {
// console.log('TransformStreamDefaultSink.write()');

const transformStream = this._transformStream;
const stream = this._ownerTransformStream;

if (transformStream._backpressure === true) {
return transformStream._backpressureChangePromise
.then(() => TransformStreamTransform(transformStream, chunk));
if (stream._backpressure === true) {
return stream._backpressureChangePromise
.then(() => TransformStreamTransform(stream, chunk));
}

return TransformStreamTransform(transformStream, chunk);
return TransformStreamTransform(stream, chunk);
}

abort() {
const transformStream = this._transformStream;
const stream = this._ownerTransformStream;
// abort() is not called synchronously, so it is possible for abort() to be called when the stream is already
// errored.
TransformStreamError(transformStream, new TypeError('Writable side aborted'));
TransformStreamError(stream, new TypeError('Writable side aborted'));
}

close() {
// console.log('TransformStreamDefaultSink.close()');

const transformStream = this._transformStream;
const stream = this._ownerTransformStream;

const flushPromise = PromiseInvokeOrNoop(transformStream._transformer,
'flush', [transformStream._transformStreamController]);
const flushPromise = PromiseInvokeOrNoop(stream._transformer,
'flush', [stream._transformStreamController]);
// Return a promise that is fulfilled with undefined on success.
return flushPromise.then(() => {
if (transformStream._readable._state === 'errored') {
return Promise.reject(transformStream._readable._storedError);
if (stream._readable._state === 'errored') {
return Promise.reject(stream._readable._storedError);
}
if (ReadableStreamDefaultControllerCanCloseOrEnqueue(transformStream._readableController) === true) {
TransformStreamCloseReadableInternal(transformStream);
if (ReadableStreamDefaultControllerCanCloseOrEnqueue(stream._readableController) === true) {
TransformStreamCloseReadableInternal(stream);
}
return Promise.resolve();
}).catch(r => {
TransformStreamError(transformStream, r);
return Promise.reject(transformStream._readable._storedError);
TransformStreamError(stream, r);
return Promise.reject(stream._readable._storedError);
});
}
}

// Class TransformStreamDefaultSource

class TransformStreamDefaultSource {
constructor(transformStream, startPromise) {
this._transformStream = transformStream;
constructor(stream, startPromise) {
this._ownerTransformStream = stream;
this._startPromise = startPromise;
}

start(c) {
const transformStream = this._transformStream;
const stream = this._ownerTransformStream;

transformStream._readableController = c;
stream._readableController = c;

return this._startPromise;
}

pull() {
// console.log('TransformStreamDefaultSource.pull()');

const transformStream = this._transformStream;
const stream = this._ownerTransformStream;

// Invariant. Enforced by the promises returned by start() and pull().
assert(transformStream._backpressure === true, 'pull() should be never called while _backpressure is false');
assert(stream._backpressure === true, 'pull() should be never called while _backpressure is false');

assert(transformStream._backpressureChangePromise !== undefined,
assert(stream._backpressureChangePromise !== undefined,
'_backpressureChangePromise should have been initialized');

TransformStreamSetBackpressure(transformStream, false);
TransformStreamSetBackpressure(stream, false);

// Prevent the next pull() call until there is backpressure.
return transformStream._backpressureChangePromise;
return stream._backpressureChangePromise;
}

cancel(reason) {
const transformStream = this._transformStream;
TransformStreamError(transformStream, reason);
const stream = this._ownerTransformStream;
TransformStreamError(stream, reason);
}
}

Expand Down

0 comments on commit 548b405

Please sign in to comment.