Skip to content

Commit

Permalink
Revert "Update pipeTo to allow unpiping"
Browse files Browse the repository at this point in the history
This reverts commit 0de787a. Per discussions in #297, we can punt on unpiping until we also figure out cancelable promises.

Conflicts:
	reference-implementation/lib/readable-stream.js
  • Loading branch information
domenic committed Mar 16, 2015
1 parent fc32294 commit f11a86f
Show file tree
Hide file tree
Showing 10 changed files with 64 additions and 157 deletions.
8 changes: 4 additions & 4 deletions index.bs
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ reader (if one exists) will automatically release its lock.
the chain:

<pre><code class="lang-javascript">
readableStream.pipeTo(writableStream).finished
readableStream.pipeTo(writableStream)
.then(() => console.log("All data successfully written!"))
.catch(e => console.error("Something went wrong!", e));
</code></pre>
Expand Down Expand Up @@ -941,7 +941,7 @@ a variable <var>stream</var>, that performs the following steps:
chance to slow down its data production.

<pre><code class="lang-javascript">
readableStream.pipeTo(writableStream).finished
readableStream.pipeTo(writableStream)
.then(() => console.log("All data successfully written!"))
.catch(e => console.error("Something went wrong!", e));
</code></pre>
Expand Down Expand Up @@ -1856,7 +1856,7 @@ writable stream:
<pre><code class="lang-javascript">
const webSocketStream = makeReadableWebSocketStream("wss://example.com", 443);

webSocketStream.pipeTo(writableStream).finished
webSocketStream.pipeTo(writableStream)
.then(() => console.log("All data successfully written!"))
.catch(e => console.error("Something went wrong!", e));
</code></pre>
Expand Down Expand Up @@ -1989,7 +1989,7 @@ We can then use this function to create writable streams for a web socket, and p
<pre><code class="lang-javascript">
const webSocketStream = makeWritableWebSocketStream("wss://example.com", 443);

