Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

stream: eos on closed #28748

Closed
wants to merge 12 commits into from
10 changes: 0 additions & 10 deletions lib/internal/streams/async_iterator.js
Original file line number Diff line number Diff line change
Expand Up @@ -112,16 +112,6 @@ const ReadableStreamAsyncIteratorPrototype = Object.setPrototypeOf({
return() {
return new Promise((resolve, reject) => {
const stream = this[kStream];

// TODO(ronag): Remove this check once finished() handles
// already ended and/or destroyed streams.
const ended = stream.destroyed || stream.readableEnded ||
(stream._readableState && stream._readableState.endEmitted);
if (ended) {
resolve(createIterResult(undefined, true));
return;
}

finished(stream, (err) => {
if (err && err.code !== 'ERR_STREAM_PREMATURE_CLOSE') {
reject(err);
Expand Down
40 changes: 23 additions & 17 deletions lib/internal/streams/end-of-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,43 +28,49 @@ function eos(stream, opts, callback) {

callback = once(callback);

const onerror = (err) => {
callback.call(stream, err);
};

let writableFinished = stream.writableFinished ||
(stream._writableState && stream._writableState.finished);
let readableEnded = stream.readableEnded ||
(stream._readableState && stream._readableState.endEmitted);

if (writableFinished || readableEnded || stream.destroyed ||
stream.aborted) {
trivikr marked this conversation as resolved.
Show resolved Hide resolved
if (opts.error !== false) stream.on('error', onerror);
// A destroy(err) call emits error in nextTick.
process.nextTick(callback.bind(stream));
ronag marked this conversation as resolved.
Show resolved Hide resolved
return () => {
stream.removeListener('error', onerror);
};
}

let readable = opts.readable || (opts.readable !== false && stream.readable);
let writable = opts.writable || (opts.writable !== false && stream.writable);

const onlegacyfinish = () => {
if (!stream.writable) onfinish();
};

var writableEnded = stream._writableState && stream._writableState.finished;
const onfinish = () => {
writable = false;
writableEnded = true;
writableFinished = true;
if (!readable) callback.call(stream);
};

var readableEnded = stream.readableEnded ||
(stream._readableState && stream._readableState.endEmitted);
const onend = () => {
readable = false;
readableEnded = true;
if (!writable) callback.call(stream);
};

const onerror = (err) => {
callback.call(stream, err);
};

const onclose = () => {
let err;
if (readable && !readableEnded) {
if (!stream._readableState || !stream._readableState.ended)
err = new ERR_STREAM_PREMATURE_CLOSE();
return callback.call(stream, err);
}
if (writable && !writableEnded) {
if (!stream._writableState || !stream._writableState.ended)
err = new ERR_STREAM_PREMATURE_CLOSE();
return callback.call(stream, err);
callback.call(stream, new ERR_STREAM_PREMATURE_CLOSE());
} else if (writable && !writableFinished) {
callback.call(stream, new ERR_STREAM_PREMATURE_CLOSE());
}
Copy link
Member Author

@ronag ronag Sep 1, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is an added test for this change. We should always be emitting premature close unless we've seen finish or end events. I don't think this actually changes anything in practice.

};

Expand Down
106 changes: 106 additions & 0 deletions test/parallel/test-http-client-finished.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,109 @@ const { finished } = require('stream');
.end();
}));
}

{
// Test abort before finished.

const server = http.createServer(function(req, res) {
});

server.listen(0, common.mustCall(function() {
const req = http.request({
port: this.address().port
}, common.mustNotCall());
req.abort();
finished(req, common.mustCall(() => {
server.close();
}));
}));
}

{
// Test abort after request.

const server = http.createServer(function(req, res) {
});

server.listen(0, common.mustCall(function() {
const req = http.request({
port: this.address().port
}).end();
finished(req, (err) => {
common.expectsError({
type: Error,
code: 'ERR_STREAM_PREMATURE_CLOSE'
})(err);
finished(req, common.mustCall(() => {
server.close();
}));
});
req.abort();
}));
}

{
// Test abort before end.

const server = http.createServer(function(req, res) {
res.write('test');
});

server.listen(0, common.mustCall(function() {
const req = http.request({
port: this.address().port
}).on('response', common.mustCall((res) => {
req.abort();
finished(res, common.mustCall(() => {
finished(res, common.mustCall(() => {
server.close();
}));
}));
})).end();
}));
}

{
// Test destroy before end.

const server = http.createServer(function(req, res) {
res.write('test');
});

server.listen(0, common.mustCall(function() {
http.request({
port: this.address().port
}).on('response', common.mustCall((res) => {
// TODO(ronag): Bug? Won't emit 'close' unless read.
res.on('data', () => {});
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@addaleax: Isn't it a bit strange that the 'data' handler is required here in order for the response to end/close? Likewise in a test further down.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ping @addaleax? I think this is out of the scope of this PR. Please just confirm whether this is strange and we can create a new issue for later investigation.

res.destroy();
finished(res, common.mustCall(() => {
finished(res, common.mustCall(() => {
server.close();
}));
}));
})).end();
}));
}

{
// Test finish after end.

const server = http.createServer(function(req, res) {
res.end('asd');
});

server.listen(0, common.mustCall(function() {
http.request({
port: this.address().port
}).on('response', common.mustCall((res) => {
// TODO(ronag): Bug? Won't emit 'close' unless read.
res.on('data', () => {});
finished(res, common.mustCall(() => {
finished(res, common.mustCall(() => {
server.close();
}));
}));
})).end();
}));
}
140 changes: 137 additions & 3 deletions test/parallel/test-stream-finished.js
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,18 @@ const { promisify } = require('util');
}));
}

{
const rs = new Readable();

finished(rs, common.mustCall((err) => {
assert(err, 'premature close error');
}));

rs.push(null);
rs.emit('close');
rs.resume();
}

{
const rs = new Readable();

Expand All @@ -105,7 +117,9 @@ const { promisify } = require('util');
}));

