diff --git a/index.bs b/index.bs index 5104e9f59..14e94c255 100644 --- a/index.bs +++ b/index.bs @@ -194,7 +194,7 @@ reader (if one exists) will automatically release its lock. the chain:

-    readableStream.pipeTo(writableStream).finished
+    readableStream.pipeTo(writableStream)
       .then(() => console.log("All data successfully written!"))
       .catch(e => console.error("Something went wrong!", e));
   
@@ -941,7 +941,7 @@ a variable stream, that performs the following steps: chance to slow down its data production.

-    readableStream.pipeTo(writableStream).finished
+    readableStream.pipeTo(writableStream)
       .then(() => console.log("All data successfully written!"))
       .catch(e => console.error("Something went wrong!", e));
   
@@ -1856,7 +1856,7 @@ writable stream:

   const webSocketStream = makeReadableWebSocketStream("wss://example.com", 443);
 
-  webSocketStream.pipeTo(writableStream).finished
+  webSocketStream.pipeTo(writableStream)
     .then(() => console.log("All data successfully written!"))
     .catch(e => console.error("Something went wrong!", e));
 
@@ -1989,7 +1989,7 @@ We can then use this function to create writable streams for a web socket, and p

   const webSocketStream = makeWritableWebSocketStream("wss://example.com", 443);
 
-  readableStream.pipeTo(webSocketStream).finished
+  readableStream.pipeTo(webSocketStream)
     .then(() => console.log("All data successfully written!"))
     .catch(e => console.error("Something went wrong!", e));
 
