Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
132 changes: 122 additions & 10 deletions src/server/streamableHttp.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1671,7 +1671,7 @@ describe.each(zodTestMatrix)('$zodVersionLabel', (entry: ZodMatrixEntry) => {
'Content-Type': 'application/json',
Accept: 'text/event-stream, application/json',
'mcp-session-id': sessionId,
'mcp-protocol-version': '2025-03-26'
'mcp-protocol-version': '2025-11-25'
},
body: JSON.stringify(toolCallRequest)
});
Expand All @@ -1690,6 +1690,57 @@ describe.each(zodTestMatrix)('$zodVersionLabel', (entry: ZodMatrixEntry) => {
expect(text).toContain('data: ');
});

it('should NOT send priming event for old protocol versions (backwards compatibility)', async () => {
const result = await createTestServer({
sessionIdGenerator: () => randomUUID(),
eventStore: createEventStore(),
retryInterval: 5000
});
server = result.server;
transport = result.transport;
baseUrl = result.baseUrl;
mcpServer = result.mcpServer;

// Initialize to get session ID
const initResponse = await sendPostRequest(baseUrl, TEST_MESSAGES.initialize);
sessionId = initResponse.headers.get('mcp-session-id') as string;
expect(sessionId).toBeDefined();

// Send a tool call request with OLD protocol version
const toolCallRequest: JSONRPCMessage = {
jsonrpc: '2.0',
id: 100,
method: 'tools/call',
params: { name: 'greet', arguments: { name: 'Test' } }
};

const postResponse = await fetch(baseUrl, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
Accept: 'text/event-stream, application/json',
'mcp-session-id': sessionId,
'mcp-protocol-version': '2025-06-18'
},
body: JSON.stringify(toolCallRequest)
});

expect(postResponse.status).toBe(200);
expect(postResponse.headers.get('content-type')).toBe('text/event-stream');

// Read the first chunk - should be the actual response, not a priming event
const reader = postResponse.body?.getReader();
const { value } = await reader!.read();
const text = new TextDecoder().decode(value);

// Should NOT contain a priming event (empty data line before the response)
// The first message should be the actual tool result
expect(text).toContain('event: message');
expect(text).toContain('"result"');
// Should NOT have a separate priming event line with empty data
expect(text).not.toMatch(/^id:.*\ndata:\s*\n\n/);
});

