Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Align WritableStream structure with ReadableStream structure #488

Closed
wants to merge 19 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
291 changes: 234 additions & 57 deletions reference-implementation/lib/readable-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -86,95 +86,272 @@ class ReadableStream {
}

pipeTo(dest, { preventClose, preventAbort, preventCancel } = {}) {
// brandcheck

preventClose = Boolean(preventClose);
preventAbort = Boolean(preventAbort);
preventCancel = Boolean(preventCancel);

const source = this;

let reader;
let lastRead;
let lastWrite;
let closedPurposefully = false;
let resolvePipeToPromise;
let rejectPipeToPromise;
let _resolvePipeToPromise;
let _rejectPipeToPromise;

let _reader;
let _writer;

let _state = 'piping';

let _lastRead;
let _lastWrite;
let _allWrites;

return new Promise((resolve, reject) => {
resolvePipeToPromise = resolve;
rejectPipeToPromise = reject;
_resolvePipeToPromise = resolve;
_rejectPipeToPromise = reject;

reader = source.getReader();
_reader = source.getReader();
_writer = dest.getWriter();

reader.closed.catch(abortDest);
dest.closed.then(
() => {
if (!closedPurposefully) {
cancelSource(new TypeError('destination is closing or closed and cannot be piped to anymore'));
}
},
cancelSource
_reader.closed.catch(handleReaderClosedRejection);
_writer.closed.then(
handleWriterClosedFulfillment,
handleWriterClosedRejection
);

doPipe();
});

function releaseReader() {
console.log('pipeTo(): releaseReader()');

_reader.releaseLock();
_reader = undefined;
}

function releaseWriter() {
console.log('pipeTo(): releaseWriter()');

_writer.releaseLock();
_writer = undefined;
}

function done() {
console.log('pipeTo(): done()');

assert(_reader === undefined);
assert(_writer === undefined);

_state = 'done';

_lastRead = undefined;
_lastWrite = undefined;
_allWrites = undefined;
}

function finishWithFulfillment() {
console.log('pipeTo(): finishWithFulfillment()');

_resolvePipeToPromise(undefined);
_resolvePipeToPromise = undefined;
_rejectPipeToPromise = undefined;

done();
}

function finishWithRejection(reason) {
console.log('pipeTo(): finishWithRejection()');

_rejectPipeToPromise(reason);
_resolvePipeToPromise = undefined;
_rejectPipeToPromise = undefined;

done();
}

function abortWriterCancelReader(reason, skipAbort, skipCancel) {
const promises = [];

if (skipAbort === false) {
_writer.abort(reason);

releaseWriter();
} else if (_lastWrite === undefined) {
releaseWriter();
} else {
promises.push(_lastWrite.then(
() => {
releaseWriter();
},
() => {
releaseWriter();
}
));
}

if (skipCancel === false) {
_reader.cancel(reason);

releaseReader();
} else if (_lastRead === undefined) {
releaseReader();
} else {
promises.push(_lastRead.then(
() => {
releaseReader();
},
() => {
releaseReader();
}
));
}

if (promises.length > 0) {
Promise.all(promises).then(
() => {
finishWithRejection(reason);
}
);
_state = 'waitingForLastReadAndOrLastWrite';
return;
}

finishWithRejection(reason);
}

function handleWriteRejection(reason) {
console.log('pipeTo(): handleWriteRejection()');

if (_state !== 'piping') {
return;
}

abortWriterCancelReader(reason, preventAbort, preventCancel);
}

function handleReadValue(value) {
console.log('pipeTo(): handleReadValue()');

_lastWrite = _writer.write(value);
_lastWrite.catch(handleWriteRejection);

// dest may be already errored. But proceed to write().
_allWrites = Promise.all([_allWrites, _lastWrite]);

doPipe();
}

function handleReadDone() {
console.log('pipeTo(): handleReadDone()');

// Does not need to wait for lastRead since it occurs only on source closed.

releaseReader();

if (preventClose === false) {
console.log('pipeTo(): Close dest');

// We don't use writer.closed. We can ensure that the microtask for writer.closed is run before any
// writer.close() call so that we can determine whether the closure was caused by the close() or ws was already
// closed before pipeTo(). It's possible but fragile.
_writer.close().then(
() => {
return _allWrites;
},
reason => {
releaseWriter();
finishWithRejection(reason);
}
).then(
() => {
releaseWriter();
finishWithFulfillment();
}
);
_state = 'closingDest';

return;
}

if (_lastWrite === undefined) {
releaseWriter()
finishWithFulfillment();
return;
}

// We don't use writer.closed. pipeTo() is responsible only for what it has written.
_lastWrite.then(
() => {
releaseWriter();
finishWithFulfillment();
},
reason => {
releaseWriter();
finishWithRejection(reason)
}
);
_state = 'waitingLastWriteOnReadableClosed';
}

function doPipe() {
lastRead = reader.read();
Promise.all([lastRead, dest.ready]).then(([{ value, done }]) => {
if (Boolean(done) === true) {
closeDest();
} else if (dest.state === 'writable') {
lastWrite = dest.write(value);
doPipe();
console.log('pipeTo(): doPipe()');

_lastRead = _reader.read();

Promise.all([_lastRead, _writer.ready]).then(
([{ value, done }]) => {
if (_state !== 'piping') {
return;
}

if (Boolean(done) === false) {
handleReadValue(value);
} else {
handleReadDone();
}
},
() => {
// Do nothing
}
})
)
.catch(rethrowAssertionErrorRejection);

// Any failures will be handled by listening to reader.closed and dest.closed above.
// TODO: handle malicious dest.write/dest.close?
}

function cancelSource(reason) {
if (preventCancel === false) {
reader.cancel(reason);
reader.releaseLock();
rejectPipeToPromise(reason);
} else {
// If we don't cancel, we need to wait for lastRead to finish before we're allowed to release.
// We don't need to handle lastRead failing because that will trigger abortDest which takes care of
// both of these.
lastRead.then(() => {
reader.releaseLock();
rejectPipeToPromise(reason);
});
function handleReaderClosedRejection(reason) {
console.log('pipeTo(): handleReaderClosedRejection()');

if (_state !== 'piping') {
return;
}
}

function closeDest() {
// Does not need to wait for lastRead since it occurs only on source closed.
_lastRead = undefined;
abortWriterCancelReader(reason, preventAbort, true);
}

reader.releaseLock();
function handleUnexpectedWriterCloseAndError(reason) {
console.log('pipeTo(): handleUnexpectedWriterCloseAndError()');

const destState = dest.state;
if (preventClose === false && (destState === 'waiting' || destState === 'writable')) {
closedPurposefully = true;
dest.close().then(resolvePipeToPromise, rejectPipeToPromise);
} else if (lastWrite !== undefined) {
lastWrite.then(resolvePipeToPromise, rejectPipeToPromise);
} else {
resolvePipeToPromise();
if (_state !== 'piping') {
return;
}

_lastWrite = undefined;
abortWriterCancelReader(reason, true, preventCancel);
}

function abortDest(reason) {
// Does not need to wait for lastRead since it only occurs on source errored.
function handleWriterClosedFulfillment() {
console.log('pipeTo(): handleWriterClosedFulfillment()');

reader.releaseLock();
handleUnexpectedWriterCloseAndError(new TypeError('dest closed unexpectedly'));
}

if (preventAbort === false) {
dest.abort(reason);
}
rejectPipeToPromise(reason);
function handleWriterClosedRejection(reason) {
console.log('pipeTo(): handleWriterClosedRejection()');

handleUnexpectedWriterCloseAndError(reason);
}
}

Expand Down
Loading