Skip to content

Commit 89ff338

Browse files
fix: skip priming events and close_sse_stream for old protocol versions (#1719)
1 parent 9ed0b93 commit 89ff338

File tree

2 files changed

+174
-19
lines changed

2 files changed

+174
-19
lines changed

src/mcp/server/streamable_http.py

Lines changed: 47 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -238,30 +238,50 @@ def _create_session_message( # pragma: no cover
238238
message: JSONRPCMessage,
239239
request: Request,
240240
request_id: RequestId,
241+
protocol_version: str,
241242
) -> SessionMessage:
242-
"""Create a session message with metadata including close_sse_stream callback."""
243+
"""Create a session message with metadata including close_sse_stream callback.
243244
244-
async def close_stream_callback() -> None:
245-
self.close_sse_stream(request_id)
245+
The close_sse_stream callbacks are only provided when the client supports
246+
resumability (protocol version >= 2025-11-25). Old clients can't resume if
247+
the stream is closed early because they didn't receive a priming event.
248+
"""
249+
# Only provide close callbacks when client supports resumability
250+
if self._event_store and protocol_version >= "2025-11-25":
246251

247-
async def close_standalone_stream_callback() -> None:
248-
self.close_standalone_sse_stream()
252+
async def close_stream_callback() -> None:
253+
self.close_sse_stream(request_id)
254+
255+
async def close_standalone_stream_callback() -> None:
256+
self.close_standalone_sse_stream()
257+
258+
metadata = ServerMessageMetadata(
259+
request_context=request,
260+
close_sse_stream=close_stream_callback,
261+
close_standalone_sse_stream=close_standalone_stream_callback,
262+
)
263+
else:
264+
metadata = ServerMessageMetadata(request_context=request)
249265

250-
metadata = ServerMessageMetadata(
251-
request_context=request,
252-
close_sse_stream=close_stream_callback,
253-
close_standalone_sse_stream=close_standalone_stream_callback,
254-
)
255266
return SessionMessage(message, metadata=metadata)
256267

257-
async def _send_priming_event( # pragma: no cover
268+
async def _maybe_send_priming_event(
258269
self,
259270
request_id: RequestId,
260271
sse_stream_writer: MemoryObjectSendStream[dict[str, Any]],
272+
protocol_version: str,
261273
) -> None:
262-
"""Send priming event for SSE resumability if event_store is configured."""
274+
"""Send priming event for SSE resumability if event_store is configured.
275+
276+
Only sends priming events to clients with protocol version >= 2025-11-25,
277+
which includes the fix for handling empty SSE data. Older clients would
278+
crash trying to parse empty data as JSON.
279+
"""
263280
if not self._event_store:
264281
return
282+
# Priming events have empty data which older clients cannot handle.
283+
if protocol_version < "2025-11-25":
284+
return
265285
priming_event_id = await self._event_store.store_event(
266286
str(request_id), # Convert RequestId to StreamId (str)
267287
None, # Priming event has no payload
@@ -499,6 +519,15 @@ async def _handle_post_request(self, scope: Scope, request: Request, receive: Re
499519

500520
return
501521

522+
# Extract protocol version for priming event decision.
523+
# For initialize requests, get from request params.
524+
# For other requests, get from header (already validated).
525+
protocol_version = (
526+
str(message.root.params.get("protocolVersion", DEFAULT_NEGOTIATED_VERSION))
527+
if is_initialization_request and message.root.params
528+
else request.headers.get(MCP_PROTOCOL_VERSION_HEADER, DEFAULT_NEGOTIATED_VERSION)
529+
)
530+
502531
# Extract the request ID outside the try block for proper scope
503532
request_id = str(message.root.id) # pragma: no cover
504533
# Register this stream for the request ID
@@ -560,7 +589,7 @@ async def sse_writer():
560589
try:
561590
async with sse_stream_writer, request_stream_reader:
562591
# Send priming event for SSE resumability
563-
await self._send_priming_event(request_id, sse_stream_writer)
592+
await self._maybe_send_priming_event(request_id, sse_stream_writer, protocol_version)
564593

565594
# Process messages from the request-specific stream
566595
async for event_message in request_stream_reader:
@@ -605,7 +634,7 @@ async def sse_writer():
605634
async with anyio.create_task_group() as tg:
606635
tg.start_soon(response, scope, receive, send)
607636
# Then send the message to be processed by the server
608-
session_message = self._create_session_message(message, request, request_id)
637+
session_message = self._create_session_message(message, request, request_id, protocol_version)
609638
await writer.send(session_message)
610639
except Exception:
611640
logger.exception("SSE response error")
@@ -864,6 +893,9 @@ async def _replay_events(self, last_event_id: str, request: Request, send: Send)
864893
if self.mcp_session_id:
865894
headers[MCP_SESSION_ID_HEADER] = self.mcp_session_id
866895

896+
# Get protocol version from header (already validated in _validate_protocol_version)
897+
replay_protocol_version = request.headers.get(MCP_PROTOCOL_VERSION_HEADER, DEFAULT_NEGOTIATED_VERSION)
898+
867899
# Create SSE stream for replay
868900
sse_stream_writer, sse_stream_reader = anyio.create_memory_object_stream[dict[str, str]](0)
869901

@@ -884,7 +916,7 @@ async def send_event(event_message: EventMessage) -> None:
884916
self._sse_stream_writers[stream_id] = sse_stream_writer
885917

886918
# Send priming event for this new connection
887-
await self._send_priming_event(stream_id, sse_stream_writer)
919+
await self._maybe_send_priming_event(stream_id, sse_stream_writer, replay_protocol_version)
888920

889921
# Create new request streams for this connection
890922
self._request_streams[stream_id] = anyio.create_memory_object_stream[EventMessage](0)

tests/shared/test_streamable_http.py

Lines changed: 127 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import time
1111
from collections.abc import Generator
1212
from typing import Any
13+
from unittest.mock import MagicMock
1314

1415
import anyio
1516
import httpx
@@ -41,9 +42,16 @@
4142
from mcp.server.transport_security import TransportSecuritySettings
4243
from mcp.shared.context import RequestContext
4344
from mcp.shared.exceptions import McpError
44-
from mcp.shared.message import ClientMessageMetadata, SessionMessage
45+
from mcp.shared.message import ClientMessageMetadata, ServerMessageMetadata, SessionMessage
4546
from mcp.shared.session import RequestResponder
46-
from mcp.types import InitializeResult, TextContent, TextResourceContents, Tool
47+
from mcp.types import (
48+
InitializeResult,
49+
JSONRPCMessage,
50+
JSONRPCRequest,
51+
TextContent,
52+
TextResourceContents,
53+
Tool,
54+
)
4755
from tests.test_helpers import wait_for_server
4856

4957
# Test constants
@@ -1761,6 +1769,116 @@ async def test_handle_sse_event_skips_empty_data():
17611769
await read_stream.aclose()
17621770

17631771

1772+
@pytest.mark.anyio
1773+
async def test_priming_event_not_sent_for_old_protocol_version():
1774+
"""Test that _maybe_send_priming_event skips for old protocol versions (backwards compat)."""
1775+
# Create a transport with an event store
1776+
transport = StreamableHTTPServerTransport(
1777+
"/mcp",
1778+
event_store=SimpleEventStore(),
1779+
)
1780+
1781+
# Create a mock stream writer
1782+
write_stream, read_stream = anyio.create_memory_object_stream[dict[str, Any]](1)
1783+
1784+
try:
1785+
# Call _maybe_send_priming_event with OLD protocol version - should NOT send
1786+
await transport._maybe_send_priming_event("test-request-id", write_stream, "2025-06-18")
1787+
1788+
# Nothing should have been written to the stream
1789+
assert write_stream.statistics().current_buffer_used == 0
1790+
1791+
# Now test with NEW protocol version - should send
1792+
await transport._maybe_send_priming_event("test-request-id-2", write_stream, "2025-11-25")
1793+
1794+
# Should have written a priming event
1795+
assert write_stream.statistics().current_buffer_used == 1
1796+
finally:
1797+
await write_stream.aclose()
1798+
await read_stream.aclose()
1799+
1800+
1801+
@pytest.mark.anyio
1802+
async def test_priming_event_not_sent_without_event_store():
1803+
"""Test that _maybe_send_priming_event returns early when no event_store is configured."""
1804+
# Create a transport WITHOUT an event store
1805+
transport = StreamableHTTPServerTransport("/mcp")
1806+
1807+
# Create a mock stream writer
1808+
write_stream, read_stream = anyio.create_memory_object_stream[dict[str, Any]](1)
1809+
1810+
try:
1811+
# Call _maybe_send_priming_event - should return early without sending
1812+
await transport._maybe_send_priming_event("test-request-id", write_stream, "2025-11-25")
1813+
1814+
# Nothing should have been written to the stream
1815+
assert write_stream.statistics().current_buffer_used == 0
1816+
finally:
1817+
await write_stream.aclose()
1818+
await read_stream.aclose()
1819+
1820+
1821+
@pytest.mark.anyio
1822+
async def test_priming_event_includes_retry_interval():
1823+
"""Test that _maybe_send_priming_event includes retry field when retry_interval is set."""
1824+
# Create a transport with an event store AND retry_interval
1825+
transport = StreamableHTTPServerTransport(
1826+
"/mcp",
1827+
event_store=SimpleEventStore(),
1828+
retry_interval=5000,
1829+
)
1830+
1831+
# Create a mock stream writer
1832+
write_stream, read_stream = anyio.create_memory_object_stream[dict[str, Any]](1)
1833+
1834+
try:
1835+
# Call _maybe_send_priming_event with new protocol version
1836+
await transport._maybe_send_priming_event("test-request-id", write_stream, "2025-11-25")
1837+
1838+
# Should have written a priming event with retry field
1839+
assert write_stream.statistics().current_buffer_used == 1
1840+
1841+
# Read the event and verify it has retry field
1842+
event = await read_stream.receive()
1843+
assert "retry" in event
1844+
assert event["retry"] == 5000
1845+
finally:
1846+
await write_stream.aclose()
1847+
await read_stream.aclose()
1848+
1849+
1850+
@pytest.mark.anyio
1851+
async def test_close_sse_stream_callback_not_provided_for_old_protocol_version():
1852+
"""Test that close_sse_stream callbacks are NOT provided for old protocol versions."""
1853+
# Create a transport with an event store
1854+
transport = StreamableHTTPServerTransport(
1855+
"/mcp",
1856+
event_store=SimpleEventStore(),
1857+
)
1858+
1859+
# Create a mock message and request
1860+
mock_message = JSONRPCMessage(root=JSONRPCRequest(jsonrpc="2.0", id="test-1", method="tools/list"))
1861+
mock_request = MagicMock()
1862+
1863+
# Call _create_session_message with OLD protocol version
1864+
session_msg = transport._create_session_message(mock_message, mock_request, "test-request-id", "2025-06-18")
1865+
1866+
# Callbacks should NOT be provided for old protocol version
1867+
assert session_msg.metadata is not None
1868+
assert isinstance(session_msg.metadata, ServerMessageMetadata)
1869+
assert session_msg.metadata.close_sse_stream is None
1870+
assert session_msg.metadata.close_standalone_sse_stream is None
1871+
1872+
# Now test with NEW protocol version - should provide callbacks
1873+
session_msg_new = transport._create_session_message(mock_message, mock_request, "test-request-id-2", "2025-11-25")
1874+
1875+
# Callbacks SHOULD be provided for new protocol version
1876+
assert session_msg_new.metadata is not None
1877+
assert isinstance(session_msg_new.metadata, ServerMessageMetadata)
1878+
assert session_msg_new.metadata.close_sse_stream is not None
1879+
assert session_msg_new.metadata.close_standalone_sse_stream is not None
1880+
1881+
17641882
@pytest.mark.anyio
17651883
async def test_streamablehttp_client_receives_priming_event(
17661884
event_server: tuple[SimpleEventStore, str],
@@ -2060,7 +2178,9 @@ async def on_resumption_token(token: str) -> None:
20602178

20612179

20622180
@pytest.mark.anyio
2063-
async def test_standalone_get_stream_reconnection(basic_server: None, basic_server_url: str) -> None:
2181+
async def test_standalone_get_stream_reconnection(
2182+
event_server: tuple[SimpleEventStore, str],
2183+
) -> None:
20642184
"""
20652185
Test that standalone GET stream automatically reconnects after server closes it.
20662186
@@ -2069,8 +2189,11 @@ async def test_standalone_get_stream_reconnection(basic_server: None, basic_serv
20692189
2. Server closes GET stream
20702190
3. Client reconnects with Last-Event-ID
20712191
4. Client receives notification 2 on new connection
2192+
2193+
Note: Requires event_server fixture (with event store) because close_standalone_sse_stream
2194+
callback is only provided when event_store is configured and protocol version >= 2025-11-25.
20722195
"""
2073-
server_url = basic_server_url
2196+
_, server_url = event_server
20742197
received_notifications: list[str] = []
20752198

20762199
async def message_handler(

0 commit comments

Comments
 (0)