diff --git a/reference-implementation/lib/readable-stream.js b/reference-implementation/lib/readable-stream.js index c2e405b7e..0ce686197 100644 --- a/reference-implementation/lib/readable-stream.js +++ b/reference-implementation/lib/readable-stream.js @@ -72,13 +72,12 @@ export default class ReadableStream { let reader; let lastRead; let closedPurposefully = false; - let unpiped = false; - let resolveFinishedPromise; - let rejectFinishedPromise; + let resolvePipeToPromise; + let rejectPipeToPromise; - const finishedPromise = new Promise((resolve, reject) => { - resolveFinishedPromise = resolve; - rejectFinishedPromise = reject; + return new Promise((resolve, reject) => { + resolvePipeToPromise = resolve; + rejectPipeToPromise = reject; reader = source.getReader(); @@ -95,25 +94,9 @@ export default class ReadableStream { doPipe(); }); - return { finished: finishedPromise, unpipe }; - - function unpipe() { - unpiped = true; - return lastRead.then(finishUnpipe, finishUnpipe); - - function finishUnpipe() { - reader.releaseLock(); - resolveFinishedPromise(undefined); - } - } - function doPipe() { lastRead = reader.read(); Promise.all([lastRead, dest.ready]).then(([{ value, done }]) => { - if (unpiped === true) { - return; - } - if (Boolean(done) === true) { closeDest(); } else { @@ -127,30 +110,22 @@ export default class ReadableStream { } function cancelSource(reason) { - if (unpiped === true) { - return; - } - if (preventCancel === false) { // cancelling automatically releases the lock (and that doesn't fail, since source is then closed) reader.cancel(reason); - rejectFinishedPromise(reason); + rejectPipeToPromise(reason); } else { // If we don't cancel, we need to wait for lastRead to finish before we're allowed to release. // We don't need to handle lastRead failing because that will trigger abortDest which takes care of // both of these. lastRead.then(() => { reader.releaseLock(); - rejectFinishedPromise(reason); + rejectPipeToPromise(reason); }); } } function closeDest() { - if (unpiped === true) { - return; - } - // Does not need to wait for lastRead since it occurs only on source closed. reader.releaseLock(); @@ -158,17 +133,13 @@ export default class ReadableStream { const destState = dest.state; if (preventClose === false && (destState === 'waiting' || destState === 'writable')) { closedPurposefully = true; - dest.close().then(resolveFinishedPromise, rejectFinishedPromise); + dest.close().then(resolvePipeToPromise, rejectPipeToPromise); } else { - resolveFinishedPromise(); + resolvePipeToPromise(); } } function abortDest(reason) { - if (unpiped === true) { - return; - } - // Does not need to wait for lastRead since it only occurs on source errored. reader.releaseLock(); @@ -176,7 +147,7 @@ export default class ReadableStream { if (preventAbort === false) { dest.abort(reason); } - rejectFinishedPromise(reason); + rejectPipeToPromise(reason); } } diff --git a/reference-implementation/test/brand-checks.js b/reference-implementation/test/brand-checks.js index 305995f9e..f94ab50c6 100644 --- a/reference-implementation/test/brand-checks.js +++ b/reference-implementation/test/brand-checks.js @@ -15,9 +15,7 @@ function fakeReadableStream() { get closed() { return Promise.resolve(); }, cancel(reason) { return Promise.resolve(); }, pipeThrough({ writable, readable }, options) { return readable; }, - pipeTo(dest, { preventClose, preventAbort, preventCancel } = {}) { - return { finished: Promise.resolve(), unpipe() { } }; - }, + pipeTo(dest, { preventClose, preventAbort, preventCancel } = {}) { return Promise.resolve(); }, getReader() { return new ReadableStream(new ReadableStream()); } }; } diff --git a/reference-implementation/test/pipe-through.js b/reference-implementation/test/pipe-through.js index 14597ae9b..079052e86 100644 --- a/reference-implementation/test/pipe-through.js +++ b/reference-implementation/test/pipe-through.js @@ -33,7 +33,7 @@ test('Piping through an identity transform stream will close the destination whe const ws = new WritableStream(); - rs.pipeThrough(ts).pipeTo(ws).finished.then(() => { + rs.pipeThrough(ts).pipeTo(ws).then(() => { t.equal(ws.state, 'closed', 'the writable stream was closed'); }) .catch(e => t.error(e)); @@ -82,7 +82,7 @@ test.skip('Piping through a default transform stream causes backpressure to be e }); setTimeout(() => { - rs.pipeThrough(ts).pipeTo(ws).finished.then(() => { + rs.pipeThrough(ts).pipeTo(ws).then(() => { t.deepEqual( enqueueReturnValues, [true, true, true, true, false, false, false, false], diff --git a/reference-implementation/test/pipe-to-options.js b/reference-implementation/test/pipe-to-options.js index b08cc5747..dc50e7635 100644 --- a/reference-implementation/test/pipe-to-options.js +++ b/reference-implementation/test/pipe-to-options.js @@ -80,11 +80,11 @@ test('Piping with { preventCancel: true } and a destination error', t => { } }); - rs.pipeTo(ws, { preventCancel: true }).finished.catch(e => { - t.equal(e, theError, 'pipeTo finished promise should reject with the sink error'); + rs.pipeTo(ws, { preventCancel: true }).catch(e => { + t.equal(e, theError, 'rejection reason of pipeTo promise is the sink error'); let reader; - t.doesNotThrow(() => { reader = rs.getReader(); }, 'should be able to get a stream reader after pipeTo finishes'); + t.doesNotThrow(() => { reader = rs.getReader(); }, 'should be able to get a stream reader after pipeTo completes'); // { value: 'c', done: false } gets consumed before we know that ws has errored, and so is lost. diff --git a/reference-implementation/test/pipe-to.js b/reference-implementation/test/pipe-to.js index 11a6840da..5b7427e10 100644 --- a/reference-implementation/test/pipe-to.js +++ b/reference-implementation/test/pipe-to.js @@ -30,7 +30,7 @@ test('Piping from a ReadableStream from which lots of data are readable synchron }); let pipeFinished = false; - rs.pipeTo(ws).finished.then( + rs.pipeTo(ws).then( () => { pipeFinished = true; t.equal(rsClosed, true, 'readable stream should be closed after pipe finishes'); @@ -75,11 +75,11 @@ test('Piping from a ReadableStream in readable state to a WritableStream in clos rsClosed = true; }); - rs.pipeTo(ws).finished.then( - () => t.fail('pipeTo finished promise should not fulfill'), + rs.pipeTo(ws).then( + () => t.fail('promise returned by pipeTo should not fulfill'), r => { t.equal(r, cancelReason, - 'pipeTo finished promise should reject with the same error as the underlying source cancel was called with'); + 'the pipeTo promise should reject with the same error as the underlying source cancel was called with'); t.equal(rsClosed, true, 'readable stream should be closed after pipe finishes'); } ); @@ -135,8 +135,8 @@ test('Piping from a ReadableStream in readable state to a WritableStream in erro ws.ready.then(() => { t.equal(ws.state, 'errored', 'as a result of rejected promise, ws must be in errored state'); - rs.pipeTo(ws).finished.catch(e => { - t.equal(e, passedError, 'pipeTo finished promise should be rejected with the error'); + rs.pipeTo(ws).catch(e => { + t.equal(e, passedError, 'pipeTo promise should be rejected with the error'); t.assert(cancelCalled, 'cancel should have been called'); t.end(); }); @@ -187,11 +187,11 @@ test('Piping from a ReadableStream in the readable state which becomes closed af }); startPromise.then(() => { - rs.pipeTo(ws).finished.then(() => { - t.equal(ws.state, 'closed', 'writable stream should be closed after pipeTo finishes'); + rs.pipeTo(ws).then(() => { + t.equal(ws.state, 'closed', 'writable stream should be closed after pipeTo completes'); }); - t.equal(ws.state, 'writable', 'writable stream should still be writable immediately after pipeTo call'); + t.equal(ws.state, 'writable', 'writable stream should still be writable immediately after pipeTo'); closeReadableStream(); }); @@ -235,12 +235,12 @@ test('Piping from a ReadableStream in the readable state which becomes errored a }); startPromise.then(() => { - rs.pipeTo(ws).finished.catch(e => { - t.equal(e, passedError, 'pipeTo finished promise should be rejected with the passed error'); - t.equal(ws.state, 'errored', 'writable stream should be errored after pipeTo finishes'); + rs.pipeTo(ws).catch(e => { + t.equal(e, passedError, 'pipeTo should be rejected with the passed error'); + t.equal(ws.state, 'errored', 'writable stream should be errored after pipeTo completes'); }); - t.equal(ws.state, 'writable', 'writable stream should still be writable immediately after pipeTo call'); + t.equal(ws.state, 'writable', 'writable stream should still be writable immediately after pipeTo'); errorReadableStream(passedError); }); @@ -276,7 +276,7 @@ test('Piping from an empty ReadableStream which becomes non-empty after pipeTo c } }); - rs.pipeTo(ws).finished.then(() => t.fail('pipeTo finished promise should not fulfill')); + rs.pipeTo(ws).then(() => t.fail('pipeTo promise should not fulfill')); t.equal(ws.state, 'writable', 'writable stream should start in writable state'); enqueue('Hello'); @@ -312,9 +312,7 @@ test('Piping from an empty ReadableStream which becomes errored after pipeTo cal } }); - rs.pipeTo(ws).finished.catch( - e => t.equal(e, passedError, 'pipeTo finished promise should reject with the passed error') - ); + rs.pipeTo(ws).catch(e => t.equal(e, passedError, 'pipeTo should reject with the passed error')); t.equal(ws.state, 'writable', 'writable stream should start out writable'); errorReadableStream(passedError); }); @@ -357,9 +355,7 @@ test('Piping from an empty ReadableStream to a WritableStream in the writable st startPromise.then(() => { t.equal(ws.state, 'writable', 'ws should start writable'); - rs.pipeTo(ws).finished.catch( - e => t.equal(e, theError, 'pipeTo finished promise should reject with the passed error') - ); + rs.pipeTo(ws).catch(e => t.equal(e, theError, 'pipeTo should reject with the passed error')); t.equal(ws.state, 'writable', 'ws should be writable after pipe'); errorWritableStream(theError); @@ -792,9 +788,9 @@ test('Piping to a stream that has been aborted passes through the error as the c const passedReason = new Error('I don\'t like you.'); ws.abort(passedReason); - rs.pipeTo(ws).finished.catch(e => { - t.equal(e, passedReason, 'pipeTo finished promise should reject with the cancellation reason'); - t.equal(recordedReason, passedReason, 'the recorded cancellation reason should be the passed abort reason'); + rs.pipeTo(ws).catch(e => { + t.equal(e, passedReason, 'pipeTo rejection reason should be the cancellation reason'); + t.equal(recordedReason, passedReason, 'the recorded cancellation reason must be the passed abort reason'); t.end(); }); }); @@ -813,9 +809,9 @@ test('Piping to a stream and then aborting it passes through the error as the ca const pipeToPromise = rs.pipeTo(ws); ws.abort(passedReason); - pipeToPromise.finished.catch(e => { - t.equal(e, passedReason, 'pipeTo finished promise should reject with the abortion reason'); - t.equal(recordedReason, passedReason, 'the recorded cancellation reason should be the passed abort reason'); + pipeToPromise.catch(e => { + t.equal(e, passedReason, 'pipeTo rejection reason should be the abortion reason'); + t.equal(recordedReason, passedReason, 'the recorded cancellation reason must be the passed abort reason'); t.end(); }); }); @@ -831,8 +827,8 @@ test('Piping to a stream that has been closed propagates a TypeError cancellatio const ws = new WritableStream(); ws.close(); - rs.pipeTo(ws).finished.catch(e => { - t.equal(e.constructor, TypeError, 'pipeTo finished promise should reject with a TypeError'); + rs.pipeTo(ws).catch(e => { + t.equal(e.constructor, TypeError, 'the rejection reason for the pipeTo promise should be a TypeError'); t.equal(recordedReason.constructor, TypeError, 'the recorded cancellation reason should be a TypeError'); t.end(); }); @@ -848,11 +844,11 @@ test('Piping to a stream and then closing it propagates a TypeError cancellation const ws = new WritableStream(); - const pipeToPromise = rs.pipeTo(ws).finished; + const pipeToPromise = rs.pipeTo(ws); ws.close(); pipeToPromise.catch(e => { - t.equal(e.constructor, TypeError, 'pipeTo finished promise should reject with a TypeError'); + t.equal(e.constructor, TypeError, 'the rejection reason for the pipeTo promise should be a TypeError'); t.equal(recordedReason.constructor, TypeError, 'the recorded cancellation reason should be a TypeError'); t.end(); }); @@ -917,10 +913,10 @@ test('Piping to a stream that errors on write should not pass through the error } }); - rs.pipeTo(ws).finished.then( - () => t.fail('pipeTo finished promise should not fulfill'), + rs.pipeTo(ws).then( + () => t.fail('pipeTo should not fulfill'), r => { - t.equal(r, passedError, 'pipeTo finished promise should reject with the same error as the write'); + t.equal(r, passedError, 'pipeTo should reject with the same error as the write'); t.equal(cancelCalled, false, 'cancel should not have been called'); t.end(); } @@ -990,7 +986,7 @@ test('Piping to a writable stream that does not consume the writes fast enough e }); startPromise.then(() => { - rs.pipeTo(ws).finished.then(() => { + rs.pipeTo(ws).then(() => { t.deepEqual(enqueueReturnValues, [true, true, true, false], 'backpressure was correctly exerted at the source'); t.deepEqual(chunksFinishedWriting, ['a', 'b', 'c', 'd'], 'all chunks were written'); t.end(); diff --git a/reference-implementation/test/templated/readable-stream-closed.js b/reference-implementation/test/templated/readable-stream-closed.js index f4b15e20f..134b30613 100644 --- a/reference-implementation/test/templated/readable-stream-closed.js +++ b/reference-implementation/test/templated/readable-stream-closed.js @@ -60,8 +60,8 @@ export default (label, factory) => { startPromise.then(() => { t.equal(ws.state, 'writable', 'writable stream should start in writable state'); - rs.pipeTo(ws).finished.then(() => { - t.pass('pipeTo finished promise should be fulfilled'); + rs.pipeTo(ws).then(() => { + t.pass('pipeTo promise should be fulfilled'); t.equal(ws.state, 'closed', 'writable stream should become closed'); }); }); diff --git a/reference-implementation/test/templated/readable-stream-errored-async-only.js b/reference-implementation/test/templated/readable-stream-errored-async-only.js index 86907372c..8ec30721d 100644 --- a/reference-implementation/test/templated/readable-stream-errored-async-only.js +++ b/reference-implementation/test/templated/readable-stream-errored-async-only.js @@ -15,9 +15,9 @@ export default (label, factory, error) => { } }); - rs.pipeTo(ws).finished.catch(e => { + rs.pipeTo(ws).catch(e => { t.equal(ws.state, 'errored', 'destination should be errored'); - t.equal(e, error, 'pipeTo finished promise should reject with the source error'); + t.equal(e, error, 'rejection reason of pipeToPromise should be the source error'); }); ws.closed.catch(e => t.equal(e, error), 'rejection reason of dest closed should be the source error'); @@ -33,9 +33,9 @@ export default (label, factory, error) => { } }); - rs.pipeTo(ws, { preventAbort: false }).finished.catch(e => { + rs.pipeTo(ws, { preventAbort: false }).catch(e => { t.equal(ws.state, 'errored', 'destination should be errored'); - t.equal(e, error, 'pipeTo finished promise should reject with the source error'); + t.equal(e, error, 'rejection reason of pipeToPromise should be the source error'); }); ws.closed.catch(e => t.equal(e, error), 'rejection reason of dest closed should be the source error'); @@ -51,9 +51,9 @@ export default (label, factory, error) => { } }); - rs.pipeTo(ws, { preventAbort: true }).finished.catch(e => { + rs.pipeTo(ws, { preventAbort: true }).catch(e => { t.equal(ws.state, 'writable', 'destination should remain writable'); - t.equal(e, error, 'pipeTo finished promise should reject with the source error'); + t.equal(e, error, 'rejection reason of pipeToPromise should be the source error'); }); }); }; diff --git a/reference-implementation/test/templated/readable-stream-errored.js b/reference-implementation/test/templated/readable-stream-errored.js index 3aeb00382..2866f9a3b 100644 --- a/reference-implementation/test/templated/readable-stream-errored.js +++ b/reference-implementation/test/templated/readable-stream-errored.js @@ -39,33 +39,16 @@ export default (label, factory, error) => { startPromise.then(() => { t.equal(ws.state, 'writable'); - rs.pipeTo(ws).finished.then( - () => t.fail('pipeTo finished promise should not be fulfilled'), + rs.pipeTo(ws).then( + () => t.fail('pipeTo promise should not be fulfilled'), e => { - t.equal(e, error, 'pipeTo finished promise should be rejected with the passed error'); + t.equal(e, error, 'pipeTo promise should be rejected with the passed error'); t.equal(ws.state, 'errored', 'writable stream should become errored'); } ); }); }); - test('unpiping should be a no-op after the pipe fails', t => { - t.plan(2); - - const rs = factory(); - const ws = new WritableStream(); - const pipe = rs.pipeTo(ws); - - pipe.finished.catch(e => { - t.equal(e, error, 'pipeTo finished promise should be rejected with the passed error'); - - return pipe.unpipe().then(v => { - t.equal(v, undefined, 'unpipe() should fulfill with undefined'); - }); - }) - .catch(e => t.error(e)); - }); - test('getReader() should return a reader that acts errored', t => { t.plan(2); const rs = factory(); diff --git a/reference-implementation/test/templated/readable-stream-two-chunks-closed.js b/reference-implementation/test/templated/readable-stream-two-chunks-closed.js index 907c8fa83..2793dfd13 100644 --- a/reference-implementation/test/templated/readable-stream-two-chunks-closed.js +++ b/reference-implementation/test/templated/readable-stream-two-chunks-closed.js @@ -18,7 +18,7 @@ export default (label, factory, chunks) => { } }); - rs.pipeTo(ws).finished.then(() => { + rs.pipeTo(ws).then(() => { t.equal(ws.state, 'closed', 'destination should be closed'); t.deepEqual(chunksWritten, chunks); t.end(); @@ -38,7 +38,7 @@ export default (label, factory, chunks) => { } }); - rs.pipeTo(ws).finished.then(() => { + rs.pipeTo(ws).then(() => { t.equal(ws.state, 'closed', 'destination should be closed'); t.deepEqual(chunksWritten, chunks); t.end(); @@ -61,52 +61,11 @@ export default (label, factory, chunks) => { } }); - rs.pipeTo(ws, { preventClose: true }).finished.then(() => { + rs.pipeTo(ws, { preventClose: true }).then(() => { t.equal(ws.state, 'writable', 'destination should be writable'); t.deepEqual(chunksWritten, chunks); t.end(); }); }); - - test('piping and then immediately unpiping', t => { - t.plan(5); - const rs = factory(); - - const chunksWritten = []; - const ws = new WritableStream({ - close() { - t.fail('unexpected close call'); - }, - abort() { - t.fail('unexpected abort call'); - }, - write(chunk) { - chunksWritten.push(chunk); - } - }); - - const pipe = rs.pipeTo(ws); - - let unpipeFulfilled = false; - - pipe.unpipe().then(() => { - unpipeFulfilled = true; - - let reader; - t.doesNotThrow(() => { reader = rs.getReader(); }, - 'should be able to get a reader after unpipe promise fulfills'); - - reader.read().then(r => { - t.deepEqual(r, { value: chunks[1], done: false }, 'reading from the reader should give the second chunk'); - }); - }); - - pipe.finished.then(v => { - t.equal(v, undefined, 'pipeTo finished promise should fulfill with undefined'); - t.equal(unpipeFulfilled, false, 'pipeTo finished promise should fulfill before the unpipe promise'); - }); - - t.throws(() => rs.getReader(), TypeError, 'should not be able to get a reader immediately after unpipe call'); - }); };