Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
116 changes: 93 additions & 23 deletions mcp/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -265,13 +265,66 @@ server.tool(
// --- Transport Setup ---
async function main() {
const transport_type = process.env.TRANSPORT_TYPE || 'http';
let webserver: any = null; // Store server reference for proper shutdown

// Common graceful shutdown handler
const createGracefulShutdownHandler = (transportCleanup: () => Promise<void>) => {
return async (signal: string) => {
console.error(`Received ${signal}, initiating graceful shutdown...`);

const shutdownTimeout = parseInt(process.env.SHUTDOWN_TIMEOUT || '5000', 10);
const forceExitTimeout = setTimeout(() => {
console.error(`Shutdown timeout (${shutdownTimeout}ms) exceeded, force exiting...`);
process.exit(1);
}, shutdownTimeout);

try {
// Close HTTP server first to stop accepting new connections
if (webserver) {
await new Promise<void>((resolve, reject) => {
webserver.close((err: any) => {
if (err) {
console.error('Error closing HTTP server:', err);
reject(err);
} else {
console.error('HTTP server closed');
resolve();
}
});
});
}

// Clean up transports
await transportCleanup();

clearTimeout(forceExitTimeout);
console.error('Graceful shutdown complete');
process.exit(0);
} catch (error) {
console.error('Error during graceful shutdown:', error);
clearTimeout(forceExitTimeout);
process.exit(1);
}
};
};

if (transport_type === 'stdio') {
// Stdio transport for direct communication
console.error("Starting MCP server with stdio transport...");
const transport = new StdioServerTransport();
await server.connect(transport);
console.error("MCP server connected via stdio.");

// Add shutdown handler for stdio transport
const shutdownHandler = createGracefulShutdownHandler(async () => {
console.error('Closing stdio transport...');
// StdioServerTransport doesn't have a close method, but we can clean up the connection
// The transport will be cleaned up when the process exits
});

process.on('SIGTERM', () => shutdownHandler('SIGTERM'));
process.on('SIGINT', () => shutdownHandler('SIGINT'));

} else if (transport_type === 'sse') {
// SSE transport for backward compatibility
console.error("Starting MCP server with SSE transport...");
Expand Down Expand Up @@ -304,22 +357,26 @@ async function main() {
}
});

app.get("/health", (_: Request, res: Response) => {
res.status(200).send("OK");
});

const PORT = process.env.PORT || 3001;
const webserver = app.listen(PORT, () => {
webserver = app.listen(PORT, () => {
console.error(`MCP server is running on port ${PORT} with SSE transport`);
console.error(`Connect to: http://localhost:${PORT}/sse`);
});

webserver.keepAliveTimeout = 3000;

// Keep the process alive
webserver.on('error', (error) => {
webserver.on('error', (error: any) => {
console.error('HTTP server error:', error);
});

// Handle server shutdown
process.on('SIGINT', async () => {
console.error('Shutting down SSE server...');
// Handle server shutdown with proper SIGTERM/SIGINT support
const shutdownHandler = createGracefulShutdownHandler(async () => {
console.error('Closing SSE transports...');

// Close all active SSE transports
for (const [sessionId, transport] of Object.entries(sseTransports)) {
Expand All @@ -331,13 +388,11 @@ async function main() {
console.error(`Error cleaning up SSE transport for session ${sessionId}:`, error);
}
}

console.error('SSE server shutdown complete');
process.exit(0);
});

// Prevent the process from exiting
process.stdin.resume();
process.on('SIGTERM', () => shutdownHandler('SIGTERM'));
process.on('SIGINT', () => shutdownHandler('SIGINT'));

} else if (transport_type === 'http') {
// Streamable HTTP transport for web-based communication
console.error("Starting MCP server with HTTP transport...");
Expand Down Expand Up @@ -476,39 +531,54 @@ async function main() {
});

const PORT = process.env.PORT || 3001;
const webserver = app.listen(PORT, () => {
webserver = app.listen(PORT, () => {
console.error(`MCP server is running on port ${PORT} with HTTP transport`);
console.error(`Connect to: http://localhost:${PORT}/mcp`);
});

webserver.keepAliveTimeout = 3000;

// Keep the process alive
webserver.on('error', (error) => {
webserver.on('error', (error: any) => {
console.error('HTTP server error:', error);
});

// Handle server shutdown
process.on('SIGINT', async () => {
console.error('Shutting down server...');
// Handle server shutdown with proper SIGTERM/SIGINT support and timeout
const shutdownHandler = createGracefulShutdownHandler(async () => {
console.error('Closing HTTP transports...');

// Close all active transports to properly clean up resources
for (const [sessionId, transport] of transports) {
// Close all active transports with individual timeouts
const transportClosePromises = Array.from(transports.entries()).map(async ([sessionId, transport]) => {
try {
console.error(`Closing transport for session ${sessionId}`);
await transport.close();

// Add timeout to individual transport close operations
const closeTimeout = new Promise<void>((_, reject) => {
setTimeout(() => reject(new Error(`Transport close timeout for ${sessionId}`)), 2000);
});

await Promise.race([
transport.close(),
closeTimeout
]);

transports.delete(sessionId);
console.error(`Transport closed for session ${sessionId}`);
} catch (error) {
console.error(`Error closing transport for session ${sessionId}:`, error);
// Still remove from map even if close failed
transports.delete(sessionId);
}
}
});

console.error('Server shutdown complete');
process.exit(0);
// Wait for all transports to close, but with overall timeout handled by outer function
await Promise.allSettled(transportClosePromises);
console.error('All transports cleanup completed');
});

// Prevent the process from exiting
process.stdin.resume();
process.on('SIGTERM', () => shutdownHandler('SIGTERM'));
process.on('SIGINT', () => shutdownHandler('SIGINT'));

} else {
console.error(`Unknown transport type: ${transport_type}. Use 'stdio', 'sse', or 'http'.`);
process.exit(1);
Expand Down