Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

http2: add session tracking and graceful server shutdown of http2 server #57586

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions lib/internal/http2/core.js
Original file line number Diff line number Diff line change
@@ -251,6 +251,7 @@ const kServer = Symbol('server');
const kState = Symbol('state');
const kType = Symbol('type');
const kWriteGeneric = Symbol('write-generic');
const kSessions = Symbol('sessions');

const {
kBitfield,
@@ -1125,9 +1126,13 @@ function emitClose(self, error) {
function cleanupSession(session) {
const socket = session[kSocket];
const handle = session[kHandle];
const server = session[kServer];
session[kProxySocket] = undefined;
session[kSocket] = undefined;
session[kHandle] = undefined;
if (server) {
server[kSessions].delete(session);
}
session[kNativeFields] = trackAssignmentsTypedArray(
new Uint8Array(kSessionUint8FieldCount));
if (handle)
@@ -1644,6 +1649,9 @@ class ServerHttp2Session extends Http2Session {
constructor(options, socket, server) {
super(NGHTTP2_SESSION_SERVER, options, socket);
this[kServer] = server;
if (server) {
server[kSessions].add(this);
}
// This is a bit inaccurate because it does not reflect changes to
// number of listeners made after the session was created. This should
// not be an issue in practice. Additionally, the 'priority' event on
@@ -3168,11 +3176,25 @@ function onErrorSecureServerSession(err, socket) {
socket.destroy(err);
}

/**
* This function closes all active sessions gracefully.
* @param {*} server the underlying server whose sessions to be closed
*/
function closeAllSessions(server) {
const sessions = server[kSessions];
if (sessions.size > 0) {
sessions.forEach((session) => {
session.close();
});
}
}

class Http2SecureServer extends TLSServer {
constructor(options, requestListener) {
options = initializeTLSOptions(options);
super(options, connectionListener);
this[kOptions] = options;
this[kSessions] = new SafeSet();
this.timeout = 0;
this.on('newListener', setupCompat);
if (options.allowHTTP1 === true) {
@@ -3205,6 +3227,7 @@ class Http2SecureServer extends TLSServer {
if (this[kOptions].allowHTTP1 === true) {
httpServerPreClose(this);
}
closeAllSessions(this);
ReflectApply(TLSServer.prototype.close, this, arguments);
}

@@ -3220,6 +3243,7 @@ class Http2Server extends NETServer {
options = initializeOptions(options);
super(options, connectionListener);
this[kOptions] = options;
this[kSessions] = new SafeSet();
this.timeout = 0;
this.on('newListener', setupCompat);
if (typeof requestListener === 'function')
@@ -3241,6 +3265,11 @@ class Http2Server extends NETServer {
this[kOptions].settings = { ...this[kOptions].settings, ...settings };
}

close() {
closeAllSessions(this);
ReflectApply(NETServer.prototype.close, this, arguments);
}

async [SymbolAsyncDispose]() {
return promisify(super.close).call(this);
}
4 changes: 2 additions & 2 deletions test/parallel/test-http2-capture-rejection.js
Original file line number Diff line number Diff line change
@@ -108,8 +108,6 @@ events.captureRejections = true;
server.on('stream', common.mustCall(async (stream) => {
const { port } = server.address();

server.close();

stream.pushStream({
':scheme': 'http',
':path': '/foobar',
@@ -127,6 +125,8 @@ events.captureRejections = true;
stream.respond({
':status': 200
});

server.close();
}));

server.listen(0, common.mustCall(() => {
Original file line number Diff line number Diff line change
@@ -24,7 +24,6 @@ server.listen(0, common.mustCall(function() {
response.statusMessage = 'test';
response.statusMessage = 'test'; // only warn once
assert.strictEqual(response.statusMessage, ''); // no change
server.close();
}));
response.end();
}));
@@ -44,6 +43,9 @@ server.listen(0, common.mustCall(function() {
request.on('end', common.mustCall(function() {
client.close();
}));
request.on('close', common.mustCall(function() {
server.close();
}));
request.end();
request.resume();
}));
Original file line number Diff line number Diff line change
@@ -23,7 +23,6 @@ server.listen(0, common.mustCall(function() {
response.on('finish', common.mustCall(function() {
assert.strictEqual(response.statusMessage, '');
assert.strictEqual(response.statusMessage, ''); // only warn once
server.close();
}));
response.end();
}));
@@ -43,6 +42,9 @@ server.listen(0, common.mustCall(function() {
request.on('end', common.mustCall(function() {
client.close();
}));
request.on('close', common.mustCall(function() {
server.close();
}));
request.end();
request.resume();
}));
217 changes: 217 additions & 0 deletions test/parallel/test-http2-request-after-server-close.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,217 @@
'use strict';
const common = require('../common');
if (!common.hasCrypto) { common.skip('missing crypto'); };
const fixtures = require('../common/fixtures');
// This test ensure that the server will not accept any new request
// after server close is called.
const assert = require('assert');
const http2 = require('http2');

const { test } = require('node:test');

/**
* Create and manage an HTTP/2 client stream with controlled write patterns
* @param {http2.ClientHttp2Session} client - The HTTP/2 client session
* @param {string} clientId - Identifier for the client (e.g., '1', '2')
* @param {number} writeCount - Number of writes to perform
* @param {number} writeInterval - Interval between writes in ms
* @returns {object} - Object containing stream, status tracking, and functions
*/
function createClientStream(client, clientId, writeCount, writeInterval = 100) {
let currentWriteCount = 0;
let intervalId = null;
let streamClosed = false;

// Create the request
const req = client.request({
':path': `/client${clientId}`,
':method': 'POST',
'client-id': clientId,
'content-type': 'text/plain'
});

// Set up event handlers
req.on('response', (_) => {});

req.on('data', (_) => {});

req.on('end', () => {
streamClosed = true;
});

req.on('close', () => {
streamClosed = true;
if (intervalId) {
clearInterval(intervalId);
intervalId = null;
}
});

req.on('error', (err) => {
if (intervalId) {
clearInterval(intervalId);
intervalId = null;
}
});

// Start the write interval
intervalId = setInterval(() => {
currentWriteCount++;
if (currentWriteCount > writeCount) {
if (intervalId) {
clearInterval(intervalId);
intervalId = null;
}
req.close();
return;
}

req.write(`Client ${clientId} write #${currentWriteCount}\n`);
}, writeInterval);

// Return object with stream, status tracking, and cleanup function
return {
stream: req,
getWriteCount: () => currentWriteCount,
isActive: () => !streamClosed && !req.destroyed && !req.closed,
};
}

// This test start a server and create a client. Client open a request and
// send 20 writes at interval of 100ms and then close at 2000ms from server start.
// Server close is fired after 1000ms from server start.
// Same client open another request after 1500ms from server start and tries to
// send 10 writes at interval of 100ms but failed to connect as server close is already fired at 1000ms.
// Request 1 from client is gracefully closed after accepting all 20 writes as it started before server close fired.
// server successfully closes gracefully after receiving all 20 writes from client and also server refused to accept any new request.
test('HTTP/2 server close with existing and new requests', async () => {

// Server setup
const server = http2.createSecureServer({
key: fixtures.readKey('agent1-key.pem'),
cert: fixtures.readKey('agent1-cert.pem')
});

// Track server events
let serverStart = 0;
let serverCloseTime = 0;
let requestsReceived = 0;
let writesReceived = 0;
let req1Complete = false;
let req2Error = null;

// Handle streams on the server
server.on('stream', (stream, headers) => {
requestsReceived++;

stream.respond({
':status': 200,
'content-type': 'text/plain'
});

// Count writes from clients
stream.on('data', (chunk) => {
writesReceived++;
stream.write(`Echo: ${chunk.toString().trim()}`);
});

stream.on('end', () => {
stream.end('Server: Stream closed');
});
});

// Start the server
await new Promise((resolve) => server.listen(0, () => {
serverStart = Date.now();
resolve();
}));
const port = server.address().port;

// Create client
const client = http2.connect(`https://localhost:${port}`, {
rejectUnauthorized: false
});

// Create first request that will start immediately and write 20 times eache write at interval of 100ms
// The request will be closed at 2000ms after 20 writes
const request1 = createClientStream(client, '1', 20, 100);

// wait 1000ms before closing the server
await new Promise((resolve) => setTimeout(resolve, common.platformTimeout(1000)));

// close the server
await new Promise((resolve) => {
server.close(() => {
serverCloseTime = Date.now();
resolve();
});
});

// Wait 500ms before creating the second request
await new Promise((resolve) => setTimeout(resolve, common.platformTimeout(500)));

// Try to create the second request after 1500ms of server start - should fail
try {
const request2 = createClientStream(client, '2', 10, 100);
// If we get here without error, wait to see if an error event happens
request2.stream.on('error', (err) => {
req2Error = err;
});

} catch (err) {
// Should fail synchronously with ERR_HTTP2_INVALID_SESSION
req2Error = err;
}

// Wait for request 1 to complete gracefully (should be around 2000ms)
await new Promise((resolve) => {
const checkComplete = () => {
if (!request1.isActive()) {
req1Complete = true;
resolve();
} else {
// Check again in 100ms
setTimeout(checkComplete, common.platformTimeout(100));
}
};

// Set a timeout to prevent hanging if request never completes
setTimeout(() => {
resolve();
}, common.platformTimeout(1500));

checkComplete();
});

// Ensure client is closed
client.close();

// Wait for cleanup
await new Promise((resolve) => setTimeout(resolve, common.platformTimeout(200)));

// Verify test expectations

// Request 1 should have completed
assert.ok(req1Complete, 'Request 1 should complete gracefully');
assert.ok(request1.getWriteCount() > 0, 'Request 1 should have written data');
// Request 1 should have written 20 times and request 2 written 0 times
assert.strictEqual(writesReceived, 20);

// Request 2 fails with ERR_HTTP2_INVALID_SESSION because the server
// fired close at 1000ms which stops accepting any new request.
// Since Request 2 starts at 1500ms, it fails.
assert.ok(req2Error, 'Request 2 should have an error');
// Request 2 should fail with ERR_HTTP2_INVALID_SESSION
assert.strictEqual(req2Error.code, 'ERR_HTTP2_INVALID_SESSION');

// Server should have received only the first request as 2nd request received after server close fired.
assert.strictEqual(requestsReceived, 1);
assert.ok(
serverCloseTime - serverStart >= 2000,
`Server should fully close after 2000ms of server start when all streams complete (actual: ${serverCloseTime - serverStart}ms)`
);
assert.ok(
(serverCloseTime - serverStart) - 2000 < 200,
`Server should fully close just after all streams complete`
);
});
Loading
Oops, something went wrong.