it('should send priming event without retry field when retryInterval is not configured', async () => {
const result = await createTestServer({
sessionIdGenerator: () => randomUUID(),
Expand Down Expand Up @@ -1720,7 +1771,7 @@ describe.each(zodTestMatrix)('$zodVersionLabel', (entry: ZodMatrixEntry) => {
'Content-Type': 'application/json',
Accept: 'text/event-stream, application/json',
'mcp-session-id': sessionId,
'mcp-protocol-version': '2025-03-26'
'mcp-protocol-version': '2025-11-25'
},
body: JSON.stringify(toolCallRequest)
});
Expand Down Expand Up @@ -1786,7 +1837,7 @@ describe.each(zodTestMatrix)('$zodVersionLabel', (entry: ZodMatrixEntry) => {
'Content-Type': 'application/json',
Accept: 'text/event-stream, application/json',
'mcp-session-id': sessionId,
'mcp-protocol-version': '2025-03-26'
'mcp-protocol-version': '2025-11-25'
},
body: JSON.stringify(toolCallRequest)
});
Expand Down Expand Up @@ -1849,7 +1900,7 @@ describe.each(zodTestMatrix)('$zodVersionLabel', (entry: ZodMatrixEntry) => {
'Content-Type': 'application/json',
Accept: 'text/event-stream, application/json',
'mcp-session-id': sessionId,
'mcp-protocol-version': '2025-03-26'
'mcp-protocol-version': '2025-11-25'
},
body: JSON.stringify(toolCallRequest)
});
Expand All @@ -1868,6 +1919,67 @@ describe.each(zodTestMatrix)('$zodVersionLabel', (entry: ZodMatrixEntry) => {
expect(typeof receivedCloseSSEStream).toBe('function');
});

it('should NOT provide closeSSEStream callback for old protocol versions (backwards compatibility)', async () => {
const result = await createTestServer({
sessionIdGenerator: () => randomUUID(),
eventStore: createEventStore(),
retryInterval: 1000
});
server = result.server;
transport = result.transport;
baseUrl = result.baseUrl;
mcpServer = result.mcpServer;

// Track whether closeSSEStream callback was provided
let receivedCloseSSEStream: (() => void) | undefined;
let receivedCloseStandaloneSSEStream: (() => void) | undefined;

// Register a tool that captures the extra.closeSSEStream callback
mcpServer.tool('test-old-version-tool', 'Test tool', {}, async (_args, extra) => {
receivedCloseSSEStream = extra.closeSSEStream;
receivedCloseStandaloneSSEStream = extra.closeStandaloneSSEStream;
return { content: [{ type: 'text', text: 'Done' }] };
});

// Initialize to get session ID
const initResponse = await sendPostRequest(baseUrl, TEST_MESSAGES.initialize);
sessionId = initResponse.headers.get('mcp-session-id') as string;
expect(sessionId).toBeDefined();

// Call the tool with OLD protocol version
const toolCallRequest: JSONRPCMessage = {
jsonrpc: '2.0',
id: 200,
method: 'tools/call',
params: { name: 'test-old-version-tool', arguments: {} }
};

const postResponse = await fetch(baseUrl, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
Accept: 'text/event-stream, application/json',
'mcp-session-id': sessionId,
'mcp-protocol-version': '2025-06-18'
},
body: JSON.stringify(toolCallRequest)
});

expect(postResponse.status).toBe(200);

// Read all events to completion
const reader = postResponse.body?.getReader();
while (true) {
const { done } = await reader!.read();
if (done) break;
}

// Verify closeSSEStream callbacks were NOT provided for old protocol version
// even though eventStore is configured
expect(receivedCloseSSEStream).toBeUndefined();
expect(receivedCloseStandaloneSSEStream).toBeUndefined();
});

