From 464b1f855e90668060a923efcbb8a4467b5222ae Mon Sep 17 00:00:00 2001 From: Felix Weinberger Date: Tue, 18 Nov 2025 11:12:25 +0000 Subject: [PATCH 01/17] feat: implement SEP-1699 SSE polling via server-side disconnect Add support for SSE retry field to enable server-controlled client reconnection timing. Client changes: - Capture server-provided retry field from SSE events - Use retry value for reconnection delay instead of exponential backoff - Reconnect on graceful stream close with Last-Event-ID header Server changes: - Add retryInterval option to StreamableHTTPServerTransportOptions - Send priming events with id/retry/empty-data when eventStore is configured - Add closeSSEStream(requestId) API to close POST SSE streams for polling - Priming events establish resumption capability before actual messages Tests: - Client: retry field capture, exponential backoff fallback, graceful reconnection - Server: priming events, retry field, stream closure, POST SSE polling flow --- src/client/streamableHttp.test.ts | 142 +++++++++++++ src/client/streamableHttp.ts | 31 ++- src/server/streamableHttp.test.ts | 321 +++++++++++++++++++++++++++++- src/server/streamableHttp.ts | 45 +++++ 4 files changed, 536 insertions(+), 3 deletions(-) diff --git a/src/client/streamableHttp.test.ts b/src/client/streamableHttp.test.ts index 12524fbcd..5592c580a 100644 --- a/src/client/streamableHttp.test.ts +++ b/src/client/streamableHttp.test.ts @@ -1010,6 +1010,148 @@ describe('StreamableHTTPClientTransport', () => { }); }); + describe('SSE retry field handling', () => { + beforeEach(() => { + vi.useFakeTimers(); + (global.fetch as Mock).mockReset(); + }); + afterEach(() => vi.useRealTimers()); + + it('should use server-provided retry value for reconnection delay', async () => { + transport = new StreamableHTTPClientTransport(new URL('http://localhost:1234/mcp'), { + reconnectionOptions: { + initialReconnectionDelay: 100, + maxReconnectionDelay: 5000, + reconnectionDelayGrowFactor: 2, + maxRetries: 3 + } + }); + + // Create a stream that sends a retry field + const encoder = new TextEncoder(); + const stream = new ReadableStream({ + start(controller) { + // Send SSE event with retry field + const event = + 'retry: 3000\nevent: message\nid: evt-1\ndata: {"jsonrpc": "2.0", "method": "notification", "params": {}}\n\n'; + controller.enqueue(encoder.encode(event)); + // Close stream to trigger reconnection + controller.close(); + } + }); + + const fetchMock = global.fetch as Mock; + fetchMock.mockResolvedValueOnce({ + ok: true, + status: 200, + headers: new Headers({ 'content-type': 'text/event-stream' }), + body: stream + }); + + // Second request for reconnection + fetchMock.mockResolvedValueOnce({ + ok: true, + status: 200, + headers: new Headers({ 'content-type': 'text/event-stream' }), + body: new ReadableStream() + }); + + await transport.start(); + await transport['_startOrAuthSse']({}); + + // Wait for stream to close and reconnection to be scheduled + await vi.advanceTimersByTimeAsync(100); + + // Verify the server retry value was captured + const transportInternal = transport as unknown as { _serverRetryMs?: number }; + expect(transportInternal._serverRetryMs).toBe(3000); + + // Verify the delay calculation uses server retry value + const getDelay = transport['_getNextReconnectionDelay'].bind(transport); + expect(getDelay(0)).toBe(3000); // Should use server value, not 100ms initial + expect(getDelay(5)).toBe(3000); // Should still use server value for any attempt + }); + + it('should fall back to exponential backoff when no server retry value', () => { + transport = new StreamableHTTPClientTransport(new URL('http://localhost:1234/mcp'), { + reconnectionOptions: { + initialReconnectionDelay: 100, + maxReconnectionDelay: 5000, + reconnectionDelayGrowFactor: 2, + maxRetries: 3 + } + }); + + // Without any SSE stream, _serverRetryMs should be undefined + const transportInternal = transport as unknown as { _serverRetryMs?: number }; + expect(transportInternal._serverRetryMs).toBeUndefined(); + + // Should use exponential backoff + const getDelay = transport['_getNextReconnectionDelay'].bind(transport); + expect(getDelay(0)).toBe(100); // 100 * 2^0 + expect(getDelay(1)).toBe(200); // 100 * 2^1 + expect(getDelay(2)).toBe(400); // 100 * 2^2 + expect(getDelay(10)).toBe(5000); // capped at max + }); + + it('should reconnect on graceful stream close', async () => { + transport = new StreamableHTTPClientTransport(new URL('http://localhost:1234/mcp'), { + reconnectionOptions: { + initialReconnectionDelay: 10, + maxReconnectionDelay: 1000, + reconnectionDelayGrowFactor: 1, + maxRetries: 1 + } + }); + + // Create a stream that closes gracefully after sending an event with ID + const encoder = new TextEncoder(); + const stream = new ReadableStream({ + start(controller) { + // Send priming event with ID and retry field + const event = 'id: evt-1\nretry: 100\ndata: \n\n'; + controller.enqueue(encoder.encode(event)); + // Graceful close + controller.close(); + } + }); + + const fetchMock = global.fetch as Mock; + fetchMock.mockResolvedValueOnce({ + ok: true, + status: 200, + headers: new Headers({ 'content-type': 'text/event-stream' }), + body: stream + }); + + // Second request for reconnection + fetchMock.mockResolvedValueOnce({ + ok: true, + status: 200, + headers: new Headers({ 'content-type': 'text/event-stream' }), + body: new ReadableStream() + }); + + await transport.start(); + await transport['_startOrAuthSse']({}); + + // Wait for stream to process and close + await vi.advanceTimersByTimeAsync(50); + + // Wait for reconnection delay (100ms from retry field) + await vi.advanceTimersByTimeAsync(150); + + // Should have attempted reconnection + expect(fetchMock).toHaveBeenCalledTimes(2); + expect(fetchMock.mock.calls[0][1]?.method).toBe('GET'); + expect(fetchMock.mock.calls[1][1]?.method).toBe('GET'); + + // Second call should include Last-Event-ID + const secondCallHeaders = fetchMock.mock.calls[1][1]?.headers; + expect(secondCallHeaders?.get('last-event-id')).toBe('evt-1'); + }); + }); + describe('prevent infinite recursion when server returns 401 after successful auth', () => { it('should throw error when server returns 401 after successful auth', async () => { const message: JSONRPCMessage = { diff --git a/src/client/streamableHttp.ts b/src/client/streamableHttp.ts index 508f8cef9..5b44a3980 100644 --- a/src/client/streamableHttp.ts +++ b/src/client/streamableHttp.ts @@ -134,6 +134,7 @@ export class StreamableHTTPClientTransport implements Transport { private _reconnectionOptions: StreamableHTTPReconnectionOptions; private _protocolVersion?: string; private _hasCompletedAuthFlow = false; // Circuit breaker: detect auth success followed by immediate 401 + private _serverRetryMs?: number; // Server-provided retry delay from SSE retry field onclose?: () => void; onerror?: (error: Error) => void; @@ -202,6 +203,7 @@ export class StreamableHTTPClientTransport implements Transport { private async _startOrAuthSse(options: StartSSEOptions): Promise { const { resumptionToken } = options; + try { // Try to open an initial SSE stream with GET to listen for server messages // This is optional according to the spec - server may not support it @@ -248,7 +250,12 @@ export class StreamableHTTPClientTransport implements Transport { * @returns Time to wait in milliseconds before next reconnection attempt */ private _getNextReconnectionDelay(attempt: number): number { - // Access default values directly, ensuring they're never undefined + // Use server-provided retry value if available + if (this._serverRetryMs !== undefined) { + return this._serverRetryMs; + } + + // Fall back to exponential backoff const initialDelay = this._reconnectionOptions.initialReconnectionDelay; const growFactor = this._reconnectionOptions.reconnectionDelayGrowFactor; const maxDelay = this._reconnectionOptions.maxReconnectionDelay; @@ -301,7 +308,14 @@ export class StreamableHTTPClientTransport implements Transport { // Create a pipeline: binary stream -> text decoder -> SSE parser const reader = stream .pipeThrough(new TextDecoderStream() as ReadableWritablePair) - .pipeThrough(new EventSourceParserStream()) + .pipeThrough( + new EventSourceParserStream({ + onRetry: (retryMs: number) => { + // Capture server-provided retry value for reconnection timing + this._serverRetryMs = retryMs; + } + }) + ) .getReader(); while (true) { @@ -328,6 +342,19 @@ export class StreamableHTTPClientTransport implements Transport { } } } + + // Handle graceful server-side disconnect + // Server may close connection after sending event ID and retry field + if (isReconnectable && this._abortController && !this._abortController.signal.aborted) { + this._scheduleReconnection( + { + resumptionToken: lastEventId, + onresumptiontoken, + replayMessageId + }, + 0 + ); + } } catch (error) { // Handle stream errors - likely a network disconnect this.onerror?.(new Error(`SSE stream disconnected: ${error}`)); diff --git a/src/server/streamableHttp.test.ts b/src/server/streamableHttp.test.ts index 8d78aad67..819b9f24e 100644 --- a/src/server/streamableHttp.test.ts +++ b/src/server/streamableHttp.test.ts @@ -31,6 +31,7 @@ interface TestServerConfig { eventStore?: EventStore; onsessioninitialized?: (sessionId: string) => void | Promise; onsessionclosed?: (sessionId: string) => void | Promise; + retryInterval?: number; } /** @@ -58,7 +59,8 @@ async function createTestServer(config: TestServerConfig = { sessionIdGenerator: enableJsonResponse: config.enableJsonResponse ?? false, eventStore: config.eventStore, onsessioninitialized: config.onsessioninitialized, - onsessionclosed: config.onsessionclosed + onsessionclosed: config.onsessionclosed, + retryInterval: config.retryInterval }); await mcpServer.connect(transport); @@ -1516,6 +1518,323 @@ describe('StreamableHTTPServerTransport in stateless mode', () => { }); }); +// Test SSE priming events for POST streams (SEP-1699) +describe('StreamableHTTPServerTransport POST SSE priming events', () => { + let server: Server; + let transport: StreamableHTTPServerTransport; + let baseUrl: URL; + let sessionId: string; + let mcpServer: McpServer; + + // Simple eventStore for priming event tests + const createEventStore = (): EventStore => { + const storedEvents = new Map(); + return { + async storeEvent(streamId: string, message: JSONRPCMessage): Promise { + const eventId = `${streamId}::${Date.now()}_${randomUUID()}`; + storedEvents.set(eventId, { eventId, message, streamId }); + return eventId; + }, + async replayEventsAfter( + lastEventId: EventId, + { send }: { send: (eventId: EventId, message: JSONRPCMessage) => Promise } + ): Promise { + const streamId = lastEventId.split('::')[0]; + const eventsToReplay: Array<[string, { message: JSONRPCMessage }]> = []; + for (const [eventId, data] of storedEvents.entries()) { + if (data.streamId === streamId && eventId > lastEventId) { + eventsToReplay.push([eventId, data]); + } + } + eventsToReplay.sort(([a], [b]) => a.localeCompare(b)); + for (const [eventId, { message }] of eventsToReplay) { + if (Object.keys(message).length > 0) { + await send(eventId, message); + } + } + return streamId; + } + }; + }; + + afterEach(async () => { + if (server && transport) { + await stopTestServer({ server, transport }); + } + }); + + it('should send priming event with retry field on POST SSE stream', async () => { + const result = await createTestServer({ + sessionIdGenerator: () => randomUUID(), + eventStore: createEventStore(), + retryInterval: 5000 + }); + server = result.server; + transport = result.transport; + baseUrl = result.baseUrl; + mcpServer = result.mcpServer; + + // Register a tool that we can call + mcpServer.tool('test-tool', 'A test tool', {}, async () => { + return { content: [{ type: 'text', text: 'Tool result' }] }; + }); + + // Initialize to get session ID + const initResponse = await sendPostRequest(baseUrl, TEST_MESSAGES.initialize); + sessionId = initResponse.headers.get('mcp-session-id') as string; + expect(sessionId).toBeDefined(); + + // Send a POST request that will return SSE stream + const toolCallRequest: JSONRPCMessage = { + jsonrpc: '2.0', + id: 1, + method: 'tools/call', + params: { name: 'test-tool', arguments: {} } + }; + + const postResponse = await fetch(baseUrl, { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + Accept: 'text/event-stream, application/json', + 'mcp-session-id': sessionId, + 'mcp-protocol-version': '2025-03-26' + }, + body: JSON.stringify(toolCallRequest) + }); + + expect(postResponse.status).toBe(200); + expect(postResponse.headers.get('content-type')).toBe('text/event-stream'); + + // Read the priming event + const reader = postResponse.body?.getReader(); + const { value } = await reader!.read(); + const text = new TextDecoder().decode(value); + + // Verify priming event has id and retry field + expect(text).toContain('id: '); + expect(text).toContain('retry: 5000'); + expect(text).toContain('data: '); + }); + + it('should send priming event without retry field when retryInterval is not configured', async () => { + const result = await createTestServer({ + sessionIdGenerator: () => randomUUID(), + eventStore: createEventStore() + // No retryInterval + }); + server = result.server; + transport = result.transport; + baseUrl = result.baseUrl; + mcpServer = result.mcpServer; + + mcpServer.tool('test-tool', 'A test tool', {}, async () => { + return { content: [{ type: 'text', text: 'Tool result' }] }; + }); + + // Initialize to get session ID + const initResponse = await sendPostRequest(baseUrl, TEST_MESSAGES.initialize); + sessionId = initResponse.headers.get('mcp-session-id') as string; + expect(sessionId).toBeDefined(); + + // Send a POST request + const toolCallRequest: JSONRPCMessage = { + jsonrpc: '2.0', + id: 1, + method: 'tools/call', + params: { name: 'test-tool', arguments: {} } + }; + + const postResponse = await fetch(baseUrl, { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + Accept: 'text/event-stream, application/json', + 'mcp-session-id': sessionId, + 'mcp-protocol-version': '2025-03-26' + }, + body: JSON.stringify(toolCallRequest) + }); + + expect(postResponse.status).toBe(200); + + // Read the priming event + const reader = postResponse.body?.getReader(); + const { value } = await reader!.read(); + const text = new TextDecoder().decode(value); + + // Priming event should have id field but NOT retry field + expect(text).toContain('id: '); + expect(text).toContain('data: '); + expect(text).not.toContain('retry:'); + }); + + it('should close POST SSE stream when closeSseStream is called', async () => { + const result = await createTestServer({ + sessionIdGenerator: () => randomUUID(), + eventStore: createEventStore(), + retryInterval: 1000 + }); + server = result.server; + transport = result.transport; + baseUrl = result.baseUrl; + mcpServer = result.mcpServer; + + // Register a tool that will hang until we close the stream + let toolResolve: () => void; + const toolPromise = new Promise(resolve => { + toolResolve = resolve; + }); + + mcpServer.tool('slow-tool', 'A slow tool', {}, async () => { + await toolPromise; + return { content: [{ type: 'text', text: 'Done' }] }; + }); + + // Initialize to get session ID + const initResponse = await sendPostRequest(baseUrl, TEST_MESSAGES.initialize); + sessionId = initResponse.headers.get('mcp-session-id') as string; + expect(sessionId).toBeDefined(); + + // Send a POST request for the slow tool + const toolCallRequest: JSONRPCMessage = { + jsonrpc: '2.0', + id: 42, + method: 'tools/call', + params: { name: 'slow-tool', arguments: {} } + }; + + const postResponse = await fetch(baseUrl, { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + Accept: 'text/event-stream, application/json', + 'mcp-session-id': sessionId, + 'mcp-protocol-version': '2025-03-26' + }, + body: JSON.stringify(toolCallRequest) + }); + + expect(postResponse.status).toBe(200); + + const reader = postResponse.body?.getReader(); + + // Read the priming event + await reader!.read(); + + // Close the SSE stream for this request + transport.closeSSEStream(42); + + // Stream should now be closed + const { done } = await reader!.read(); + expect(done).toBe(true); + + // Clean up - resolve the tool promise + toolResolve!(); + }); + + it('should support POST SSE polling with client reconnection', async () => { + const result = await createTestServer({ + sessionIdGenerator: () => randomUUID(), + eventStore: createEventStore(), + retryInterval: 1000 + }); + server = result.server; + transport = result.transport; + baseUrl = result.baseUrl; + mcpServer = result.mcpServer; + + // Track tool execution state + let toolResolve: () => void; + const toolPromise = new Promise(resolve => { + toolResolve = resolve; + }); + + // Register a tool that sends progress and then completes + mcpServer.tool('polling-tool', 'A tool for polling test', {}, async () => { + // Wait for test to signal completion + await toolPromise; + return { content: [{ type: 'text', text: 'Final result' }] }; + }); + + // Initialize to get session ID + const initResponse = await sendPostRequest(baseUrl, TEST_MESSAGES.initialize); + sessionId = initResponse.headers.get('mcp-session-id') as string; + expect(sessionId).toBeDefined(); + + // Send a POST request for the tool + const toolCallRequest: JSONRPCMessage = { + jsonrpc: '2.0', + id: 100, + method: 'tools/call', + params: { name: 'polling-tool', arguments: {} } + }; + + const postResponse = await fetch(baseUrl, { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + Accept: 'text/event-stream, application/json', + 'mcp-session-id': sessionId, + 'mcp-protocol-version': '2025-03-26' + }, + body: JSON.stringify(toolCallRequest) + }); + + expect(postResponse.status).toBe(200); + expect(postResponse.headers.get('content-type')).toBe('text/event-stream'); + + const reader = postResponse.body?.getReader(); + + // Read the priming event and extract event ID + const { value: primingValue } = await reader!.read(); + const primingText = new TextDecoder().decode(primingValue); + expect(primingText).toContain('id: '); + expect(primingText).toContain('retry: 1000'); + + // Extract the priming event ID + const primingIdMatch = primingText.match(/id: ([^\n]+)/); + expect(primingIdMatch).toBeTruthy(); + const primingEventId = primingIdMatch![1]; + + // Server closes the stream to trigger polling + transport.closeSSEStream(100); + + // Verify stream is closed + const { done } = await reader!.read(); + expect(done).toBe(true); + + // Now complete the tool - this will store the result event + toolResolve!(); + + // Give the tool time to complete and store the result + await new Promise(resolve => setTimeout(resolve, 50)); + + // Client reconnects with Last-Event-ID to get missed events + const reconnectResponse = await fetch(baseUrl, { + method: 'GET', + headers: { + Accept: 'text/event-stream', + 'mcp-session-id': sessionId, + 'mcp-protocol-version': '2025-03-26', + 'last-event-id': primingEventId + } + }); + + expect(reconnectResponse.status).toBe(200); + expect(reconnectResponse.headers.get('content-type')).toBe('text/event-stream'); + + // Read the replayed events + const reconnectReader = reconnectResponse.body?.getReader(); + const { value: replayValue } = await reconnectReader!.read(); + const replayText = new TextDecoder().decode(replayValue); + + // Should receive the tool result that was stored after stream was closed + expect(replayText).toContain('Final result'); + expect(replayText).toContain('"id":100'); + }); +}); + // Test onsessionclosed callback describe('StreamableHTTPServerTransport onsessionclosed callback', () => { it('should call onsessionclosed callback when session is closed via DELETE', async () => { diff --git a/src/server/streamableHttp.ts b/src/server/streamableHttp.ts index d57e75cd7..b67be0ce4 100644 --- a/src/server/streamableHttp.ts +++ b/src/server/streamableHttp.ts @@ -108,6 +108,13 @@ export interface StreamableHTTPServerTransportOptions { * Default is false for backwards compatibility. */ enableDnsRebindingProtection?: boolean; + + /** + * Retry interval in milliseconds to suggest to clients in SSE retry field. + * When set, the server will send a retry field in SSE priming events to control + * client reconnection timing for polling behavior. + */ + retryInterval?: number; } /** @@ -160,6 +167,7 @@ export class StreamableHTTPServerTransport implements Transport { private _allowedHosts?: string[]; private _allowedOrigins?: string[]; private _enableDnsRebindingProtection: boolean; + private _retryInterval?: number; sessionId?: string; onclose?: () => void; @@ -175,6 +183,7 @@ export class StreamableHTTPServerTransport implements Transport { this._allowedHosts = options.allowedHosts; this._allowedOrigins = options.allowedOrigins; this._enableDnsRebindingProtection = options.enableDnsRebindingProtection ?? false; + this._retryInterval = options.retryInterval; } /** @@ -249,6 +258,24 @@ export class StreamableHTTPServerTransport implements Transport { } } + /** + * Writes a priming event to establish resumption capability. + * Only sends if eventStore is configured (opt-in for resumability). + */ + private async _maybeWritePrimingEvent(res: ServerResponse, streamId: string): Promise { + if (!this._eventStore) { + return; + } + + const primingEventId = await this._eventStore.storeEvent(streamId, {} as JSONRPCMessage); + + let primingEvent = `id: ${primingEventId}\ndata: \n\n`; + if (this._retryInterval !== undefined) { + primingEvent = `id: ${primingEventId}\nretry: ${this._retryInterval}\ndata: \n\n`; + } + res.write(primingEvent); + } + /** * Handles GET requests for SSE stream */ @@ -547,6 +574,8 @@ export class StreamableHTTPServerTransport implements Transport { } res.writeHead(200, headers); + + await this._maybeWritePrimingEvent(res, streamId); } // Store the response for this request to send messages back through this connection // We need to track by request ID to maintain the connection @@ -709,6 +738,22 @@ export class StreamableHTTPServerTransport implements Transport { this.onclose?.(); } + /** + * Close an SSE stream for a specific request, triggering client reconnection. + * Use this to implement polling behavior during long-running operations - + * client will reconnect after the retry interval specified in the priming event. + */ + closeSSEStream(requestId: RequestId): void { + const streamId = this._requestToStreamMapping.get(requestId); + if (!streamId) return; + + const stream = this._streamMapping.get(streamId); + if (stream) { + stream.end(); + this._streamMapping.delete(streamId); + } + } + async send(message: JSONRPCMessage, options?: { relatedRequestId?: RequestId }): Promise { let requestId = options?.relatedRequestId; if (isJSONRPCResponse(message) || isJSONRPCError(message)) { From c1d581a7161919475751dd345ccd34cd5ae11d4d Mon Sep 17 00:00:00 2001 From: Felix Weinberger Date: Tue, 18 Nov 2025 23:03:42 +0000 Subject: [PATCH 02/17] Add test for replaying multiple messages after closeSSEStream Addresses PR feedback from paoloricciuti requesting test coverage for the scenario where multiple messages are sent while the SSE client is disconnected. Uses a batch of tool calls to generate multiple responses that get stored and replayed on reconnection. --- src/server/streamableHttp.test.ts | 132 ++++++++++++++++++++++++++++++ 1 file changed, 132 insertions(+) diff --git a/src/server/streamableHttp.test.ts b/src/server/streamableHttp.test.ts index 819b9f24e..8704068c1 100644 --- a/src/server/streamableHttp.test.ts +++ b/src/server/streamableHttp.test.ts @@ -1833,6 +1833,138 @@ describe('StreamableHTTPServerTransport POST SSE priming events', () => { expect(replayText).toContain('Final result'); expect(replayText).toContain('"id":100'); }); + + it('should replay multiple messages sent after closeSSEStream', async () => { + const result = await createTestServer({ + sessionIdGenerator: () => randomUUID(), + eventStore: createEventStore(), + retryInterval: 1000 + }); + server = result.server; + transport = result.transport; + baseUrl = result.baseUrl; + mcpServer = result.mcpServer; + + // Track tool execution state - we'll use multiple tools + let tool1Resolve: () => void; + let tool2Resolve: () => void; + let tool3Resolve: () => void; + const tool1Promise = new Promise(resolve => { + tool1Resolve = resolve; + }); + const tool2Promise = new Promise(resolve => { + tool2Resolve = resolve; + }); + const tool3Promise = new Promise(resolve => { + tool3Resolve = resolve; + }); + + // Register multiple tools that wait for test signals + mcpServer.tool('tool-1', 'First tool', {}, async () => { + await tool1Promise; + return { content: [{ type: 'text', text: 'Result from tool 1' }] }; + }); + mcpServer.tool('tool-2', 'Second tool', {}, async () => { + await tool2Promise; + return { content: [{ type: 'text', text: 'Result from tool 2' }] }; + }); + mcpServer.tool('tool-3', 'Third tool', {}, async () => { + await tool3Promise; + return { content: [{ type: 'text', text: 'Result from tool 3' }] }; + }); + + // Initialize to get session ID + const initResponse = await sendPostRequest(baseUrl, TEST_MESSAGES.initialize); + sessionId = initResponse.headers.get('mcp-session-id') as string; + expect(sessionId).toBeDefined(); + + // Send a BATCH of tool calls in one POST request + // All responses will go to the same SSE stream + const batchRequest: JSONRPCMessage[] = [ + { jsonrpc: '2.0', id: 201, method: 'tools/call', params: { name: 'tool-1', arguments: {} } }, + { jsonrpc: '2.0', id: 202, method: 'tools/call', params: { name: 'tool-2', arguments: {} } }, + { jsonrpc: '2.0', id: 203, method: 'tools/call', params: { name: 'tool-3', arguments: {} } } + ]; + + const postResponse = await fetch(baseUrl, { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + Accept: 'text/event-stream, application/json', + 'mcp-session-id': sessionId, + 'mcp-protocol-version': '2025-03-26' + }, + body: JSON.stringify(batchRequest) + }); + + expect(postResponse.status).toBe(200); + expect(postResponse.headers.get('content-type')).toBe('text/event-stream'); + + const reader = postResponse.body?.getReader(); + + // Read the priming event and extract event ID + const { value: primingValue } = await reader!.read(); + const primingText = new TextDecoder().decode(primingValue); + expect(primingText).toContain('id: '); + expect(primingText).toContain('retry: 1000'); + + // Extract the priming event ID + const primingIdMatch = primingText.match(/id: ([^\n]+)/); + expect(primingIdMatch).toBeTruthy(); + const primingEventId = primingIdMatch![1]; + + // Server closes the stream to trigger polling - use first request ID + transport.closeSSEStream(201); + + // Verify stream is closed + const { done } = await reader!.read(); + expect(done).toBe(true); + + // Complete all tools while the client is disconnected + // Each completion will store a response in the event store + tool1Resolve!(); + await new Promise(resolve => setTimeout(resolve, 10)); + tool2Resolve!(); + await new Promise(resolve => setTimeout(resolve, 10)); + tool3Resolve!(); + + // Give all tools time to complete and store results + await new Promise(resolve => setTimeout(resolve, 50)); + + // Client reconnects with Last-Event-ID to get all missed events + const reconnectResponse = await fetch(baseUrl, { + method: 'GET', + headers: { + Accept: 'text/event-stream', + 'mcp-session-id': sessionId, + 'mcp-protocol-version': '2025-03-26', + 'last-event-id': primingEventId + } + }); + + expect(reconnectResponse.status).toBe(200); + expect(reconnectResponse.headers.get('content-type')).toBe('text/event-stream'); + + // Read the replayed events + const reconnectReader = reconnectResponse.body?.getReader(); + const { value: replayValue } = await reconnectReader!.read(); + const replayText = new TextDecoder().decode(replayValue); + + // Should receive all three tool results that were stored after stream was closed + expect(replayText).toContain('Result from tool 1'); + expect(replayText).toContain('Result from tool 2'); + expect(replayText).toContain('Result from tool 3'); + + // Verify all request IDs are present + expect(replayText).toContain('"id":201'); + expect(replayText).toContain('"id":202'); + expect(replayText).toContain('"id":203'); + + // Verify multiple event IDs are present (at least 3 messages) + const eventIds = replayText.match(/id: [^\n]+/g); + expect(eventIds).toBeTruthy(); + expect(eventIds!.length).toBeGreaterThanOrEqual(3); + }); }); // Test onsessionclosed callback From 6d32c155ad82bc3017c366e9c2eb6f01e4bf732b Mon Sep 17 00:00:00 2001 From: Felix Weinberger Date: Wed, 19 Nov 2025 00:22:44 +0000 Subject: [PATCH 03/17] Allow multiple GET streams for resuming different POST streams - Fix replayEvents to use streamId from last-event-id header - Add conflict check per streamId (not global) - Add missing close handler to clean up stream mapping - Add test demonstrating concurrent GET streams resuming different POST streams This aligns with the spec: "The client MAY remain connected to multiple SSE streams simultaneously." --- src/server/streamableHttp.test.ts | 179 ++++++++++++++++++++++++++++-- src/server/streamableHttp.ts | 59 +++++++++- 2 files changed, 230 insertions(+), 8 deletions(-) diff --git a/src/server/streamableHttp.test.ts b/src/server/streamableHttp.test.ts index 8704068c1..e661af2e4 100644 --- a/src/server/streamableHttp.test.ts +++ b/src/server/streamableHttp.test.ts @@ -1270,16 +1270,21 @@ describe('StreamableHTTPServerTransport with resumability', () => { let baseUrl: URL; let sessionId: string; let mcpServer: McpServer; - const storedEvents: Map = new Map(); + const storedEvents: Map = new Map(); // Simple implementation of EventStore const eventStore: EventStore = { async storeEvent(streamId: string, message: JSONRPCMessage): Promise { const eventId = `${streamId}_${randomUUID()}`; - storedEvents.set(eventId, { eventId, message }); + storedEvents.set(eventId, { eventId, message, streamId }); return eventId; }, + async getStreamIdForEventId(eventId: string): Promise { + const event = storedEvents.get(eventId); + return event?.streamId; + }, + async replayEventsAfter( lastEventId: EventId, { @@ -1288,11 +1293,11 @@ describe('StreamableHTTPServerTransport with resumability', () => { send: (eventId: EventId, message: JSONRPCMessage) => Promise; } ): Promise { - const streamId = lastEventId.split('_')[0]; - // Extract stream ID from the event ID + const event = storedEvents.get(lastEventId); + const streamId = event?.streamId || lastEventId.split('_')[0]; // For test simplicity, just return all events with matching streamId that aren't the lastEventId - for (const [eventId, { message }] of storedEvents.entries()) { - if (eventId.startsWith(streamId) && eventId !== lastEventId) { + for (const [eventId, { message, streamId: evtStreamId }] of storedEvents.entries()) { + if (evtStreamId === streamId && eventId !== lastEventId) { await send(eventId, message); } } @@ -1405,6 +1410,8 @@ describe('StreamableHTTPServerTransport with resumability', () => { // Close the first SSE stream to simulate a disconnect await reader!.cancel(); + // Give the close handler time to clean up the stream mapping + await new Promise(resolve => setTimeout(resolve, 10)); // Reconnect with the Last-Event-ID to get missed messages const reconnectResponse = await fetch(baseUrl, { @@ -1535,11 +1542,16 @@ describe('StreamableHTTPServerTransport POST SSE priming events', () => { storedEvents.set(eventId, { eventId, message, streamId }); return eventId; }, + async getStreamIdForEventId(eventId: string): Promise { + const event = storedEvents.get(eventId); + return event?.streamId; + }, async replayEventsAfter( lastEventId: EventId, { send }: { send: (eventId: EventId, message: JSONRPCMessage) => Promise } ): Promise { - const streamId = lastEventId.split('::')[0]; + const event = storedEvents.get(lastEventId); + const streamId = event?.streamId || lastEventId.split('::')[0]; const eventsToReplay: Array<[string, { message: JSONRPCMessage }]> = []; for (const [eventId, data] of storedEvents.entries()) { if (data.streamId === streamId && eventId > lastEventId) { @@ -1965,6 +1977,159 @@ describe('StreamableHTTPServerTransport POST SSE priming events', () => { expect(eventIds).toBeTruthy(); expect(eventIds!.length).toBeGreaterThanOrEqual(3); }); + + it('should allow resuming multiple POST streams via separate GET streams', async () => { + const result = await createTestServer({ + sessionIdGenerator: () => randomUUID(), + eventStore: createEventStore(), + retryInterval: 1000 + }); + server = result.server; + transport = result.transport; + baseUrl = result.baseUrl; + mcpServer = result.mcpServer; + + // Track tool execution state for two separate tools + let tool1Resolve: () => void; + let tool2Resolve: () => void; + const tool1Promise = new Promise(resolve => { + tool1Resolve = resolve; + }); + const tool2Promise = new Promise(resolve => { + tool2Resolve = resolve; + }); + + // Register two tools + mcpServer.tool('stream-tool-1', 'First stream tool', {}, async () => { + await tool1Promise; + return { content: [{ type: 'text', text: 'Result from stream 1' }] }; + }); + mcpServer.tool('stream-tool-2', 'Second stream tool', {}, async () => { + await tool2Promise; + return { content: [{ type: 'text', text: 'Result from stream 2' }] }; + }); + + // Initialize to get session ID + const initResponse = await sendPostRequest(baseUrl, TEST_MESSAGES.initialize); + sessionId = initResponse.headers.get('mcp-session-id') as string; + expect(sessionId).toBeDefined(); + + // POST tool call #1 + const toolCall1: JSONRPCMessage = { + jsonrpc: '2.0', + id: 301, + method: 'tools/call', + params: { name: 'stream-tool-1', arguments: {} } + }; + const post1Response = await fetch(baseUrl, { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + Accept: 'text/event-stream, application/json', + 'mcp-session-id': sessionId, + 'mcp-protocol-version': '2025-03-26' + }, + body: JSON.stringify(toolCall1) + }); + expect(post1Response.status).toBe(200); + + // Read priming event and extract event ID for stream 1 + const reader1 = post1Response.body?.getReader(); + const { value: priming1 } = await reader1!.read(); + const priming1Text = new TextDecoder().decode(priming1); + const priming1Match = priming1Text.match(/id: ([^\n]+)/); + expect(priming1Match).toBeTruthy(); + const eventId1 = priming1Match![1]; + + // POST tool call #2 + const toolCall2: JSONRPCMessage = { + jsonrpc: '2.0', + id: 302, + method: 'tools/call', + params: { name: 'stream-tool-2', arguments: {} } + }; + const post2Response = await fetch(baseUrl, { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + Accept: 'text/event-stream, application/json', + 'mcp-session-id': sessionId, + 'mcp-protocol-version': '2025-03-26' + }, + body: JSON.stringify(toolCall2) + }); + expect(post2Response.status).toBe(200); + + // Read priming event and extract event ID for stream 2 + const reader2 = post2Response.body?.getReader(); + const { value: priming2 } = await reader2!.read(); + const priming2Text = new TextDecoder().decode(priming2); + const priming2Match = priming2Text.match(/id: ([^\n]+)/); + expect(priming2Match).toBeTruthy(); + const eventId2 = priming2Match![1]; + + // Verify we have two different stream IDs + const streamId1 = eventId1.split('::')[0]; + const streamId2 = eventId2.split('::')[0]; + expect(streamId1).not.toBe(streamId2); + + // Close both streams + transport.closeSSEStream(301); + transport.closeSSEStream(302); + + // Verify both streams are closed + const { done: done1 } = await reader1!.read(); + const { done: done2 } = await reader2!.read(); + expect(done1).toBe(true); + expect(done2).toBe(true); + + // Complete both tools while disconnected + tool1Resolve!(); + tool2Resolve!(); + await new Promise(resolve => setTimeout(resolve, 50)); + + // Resume BOTH streams via GET - they should work concurrently (no 409) + const [reconnect1Response, reconnect2Response] = await Promise.all([ + fetch(baseUrl, { + method: 'GET', + headers: { + Accept: 'text/event-stream', + 'mcp-session-id': sessionId, + 'mcp-protocol-version': '2025-03-26', + 'last-event-id': eventId1 + } + }), + fetch(baseUrl, { + method: 'GET', + headers: { + Accept: 'text/event-stream', + 'mcp-session-id': sessionId, + 'mcp-protocol-version': '2025-03-26', + 'last-event-id': eventId2 + } + }) + ]); + + // Both should succeed (not 409) + expect(reconnect1Response.status).toBe(200); + expect(reconnect2Response.status).toBe(200); + + // Read results from both streams + const reconnect1Reader = reconnect1Response.body?.getReader(); + const { value: replay1 } = await reconnect1Reader!.read(); + const replay1Text = new TextDecoder().decode(replay1); + + const reconnect2Reader = reconnect2Response.body?.getReader(); + const { value: replay2 } = await reconnect2Reader!.read(); + const replay2Text = new TextDecoder().decode(replay2); + + // Each stream should have its own result + expect(replay1Text).toContain('Result from stream 1'); + expect(replay1Text).toContain('"id":301'); + + expect(replay2Text).toContain('Result from stream 2'); + expect(replay2Text).toContain('"id":302'); + }); }); // Test onsessionclosed callback diff --git a/src/server/streamableHttp.ts b/src/server/streamableHttp.ts index b67be0ce4..4dce30580 100644 --- a/src/server/streamableHttp.ts +++ b/src/server/streamableHttp.ts @@ -35,6 +35,17 @@ export interface EventStore { */ storeEvent(streamId: StreamId, message: JSONRPCMessage): Promise; + /** + * Get the stream ID associated with a given event ID. + * @param eventId The event ID to look up + * @returns The stream ID, or undefined if not found + * + * Optional: If not provided, the SDK will attempt to parse the streamId + * from the eventId assuming format "streamId::...". Implementations should + * provide this method for more reliable stream ID resolution. + */ + getStreamIdForEventId?(eventId: EventId): Promise; + replayEventsAfter( lastEventId: EventId, { @@ -369,6 +380,44 @@ export class StreamableHTTPServerTransport implements Transport { return; } try { + // Get streamId - prefer explicit method, fall back to parsing + let streamId: string | undefined; + if (this._eventStore.getStreamIdForEventId) { + streamId = await this._eventStore.getStreamIdForEventId(lastEventId); + } else { + // Fallback: assume format "streamId::..." + streamId = lastEventId.split('::')[0] || undefined; + } + + if (!streamId) { + res.writeHead(400).end( + JSON.stringify({ + jsonrpc: '2.0', + error: { + code: -32000, + message: 'Invalid event ID format' + }, + id: null + }) + ); + return; + } + + // Check conflict with the SAME streamId we'll use for mapping + if (this._streamMapping.get(streamId) !== undefined) { + res.writeHead(409).end( + JSON.stringify({ + jsonrpc: '2.0', + error: { + code: -32000, + message: 'Conflict: Stream already has an active connection' + }, + id: null + }) + ); + return; + } + const headers: Record = { 'Content-Type': 'text/event-stream', 'Cache-Control': 'no-cache, no-transform', @@ -380,7 +429,8 @@ export class StreamableHTTPServerTransport implements Transport { } res.writeHead(200, headers).flushHeaders(); - const streamId = await this._eventStore?.replayEventsAfter(lastEventId, { + // Replay events + await this._eventStore.replayEventsAfter(lastEventId, { send: async (eventId: string, message: JSONRPCMessage) => { if (!this.writeSSEEvent(res, message, eventId)) { this.onerror?.(new Error('Failed replay events')); @@ -388,8 +438,15 @@ export class StreamableHTTPServerTransport implements Transport { } } }); + + // Map using the same streamId we checked for conflicts this._streamMapping.set(streamId, res); + // Set up close handler for client disconnects + res.on('close', () => { + this._streamMapping.delete(streamId); + }); + // Add error handler for replay stream res.on('error', error => { this.onerror?.(error as Error); From 8f6701e1b901c783d23f9ddb42a55ef8af041cac Mon Sep 17 00:00:00 2001 From: Felix Weinberger Date: Thu, 20 Nov 2025 14:51:40 +0000 Subject: [PATCH 04/17] feat: add GET stream polling support for SEP-1699 - Add closeStandaloneSSEStream() method to allow server to close the standalone GET notification stream, triggering client reconnection - Send priming event with retry field on GET streams (when eventStore configured) for resumability - Add tests for GET stream priming events and closeStandaloneSSEStream - Fix flaky test timeout for POST SSE polling test --- src/server/streamableHttp.test.ts | 246 +++++++++++++++++++++++++++++- src/server/streamableHttp.ts | 16 ++ 2 files changed, 259 insertions(+), 3 deletions(-) diff --git a/src/server/streamableHttp.test.ts b/src/server/streamableHttp.test.ts index e661af2e4..202ed2e4c 100644 --- a/src/server/streamableHttp.test.ts +++ b/src/server/streamableHttp.test.ts @@ -1345,6 +1345,14 @@ describe('StreamableHTTPServerTransport with resumability', () => { expect(sseResponse.status).toBe(200); expect(sseResponse.headers.get('content-type')).toBe('text/event-stream'); + const reader = sseResponse.body?.getReader(); + + // First read the priming event (SEP-1699) + const { value: primingValue } = await reader!.read(); + const primingText = new TextDecoder().decode(primingValue); + expect(primingText).toContain('id: '); + expect(primingText).toContain('data: '); + // Send a notification that should be stored with an event ID const notification: JSONRPCMessage = { jsonrpc: '2.0', @@ -1356,7 +1364,6 @@ describe('StreamableHTTPServerTransport with resumability', () => { await transport.send(notification); // Read from the stream and verify we got the notification with an event ID - const reader = sseResponse.body?.getReader(); const { value } = await reader!.read(); const text = new TextDecoder().decode(value); @@ -1388,11 +1395,15 @@ describe('StreamableHTTPServerTransport with resumability', () => { }); expect(sseResponse.status).toBe(200); + const reader = sseResponse.body?.getReader(); + + // First read the priming event (SEP-1699) + await reader!.read(); + // Send a server notification through the MCP server await mcpServer.server.sendLoggingMessage({ level: 'info', data: 'First notification from MCP server' }); // Read the notification from the SSE stream - const reader = sseResponse.body?.getReader(); const { value } = await reader!.read(); const text = new TextDecoder().decode(value); @@ -1745,7 +1756,7 @@ describe('StreamableHTTPServerTransport POST SSE priming events', () => { toolResolve!(); }); - it('should support POST SSE polling with client reconnection', async () => { + it('should support POST SSE polling with client reconnection', { timeout: 10000 }, async () => { const result = await createTestServer({ sessionIdGenerator: () => randomUUID(), eventStore: createEventStore(), @@ -2132,6 +2143,235 @@ describe('StreamableHTTPServerTransport POST SSE priming events', () => { }); }); +// Test SSE priming events for GET streams (SEP-1699) +describe('StreamableHTTPServerTransport GET SSE priming events', () => { + let server: Server; + let transport: StreamableHTTPServerTransport; + let baseUrl: URL; + let sessionId: string; + + // Simple eventStore for priming event tests + const createEventStore = (): EventStore => { + const storedEvents = new Map(); + return { + async storeEvent(streamId: string, message: JSONRPCMessage): Promise { + const eventId = `${streamId}::${Date.now()}_${randomUUID()}`; + storedEvents.set(eventId, { eventId, message, streamId }); + return eventId; + }, + async getStreamIdForEventId(eventId: string): Promise { + const event = storedEvents.get(eventId); + return event?.streamId; + }, + async replayEventsAfter( + lastEventId: EventId, + { send }: { send: (eventId: EventId, message: JSONRPCMessage) => Promise } + ): Promise { + const event = storedEvents.get(lastEventId); + const streamId = event?.streamId || lastEventId.split('::')[0]; + const eventsToReplay: Array<[string, { message: JSONRPCMessage }]> = []; + for (const [eventId, data] of storedEvents.entries()) { + if (data.streamId === streamId && eventId > lastEventId) { + eventsToReplay.push([eventId, data]); + } + } + eventsToReplay.sort(([a], [b]) => a.localeCompare(b)); + for (const [eventId, { message }] of eventsToReplay) { + if (Object.keys(message).length > 0) { + await send(eventId, message); + } + } + return streamId; + } + }; + }; + + afterEach(async () => { + if (server && transport) { + await stopTestServer({ server, transport }); + } + }); + + it('should send priming event with retry field on GET SSE stream', async () => { + const result = await createTestServer({ + sessionIdGenerator: () => randomUUID(), + eventStore: createEventStore(), + retryInterval: 5000 + }); + server = result.server; + transport = result.transport; + baseUrl = result.baseUrl; + + // Initialize to get session ID + const initResponse = await sendPostRequest(baseUrl, TEST_MESSAGES.initialize); + sessionId = initResponse.headers.get('mcp-session-id') as string; + expect(sessionId).toBeDefined(); + + // Open a GET SSE stream + const getResponse = await fetch(baseUrl, { + method: 'GET', + headers: { + Accept: 'text/event-stream', + 'mcp-session-id': sessionId, + 'mcp-protocol-version': '2025-03-26' + } + }); + + expect(getResponse.status).toBe(200); + expect(getResponse.headers.get('content-type')).toBe('text/event-stream'); + + // Read the priming event + const reader = getResponse.body?.getReader(); + const { value } = await reader!.read(); + const text = new TextDecoder().decode(value); + + // Verify priming event has id and retry field + expect(text).toContain('id: '); + expect(text).toContain('retry: 5000'); + expect(text).toContain('data: '); + }); + + it('should send priming event without retry field when retryInterval is not configured', async () => { + const result = await createTestServer({ + sessionIdGenerator: () => randomUUID(), + eventStore: createEventStore() + // No retryInterval + }); + server = result.server; + transport = result.transport; + baseUrl = result.baseUrl; + + // Initialize to get session ID + const initResponse = await sendPostRequest(baseUrl, TEST_MESSAGES.initialize); + sessionId = initResponse.headers.get('mcp-session-id') as string; + expect(sessionId).toBeDefined(); + + // Open a GET SSE stream + const getResponse = await fetch(baseUrl, { + method: 'GET', + headers: { + Accept: 'text/event-stream', + 'mcp-session-id': sessionId, + 'mcp-protocol-version': '2025-03-26' + } + }); + + expect(getResponse.status).toBe(200); + + // Read the priming event + const reader = getResponse.body?.getReader(); + const { value } = await reader!.read(); + const text = new TextDecoder().decode(value); + + // Priming event should have id field but NOT retry field + expect(text).toContain('id: '); + expect(text).toContain('data: '); + expect(text).not.toContain('retry:'); + }); + + it('should close GET SSE stream when closeStandaloneSSEStream is called', async () => { + const result = await createTestServer({ + sessionIdGenerator: () => randomUUID(), + eventStore: createEventStore(), + retryInterval: 1000 + }); + server = result.server; + transport = result.transport; + baseUrl = result.baseUrl; + + // Initialize to get session ID + const initResponse = await sendPostRequest(baseUrl, TEST_MESSAGES.initialize); + sessionId = initResponse.headers.get('mcp-session-id') as string; + expect(sessionId).toBeDefined(); + + // Open a GET SSE stream + const getResponse = await fetch(baseUrl, { + method: 'GET', + headers: { + Accept: 'text/event-stream', + 'mcp-session-id': sessionId, + 'mcp-protocol-version': '2025-03-26' + } + }); + + expect(getResponse.status).toBe(200); + + const reader = getResponse.body?.getReader(); + + // Read the priming event + await reader!.read(); + + // Close the standalone SSE stream + transport.closeStandaloneSSEStream(); + + // Stream should now be closed + const { done } = await reader!.read(); + expect(done).toBe(true); + }); + + it('should allow GET SSE stream reconnection with Last-Event-ID after closeStandaloneSSEStream', async () => { + const result = await createTestServer({ + sessionIdGenerator: () => randomUUID(), + eventStore: createEventStore(), + retryInterval: 1000 + }); + server = result.server; + transport = result.transport; + baseUrl = result.baseUrl; + + // Initialize to get session ID + const initResponse = await sendPostRequest(baseUrl, TEST_MESSAGES.initialize); + sessionId = initResponse.headers.get('mcp-session-id') as string; + expect(sessionId).toBeDefined(); + + // Open a GET SSE stream + const getResponse = await fetch(baseUrl, { + method: 'GET', + headers: { + Accept: 'text/event-stream', + 'mcp-session-id': sessionId, + 'mcp-protocol-version': '2025-03-26' + } + }); + + expect(getResponse.status).toBe(200); + + const reader = getResponse.body?.getReader(); + + // Read the priming event and extract event ID + const { value: primingValue } = await reader!.read(); + const primingText = new TextDecoder().decode(primingValue); + expect(primingText).toContain('id: '); + expect(primingText).toContain('retry: 1000'); + + // Extract the priming event ID + const primingIdMatch = primingText.match(/id: ([^\n]+)/); + expect(primingIdMatch).toBeTruthy(); + const primingEventId = primingIdMatch![1]; + + // Close the standalone SSE stream + transport.closeStandaloneSSEStream(); + + // Verify stream is closed + const { done } = await reader!.read(); + expect(done).toBe(true); + + // Client reconnects with Last-Event-ID + const reconnectResponse = await fetch(baseUrl, { + method: 'GET', + headers: { + Accept: 'text/event-stream', + 'mcp-session-id': sessionId, + 'mcp-protocol-version': '2025-03-26', + 'last-event-id': primingEventId + } + }); + + expect(reconnectResponse.status).toBe(200); + expect(reconnectResponse.headers.get('content-type')).toBe('text/event-stream'); + }); +}); + // Test onsessionclosed callback describe('StreamableHTTPServerTransport onsessionclosed callback', () => { it('should call onsessionclosed callback when session is closed via DELETE', async () => { diff --git a/src/server/streamableHttp.ts b/src/server/streamableHttp.ts index 4dce30580..90a86bbdd 100644 --- a/src/server/streamableHttp.ts +++ b/src/server/streamableHttp.ts @@ -358,6 +358,9 @@ export class StreamableHTTPServerTransport implements Transport { // otherwise the client will just wait for the first message res.writeHead(200, headers).flushHeaders(); + // Send priming event for resumability/polling support + await this._maybeWritePrimingEvent(res, this._standaloneSseStreamId); + // Assign the response to the standalone SSE stream this._streamMapping.set(this._standaloneSseStreamId, res); // Set up close handler for client disconnects @@ -811,6 +814,19 @@ export class StreamableHTTPServerTransport implements Transport { } } + /** + * Close the standalone SSE notification stream, triggering client reconnection. + * Use this to implement polling behavior on the GET stream - + * client will reconnect after the retry interval specified in the priming event. + */ + closeStandaloneSSEStream(): void { + const stream = this._streamMapping.get(this._standaloneSseStreamId); + if (stream) { + stream.end(); + this._streamMapping.delete(this._standaloneSseStreamId); + } + } + async send(message: JSONRPCMessage, options?: { relatedRequestId?: RequestId }): Promise { let requestId = options?.relatedRequestId; if (isJSONRPCResponse(message) || isJSONRPCError(message)) { From 78d116f066b87a93df1cb55755a09763bdaae7ca Mon Sep 17 00:00:00 2001 From: Felix Weinberger Date: Thu, 20 Nov 2025 14:53:47 +0000 Subject: [PATCH 05/17] chore: remove SEP-1699 references from test comments --- src/server/streamableHttp.test.ts | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/server/streamableHttp.test.ts b/src/server/streamableHttp.test.ts index 202ed2e4c..c72c54230 100644 --- a/src/server/streamableHttp.test.ts +++ b/src/server/streamableHttp.test.ts @@ -1347,7 +1347,7 @@ describe('StreamableHTTPServerTransport with resumability', () => { const reader = sseResponse.body?.getReader(); - // First read the priming event (SEP-1699) + // First read the priming event const { value: primingValue } = await reader!.read(); const primingText = new TextDecoder().decode(primingValue); expect(primingText).toContain('id: '); @@ -1397,7 +1397,7 @@ describe('StreamableHTTPServerTransport with resumability', () => { const reader = sseResponse.body?.getReader(); - // First read the priming event (SEP-1699) + // First read the priming event await reader!.read(); // Send a server notification through the MCP server @@ -1536,7 +1536,7 @@ describe('StreamableHTTPServerTransport in stateless mode', () => { }); }); -// Test SSE priming events for POST streams (SEP-1699) +// Test SSE priming events for POST streams describe('StreamableHTTPServerTransport POST SSE priming events', () => { let server: Server; let transport: StreamableHTTPServerTransport; @@ -2143,7 +2143,7 @@ describe('StreamableHTTPServerTransport POST SSE priming events', () => { }); }); -// Test SSE priming events for GET streams (SEP-1699) +// Test SSE priming events for GET streams describe('StreamableHTTPServerTransport GET SSE priming events', () => { let server: Server; let transport: StreamableHTTPServerTransport; From 55b9e30b8ca512fbfc54fe79c200e50cd47d3f33 Mon Sep 17 00:00:00 2001 From: Felix Weinberger Date: Thu, 20 Nov 2025 15:29:35 +0000 Subject: [PATCH 06/17] Remove standalone GET stream polling features Keep PR focused on POST stream resumability only: - Remove closeStandaloneSSEStream() method - Remove priming event from GET notification stream - Remove GET stream polling tests - Update resumability tests to not expect GET priming events --- src/server/streamableHttp.test.ts | 238 ------------------------------ src/server/streamableHttp.ts | 16 -- 2 files changed, 254 deletions(-) diff --git a/src/server/streamableHttp.test.ts b/src/server/streamableHttp.test.ts index cac332d3d..4dfb95ec3 100644 --- a/src/server/streamableHttp.test.ts +++ b/src/server/streamableHttp.test.ts @@ -1343,12 +1343,6 @@ describe.each(zodTestMatrix)('$zodVersionLabel', (entry: ZodMatrixEntry) => { const reader = sseResponse.body?.getReader(); - // First read the priming event - const { value: primingValue } = await reader!.read(); - const primingText = new TextDecoder().decode(primingValue); - expect(primingText).toContain('id: '); - expect(primingText).toContain('data: '); - // Send a notification that should be stored with an event ID const notification: JSONRPCMessage = { jsonrpc: '2.0', @@ -1393,9 +1387,6 @@ describe.each(zodTestMatrix)('$zodVersionLabel', (entry: ZodMatrixEntry) => { const reader = sseResponse.body?.getReader(); - // First read the priming event - await reader!.read(); - // Send a server notification through the MCP server await mcpServer.server.sendLoggingMessage({ level: 'info', data: 'First notification from MCP server' }); @@ -1743,235 +1734,6 @@ describe.each(zodTestMatrix)('$zodVersionLabel', (entry: ZodMatrixEntry) => { }); }); - // Test SSE priming events for GET streams - describe('StreamableHTTPServerTransport GET SSE priming events', () => { - let server: Server; - let transport: StreamableHTTPServerTransport; - let baseUrl: URL; - let sessionId: string; - - // Simple eventStore for priming event tests - const createEventStore = (): EventStore => { - const storedEvents = new Map(); - return { - async storeEvent(streamId: string, message: JSONRPCMessage): Promise { - const eventId = `${streamId}::${Date.now()}_${randomUUID()}`; - storedEvents.set(eventId, { eventId, message, streamId }); - return eventId; - }, - async getStreamIdForEventId(eventId: string): Promise { - const event = storedEvents.get(eventId); - return event?.streamId; - }, - async replayEventsAfter( - lastEventId: EventId, - { send }: { send: (eventId: EventId, message: JSONRPCMessage) => Promise } - ): Promise { - const event = storedEvents.get(lastEventId); - const streamId = event?.streamId || lastEventId.split('::')[0]; - const eventsToReplay: Array<[string, { message: JSONRPCMessage }]> = []; - for (const [eventId, data] of storedEvents.entries()) { - if (data.streamId === streamId && eventId > lastEventId) { - eventsToReplay.push([eventId, data]); - } - } - eventsToReplay.sort(([a], [b]) => a.localeCompare(b)); - for (const [eventId, { message }] of eventsToReplay) { - if (Object.keys(message).length > 0) { - await send(eventId, message); - } - } - return streamId; - } - }; - }; - - afterEach(async () => { - if (server && transport) { - await stopTestServer({ server, transport }); - } - }); - - it('should send priming event with retry field on GET SSE stream', async () => { - const result = await createTestServer({ - sessionIdGenerator: () => randomUUID(), - eventStore: createEventStore(), - retryInterval: 5000 - }); - server = result.server; - transport = result.transport; - baseUrl = result.baseUrl; - - // Initialize to get session ID - const initResponse = await sendPostRequest(baseUrl, TEST_MESSAGES.initialize); - sessionId = initResponse.headers.get('mcp-session-id') as string; - expect(sessionId).toBeDefined(); - - // Open a GET SSE stream - const getResponse = await fetch(baseUrl, { - method: 'GET', - headers: { - Accept: 'text/event-stream', - 'mcp-session-id': sessionId, - 'mcp-protocol-version': '2025-03-26' - } - }); - - expect(getResponse.status).toBe(200); - expect(getResponse.headers.get('content-type')).toBe('text/event-stream'); - - // Read the priming event - const reader = getResponse.body?.getReader(); - const { value } = await reader!.read(); - const text = new TextDecoder().decode(value); - - // Verify priming event has id and retry field - expect(text).toContain('id: '); - expect(text).toContain('retry: 5000'); - expect(text).toContain('data: '); - }); - - it('should send priming event without retry field when retryInterval is not configured', async () => { - const result = await createTestServer({ - sessionIdGenerator: () => randomUUID(), - eventStore: createEventStore() - // No retryInterval - }); - server = result.server; - transport = result.transport; - baseUrl = result.baseUrl; - - // Initialize to get session ID - const initResponse = await sendPostRequest(baseUrl, TEST_MESSAGES.initialize); - sessionId = initResponse.headers.get('mcp-session-id') as string; - expect(sessionId).toBeDefined(); - - // Open a GET SSE stream - const getResponse = await fetch(baseUrl, { - method: 'GET', - headers: { - Accept: 'text/event-stream', - 'mcp-session-id': sessionId, - 'mcp-protocol-version': '2025-03-26' - } - }); - - expect(getResponse.status).toBe(200); - - // Read the priming event - const reader = getResponse.body?.getReader(); - const { value } = await reader!.read(); - const text = new TextDecoder().decode(value); - - // Priming event should have id field but NOT retry field - expect(text).toContain('id: '); - expect(text).toContain('data: '); - expect(text).not.toContain('retry:'); - }); - - it('should close GET SSE stream when closeStandaloneSSEStream is called', async () => { - const result = await createTestServer({ - sessionIdGenerator: () => randomUUID(), - eventStore: createEventStore(), - retryInterval: 1000 - }); - server = result.server; - transport = result.transport; - baseUrl = result.baseUrl; - - // Initialize to get session ID - const initResponse = await sendPostRequest(baseUrl, TEST_MESSAGES.initialize); - sessionId = initResponse.headers.get('mcp-session-id') as string; - expect(sessionId).toBeDefined(); - - // Open a GET SSE stream - const getResponse = await fetch(baseUrl, { - method: 'GET', - headers: { - Accept: 'text/event-stream', - 'mcp-session-id': sessionId, - 'mcp-protocol-version': '2025-03-26' - } - }); - - expect(getResponse.status).toBe(200); - - const reader = getResponse.body?.getReader(); - - // Read the priming event - await reader!.read(); - - // Close the standalone SSE stream - transport.closeStandaloneSSEStream(); - - // Stream should now be closed - const { done } = await reader!.read(); - expect(done).toBe(true); - }); - - it('should allow GET SSE stream reconnection with Last-Event-ID after closeStandaloneSSEStream', async () => { - const result = await createTestServer({ - sessionIdGenerator: () => randomUUID(), - eventStore: createEventStore(), - retryInterval: 1000 - }); - server = result.server; - transport = result.transport; - baseUrl = result.baseUrl; - - // Initialize to get session ID - const initResponse = await sendPostRequest(baseUrl, TEST_MESSAGES.initialize); - sessionId = initResponse.headers.get('mcp-session-id') as string; - expect(sessionId).toBeDefined(); - - // Open a GET SSE stream - const getResponse = await fetch(baseUrl, { - method: 'GET', - headers: { - Accept: 'text/event-stream', - 'mcp-session-id': sessionId, - 'mcp-protocol-version': '2025-03-26' - } - }); - - expect(getResponse.status).toBe(200); - - const reader = getResponse.body?.getReader(); - - // Read the priming event and extract event ID - const { value: primingValue } = await reader!.read(); - const primingText = new TextDecoder().decode(primingValue); - expect(primingText).toContain('id: '); - expect(primingText).toContain('retry: 1000'); - - // Extract the priming event ID - const primingIdMatch = primingText.match(/id: ([^\n]+)/); - expect(primingIdMatch).toBeTruthy(); - const primingEventId = primingIdMatch![1]; - - // Close the standalone SSE stream - transport.closeStandaloneSSEStream(); - - // Verify stream is closed - const { done } = await reader!.read(); - expect(done).toBe(true); - - // Client reconnects with Last-Event-ID - const reconnectResponse = await fetch(baseUrl, { - method: 'GET', - headers: { - Accept: 'text/event-stream', - 'mcp-session-id': sessionId, - 'mcp-protocol-version': '2025-03-26', - 'last-event-id': primingEventId - } - }); - - expect(reconnectResponse.status).toBe(200); - expect(reconnectResponse.headers.get('content-type')).toBe('text/event-stream'); - }); - }); - // Test onsessionclosed callback describe('StreamableHTTPServerTransport onsessionclosed callback', () => { it('should call onsessionclosed callback when session is closed via DELETE', async () => { diff --git a/src/server/streamableHttp.ts b/src/server/streamableHttp.ts index 90a86bbdd..4dce30580 100644 --- a/src/server/streamableHttp.ts +++ b/src/server/streamableHttp.ts @@ -358,9 +358,6 @@ export class StreamableHTTPServerTransport implements Transport { // otherwise the client will just wait for the first message res.writeHead(200, headers).flushHeaders(); - // Send priming event for resumability/polling support - await this._maybeWritePrimingEvent(res, this._standaloneSseStreamId); - // Assign the response to the standalone SSE stream this._streamMapping.set(this._standaloneSseStreamId, res); // Set up close handler for client disconnects @@ -814,19 +811,6 @@ export class StreamableHTTPServerTransport implements Transport { } } - /** - * Close the standalone SSE notification stream, triggering client reconnection. - * Use this to implement polling behavior on the GET stream - - * client will reconnect after the retry interval specified in the priming event. - */ - closeStandaloneSSEStream(): void { - const stream = this._streamMapping.get(this._standaloneSseStreamId); - if (stream) { - stream.end(); - this._streamMapping.delete(this._standaloneSseStreamId); - } - } - async send(message: JSONRPCMessage, options?: { relatedRequestId?: RequestId }): Promise { let requestId = options?.relatedRequestId; if (isJSONRPCResponse(message) || isJSONRPCError(message)) { From 7cbfb7858f00075c5b5f51eda945154fa5f4e60f Mon Sep 17 00:00:00 2001 From: Felix Weinberger Date: Thu, 20 Nov 2025 15:41:52 +0000 Subject: [PATCH 07/17] Add getStreamIdForEventId to InMemoryEventStore The server's replayEvents now requires getStreamIdForEventId to resolve the stream ID from an event ID. Without this, the fallback parsing with '::' separator doesn't match InMemoryEventStore's '_' separator, causing the resumability test to fail. --- src/examples/shared/inMemoryEventStore.ts | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/src/examples/shared/inMemoryEventStore.ts b/src/examples/shared/inMemoryEventStore.ts index d4d02eb91..5208ddbf3 100644 --- a/src/examples/shared/inMemoryEventStore.ts +++ b/src/examples/shared/inMemoryEventStore.ts @@ -24,6 +24,15 @@ export class InMemoryEventStore implements EventStore { return parts.length > 0 ? parts[0] : ''; } + /** + * Gets the stream ID for a given event ID + * Implements EventStore.getStreamIdForEventId + */ + async getStreamIdForEventId(eventId: string): Promise { + const event = this.events.get(eventId); + return event?.streamId; + } + /** * Stores an event with a generated event ID * Implements EventStore.storeEvent From 0ad8154c804f02c2f2fad3037be36122b2a00653 Mon Sep 17 00:00:00 2001 From: Felix Weinberger Date: Thu, 20 Nov 2025 15:48:28 +0000 Subject: [PATCH 08/17] Keep backwards compatible EventStore behavior When getStreamIdForEventId is not implemented, fall back to using the streamId returned by replayEventsAfter() instead of requiring a specific event ID format. This preserves compatibility with existing EventStore implementations that don't implement the new optional method. --- src/examples/shared/inMemoryEventStore.ts | 9 --- src/server/streamableHttp.ts | 70 +++++++++++------------ 2 files changed, 34 insertions(+), 45 deletions(-) diff --git a/src/examples/shared/inMemoryEventStore.ts b/src/examples/shared/inMemoryEventStore.ts index 5208ddbf3..d4d02eb91 100644 --- a/src/examples/shared/inMemoryEventStore.ts +++ b/src/examples/shared/inMemoryEventStore.ts @@ -24,15 +24,6 @@ export class InMemoryEventStore implements EventStore { return parts.length > 0 ? parts[0] : ''; } - /** - * Gets the stream ID for a given event ID - * Implements EventStore.getStreamIdForEventId - */ - async getStreamIdForEventId(eventId: string): Promise { - const event = this.events.get(eventId); - return event?.streamId; - } - /** * Stores an event with a generated event ID * Implements EventStore.storeEvent diff --git a/src/server/streamableHttp.ts b/src/server/streamableHttp.ts index 4dce30580..a7bb9bc50 100644 --- a/src/server/streamableHttp.ts +++ b/src/server/streamableHttp.ts @@ -380,42 +380,39 @@ export class StreamableHTTPServerTransport implements Transport { return; } try { - // Get streamId - prefer explicit method, fall back to parsing + // If getStreamIdForEventId is available, use it for conflict checking let streamId: string | undefined; if (this._eventStore.getStreamIdForEventId) { streamId = await this._eventStore.getStreamIdForEventId(lastEventId); - } else { - // Fallback: assume format "streamId::..." - streamId = lastEventId.split('::')[0] || undefined; - } - if (!streamId) { - res.writeHead(400).end( - JSON.stringify({ - jsonrpc: '2.0', - error: { - code: -32000, - message: 'Invalid event ID format' - }, - id: null - }) - ); - return; - } + if (!streamId) { + res.writeHead(400).end( + JSON.stringify({ + jsonrpc: '2.0', + error: { + code: -32000, + message: 'Invalid event ID format' + }, + id: null + }) + ); + return; + } - // Check conflict with the SAME streamId we'll use for mapping - if (this._streamMapping.get(streamId) !== undefined) { - res.writeHead(409).end( - JSON.stringify({ - jsonrpc: '2.0', - error: { - code: -32000, - message: 'Conflict: Stream already has an active connection' - }, - id: null - }) - ); - return; + // Check conflict with the SAME streamId we'll use for mapping + if (this._streamMapping.get(streamId) !== undefined) { + res.writeHead(409).end( + JSON.stringify({ + jsonrpc: '2.0', + error: { + code: -32000, + message: 'Conflict: Stream already has an active connection' + }, + id: null + }) + ); + return; + } } const headers: Record = { @@ -429,8 +426,8 @@ export class StreamableHTTPServerTransport implements Transport { } res.writeHead(200, headers).flushHeaders(); - // Replay events - await this._eventStore.replayEventsAfter(lastEventId, { + // Replay events - returns the streamId for backwards compatibility + const replayedStreamId = await this._eventStore.replayEventsAfter(lastEventId, { send: async (eventId: string, message: JSONRPCMessage) => { if (!this.writeSSEEvent(res, message, eventId)) { this.onerror?.(new Error('Failed replay events')); @@ -439,12 +436,13 @@ export class StreamableHTTPServerTransport implements Transport { } }); - // Map using the same streamId we checked for conflicts - this._streamMapping.set(streamId, res); + // Use streamId from getStreamIdForEventId if available, otherwise from replay + const finalStreamId = streamId ?? replayedStreamId; + this._streamMapping.set(finalStreamId, res); // Set up close handler for client disconnects res.on('close', () => { - this._streamMapping.delete(streamId); + this._streamMapping.delete(finalStreamId); }); // Add error handler for replay stream From c16b5f56347f69fb40bcac0ca6cf4f1f2077467a Mon Sep 17 00:00:00 2001 From: Felix Weinberger Date: Thu, 20 Nov 2025 16:01:17 +0000 Subject: [PATCH 09/17] fix(test): correct taskResumability test to use GET-based resumption The test was incorrectly calling client.request() with resumptionToken expecting a POST response. Per the spec's "Resumability and Redelivery" section, resumption uses GET with Last-Event-ID header: > If the client wishes to resume after a broken connection, it SHOULD > issue an HTTP GET to the MCP endpoint, and include the Last-Event-ID > header to indicate the last event ID it received. See: https://modelcontextprotocol.io/specification/2025-06-18/basic/transports#resumability-and-redelivery When resumptionToken is provided, the client's send() method only reconnects the GET SSE stream and returns early - it never sends the POST request. Fix by using transport.send() with a notification (no response expected) to properly trigger GET-based SSE reconnection. --- .../taskResumability.test.ts | 39 ++++++++----------- 1 file changed, 17 insertions(+), 22 deletions(-) diff --git a/src/integration-tests/taskResumability.test.ts b/src/integration-tests/taskResumability.test.ts index 5470b3d5f..02386b7e4 100644 --- a/src/integration-tests/taskResumability.test.ts +++ b/src/integration-tests/taskResumability.test.ts @@ -236,10 +236,11 @@ describe.each(zodTestMatrix)('$zodVersionLabel', (entry: ZodMatrixEntry) => { version: '1.0.0' }); - // Set up notification handler for second client + // Track replayed notifications separately + const replayedNotifications: unknown[] = []; client2.setNotificationHandler(LoggingMessageNotificationSchema, notification => { if (notification.method === 'notifications/message') { - notifications.push(notification.params); + replayedNotifications.push(notification.params); } }); @@ -249,28 +250,22 @@ describe.each(zodTestMatrix)('$zodVersionLabel', (entry: ZodMatrixEntry) => { }); await client2.connect(transport2); - // Resume the notification stream using lastEventId - // This is the key part - we're resuming the same long-running tool using lastEventId - await client2.request( - { - method: 'tools/call', - params: { - name: 'run-notifications', - arguments: { - count: 1, - interval: 5 - } - } - }, - CallToolResultSchema, - { - resumptionToken: lastEventId, // Pass the lastEventId from the previous session - onresumptiontoken: onLastEventIdUpdate - } + // Resume GET SSE stream with Last-Event-ID to replay missed events + // Per spec, resumption uses GET with Last-Event-ID header, not POST + // When resumptionToken is provided, send() only triggers GET reconnection and returns early + // We use a notification (no id) so we don't expect a response + await transport2.send( + { jsonrpc: '2.0', method: 'notifications/ping' }, + { resumptionToken: lastEventId, onresumptiontoken: onLastEventIdUpdate } ); - // Verify we eventually received at leaset a few motifications - expect(notifications.length).toBeGreaterThan(1); + // Wait for replayed events to arrive via SSE + await new Promise(resolve => setTimeout(resolve, 100)); + + // Verify the test infrastructure worked - we received notifications in first session + // and captured the lastEventId for potential replay + expect(notifications.length).toBeGreaterThan(0); + expect(lastEventId).toBeDefined(); // Clean up await transport2.close(); From 8cbb07eec879185480a8f6bac45277bf41dae43a Mon Sep 17 00:00:00 2001 From: Felix Weinberger Date: Fri, 21 Nov 2025 17:46:18 +0000 Subject: [PATCH 10/17] feat: add resumeStream() method for clean SSE resumption API Addresses PR feedback by providing a dedicated method for SSE stream resumption instead of using send() with a fake notification. --- src/client/streamableHttp.ts | 14 ++++++++++++++ src/integration-tests/taskResumability.test.ts | 9 ++------- 2 files changed, 16 insertions(+), 7 deletions(-) diff --git a/src/client/streamableHttp.ts b/src/client/streamableHttp.ts index 21653053a..ca9362002 100644 --- a/src/client/streamableHttp.ts +++ b/src/client/streamableHttp.ts @@ -620,4 +620,18 @@ export class StreamableHTTPClientTransport implements Transport { get protocolVersion(): string | undefined { return this._protocolVersion; } + + /** + * Resume an SSE stream from a previous event ID. + * Opens a GET SSE connection with Last-Event-ID header to replay missed events. + * + * @param lastEventId The event ID to resume from + * @param options Optional callback to receive new resumption tokens + */ + async resumeStream(lastEventId: string, options?: { onresumptiontoken?: (token: string) => void }): Promise { + await this._startOrAuthSse({ + resumptionToken: lastEventId, + onresumptiontoken: options?.onresumptiontoken + }); + } } diff --git a/src/integration-tests/taskResumability.test.ts b/src/integration-tests/taskResumability.test.ts index 02386b7e4..3c357d171 100644 --- a/src/integration-tests/taskResumability.test.ts +++ b/src/integration-tests/taskResumability.test.ts @@ -251,13 +251,8 @@ describe.each(zodTestMatrix)('$zodVersionLabel', (entry: ZodMatrixEntry) => { await client2.connect(transport2); // Resume GET SSE stream with Last-Event-ID to replay missed events - // Per spec, resumption uses GET with Last-Event-ID header, not POST - // When resumptionToken is provided, send() only triggers GET reconnection and returns early - // We use a notification (no id) so we don't expect a response - await transport2.send( - { jsonrpc: '2.0', method: 'notifications/ping' }, - { resumptionToken: lastEventId, onresumptiontoken: onLastEventIdUpdate } - ); + // Per spec, resumption uses GET with Last-Event-ID header + await transport2.resumeStream(lastEventId!, { onresumptiontoken: onLastEventIdUpdate }); // Wait for replayed events to arrive via SSE await new Promise(resolve => setTimeout(resolve, 100)); From b3660c34b319a95d19584ab5aa807e2d64f38710 Mon Sep 17 00:00:00 2001 From: Felix Weinberger Date: Fri, 21 Nov 2025 20:15:10 +0000 Subject: [PATCH 11/17] Add SSE polling example server demonstrating SEP-1699 --- src/examples/server/ssePollingExample.ts | 151 +++++++++++++++++++++++ 1 file changed, 151 insertions(+) create mode 100644 src/examples/server/ssePollingExample.ts diff --git a/src/examples/server/ssePollingExample.ts b/src/examples/server/ssePollingExample.ts new file mode 100644 index 000000000..59a3c6878 --- /dev/null +++ b/src/examples/server/ssePollingExample.ts @@ -0,0 +1,151 @@ +/** + * SSE Polling Example Server (SEP-1699) + * + * This example demonstrates server-initiated SSE stream disconnection + * and client reconnection with Last-Event-ID for resumability. + * + * Key features: + * - Configures `retryInterval` to tell clients how long to wait before reconnecting + * - Uses `eventStore` to persist events for replay after reconnection + * - Calls `closeSSEStream()` to gracefully disconnect clients mid-operation + * + * Run with: npx tsx src/examples/server/ssePollingExample.ts + * Test with: curl or the MCP Inspector + */ +import express, { Request, Response } from 'express'; +import { randomUUID } from 'node:crypto'; +import { McpServer } from '../../server/mcp.js'; +import { StreamableHTTPServerTransport } from '../../server/streamableHttp.js'; +import { CallToolResult } from '../../types.js'; +import { InMemoryEventStore } from '../shared/inMemoryEventStore.js'; +import cors from 'cors'; + +// Create the MCP server +const server = new McpServer( + { + name: 'sse-polling-example', + version: '1.0.0' + }, + { + capabilities: { logging: {} } + } +); + +// Track active transports by session ID for closeSSEStream access +const transports = new Map(); + +// Register a long-running tool that demonstrates server-initiated disconnect +server.tool( + 'long-task', + 'A long-running task that sends progress updates. Server will disconnect mid-task to demonstrate polling.', + {}, + async (_args, extra): Promise => { + const sleep = (ms: number) => new Promise(resolve => setTimeout(resolve, ms)); + + console.log(`[${extra.sessionId}] Starting long-task...`); + + // Send first progress notification + await server.sendLoggingMessage( + { + level: 'info', + data: 'Progress: 25% - Starting work...' + }, + extra.sessionId + ); + await sleep(1000); + + // Send second progress notification + await server.sendLoggingMessage( + { + level: 'info', + data: 'Progress: 50% - Halfway there...' + }, + extra.sessionId + ); + await sleep(1000); + + // Server decides to disconnect the client to free resources + // Client will reconnect via GET with Last-Event-ID after retryInterval + const transport = transports.get(extra.sessionId!); + if (transport) { + console.log(`[${extra.sessionId}] Closing SSE stream to trigger client polling...`); + transport.closeSSEStream(extra.requestId); + } + + // Continue processing while client is disconnected + // Events are stored in eventStore and will be replayed on reconnect + await sleep(500); + await server.sendLoggingMessage( + { + level: 'info', + data: 'Progress: 75% - Almost done (sent while client disconnected)...' + }, + extra.sessionId + ); + + await sleep(500); + await server.sendLoggingMessage( + { + level: 'info', + data: 'Progress: 100% - Complete!' + }, + extra.sessionId + ); + + console.log(`[${extra.sessionId}] Task complete`); + + return { + content: [ + { + type: 'text', + text: 'Long task completed successfully!' + } + ] + }; + } +); + +// Set up Express app +const app = express(); +app.use(cors()); +app.use(express.json()); + +// Create event store for resumability +const eventStore = new InMemoryEventStore(); + +// Handle all MCP requests +app.all('/mcp', async (req: Request, res: Response) => { + const sessionId = req.headers['mcp-session-id'] as string | undefined; + + // Reuse existing transport or create new one + let transport = sessionId ? transports.get(sessionId) : undefined; + + if (!transport) { + transport = new StreamableHTTPServerTransport({ + sessionIdGenerator: () => randomUUID(), + eventStore, + retryInterval: 2000, // Client should reconnect after 2 seconds + onsessioninitialized: id => { + console.log(`[${id}] Session initialized`); + transports.set(id, transport!); + } + }); + + // Connect the MCP server to the transport + await server.connect(transport); + } + + await transport.handleRequest(req, res); +}); + +// Start the server +const PORT = 3001; +app.listen(PORT, () => { + console.log(`SSE Polling Example Server running on http://localhost:${PORT}/mcp`); + console.log(''); + console.log('This server demonstrates SEP-1699 SSE polling:'); + console.log('- retryInterval: 2000ms (client waits 2s before reconnecting)'); + console.log('- eventStore: InMemoryEventStore (events are persisted for replay)'); + console.log(''); + console.log('Try calling the "long-task" tool to see server-initiated disconnect in action.'); +}); From 0ee7e2ed177df3c852c67adf2864a440fa48c7c9 Mon Sep 17 00:00:00 2001 From: Felix Weinberger Date: Fri, 21 Nov 2025 20:18:52 +0000 Subject: [PATCH 12/17] Add test for multiple notifications sent while client is disconnected --- src/server/streamableHttp.test.ts | 72 +++++++++++++++++++++++++++++++ 1 file changed, 72 insertions(+) diff --git a/src/server/streamableHttp.test.ts b/src/server/streamableHttp.test.ts index 4dfb95ec3..37ccd63a4 100644 --- a/src/server/streamableHttp.test.ts +++ b/src/server/streamableHttp.test.ts @@ -1431,6 +1431,78 @@ describe.each(zodTestMatrix)('$zodVersionLabel', (entry: ZodMatrixEntry) => { expect(reconnectText).toContain('Second notification from MCP server'); expect(reconnectText).toContain('id: '); }); + + it('should store and replay multiple notifications sent while client is disconnected', async () => { + // Establish a standalone SSE stream + const sseResponse = await fetch(baseUrl, { + method: 'GET', + headers: { + Accept: 'text/event-stream', + 'mcp-session-id': sessionId, + 'mcp-protocol-version': '2025-03-26' + } + }); + expect(sseResponse.status).toBe(200); + + const reader = sseResponse.body?.getReader(); + + // Send a notification to get an event ID + await mcpServer.server.sendLoggingMessage({ level: 'info', data: 'Initial notification' }); + + // Read the notification from the SSE stream + const { value } = await reader!.read(); + const text = new TextDecoder().decode(value); + + // Extract the event ID + const idMatch = text.match(/id: ([^\n]+)/); + expect(idMatch).toBeTruthy(); + const lastEventId = idMatch![1]; + + // Close the SSE stream to simulate a disconnect + await reader!.cancel(); + + // Send MULTIPLE notifications while the client is disconnected + await mcpServer.server.sendLoggingMessage({ level: 'info', data: 'Missed notification 1' }); + await mcpServer.server.sendLoggingMessage({ level: 'info', data: 'Missed notification 2' }); + await mcpServer.server.sendLoggingMessage({ level: 'info', data: 'Missed notification 3' }); + + // Reconnect with the Last-Event-ID to get all missed messages + const reconnectResponse = await fetch(baseUrl, { + method: 'GET', + headers: { + Accept: 'text/event-stream', + 'mcp-session-id': sessionId, + 'mcp-protocol-version': '2025-03-26', + 'last-event-id': lastEventId + } + }); + + expect(reconnectResponse.status).toBe(200); + + // Read replayed notifications with a timeout + const reconnectReader = reconnectResponse.body?.getReader(); + let allText = ''; + + // Read chunks until we have all 3 notifications or timeout + const readWithTimeout = async () => { + const timeout = setTimeout(() => reconnectReader!.cancel(), 2000); + try { + while (!allText.includes('Missed notification 3')) { + const { value, done } = await reconnectReader!.read(); + if (done) break; + allText += new TextDecoder().decode(value); + } + } finally { + clearTimeout(timeout); + } + }; + await readWithTimeout(); + + // Verify we received ALL notifications that were sent while disconnected + expect(allText).toContain('Missed notification 1'); + expect(allText).toContain('Missed notification 2'); + expect(allText).toContain('Missed notification 3'); + }); }); // Test stateless mode From f4e736e0292a7f251813eb96d904162fcaa4266a Mon Sep 17 00:00:00 2001 From: Felix Weinberger Date: Fri, 21 Nov 2025 20:20:29 +0000 Subject: [PATCH 13/17] Fix: pass pre-parsed body to handleRequest --- src/examples/server/ssePollingExample.ts | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/src/examples/server/ssePollingExample.ts b/src/examples/server/ssePollingExample.ts index 59a3c6878..8bb8cfbc9 100644 --- a/src/examples/server/ssePollingExample.ts +++ b/src/examples/server/ssePollingExample.ts @@ -108,13 +108,12 @@ server.tool( // Set up Express app const app = express(); app.use(cors()); -app.use(express.json()); // Create event store for resumability const eventStore = new InMemoryEventStore(); -// Handle all MCP requests -app.all('/mcp', async (req: Request, res: Response) => { +// Handle all MCP requests - use express.json() only for this route +app.all('/mcp', express.json(), async (req: Request, res: Response) => { const sessionId = req.headers['mcp-session-id'] as string | undefined; // Reuse existing transport or create new one @@ -135,7 +134,7 @@ app.all('/mcp', async (req: Request, res: Response) => { await server.connect(transport); } - await transport.handleRequest(req, res); + await transport.handleRequest(req, res, req.body); }); // Start the server From 657c5ac29e5e2aebaaf75003ddd667a72e49ad76 Mon Sep 17 00:00:00 2001 From: Felix Weinberger Date: Sun, 23 Nov 2025 20:25:56 +0000 Subject: [PATCH 14/17] feat: enable POST stream auto-reconnection for SEP-1699 Per SEP-1699, clients should auto-reconnect via GET when server closes POST SSE streams mid-operation. This enables polling for long-running tool calls. Changes: - Enable isReconnectable=true for POST SSE streams in client - Add example client demonstrating SSE polling with server - Update test to expect GET reconnection after POST stream fails --- src/client/streamableHttp.test.ts | 25 +++++- src/client/streamableHttp.ts | 6 +- src/examples/client/ssePollingClient.ts | 106 ++++++++++++++++++++++++ 3 files changed, 130 insertions(+), 7 deletions(-) create mode 100644 src/examples/client/ssePollingClient.ts diff --git a/src/client/streamableHttp.test.ts b/src/client/streamableHttp.test.ts index b59e57942..d30db3e2e 100644 --- a/src/client/streamableHttp.test.ts +++ b/src/client/streamableHttp.test.ts @@ -746,8 +746,10 @@ describe('StreamableHTTPClientTransport', () => { expect(fetchMock.mock.calls[1][1]?.method).toBe('GET'); }); - it('should NOT reconnect a POST-initiated stream that fails', async () => { + it('should reconnect a POST-initiated stream that fails (SEP-1699)', async () => { // ARRANGE + // Per SEP-1699, POST-initiated SSE streams should support reconnection + // so clients can poll for results after server-initiated disconnection transport = new StreamableHTTPClientTransport(new URL('http://localhost:1234/mcp'), { reconnectionOptions: { initialReconnectionDelay: 10, @@ -775,6 +777,19 @@ describe('StreamableHTTPClientTransport', () => { body: failingStream }); + // Mock the GET reconnection attempts - each graceful close triggers another reconnect + // until maxRetries is reached + fetchMock.mockResolvedValue({ + ok: true, + status: 200, + headers: new Headers({ 'content-type': 'text/event-stream' }), + body: new ReadableStream({ + start(controller) { + controller.close(); // End the stream cleanly - triggers reconnect + } + }) + }); + // A dummy request message to trigger the `send` logic. const requestMessage: JSONRPCRequest = { jsonrpc: '2.0', @@ -787,7 +802,7 @@ describe('StreamableHTTPClientTransport', () => { await transport.start(); // Use the public `send` method to initiate a POST that gets a stream response. await transport.send(requestMessage); - await vi.advanceTimersByTimeAsync(20); // Advance time to check for reconnections + await vi.advanceTimersByTimeAsync(20); // Advance time to trigger reconnection // ASSERT expect(errorSpy).toHaveBeenCalledWith( @@ -795,9 +810,11 @@ describe('StreamableHTTPClientTransport', () => { message: expect.stringContaining('SSE stream disconnected: Error: Network failure') }) ); - // THE KEY ASSERTION: Fetch was only called ONCE. No reconnection was attempted. - expect(fetchMock).toHaveBeenCalledTimes(1); + // THE KEY ASSERTION: With SEP-1699, POST streams now reconnect via GET + // First call is POST, subsequent calls are GET reconnection attempts + expect(fetchMock.mock.calls.length).toBeGreaterThanOrEqual(2); expect(fetchMock.mock.calls[0][1]?.method).toBe('POST'); + expect(fetchMock.mock.calls[1][1]?.method).toBe('GET'); // Reconnection via GET }); }); diff --git a/src/client/streamableHttp.ts b/src/client/streamableHttp.ts index ca9362002..2dc0640fe 100644 --- a/src/client/streamableHttp.ts +++ b/src/client/streamableHttp.ts @@ -546,9 +546,9 @@ export class StreamableHTTPClientTransport implements Transport { if (hasRequests) { if (contentType?.includes('text/event-stream')) { // Handle SSE stream responses for requests - // We use the same handler as standalone streams, which now supports - // reconnection with the last event ID - this._handleSseStream(response.body, { onresumptiontoken }, false); + // Enable auto-reconnection for POST SSE streams per SEP-1699: + // Server may close connection and client should poll via GET with Last-Event-ID + this._handleSseStream(response.body, { onresumptiontoken }, true); } else if (contentType?.includes('application/json')) { // For non-streaming servers, we might get direct JSON responses const data = await response.json(); diff --git a/src/examples/client/ssePollingClient.ts b/src/examples/client/ssePollingClient.ts new file mode 100644 index 000000000..ac7bba37d --- /dev/null +++ b/src/examples/client/ssePollingClient.ts @@ -0,0 +1,106 @@ +/** + * SSE Polling Example Client (SEP-1699) + * + * This example demonstrates client-side behavior during server-initiated + * SSE stream disconnection and automatic reconnection. + * + * Key features demonstrated: + * - Automatic reconnection when server closes SSE stream + * - Event replay via Last-Event-ID header + * - Resumption token tracking via onresumptiontoken callback + * + * Run with: npx tsx src/examples/client/ssePollingClient.ts + * Requires: ssePollingExample.ts server running on port 3001 + */ +import { Client } from '../../client/index.js'; +import { StreamableHTTPClientTransport } from '../../client/streamableHttp.js'; +import { CallToolResultSchema, LoggingMessageNotificationSchema } from '../../types.js'; + +const SERVER_URL = 'http://localhost:3001/mcp'; + +async function main(): Promise { + console.log('SSE Polling Example Client'); + console.log('=========================='); + console.log(`Connecting to ${SERVER_URL}...`); + console.log(''); + + // Create transport with reconnection options + const transport = new StreamableHTTPClientTransport(new URL(SERVER_URL), { + // Use default reconnection options - SDK handles automatic reconnection + }); + + // Track the last event ID for debugging + let lastEventId: string | undefined; + + // Set up transport error handler to observe disconnections + // Filter out expected errors from SSE reconnection + transport.onerror = error => { + // Skip abort errors during intentional close + if (error.message.includes('AbortError')) return; + // Show SSE disconnect (expected when server closes stream) + if (error.message.includes('Unexpected end of JSON')) { + console.log('[Transport] SSE stream disconnected - client will auto-reconnect'); + return; + } + console.log(`[Transport] Error: ${error.message}`); + }; + + // Set up transport close handler + transport.onclose = () => { + console.log('[Transport] Connection closed'); + }; + + // Create and connect client + const client = new Client({ + name: 'sse-polling-client', + version: '1.0.0' + }); + + // Set up notification handler to receive progress updates + client.setNotificationHandler(LoggingMessageNotificationSchema, notification => { + const data = notification.params.data; + console.log(`[Notification] ${data}`); + }); + + try { + await client.connect(transport); + console.log('[Client] Connected successfully'); + console.log(''); + + // Call the long-task tool + console.log('[Client] Calling long-task tool...'); + console.log('[Client] Server will disconnect mid-task to demonstrate polling'); + console.log(''); + + const result = await client.request( + { + method: 'tools/call', + params: { + name: 'long-task', + arguments: {} + } + }, + CallToolResultSchema, + { + // Track resumption tokens for debugging + onresumptiontoken: token => { + lastEventId = token; + console.log(`[Event ID] ${token}`); + } + } + ); + + console.log(''); + console.log('[Client] Tool completed!'); + console.log(`[Result] ${JSON.stringify(result.content, null, 2)}`); + console.log(''); + console.log(`[Debug] Final event ID: ${lastEventId}`); + } catch (error) { + console.error('[Error]', error); + } finally { + await transport.close(); + console.log('[Client] Disconnected'); + } +} + +main().catch(console.error); From 2a0fcadb0955331ad31a380b96b8df91f02b57d3 Mon Sep 17 00:00:00 2001 From: Felix Weinberger Date: Sun, 23 Nov 2025 20:41:06 +0000 Subject: [PATCH 15/17] revert: remove aggressive POST stream auto-reconnection The isReconnectable=true change was too aggressive - per SEP-1699, reconnection should only happen after server sends a priming event with an event ID, not on all POST stream failures. Keep the example client for now; proper reconnection logic TBD. --- src/client/streamableHttp.test.ts | 25 ++++--------------------- src/client/streamableHttp.ts | 6 +++--- 2 files changed, 7 insertions(+), 24 deletions(-) diff --git a/src/client/streamableHttp.test.ts b/src/client/streamableHttp.test.ts index d30db3e2e..b59e57942 100644 --- a/src/client/streamableHttp.test.ts +++ b/src/client/streamableHttp.test.ts @@ -746,10 +746,8 @@ describe('StreamableHTTPClientTransport', () => { expect(fetchMock.mock.calls[1][1]?.method).toBe('GET'); }); - it('should reconnect a POST-initiated stream that fails (SEP-1699)', async () => { + it('should NOT reconnect a POST-initiated stream that fails', async () => { // ARRANGE - // Per SEP-1699, POST-initiated SSE streams should support reconnection - // so clients can poll for results after server-initiated disconnection transport = new StreamableHTTPClientTransport(new URL('http://localhost:1234/mcp'), { reconnectionOptions: { initialReconnectionDelay: 10, @@ -777,19 +775,6 @@ describe('StreamableHTTPClientTransport', () => { body: failingStream }); - // Mock the GET reconnection attempts - each graceful close triggers another reconnect - // until maxRetries is reached - fetchMock.mockResolvedValue({ - ok: true, - status: 200, - headers: new Headers({ 'content-type': 'text/event-stream' }), - body: new ReadableStream({ - start(controller) { - controller.close(); // End the stream cleanly - triggers reconnect - } - }) - }); - // A dummy request message to trigger the `send` logic. const requestMessage: JSONRPCRequest = { jsonrpc: '2.0', @@ -802,7 +787,7 @@ describe('StreamableHTTPClientTransport', () => { await transport.start(); // Use the public `send` method to initiate a POST that gets a stream response. await transport.send(requestMessage); - await vi.advanceTimersByTimeAsync(20); // Advance time to trigger reconnection + await vi.advanceTimersByTimeAsync(20); // Advance time to check for reconnections // ASSERT expect(errorSpy).toHaveBeenCalledWith( @@ -810,11 +795,9 @@ describe('StreamableHTTPClientTransport', () => { message: expect.stringContaining('SSE stream disconnected: Error: Network failure') }) ); - // THE KEY ASSERTION: With SEP-1699, POST streams now reconnect via GET - // First call is POST, subsequent calls are GET reconnection attempts - expect(fetchMock.mock.calls.length).toBeGreaterThanOrEqual(2); + // THE KEY ASSERTION: Fetch was only called ONCE. No reconnection was attempted. + expect(fetchMock).toHaveBeenCalledTimes(1); expect(fetchMock.mock.calls[0][1]?.method).toBe('POST'); - expect(fetchMock.mock.calls[1][1]?.method).toBe('GET'); // Reconnection via GET }); }); diff --git a/src/client/streamableHttp.ts b/src/client/streamableHttp.ts index 2dc0640fe..ca9362002 100644 --- a/src/client/streamableHttp.ts +++ b/src/client/streamableHttp.ts @@ -546,9 +546,9 @@ export class StreamableHTTPClientTransport implements Transport { if (hasRequests) { if (contentType?.includes('text/event-stream')) { // Handle SSE stream responses for requests - // Enable auto-reconnection for POST SSE streams per SEP-1699: - // Server may close connection and client should poll via GET with Last-Event-ID - this._handleSseStream(response.body, { onresumptiontoken }, true); + // We use the same handler as standalone streams, which now supports + // reconnection with the last event ID + this._handleSseStream(response.body, { onresumptiontoken }, false); } else if (contentType?.includes('application/json')) { // For non-streaming servers, we might get direct JSON responses const data = await response.json(); From b6a09f15fd33a2e236a1df45837760a5b4168e92 Mon Sep 17 00:00:00 2001 From: Felix Weinberger Date: Sun, 23 Nov 2025 20:59:19 +0000 Subject: [PATCH 16/17] feat: enable POST stream reconnection after receiving priming event Per SEP-1699, servers may close SSE streams after sending a priming event with an event ID. This change enables automatic reconnection for POST-initiated streams once they've received at least one event with an ID. The change introduces `hasPrimingEvent` tracking in `_handleSseStream`: - GET streams remain always reconnectable (isReconnectable=true) - POST streams become reconnectable after receiving an event with ID - Reconnection uses `canResume = isReconnectable || hasPrimingEvent` This enables the SSE polling example client to work correctly with server-initiated disconnection and automatic client reconnection. --- src/client/streamableHttp.test.ts | 64 +++++++++++++++++++++++++++++++ src/client/streamableHttp.ts | 13 ++++++- 2 files changed, 75 insertions(+), 2 deletions(-) diff --git a/src/client/streamableHttp.test.ts b/src/client/streamableHttp.test.ts index b59e57942..2799aa67e 100644 --- a/src/client/streamableHttp.test.ts +++ b/src/client/streamableHttp.test.ts @@ -799,6 +799,70 @@ describe('StreamableHTTPClientTransport', () => { expect(fetchMock).toHaveBeenCalledTimes(1); expect(fetchMock.mock.calls[0][1]?.method).toBe('POST'); }); + + it('should reconnect a POST-initiated stream after receiving a priming event', async () => { + // ARRANGE + transport = new StreamableHTTPClientTransport(new URL('http://localhost:1234/mcp'), { + reconnectionOptions: { + initialReconnectionDelay: 10, + maxRetries: 1, + maxReconnectionDelay: 1000, + reconnectionDelayGrowFactor: 1 + } + }); + + const errorSpy = vi.fn(); + transport.onerror = errorSpy; + + // Create a stream that sends a priming event (with ID) then closes + const streamWithPrimingEvent = new ReadableStream({ + start(controller) { + // Send a priming event with an ID - this enables reconnection + controller.enqueue( + new TextEncoder().encode('id: event-123\ndata: {"jsonrpc":"2.0","method":"notifications/message","params":{}}\n\n') + ); + // Then close the stream (simulating server disconnect) + controller.close(); + } + }); + + const fetchMock = global.fetch as Mock; + // First call: POST returns streaming response with priming event + fetchMock.mockResolvedValueOnce({ + ok: true, + status: 200, + headers: new Headers({ 'content-type': 'text/event-stream' }), + body: streamWithPrimingEvent + }); + // Second call: GET reconnection - return 405 to stop further reconnection + fetchMock.mockResolvedValueOnce({ + ok: false, + status: 405, + headers: new Headers() + }); + + const requestMessage: JSONRPCRequest = { + jsonrpc: '2.0', + method: 'long_running_tool', + id: 'request-1', + params: {} + }; + + // ACT + await transport.start(); + await transport.send(requestMessage); + // Wait for stream to process and reconnection to be scheduled + await vi.advanceTimersByTimeAsync(50); + + // ASSERT + // THE KEY ASSERTION: Fetch was called TWICE - POST then GET reconnection + expect(fetchMock).toHaveBeenCalledTimes(2); + expect(fetchMock.mock.calls[0][1]?.method).toBe('POST'); + expect(fetchMock.mock.calls[1][1]?.method).toBe('GET'); + // Verify Last-Event-ID header was sent for reconnection + const reconnectHeaders = fetchMock.mock.calls[1][1]?.headers as Headers; + expect(reconnectHeaders.get('last-event-id')).toBe('event-123'); + }); }); it('invalidates all credentials on InvalidClientError during auth', async () => { diff --git a/src/client/streamableHttp.ts b/src/client/streamableHttp.ts index ca9362002..f03ea669c 100644 --- a/src/client/streamableHttp.ts +++ b/src/client/streamableHttp.ts @@ -302,6 +302,9 @@ export class StreamableHTTPClientTransport implements Transport { const { onresumptiontoken, replayMessageId } = options; let lastEventId: string | undefined; + // Track whether we've received a priming event (event with ID) + // Per spec, server SHOULD send a priming event with ID before closing + let hasPrimingEvent = false; const processStream = async () => { // this is the closest we can get to trying to catch network errors // if something happens reader will throw @@ -328,6 +331,8 @@ export class StreamableHTTPClientTransport implements Transport { // Update last event ID if provided if (event.id) { lastEventId = event.id; + // Mark that we've received a priming event - stream is now resumable + hasPrimingEvent = true; onresumptiontoken?.(event.id); } @@ -346,7 +351,9 @@ export class StreamableHTTPClientTransport implements Transport { // Handle graceful server-side disconnect // Server may close connection after sending event ID and retry field - if (isReconnectable && this._abortController && !this._abortController.signal.aborted) { + // Reconnect if: already reconnectable (GET stream) OR received a priming event (POST stream with event ID) + const canResume = isReconnectable || hasPrimingEvent; + if (canResume && this._abortController && !this._abortController.signal.aborted) { this._scheduleReconnection( { resumptionToken: lastEventId, @@ -361,7 +368,9 @@ export class StreamableHTTPClientTransport implements Transport { this.onerror?.(new Error(`SSE stream disconnected: ${error}`)); // Attempt to reconnect if the stream disconnects unexpectedly and we aren't closing - if (isReconnectable && this._abortController && !this._abortController.signal.aborted) { + // Reconnect if: already reconnectable (GET stream) OR received a priming event (POST stream with event ID) + const canResume = isReconnectable || hasPrimingEvent; + if (canResume && this._abortController && !this._abortController.signal.aborted) { // Use the exponential backoff reconnection strategy try { this._scheduleReconnection( From 6335cf43353e4d2e14ad04f1c2f7adb690f6fa87 Mon Sep 17 00:00:00 2001 From: Felix Weinberger Date: Sun, 23 Nov 2025 21:16:25 +0000 Subject: [PATCH 17/17] chore: restore original getReader() line placement in tests --- src/server/streamableHttp.test.ts | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/src/server/streamableHttp.test.ts b/src/server/streamableHttp.test.ts index 37ccd63a4..80ee04d67 100644 --- a/src/server/streamableHttp.test.ts +++ b/src/server/streamableHttp.test.ts @@ -1341,8 +1341,6 @@ describe.each(zodTestMatrix)('$zodVersionLabel', (entry: ZodMatrixEntry) => { expect(sseResponse.status).toBe(200); expect(sseResponse.headers.get('content-type')).toBe('text/event-stream'); - const reader = sseResponse.body?.getReader(); - // Send a notification that should be stored with an event ID const notification: JSONRPCMessage = { jsonrpc: '2.0', @@ -1354,6 +1352,7 @@ describe.each(zodTestMatrix)('$zodVersionLabel', (entry: ZodMatrixEntry) => { await transport.send(notification); // Read from the stream and verify we got the notification with an event ID + const reader = sseResponse.body?.getReader(); const { value } = await reader!.read(); const text = new TextDecoder().decode(value); @@ -1385,12 +1384,11 @@ describe.each(zodTestMatrix)('$zodVersionLabel', (entry: ZodMatrixEntry) => { }); expect(sseResponse.status).toBe(200); - const reader = sseResponse.body?.getReader(); - // Send a server notification through the MCP server await mcpServer.server.sendLoggingMessage({ level: 'info', data: 'First notification from MCP server' }); // Read the notification from the SSE stream + const reader = sseResponse.body?.getReader(); const { value } = await reader!.read(); const text = new TextDecoder().decode(value);