Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rename "transformStream" variables to "stream" #805

Merged
merged 2 commits into from
Sep 25, 2017
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
158 changes: 79 additions & 79 deletions reference-implementation/lib/transform-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,9 @@ class TransformStream {

TransformStreamSetBackpressure(this, true);

const transformStream = this;
const stream = this;
const startResult = InvokeOrNoop(transformer, 'start',
[transformStream._transformStreamController]);
[stream._transformStreamController]);
startPromise_resolve(startResult);
}

Expand Down Expand Up @@ -78,139 +78,139 @@ function IsTransformStream(x) {
return true;
}

function TransformStreamCloseReadable(transformStream) {
function TransformStreamCloseReadable(stream) {
// console.log('TransformStreamCloseReadable()');

if (ReadableStreamDefaultControllerCanCloseOrEnqueue(transformStream._readableController) === false) {
if (ReadableStreamDefaultControllerCanCloseOrEnqueue(stream._readableController) === false) {
throw new TypeError('Readable side is not in a state that can be closed');
}

TransformStreamCloseReadableInternal(transformStream);
TransformStreamCloseReadableInternal(stream);
}

function TransformStreamCloseReadableInternal(transformStream) {
assert(ReadableStreamDefaultControllerCanCloseOrEnqueue(transformStream._readableController) === true);
function TransformStreamCloseReadableInternal(stream) {
assert(ReadableStreamDefaultControllerCanCloseOrEnqueue(stream._readableController) === true);

ReadableStreamDefaultControllerClose(transformStream._readableController);
ReadableStreamDefaultControllerClose(stream._readableController);
}

function TransformStreamDefaultTransform(chunk, transformStreamController) {
const transformStream = transformStreamController._controlledTransformStream;
TransformStreamEnqueueToReadable(transformStream, chunk);
function TransformStreamDefaultTransform(chunk, controller) {
const stream = controller._controlledTransformStream;
TransformStreamEnqueueToReadable(stream, chunk);
return Promise.resolve();
}

function TransformStreamEnqueueToReadable(transformStream, chunk) {
function TransformStreamEnqueueToReadable(stream, chunk) {
// console.log('TransformStreamEnqueueToReadable()');

if (ReadableStreamDefaultControllerCanCloseOrEnqueue(transformStream._readableController) === false) {
if (ReadableStreamDefaultControllerCanCloseOrEnqueue(stream._readableController) === false) {
throw new TypeError('Readable side is not in a state that permits enqueue');
}

// We throttle transformer.transform invocation based on the backpressure of the ReadableStream, but we still
// accept TransformStreamEnqueueToReadable() calls.

const controller = transformStream._readableController;
const controller = stream._readableController;

try {
ReadableStreamDefaultControllerEnqueue(controller, chunk);
} catch (e) {
// This happens when readableStrategy.size() throws.
TransformStreamError(transformStream, e);
TransformStreamError(stream, e);

throw transformStream._readable._storedError;
throw stream._readable._storedError;
}

const backpressure = ReadableStreamDefaultControllerHasBackpressure(controller);
if (backpressure !== transformStream._backpressure) {
TransformStreamSetBackpressure(transformStream, backpressure);
if (backpressure !== stream._backpressure) {
TransformStreamSetBackpressure(stream, backpressure);
}
}

// This is a no-op if both sides are already errored.
function TransformStreamError(transformStream, e) {
function TransformStreamError(stream, e) {
// console.log('TransformStreamError()');

WritableStreamDefaultControllerErrorIfNeeded(transformStream._writableController, e);
if (transformStream._readable._state === 'readable') {
ReadableStreamDefaultControllerError(transformStream._readableController, e);
WritableStreamDefaultControllerErrorIfNeeded(stream._writableController, e);
if (stream._readable._state === 'readable') {
ReadableStreamDefaultControllerError(stream._readableController, e);
}
if (transformStream._backpressure === true) {
if (stream._backpressure === true) {
// Pretend that pull() was called to permit any pending write() or start() calls to complete.
// TransformStreamSetBackpressure() cannot be called from enqueue() or pull() once the ReadableStream is errored,
// so this will will be the final time _backpressure is set.
TransformStreamSetBackpressure(transformStream, false);
TransformStreamSetBackpressure(stream, false);
}
}

function TransformStreamSetBackpressure(transformStream, backpressure) {
function TransformStreamSetBackpressure(stream, backpressure) {
// console.log(`TransformStreamSetBackpressure(${backpressure})`);

// Passes also when called during construction.
assert(transformStream._backpressure !== backpressure,
assert(stream._backpressure !== backpressure,
'TransformStreamSetBackpressure() should be called only when backpressure is changed');

if (transformStream._backpressureChangePromise !== undefined) {
if (stream._backpressureChangePromise !== undefined) {
// The fulfillment value is just for a sanity check.
transformStream._backpressureChangePromise_resolve(backpressure);
stream._backpressureChangePromise_resolve(backpressure);
}

transformStream._backpressureChangePromise = new Promise(resolve => {
transformStream._backpressureChangePromise_resolve = resolve;
stream._backpressureChangePromise = new Promise(resolve => {
stream._backpressureChangePromise_resolve = resolve;
});

transformStream._backpressureChangePromise.then(resolution => {
stream._backpressureChangePromise.then(resolution => {
assert(resolution !== backpressure,
'_backpressureChangePromise should be fulfilled only when backpressure is changed');
});

transformStream._backpressure = backpressure;
stream._backpressure = backpressure;
}

function TransformStreamTransform(transformStream, chunk) {
function TransformStreamTransform(stream, chunk) {
// console.log('TransformStreamTransform()');

assert(transformStream._readable._state !== 'errored');
assert(transformStream._backpressure === false);
assert(stream._readable._state !== 'errored');
assert(stream._backpressure === false);

const transformer = transformStream._transformer;
const controller = transformStream._transformStreamController;
const transformer = stream._transformer;
const controller = stream._transformStreamController;

const transformPromise = PromiseInvokeOrPerformFallback(transformer, 'transform', [chunk, controller],
TransformStreamDefaultTransform, [chunk, controller]);

return transformPromise.then(
undefined,
e => {
TransformStreamError(transformStream, e);
TransformStreamError(stream, e);
return Promise.reject(e);
});
}

// Class TransformStreamDefaultController

class TransformStreamDefaultController {
constructor(transformStream) {
if (IsTransformStream(transformStream) === false) {
constructor(stream) {
if (IsTransformStream(stream) === false) {
throw new TypeError('TransformStreamDefaultController can only be ' +
'constructed with a TransformStream instance');
}

if (transformStream._transformStreamController !== undefined) {
if (stream._transformStreamController !== undefined) {
throw new TypeError('TransformStreamDefaultController instances can ' +
'only be created by the TransformStream constructor');
}

this._controlledTransformStream = transformStream;
this._controlledTransformStream = stream;
}

get desiredSize() {
if (IsTransformStreamDefaultController(this) === false) {
throw defaultControllerBrandCheckException('desiredSize');
}

const transformStream = this._controlledTransformStream;
const readableController = transformStream._readableController;
const stream = this._controlledTransformStream;
const readableController = stream._readableController;

return ReadableStreamDefaultControllerGetDesiredSize(readableController);
}
Expand Down Expand Up @@ -259,116 +259,116 @@ function IsTransformStreamDefaultController(x) {
}

function TransformStreamDefaultControllerError(controller, e) {
const transformStream = controller._controlledTransformStream;
const stream = controller._controlledTransformStream;

assert(transformStream._readable._state === 'readable', 'stream.[[readable]].[[state]] is "readable"');
assert(stream._readable._state === 'readable', 'stream.[[readable]].[[state]] is "readable"');

TransformStreamError(transformStream, e);
TransformStreamError(stream, e);
}

// Class TransformStreamDefaultSink

class TransformStreamDefaultSink {
constructor(transformStream, startPromise) {
this._transformStream = transformStream;
constructor(stream, startPromise) {
this._ownerTransformStream = stream;
this._startPromise = startPromise;
}

start(c) {
const transformStream = this._transformStream;
const stream = this._ownerTransformStream;

transformStream._writableController = c;
stream._writableController = c;

return this._startPromise;
}

write(chunk) {
// console.log('TransformStreamDefaultSink.write()');

const transformStream = this._transformStream;
const stream = this._ownerTransformStream;

if (transformStream._backpressure === true) {
return transformStream._backpressureChangePromise
if (stream._backpressure === true) {
return stream._backpressureChangePromise
.then(() => {
const writable = transformStream._writable;
const writable = stream._writable;
const state = writable._state;
if (state === 'erroring') {
return Promise.reject(writable._storedError);
}
assert(state === 'writable', 'state is `"writable"`');
return TransformStreamTransform(transformStream, chunk);
return TransformStreamTransform(stream, chunk);
});
}

return TransformStreamTransform(transformStream, chunk);
return TransformStreamTransform(stream, chunk);
}

abort() {
const transformStream = this._transformStream;
const stream = this._ownerTransformStream;
// abort() is not called synchronously, so it is possible for abort() to be called when the stream is already
// errored.
TransformStreamError(transformStream, new TypeError('Writable side aborted'));
TransformStreamError(stream, new TypeError('Writable side aborted'));
}

close() {
// console.log('TransformStreamDefaultSink.close()');

const transformStream = this._transformStream;
const stream = this._ownerTransformStream;

const flushPromise = PromiseInvokeOrNoop(transformStream._transformer,
'flush', [transformStream._transformStreamController]);
const flushPromise = PromiseInvokeOrNoop(stream._transformer,
'flush', [stream._transformStreamController]);
// Return a promise that is fulfilled with undefined on success.
return flushPromise.then(() => {
if (transformStream._readable._state === 'errored') {
return Promise.reject(transformStream._readable._storedError);
if (stream._readable._state === 'errored') {
return Promise.reject(stream._readable._storedError);
}
if (ReadableStreamDefaultControllerCanCloseOrEnqueue(transformStream._readableController) === true) {
TransformStreamCloseReadableInternal(transformStream);
if (ReadableStreamDefaultControllerCanCloseOrEnqueue(stream._readableController) === true) {
TransformStreamCloseReadableInternal(stream);
}
return Promise.resolve();
}).catch(r => {
TransformStreamError(transformStream, r);
return Promise.reject(transformStream._readable._storedError);
TransformStreamError(stream, r);
return Promise.reject(stream._readable._storedError);
});
}
}

// Class TransformStreamDefaultSource

class TransformStreamDefaultSource {
constructor(transformStream, startPromise) {
this._transformStream = transformStream;
constructor(stream, startPromise) {
this._ownerTransformStream = stream;
this._startPromise = startPromise;
}

start(c) {
const transformStream = this._transformStream;
const stream = this._ownerTransformStream;

transformStream._readableController = c;
stream._readableController = c;

return this._startPromise;
}

pull() {
// console.log('TransformStreamDefaultSource.pull()');

const transformStream = this._transformStream;
const stream = this._ownerTransformStream;

// Invariant. Enforced by the promises returned by start() and pull().
assert(transformStream._backpressure === true, 'pull() should be never called while _backpressure is false');
assert(stream._backpressure === true, 'pull() should be never called while _backpressure is false');

assert(transformStream._backpressureChangePromise !== undefined,
assert(stream._backpressureChangePromise !== undefined,
'_backpressureChangePromise should have been initialized');

TransformStreamSetBackpressure(transformStream, false);
TransformStreamSetBackpressure(stream, false);

// Prevent the next pull() call until there is backpressure.
return transformStream._backpressureChangePromise;
return stream._backpressureChangePromise;
}

cancel(reason) {
const transformStream = this._transformStream;
TransformStreamError(transformStream, reason);
const stream = this._ownerTransformStream;
TransformStreamError(stream, reason);
}
}

Expand Down