Permalink
Browse files

stream: always defer 'readable' with nextTick

Emit 'readable' always in the next tick, resulting in a single
call to _read() per microtick. This removes the need for the
user to implement buffering if they wanted to call this.push()
multiple times in an asynchronous fashion, as this.push() triggers
this._read() call.

PR-URL: #17979
Fixes: #3203
Reviewed-By: James M Snell <jasnell@gmail.com>
Reviewed-By: Anna Henningsen <anna@addaleax.net>
Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com>
  • Loading branch information...
mcollina committed Jan 4, 2018
1 parent 800caac commit 1e0f3315c77033ef0e01bb37c3d41c8e1d65e686
@@ -747,6 +747,12 @@ The listener callback will be passed a single `Error` object.
##### Event: 'readable'
<!-- YAML
added: v0.9.4
changes:
- version: REPLACEME
pr-url: https://github.com/nodejs/node/pull/17979
description: >
'readable' is always emitted in the next tick after
.push() is called
-->
The `'readable'` event is emitted when there is data available to be read from
@@ -1647,6 +1653,13 @@ const myReadable = new Readable({
```
#### readable.\_read(size)
<!-- YAML
added: v0.9.4
changes:
- version: REPLACEME
pr-url: https://github.com/nodejs/node/pull/17979
description: call _read() only once per microtick
-->
* `size` {number} Number of bytes to read asynchronously
@@ -1666,6 +1679,8 @@ additional data onto the queue.
*Note*: Once the `readable._read()` method has been called, it will not be
called again until the [`readable.push()`][stream-push] method is called.
`readable._read()` is guaranteed to be called only once within a
synchronous execution, i.e. a microtick.
The `size` argument is advisory. For implementations where a "read" is a
single operation that returns data can use the `size` argument to determine how
@@ -267,7 +267,6 @@ function readableAddChunk(stream, chunk, encoding, addToFront, skipChunkCheck) {
function addChunk(stream, state, chunk, addToFront) {
if (state.flowing && state.length === 0 && !state.sync) {
stream.emit('data', chunk);
stream.read(0);
} else {
// update the buffer info.
state.length += state.objectMode ? 1 : chunk.length;
@@ -496,7 +495,11 @@ function onEofChunk(stream, state) {
state.ended = true;
// emit 'readable' now to make sure it gets picked up.
emitReadable(stream);
state.needReadable = false;
if (!state.emittedReadable) {
state.emittedReadable = true;
emitReadable_(stream);
}
}
// Don't emit readable right away in sync mode, because this can trigger
@@ -508,16 +511,15 @@ function emitReadable(stream) {
if (!state.emittedReadable) {
debug('emitReadable', state.flowing);
state.emittedReadable = true;
if (state.sync)
process.nextTick(emitReadable_, stream);
else
emitReadable_(stream);
process.nextTick(emitReadable_, stream);
}
}
function emitReadable_(stream) {
var state = stream._readableState;
debug('emit readable');
stream.emit('readable');
state.needReadable = !state.flowing && !state.ended;
flow(stream);
}
@@ -537,7 +539,7 @@ function maybeReadMore(stream, state) {
function maybeReadMore_(stream, state) {
var len = state.length;
while (!state.reading && !state.flowing && !state.ended &&
while (!state.reading && !state.ended &&
state.length < state.highWaterMark) {
debug('maybeReadMore read 0');
stream.read(0);
@@ -644,6 +646,7 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
debug('ondata');
increasedAwaitDrain = false;
var ret = dest.write(chunk);
debug('dest.write', ret);
if (false === ret && !increasedAwaitDrain) {
// If the user unpiped during `dest.write()`, it is possible
// to get stuck in a permanently paused state if that write
@@ -824,8 +827,8 @@ function resume(stream, state) {
}
function resume_(stream, state) {
debug('resume', state.reading);
if (!state.reading) {
debug('resume read 0');
stream.read(0);
}
@@ -1087,13 +1090,16 @@ function copyFromBuffer(n, list) {
function endReadable(stream) {
var state = stream._readableState;
debug('endReadable', state.endEmitted);
if (!state.endEmitted) {
state.ended = true;
process.nextTick(endReadableNT, state, stream);
}
}
function endReadableNT(state, stream) {
debug('endReadableNT', state.endEmitted, state.length);
// Check that we didn't get one last unshift.
if (!state.endEmitted && state.length === 0) {
state.endEmitted = true;
@@ -8,18 +8,22 @@ const uv = process.binding('uv');
const s = new net.Socket({
handle: {
readStart: function() {
process.nextTick(() => this.onread(uv.UV_EOF, null));
setImmediate(() => this.onread(uv.UV_EOF, null));
},
close: (cb) => process.nextTick(cb)
close: (cb) => setImmediate(cb)
},
writable: false
});
assert.strictEqual(s, s.resume());
const events = [];
s.on('end', () => events.push('end'));
s.on('close', () => events.push('close'));
s.on('end', () => {
events.push('end');
});
s.on('close', () => {
events.push('close');
});
process.on('exit', () => {
assert.deepStrictEqual(events, [ 'end', 'close' ]);
@@ -3,32 +3,24 @@ const common = require('../common');
const stream = require('stream');
const assert = require('assert');
const awaitDrainStates = [
1, // after first chunk before callback
1, // after second chunk before callback
0 // resolving chunk pushed after first chunk, awaitDrain is decreased
];
// A writable stream which pushes data onto the stream which pipes into it,
// but only the first time it's written to. Since it's not paused at this time,
// a second write will occur. If the pipe increases awaitDrain twice, we'll
// never get subsequent chunks because 'drain' is only emitted once.
const writable = new stream.Writable({
write: common.mustCall(function(chunk, encoding, cb) {
if (chunk.length === 32 * 1024) { // first chunk
const beforePush = readable._readableState.awaitDrain;
readable.push(Buffer.alloc(34 * 1024)); // above hwm
// We should check if awaitDrain counter is increased.
const afterPush = readable._readableState.awaitDrain;
assert.strictEqual(afterPush - beforePush, 1,
'Counter is not increased for awaitDrain');
}
assert.strictEqual(
awaitDrainStates.shift(),
readable._readableState.awaitDrain,
0,
'State variable awaitDrain is not correct.'
);
if (chunk.length === 32 * 1024) { // first chunk
readable.push(Buffer.alloc(34 * 1024)); // above hwm
// We should check if awaitDrain counter is increased in the next
// tick, because awaitDrain is incremented after this method finished
process.nextTick(() => {
assert.strictEqual(readable._readableState.awaitDrain, 1,
'Counter is not increased for awaitDrain');
});
}
cb();
}, 3)
});
@@ -10,30 +10,33 @@ const readable = new Readable({
// Initialized to false.
assert.strictEqual(readable._readableState.emittedReadable, false);
const expected = [Buffer.from('foobar'), Buffer.from('quo'), null];
readable.on('readable', common.mustCall(() => {
// emittedReadable should be true when the readable event is emitted
assert.strictEqual(readable._readableState.emittedReadable, true);
readable.read();
assert.deepStrictEqual(readable.read(), expected.shift());
// emittedReadable is reset to false during read()
assert.strictEqual(readable._readableState.emittedReadable, false);
}, 4));
}, 3));
// When the first readable listener is just attached,
// emittedReadable should be false
assert.strictEqual(readable._readableState.emittedReadable, false);
// Each one of these should trigger a readable event.
// These trigger a single 'readable', as things are batched up
process.nextTick(common.mustCall(() => {
readable.push('foo');
}));
process.nextTick(common.mustCall(() => {
readable.push('bar');
}));
process.nextTick(common.mustCall(() => {
// these triggers two readable events
setImmediate(common.mustCall(() => {
readable.push('quo');
}));
process.nextTick(common.mustCall(() => {
readable.push(null);
process.nextTick(common.mustCall(() => {
readable.push(null);
}));
}));
const noRead = new Readable({
@@ -38,16 +38,17 @@ asyncReadable.on('readable', common.mustCall(() => {
// then we need to notify the reader on future changes.
assert.strictEqual(asyncReadable._readableState.needReadable, true);
}
}, 3));
}, 2));
process.nextTick(common.mustCall(() => {
asyncReadable.push('foooo');
}));
process.nextTick(common.mustCall(() => {
asyncReadable.push('bar');
}));
process.nextTick(common.mustCall(() => {
setImmediate(common.mustCall(() => {
asyncReadable.push(null);
assert.strictEqual(asyncReadable._readableState.needReadable, false);
}));
const flowing = new Readable({
@@ -84,13 +85,13 @@ slowProducer.on('readable', common.mustCall(() => {
process.nextTick(common.mustCall(() => {
slowProducer.push('foo');
}));
process.nextTick(common.mustCall(() => {
slowProducer.push('foo');
}));
process.nextTick(common.mustCall(() => {
slowProducer.push('foo');
}));
process.nextTick(common.mustCall(() => {
slowProducer.push(null);
process.nextTick(common.mustCall(() => {
slowProducer.push('foo');
process.nextTick(common.mustCall(() => {
slowProducer.push('foo');
process.nextTick(common.mustCall(() => {
slowProducer.push(null);
}));
}));
}));
}));
Oops, something went wrong.

0 comments on commit 1e0f331

Please sign in to comment.