Hanging streams due to internal pause() without corresponding resume() during pipe() #8351
Comments
@patrick-steele-idem do you have some sort of a test case to demonstrate the issue?
I spent some more time on this issue and I seem to have also uncovered a significant performance regression, as well as another bug related to streaming. Here's a program to illustrate the problem:

```js
var inherits = require('util').inherits;
var Readable = require('stream').Readable;
var fs = require('fs');
process.on('exit', function(code) {
console.log('\nExiting with code ', code);
});
// Helper Readable stream that just writes out numbers 0-9 up to the given max number of times
function MyReadable(max) {
Readable.call(this, { encoding: 'utf8' });
this.count = 0;
this.max = max;
}
inherits(MyReadable, Readable);
MyReadable.prototype._read = function() {
while (true) {
if (this.count === this.max) {
this.push('\n');
this.push(null);
break;
}
this.count++;
if (this.push((this.count % 10).toString()) === false) {
break;
}
}
};
// *** WORKING IMPLEMENTATION ***
function createCombinedStreamGood() {
var combined = new Readable();
var streams = [];
var readStarted = false;
var curStream;
var i = -1;
var len;
var paused = false;
function onData(chunk) {
if (combined.push(chunk) === false) {
// If backpressure was applied then
// lets pause the underlying stream
paused = true;
curStream.pause();
console.log('Pausing the underlying stream...');
}
}
function onError(err) {
combined.emit('error', err);
}
function next() {
if (++i >= len) {
// we're done
combined.push(null);
} else {
curStream = streams[i];
curStream.on('end', next);
curStream.on('data', onData);
curStream.on('error', onError);
}
}
combined._read = function() {
if (readStarted) {
if (paused) {
console.log('Resuming the underlying stream...');
// If backpressure was applied then we previously
// paused the underlying stream. Therefore, we need
// to resume it now
paused = false;
curStream.resume();
}
} else {
readStarted = true;
len = streams.length;
if (len === 0) {
combined.push(null);
return;
} else {
next();
}
}
};
combined.addStream = function(stream) {
streams.push(stream);
};
return combined;
}
// *** BUGGY IMPLEMENTATION ***
function createCombinedStreamBad() {
var combined = new Readable();
var streams = [];
var readStarted = false;
var curStream;
var i = -1;
var len;
function onData(chunk) {
combined.push(chunk);
}
function onError(err) {
combined.emit('error', err);
}
function next() {
if (++i >= len) {
// we're done
combined.push(null);
} else {
curStream = streams[i];
curStream.on('end', next);
curStream.on('data', onData);
curStream.on('error', onError);
}
}
combined._read = function() {
if (!readStarted) {
readStarted = true;
len = streams.length;
if (len === 0) {
combined.push(null);
return;
}
next();
}
};
combined.pause = function(stream) {
// Pause the current underlying stream if this stream is paused
if (curStream) {
curStream.pause();
}
Readable.prototype.pause.call(this);
};
combined.resume = function(stream) {
// Resume the current underlying stream if this stream is resumed
if (curStream) {
curStream.resume();
}
Readable.prototype.resume.call(this);
};
combined.addStream = function(stream) {
streams.push(stream);
};
return combined;
}
function run(name, callback) {
var startTime = Date.now();
console.log('\nRunning (' + name + ')...');
var combinedStream = name === 'good' ?
createCombinedStreamGood() :
createCombinedStreamBad();
combinedStream.addStream(new MyReadable(10000));
combinedStream.addStream(new MyReadable(10000));
combinedStream.addStream(new MyReadable(10000));
var outFile = 'out-' + name + '.txt';
console.log('Writing to "' + outFile + '"...');
combinedStream
.on('end', function() {
console.log('Combined stream ended for "' + name + '"');
})
.on('error', function(err) {
console.error('ERROR: ', err);
})
.pipe(fs.createWriteStream(outFile, 'utf8'))
.on('error', function(err) {
console.error('ERROR: ', err);
})
.on('close', function() {
var delta = Date.now() - startTime;
console.log('Completed writing to "' + outFile + '" in ' + delta + 'ms!');
callback();
});
}
// Run the "good" version and then the "bad" version (one after another)
run('good', function() {
run('bad', function() {
console.log('\nAll done!');
});
});
```

Running the above program on Node.js 0.10.31 produces the correct results for both the "good" and "bad" implementations. Running the same program on Node.js v0.11.13 does not: the "bad" version exits early and never fully writes the output file. In addition to the different output, also notice the huge time difference for the "good" run: 1573ms versus 29ms.
@patrick-steele-idem the problem is that you are overwriting `pause()` and `resume()`. As for performance, I think it's due to general buffer performance degradation, see #7633. It's better on master, though not by much.
I agree that overriding `pause()` and `resume()` is the problem. How about the other issue, though? Why is the "bad" version exiting early and never fully writing to the output file? There is no error to indicate what went wrong. The performance problem is concerning, but hopefully it has been resolved on master.
@patrick-steele-idem I see no problem here. The stream has been paused and is never going to be resumed, so nothing more is going to happen and the program exits.
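A minimal sketch of that behavior (the stream and handler here are arbitrary, added only to illustrate the point):

```js
var Readable = require('stream').Readable;

var r = new Readable({ encoding: 'utf8' });
r._read = function() {
  this.push('data');
};

r.on('data', function(chunk) {
  // Attaching a 'data' listener put the stream into flowing mode;
  // pausing it here leaves nothing pending on the event loop.
  r.pause();
});

process.on('exit', function(code) {
  // Nothing ever resumes the stream, so Node.js simply runs
  // out of work and exits normally.
  console.log('exited with code', code);
});
```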
Thanks @vkurchatkin. After giving it some additional thought, the process exiting does seem reasonable. I'm okay with closing this issue, and hopefully no one else will think it is a good idea to override either the `pause()` or `resume()` methods. Thanks again.
After switching from Node.js v0.10.31 to v0.11.13 I started noticing that some streaming operations were hanging. I tracked it down and found that it appears to be due to the introduction of an internal and automatic `src.pause()` during piping without a corresponding `src.resume()`. The `src.pause()` line in question is here: _stream_readable.js:540
It makes perfect sense that a source stream would be paused if the destination stream says that its buffer is full during the `write()` call. However, what doesn't make sense is that there is no corresponding `resume()` when the destination stream is drained. This leaves the source stream in a permanently paused state. Instead of calling `resume()` when the destination stream is drained, I see that only the `flowing` state is set to true: _stream_readable.js:601
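One way to observe this from userland is to wrap the source's `pause()`/`resume()` and pipe into a deliberately slow destination. The sketch below is an illustration (the slow `Writable` and logging wrappers are contrived); on the affected v0.11.x builds it should log `pause() called` but never `resume() called`:

```js
var Readable = require('stream').Readable;
var Writable = require('stream').Writable;

var src = new Readable();
var n = 0;
src._read = function() {
  this.push(n++ < 5 ? 'x' : null);
};

// Wrap pause()/resume() so we can see whether pipe() ever calls them.
var origPause = src.pause.bind(src);
src.pause = function() {
  console.log('pause() called');
  return origPause();
};
var origResume = src.resume.bind(src);
src.resume = function() {
  console.log('resume() called');
  return origResume();
};

// A deliberately slow destination with a tiny buffer so that
// write() returns false and triggers the internal back-pressure path.
var dest = new Writable({ highWaterMark: 1 });
dest._write = function(chunk, encoding, callback) {
  setTimeout(callback, 10);
};

src.pipe(dest);
```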
How is a source `Readable` stream supposed to know that it should start producing data again? I tried to work around this problem by calling `resume()` in my implementation of `_read()`, but I then found that an automatic `pause()` was being called after the last `_read()` call for some unknown reason. This seems like a bug to me, but maybe I am missing something. In the meantime I had to disable pausing of the stream as a workaround.
The following commit by @isaacs appears to have introduced this problem: 0f8de5e
Based on my research so far I see a few options:
Option 1) Add a corresponding `src.resume()` when the destination stream is drained. That is, replace `state.flowing = true;` with `src.resume()` at the following line: _stream_readable.js:601
Option 2) Don't use `pause()` to control back-pressure. From the docs it appears that the return value of `this.push()` should be used to control back-pressure instead (see the sketch after these options).

Option 3) Somehow differentiate between an internal `pause()` (done as a result of back-pressure) and an outside `pause()` (that may have been done for some unknown outside reason).

I can work on isolating my code to help you reproduce the problem, but it will take some time. In the meantime, maybe someone can provide some additional insights or suggested fixes?
Edit:

Further clarification: in my case I am creating a `Readable` stream that wraps one or more underlying `Readable` streams. When the wrapping stream is paused, the underlying wrapped streams are paused. If `pause()` is called on the wrapping stream but there is no subsequent `resume()`, then the wrapped stream remains permanently paused.