diff --git a/index.bs b/index.bs index 6a894f662..517909b77 100644 --- a/index.bs +++ b/index.bs @@ -171,18 +171,21 @@ sink.
Transform streams are not yet fully developed. We are iterating on their design using a JavaScript -reference implementation and test suite; you can follow that work on the issue tracker. Until then, this section -gives a brief overview of their intended design, even though the details of their API is not yet determined.
+A transform stream consists of a pair of streams: a writable stream, known as its writable +side, and a readable stream, known as its readable side. In a manner specific to the transform +stream in question, writes to the writable side result in new data being made available for reading from the readable +side. -A transform stream consists of a pair of streams: a writable stream, and a readable stream. -In a manner specific to the transform stream in question, writes to the writable side result in new data being made -available for reading from the readable side. Concretely, any object with awritable
property and a
-readable
property can serve as a transform stream. However, we plan to provide a
-TransformStream
class which makes it much easier to create such a pair that is properly entangled.
+Concretely, any object with a writable
property and a readable
property can serve as a
+transform stream. However, the standard {{TransformStream}} class makes it much easier to create such a pair that is
+properly entangled. It wraps a transformer object which defines the specific transformation to be performed.
-Some examples of transform streams include:
+An identity transform stream is a type of transform stream which forwards all chunks written to its
+writable side to its readable side, without any changes. This can be useful in a variety of scenarios. By default, the {{TransformStream}} constructor will
+create an identity transform stream, when no transform
method is present on the transformer object.
+
+Some examples of potential transform streams include:
start(controller)
is called immediately, and is typically used to adapt a push
+ start(controller)
is called immediately, and is typically used to adapt a push
source by setting up relevant event listeners, or to acquire access to a pull source. If this process
is asynchronous, it can return a promise to signal success or failure.
-
pull(controller)
is called when the stream's internal queue of chunks is not full, and
+
+ pull(controller)
is called when the stream's internal queue of chunks is not full, and
will be called repeatedly until the queue reaches its high water mark. If pull
returns a
promise, then pull
will not be called again until that promise fulfills; if the promise rejects, the
stream will become errored.
-
cancel(reason)
is called when the consumer signals that they are no longer interested in the
+
+ cancel(reason)
is called when the consumer signals that they are no longer interested in the
stream. It can perform any actions necessary to release access to the underlying source. If this
process is asynchronous, it can return a promise to signal success or failure.
start(controller)
is called immediately, and can perform any actions necessary to acquire
+ start(controller)
is called immediately, and can perform any actions necessary to acquire
access to the underlying sink. If this process is asynchronous, it can return a promise to signal success
or failure.
-
write(chunk, controller)
is called when a new chunk of data is ready to be written to the
+
+ write(chunk, controller)
is called when a new chunk of data is ready to be written to the
underlying sink. It can return a promise to signal success or failure of the write operation. The stream
implementation guarantees that this method will be called only after previous writes have succeeded, and never
after close
or abort
is called.
-
close()
is called after the producer signals that they are done writing chunks to the
+
+ close()
is called after the producer signals that they are done writing chunks to the
stream, and all queued-up writes successfully complete. It can perform any actions necessary to finalize writes
to the underlying sink, and release access to it. If this process is asynchronous, it can return a promise
to signal success or failure. The stream implementation guarantees that this method will be called only after all
queued-up writes have succeeded.
-
abort(reason)
is called when the producer signals they wish to abruptly close the stream
+
+ The stream implementation guarantees that readableStrategy is created from scratch so that the default high water mark can
+ be overridden. This does not apply to writableStrategy, so it is passed through unchanged from the
+ second constructor argument. The [[backpressure]] slot is set to *undefined* so that it can be initialized by
+ TransformStreamSetBackpressure. Alternatively, implementations can use a strictly boolean value for
+ [[backpressure]] and change the way it is initialized. This will not be visible to user code so long as the
+ initialization is correctly completed before _transformer_'s This operation works correctly when one or both sides are already errored. As a result, calling
+algorithms do not need to check stream states when responding to an error condition. The {{TransformStreamDefaultSink/write()}} method of {{TransformStreamDefaultSink}} could be waiting for
+the promise stored in the \[[backpressureChangePromise]] slot to resolve. This call to TransformStreamSetBackpressure
+ensures that the promise always resolves. The readable side is closed before abort(reason)
is called when the producer signals they wish to abruptly close the stream
and put it in an errored state. It can clean up any held resources, much like close
, but perhaps
with some custom handling. Unlike close
, abort
will be called even if writes are queued
up; those chunks will be thrown away. If this process is asynchronous, it can return a promise to signal
@@ -3763,10 +3771,678 @@ nothrow>WritableStreamDefaultControllerError ( controller, error
Transform Streams
-Transform streams have been developed in the testable implementation, but not yet re-encoded in spec language.
-We are waiting to validate their design before doing so. In the meantime, see reference-implementation/lib/transform-stream.js.
+Using Transform Streams
+
+
+
+ readableStream
+ .pipeThrough(transformStream)
+ .pipeTo(writableStream)
+ .then(() => console.log("All data successfully transformed!"))
+ .catch(e => console.error("Something went wrong!", e));
+
anotherWritableStream
.
+
+
+
+ const writer = transformStream.writable.getWriter();
+ writer.write("input chunk");
+ transformStream.readable.pipeTo(anotherWritableStream);
+
+
+ Another use of identity transform streams is to add additional buffering to a pipe. In this example we add
+ extra buffering between
+ const { writable, readable } = new TransformStream();
+ fetch("...", { body: readable }).then(response => /* ... */);
+
+ writable.write(new Uint8Array([0x73, 0x74, 0x72, 0x65, 0x61, 0x6D, 0x73, 0x21]));
+ writable.close();
+
readableStream
and writableStream
.
+
+
+
+ const writableStrategy = new ByteLengthQueuingStrategy({ highWaterMark: 1024 * 1024 });
+
+ readableStream
+ .pipeThrough(new TransformStream(undefined, writableStrategy))
+ .pipeTo(writableStream);
+
Class
+
+TransformStream
Class Definition
+
+
+
+
+ class TransformStream {
+ constructor(transformer = {}, writableStrategy = {}, readableStrategy = {})
+
+ get readable()
+ get writable()
+ }
+
Internal Slots
+
+Instances of {{TransformStream}} are created with the internal slots described in the following table:
+
+
+
+
+
+
+
+
+ Internal Slot
+ Description (non-normative)
+
+
+ \[[backpressure]]
+ Whether there was backpressure on \[[readable]] the last time it was observed
+
+
+ \[[backpressureChangePromise]]
+ A promise which is fulfilled and replaced every time the value of \[[backpressure]]
+ changes
+
+
+ \[[readable]]
+ The {{ReadableStream}} instance controlled by this object
+
+
+ \[[transformer]]
+ The transformer object that was passed to the constructor
+
+
+ \[[transformStreamController]]
+ A {{TransformStreamDefaultController}} created with the ability to control \[[readable]]
+ and \[[writable]]; also used for the IsTransformStream brand check
+
+
+\[[writable]]
+ The {{WritableStream}} instance controlled by this object
+ new TransformStream(transformer = {}, writableStrategy = undefined, {
+size, highWaterMark = 0 } = {})
+
+transformer
object passed to the constructor can implement any of the following methods to govern
+ how the constructed stream instance behaves:
+
+
+
+
+ The start(controller)
is called immediately, and is typically used to enqueue prefix chunks
+ that will be read from the readable side but don't depend on any writes to the writable side. If
+ this process is asynchronous, it can return a promise to signal success or failure.
+
+ transform(chunk, controller)
is called when a new chunk originally written to the writable side is
+ ready to be transformed. It can return a promise to signal success or failure of the transformation. The results
+ of the transformation can be enqueued to the readable side using the
+ {{ReadableStreamDefaultController/enqueue(chunk)|controller.enqueue()}} method. This permits a single chunk
+ written to the writable side to result in zero or multiple chunks on the readable side.
+
+ transform
will be called only after previous
+ transformations have succeeded, and never before start
has completed or after flush
is
+ called. When no transform
method is supplied, the identity transform is used, which enqueues chunks
+ unchanged from the writable side to the readable side.
+
+ flush(controller)
is called after all chunks written to the writable side have been
+ transformed and the writable side has been closed. It is typically used to enqueue suffix chunks to the
+ readable side, before that too becomes closed. If this process is asynchronous, it can return a promise
+ to signal success or failure.
+ controller
object passed to start
, transform
and flush
is an
+ instance of {{TransformStreamDefaultController}}, and has the ability to enqueue chunks to the readable side,
+ or to terminate or error the stream.
+
+ The second and third arguments to the constructor are the queuing strategy objects for the writable and
+ readable sides respectively. These are used in the construction of the {{WritableStream}} and {{ReadableStream}}
+ objects and can be used to add buffering to a {{TransformStream}}, in order to smooth out variations in the speed of
+ the transformation, or to increase the amount of buffering in a pipe.
+start()
method is is called.Properties of the {{TransformStream}} Prototype
+
+get readable
+
+readable
getter gives access to the readable side of the transform stream.
+get writable
+
+writable
getter gives access to the writable side of the transform stream.
+General Transform Stream Abstract Operations
+
+IsTransformStream ( x )
+
+TransformStreamError ( stream,
+e )
+
+TransformStreamErrorWritableAndUnblockWrite ( stream, e )
+
+TransformStreamSetBackpressure
+( stream, backpressure )
+
+Class
+
+
+The {{TransformStreamDefaultController}} class has methods that allow manipulation of the associated {{ReadableStream}}
+and {{WritableStream}}. When constructing a {{TransformStream}}, the transformer is given a corresponding
+{{TransformStreamDefaultController}} instance to manipulate.
+
+TransformStreamDefaultController
Class Definition
+
+
+
+
+ class TransformStreamDefaultController {
+ constructor(stream)
+
+ get desiredSize()
+
+ enqueue(chunk)
+ error(reason)
+ terminate()
+ }
+
Internal Slots
+
+Instances of {{TransformStreamDefaultController}} are created with the internal slots described in the following table:
+
+
+
+
+
+
+
+
+ Internal Slot
+ Description (non-normative)
+
+
+\[[controlledTransformStream]]
+ The {{TransformStream}} instance controlled; also used for the
+ IsTransformStreamDefaultController brand check
+ new TransformStreamDefaultController(stream)
+
+TransformStreamDefaultController
constructor cannot be used directly; it only works on a
+ {{TransformStream}} that is in the middle of being constructed.
+Properties of the {{TransformStreamDefaultController}} Prototype
+
+get
+desiredSize
+
+desiredSize
getter returns the desired size
+ to fill the readable side's internal queue. It can be negative, if the queue is over-full.
+enqueue(chunk)
+
+
+error(reason)
+
+error
method will error both the readable side and the writable side of the controlled
+ transform stream, making all future interactions fail with the given reason
. Any chunks
+ queued for transformation will be discarded.
+terminate()
+
+terminate
method will close the readable side and error the writable side of the
+ controlled transform stream. This is useful when the transformer only needs to consume a portion of the
+ chunks written to the writable side.
+Transform Stream Default Controller Abstract Operations
+
+IsTransformStreamDefaultController ( x )
+
+TransformStreamDefaultControllerEnqueue ( controller, chunk )
+
+This abstract operation can be called by other specifications that wish to enqueue chunks in the readable
+side, in the same way a developer would enqueue chunks using the stream's associated controller object.
+Specifications should not do this to streams they did not create.
+
+TransformStreamDefaultControllerError ( controller, e )
+
+This abstract operation can be called by other specifications that wish to move a transform stream to an errored state,
+in the same way a developer would error a stream using its associated controller object. Specifications should
+not do this to streams they did not create.
+
+TransformStreamDefaultControllerTerminate ( controller )
+
+This abstract operation can be called by other specifications that wish to terminate a transform stream, in the same way
+a developer-created stream would be closed by its associated controller object. Specifications should not do
+this to streams they did not create.
+
+Class
+
+
+The {{TransformStreamDefaultSink}} class is used internally as the underlying sink that is passed to the
+{{WritableStream}} constructor when constructing the \[[writable]] slot of a {{TransformStream}}.
+
+TransformStreamDefaultSink
Class Definition
+
+
+
+
+ class TransformStreamDefaultSink {
+ constructor(stream, startPromise)
+
+ start()
+ write(chunk)
+ abort()
+ close()
+ }
+
Internal Slots
+
+Instances of {{TransformStreamDefaultSink}} are created with the internal slots described in the following table:
+
+
+
+
+
+
+
+
+ Internal Slot
+ Description (non-normative)
+
+
+ \[[ownerTransformStream]]
+ The {{TransformStream}} instance
+
+
+\[[startPromise]]
+ The startPromise parameter that was passed to the constructor
+ new TransformStreamDefaultSink(stream, startPromise)
+
+Properties of the {{TransformStreamDefaultSink}} Prototype
+
+start()
+
+write(chunk)
+
+abort()
+
+close()
+
+Transform Stream Default Sink Abstract Operations
+
+TransformStreamDefaultSinkInvokeTransform ( stream, chunk )
+
+TransformStreamDefaultSinkTransform ( sink, chunk )
+
+Class
+
+
+The {{TransformStreamDefaultSource}} class is used internally as the underlying source that is passed to the
+{{ReadableStream}} constructor when constructing the \[[readable]] slot.
+
+TransformStreamDefaultSource
Class Definition
+
+
+
+
+ class TransformStreamDefaultSource {
+ constructor(stream, startPromise)
+
+ start()
+ pull()
+ cancel(reason)
+ }
+
Internal Slots
+
+Instances of {{TransformStreamDefaultSource}} are created with the internal slots described in the following table:
+
+
+
+
+
+
+
+
+ Internal Slot
+ Description (non-normative)
+
+
+ \[[ownerTransformStream]]
+ The {{TransformStream}} instance
+
+
+\[[startPromise]]
+ The startPromise parameter that was passed to the constructor
+ new TransformStreamDefaultSource(stream,
+startPromise)
+
+Properties of the {{TransformStreamDefaultSource}} Prototype
+
+start()
+
+pull()
+
+cancel(reason)
+
+cancel()
is called. Only the writable side
+becomes errored.Other Stream APIs and Operations
@@ -4073,6 +4749,7 @@ The following constructors must be exposed on the global object as data properti
@@ -4082,8 +4759,11 @@ The attributes of these properties must be { \[[Writable]]: Examples of Creating Streams
diff --git a/reference-implementation/lib/transform-stream.js b/reference-implementation/lib/transform-stream.js
index f8505ecb2..1cd560b55 100644
--- a/reference-implementation/lib/transform-stream.js
+++ b/reference-implementation/lib/transform-stream.js
@@ -17,12 +17,6 @@ class TransformStream {
constructor(transformer = {}, writableStrategy = undefined, { size, highWaterMark = 0 } = {}) {
this._transformer = transformer;
- this._transformStreamController = undefined;
-
- this._backpressure = undefined;
- this._backpressureChangePromise = undefined;
- this._backpressureChangePromise_resolve = undefined;
-
const readableType = transformer.readableType;
if (readableType !== undefined) {
@@ -35,6 +29,7 @@ class TransformStream {
throw new RangeError('Invalid writable type specified');
}
+ this._transformStreamController = undefined;
const controller = new TransformStreamDefaultController(this);
this._transformStreamController = controller;
@@ -53,6 +48,10 @@ class TransformStream {
this._writable = new WritableStream(sink, writableStrategy);
+ // The [[backpressure]] slot is set to undefined so that it can be initialised by TransformStreamSetBackpressure.
+ this._backpressure = undefined;
+ this._backpressureChangePromise = undefined;
+ this._backpressureChangePromise_resolve = undefined;
TransformStreamSetBackpressure(this, true);
const startResult = InvokeOrNoop(transformer, 'start', [controller]);
@@ -94,10 +93,14 @@ function IsTransformStream(x) {
function TransformStreamError(stream, e) {
verbose('TransformStreamError()');
- WritableStreamDefaultControllerErrorIfNeeded(stream._writable._writableStreamController, e);
if (stream._readable._state === 'readable') {
ReadableStreamDefaultControllerError(stream._readable._readableStreamController, e);
}
+ TransformStreamErrorWritableAndUnblockWrite(stream, e);
+}
+
+function TransformStreamErrorWritableAndUnblockWrite(stream, e) {
+ WritableStreamDefaultControllerErrorIfNeeded(stream._writable._writableStreamController, e);
if (stream._backpressure === true) {
// Pretend that pull() was called to permit any pending write() calls to complete. TransformStreamSetBackpressure()
// cannot be called from enqueue() or pull() once the ReadableStream is errored, so this will will be the final time
@@ -204,7 +207,7 @@ function TransformStreamDefaultControllerEnqueue(controller, chunk) {
ReadableStreamDefaultControllerEnqueue(readableController, chunk);
} catch (e) {
// This happens when readableStrategy.size() throws.
- TransformStreamError(stream, e);
+ TransformStreamErrorWritableAndUnblockWrite(stream, e);
throw stream._readable._storedError;
}
@@ -217,9 +220,7 @@ function TransformStreamDefaultControllerEnqueue(controller, chunk) {
}
function TransformStreamDefaultControllerError(controller, e) {
- const stream = controller._controlledTransformStream;
-
- TransformStreamError(stream, e);
+ TransformStreamError(controller._controlledTransformStream, e);
}
function TransformStreamDefaultControllerTerminate(controller) {
@@ -233,11 +234,7 @@ function TransformStreamDefaultControllerTerminate(controller) {
}
const error = new TypeError('TransformStream terminated');
- WritableStreamDefaultControllerErrorIfNeeded(stream._writable._writableStreamController, error);
- if (stream._backpressure === true) {
- // Permit any pending write() or start() calls to complete.
- TransformStreamSetBackpressure(stream, false);
- }
+ TransformStreamErrorWritableAndUnblockWrite(stream, error);
}
// Class TransformStreamDefaultSink
@@ -336,8 +333,8 @@ function TransformStreamDefaultSinkTransform(sink, chunk) {
// handling in one place in the text of the standard.
const transformResult = TransformStreamDefaultSinkInvokeTransform(stream, chunk);
transformPromise = Promise.resolve(transformResult);
- } catch (e) {
- transformPromise = Promise.reject(e);
+ } catch (transformResultE) {
+ transformPromise = Promise.reject(transformResultE);
}
return transformPromise.catch(e => {
@@ -375,7 +372,8 @@ class TransformStreamDefaultSource {
}
cancel(reason) {
- TransformStreamError(this._ownerTransformStream, reason);
+ // The readable side is closed before cancel() is called. Only the writable side should be errored.
+ TransformStreamErrorWritableAndUnblockWrite(this._ownerTransformStream, reason);
}
}