Skip to content

Commit d016b9d

Browse files
committed
stream: finished callback for closed streams
Previously finished(stream, cb) would not invoke the callback for streams that have already finished, ended or errored before being passed to finished(stream, cb). PR-URL: #31509 Refs: #31508 Reviewed-By: Matteo Collina <matteo.collina@gmail.com> Reviewed-By: James M Snell <jasnell@gmail.com> Reviewed-By: Rich Trott <rtrott@gmail.com>
1 parent e559842 commit d016b9d

File tree

6 files changed

+158
-14
lines changed

6 files changed

+158
-14
lines changed

lib/_stream_readable.js

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,9 @@ function ReadableState(options, stream, isDuplex) {
148148
// Indicates whether the stream has errored.
149149
this.errored = false;
150150

151+
// Indicates whether the stream has finished destroying.
152+
this.closed = false;
153+
151154
// Crypto is kind of old and crusty. Historically, its default string
152155
// encoding is 'binary' so we have to make this configurable.
153156
// Everything else in the universe uses 'utf8', though.

lib/_stream_writable.js

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -175,6 +175,9 @@ function WritableState(options, stream, isDuplex) {
175175
// is disabled we need a way to tell whether the stream has failed.
176176
this.errored = false;
177177

178+
// Indicates whether the stream has finished destroying.
179+
this.closed = false;
180+
178181
// Count buffered requests
179182
this.bufferedRequestCount = 0;
180183

lib/internal/streams/async_iterator.js

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -67,16 +67,6 @@ function finish(self, err) {
6767
return new Promise((resolve, reject) => {
6868
const stream = self[kStream];
6969

70-
// TODO(ronag): Remove this check once finished() handles
71-
// already ended and/or destroyed streams.
72-
const ended = stream.destroyed || stream.readableEnded ||
73-
(stream._readableState && stream._readableState.endEmitted);
74-
75-
if (ended) {
76-
resolve(createIterResult(undefined, true));
77-
return;
78-
}
79-
8070
finished(stream, (err) => {
8171
if (err && err.code !== 'ERR_STREAM_PREMATURE_CLOSE') {
8272
reject(err);

lib/internal/streams/destroy.js

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,13 @@ function destroy(err, cb) {
4848
}
4949
}
5050

51+
if (w) {
52+
w.closed = true;
53+
}
54+
if (r) {
55+
r.closed = true;
56+
}
57+
5158
if (cb) {
5259
// Invoke callback before scheduling emitClose so that callback
5360
// can schedule before.
@@ -101,6 +108,7 @@ function undestroy() {
101108
const w = this._writableState;
102109

103110
if (r) {
111+
r.closed = false;
104112
r.destroyed = false;
105113
r.errored = false;
106114
r.reading = false;
@@ -110,6 +118,7 @@ function undestroy() {
110118
}
111119

112120
if (w) {
121+
w.closed = false;
113122
w.destroyed = false;
114123
w.errored = false;
115124
w.ended = false;

lib/internal/streams/end-of-stream.js

Lines changed: 26 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@ function isWritableFinished(stream) {
3232
return wState.finished || (wState.ended && wState.length === 0);
3333
}
3434

35+
function nop() {}
36+
3537
function eos(stream, opts, callback) {
3638
if (arguments.length === 2) {
3739
callback = opts;
@@ -52,20 +54,23 @@ function eos(stream, opts, callback) {
5254
let writable = opts.writable ||
5355
(opts.writable !== false && isWritable(stream));
5456

57+
const wState = stream._writableState;
58+
const rState = stream._readableState;
59+
5560
const onlegacyfinish = () => {
5661
if (!stream.writable) onfinish();
5762
};
5863

5964
let writableFinished = stream.writableFinished ||
60-
(stream._writableState && stream._writableState.finished);
65+
(rState && rState.finished);
6166
const onfinish = () => {
6267
writable = false;
6368
writableFinished = true;
6469
if (!readable) callback.call(stream);
6570
};
6671

6772
let readableEnded = stream.readableEnded ||
68-
(stream._readableState && stream._readableState.endEmitted);
73+
(rState && rState.endEmitted);
6974
const onend = () => {
7075
readable = false;
7176
readableEnded = true;
@@ -79,7 +84,7 @@ function eos(stream, opts, callback) {
7984
const onclose = () => {
8085
let err;
8186
if (readable && !readableEnded) {
82-
if (!stream._readableState || !stream._readableState.ended)
87+
if (!rState || !rState.ended)
8388
err = new ERR_STREAM_PREMATURE_CLOSE();
8489
return callback.call(stream, err);
8590
}
@@ -99,7 +104,7 @@ function eos(stream, opts, callback) {
99104
stream.on('abort', onclose);
100105
if (stream.req) onrequest();
101106
else stream.on('request', onrequest);
102-
} else if (writable && !stream._writableState) { // legacy streams
107+
} else if (writable && !wState) { // legacy streams
103108
stream.on('end', onlegacyfinish);
104109
stream.on('close', onlegacyfinish);
105110
}
@@ -114,7 +119,24 @@ function eos(stream, opts, callback) {
114119
if (opts.error !== false) stream.on('error', onerror);
115120
stream.on('close', onclose);
116121

122+
const closed = (wState && wState.closed) || (rState && rState.closed) ||
123+
(wState && wState.errorEmitted) || (rState && rState.errorEmitted) ||
124+
(wState && wState.finished) || (rState && rState.endEmitted) ||
125+
(rState && stream.req && stream.aborted);
126+
127+
if (closed) {
128+
// TODO(ronag): Re-throw error if errorEmitted?
129+
// TODO(ronag): Throw premature close as if finished was called?
130+
// before being closed? i.e. if closed but not errored, ended or finished.
131+
// TODO(ronag): Throw some kind of error? Does it make sense
132+
// to call finished() on a "finished" stream?
133+
process.nextTick(() => {
134+
callback();
135+
});
136+
}
137+
117138
return function() {
139+
callback = nop;
118140
stream.removeListener('aborted', onclose);
119141
stream.removeListener('complete', onfinish);
120142
stream.removeListener('abort', onclose);

test/parallel/test-stream-finished.js

Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -215,3 +215,120 @@ const { promisify } = require('util');
215215
w.end('asd');
216216
w.destroy();
217217
}
218+
219+
function testClosed(factory) {
220+
{
221+
// If already destroyed but finished is cancelled in same tick
222+
// don't invoke the callback,
223+
224+
const s = factory();
225+
s.destroy();
226+
const dispose = finished(s, common.mustNotCall());
227+
dispose();
228+
}
229+
230+
{
231+
// If already destroyed invoked callback.
232+
233+
const s = factory();
234+
s.destroy();
235+
finished(s, common.mustCall());
236+
}
237+
238+
{
239+
// Don't invoke until destroy has completed.
240+
241+
let destroyed = false;
242+
const s = factory({
243+
destroy(err, cb) {
244+
setImmediate(() => {
245+
destroyed = true;
246+
cb();
247+
});
248+
}
249+
});
250+
s.destroy();
251+
finished(s, common.mustCall(() => {
252+
assert.strictEqual(destroyed, true);
253+
}));
254+
}
255+
256+
{
257+
// Invoke callback even if close is inhibited.
258+
259+
const s = factory({
260+
emitClose: false,
261+
destroy(err, cb) {
262+
cb();
263+
finished(s, common.mustCall());
264+
}
265+
});
266+
s.destroy();
267+
}
268+
269+
{
270+
// Invoke with deep async.
271+
272+
const s = factory({
273+
destroy(err, cb) {
274+
setImmediate(() => {
275+
cb();
276+
setImmediate(() => {
277+
finished(s, common.mustCall());
278+
});
279+
});
280+
}
281+
});
282+
s.destroy();
283+
}
284+
}
285+
286+
testClosed((opts) => new Readable({ ...opts }));
287+
testClosed((opts) => new Writable({ write() {}, ...opts }));
288+
289+
{
290+
const w = new Writable({
291+
write(chunk, encoding, cb) {
292+
cb();
293+
},
294+
autoDestroy: false
295+
});
296+
w.end('asd');
297+
process.nextTick(() => {
298+
finished(w, common.mustCall());
299+
});
300+
}
301+
302+
{
303+
const w = new Writable({
304+
write(chunk, encoding, cb) {
305+
cb(new Error());
306+
},
307+
autoDestroy: false
308+
});
309+
w.write('asd');
310+
w.on('error', common.mustCall(() => {
311+
finished(w, common.mustCall());
312+
}));
313+
}
314+
315+
316+
{
317+
const r = new Readable({
318+
autoDestroy: false
319+
});
320+
r.push(null);
321+
r.resume();
322+
r.on('end', common.mustCall(() => {
323+
finished(r, common.mustCall());
324+
}));
325+
}
326+
327+
{
328+
const rs = fs.createReadStream(__filename, { autoClose: false });
329+
rs.resume();
330+
rs.on('close', common.mustNotCall());
331+
rs.on('end', common.mustCall(() => {
332+
finished(rs, common.mustCall());
333+
}));
334+
}

0 commit comments

Comments
 (0)