Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
280 changes: 271 additions & 9 deletions src/server/streamableHttp.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1738,7 +1738,7 @@ describe.each(zodTestMatrix)('$zodVersionLabel', (entry: ZodMatrixEntry) => {
expect(text).not.toContain('retry:');
});

it('should close POST SSE stream when closeSseStream is called', async () => {
it('should close POST SSE stream when extra.closeSSEStream is called', async () => {
const result = await createTestServer({
sessionIdGenerator: () => randomUUID(),
eventStore: createEventStore(),
Expand All @@ -1749,15 +1749,21 @@ describe.each(zodTestMatrix)('$zodVersionLabel', (entry: ZodMatrixEntry) => {
baseUrl = result.baseUrl;
mcpServer = result.mcpServer;

// Track tool execution state
// Track when stream close is called and tool completes
let streamCloseCalled = false;
let toolResolve: () => void;
const toolPromise = new Promise<void>(resolve => {
const toolCompletePromise = new Promise<void>(resolve => {
toolResolve = resolve;
});

// Register a blocking tool
mcpServer.tool('blocking-tool', 'A blocking tool', {}, async () => {
await toolPromise;
// Register a tool that closes its own SSE stream via extra callback
mcpServer.tool('close-stream-tool', 'Closes its own stream', {}, async (_args, extra) => {
// Close the SSE stream for this request
extra.closeSSEStream?.();
streamCloseCalled = true;

// Wait before returning so we can observe the stream closure
await toolCompletePromise;
return { content: [{ type: 'text', text: 'Done' }] };
});

Expand All @@ -1771,7 +1777,7 @@ describe.each(zodTestMatrix)('$zodVersionLabel', (entry: ZodMatrixEntry) => {
jsonrpc: '2.0',
id: 100,
method: 'tools/call',
params: { name: 'blocking-tool', arguments: {} }
params: { name: 'close-stream-tool', arguments: {} }
};

const postResponse = await fetch(baseUrl, {
Expand All @@ -1792,8 +1798,9 @@ describe.each(zodTestMatrix)('$zodVersionLabel', (entry: ZodMatrixEntry) => {
// Read the priming event
await reader!.read();

// Close the SSE stream
transport.closeSSEStream(100);
// Wait a moment for the tool to call closeSSEStream
await new Promise(resolve => setTimeout(resolve, 100));
expect(streamCloseCalled).toBe(true);

// Stream should now be closed
const { done } = await reader!.read();
Expand Down Expand Up @@ -1916,6 +1923,261 @@ describe.each(zodTestMatrix)('$zodVersionLabel', (entry: ZodMatrixEntry) => {
// Verify closeSSEStream callback was NOT provided
expect(receivedCloseSSEStream).toBeUndefined();
});

it('should provide closeStandaloneSSEStream callback in extra when eventStore is configured', async () => {
const result = await createTestServer({
sessionIdGenerator: () => randomUUID(),
eventStore: createEventStore(),
retryInterval: 1000
});
server = result.server;
transport = result.transport;
baseUrl = result.baseUrl;
mcpServer = result.mcpServer;

// Track whether closeStandaloneSSEStream callback was provided
let receivedCloseStandaloneSSEStream: (() => void) | undefined;

// Register a tool that captures the extra.closeStandaloneSSEStream callback
mcpServer.tool('test-standalone-callback-tool', 'Test tool', {}, async (_args, extra) => {
receivedCloseStandaloneSSEStream = extra.closeStandaloneSSEStream;
return { content: [{ type: 'text', text: 'Done' }] };
});

// Initialize to get session ID
const initResponse = await sendPostRequest(baseUrl, TEST_MESSAGES.initialize);
sessionId = initResponse.headers.get('mcp-session-id') as string;
expect(sessionId).toBeDefined();

// Call the tool
const toolCallRequest: JSONRPCMessage = {
jsonrpc: '2.0',
id: 203,
method: 'tools/call',
params: { name: 'test-standalone-callback-tool', arguments: {} }
};

const postResponse = await fetch(baseUrl, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
Accept: 'text/event-stream, application/json',
'mcp-session-id': sessionId,
'mcp-protocol-version': '2025-03-26'
},
body: JSON.stringify(toolCallRequest)
});

expect(postResponse.status).toBe(200);

// Read all events to completion
const reader = postResponse.body?.getReader();
while (true) {
const { done } = await reader!.read();
if (done) break;
}

// Verify closeStandaloneSSEStream callback was provided
expect(receivedCloseStandaloneSSEStream).toBeDefined();
expect(typeof receivedCloseStandaloneSSEStream).toBe('function');
});

it('should close standalone GET SSE stream when extra.closeStandaloneSSEStream is called', async () => {
const result = await createTestServer({
sessionIdGenerator: () => randomUUID(),
eventStore: createEventStore(),
retryInterval: 1000
});
server = result.server;
transport = result.transport;
baseUrl = result.baseUrl;
mcpServer = result.mcpServer;

// Register a tool that closes the standalone SSE stream via extra callback
mcpServer.tool('close-standalone-stream-tool', 'Closes standalone stream', {}, async (_args, extra) => {
extra.closeStandaloneSSEStream?.();
return { content: [{ type: 'text', text: 'Stream closed' }] };
});

// Initialize to get session ID
const initResponse = await sendPostRequest(baseUrl, TEST_MESSAGES.initialize);
sessionId = initResponse.headers.get('mcp-session-id') as string;
expect(sessionId).toBeDefined();

// Open a standalone GET SSE stream
const sseResponse = await fetch(baseUrl, {
method: 'GET',
headers: {
Accept: 'text/event-stream',
'mcp-session-id': sessionId,
'mcp-protocol-version': '2025-03-26'
}
});
expect(sseResponse.status).toBe(200);

const getReader = sseResponse.body?.getReader();

// Send a notification to confirm GET stream is established
await mcpServer.server.sendLoggingMessage({ level: 'info', data: 'Stream established' });

// Read the notification to confirm stream is working
const { value } = await getReader!.read();
const text = new TextDecoder().decode(value);
expect(text).toContain('id: ');
expect(text).toContain('Stream established');

// Call the tool that closes the standalone SSE stream
const toolCallRequest: JSONRPCMessage = {
jsonrpc: '2.0',
id: 300,
method: 'tools/call',
params: { name: 'close-standalone-stream-tool', arguments: {} }
};

const postResponse = await fetch(baseUrl, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
Accept: 'text/event-stream, application/json',
'mcp-session-id': sessionId,
'mcp-protocol-version': '2025-03-26'
},
body: JSON.stringify(toolCallRequest)
});
expect(postResponse.status).toBe(200);

// Read the POST response to completion
const postReader = postResponse.body?.getReader();
while (true) {
const { done } = await postReader!.read();
if (done) break;
}

// GET stream should now be closed - use a race with timeout to avoid hanging
const readPromise = getReader!.read();
const timeoutPromise = new Promise<{ done: boolean; value: undefined }>((_, reject) =>
setTimeout(() => reject(new Error('Stream did not close in time')), 1000)
);

const { done } = await Promise.race([readPromise, timeoutPromise]);
expect(done).toBe(true);
});

it('should allow client to reconnect after standalone SSE stream is closed via extra.closeStandaloneSSEStream', async () => {
const result = await createTestServer({
sessionIdGenerator: () => randomUUID(),
eventStore: createEventStore(),
retryInterval: 1000
});
server = result.server;
transport = result.transport;
baseUrl = result.baseUrl;
mcpServer = result.mcpServer;

// Register a tool that closes the standalone SSE stream
mcpServer.tool('close-standalone-for-reconnect', 'Closes standalone stream', {}, async (_args, extra) => {
extra.closeStandaloneSSEStream?.();
return { content: [{ type: 'text', text: 'Stream closed' }] };
});

// Initialize to get session ID
const initResponse = await sendPostRequest(baseUrl, TEST_MESSAGES.initialize);
sessionId = initResponse.headers.get('mcp-session-id') as string;
expect(sessionId).toBeDefined();

// Open a standalone GET SSE stream
const sseResponse = await fetch(baseUrl, {
method: 'GET',
headers: {
Accept: 'text/event-stream',
'mcp-session-id': sessionId,
'mcp-protocol-version': '2025-03-26'
}
});
expect(sseResponse.status).toBe(200);

const getReader = sseResponse.body?.getReader();

// Send a notification to get an event ID
await mcpServer.server.sendLoggingMessage({ level: 'info', data: 'Initial message' });

// Read the notification to get the event ID
const { value } = await getReader!.read();
const text = new TextDecoder().decode(value);
const idMatch = text.match(/id: ([^\n]+)/);
expect(idMatch).toBeTruthy();
const lastEventId = idMatch![1];

// Call the tool to close the standalone SSE stream
const toolCallRequest: JSONRPCMessage = {
jsonrpc: '2.0',
id: 301,
method: 'tools/call',
params: { name: 'close-standalone-for-reconnect', arguments: {} }
};

const postResponse = await fetch(baseUrl, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
Accept: 'text/event-stream, application/json',
'mcp-session-id': sessionId,
'mcp-protocol-version': '2025-03-26'
},
body: JSON.stringify(toolCallRequest)
});
expect(postResponse.status).toBe(200);

// Read the POST response to completion
const postReader = postResponse.body?.getReader();
while (true) {
const { done } = await postReader!.read();
if (done) break;
}

// Wait for GET stream to close - use a race with timeout
const readPromise = getReader!.read();
const timeoutPromise = new Promise<{ done: boolean; value: undefined }>((_, reject) =>
setTimeout(() => reject(new Error('Stream did not close in time')), 1000)
);
const { done } = await Promise.race([readPromise, timeoutPromise]);
expect(done).toBe(true);

// Send a notification while client is disconnected
await mcpServer.server.sendLoggingMessage({ level: 'info', data: 'Missed while disconnected' });

// Client reconnects with Last-Event-ID
const reconnectResponse = await fetch(baseUrl, {
method: 'GET',
headers: {
Accept: 'text/event-stream',
'mcp-session-id': sessionId,
'mcp-protocol-version': '2025-03-26',
'last-event-id': lastEventId
}
});
expect(reconnectResponse.status).toBe(200);

// Read the replayed notification
const reconnectReader = reconnectResponse.body?.getReader();
let allText = '';
const readWithTimeout = async () => {
const timeout = setTimeout(() => reconnectReader!.cancel(), 2000);
try {
while (!allText.includes('Missed while disconnected')) {
const { value, done } = await reconnectReader!.read();
if (done) break;
allText += new TextDecoder().decode(value);
}
} finally {
clearTimeout(timeout);
}
};
await readWithTimeout();

// Verify we received the notification that was sent while disconnected
expect(allText).toContain('Missed while disconnected');
});
});

// Test onsessionclosed callback
Expand Down
30 changes: 24 additions & 6 deletions src/server/streamableHttp.ts
Original file line number Diff line number Diff line change
Expand Up @@ -651,13 +651,17 @@ export class StreamableHTTPServerTransport implements Transport {
for (const message of messages) {
// Build closeSSEStream callback for requests when eventStore is configured
let closeSSEStream: (() => void) | undefined;
let closeStandaloneSSEStream: (() => void) | undefined;
if (isJSONRPCRequest(message) && this._eventStore) {
closeSSEStream = () => {
this.closeSSEStream(message.id);
};
closeStandaloneSSEStream = () => {
this.closeStandaloneSSEStream();
};
}

this.onmessage?.(message, { authInfo, requestInfo, closeSSEStream });
this.onmessage?.(message, { authInfo, requestInfo, closeSSEStream, closeStandaloneSSEStream });
}
// The server SHOULD NOT close the SSE stream before sending all JSON-RPC responses
// This will be handled by the send() method when responses are ready
Expand Down Expand Up @@ -814,6 +818,18 @@ export class StreamableHTTPServerTransport implements Transport {
}
}

/**
* Close the standalone GET SSE stream, triggering client reconnection.
* Use this to implement polling behavior for server-initiated notifications.
*/
closeStandaloneSSEStream(): void {
const stream = this._streamMapping.get(this._standaloneSseStreamId);
if (stream) {
stream.end();
this._streamMapping.delete(this._standaloneSseStreamId);
}
}

async send(message: JSONRPCMessage, options?: { relatedRequestId?: RequestId }): Promise<void> {
let requestId = options?.relatedRequestId;
if (isJSONRPCResponse(message) || isJSONRPCError(message)) {
Expand All @@ -829,19 +845,21 @@ export class StreamableHTTPServerTransport implements Transport {
if (isJSONRPCResponse(message) || isJSONRPCError(message)) {
throw new Error('Cannot send a response on a standalone SSE stream unless resuming a previous client request');
}
const standaloneSse = this._streamMapping.get(this._standaloneSseStreamId);
if (standaloneSse === undefined) {
// The spec says the server MAY send messages on the stream, so it's ok to discard if no stream
return;
}

// Generate and store event ID if event store is provided
// Store even if stream is disconnected so events can be replayed on reconnect
let eventId: string | undefined;
if (this._eventStore) {
// Stores the event and gets the generated event ID
eventId = await this._eventStore.storeEvent(this._standaloneSseStreamId, message);
}

const standaloneSse = this._streamMapping.get(this._standaloneSseStreamId);
if (standaloneSse === undefined) {
// Stream is disconnected - event is stored for replay, nothing more to do
return;
}

// Send the message to the standalone SSE stream
this.writeSSEEvent(standaloneSse, message, eventId);
return;
Expand Down
Loading
Loading