Skip to content

Commit 563fff2

Browse files
mafintoshmcollina
authored andcommitted
stream: defer readable and flow when sync
PR-URL: #18515 Reviewed-By: Matteo Collina <matteo.collina@gmail.com> Reviewed-By: James M Snell <jasnell@gmail.com>
1 parent bc51428 commit 563fff2

File tree

3 files changed

+119
-5
lines changed

3 files changed

+119
-5
lines changed

lib/_stream_readable.js

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -486,11 +486,18 @@ function onEofChunk(stream, state) {
486486
}
487487
state.ended = true;
488488

489-
// emit 'readable' now to make sure it gets picked up.
490-
state.needReadable = false;
491-
if (!state.emittedReadable) {
492-
state.emittedReadable = true;
493-
emitReadable_(stream);
489+
if (state.sync && state.length) {
490+
// if we are sync and have data in the buffer, wait until next tick
491+
// to emit the data. otherwise we risk emitting data in the flow()
492+
// the readable code triggers during a read() call
493+
emitReadable(stream);
494+
} else {
495+
// emit 'readable' now to make sure it gets picked up.
496+
state.needReadable = false;
497+
if (!state.emittedReadable) {
498+
state.emittedReadable = true;
499+
emitReadable_(stream);
500+
}
494501
}
495502
}
496503

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
'use strict';
2+
const common = require('../common');
3+
const { Readable, Writable, PassThrough } = require('stream');
4+
5+
{
6+
let ticks = 17;
7+
8+
const rs = new Readable({
9+
objectMode: true,
10+
read: () => {
11+
if (ticks-- > 0)
12+
return process.nextTick(() => rs.push({}));
13+
rs.push({});
14+
rs.push(null);
15+
}
16+
});
17+
18+
const ws = new Writable({
19+
highWaterMark: 0,
20+
objectMode: true,
21+
write: (data, end, cb) => setImmediate(cb)
22+
});
23+
24+
rs.on('end', common.mustCall());
25+
ws.on('finish', common.mustCall());
26+
rs.pipe(ws);
27+
}
28+
29+
{
30+
let missing = 8;
31+
32+
const rs = new Readable({
33+
objectMode: true,
34+
read: () => {
35+
if (missing--) rs.push({});
36+
else rs.push(null);
37+
}
38+
});
39+
40+
const pt = rs
41+
.pipe(new PassThrough({ objectMode: true, highWaterMark: 2 }))
42+
.pipe(new PassThrough({ objectMode: true, highWaterMark: 2 }));
43+
44+
pt.on('end', function() {
45+
wrapper.push(null);
46+
});
47+
48+
const wrapper = new Readable({
49+
objectMode: true,
50+
read: () => {
51+
process.nextTick(function() {
52+
let data = pt.read();
53+
if (data === null) {
54+
pt.once('readable', function() {
55+
data = pt.read();
56+
if (data !== null) wrapper.push(data);
57+
});
58+
} else {
59+
wrapper.push(data);
60+
}
61+
});
62+
}
63+
});
64+
65+
wrapper.resume();
66+
wrapper.on('end', common.mustCall());
67+
}
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
'use strict';
2+
3+
const { Readable } = require('stream');
4+
const common = require('../common');
5+
6+
let ticks = 18;
7+
let expectedData = 19;
8+
9+
const rs = new Readable({
10+
objectMode: true,
11+
read: () => {
12+
if (ticks-- > 0)
13+
return process.nextTick(() => rs.push({}));
14+
rs.push({});
15+
rs.push(null);
16+
}
17+
});
18+
19+
rs.on('end', common.mustCall());
20+
readAndPause();
21+
22+
function readAndPause() {
23+
// Does a on(data) -> pause -> wait -> resume -> on(data) ... loop.
24+
// Expects on(data) to never fire if the stream is paused.
25+
const ondata = common.mustCall((data) => {
26+
rs.pause();
27+
28+
expectedData--;
29+
if (expectedData <= 0)
30+
return;
31+
32+
setImmediate(function() {
33+
rs.removeListener('data', ondata);
34+
readAndPause();
35+
rs.resume();
36+
});
37+
}, 1); // only call ondata once
38+
39+
rs.on('data', ondata);
40+
}

0 commit comments

Comments
 (0)