Skip to content

Commit

Permalink
stream: make pipeline try to wait for 'close'
Browse files Browse the repository at this point in the history
Pipeline uses eos which will invoke the callback
on 'finish' and 'end' before all streams have been
fully destroyed.

Fixes: nodejs#32032
  • Loading branch information
ronag committed Mar 25, 2020
1 parent 388cef6 commit 4f76b58
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 1 deletion.
25 changes: 25 additions & 0 deletions lib/internal/streams/end-of-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -63,22 +63,47 @@ function eos(stream, opts, callback) {

const wState = stream._writableState;
const rState = stream._readableState;
const state = wState || rState;

const onlegacyfinish = () => {
if (!stream.writable) onfinish();
};

// TODO (ronag): Improve soft detection to include core modules and
// common ecosystem modules that do properly emit 'close' but fail
// this generic check.
const willEmitClose = (
state &&
state.autoDestroy &&
state.emitClose &&
state.closed === false
);

let writableFinished = stream.writableFinished ||
(wState && wState.finished);
const onfinish = () => {
writableFinished = true;
if (willEmitClose && (!stream.readable || readable)) {
// TODO(ronag): Duplex won't autoDestroy if `stream.readable` was
// explicitly set to false.
if (!rState || rState.readable !== false) {
return;
}
}
if (!readable || readableEnded) callback.call(stream);
};

let readableEnded = stream.readableEnded ||
(rState && rState.endEmitted);
const onend = () => {
readableEnded = true;
if (willEmitClose && (!stream.writable || writable)) {
// TODO(ronag): Duplex won't autoDestroy if `stream.writable` was
// explicitly set to false.
if (!wState || wState.writable !== false) {
return;
}
}
if (!writable || writableFinished) callback.call(stream);
};

Expand Down
43 changes: 42 additions & 1 deletion test/parallel/test-stream-pipeline.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ const {
Readable,
Transform,
pipeline,
PassThrough
PassThrough,
Duplex
} = require('stream');
const assert = require('assert');
const http = require('http');
Expand Down Expand Up @@ -1077,3 +1078,43 @@ const { promisify } = require('util');
assert.ifError(err);
}));
}

{
let closed = false;
const src = new Readable({
read() {},
destroy(err, cb) {
process.nextTick(cb);
}
});
const dst = new Writable({
write(chunk, encoding, callback) {
callback();
}
});
src.on('close', () => {
closed = true;
});
src.push(null);
pipeline(src, dst, common.mustCall((err) => {
assert.strictEqual(closed, true);
}));
}

{
let closed = false;
const src = new Readable({
read() {},
destroy(err, cb) {
process.nextTick(cb);
}
});
const dst = new Duplex({});
src.on('close', common.mustCall(() => {
closed = true;
}));
src.push(null);
pipeline(src, dst, common.mustCall((err) => {
assert.strictEqual(closed, true);
}));
}

0 comments on commit 4f76b58

Please sign in to comment.