it('should NOT provide closeSSEStream callback when eventStore is NOT configured', async () => {
const result = await createTestServer({
sessionIdGenerator: () => randomUUID()
Expand Down Expand Up @@ -1963,7 +2075,7 @@ describe.each(zodTestMatrix)('$zodVersionLabel', (entry: ZodMatrixEntry) => {
'Content-Type': 'application/json',
Accept: 'text/event-stream, application/json',
'mcp-session-id': sessionId,
'mcp-protocol-version': '2025-03-26'
'mcp-protocol-version': '2025-11-25'
},
body: JSON.stringify(toolCallRequest)
});
Expand Down Expand Up @@ -2010,7 +2122,7 @@ describe.each(zodTestMatrix)('$zodVersionLabel', (entry: ZodMatrixEntry) => {
headers: {
Accept: 'text/event-stream',
'mcp-session-id': sessionId,
'mcp-protocol-version': '2025-03-26'
'mcp-protocol-version': '2025-11-25'
}
});
expect(sseResponse.status).toBe(200);
Expand Down Expand Up @@ -2040,7 +2152,7 @@ describe.each(zodTestMatrix)('$zodVersionLabel', (entry: ZodMatrixEntry) => {
'Content-Type': 'application/json',
Accept: 'text/event-stream, application/json',
'mcp-session-id': sessionId,
'mcp-protocol-version': '2025-03-26'
'mcp-protocol-version': '2025-11-25'
},
body: JSON.stringify(toolCallRequest)
});
Expand Down Expand Up @@ -2091,7 +2203,7 @@ describe.each(zodTestMatrix)('$zodVersionLabel', (entry: ZodMatrixEntry) => {
headers: {
Accept: 'text/event-stream',
'mcp-session-id': sessionId,
'mcp-protocol-version': '2025-03-26'
'mcp-protocol-version': '2025-11-25'
}
});
expect(sseResponse.status).toBe(200);
Expand Down Expand Up @@ -2122,7 +2234,7 @@ describe.each(zodTestMatrix)('$zodVersionLabel', (entry: ZodMatrixEntry) => {
'Content-Type': 'application/json',
Accept: 'text/event-stream, application/json',
'mcp-session-id': sessionId,
'mcp-protocol-version': '2025-03-26'
'mcp-protocol-version': '2025-11-25'
},
body: JSON.stringify(toolCallRequest)
});
Expand Down Expand Up @@ -2152,7 +2264,7 @@ describe.each(zodTestMatrix)('$zodVersionLabel', (entry: ZodMatrixEntry) => {
headers: {
Accept: 'text/event-stream',
'mcp-session-id': sessionId,
'mcp-protocol-version': '2025-03-26',
'mcp-protocol-version': '2025-11-25',
'last-event-id': lastEventId
}
});
Expand Down
28 changes: 24 additions & 4 deletions src/server/streamableHttp.ts
Original file line number Diff line number Diff line change
Expand Up @@ -276,13 +276,21 @@ export class StreamableHTTPServerTransport implements Transport {

/**
* Writes a priming event to establish resumption capability.
* Only sends if eventStore is configured (opt-in for resumability).
* Only sends if eventStore is configured (opt-in for resumability) and
* the client's protocol version supports empty SSE data (>= 2025-11-25).
*/
private async _maybeWritePrimingEvent(res: ServerResponse, streamId: string): Promise<void> {
private async _maybeWritePrimingEvent(res: ServerResponse, streamId: string, protocolVersion: string): Promise<void> {
if (!this._eventStore) {
return;
}

// Priming events have empty data which older clients cannot handle.
// Only send priming events to clients with protocol version >= 2025-11-25
// which includes the fix for handling empty SSE data.
if (protocolVersion < '2025-11-25') {
return;
}

const primingEventId = await this._eventStore.storeEvent(streamId, {} as JSONRPCMessage);

let primingEvent = `id: ${primingEventId}\ndata: \n\n`;
Expand Down Expand Up @@ -619,6 +627,15 @@ export class StreamableHTTPServerTransport implements Transport {
// The default behavior is to use SSE streaming
// but in some cases server will return JSON responses
const streamId = randomUUID();

// Extract protocol version for priming event decision.
// For initialize requests, get from request params.
// For other requests, get from header (already validated).
const initRequest = messages.find(m => isInitializeRequest(m));
const clientProtocolVersion = initRequest
? initRequest.params.protocolVersion
: ((req.headers['mcp-protocol-version'] as string) ?? DEFAULT_NEGOTIATED_PROTOCOL_VERSION);

if (!this._enableJsonResponse) {
const headers: Record<string, string> = {
'Content-Type': 'text/event-stream',
Expand All @@ -633,7 +650,7 @@ export class StreamableHTTPServerTransport implements Transport {

res.writeHead(200, headers);

await this._maybeWritePrimingEvent(res, streamId);
await this._maybeWritePrimingEvent(res, streamId, clientProtocolVersion);
}
// Store the response for this request to send messages back through this connection
// We need to track by request ID to maintain the connection
Expand All @@ -656,9 +673,12 @@ export class StreamableHTTPServerTransport implements Transport {
// handle each message
for (const message of messages) {
// Build closeSSEStream callback for requests when eventStore is configured
// AND client supports resumability (protocol version >= 2025-11-25).
// Old clients can't resume if the stream is closed early because they
// didn't receive a priming event with an event ID.
let closeSSEStream: (() => void) | undefined;
let closeStandaloneSSEStream: (() => void) | undefined;
if (isJSONRPCRequest(message) && this._eventStore) {
if (isJSONRPCRequest(message) && this._eventStore && clientProtocolVersion >= '2025-11-25') {
closeSSEStream = () => {
this.closeSSEStream(message.id);
};
Expand Down
Loading