rs.push(null);
rs.emit('close'); // Should not trigger an error
rs.on('end', common.mustCall(() => {
rs.emit('close'); // Should not trigger an error
}));
rs.resume();
}

Expand Down Expand Up @@ -155,8 +169,9 @@ const { promisify } = require('util');
rs.resume();
}

// Test that calling returned function removes listeners
{
// Nothing happens if disposed.

const ws = new Writable({
write(data, env, cb) {
cb();
Expand All @@ -168,6 +183,8 @@ const { promisify } = require('util');
}

{
// Nothing happens if disposed.

const rs = new Readable();
const removeListeners = finished(rs, common.mustNotCall());
removeListeners();
Expand All @@ -178,9 +195,126 @@ const { promisify } = require('util');
}

{
// Completed if readable-like is ended before.

const streamLike = new EE();
streamLike.readableEnded = true;
streamLike.readable = true;
finished(streamLike, common.mustCall);
finished(streamLike, common.mustCall());
}

{
// Completed if readable-like is never ended.

const streamLike = new EE();
streamLike.readableEnded = false;
streamLike.readable = true;
finished(streamLike, common.expectsError({
code: 'ERR_STREAM_PREMATURE_CLOSE'
}));
streamLike.emit('close');
}

{
// Completed if writable-like is destroyed before.

const streamLike = new EE();
streamLike.destroyed = true;
streamLike.writable = true;
finished(streamLike, common.mustCall());
}

{
// Completed if readable-like is aborted before.

const streamLike = new EE();
streamLike.destroyed = true;
streamLike.readable = true;
finished(streamLike, common.mustCall());
}

{
// Completed if writable-like is aborted before.

const streamLike = new EE();
streamLike.aborted = true;
streamLike.writable = true;
finished(streamLike, common.mustCall());
}

{
// Completed if readable-like is aborted before.

const streamLike = new EE();
streamLike.aborted = true;
streamLike.readable = true;
finished(streamLike, common.mustCall());
}

{
// Completed if streamlike is finished before.

const streamLike = new EE();
streamLike.writableFinished = true;
streamLike.writable = true;
finished(streamLike, common.mustCall());
}

{
// Premature close if stream is not finished.

const streamLike = new EE();
streamLike.writableFinished = false;
streamLike.writable = true;
finished(streamLike, common.expectsError({
code: 'ERR_STREAM_PREMATURE_CLOSE'
}));
streamLike.emit('close');
}

{
// Premature close if stream never emitted 'finish'
// even if writableFinished says something else.

const streamLike = new EE();
streamLike.writable = true;
finished(streamLike, common.expectsError({
code: 'ERR_STREAM_PREMATURE_CLOSE'
}));
streamLike.writableFinished = true;
streamLike.emit('close');
}


{
// Premature close if stream never emitted 'end'
// even if readableEnded says something else.

const streamLike = new EE();
streamLike.readable = true;
finished(streamLike, common.expectsError({
code: 'ERR_STREAM_PREMATURE_CLOSE'
}));
streamLike.readableEnded = true;
streamLike.emit('close');
}

{
// Completes if already finished.

const w = new Writable();
finished(w, common.mustCall(() => {
finished(w, common.mustCall());
}));
w.destroy();
}

{
// Completes if already ended.

const r = new Readable();
finished(r, common.mustCall(() => {
finished(r, common.mustCall());
}));
r.destroy();
}