Permalink
Browse files

http2: fix stream reading resumption

_read should always resume the underlying code that is attempting
to push data to a readable stream. Adjust http2 core code to
resume its reading appropriately.

Some other general cleanup around reading, resuming & draining.

PR-URL: #16580
Fixes: #16578
Reviewed-By: Anna Henningsen <anna@addaleax.net>
Reviewed-By: James M Snell <jasnell@gmail.com>
Reviewed-By: Colin Ihrig <cjihrig@gmail.com>
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
  • Loading branch information...
apapirovski committed Oct 29, 2017
1 parent b8888f5 commit e63782d7896dc840acdef1bc9b475da8ee4bb998
View
@@ -282,8 +282,13 @@ function onSessionRead(nread, buf, handle) {
'report this as a bug in Node.js');
_unrefActive(owner); // Reset the session timeout timer
_unrefActive(stream); // Reset the stream timeout timer
if (nread >= 0 && !stream.destroyed)
return stream.push(buf);
if (nread >= 0 && !stream.destroyed) {
// prevent overflowing the buffer while pause figures out the
// stream needs to actually pause and streamOnPause runs
if (!stream.push(buf))
owner[kHandle].streamReadStop(id);
return;
}
// Last chunk was received. End the readable side.
stream.push(null);
@@ -1276,8 +1281,6 @@ function onStreamClosed(code) {
}
function streamOnResume() {
if (this._paused)
return this.pause();
if (this[kID] === undefined) {
this.once('ready', streamOnResume);
return;
@@ -1299,12 +1302,10 @@ function streamOnPause() {
}
}
function streamOnDrain() {
const needPause = 0 > this._writableState.highWaterMark;
if (this._paused && !needPause) {
this._paused = false;
this.resume();
}
function handleFlushData(handle, streamID) {
assert(handle.flushData(streamID) === undefined,
`HTTP/2 Stream ${streamID} does not exist. Please report this as ` +
'a bug in Node.js');
}
function streamOnSessionConnect() {
@@ -1357,7 +1358,6 @@ class Http2Stream extends Duplex {
this.once('finish', onHandleFinish);
this.on('resume', streamOnResume);
this.on('pause', streamOnPause);
this.on('drain', streamOnDrain);
session.once('close', state.closeHandler);
if (session[kState].connecting) {
@@ -1507,9 +1507,7 @@ class Http2Stream extends Duplex {
return;
}
_unrefActive(this);
assert(this[kSession][kHandle].flushData(this[kID]) === undefined,
'HTTP/2 Stream #{this[kID]} does not exist. Please report this as ' +
'a bug in Node.js');
process.nextTick(handleFlushData, this[kSession][kHandle], this[kID]);
}
// Submits an RST-STREAM frame to shutdown this stream.
View
@@ -756,7 +756,7 @@ void Http2Session::FlushData(const FunctionCallbackInfo<Value>& args) {
if (!(stream = session->FindStream(id))) {
return args.GetReturnValue().Set(NGHTTP2_ERR_INVALID_STREAM_ID);
}
stream->FlushDataChunks();
stream->ReadResume();
}
void Http2Session::UpdateChunksSent(const FunctionCallbackInfo<Value>& args) {
View
@@ -510,7 +510,7 @@ inline void Nghttp2Session::SendPendingData() {
// the proceed with the rest.
while (srcRemaining > destRemaining) {
DEBUG_HTTP2("Nghttp2Session %s: pushing %d bytes to the socket\n",
TypeName(), destRemaining);
TypeName(), destLength + destRemaining);
memcpy(dest.base + destOffset, src + srcOffset, destRemaining);
destLength += destRemaining;
Send(&dest, destLength);
@@ -896,6 +896,14 @@ inline void Nghttp2Stream::ReadStart() {
FlushDataChunks();
}
inline void Nghttp2Stream::ReadResume() {
DEBUG_HTTP2("Nghttp2Stream %d: resume reading\n", id_);
flags_ &= ~NGHTTP2_STREAM_FLAG_READ_PAUSED;
// Flush any queued data chunks immediately out to the JS layer
FlushDataChunks();
}
inline void Nghttp2Stream::ReadStop() {
DEBUG_HTTP2("Nghttp2Stream %d: stop reading\n", id_);
if (!IsReading())
View
@@ -384,6 +384,9 @@ class Nghttp2Stream {
// the session to be emitted at the JS side
inline void ReadStart();
// Resume Reading
inline void ReadResume();
// Stop/Pause Reading.
inline void ReadStop();
@@ -18,5 +18,7 @@ test-npm-install: PASS,FLAKY
[$system==solaris] # Also applies to SmartOS
[$system==freebsd]
test-http2-compat-serverrequest-pipe: PASS,FLAKY
test-http2-pipe: PASS,FLAKY
[$system==aix]
@@ -11,17 +11,17 @@ const path = require('path');
// piping should work as expected with createWriteStream
const loc = fixtures.path('person.jpg');
const fn = path.join(common.tmpDir, 'http2pipe.jpg');
common.refreshTmpDir();
const loc = fixtures.path('url-tests.js');
const fn = path.join(common.tmpDir, 'http2-url-tests.js');
const server = http2.createServer();
server.on('request', common.mustCall((req, res) => {
const dest = req.pipe(fs.createWriteStream(fn));
dest.on('finish', common.mustCall(() => {
assert.strictEqual(req.complete, true);
assert.deepStrictEqual(fs.readFileSync(loc), fs.readFileSync(fn));
assert.strictEqual(fs.readFileSync(loc).length, fs.readFileSync(fn).length);
fs.unlinkSync(fn);
res.end();
}));
@@ -0,0 +1,49 @@
'use strict';
const common = require('../common');
if (!common.hasCrypto)
common.skip('missing crypto');
const fixtures = require('../common/fixtures');
const assert = require('assert');
const http2 = require('http2');
const fs = require('fs');
const path = require('path');
// piping should work as expected with createWriteStream
common.refreshTmpDir();
const loc = fixtures.path('url-tests.js');
const fn = path.join(common.tmpDir, 'http2-url-tests.js');
const server = http2.createServer();
server.on('stream', common.mustCall((stream) => {
const dest = stream.pipe(fs.createWriteStream(fn));
dest.on('finish', common.mustCall(() => {
assert.strictEqual(fs.readFileSync(loc).length, fs.readFileSync(fn).length);
fs.unlinkSync(fn);
stream.respond();
stream.end();
}));
}));
server.listen(0, common.mustCall(() => {
const port = server.address().port;
const client = http2.connect(`http://localhost:${port}`);
let remaining = 2;
function maybeClose() {
if (--remaining === 0) {
server.close();
client.destroy();
}
}
const req = client.request({ ':method': 'POST' });
req.on('response', common.mustCall());
req.resume();
req.on('end', common.mustCall(maybeClose));
const str = fs.createReadStream(loc);
str.on('end', common.mustCall(maybeClose));
str.pipe(req);
}));

0 comments on commit e63782d

Please sign in to comment.