Skip to content

Commit

Permalink
http2: fix flakiness in timeout
Browse files Browse the repository at this point in the history
PR-URL: #14239
Reviewed-By: Anna Henningsen <anna@addaleax.net>
Reviewed-By: Colin Ihrig <cjihrig@gmail.com>
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
  • Loading branch information
jasnell committed Aug 4, 2017
1 parent 064ac2c commit 34d1b11
Show file tree
Hide file tree
Showing 4 changed files with 47 additions and 29 deletions.
63 changes: 38 additions & 25 deletions lib/internal/http2/core.js
Expand Up @@ -673,15 +673,23 @@ function submitShutdown(options) {
} }


function finishSessionDestroy(socket) { function finishSessionDestroy(socket) {
const state = this[kState];
if (state.destroyed)
return;

if (!socket.destroyed) if (!socket.destroyed)
socket.destroy(); socket.destroy();


state.destroying = false;
state.destroyed = true;

// Destroy the handle // Destroy the handle
const handle = this[kHandle]; const handle = this[kHandle];
if (handle !== undefined) { if (handle !== undefined) {
handle.destroy(); handle.destroy(state.skipUnconsume);
debug(`[${sessionName(this[kType])}] nghttp2session handle destroyed`); debug(`[${sessionName(this[kType])}] nghttp2session handle destroyed`);
} }
this[kHandle] = undefined;


process.nextTick(emit.bind(this, 'close')); process.nextTick(emit.bind(this, 'close'));
debug(`[${sessionName(this[kType])}] nghttp2session destroyed`); debug(`[${sessionName(this[kType])}] nghttp2session destroyed`);
Expand Down Expand Up @@ -825,7 +833,7 @@ class Http2Session extends EventEmitter {


// Submits a SETTINGS frame to be sent to the remote peer. // Submits a SETTINGS frame to be sent to the remote peer.
settings(settings) { settings(settings) {
if (this[kState].destroyed) if (this[kState].destroyed || this[kState].destroying)
throw new errors.Error('ERR_HTTP2_INVALID_SESSION'); throw new errors.Error('ERR_HTTP2_INVALID_SESSION');


// Validate the input first // Validate the input first
Expand Down Expand Up @@ -871,7 +879,7 @@ class Http2Session extends EventEmitter {


// Submits a PRIORITY frame to be sent to the remote peer. // Submits a PRIORITY frame to be sent to the remote peer.
priority(stream, options) { priority(stream, options) {
if (this[kState].destroyed) if (this[kState].destroyed || this[kState].destroying)
throw new errors.Error('ERR_HTTP2_INVALID_SESSION'); throw new errors.Error('ERR_HTTP2_INVALID_SESSION');


if (!(stream instanceof Http2Stream)) { if (!(stream instanceof Http2Stream)) {
Expand Down Expand Up @@ -905,6 +913,8 @@ class Http2Session extends EventEmitter {
// Submits an RST-STREAM frame to be sent to the remote peer. This will // Submits an RST-STREAM frame to be sent to the remote peer. This will
// cause the stream to be closed. // cause the stream to be closed.
rstStream(stream, code = NGHTTP2_NO_ERROR) { rstStream(stream, code = NGHTTP2_NO_ERROR) {
// Do not check destroying here, as the rstStream may be sent while
// the session is in the middle of being destroyed.
if (this[kState].destroyed) if (this[kState].destroyed)
throw new errors.Error('ERR_HTTP2_INVALID_SESSION'); throw new errors.Error('ERR_HTTP2_INVALID_SESSION');


Expand Down Expand Up @@ -946,7 +956,6 @@ class Http2Session extends EventEmitter {
const state = this[kState]; const state = this[kState];
if (state.destroyed || state.destroying) if (state.destroyed || state.destroying)
return; return;

debug(`[${sessionName(this[kType])}] destroying nghttp2session`); debug(`[${sessionName(this[kType])}] destroying nghttp2session`);
state.destroying = true; state.destroying = true;


Expand All @@ -963,8 +972,8 @@ class Http2Session extends EventEmitter {
delete this[kSocket]; delete this[kSocket];
delete this[kServer]; delete this[kServer];


state.destroyed = true; state.destroyed = false;
state.destroying = false; state.destroying = true;


if (this[kHandle] !== undefined) if (this[kHandle] !== undefined)
this[kHandle].destroying(); this[kHandle].destroying();
Expand All @@ -975,7 +984,7 @@ class Http2Session extends EventEmitter {
// Graceful or immediate shutdown of the Http2Session. Graceful shutdown // Graceful or immediate shutdown of the Http2Session. Graceful shutdown
// is only supported on the server-side // is only supported on the server-side
shutdown(options, callback) { shutdown(options, callback) {
if (this[kState].destroyed) if (this[kState].destroyed || this[kState].destroying)
throw new errors.Error('ERR_HTTP2_INVALID_SESSION'); throw new errors.Error('ERR_HTTP2_INVALID_SESSION');


if (this[kState].shutdown || this[kState].shuttingDown) if (this[kState].shutdown || this[kState].shuttingDown)
Expand Down Expand Up @@ -1037,7 +1046,7 @@ class Http2Session extends EventEmitter {
} }


_onTimeout() { _onTimeout() {
this.emit('timeout'); process.nextTick(emit.bind(this, 'timeout'));
} }
} }


Expand All @@ -1061,7 +1070,7 @@ class ClientHttp2Session extends Http2Session {
// Submits a new HTTP2 request to the connected peer. Returns the // Submits a new HTTP2 request to the connected peer. Returns the
// associated Http2Stream instance. // associated Http2Stream instance.
request(headers, options) { request(headers, options) {
if (this[kState].destroyed) if (this[kState].destroyed || this[kState].destroying)
throw new errors.Error('ERR_HTTP2_INVALID_SESSION'); throw new errors.Error('ERR_HTTP2_INVALID_SESSION');
debug(`[${sessionName(this[kType])}] initiating request`); debug(`[${sessionName(this[kType])}] initiating request`);
_unrefActive(this); _unrefActive(this);
Expand Down Expand Up @@ -1317,7 +1326,7 @@ class Http2Stream extends Duplex {
} }


_onTimeout() { _onTimeout() {
this.emit('timeout'); process.nextTick(emit.bind(this, 'timeout'));
} }


// true if the Http2Stream was aborted abornomally. // true if the Http2Stream was aborted abornomally.
Expand Down Expand Up @@ -2104,7 +2113,7 @@ const onTimeout = {
configurable: false, configurable: false,
enumerable: false, enumerable: false,
value: function() { value: function() {
this.emit('timeout'); process.nextTick(emit.bind(this, 'timeout'));
} }
}; };


Expand Down Expand Up @@ -2191,20 +2200,22 @@ function socketOnError(error) {
// of the session. // of the session.
function socketOnTimeout() { function socketOnTimeout() {
debug('socket timeout'); debug('socket timeout');
const server = this[kServer]; process.nextTick(() => {
const session = this[kSession]; const server = this[kServer];
// If server or session are undefined, then we're already in the process of const session = this[kSession];
// shutting down, do nothing. // If server or session are undefined, then we're already in the process of
if (server === undefined || session === undefined) // shutting down, do nothing.
return; if (server === undefined || session === undefined)
if (!server.emit('timeout', session, this)) { return;
session.shutdown( if (!server.emit('timeout', session, this)) {
{ session.shutdown(
graceful: true, {
errorCode: NGHTTP2_NO_ERROR graceful: true,
}, errorCode: NGHTTP2_NO_ERROR
this.destroy.bind(this)); },
} this.destroy.bind(this));
}
});
} }


// Handles the on('stream') event for a session and forwards // Handles the on('stream') event for a session and forwards
Expand Down Expand Up @@ -2346,6 +2357,8 @@ function setupCompat(ev) {
function socketOnClose(hadError) { function socketOnClose(hadError) {
const session = this[kSession]; const session = this[kSession];
if (session !== undefined && !session.destroyed) { if (session !== undefined && !session.destroyed) {
// Skip unconsume because the socket is destroyed.
session[kState].skipUnconsume = true;
session.destroy(); session.destroy();
} }
} }
Expand Down
8 changes: 7 additions & 1 deletion src/node_http2.cc
Expand Up @@ -407,7 +407,13 @@ void Http2Session::Destroy(const FunctionCallbackInfo<Value>& args) {
Http2Session* session; Http2Session* session;
ASSIGN_OR_RETURN_UNWRAP(&session, args.Holder()); ASSIGN_OR_RETURN_UNWRAP(&session, args.Holder());
DEBUG_HTTP2("Http2Session: destroying session %d\n", session->type()); DEBUG_HTTP2("Http2Session: destroying session %d\n", session->type());
session->Unconsume(); Environment* env = Environment::GetCurrent(args);
Local<Context> context = env->context();

bool skipUnconsume = args[0]->BooleanValue(context).ToChecked();

if (!skipUnconsume)
session->Unconsume();
session->Free(); session->Free();
} }


Expand Down
2 changes: 1 addition & 1 deletion src/node_http2_core.cc
Expand Up @@ -176,7 +176,7 @@ void Nghttp2Session::GetTrailers(nghttp2_session* session,
handle->OnTrailers(stream, &trailers); handle->OnTrailers(stream, &trailers);
if (trailers.length() > 0) { if (trailers.length() > 0) {
DEBUG_HTTP2("Nghttp2Session %d: sending trailers for stream %d, " DEBUG_HTTP2("Nghttp2Session %d: sending trailers for stream %d, "
"count: %d\n", handle->session_type_, id, "count: %d\n", handle->session_type_, stream->id(),
trailers.length()); trailers.length());
*flags |= NGHTTP2_DATA_FLAG_NO_END_STREAM; *flags |= NGHTTP2_DATA_FLAG_NO_END_STREAM;
nghttp2_submit_trailer(session, nghttp2_submit_trailer(session,
Expand Down
3 changes: 1 addition & 2 deletions test/parallel/test-http2-server-timeout.js
Expand Up @@ -9,11 +9,10 @@ server.setTimeout(common.platformTimeout(1));


const onServerTimeout = common.mustCall((session) => { const onServerTimeout = common.mustCall((session) => {
session.destroy(); session.destroy();
server.removeListener('timeout', onServerTimeout);
}); });


server.on('stream', common.mustNotCall()); server.on('stream', common.mustNotCall());
server.on('timeout', onServerTimeout); server.once('timeout', onServerTimeout);


server.listen(0, common.mustCall(() => { server.listen(0, common.mustCall(() => {
const url = `http://localhost:${server.address().port}`; const url = `http://localhost:${server.address().port}`;
Expand Down

0 comments on commit 34d1b11

Please sign in to comment.