diff --git a/src/server/streamableHttp.test.ts b/src/server/streamableHttp.test.ts index 0e1acd6be..d2a29ac9e 100644 --- a/src/server/streamableHttp.test.ts +++ b/src/server/streamableHttp.test.ts @@ -1738,7 +1738,7 @@ describe.each(zodTestMatrix)('$zodVersionLabel', (entry: ZodMatrixEntry) => { expect(text).not.toContain('retry:'); }); - it('should close POST SSE stream when closeSseStream is called', async () => { + it('should close POST SSE stream when extra.closeSSEStream is called', async () => { const result = await createTestServer({ sessionIdGenerator: () => randomUUID(), eventStore: createEventStore(), @@ -1749,15 +1749,21 @@ describe.each(zodTestMatrix)('$zodVersionLabel', (entry: ZodMatrixEntry) => { baseUrl = result.baseUrl; mcpServer = result.mcpServer; - // Track tool execution state + // Track when stream close is called and tool completes + let streamCloseCalled = false; let toolResolve: () => void; - const toolPromise = new Promise(resolve => { + const toolCompletePromise = new Promise(resolve => { toolResolve = resolve; }); - // Register a blocking tool - mcpServer.tool('blocking-tool', 'A blocking tool', {}, async () => { - await toolPromise; + // Register a tool that closes its own SSE stream via extra callback + mcpServer.tool('close-stream-tool', 'Closes its own stream', {}, async (_args, extra) => { + // Close the SSE stream for this request + extra.closeSSEStream?.(); + streamCloseCalled = true; + + // Wait before returning so we can observe the stream closure + await toolCompletePromise; return { content: [{ type: 'text', text: 'Done' }] }; }); @@ -1771,7 +1777,7 @@ describe.each(zodTestMatrix)('$zodVersionLabel', (entry: ZodMatrixEntry) => { jsonrpc: '2.0', id: 100, method: 'tools/call', - params: { name: 'blocking-tool', arguments: {} } + params: { name: 'close-stream-tool', arguments: {} } }; const postResponse = await fetch(baseUrl, { @@ -1792,8 +1798,9 @@ describe.each(zodTestMatrix)('$zodVersionLabel', (entry: ZodMatrixEntry) => { // Read the priming event await reader!.read(); - // Close the SSE stream - transport.closeSSEStream(100); + // Wait a moment for the tool to call closeSSEStream + await new Promise(resolve => setTimeout(resolve, 100)); + expect(streamCloseCalled).toBe(true); // Stream should now be closed const { done } = await reader!.read(); @@ -1916,6 +1923,261 @@ describe.each(zodTestMatrix)('$zodVersionLabel', (entry: ZodMatrixEntry) => { // Verify closeSSEStream callback was NOT provided expect(receivedCloseSSEStream).toBeUndefined(); }); + + it('should provide closeStandaloneSSEStream callback in extra when eventStore is configured', async () => { + const result = await createTestServer({ + sessionIdGenerator: () => randomUUID(), + eventStore: createEventStore(), + retryInterval: 1000 + }); + server = result.server; + transport = result.transport; + baseUrl = result.baseUrl; + mcpServer = result.mcpServer; + + // Track whether closeStandaloneSSEStream callback was provided + let receivedCloseStandaloneSSEStream: (() => void) | undefined; + + // Register a tool that captures the extra.closeStandaloneSSEStream callback + mcpServer.tool('test-standalone-callback-tool', 'Test tool', {}, async (_args, extra) => { + receivedCloseStandaloneSSEStream = extra.closeStandaloneSSEStream; + return { content: [{ type: 'text', text: 'Done' }] }; + }); + + // Initialize to get session ID + const initResponse = await sendPostRequest(baseUrl, TEST_MESSAGES.initialize); + sessionId = initResponse.headers.get('mcp-session-id') as string; + expect(sessionId).toBeDefined(); + + // Call the tool + const toolCallRequest: JSONRPCMessage = { + jsonrpc: '2.0', + id: 203, + method: 'tools/call', + params: { name: 'test-standalone-callback-tool', arguments: {} } + }; + + const postResponse = await fetch(baseUrl, { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + Accept: 'text/event-stream, application/json', + 'mcp-session-id': sessionId, + 'mcp-protocol-version': '2025-03-26' + }, + body: JSON.stringify(toolCallRequest) + }); + + expect(postResponse.status).toBe(200); + + // Read all events to completion + const reader = postResponse.body?.getReader(); + while (true) { + const { done } = await reader!.read(); + if (done) break; + } + + // Verify closeStandaloneSSEStream callback was provided + expect(receivedCloseStandaloneSSEStream).toBeDefined(); + expect(typeof receivedCloseStandaloneSSEStream).toBe('function'); + }); + + it('should close standalone GET SSE stream when extra.closeStandaloneSSEStream is called', async () => { + const result = await createTestServer({ + sessionIdGenerator: () => randomUUID(), + eventStore: createEventStore(), + retryInterval: 1000 + }); + server = result.server; + transport = result.transport; + baseUrl = result.baseUrl; + mcpServer = result.mcpServer; + + // Register a tool that closes the standalone SSE stream via extra callback + mcpServer.tool('close-standalone-stream-tool', 'Closes standalone stream', {}, async (_args, extra) => { + extra.closeStandaloneSSEStream?.(); + return { content: [{ type: 'text', text: 'Stream closed' }] }; + }); + + // Initialize to get session ID + const initResponse = await sendPostRequest(baseUrl, TEST_MESSAGES.initialize); + sessionId = initResponse.headers.get('mcp-session-id') as string; + expect(sessionId).toBeDefined(); + + // Open a standalone GET SSE stream + const sseResponse = await fetch(baseUrl, { + method: 'GET', + headers: { + Accept: 'text/event-stream', + 'mcp-session-id': sessionId, + 'mcp-protocol-version': '2025-03-26' + } + }); + expect(sseResponse.status).toBe(200); + + const getReader = sseResponse.body?.getReader(); + + // Send a notification to confirm GET stream is established + await mcpServer.server.sendLoggingMessage({ level: 'info', data: 'Stream established' }); + + // Read the notification to confirm stream is working + const { value } = await getReader!.read(); + const text = new TextDecoder().decode(value); + expect(text).toContain('id: '); + expect(text).toContain('Stream established'); + + // Call the tool that closes the standalone SSE stream + const toolCallRequest: JSONRPCMessage = { + jsonrpc: '2.0', + id: 300, + method: 'tools/call', + params: { name: 'close-standalone-stream-tool', arguments: {} } + }; + + const postResponse = await fetch(baseUrl, { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + Accept: 'text/event-stream, application/json', + 'mcp-session-id': sessionId, + 'mcp-protocol-version': '2025-03-26' + }, + body: JSON.stringify(toolCallRequest) + }); + expect(postResponse.status).toBe(200); + + // Read the POST response to completion + const postReader = postResponse.body?.getReader(); + while (true) { + const { done } = await postReader!.read(); + if (done) break; + } + + // GET stream should now be closed - use a race with timeout to avoid hanging + const readPromise = getReader!.read(); + const timeoutPromise = new Promise<{ done: boolean; value: undefined }>((_, reject) => + setTimeout(() => reject(new Error('Stream did not close in time')), 1000) + ); + + const { done } = await Promise.race([readPromise, timeoutPromise]); + expect(done).toBe(true); + }); + + it('should allow client to reconnect after standalone SSE stream is closed via extra.closeStandaloneSSEStream', async () => { + const result = await createTestServer({ + sessionIdGenerator: () => randomUUID(), + eventStore: createEventStore(), + retryInterval: 1000 + }); + server = result.server; + transport = result.transport; + baseUrl = result.baseUrl; + mcpServer = result.mcpServer; + + // Register a tool that closes the standalone SSE stream + mcpServer.tool('close-standalone-for-reconnect', 'Closes standalone stream', {}, async (_args, extra) => { + extra.closeStandaloneSSEStream?.(); + return { content: [{ type: 'text', text: 'Stream closed' }] }; + }); + + // Initialize to get session ID + const initResponse = await sendPostRequest(baseUrl, TEST_MESSAGES.initialize); + sessionId = initResponse.headers.get('mcp-session-id') as string; + expect(sessionId).toBeDefined(); + + // Open a standalone GET SSE stream + const sseResponse = await fetch(baseUrl, { + method: 'GET', + headers: { + Accept: 'text/event-stream', + 'mcp-session-id': sessionId, + 'mcp-protocol-version': '2025-03-26' + } + }); + expect(sseResponse.status).toBe(200); + + const getReader = sseResponse.body?.getReader(); + + // Send a notification to get an event ID + await mcpServer.server.sendLoggingMessage({ level: 'info', data: 'Initial message' }); + + // Read the notification to get the event ID + const { value } = await getReader!.read(); + const text = new TextDecoder().decode(value); + const idMatch = text.match(/id: ([^\n]+)/); + expect(idMatch).toBeTruthy(); + const lastEventId = idMatch![1]; + + // Call the tool to close the standalone SSE stream + const toolCallRequest: JSONRPCMessage = { + jsonrpc: '2.0', + id: 301, + method: 'tools/call', + params: { name: 'close-standalone-for-reconnect', arguments: {} } + }; + + const postResponse = await fetch(baseUrl, { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + Accept: 'text/event-stream, application/json', + 'mcp-session-id': sessionId, + 'mcp-protocol-version': '2025-03-26' + }, + body: JSON.stringify(toolCallRequest) + }); + expect(postResponse.status).toBe(200); + + // Read the POST response to completion + const postReader = postResponse.body?.getReader(); + while (true) { + const { done } = await postReader!.read(); + if (done) break; + } + + // Wait for GET stream to close - use a race with timeout + const readPromise = getReader!.read(); + const timeoutPromise = new Promise<{ done: boolean; value: undefined }>((_, reject) => + setTimeout(() => reject(new Error('Stream did not close in time')), 1000) + ); + const { done } = await Promise.race([readPromise, timeoutPromise]); + expect(done).toBe(true); + + // Send a notification while client is disconnected + await mcpServer.server.sendLoggingMessage({ level: 'info', data: 'Missed while disconnected' }); + + // Client reconnects with Last-Event-ID + const reconnectResponse = await fetch(baseUrl, { + method: 'GET', + headers: { + Accept: 'text/event-stream', + 'mcp-session-id': sessionId, + 'mcp-protocol-version': '2025-03-26', + 'last-event-id': lastEventId + } + }); + expect(reconnectResponse.status).toBe(200); + + // Read the replayed notification + const reconnectReader = reconnectResponse.body?.getReader(); + let allText = ''; + const readWithTimeout = async () => { + const timeout = setTimeout(() => reconnectReader!.cancel(), 2000); + try { + while (!allText.includes('Missed while disconnected')) { + const { value, done } = await reconnectReader!.read(); + if (done) break; + allText += new TextDecoder().decode(value); + } + } finally { + clearTimeout(timeout); + } + }; + await readWithTimeout(); + + // Verify we received the notification that was sent while disconnected + expect(allText).toContain('Missed while disconnected'); + }); }); // Test onsessionclosed callback diff --git a/src/server/streamableHttp.ts b/src/server/streamableHttp.ts index bdd8e516c..1e04390cf 100644 --- a/src/server/streamableHttp.ts +++ b/src/server/streamableHttp.ts @@ -651,13 +651,17 @@ export class StreamableHTTPServerTransport implements Transport { for (const message of messages) { // Build closeSSEStream callback for requests when eventStore is configured let closeSSEStream: (() => void) | undefined; + let closeStandaloneSSEStream: (() => void) | undefined; if (isJSONRPCRequest(message) && this._eventStore) { closeSSEStream = () => { this.closeSSEStream(message.id); }; + closeStandaloneSSEStream = () => { + this.closeStandaloneSSEStream(); + }; } - this.onmessage?.(message, { authInfo, requestInfo, closeSSEStream }); + this.onmessage?.(message, { authInfo, requestInfo, closeSSEStream, closeStandaloneSSEStream }); } // The server SHOULD NOT close the SSE stream before sending all JSON-RPC responses // This will be handled by the send() method when responses are ready @@ -814,6 +818,18 @@ export class StreamableHTTPServerTransport implements Transport { } } + /** + * Close the standalone GET SSE stream, triggering client reconnection. + * Use this to implement polling behavior for server-initiated notifications. + */ + closeStandaloneSSEStream(): void { + const stream = this._streamMapping.get(this._standaloneSseStreamId); + if (stream) { + stream.end(); + this._streamMapping.delete(this._standaloneSseStreamId); + } + } + async send(message: JSONRPCMessage, options?: { relatedRequestId?: RequestId }): Promise { let requestId = options?.relatedRequestId; if (isJSONRPCResponse(message) || isJSONRPCError(message)) { @@ -829,19 +845,21 @@ export class StreamableHTTPServerTransport implements Transport { if (isJSONRPCResponse(message) || isJSONRPCError(message)) { throw new Error('Cannot send a response on a standalone SSE stream unless resuming a previous client request'); } - const standaloneSse = this._streamMapping.get(this._standaloneSseStreamId); - if (standaloneSse === undefined) { - // The spec says the server MAY send messages on the stream, so it's ok to discard if no stream - return; - } // Generate and store event ID if event store is provided + // Store even if stream is disconnected so events can be replayed on reconnect let eventId: string | undefined; if (this._eventStore) { // Stores the event and gets the generated event ID eventId = await this._eventStore.storeEvent(this._standaloneSseStreamId, message); } + const standaloneSse = this._streamMapping.get(this._standaloneSseStreamId); + if (standaloneSse === undefined) { + // Stream is disconnected - event is stored for replay, nothing more to do + return; + } + // Send the message to the standalone SSE stream this.writeSSEEvent(standaloneSse, message, eventId); return; diff --git a/src/shared/protocol.ts b/src/shared/protocol.ts index ce25e45fb..e195478f2 100644 --- a/src/shared/protocol.ts +++ b/src/shared/protocol.ts @@ -290,6 +290,13 @@ export type RequestHandlerExtra void; + + /** + * Closes the standalone GET SSE stream, triggering client reconnection. + * Only available when using StreamableHTTPServerTransport with eventStore configured. + * Use this to implement polling behavior for server-initiated notifications. + */ + closeStandaloneSSEStream?: () => void; }; /** @@ -736,7 +743,8 @@ export abstract class Protocol void; + + /** + * Callback to close the standalone GET SSE stream, triggering client reconnection. + * Only available when using StreamableHTTPServerTransport with eventStore configured. + */ + closeStandaloneSSEStream?: () => void; } /* JSON-RPC types */