From 6eb872b16e0b071036dd4222e14d7399d0614740 Mon Sep 17 00:00:00 2001 From: Domenic Denicola Date: Mon, 24 Nov 2014 17:41:44 -0500 Subject: [PATCH] Rename .wait() to .ready Closes #243. --- BinaryExtension.md | 18 ++-- Examples.md | 8 +- index.bs | 98 +++++++++---------- .../lib/experimental/readable-byte-stream.js | 34 +++---- .../lib/readable-stream.js | 58 +++++------ .../lib/writable-stream.js | 36 +++---- .../test/experimental/readable-byte-stream.js | 17 ++-- reference-implementation/test/pipe-to.js | 4 +- .../test/readable-stream-cancel.js | 10 +- .../test/readable-stream.js | 27 ++--- .../test/transform-stream-errors.js | 4 +- .../test/transform-stream.js | 20 ++-- .../test/utils/readable-stream-to-array.js | 2 +- .../test/writable-stream-abort.js | 4 +- .../test/writable-stream.js | 21 ++-- 15 files changed, 180 insertions(+), 181 deletions(-) diff --git a/BinaryExtension.md b/BinaryExtension.md index 204588f20..af2620f21 100644 --- a/BinaryExtension.md +++ b/BinaryExtension.md @@ -50,7 +50,7 @@ class ReadableByteStream { [[state]] = "waiting" [[storedError]] - [[waitPromise]] + [[readyPromise]] [[closedPromise]] // Holders for stuff given by the underlying source @@ -74,13 +74,13 @@ When a notify ready function _F_ is called, the following steps are taken: 1. Let _stream_ be the value of _F_'s [[Stream]] internal slot. 1. If _stream_.[[state]] is not `"waiting"`, return. 1. Set _stream_.[[state]] to `"readable"`. -1. Resolve _stream_.[[waitPromise]] with **undefined**. +1. Resolve _stream_.[[readyPromise]] with **undefined**. ##### ErrorReadableByteStream( stream, error ) 1. If _stream_.[[state]] is `"errored"` or `"closed"`, return. -1. If _stream_.[[state]] is `"waiting"`, reject _stream_.[[waitPromise]] with _error_. -1. If _stream_.[[state]] is `"readable"`, let _stream_.[[waitPromise]] be a new promise rejected with _error_. +1. If _stream_.[[state]] is `"waiting"`, reject _stream_.[[readyPromise]] with _error_. +1. If _stream_.[[state]] is `"readable"`, let _stream_.[[readyPromise]] be a new promise rejected with _error_. 1. Set _stream_.[[state]] to `"errored"`. 1. Set _stream_.[[storedError]] to _error_. 1. Reject _stream_.[[closedPromise]] with _error_. @@ -108,7 +108,7 @@ When an error function _F_ is called with argument _error_, the following steps 1. Set _stream_.[[onReadInto]] to _readInto_. 1. Set _stream_.[[onCancel]] to _cancel_. 1. Set _stream_.[[readBufferSize]] to _readBufferSize_. -1. Let _stream_.[[waitPromise]] be a new promise. +1. Let _stream_.[[readyPromise]] be a new promise. 1. Let _stream_.[[closedPromise]] be a new promise. 1. Let _stream_.[[notifyReady]] be a new built-in function object as defined in Notify Ready Function with [[Stream]] internal slot set to _stream_. 1. Let _stream_.[[error]] be a new built-in function object as defined in Error Function with [[Stream]] internal slot set to _stream_. @@ -154,7 +154,7 @@ When an error function _F_ is called with argument _error_, the following steps 1. Throw _error_. 1. If _bytesRead_ is -2, 1. Set **this**.[[state]] to `"waiting"`. - 1. Let **this**.[[waitPromise]] be a new promise. + 1. Let **this**.[[readyPromise]] be a new promise. 1. Return 0. 1. If _bytesRead_ is -1, 1. Set **this**.[[state]] to `"closed"` @@ -166,7 +166,7 @@ When an error function _F_ is called with argument _error_, the following steps 1. If `this.[[state]]` is `"closed"`, return a new promise resolved with **undefined**. 1. If `this.[[state]]` is `"errored"`, return a new promise rejected with `this.[[storedError]]`. -1. If `this.[[state]]` is `"waiting"`, resolve `this.[[waitPromise]]` with **undefined**. +1. If `this.[[state]]` is `"waiting"`, resolve `this.[[readyPromise]]` with **undefined**. 1. Set `this.[[state]]` to `"closed"`. 1. Resolve `this.[[closedPromise]]` with **undefined**. 1. Let _cancelPromise_ be a new promise. @@ -180,10 +180,10 @@ When an error function _F_ is called with argument _error_, the following steps 1. Let _stream_ be the **this** value. 1. Return _stream_.[[state]]. -##### get ReadableByteStream.prototype.wait +##### get ReadableByteStream.prototype.ready 1. Let _stream_ be the **this** value. -1. Return _stream_.[[waitPromise]]. +1. Return _stream_.[[readyPromise]]. ##### get ReadableByteStream.prototype.closed diff --git a/Examples.md b/Examples.md index 55118ee10..090e54cb6 100644 --- a/Examples.md +++ b/Examples.md @@ -26,7 +26,7 @@ function streamToConsole(readable) { } else { // If we're in an error state, the returned promise will be rejected with that error, // so no need to handle "waiting" vs. "errored" separately. - readable.wait().then(pump, e => console.error(e)); + readable.ready.then(pump, e => console.error(e)); } } } @@ -44,7 +44,7 @@ function getNext(stream) { return Promise.resolve(EOF); } - return stream.wait().then(function () { + return stream.ready.then(function () { if (stream.state === "readable") { return stream.read(); } @@ -80,7 +80,7 @@ function readableStreamToArray(readable) { } if (readable.state === "waiting") { - readable.wait().then(pump); + readable.ready.then(pump); } // All other cases will go through `readable.closed.then(...)` above. @@ -293,7 +293,7 @@ function promptAndWrite(myStream) { }); } else if (writableStream.state === "waiting") { console.log("Waiting for the stream to flush to the underlying sink, please hold..."); - writableStream.wait() + writableStream.ready .then(promptAndWrite) .catch(e => console.error("While flushing, an error occurred: ", e)); } else if (writableStream.state === "errored") { diff --git a/index.bs b/index.bs index 61aa10c39..9728434c2 100644 --- a/index.bs +++ b/index.bs @@ -375,7 +375,7 @@ it. } else { // If we're in an error state, the returned promise will be rejected with // that error, so no need to handle "waiting" vs. "errored" separately. - readableStream.wait().then(pump, e => console.error(e)); + readableStream.ready.then(pump, e => console.error(e)); } } } @@ -427,13 +427,13 @@ would look like } = {}) get closed() + get ready() get state() cancel(reason) pipeThrough({ writable, readable }, options) pipeTo(dest, { preventClose, preventAbort, preventCancel } = {}) read() - wait() } @@ -506,7 +506,7 @@ Instances of ReadableStream are created with the internal slots des An object containing the stream's queuing strategy - \[[waitPromise]] + \[[readyPromise]] A promise that becomes fulfilled when the stream becomes "readable", and is replaced with a new pending promise when the stream becomes "waiting"; returned by the wait() method @@ -545,7 +545,7 @@ Instances of ReadableStream are created with the internal slots des
  • Set this@\[[onPull]] to pull.
  • Set this@\[[onCancel]] to cancel.
  • Set this@\[[strategy]] to strategy. -
  • Set this@\[[waitPromise]] and this@\[[closedPromise]] to new promises. +
  • Set this@\[[readyPromise]] and this@\[[closedPromise]] to new promises.
  • Set this@\[[queue]] to a new empty List.
  • Set this@\[[state]] to "waiting".
  • Set this@\[[started]], this@\[[draining]], and this@\[[pulling]] to false. @@ -579,6 +579,17 @@ Instances of ReadableStream are created with the internal slots des
  • Return this@\[[closedPromise]]. +
    get ready
    + +
    + The ready getter returns a promise that will be fulfilled either when the stream's internal queue becomes + nonempty, or the stream becomes closed. (The promise will be rejected if the stream errors.) +
    + +
      +
    1. Return this@\[[readyPromise]]. +
    +
    get state
    @@ -586,7 +597,7 @@ Instances of ReadableStream are created with the internal slots des
    "waiting" -
    The stream's internal queue is empty; call .wait() to be notified of any changes. +
    The stream's internal queue is empty; use .ready to be notified of any changes.
    "readable"
    The stream's internal queue has chunks available; call .read() to retrieve the next one. @@ -614,7 +625,7 @@ Instances of ReadableStream are created with the internal slots des
    1. If this@\[[state]] is "closed", return a new promise resolved with undefined.
    2. If this@\[[state]] is "errored", return a new promise rejected with this@\[[storedError]]. -
    3. If this@\[[state]] is "waiting", resolve this@\[[waitPromise]] with undefined. +
    4. If this@\[[state]] is "waiting", resolve this@\[[readyPromise]] with undefined.
    5. Let this@\[[queue]] be a new empty List.
    6. Set this@\[[state]] to "closed".
    7. Resolve this@\[[closedPromise]] with undefined. @@ -680,28 +691,13 @@ look for the pipeTo method.
    8. If this@\[[draining]] is false,
      1. Set this@\[[state]] to "waiting". -
      2. Let this@\[[waitPromise]] be a new promise. +
      3. Let this@\[[readyPromise]] be a new promise.
  • Call-with-rethrow CallReadableStreamPull(this).
  • Return chunk. -
    wait()
    - -
    - The wait method returns a promise that will be fulfilled either when the stream's internal queue becomes - nonempty, or the stream becomes closed. (The promise will be rejected if the stream errors.) - - As a side-effect, it signals interest in reading more chunks, thus causing the stream to pull more data from - the underlying source if it isn't already doing so (e.g. because backpressure signals had temporarily stopped - the flow). -
    - -
      -
    1. Return this@\[[waitPromise]]. -
    -

    Readable Stream Abstract Operations

    CallReadableStreamPull ( stream )

    @@ -735,7 +731,7 @@ A Readable Stream Close Function is a built-in anonymous function of
    1. If stream@\[[state]] is "waiting",
        -
      1. Resolve stream@\[[waitPromise]] with undefined. +
      2. Resolve stream@\[[readyPromise]] with undefined.
      3. Resolve stream@\[[closedPromise]] with undefined.
      4. Set stream@\[[state]] to "closed".
      @@ -771,7 +767,7 @@ closing over a variable stream, that performs the following steps:
    2. If stream@\[[state]] is "waiting",
      1. Set stream@\[[state]] to "readable". -
      2. Resolve stream@\[[waitPromise]] with undefined. +
      3. Resolve stream@\[[readyPromise]] with undefined.
    3. If shouldApplyBackpressure.\[[value]] is true, return false.
    4. Return true. @@ -791,7 +787,7 @@ a variable stream, that performs the following steps:
      1. Set stream@\[[state]] to "errored".
      2. Set stream@\[[storedError]] to e. -
      3. Reject stream@\[[waitPromise]] with e. +
      4. Reject stream@\[[readyPromise]] with e.
      5. Reject stream@\[[closedPromise]] with e.
    5. If stream@\[[state]] is "readable", @@ -799,7 +795,7 @@ a variable stream, that performs the following steps:
    6. Let stream@\[[queue]] be a new empty List.
    7. Set stream@\[[state]] to "errored".
    8. Set stream@\[[storedError]] to e. -
    9. Let stream@\[[waitPromise]] be a new promise rejected with e. +
    10. Let stream@\[[readyPromise]] be a new promise rejected with e.
    11. Reject stream@\[[closedPromise]] with e.
    @@ -1016,11 +1012,11 @@ would look like } = {}) get closed() + get ready() get state() abort(reason) close() - wait() write(chunk) } @@ -1086,7 +1082,7 @@ Instances of WritableStream are created with the internal slots des An object containing the stream's queuing strategy - \[[waitPromise]] + \[[readyPromise]] A promise that becomes fulfilled when the stream becomes "writable", and is replaced with a new pending promise when the stream becomes "waiting"; returned by the wait() method @@ -1147,7 +1143,7 @@ Instances of WritableStream are created with the internal slots des
  • Set this@\[[onAbort]] to abort.
  • Set this@\[[strategy]] to strategy.
  • Set this@\[[closedPromise]] to a new promise. -
  • Set this@\[[writablePromise]] to a new promise resolved with undefined. +
  • Set this@\[[readyPromise]] to a new promise resolved with undefined.
  • Set this@\[[queue]] to a new empty List.
  • Set this@\[[state]] to "writable".
  • Set this@\[[started]] and this@\[[writing]] to false. @@ -1178,6 +1174,20 @@ Instances of WritableStream are created with the internal slots des
  • Return this@\[[closedPromise]]. +
    get ready
    + +
    + The ready getter returns a promise that will be fulfilled when the stream enters the + "writable" state, i.e., when the stream's internal queue is not full according to its queuing + strategy. (The promise will be rejected if the stream errors.) + + In essence, this promise gives a signal as to when any backpressure has let up. +
    + +
      +
    1. Return this@\[[readyPromise]]. +
    +
    get state
    @@ -1186,7 +1196,7 @@ Instances of WritableStream are created with the internal slots des
    "waiting"
    The stream's internal queue is full; that is, the stream is - exerting backpressure. Call .wait() to be notified of when the pressure subsides. + exerting backpressure. Use .ready to be notified of when the pressure subsides.
    "writable"
    The stream's internal queue is not full; call .write() until backpressure is exerted. @@ -1239,9 +1249,9 @@ Instances of WritableStream are created with the internal slots des TypeError exception.
  • If this@\[[state]] is "errored", return a promise rejected with this@\[[storedError]]. -
  • If this@\[[state]] is "writable", set this@\[[writablePromise]] to a new promise +
  • If this@\[[state]] is "writable", set this@\[[readyPromise]] to a new promise rejected with a TypeError exception. -
  • If this@\[[state]] is "waiting", reject this@\[[writablePromise]] with a +
  • If this@\[[state]] is "waiting", reject this@\[[readyPromise]] with a TypeError exception.
  • Set this@\[[state]] to "closing"
  • Call-with-rethrow EnqueueValueWithSize(this@\[[queue]], "close", 0). @@ -1249,20 +1259,6 @@ Instances of WritableStream are created with the internal slots des
  • Return this@\[[closedPromise]]. -
    wait()
    - -
    - The wait method returns a promise that will be fulfilled when the stream enters the - "writable" state, i.e., when the stream's internal queue is not full according to its queuing - strategy. (The promise will be rejected if the stream errors.) - - In essence, this method gives a signal as to when any backpressure has let up. -
    - -
      -
    1. Return this@\[[writablePromise]]. -
    -
    write(chunk)
    @@ -1361,8 +1357,8 @@ a variable stream, that performs the following steps:
  • Set stream@\[[storedError]] to e.
  • If stream@\[[state]] is "writable" or "closing", set - stream@\[[writablePromise]] to a new promise rejected with e. -
  • If stream@\[[state]] is "waiting", reject stream@\[[writablePromise]] with + stream@\[[readyPromise]] to a new promise rejected with e. +
  • If stream@\[[state]] is "waiting", reject stream@\[[readyPromise]] with e.
  • Reject stream@\[[closedPromise]] with e.
  • Set stream@\[[state]] to "errored". @@ -1376,7 +1372,7 @@ a variable stream, that performs the following steps:
  • If stream@\[[state]] is "waiting" and stream@\[[queue]] is empty, then
    1. Set stream@\[[state]] to "writable". -
    2. Resolve stream@\[[writablePromise]] with undefined. +
    3. Resolve stream@\[[readyPromise]] with undefined.
    4. Return undefined.
  • Let queueSize be GetTotalQueueSize(stream@\[[queue]]). @@ -1388,13 +1384,13 @@ a variable stream, that performs the following steps: "writable", then
    1. Set stream@\[[state]] to "waiting". -
    2. Set stream@\[[writablePromise]] to a new promise. +
    3. Set stream@\[[readyPromise]] to a new promise.
  • If shouldApplyBackpressure is false and stream@\[[state]] is "waiting", then
    1. Set stream@\[[state]] to "writable". -
    2. Resolve stream@\[[writablePromise]] with undefined. +
    3. Resolve stream@\[[readyPromise]] with undefined.
  • Return undefined. diff --git a/reference-implementation/lib/experimental/readable-byte-stream.js b/reference-implementation/lib/experimental/readable-byte-stream.js index 89da59a00..7696181b1 100644 --- a/reference-implementation/lib/experimental/readable-byte-stream.js +++ b/reference-implementation/lib/experimental/readable-byte-stream.js @@ -17,13 +17,13 @@ function errorReadableByteStream(stream, error) { } if (stream._state === 'waiting') { - stream._waitPromise_reject(error); - stream._waitPromise_resolve = null; - stream._waitPromise_reject = null; + stream._readyPromise_reject(error); + stream._readyPromise_resolve = null; + stream._readyPromise_reject = null; } else { - stream._waitPromise = Promise.reject(error); - stream._waitPromise_resolve = null; - stream._waitPromise_reject = null; + stream._readyPromise = Promise.reject(error); + stream._readyPromise_resolve = null; + stream._readyPromise_reject = null; } stream._state = 'errored'; @@ -64,9 +64,9 @@ export default class ReadableByteStream { this._readBufferSize = readBufferSize; - this._waitPromise = new Promise((resolve, reject) => { - this._waitPromise_resolve = resolve; - this._waitPromise_reject = reject; + this._readyPromise = new Promise((resolve, reject) => { + this._readyPromise_resolve = resolve; + this._readyPromise_reject = reject; }); this._closedPromise = new Promise((resolve, reject) => { this._closedPromise_resolve = resolve; @@ -145,9 +145,9 @@ export default class ReadableByteStream { if (bytesRead === -2) { this._state = 'waiting'; - this._waitPromise = new Promise((resolve, reject) => { - this._waitPromise_resolve = resolve; - this._waitPromise_reject = reject; + this._readyPromise = new Promise((resolve, reject) => { + this._readyPromise_resolve = resolve; + this._readyPromise_reject = reject; }); return 0; @@ -176,8 +176,8 @@ export default class ReadableByteStream { ReadableStream.prototype.pipeTo.call(this, dest, {preventClose, preventAbort, preventCancel}); } - get wait() { - return this._waitPromise; + get ready() { + return this._readyPromise; } cancel(reason) { @@ -212,9 +212,9 @@ export default class ReadableByteStream { } _resolveWaitPromise(value) { - this._waitPromise_resolve(value); - this._waitPromise_resolve = null; - this._waitPromise_reject = null; + this._readyPromise_resolve(value); + this._readyPromise_resolve = null; + this._readyPromise_reject = null; } _resolveClosedPromise(value) { diff --git a/reference-implementation/lib/readable-stream.js b/reference-implementation/lib/readable-stream.js index 606d9993f..d95538e04 100644 --- a/reference-implementation/lib/readable-stream.js +++ b/reference-implementation/lib/readable-stream.js @@ -23,7 +23,7 @@ export default class ReadableStream { this._onPull = pull; this._onCancel = cancel; this._strategy = strategy; - this._initWaitPromise(); + this._initReadyPromise(); this._initClosedPromise(); this._queue = []; this._state = 'waiting'; @@ -61,7 +61,7 @@ export default class ReadableStream { return Promise.reject(this._storedError); } if (this._state === 'waiting') { - this._resolveWaitPromise(undefined); + this._resolveReadyPromise(undefined); } this._queue = []; @@ -109,24 +109,24 @@ export default class ReadableStream { dest.write(source.read()); continue; } else if (source.state === 'waiting') { - Promise.race([source.wait(), dest.closed]).then(doPipe, doPipe); + Promise.race([source.ready, dest.closed]).then(doPipe, doPipe); } else if (source.state === 'errored') { - source.wait().catch(abortDest); + source.ready.catch(abortDest); } else if (source.state === 'closed') { closeDest(); } } else if (ds === 'waiting') { if (source.state === 'readable') { - Promise.race([source.closed, dest.wait()]).then(doPipe, doPipe); + Promise.race([source.closed, dest.ready]).then(doPipe, doPipe); } else if (source.state === 'waiting') { - Promise.race([source.wait(), dest.wait()]).then(doPipe, doPipe); + Promise.race([source.ready, dest.ready]).then(doPipe, doPipe); } else if (source.state === 'errored') { - source.wait().catch(abortDest); + source.ready.catch(abortDest); } else if (source.state === 'closed') { closeDest(); } } else if (ds === 'errored' && (source.state === 'readable' || source.state === 'waiting')) { - dest.wait().catch(cancelSource); + dest.ready.catch(cancelSource); } else if ((ds === 'closing' || ds === 'closed') && (source.state === 'readable' || source.state === 'waiting')) { cancelSource(new TypeError('destination is closing or closed and cannot be piped to anymore')); @@ -180,7 +180,7 @@ export default class ReadableStream { this._resolveClosedPromise(undefined); } else { this._state = 'waiting'; - this._initWaitPromise(); + this._initReadyPromise(); } } @@ -189,14 +189,14 @@ export default class ReadableStream { return chunk; } - wait() { - return this._waitPromise; + get ready() { + return this._readyPromise; } - _initWaitPromise() { - this._waitPromise = new Promise((resolve, reject) => { - this._waitPromise_resolve = resolve; - this._waitPromise_reject = reject; + _initReadyPromise() { + this._readyPromise = new Promise((resolve, reject) => { + this._readyPromise_resolve = resolve; + this._readyPromise_reject = reject; }); } @@ -213,16 +213,16 @@ export default class ReadableStream { // detect unexpected extra resolve/reject calls that may be caused by bugs in // the algorithm. - _resolveWaitPromise(value) { - this._waitPromise_resolve(value); - this._waitPromise_resolve = null; - this._waitPromise_reject = null; + _resolveReadyPromise(value) { + this._readyPromise_resolve(value); + this._readyPromise_resolve = null; + this._readyPromise_reject = null; } - _rejectWaitPromise(reason) { - this._waitPromise_reject(reason); - this._waitPromise_resolve = null; - this._waitPromise_reject = null; + _rejectReadyPromise(reason) { + this._readyPromise_reject(reason); + this._readyPromise_resolve = null; + this._readyPromise_reject = null; } _resolveClosedPromise(value) { @@ -268,7 +268,7 @@ function CallReadableStreamPull(stream) { function CreateReadableStreamCloseFunction(stream) { return () => { if (stream._state === 'waiting') { - stream._resolveWaitPromise(undefined); + stream._resolveReadyPromise(undefined); stream._resolveClosedPromise(undefined); stream._state = 'closed'; } @@ -307,7 +307,7 @@ function CreateReadableStreamEnqueueFunction(stream) { if (stream._state === 'waiting') { stream._state = 'readable'; - stream._resolveWaitPromise(undefined); + stream._resolveReadyPromise(undefined); } if (shouldApplyBackpressure === true) { @@ -322,7 +322,7 @@ function CreateReadableStreamErrorFunction(stream) { if (stream._state === 'waiting') { stream._state = 'errored'; stream._storedError = e; - stream._rejectWaitPromise(e); + stream._rejectReadyPromise(e); stream._rejectClosedPromise(e); } else if (stream._state === 'readable') { @@ -330,9 +330,9 @@ function CreateReadableStreamErrorFunction(stream) { stream._state = 'errored'; stream._storedError = e; - stream._waitPromise = Promise.reject(e); - stream._waitPromise_resolve = null; - stream._waitPromise_reject = null; + stream._readyPromise = Promise.reject(e); + stream._readyPromise_resolve = null; + stream._readyPromise_reject = null; stream._rejectClosedPromise(e); } diff --git a/reference-implementation/lib/writable-stream.js b/reference-implementation/lib/writable-stream.js index 21e735a33..a64e00d13 100644 --- a/reference-implementation/lib/writable-stream.js +++ b/reference-implementation/lib/writable-stream.js @@ -34,9 +34,9 @@ export default class WritableStream { this._closedPromise_reject = reject; }); - this._writablePromise = Promise.resolve(undefined); - this._writablePromise_resolve = null; - this._writablePromise_reject = null; + this._readyPromise = Promise.resolve(undefined); + this._readyPromise_resolve = null; + this._readyPromise_reject = null; this._queue = []; this._state = 'writable'; @@ -86,12 +86,12 @@ export default class WritableStream { return Promise.reject(this._storedError); } if (this._state === 'writable') { - this._writablePromise = Promise.reject(new TypeError('stream has already been closed')); - this._writablePromise_resolve = null; - this._writablePromise_reject = null; + this._readyPromise = Promise.reject(new TypeError('stream has already been closed')); + this._readyPromise_resolve = null; + this._readyPromise_reject = null; } if (this._state === 'waiting') { - this._writablePromise_reject(new TypeError('stream has already been closed')); + this._readyPromise_reject(new TypeError('stream has already been closed')); } this._state = 'closing'; @@ -101,8 +101,8 @@ export default class WritableStream { return this._closedPromise; } - wait() { - return this._writablePromise; + get ready() { + return this._readyPromise; } write(chunk) { @@ -202,12 +202,12 @@ function CreateWritableStreamErrorFunction(stream) { stream._storedError = e; if (stream._state === 'writable' || stream._state === 'closing') { - stream._writablePromise = Promise.reject(e); - stream._writablePromise_resolve = null; - stream._writablePromise_reject = null; + stream._readyPromise = Promise.reject(e); + stream._readyPromise_resolve = null; + stream._readyPromise_reject = null; } if (stream._state === 'waiting') { - stream._writablePromise_reject(e); + stream._readyPromise_reject(e); } stream._closedPromise_reject(e); stream._state = 'errored'; @@ -224,7 +224,7 @@ function SyncWritableStreamStateWithQueue(stream) { if (stream._state === 'waiting' && stream._queue.length === 0) { stream._state = 'writable'; - stream._writablePromise_resolve(undefined); + stream._readyPromise_resolve(undefined); return undefined; } @@ -233,15 +233,15 @@ function SyncWritableStreamStateWithQueue(stream) { if (shouldApplyBackpressure === true && stream._state === 'writable') { stream._state = 'waiting'; - stream._writablePromise = new Promise((resolve, reject) => { - stream._writablePromise_resolve = resolve; - stream._writablePromise_reject = reject; + stream._readyPromise = new Promise((resolve, reject) => { + stream._readyPromise_resolve = resolve; + stream._readyPromise_reject = reject; }); } if (shouldApplyBackpressure === false && stream._state === 'waiting') { stream._state = 'writable'; - stream._writablePromise_resolve(undefined); + stream._readyPromise_resolve(undefined); } return undefined; diff --git a/reference-implementation/test/experimental/readable-byte-stream.js b/reference-implementation/test/experimental/readable-byte-stream.js index e83c666ec..6f97f8f4c 100644 --- a/reference-implementation/test/experimental/readable-byte-stream.js +++ b/reference-implementation/test/experimental/readable-byte-stream.js @@ -29,7 +29,7 @@ test('ReadableByteStream: Call notifyReady() asynchronously to enter readable st } }); - var waitPromise = rbs.wait; + var readyPromise = rbs.ready; t.equal(rbs.state, 'waiting'); @@ -37,12 +37,13 @@ test('ReadableByteStream: Call notifyReady() asynchronously to enter readable st t.equal(rbs.state, 'readable'); - waitPromise.then( - () => t.end(), - error => { - t.fail(error); - t.end(); - }); + readyPromise.then( + () => t.end(), + error => { + t.fail(error); + t.end(); + } + ); }); test('ReadableByteStream: read() must throw if constructed with passing undefined for readBufferSize', t => { @@ -497,7 +498,7 @@ test('ReadableByteStream: Have source\'s readInto() write up to 10 bytes for eac var bytesRead = rbs.readInto(buffer, bytesFilled); bytesFilled += bytesRead; } else if (rbs.state === 'waiting') { - rbs.wait + rbs.ready .then(readAndProcess, readAndProcess) .catch( error => { diff --git a/reference-implementation/test/pipe-to.js b/reference-implementation/test/pipe-to.js index 0e6c957fa..518a31b45 100644 --- a/reference-implementation/test/pipe-to.js +++ b/reference-implementation/test/pipe-to.js @@ -112,7 +112,7 @@ test('Piping from a ReadableStream in readable state to a WritableStream in erro ws.write('Hello'); t.assert(writeCalled, 'write must be called'); - ws.wait().then( + ws.ready.then( () => t.fail('wait promise unexpectedly fulfilled'), () => { t.equal(ws.state, 'errored', 'as a result of rejected promise, ws must be in errored state'); @@ -510,7 +510,7 @@ test('Piping from a ReadableStream in readable state to a WritableStream in wait t.equal(ws.state, 'waiting'); resolveWritePromise(); - ws.wait().then(() => { + ws.ready.then(() => { t.equal(ws.state, 'writable'); }) .catch(t.error); diff --git a/reference-implementation/test/readable-stream-cancel.js b/reference-implementation/test/readable-stream-cancel.js index 01946d993..865df4d1f 100644 --- a/reference-implementation/test/readable-stream-cancel.js +++ b/reference-implementation/test/readable-stream-cancel.js @@ -64,7 +64,7 @@ test('ReadableStream cancellation puts the stream in a closed state (no chunks p () => t.fail('closed promise vended before the cancellation should not be rejected') ); - rs.wait().then( + rs.ready.then( () => t.assert(true, 'wait() promise vended before the cancellation should fulfill'), () => t.fail('wait() promise vended before the cancellation should not be rejected') ); @@ -77,7 +77,7 @@ test('ReadableStream cancellation puts the stream in a closed state (no chunks p () => t.assert(true, 'closed promise vended after the cancellation should fulfill'), () => t.fail('closed promise vended after the cancellation should not be rejected') ); - rs.wait().then( + rs.ready.then( () => t.assert(true, 'wait promise vended after the cancellation should fulfill'), () => t.fail('wait promise vended after the cancellation should not be rejected') ); @@ -88,14 +88,14 @@ test('ReadableStream cancellation puts the stream in a closed state (after waiti t.plan(5); - rs.wait().then( + rs.ready.then( () => { rs.closed.then( () => t.assert(true, 'closed promise vended before the cancellation should fulfill'), () => t.fail('closed promise vended before the cancellation should not be rejected') ); - rs.wait().then( + rs.ready.then( () => t.assert(true, 'wait() promise vended before the cancellation should fulfill'), () => t.fail('wait() promise vended before the cancellation should not be rejected') ); @@ -108,7 +108,7 @@ test('ReadableStream cancellation puts the stream in a closed state (after waiti () => t.assert(true, 'closed promise vended after the cancellation should fulfill'), () => t.fail('closed promise vended after the cancellation should not be rejected') ); - rs.wait().then( + rs.ready.then( () => t.assert(true, 'wait promise vended after the cancellation should fulfill'), () => t.fail('wait promise vended after the cancellation should not be rejected') ); diff --git a/reference-implementation/test/readable-stream.js b/reference-implementation/test/readable-stream.js index dda4a81b8..9c0c0c4d0 100644 --- a/reference-implementation/test/readable-stream.js +++ b/reference-implementation/test/readable-stream.js @@ -12,18 +12,19 @@ test('ReadableStream can be constructed with no arguments', t => { }); test('ReadableStream instances have the correct methods and properties', t => { - t.plan(8); + t.plan(9); var rs = new ReadableStream(); t.equal(typeof rs.read, 'function', 'has a read method'); - t.equal(typeof rs.wait, 'function', 'has a wait method'); t.equal(typeof rs.cancel, 'function', 'has an cancel method'); t.equal(typeof rs.pipeTo, 'function', 'has a pipeTo method'); t.equal(typeof rs.pipeThrough, 'function', 'has a pipeThrough method'); t.equal(rs.state, 'waiting', 'state starts out waiting'); + t.ok(rs.ready, 'has a ready property'); + t.ok(rs.ready.then, 'ready property is a thenable'); t.ok(rs.closed, 'has a closed property'); t.ok(rs.closed.then, 'closed property is thenable'); }); @@ -40,7 +41,7 @@ test(`ReadableStream closing puts the stream in a closed state, fulfilling the w t.equal(rs.state, 'closed', 'The stream should be in closed state'); - rs.wait().then( + rs.ready.then( v => t.equal(v, undefined, 'wait() should return a promise fulfilled with undefined'), () => t.fail('wait() should not return a rejected promise') ); @@ -110,7 +111,7 @@ test(`ReadableStream reading a stream makes wait() and closed return a promise f t.throws(() => rs.read(), /TypeError/); - rs.wait().then( + rs.ready.then( v => t.equal(v, undefined, 'wait() should return a promise fulfilled with undefined'), () => t.fail('wait() should not return a rejected promise') ); @@ -133,9 +134,9 @@ test('ReadableStream avoid redundant pull call', t => { } }); - rs.wait(); - rs.wait(); - rs.wait(); + rs.ready; + rs.ready; + rs.ready; // Use setTimeout to ensure we run after any promises. setTimeout(() => { @@ -163,7 +164,7 @@ test('ReadableStream pull throws an error', t => { var error = new Error('aaaugh!!'); var rs = new ReadableStream({ pull() { throw error; } }); - rs.wait().then(() => { + rs.ready.then(() => { t.fail('waiting should fail'); t.end(); }); @@ -173,7 +174,7 @@ test('ReadableStream pull throws an error', t => { t.end(); }); - rs.wait().catch(caught => { + rs.ready.catch(caught => { t.equal(rs.state, 'errored', 'state is "errored" after waiting'); t.equal(caught, error, 'error was passed through as rejection of wait() call'); }); @@ -263,7 +264,7 @@ test('ReadableStream is able to pull data repeatedly if it\'s available synchron } }); - rs.wait().then(() => { + rs.ready.then(() => { var data = []; while (rs.state === 'readable') { data.push(rs.read()); @@ -292,7 +293,7 @@ test('ReadableStream wait() does not error when no more data is available', t => if (rs.state === 'closed') { t.deepEqual(result, [1, 2, 3, 4, 5], 'got the expected 5 chunks'); } else { - rs.wait().then(pump, r => t.ifError(r)); + rs.ready.then(pump, r => t.ifError(r)); } } }); @@ -326,7 +327,7 @@ test('ReadableStream should be able to get data sequentially from an asynchronou return Promise.resolve(EOF); } - return rs.wait().then(() => { + return rs.ready.then(() => { if (rs.state === 'readable') { return rs.read(); } else if (rs.state === 'closed') { @@ -544,7 +545,7 @@ test('ReadableStream errors in shouldApplyBackpressure prevent wait() from fulfi } }); - rs.wait().then( + rs.ready.then( () => { t.fail('wait() should not be fulfilled'); t.end(); diff --git a/reference-implementation/test/transform-stream-errors.js b/reference-implementation/test/transform-stream-errors.js index 8b36b15ea..aae369982 100644 --- a/reference-implementation/test/transform-stream-errors.js +++ b/reference-implementation/test/transform-stream-errors.js @@ -32,7 +32,7 @@ test('TransformStream errors thrown in transform put the writable and readable i } }, 0); - ts.readable.wait().then( + ts.readable.ready.then( () => t.fail('readable\'s wait() should not be fulfilled'), e => t.equal(e, thrownError, 'readable\'s wait() should be rejected with the thrown error') ); @@ -86,7 +86,7 @@ test('TransformStream errors thrown in flush put the writable and readable in an } }, 0); - ts.readable.wait().then( + ts.readable.ready.then( () => t.fail('readable\'s wait() should not be fulfilled'), e => t.equal(e, thrownError, 'readable\'s wait() should be rejected with the thrown error') ); diff --git a/reference-implementation/test/transform-stream.js b/reference-implementation/test/transform-stream.js index d1394b73e..242b6e1d3 100644 --- a/reference-implementation/test/transform-stream.js +++ b/reference-implementation/test/transform-stream.js @@ -51,7 +51,7 @@ test('Pass-through sync TransformStream: can read from readable what is put into t.equal(ts.readable.state, 'readable', 'readable is readable since transformation is sync'); t.equal(ts.readable.read(), 'a', 'result from reading the readable is the same as was written to writable'); t.equal(ts.readable.state, 'waiting', 'readable is waiting again after having read all that was written'); - ts.writable.wait().then(() => { + ts.writable.ready.then(() => { t.equal(ts.writable.state, 'writable', 'writable becomes writable again'); }) .catch(t.error); @@ -75,7 +75,7 @@ test('Uppercaser sync TransformStream: can read from readable transformed versio t.equal(ts.readable.state, 'readable', 'readable is readable since transformation is sync'); t.equal(ts.readable.read(), 'A', 'result from reading the readable is the same as was written to writable'); t.equal(ts.readable.state, 'waiting', 'readable is waiting again after having read all that was written'); - ts.writable.wait().then(() => { + ts.writable.ready.then(() => { t.equal(ts.writable.state, 'writable', 'writable becomes writable again'); }) .catch(t.error); @@ -102,7 +102,7 @@ test('Uppercaser-doubler sync TransformStream: can read both chunks put into the t.equal(ts.readable.state, 'readable', 'readable is readable still after reading the first chunk'); t.equal(ts.readable.read(), 'A', 'the second chunk read is also the transformation of the single chunk written'); t.equal(ts.readable.state, 'waiting', 'readable is waiting again after having read both enqueued chunks'); - ts.writable.wait().then(() => { + ts.writable.ready.then(() => { t.equal(ts.writable.state, 'writable', 'writable becomes writable again'); }) .catch(t.error); @@ -125,14 +125,14 @@ test('Uppercaser async TransformStream: readable chunk becomes available asynchr t.equal(ts.writable.state, 'waiting', 'writable is now waiting since the transform has not signaled done'); t.equal(ts.readable.state, 'waiting', 'readable is still not readable'); - ts.readable.wait().then(() => { + ts.readable.ready.then(() => { t.equal(ts.readable.state, 'readable', 'readable eventually becomes readable'); t.equal(ts.readable.read(), 'A', 'chunk read from readable is the transformation result'); t.equal(ts.readable.state, 'waiting', 'readable is waiting again after having read the chunk'); t.equal(ts.writable.state, 'waiting', 'writable is still waiting since the transform still has not signaled done'); - return ts.writable.wait().then(() => { + return ts.writable.ready.then(() => { t.equal(ts.writable.state, 'writable', 'writable eventually becomes writable (after the transform signals done)'); }); }) @@ -157,21 +157,21 @@ test('Uppercaser-doubler async TransformStream: readable chunks becomes availabl t.equal(ts.writable.state, 'waiting', 'writable is now waiting since the transform has not signaled done'); t.equal(ts.readable.state, 'waiting', 'readable is still not readable'); - ts.readable.wait().then(() => { + ts.readable.ready.then(() => { t.equal(ts.readable.state, 'readable', 'readable eventually becomes readable'); t.equal(ts.readable.read(), 'A', 'chunk read from readable is the transformation result'); t.equal(ts.readable.state, 'waiting', 'readable is waiting again after having read the chunk'); t.equal(ts.writable.state, 'waiting', 'writable is still waiting since the transform still has not signaled done'); - return ts.readable.wait().then(() => { + return ts.readable.ready.then(() => { t.equal(ts.readable.state, 'readable', 'readable becomes readable again'); t.equal(ts.readable.read(), 'A', 'chunk read from readable is the transformation result'); t.equal(ts.readable.state, 'waiting', 'readable is waiting again after having read the chunk'); t.equal(ts.writable.state, 'waiting', 'writable is still waiting since the transform still has not signaled done'); - return ts.writable.wait().then(() => { + return ts.writable.ready.then(() => { t.equal(ts.writable.state, 'writable', 'writable eventually becomes writable (after the transform signals done)'); }); }); @@ -340,7 +340,7 @@ test('TransformStream flush gets a chance to enqueue more into the readable', t ts.writable.write('a'); t.equal(ts.readable.state, 'waiting', 'after a write to the writable, the readable is still waiting'); ts.writable.close(); - ts.readable.wait().then(() => { + ts.readable.ready.then(() => { t.equal(ts.readable.state, 'readable', 'after closing the writable, the readable is now readable as a result of flush'); t.equal(ts.readable.read(), 'x', 'reading the first chunk gives back what was enqueued'); t.equal(ts.readable.read(), 'y', 'reading the second chunk gives back what was enqueued'); @@ -369,7 +369,7 @@ test('TransformStream flush gets a chance to enqueue more into the readable, and ts.writable.write('a'); t.equal(ts.readable.state, 'waiting', 'after a write to the writable, the readable is still waiting'); ts.writable.close(); - ts.readable.wait().then(() => { + ts.readable.ready.then(() => { t.equal(ts.readable.state, 'readable', 'after closing the writable, the readable is now readable as a result of flush'); t.equal(ts.readable.read(), 'x', 'reading the first chunk gives back what was enqueued'); t.equal(ts.readable.read(), 'y', 'reading the second chunk gives back what was enqueued'); diff --git a/reference-implementation/test/utils/readable-stream-to-array.js b/reference-implementation/test/utils/readable-stream-to-array.js index b27effd6a..d1c856f06 100644 --- a/reference-implementation/test/utils/readable-stream-to-array.js +++ b/reference-implementation/test/utils/readable-stream-to-array.js @@ -11,7 +11,7 @@ export default function readableStreamToArray(readable) { } if (readable.state === 'waiting') { - readable.wait().then(pump); + readable.ready.then(pump); } } }); diff --git a/reference-implementation/test/writable-stream-abort.js b/reference-implementation/test/writable-stream-abort.js index e952859ce..54472454e 100644 --- a/reference-implementation/test/writable-stream-abort.js +++ b/reference-implementation/test/writable-stream-abort.js @@ -113,7 +113,7 @@ test('Aborting a WritableStream puts it in an errored state, with stored error e r => t.equal(r, passedReason, 'writing should reject with the given reason') ); - ws.wait().then( + ws.ready.then( () => t.fail('waiting should not succeed'), r => t.equal(r, passedReason, 'waiting should reject with the given reason') ); @@ -142,7 +142,7 @@ test('Aborting a WritableStream causes any outstanding wait() promises to be rej ws.write('a'); t.equal(ws.state, 'waiting', 'state should be waiting'); - ws.wait().then( + ws.ready.then( () => t.fail('waiting should not succeed'), r => t.equal(r, passedReason, 'waiting should reject with the given reason') ); diff --git a/reference-implementation/test/writable-stream.js b/reference-implementation/test/writable-stream.js index 202d205f0..67127b719 100644 --- a/reference-implementation/test/writable-stream.js +++ b/reference-implementation/test/writable-stream.js @@ -161,17 +161,18 @@ test('WritableStream can be constructed with no arguments', t => { }); test('WritableStream instances have the correct methods and properties', t => { - t.plan(7); + t.plan(8); var ws = new WritableStream(); t.equal(typeof ws.write, 'function', 'has a write method'); - t.equal(typeof ws.wait, 'function', 'has a wait method'); t.equal(typeof ws.abort, 'function', 'has an abort method'); t.equal(typeof ws.close, 'function', 'has a close method'); t.equal(ws.state, 'writable', 'state starts out writable'); + t.ok(ws.ready, 'has a ready property'); + t.ok(ws.ready.then, 'ready property is a thenable'); t.ok(ws.closed, 'has a closed property'); t.ok(ws.closed.then, 'closed property is thenable'); }); @@ -232,7 +233,7 @@ test('WritableStream wait() fulfills immediately if the stream is writable', t = strategy: { shouldApplyBackpressure() { return true; } } }); - ws.wait().then(() => { + ws.ready.then(() => { t.pass('wait() promise was fulfilled'); t.end(); }); @@ -339,7 +340,7 @@ test('If close is called on a WritableStream in writable state, wait will return ws.close(); t.equal(ws.state, 'closing', 'state must become closing synchronously on close call'); - ws.wait().then( + ws.ready.then( () => t.fail('wait on ws returned a fulfilled promise unexpectedly'), r => { t.equal(r.constructor, TypeError, @@ -366,7 +367,7 @@ test('If close is called on a WritableStream in waiting state, wait will return ws.close(); t.equal(ws.state, 'closing', 'state must become closing synchronously on close call'); - ws.wait().then( + ws.ready.then( () => t.fail('wait on ws returned a fulfilled promise unexpectedly'), r => { t.equal(r.constructor, TypeError, @@ -401,7 +402,7 @@ test('If sink rejects on a WritableStream in writable state, wait will return a t.equal(r, passedError, 'write() should be rejected with the passed error'); t.equal(ws.state, 'errored', 'state is errored as error is called'); - ws.wait().then( + ws.ready.then( () => t.fail('wait on ws returned a fulfilled promise unexpectedly'), r => t.equal(r, passedError, 'wait() should be rejected with the passed error') ); @@ -439,7 +440,7 @@ test('WritableStream if sink\'s close throws', t => { r => { t.equal(ws.state, 'errored', 'state must be errored as error is called'); - ws.wait().then( + ws.ready.then( () => t.fail('wait on ws returned a fulfilled promise unexpectedly'), r => { t.equal(r, passedError, 'wait() should be rejected with the passed error'); @@ -480,8 +481,8 @@ test('WritableStream if the promise returned by sink\'s close rejects', t => { r => { t.equal(ws.state, 'errored', 'state must be errored as error is called'); - ws.wait().then( - () => t.fail('ws.wait() returned a fulfilled promise'), + ws.ready.then( + () => t.fail('ws.ready returned a fulfilled promise'), r => { t.equal(r, passedError, 'wait() should be rejected with the passed error'); t.end(); @@ -518,7 +519,7 @@ test('If sink rejects on a WritableStream in waiting state, wait will return a r t.equal(r, passedError, 'write() should be rejected with the passed error'); t.equal(ws.state, 'errored', 'state is errored as error is called'); - ws.wait().then( + ws.ready.then( () => t.fail('wait on ws returned a fulfilled promise unexpectedly'), r => t.equal(r, passedError, 'wait() should be rejected with the passed error') );