From 63f7f2e27b8fd340588a5e6619e0c8eabf51b00e Mon Sep 17 00:00:00 2001 From: Felix Weinberger Date: Thu, 4 Dec 2025 11:54:20 +0000 Subject: [PATCH] fix: skip priming events for clients with old protocol versions Priming events (SEP-1699) have empty SSE data which older clients cannot handle - they try to JSON.parse("") and crash. Only send priming events to clients with protocol version >= 2025-11-25, which includes the fix for handling empty SSE data. For the initialize request, the protocol version is extracted from the request params. For subsequent requests, it's taken from the mcp-protocol-version header. --- src/server/streamableHttp.test.ts | 132 +++++++++++++++++++++++++++--- src/server/streamableHttp.ts | 28 ++++++- 2 files changed, 146 insertions(+), 14 deletions(-) diff --git a/src/server/streamableHttp.test.ts b/src/server/streamableHttp.test.ts index 9cdefe090..be7c36cff 100644 --- a/src/server/streamableHttp.test.ts +++ b/src/server/streamableHttp.test.ts @@ -1671,7 +1671,7 @@ describe.each(zodTestMatrix)('$zodVersionLabel', (entry: ZodMatrixEntry) => { 'Content-Type': 'application/json', Accept: 'text/event-stream, application/json', 'mcp-session-id': sessionId, - 'mcp-protocol-version': '2025-03-26' + 'mcp-protocol-version': '2025-11-25' }, body: JSON.stringify(toolCallRequest) }); @@ -1690,6 +1690,57 @@ describe.each(zodTestMatrix)('$zodVersionLabel', (entry: ZodMatrixEntry) => { expect(text).toContain('data: '); }); + it('should NOT send priming event for old protocol versions (backwards compatibility)', async () => { + const result = await createTestServer({ + sessionIdGenerator: () => randomUUID(), + eventStore: createEventStore(), + retryInterval: 5000 + }); + server = result.server; + transport = result.transport; + baseUrl = result.baseUrl; + mcpServer = result.mcpServer; + + // Initialize to get session ID + const initResponse = await sendPostRequest(baseUrl, TEST_MESSAGES.initialize); + sessionId = initResponse.headers.get('mcp-session-id') as string; + expect(sessionId).toBeDefined(); + + // Send a tool call request with OLD protocol version + const toolCallRequest: JSONRPCMessage = { + jsonrpc: '2.0', + id: 100, + method: 'tools/call', + params: { name: 'greet', arguments: { name: 'Test' } } + }; + + const postResponse = await fetch(baseUrl, { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + Accept: 'text/event-stream, application/json', + 'mcp-session-id': sessionId, + 'mcp-protocol-version': '2025-06-18' + }, + body: JSON.stringify(toolCallRequest) + }); + + expect(postResponse.status).toBe(200); + expect(postResponse.headers.get('content-type')).toBe('text/event-stream'); + + // Read the first chunk - should be the actual response, not a priming event + const reader = postResponse.body?.getReader(); + const { value } = await reader!.read(); + const text = new TextDecoder().decode(value); + + // Should NOT contain a priming event (empty data line before the response) + // The first message should be the actual tool result + expect(text).toContain('event: message'); + expect(text).toContain('"result"'); + // Should NOT have a separate priming event line with empty data + expect(text).not.toMatch(/^id:.*\ndata:\s*\n\n/); + }); + it('should send priming event without retry field when retryInterval is not configured', async () => { const result = await createTestServer({ sessionIdGenerator: () => randomUUID(), @@ -1720,7 +1771,7 @@ describe.each(zodTestMatrix)('$zodVersionLabel', (entry: ZodMatrixEntry) => { 'Content-Type': 'application/json', Accept: 'text/event-stream, application/json', 'mcp-session-id': sessionId, - 'mcp-protocol-version': '2025-03-26' + 'mcp-protocol-version': '2025-11-25' }, body: JSON.stringify(toolCallRequest) }); @@ -1786,7 +1837,7 @@ describe.each(zodTestMatrix)('$zodVersionLabel', (entry: ZodMatrixEntry) => { 'Content-Type': 'application/json', Accept: 'text/event-stream, application/json', 'mcp-session-id': sessionId, - 'mcp-protocol-version': '2025-03-26' + 'mcp-protocol-version': '2025-11-25' }, body: JSON.stringify(toolCallRequest) }); @@ -1849,7 +1900,7 @@ describe.each(zodTestMatrix)('$zodVersionLabel', (entry: ZodMatrixEntry) => { 'Content-Type': 'application/json', Accept: 'text/event-stream, application/json', 'mcp-session-id': sessionId, - 'mcp-protocol-version': '2025-03-26' + 'mcp-protocol-version': '2025-11-25' }, body: JSON.stringify(toolCallRequest) }); @@ -1868,6 +1919,67 @@ describe.each(zodTestMatrix)('$zodVersionLabel', (entry: ZodMatrixEntry) => { expect(typeof receivedCloseSSEStream).toBe('function'); }); + it('should NOT provide closeSSEStream callback for old protocol versions (backwards compatibility)', async () => { + const result = await createTestServer({ + sessionIdGenerator: () => randomUUID(), + eventStore: createEventStore(), + retryInterval: 1000 + }); + server = result.server; + transport = result.transport; + baseUrl = result.baseUrl; + mcpServer = result.mcpServer; + + // Track whether closeSSEStream callback was provided + let receivedCloseSSEStream: (() => void) | undefined; + let receivedCloseStandaloneSSEStream: (() => void) | undefined; + + // Register a tool that captures the extra.closeSSEStream callback + mcpServer.tool('test-old-version-tool', 'Test tool', {}, async (_args, extra) => { + receivedCloseSSEStream = extra.closeSSEStream; + receivedCloseStandaloneSSEStream = extra.closeStandaloneSSEStream; + return { content: [{ type: 'text', text: 'Done' }] }; + }); + + // Initialize to get session ID + const initResponse = await sendPostRequest(baseUrl, TEST_MESSAGES.initialize); + sessionId = initResponse.headers.get('mcp-session-id') as string; + expect(sessionId).toBeDefined(); + + // Call the tool with OLD protocol version + const toolCallRequest: JSONRPCMessage = { + jsonrpc: '2.0', + id: 200, + method: 'tools/call', + params: { name: 'test-old-version-tool', arguments: {} } + }; + + const postResponse = await fetch(baseUrl, { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + Accept: 'text/event-stream, application/json', + 'mcp-session-id': sessionId, + 'mcp-protocol-version': '2025-06-18' + }, + body: JSON.stringify(toolCallRequest) + }); + + expect(postResponse.status).toBe(200); + + // Read all events to completion + const reader = postResponse.body?.getReader(); + while (true) { + const { done } = await reader!.read(); + if (done) break; + } + + // Verify closeSSEStream callbacks were NOT provided for old protocol version + // even though eventStore is configured + expect(receivedCloseSSEStream).toBeUndefined(); + expect(receivedCloseStandaloneSSEStream).toBeUndefined(); + }); + it('should NOT provide closeSSEStream callback when eventStore is NOT configured', async () => { const result = await createTestServer({ sessionIdGenerator: () => randomUUID() @@ -1963,7 +2075,7 @@ describe.each(zodTestMatrix)('$zodVersionLabel', (entry: ZodMatrixEntry) => { 'Content-Type': 'application/json', Accept: 'text/event-stream, application/json', 'mcp-session-id': sessionId, - 'mcp-protocol-version': '2025-03-26' + 'mcp-protocol-version': '2025-11-25' }, body: JSON.stringify(toolCallRequest) }); @@ -2010,7 +2122,7 @@ describe.each(zodTestMatrix)('$zodVersionLabel', (entry: ZodMatrixEntry) => { headers: { Accept: 'text/event-stream', 'mcp-session-id': sessionId, - 'mcp-protocol-version': '2025-03-26' + 'mcp-protocol-version': '2025-11-25' } }); expect(sseResponse.status).toBe(200); @@ -2040,7 +2152,7 @@ describe.each(zodTestMatrix)('$zodVersionLabel', (entry: ZodMatrixEntry) => { 'Content-Type': 'application/json', Accept: 'text/event-stream, application/json', 'mcp-session-id': sessionId, - 'mcp-protocol-version': '2025-03-26' + 'mcp-protocol-version': '2025-11-25' }, body: JSON.stringify(toolCallRequest) }); @@ -2091,7 +2203,7 @@ describe.each(zodTestMatrix)('$zodVersionLabel', (entry: ZodMatrixEntry) => { headers: { Accept: 'text/event-stream', 'mcp-session-id': sessionId, - 'mcp-protocol-version': '2025-03-26' + 'mcp-protocol-version': '2025-11-25' } }); expect(sseResponse.status).toBe(200); @@ -2122,7 +2234,7 @@ describe.each(zodTestMatrix)('$zodVersionLabel', (entry: ZodMatrixEntry) => { 'Content-Type': 'application/json', Accept: 'text/event-stream, application/json', 'mcp-session-id': sessionId, - 'mcp-protocol-version': '2025-03-26' + 'mcp-protocol-version': '2025-11-25' }, body: JSON.stringify(toolCallRequest) }); @@ -2152,7 +2264,7 @@ describe.each(zodTestMatrix)('$zodVersionLabel', (entry: ZodMatrixEntry) => { headers: { Accept: 'text/event-stream', 'mcp-session-id': sessionId, - 'mcp-protocol-version': '2025-03-26', + 'mcp-protocol-version': '2025-11-25', 'last-event-id': lastEventId } }); diff --git a/src/server/streamableHttp.ts b/src/server/streamableHttp.ts index 658592c19..35e7f64e7 100644 --- a/src/server/streamableHttp.ts +++ b/src/server/streamableHttp.ts @@ -276,13 +276,21 @@ export class StreamableHTTPServerTransport implements Transport { /** * Writes a priming event to establish resumption capability. - * Only sends if eventStore is configured (opt-in for resumability). + * Only sends if eventStore is configured (opt-in for resumability) and + * the client's protocol version supports empty SSE data (>= 2025-11-25). */ - private async _maybeWritePrimingEvent(res: ServerResponse, streamId: string): Promise { + private async _maybeWritePrimingEvent(res: ServerResponse, streamId: string, protocolVersion: string): Promise { if (!this._eventStore) { return; } + // Priming events have empty data which older clients cannot handle. + // Only send priming events to clients with protocol version >= 2025-11-25 + // which includes the fix for handling empty SSE data. + if (protocolVersion < '2025-11-25') { + return; + } + const primingEventId = await this._eventStore.storeEvent(streamId, {} as JSONRPCMessage); let primingEvent = `id: ${primingEventId}\ndata: \n\n`; @@ -619,6 +627,15 @@ export class StreamableHTTPServerTransport implements Transport { // The default behavior is to use SSE streaming // but in some cases server will return JSON responses const streamId = randomUUID(); + + // Extract protocol version for priming event decision. + // For initialize requests, get from request params. + // For other requests, get from header (already validated). + const initRequest = messages.find(m => isInitializeRequest(m)); + const clientProtocolVersion = initRequest + ? initRequest.params.protocolVersion + : ((req.headers['mcp-protocol-version'] as string) ?? DEFAULT_NEGOTIATED_PROTOCOL_VERSION); + if (!this._enableJsonResponse) { const headers: Record = { 'Content-Type': 'text/event-stream', @@ -633,7 +650,7 @@ export class StreamableHTTPServerTransport implements Transport { res.writeHead(200, headers); - await this._maybeWritePrimingEvent(res, streamId); + await this._maybeWritePrimingEvent(res, streamId, clientProtocolVersion); } // Store the response for this request to send messages back through this connection // We need to track by request ID to maintain the connection @@ -656,9 +673,12 @@ export class StreamableHTTPServerTransport implements Transport { // handle each message for (const message of messages) { // Build closeSSEStream callback for requests when eventStore is configured + // AND client supports resumability (protocol version >= 2025-11-25). + // Old clients can't resume if the stream is closed early because they + // didn't receive a priming event with an event ID. let closeSSEStream: (() => void) | undefined; let closeStandaloneSSEStream: (() => void) | undefined; - if (isJSONRPCRequest(message) && this._eventStore) { + if (isJSONRPCRequest(message) && this._eventStore && clientProtocolVersion >= '2025-11-25') { closeSSEStream = () => { this.closeSSEStream(message.id); };