diff --git a/src/mcp/client/streamable_http.py b/src/mcp/client/streamable_http.py index 03b65b0a5..1b32c022e 100644 --- a/src/mcp/client/streamable_http.py +++ b/src/mcp/client/streamable_http.py @@ -160,6 +160,9 @@ async def _handle_sse_event( ) -> bool: """Handle an SSE event, returning True if the response is complete.""" if sse.event == "message": + # Skip empty data (keep-alive pings) + if not sse.data: + return False try: message = JSONRPCMessage.model_validate_json(sse.data) logger.debug(f"SSE message: {message}") diff --git a/tests/shared/test_streamable_http.py b/tests/shared/test_streamable_http.py index 3b70c19dc..8e8884270 100644 --- a/tests/shared/test_streamable_http.py +++ b/tests/shared/test_streamable_http.py @@ -15,6 +15,7 @@ import pytest import requests import uvicorn +from httpx_sse import ServerSentEvent from pydantic import AnyUrl from starlette.applications import Starlette from starlette.requests import Request @@ -22,7 +23,7 @@ import mcp.types as types from mcp.client.session import ClientSession -from mcp.client.streamable_http import streamablehttp_client +from mcp.client.streamable_http import StreamableHTTPTransport, streamablehttp_client from mcp.server import Server from mcp.server.streamable_http import ( MCP_PROTOCOL_VERSION_HEADER, @@ -39,7 +40,7 @@ from mcp.server.transport_security import TransportSecuritySettings from mcp.shared.context import RequestContext from mcp.shared.exceptions import McpError -from mcp.shared.message import ClientMessageMetadata +from mcp.shared.message import ClientMessageMetadata, SessionMessage from mcp.shared.session import RequestResponder from mcp.types import InitializeResult, TextContent, TextResourceContents, Tool from tests.test_helpers import wait_for_server @@ -1606,3 +1607,29 @@ async def bad_client(): assert isinstance(result, InitializeResult) tools = await session.list_tools() assert tools.tools + + +@pytest.mark.anyio +async def test_handle_sse_event_skips_empty_data(): + """Test that _handle_sse_event skips empty SSE data (keep-alive pings).""" + transport = StreamableHTTPTransport(url="http://localhost:8000/mcp") + + # Create a mock SSE event with empty data (keep-alive ping) + mock_sse = ServerSentEvent(event="message", data="", id=None, retry=None) + + # Create a mock stream writer + write_stream, read_stream = anyio.create_memory_object_stream[SessionMessage | Exception](1) + + try: + # Call _handle_sse_event with empty data - should return False and not raise + result = await transport._handle_sse_event(mock_sse, write_stream) + + # Should return False (not complete) for empty data + assert result is False + + # Nothing should have been written to the stream + # Check buffer is empty (statistics().current_buffer_used returns buffer size) + assert write_stream.statistics().current_buffer_used == 0 + finally: + await write_stream.aclose() + await read_stream.aclose()