Skip to content
Permalink
Browse files

stream: reset flowing state if no 'readable' or 'data' listeners

If we don't have any 'readable' or 'data' listeners and we are
not about to resume. Then reset flowing state to initial null state.

PR-URL: #31036
Fixes: #24474
Reviewed-By: Luigi Pinca <luigipinca@gmail.com>
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Reviewed-By: Rich Trott <rtrott@gmail.com>
  • Loading branch information
ronag authored and BridgeAR committed Dec 20, 2019
1 parent 1463214 commit bca23b9e16c2780070d5682d567dba1cb9107153
@@ -28,6 +28,7 @@ const {
ObjectDefineProperty,
ObjectSetPrototypeOf,
SymbolAsyncIterator,
Symbol
} = primordials;

module.exports = Readable;
@@ -51,6 +52,8 @@ const {
ERR_STREAM_UNSHIFT_AFTER_END_EVENT
} = require('internal/errors').codes;

const kPaused = Symbol('kPaused');

// Lazy loaded to improve the startup performance.
let StringDecoder;
let createReadableStreamAsyncIterator;
@@ -126,7 +129,7 @@ function ReadableState(options, stream, isDuplex) {
this.emittedReadable = false;
this.readableListening = false;
this.resumeScheduled = false;
this.paused = true;
this[kPaused] = null;

// True if the error was already emitted and should not be thrown again
this.errorEmitted = false;
@@ -170,6 +173,16 @@ ObjectDefineProperty(ReadableState.prototype, 'pipesCount', {
}
});

// Legacy property for `paused`
ObjectDefineProperty(ReadableState.prototype, 'paused', {
get() {
return this[kPaused] !== false;
},
set(value) {
this[kPaused] = !!value;
}
});

function Readable(options) {
if (!(this instanceof Readable))
return new Readable(options);
@@ -365,7 +378,8 @@ function chunkInvalid(state, chunk) {


Readable.prototype.isPaused = function() {
return this._readableState.flowing === false;
const state = this._readableState;
return state[kPaused] === true || state.flowing === false;
};

// Backwards compatibility.
@@ -962,14 +976,16 @@ function updateReadableListening(self) {
const state = self._readableState;
state.readableListening = self.listenerCount('readable') > 0;

if (state.resumeScheduled && !state.paused) {
if (state.resumeScheduled && state[kPaused] === false) {
// Flowing needs to be set to true now, otherwise
// the upcoming resume will not flow.
state.flowing = true;

// Crude way to check if we should resume
} else if (self.listenerCount('data') > 0) {
self.resume();
} else if (!state.readableListening) {
state.flowing = null;
}
}

@@ -990,7 +1006,7 @@ Readable.prototype.resume = function() {
state.flowing = !state.readableListening;
resume(this, state);
}
state.paused = false;
state[kPaused] = false;
return this;
};

@@ -1021,7 +1037,7 @@ Readable.prototype.pause = function() {
this._readableState.flowing = false;
this.emit('pause');
}
this._readableState.paused = true;
this._readableState[kPaused] = true;
return this;
};

@@ -0,0 +1,19 @@
'use strict';
const common = require('../common');

const { Readable } = require('stream');

const readable = new Readable({
read() {}
});

function read() {}

readable.setEncoding('utf8');
readable.on('readable', read);
readable.removeListener('readable', read);

process.nextTick(function() {
readable.on('data', common.mustCall());
readable.push('hello');
});
@@ -1,6 +1,7 @@
'use strict';

const common = require('../common');
const assert = require('assert');
const { Readable } = require('stream');

let ticks = 18;
@@ -38,3 +39,20 @@ function readAndPause() {

rs.on('data', ondata);
}

{
const readable = new Readable({
read() {}
});

function read() {}

readable.setEncoding('utf8');
readable.on('readable', read);
readable.removeListener('readable', read);
readable.pause();

process.nextTick(function() {
assert(readable.isPaused());
});
}

0 comments on commit bca23b9

Please sign in to comment.
You can’t perform that action at this time.