readableStream.pipeTo(webSocketStream).finished
readableStream.pipeTo(webSocketStream)
.then(() => console.log("All data successfully written!"))
.catch(e => console.error("Something went wrong!", e));
</code></pre>
Expand Down
49 changes: 10 additions & 39 deletions reference-implementation/lib/readable-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -72,13 +72,12 @@ export default class ReadableStream {
let reader;
let lastRead;
let closedPurposefully = false;
let unpiped = false;
let resolveFinishedPromise;
let rejectFinishedPromise;
let resolvePipeToPromise;
let rejectPipeToPromise;

const finishedPromise = new Promise((resolve, reject) => {
resolveFinishedPromise = resolve;
rejectFinishedPromise = reject;
return new Promise((resolve, reject) => {
resolvePipeToPromise = resolve;
rejectPipeToPromise = reject;

reader = source.getReader();

Expand All @@ -95,25 +94,9 @@ export default class ReadableStream {
doPipe();
});

return { finished: finishedPromise, unpipe };

function unpipe() {
unpiped = true;
return lastRead.then(finishUnpipe, finishUnpipe);

function finishUnpipe() {
reader.releaseLock();
resolveFinishedPromise(undefined);
}
}

function doPipe() {
lastRead = reader.read();
Promise.all([lastRead, dest.ready]).then(([{ value, done }]) => {
if (unpiped === true) {
return;
}

if (Boolean(done) === true) {
closeDest();
} else {
Expand All @@ -127,56 +110,44 @@ export default class ReadableStream {
}

function cancelSource(reason) {
if (unpiped === true) {
return;
}

if (preventCancel === false) {
// cancelling automatically releases the lock (and that doesn't fail, since source is then closed)
reader.cancel(reason);
rejectFinishedPromise(reason);
rejectPipeToPromise(reason);
} else {
// If we don't cancel, we need to wait for lastRead to finish before we're allowed to release.
// We don't need to handle lastRead failing because that will trigger abortDest which takes care of
// both of these.
lastRead.then(() => {
reader.releaseLock();
rejectFinishedPromise(reason);
rejectPipeToPromise(reason);
});
}
}

function closeDest() {
if (unpiped === true) {
return;
}

// Does not need to wait for lastRead since it occurs only on source closed.

reader.releaseLock();

const destState = dest.state;
if (preventClose === false && (destState === 'waiting' || destState === 'writable')) {
closedPurposefully = true;
dest.close().then(resolveFinishedPromise, rejectFinishedPromise);
dest.close().then(resolvePipeToPromise, rejectPipeToPromise);
} else {
resolveFinishedPromise();
resolvePipeToPromise();
}
}

function abortDest(reason) {
if (unpiped === true) {
return;
}

// Does not need to wait for lastRead since it only occurs on source errored.

reader.releaseLock();

if (preventAbort === false) {
dest.abort(reason);
}
rejectFinishedPromise(reason);
rejectPipeToPromise(reason);
}
}

Expand Down
4 changes: 1 addition & 3 deletions reference-implementation/test/brand-checks.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,7 @@ function fakeReadableStream() {
get closed() { return Promise.resolve(); },
cancel(reason) { return Promise.resolve(); },
pipeThrough({ writable, readable }, options) { return readable; },
pipeTo(dest, { preventClose, preventAbort, preventCancel } = {}) {
return { finished: Promise.resolve(), unpipe() { } };
},
pipeTo(dest, { preventClose, preventAbort, preventCancel } = {}) { return Promise.resolve(); },
getReader() { return new ReadableStream(new ReadableStream()); }
};
}
Expand Down
4 changes: 2 additions & 2 deletions reference-implementation/test/pipe-through.js
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ test('Piping through an identity transform stream will close the destination whe

const ws = new WritableStream();

rs.pipeThrough(ts).pipeTo(ws).finished.then(() => {
rs.pipeThrough(ts).pipeTo(ws).then(() => {
t.equal(ws.state, 'closed', 'the writable stream was closed');
})
.catch(e => t.error(e));
Expand Down Expand Up @@ -82,7 +82,7 @@ test.skip('Piping through a default transform stream causes backpressure to be e
});

setTimeout(() => {
rs.pipeThrough(ts).pipeTo(ws).finished.then(() => {
rs.pipeThrough(ts).pipeTo(ws).then(() => {
t.deepEqual(
enqueueReturnValues,
[true, true, true, true, false, false, false, false],
Expand Down
6 changes: 3 additions & 3 deletions reference-implementation/test/pipe-to-options.js
Original file line number Diff line number Diff line change
Expand Up @@ -80,11 +80,11 @@ test('Piping with { preventCancel: true } and a destination error', t => {
}
});

rs.pipeTo(ws, { preventCancel: true }).finished.catch(e => {
t.equal(e, theError, 'pipeTo finished promise should reject with the sink error');
rs.pipeTo(ws, { preventCancel: true }).catch(e => {
t.equal(e, theError, 'rejection reason of pipeTo promise is the sink error');

let reader;
t.doesNotThrow(() => { reader = rs.getReader(); }, 'should be able to get a stream reader after pipeTo finishes');
t.doesNotThrow(() => { reader = rs.getReader(); }, 'should be able to get a stream reader after pipeTo completes');

// { value: 'c', done: false } gets consumed before we know that ws has errored, and so is lost.

Expand Down
64 changes: 30 additions & 34 deletions reference-implementation/test/pipe-to.js
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ test('Piping from a ReadableStream from which lots of data are readable synchron
});

let pipeFinished = false;
rs.pipeTo(ws).finished.then(
rs.pipeTo(ws).then(
() => {
pipeFinished = true;
t.equal(rsClosed, true, 'readable stream should be closed after pipe finishes');
Expand Down Expand Up @@ -75,11 +75,11 @@ test('Piping from a ReadableStream in readable state to a WritableStream in clos
rsClosed = true;
});

rs.pipeTo(ws).finished.then(
() => t.fail('pipeTo finished promise should not fulfill'),
rs.pipeTo(ws).then(
() => t.fail('promise returned by pipeTo should not fulfill'),
r => {
t.equal(r, cancelReason,
'pipeTo finished promise should reject with the same error as the underlying source cancel was called with');
'the pipeTo promise should reject with the same error as the underlying source cancel was called with');
t.equal(rsClosed, true, 'readable stream should be closed after pipe finishes');
}
);
Expand Down Expand Up @@ -135,8 +135,8 @@ test('Piping from a ReadableStream in readable state to a WritableStream in erro
ws.ready.then(() => {
t.equal(ws.state, 'errored', 'as a result of rejected promise, ws must be in errored state');

rs.pipeTo(ws).finished.catch(e => {
t.equal(e, passedError, 'pipeTo finished promise should be rejected with the error');
rs.pipeTo(ws).catch(e => {
t.equal(e, passedError, 'pipeTo promise should be rejected with the error');
t.assert(cancelCalled, 'cancel should have been called');
t.end();
});
Expand Down Expand Up @@ -187,11 +187,11 @@ test('Piping from a ReadableStream in the readable state which becomes closed af
});

startPromise.then(() => {
rs.pipeTo(ws).finished.then(() => {
t.equal(ws.state, 'closed', 'writable stream should be closed after pipeTo finishes');
rs.pipeTo(ws).then(() => {
t.equal(ws.state, 'closed', 'writable stream should be closed after pipeTo completes');
});

t.equal(ws.state, 'writable', 'writable stream should still be writable immediately after pipeTo call');
t.equal(ws.state, 'writable', 'writable stream should still be writable immediately after pipeTo');

closeReadableStream();
});
Expand Down Expand Up @@ -235,12 +235,12 @@ test('Piping from a ReadableStream in the readable state which becomes errored a
});

startPromise.then(() => {
rs.pipeTo(ws).finished.catch(e => {
t.equal(e, passedError, 'pipeTo finished promise should be rejected with the passed error');
t.equal(ws.state, 'errored', 'writable stream should be errored after pipeTo finishes');
rs.pipeTo(ws).catch(e => {
t.equal(e, passedError, 'pipeTo should be rejected with the passed error');
t.equal(ws.state, 'errored', 'writable stream should be errored after pipeTo completes');
});

t.equal(ws.state, 'writable', 'writable stream should still be writable immediately after pipeTo call');
t.equal(ws.state, 'writable', 'writable stream should still be writable immediately after pipeTo');

errorReadableStream(passedError);
});
Expand Down Expand Up @@ -276,7 +276,7 @@ test('Piping from an empty ReadableStream which becomes non-empty after pipeTo c
}
});

rs.pipeTo(ws).finished.then(() => t.fail('pipeTo finished promise should not fulfill'));
rs.pipeTo(ws).then(() => t.fail('pipeTo promise should not fulfill'));
t.equal(ws.state, 'writable', 'writable stream should start in writable state');

enqueue('Hello');
Expand Down Expand Up @@ -312,9 +312,7 @@ test('Piping from an empty ReadableStream which becomes errored after pipeTo cal
}
});

rs.pipeTo(ws).finished.catch(
e => t.equal(e, passedError, 'pipeTo finished promise should reject with the passed error')
);
rs.pipeTo(ws).catch(e => t.equal(e, passedError, 'pipeTo should reject with the passed error'));
t.equal(ws.state, 'writable', 'writable stream should start out writable');
errorReadableStream(passedError);
});
Expand Down Expand Up @@ -357,9 +355,7 @@ test('Piping from an empty ReadableStream to a WritableStream in the writable st
startPromise.then(() => {
t.equal(ws.state, 'writable', 'ws should start writable');

rs.pipeTo(ws).finished.catch(
e => t.equal(e, theError, 'pipeTo finished promise should reject with the passed error')
);
rs.pipeTo(ws).catch(e => t.equal(e, theError, 'pipeTo should reject with the passed error'));
t.equal(ws.state, 'writable', 'ws should be writable after pipe');

errorWritableStream(theError);
Expand Down Expand Up @@ -792,9 +788,9 @@ test('Piping to a stream that has been aborted passes through the error as the c
const passedReason = new Error('I don\'t like you.');
ws.abort(passedReason);

rs.pipeTo(ws).finished.catch(e => {
t.equal(e, passedReason, 'pipeTo finished promise should reject with the cancellation reason');
t.equal(recordedReason, passedReason, 'the recorded cancellation reason should be the passed abort reason');
rs.pipeTo(ws).catch(e => {
t.equal(e, passedReason, 'pipeTo rejection reason should be the cancellation reason');
t.equal(recordedReason, passedReason, 'the recorded cancellation reason must be the passed abort reason');
t.end();
});
});
Expand All @@ -813,9 +809,9 @@ test('Piping to a stream and then aborting it passes through the error as the ca
const pipeToPromise = rs.pipeTo(ws);
ws.abort(passedReason);

pipeToPromise.finished.catch(e => {
t.equal(e, passedReason, 'pipeTo finished promise should reject with the abortion reason');
t.equal(recordedReason, passedReason, 'the recorded cancellation reason should be the passed abort reason');
pipeToPromise.catch(e => {
t.equal(e, passedReason, 'pipeTo rejection reason should be the abortion reason');
t.equal(recordedReason, passedReason, 'the recorded cancellation reason must be the passed abort reason');
t.end();
});
});
Expand All @@ -831,8 +827,8 @@ test('Piping to a stream that has been closed propagates a TypeError cancellatio
const ws = new WritableStream();
ws.close();

rs.pipeTo(ws).finished.catch(e => {
t.equal(e.constructor, TypeError, 'pipeTo finished promise should reject with a TypeError');
rs.pipeTo(ws).catch(e => {
t.equal(e.constructor, TypeError, 'the rejection reason for the pipeTo promise should be a TypeError');
t.equal(recordedReason.constructor, TypeError, 'the recorded cancellation reason should be a TypeError');
t.end();
});
Expand All @@ -848,11 +844,11 @@ test('Piping to a stream and then closing it propagates a TypeError cancellation

const ws = new WritableStream();

const pipeToPromise = rs.pipeTo(ws).finished;
const pipeToPromise = rs.pipeTo(ws);
ws.close();

pipeToPromise.catch(e => {
t.equal(e.constructor, TypeError, 'pipeTo finished promise should reject with a TypeError');
t.equal(e.constructor, TypeError, 'the rejection reason for the pipeTo promise should be a TypeError');
t.equal(recordedReason.constructor, TypeError, 'the recorded cancellation reason should be a TypeError');
t.end();
});
Expand Down Expand Up @@ -917,10 +913,10 @@ test('Piping to a stream that errors on write should not pass through the error
}
});

rs.pipeTo(ws).finished.then(
() => t.fail('pipeTo finished promise should not fulfill'),
rs.pipeTo(ws).then(
() => t.fail('pipeTo should not fulfill'),
r => {
t.equal(r, passedError, 'pipeTo finished promise should reject with the same error as the write');
t.equal(r, passedError, 'pipeTo should reject with the same error as the write');
t.equal(cancelCalled, false, 'cancel should not have been called');
t.end();
}
Expand Down Expand Up @@ -990,7 +986,7 @@ test('Piping to a writable stream that does not consume the writes fast enough e
});

startPromise.then(() => {
rs.pipeTo(ws).finished.then(() => {
rs.pipeTo(ws).then(() => {
t.deepEqual(enqueueReturnValues, [true, true, true, false], 'backpressure was correctly exerted at the source');
t.deepEqual(chunksFinishedWriting, ['a', 'b', 'c', 'd'], 'all chunks were written');
t.end();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,8 @@ export default (label, factory) => {
startPromise.then(() => {
t.equal(ws.state, 'writable', 'writable stream should start in writable state');

rs.pipeTo(ws).finished.then(() => {
t.pass('pipeTo finished promise should be fulfilled');
rs.pipeTo(ws).then(() => {
t.pass('pipeTo promise should be fulfilled');
t.equal(ws.state, 'closed', 'writable stream should become closed');
});
});
Expand Down
Loading

0 comments on commit f11a86f

Please sign in to comment.