From 4941b4020e3a7f035b1d876fdd1a18651cfb5e37 Mon Sep 17 00:00:00 2001 From: Felix Weinberger Date: Mon, 1 Dec 2025 15:20:58 +0000 Subject: [PATCH 1/2] test: add failing tests for closeStandaloneSSEStream TDD setup for closing standalone GET SSE streams via extra callback. Tests added: - should provide closeStandaloneSSEStream callback in extra (passes) - should close standalone GET SSE stream when extra.closeStandaloneSSEStream is called (fails) - should allow client to reconnect after standalone SSE stream is closed (fails) Also updated existing closeSSEStream test to use extra.closeSSEStream() for consistency (all stream close tests now use the extra callback API). Stub implementation added - tests fail with "Stream did not close in time" --- src/server/streamableHttp.test.ts | 280 +++++++++++++++++++++++++++++- src/server/streamableHttp.ts | 14 +- src/shared/protocol.ts | 10 +- src/types.ts | 6 + 4 files changed, 299 insertions(+), 11 deletions(-) diff --git a/src/server/streamableHttp.test.ts b/src/server/streamableHttp.test.ts index 0e1acd6be..d2a29ac9e 100644 --- a/src/server/streamableHttp.test.ts +++ b/src/server/streamableHttp.test.ts @@ -1738,7 +1738,7 @@ describe.each(zodTestMatrix)('$zodVersionLabel', (entry: ZodMatrixEntry) => { expect(text).not.toContain('retry:'); }); - it('should close POST SSE stream when closeSseStream is called', async () => { + it('should close POST SSE stream when extra.closeSSEStream is called', async () => { const result = await createTestServer({ sessionIdGenerator: () => randomUUID(), eventStore: createEventStore(), @@ -1749,15 +1749,21 @@ describe.each(zodTestMatrix)('$zodVersionLabel', (entry: ZodMatrixEntry) => { baseUrl = result.baseUrl; mcpServer = result.mcpServer; - // Track tool execution state + // Track when stream close is called and tool completes + let streamCloseCalled = false; let toolResolve: () => void; - const toolPromise = new Promise(resolve => { + const toolCompletePromise = new Promise(resolve => { toolResolve = resolve; }); - // Register a blocking tool - mcpServer.tool('blocking-tool', 'A blocking tool', {}, async () => { - await toolPromise; + // Register a tool that closes its own SSE stream via extra callback + mcpServer.tool('close-stream-tool', 'Closes its own stream', {}, async (_args, extra) => { + // Close the SSE stream for this request + extra.closeSSEStream?.(); + streamCloseCalled = true; + + // Wait before returning so we can observe the stream closure + await toolCompletePromise; return { content: [{ type: 'text', text: 'Done' }] }; }); @@ -1771,7 +1777,7 @@ describe.each(zodTestMatrix)('$zodVersionLabel', (entry: ZodMatrixEntry) => { jsonrpc: '2.0', id: 100, method: 'tools/call', - params: { name: 'blocking-tool', arguments: {} } + params: { name: 'close-stream-tool', arguments: {} } }; const postResponse = await fetch(baseUrl, { @@ -1792,8 +1798,9 @@ describe.each(zodTestMatrix)('$zodVersionLabel', (entry: ZodMatrixEntry) => { // Read the priming event await reader!.read(); - // Close the SSE stream - transport.closeSSEStream(100); + // Wait a moment for the tool to call closeSSEStream + await new Promise(resolve => setTimeout(resolve, 100)); + expect(streamCloseCalled).toBe(true); // Stream should now be closed const { done } = await reader!.read(); @@ -1916,6 +1923,261 @@ describe.each(zodTestMatrix)('$zodVersionLabel', (entry: ZodMatrixEntry) => { // Verify closeSSEStream callback was NOT provided expect(receivedCloseSSEStream).toBeUndefined(); }); + + it('should provide closeStandaloneSSEStream callback in extra when eventStore is configured', async () => { + const result = await createTestServer({ + sessionIdGenerator: () => randomUUID(), + eventStore: createEventStore(), + retryInterval: 1000 + }); + server = result.server; + transport = result.transport; + baseUrl = result.baseUrl; + mcpServer = result.mcpServer; + + // Track whether closeStandaloneSSEStream callback was provided + let receivedCloseStandaloneSSEStream: (() => void) | undefined; + + // Register a tool that captures the extra.closeStandaloneSSEStream callback + mcpServer.tool('test-standalone-callback-tool', 'Test tool', {}, async (_args, extra) => { + receivedCloseStandaloneSSEStream = extra.closeStandaloneSSEStream; + return { content: [{ type: 'text', text: 'Done' }] }; + }); + + // Initialize to get session ID + const initResponse = await sendPostRequest(baseUrl, TEST_MESSAGES.initialize); + sessionId = initResponse.headers.get('mcp-session-id') as string; + expect(sessionId).toBeDefined(); + + // Call the tool + const toolCallRequest: JSONRPCMessage = { + jsonrpc: '2.0', + id: 203, + method: 'tools/call', + params: { name: 'test-standalone-callback-tool', arguments: {} } + }; + + const postResponse = await fetch(baseUrl, { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + Accept: 'text/event-stream, application/json', + 'mcp-session-id': sessionId, + 'mcp-protocol-version': '2025-03-26' + }, + body: JSON.stringify(toolCallRequest) + }); + + expect(postResponse.status).toBe(200); + + // Read all events to completion + const reader = postResponse.body?.getReader(); + while (true) { + const { done } = await reader!.read(); + if (done) break; + } + + // Verify closeStandaloneSSEStream callback was provided + expect(receivedCloseStandaloneSSEStream).toBeDefined(); + expect(typeof receivedCloseStandaloneSSEStream).toBe('function'); + }); + + it('should close standalone GET SSE stream when extra.closeStandaloneSSEStream is called', async () => { + const result = await createTestServer({ + sessionIdGenerator: () => randomUUID(), + eventStore: createEventStore(), + retryInterval: 1000 + }); + server = result.server; + transport = result.transport; + baseUrl = result.baseUrl; + mcpServer = result.mcpServer; + + // Register a tool that closes the standalone SSE stream via extra callback + mcpServer.tool('close-standalone-stream-tool', 'Closes standalone stream', {}, async (_args, extra) => { + extra.closeStandaloneSSEStream?.(); + return { content: [{ type: 'text', text: 'Stream closed' }] }; + }); + + // Initialize to get session ID + const initResponse = await sendPostRequest(baseUrl, TEST_MESSAGES.initialize); + sessionId = initResponse.headers.get('mcp-session-id') as string; + expect(sessionId).toBeDefined(); + + // Open a standalone GET SSE stream + const sseResponse = await fetch(baseUrl, { + method: 'GET', + headers: { + Accept: 'text/event-stream', + 'mcp-session-id': sessionId, + 'mcp-protocol-version': '2025-03-26' + } + }); + expect(sseResponse.status).toBe(200); + + const getReader = sseResponse.body?.getReader(); + + // Send a notification to confirm GET stream is established + await mcpServer.server.sendLoggingMessage({ level: 'info', data: 'Stream established' }); + + // Read the notification to confirm stream is working + const { value } = await getReader!.read(); + const text = new TextDecoder().decode(value); + expect(text).toContain('id: '); + expect(text).toContain('Stream established'); + + // Call the tool that closes the standalone SSE stream + const toolCallRequest: JSONRPCMessage = { + jsonrpc: '2.0', + id: 300, + method: 'tools/call', + params: { name: 'close-standalone-stream-tool', arguments: {} } + }; + + const postResponse = await fetch(baseUrl, { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + Accept: 'text/event-stream, application/json', + 'mcp-session-id': sessionId, + 'mcp-protocol-version': '2025-03-26' + }, + body: JSON.stringify(toolCallRequest) + }); + expect(postResponse.status).toBe(200); + + // Read the POST response to completion + const postReader = postResponse.body?.getReader(); + while (true) { + const { done } = await postReader!.read(); + if (done) break; + } + + // GET stream should now be closed - use a race with timeout to avoid hanging + const readPromise = getReader!.read(); + const timeoutPromise = new Promise<{ done: boolean; value: undefined }>((_, reject) => + setTimeout(() => reject(new Error('Stream did not close in time')), 1000) + ); + + const { done } = await Promise.race([readPromise, timeoutPromise]); + expect(done).toBe(true); + }); + + it('should allow client to reconnect after standalone SSE stream is closed via extra.closeStandaloneSSEStream', async () => { + const result = await createTestServer({ + sessionIdGenerator: () => randomUUID(), + eventStore: createEventStore(), + retryInterval: 1000 + }); + server = result.server; + transport = result.transport; + baseUrl = result.baseUrl; + mcpServer = result.mcpServer; + + // Register a tool that closes the standalone SSE stream + mcpServer.tool('close-standalone-for-reconnect', 'Closes standalone stream', {}, async (_args, extra) => { + extra.closeStandaloneSSEStream?.(); + return { content: [{ type: 'text', text: 'Stream closed' }] }; + }); + + // Initialize to get session ID + const initResponse = await sendPostRequest(baseUrl, TEST_MESSAGES.initialize); + sessionId = initResponse.headers.get('mcp-session-id') as string; + expect(sessionId).toBeDefined(); + + // Open a standalone GET SSE stream + const sseResponse = await fetch(baseUrl, { + method: 'GET', + headers: { + Accept: 'text/event-stream', + 'mcp-session-id': sessionId, + 'mcp-protocol-version': '2025-03-26' + } + }); + expect(sseResponse.status).toBe(200); + + const getReader = sseResponse.body?.getReader(); + + // Send a notification to get an event ID + await mcpServer.server.sendLoggingMessage({ level: 'info', data: 'Initial message' }); + + // Read the notification to get the event ID + const { value } = await getReader!.read(); + const text = new TextDecoder().decode(value); + const idMatch = text.match(/id: ([^\n]+)/); + expect(idMatch).toBeTruthy(); + const lastEventId = idMatch![1]; + + // Call the tool to close the standalone SSE stream + const toolCallRequest: JSONRPCMessage = { + jsonrpc: '2.0', + id: 301, + method: 'tools/call', + params: { name: 'close-standalone-for-reconnect', arguments: {} } + }; + + const postResponse = await fetch(baseUrl, { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + Accept: 'text/event-stream, application/json', + 'mcp-session-id': sessionId, + 'mcp-protocol-version': '2025-03-26' + }, + body: JSON.stringify(toolCallRequest) + }); + expect(postResponse.status).toBe(200); + + // Read the POST response to completion + const postReader = postResponse.body?.getReader(); + while (true) { + const { done } = await postReader!.read(); + if (done) break; + } + + // Wait for GET stream to close - use a race with timeout + const readPromise = getReader!.read(); + const timeoutPromise = new Promise<{ done: boolean; value: undefined }>((_, reject) => + setTimeout(() => reject(new Error('Stream did not close in time')), 1000) + ); + const { done } = await Promise.race([readPromise, timeoutPromise]); + expect(done).toBe(true); + + // Send a notification while client is disconnected + await mcpServer.server.sendLoggingMessage({ level: 'info', data: 'Missed while disconnected' }); + + // Client reconnects with Last-Event-ID + const reconnectResponse = await fetch(baseUrl, { + method: 'GET', + headers: { + Accept: 'text/event-stream', + 'mcp-session-id': sessionId, + 'mcp-protocol-version': '2025-03-26', + 'last-event-id': lastEventId + } + }); + expect(reconnectResponse.status).toBe(200); + + // Read the replayed notification + const reconnectReader = reconnectResponse.body?.getReader(); + let allText = ''; + const readWithTimeout = async () => { + const timeout = setTimeout(() => reconnectReader!.cancel(), 2000); + try { + while (!allText.includes('Missed while disconnected')) { + const { value, done } = await reconnectReader!.read(); + if (done) break; + allText += new TextDecoder().decode(value); + } + } finally { + clearTimeout(timeout); + } + }; + await readWithTimeout(); + + // Verify we received the notification that was sent while disconnected + expect(allText).toContain('Missed while disconnected'); + }); }); // Test onsessionclosed callback diff --git a/src/server/streamableHttp.ts b/src/server/streamableHttp.ts index bdd8e516c..701a1690c 100644 --- a/src/server/streamableHttp.ts +++ b/src/server/streamableHttp.ts @@ -651,13 +651,17 @@ export class StreamableHTTPServerTransport implements Transport { for (const message of messages) { // Build closeSSEStream callback for requests when eventStore is configured let closeSSEStream: (() => void) | undefined; + let closeStandaloneSSEStream: (() => void) | undefined; if (isJSONRPCRequest(message) && this._eventStore) { closeSSEStream = () => { this.closeSSEStream(message.id); }; + closeStandaloneSSEStream = () => { + this.closeStandaloneSSEStream(); + }; } - this.onmessage?.(message, { authInfo, requestInfo, closeSSEStream }); + this.onmessage?.(message, { authInfo, requestInfo, closeSSEStream, closeStandaloneSSEStream }); } // The server SHOULD NOT close the SSE stream before sending all JSON-RPC responses // This will be handled by the send() method when responses are ready @@ -814,6 +818,14 @@ export class StreamableHTTPServerTransport implements Transport { } } + /** + * Close the standalone GET SSE stream, triggering client reconnection. + * Use this to implement polling behavior for server-initiated notifications. + */ + closeStandaloneSSEStream(): void { + // TODO: implement - currently does nothing, stream won't close + } + async send(message: JSONRPCMessage, options?: { relatedRequestId?: RequestId }): Promise { let requestId = options?.relatedRequestId; if (isJSONRPCResponse(message) || isJSONRPCError(message)) { diff --git a/src/shared/protocol.ts b/src/shared/protocol.ts index ce25e45fb..e195478f2 100644 --- a/src/shared/protocol.ts +++ b/src/shared/protocol.ts @@ -290,6 +290,13 @@ export type RequestHandlerExtra void; + + /** + * Closes the standalone GET SSE stream, triggering client reconnection. + * Only available when using StreamableHTTPServerTransport with eventStore configured. + * Use this to implement polling behavior for server-initiated notifications. + */ + closeStandaloneSSEStream?: () => void; }; /** @@ -736,7 +743,8 @@ export abstract class Protocol void; + + /** + * Callback to close the standalone GET SSE stream, triggering client reconnection. + * Only available when using StreamableHTTPServerTransport with eventStore configured. + */ + closeStandaloneSSEStream?: () => void; } /* JSON-RPC types */ From de41e4703c937753de5299b11255656efae59bc4 Mon Sep 17 00:00:00 2001 From: Felix Weinberger Date: Mon, 1 Dec 2025 15:30:22 +0000 Subject: [PATCH 2/2] feat: implement closeStandaloneSSEStream Implement the ability to close standalone GET SSE streams, enabling polling behavior for server-initiated notifications (SEP-1699). Changes: - closeStandaloneSSEStream() now closes the GET stream by ending the response and removing from stream mapping - Events are now stored even when standalone stream is disconnected, allowing replay on client reconnect with Last-Event-ID This complements the existing closeSSEStream() for POST streams. --- src/server/streamableHttp.ts | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/src/server/streamableHttp.ts b/src/server/streamableHttp.ts index 701a1690c..1e04390cf 100644 --- a/src/server/streamableHttp.ts +++ b/src/server/streamableHttp.ts @@ -823,7 +823,11 @@ export class StreamableHTTPServerTransport implements Transport { * Use this to implement polling behavior for server-initiated notifications. */ closeStandaloneSSEStream(): void { - // TODO: implement - currently does nothing, stream won't close + const stream = this._streamMapping.get(this._standaloneSseStreamId); + if (stream) { + stream.end(); + this._streamMapping.delete(this._standaloneSseStreamId); + } } async send(message: JSONRPCMessage, options?: { relatedRequestId?: RequestId }): Promise { @@ -841,19 +845,21 @@ export class StreamableHTTPServerTransport implements Transport { if (isJSONRPCResponse(message) || isJSONRPCError(message)) { throw new Error('Cannot send a response on a standalone SSE stream unless resuming a previous client request'); } - const standaloneSse = this._streamMapping.get(this._standaloneSseStreamId); - if (standaloneSse === undefined) { - // The spec says the server MAY send messages on the stream, so it's ok to discard if no stream - return; - } // Generate and store event ID if event store is provided + // Store even if stream is disconnected so events can be replayed on reconnect let eventId: string | undefined; if (this._eventStore) { // Stores the event and gets the generated event ID eventId = await this._eventStore.storeEvent(this._standaloneSseStreamId, message); } + const standaloneSse = this._streamMapping.get(this._standaloneSseStreamId); + if (standaloneSse === undefined) { + // Stream is disconnected - event is stored for replay, nothing more to do + return; + } + // Send the message to the standalone SSE stream this.writeSSEEvent(standaloneSse, message, eventId); return;