From 0984bcaa61b12d52bcea5695b27ac7b6065ba53e Mon Sep 17 00:00:00 2001 From: Matteo Collina Date: Mon, 18 May 2026 19:16:57 +0200 Subject: [PATCH] http2: emit session close before stream close PR-URL: https://github.com/nodejs/node/pull/63414 Signed-off-by: Matteo Collina --- lib/internal/http2/core.js | 37 ++++++++++--- ...lient-session-close-before-stream-close.js | 54 +++++++++++++++++++ 2 files changed, 84 insertions(+), 7 deletions(-) create mode 100644 test/parallel/test-http2-client-session-close-before-stream-close.js diff --git a/lib/internal/http2/core.js b/lib/internal/http2/core.js index 273ddd15414b51..a4b4afe1e3a13c 100644 --- a/lib/internal/http2/core.js +++ b/lib/internal/http2/core.js @@ -343,6 +343,8 @@ const SESSION_FLAGS_PENDING = 0x0; const SESSION_FLAGS_READY = 0x1; const SESSION_FLAGS_CLOSED = 0x2; const SESSION_FLAGS_DESTROYED = 0x4; +const SESSION_FLAGS_CLOSE_QUEUED = 0x8; +const SESSION_FLAGS_CLOSE_EMITTED = 0x10; // Top level to avoid creating a closure function emit(self, ...args) { @@ -1153,14 +1155,29 @@ function setupHandle(socket, type, options) { process.nextTick(emit, this, 'connect', this, socket); } -// Emits a close event followed by an error event if err is truthy. Used +// Emits an error event followed by a close event if err is truthy. Used // by Http2Session.prototype.destroy() function emitClose(self, error) { + const state = self[kState]; + if (state.flags & SESSION_FLAGS_CLOSE_EMITTED) + return; + + state.flags |= SESSION_FLAGS_CLOSE_EMITTED; + if (error) self.emit('error', error); self.emit('close'); } +function emitCloseNextTick(self, error) { + const state = self[kState]; + if (state.flags & (SESSION_FLAGS_CLOSE_QUEUED | SESSION_FLAGS_CLOSE_EMITTED)) + return; + + state.flags |= SESSION_FLAGS_CLOSE_QUEUED; + process.nextTick(emitClose, self, error); +} + function cleanupSession(session) { const socket = session[kSocket]; const handle = session[kHandle]; @@ -1209,7 +1226,7 @@ function finishSessionClose(session, error) { } }); } else { - process.nextTick(emitClose, session, error); + emitCloseNextTick(session, error); } } @@ -1224,6 +1241,16 @@ function closeSession(session, code, error) { session.setTimeout(0); session.removeAllListeners('timeout'); + const socket = session[kSocket]; + const handle = session[kHandle]; + const socketDestroyed = socket?.destroyed === true; + + // If the transport has already closed, queue the session close event before + // stream callbacks are scheduled so clients can invalidate cached sessions + // before associated streams finish closing. + if (socketDestroyed) + emitCloseNextTick(session, error); + // Destroy any pending and open streams if (state.pendingStreams.size > 0 || state.streams.size > 0) { const cancel = new ERR_HTTP2_STREAM_CANCEL(error); @@ -1231,14 +1258,10 @@ function closeSession(session, code, error) { state.streams.forEach((stream) => stream.destroy(error)); } - // Disassociate from the socket and server. - const socket = session[kSocket]; - const handle = session[kHandle]; - // Destroy the handle if it exists at this point. if (handle !== undefined) { handle.ondone = finishSessionClose.bind(null, session, error); - handle.destroy(code, socket.destroyed); + handle.destroy(code, socketDestroyed); } else { finishSessionClose(session, error); } diff --git a/test/parallel/test-http2-client-session-close-before-stream-close.js b/test/parallel/test-http2-client-session-close-before-stream-close.js new file mode 100644 index 00000000000000..35deb385551713 --- /dev/null +++ b/test/parallel/test-http2-client-session-close-before-stream-close.js @@ -0,0 +1,54 @@ +'use strict'; + +const common = require('../common'); +if (!common.hasCrypto) + common.skip('missing crypto'); + +const assert = require('assert'); +const http2 = require('http2'); + +const server = http2.createServer(); +let serverSocket; + +server.on('connection', common.mustCall((socket) => { + serverSocket = socket; + socket.on('error', () => {}); +})); + +server.on('sessionError', () => {}); +server.on('stream', common.mustCall((stream, headers) => { + if (headers[':path'] === '/close') { + stream.respond({ ':status': 200 }); + stream.write('partial', common.mustCall(() => { + setImmediate(() => serverSocket.destroy()); + })); + return; + } + + stream.respond({ ':status': 200 }); + stream.end('ok'); +})); + +server.listen(0, common.mustCall(() => { + const session = http2.connect(`http://localhost:${server.address().port}`); + let cachedSession = session; + const events = []; + + session.on('error', common.mustNotCall()); + session.on('close', common.mustCall(() => { + events.push('session-close'); + cachedSession = undefined; + server.close(); + })); + + const req = session.request({ ':path': '/close' }); + req.on('response', common.mustCall()); + req.on('close', common.mustCall(() => { + events.push('stream-close'); + assert.strictEqual(session.closed, true); + assert.strictEqual(session.destroyed, true); + assert.strictEqual(cachedSession, undefined); + assert.deepStrictEqual(events, ['session-close', 'stream-close']); + })); + req.resume(); +}));