diff --git a/src/mcp/client/sse.py b/src/mcp/client/sse.py index 5d57cc5a5..b2ac67744 100644 --- a/src/mcp/client/sse.py +++ b/src/mcp/client/sse.py @@ -105,6 +105,9 @@ async def sse_reader( task_status.started(endpoint_url) case "message": + # Skip empty data (keep-alive pings) + if not sse.data: + continue try: message = types.JSONRPCMessage.model_validate_json( # noqa: E501 sse.data diff --git a/tests/shared/test_sse.py b/tests/shared/test_sse.py index fcad12707..7604450f8 100644 --- a/tests/shared/test_sse.py +++ b/tests/shared/test_sse.py @@ -4,12 +4,13 @@ import time from collections.abc import AsyncGenerator, Generator from typing import Any -from unittest.mock import Mock +from unittest.mock import AsyncMock, MagicMock, Mock, patch import anyio import httpx import pytest import uvicorn +from httpx_sse import ServerSentEvent from inline_snapshot import snapshot from pydantic import AnyUrl from starlette.applications import Starlette @@ -28,8 +29,11 @@ from mcp.types import ( EmptyResult, ErrorData, + Implementation, InitializeResult, + JSONRPCResponse, ReadResourceResult, + ServerCapabilities, TextContent, TextResourceContents, Tool, @@ -532,3 +536,69 @@ def test_sse_server_transport_endpoint_validation(endpoint: str, expected_result sse = SseServerTransport(endpoint) assert sse._endpoint == expected_result assert sse._endpoint.startswith("/") + + +# ResourceWarning filter: When mocking aconnect_sse, the sse_client's internal task +# group doesn't receive proper cancellation signals, so the sse_reader task's finally +# block (which closes read_stream_writer) doesn't execute. This is a test artifact - +# the actual code path (`if not sse.data: continue`) IS exercised and works correctly. +# Production code with real SSE connections cleans up properly. +@pytest.mark.filterwarnings("ignore::ResourceWarning") +@pytest.mark.anyio +async def test_sse_client_handles_empty_keepalive_pings() -> None: + """Test that SSE client properly handles empty data lines (keep-alive pings). + + Per the MCP spec (Streamable HTTP transport): "The server SHOULD immediately + send an SSE event consisting of an event ID and an empty data field in order + to prime the client to reconnect." + + This test mocks the SSE event stream to include empty "message" events and + verifies the client skips them without crashing. + """ + # Build a proper JSON-RPC response using types (not hardcoded strings) + init_result = InitializeResult( + protocolVersion="2024-11-05", + capabilities=ServerCapabilities(), + serverInfo=Implementation(name="test", version="1.0"), + ) + response = JSONRPCResponse( + jsonrpc="2.0", + id=1, + result=init_result.model_dump(by_alias=True, exclude_none=True), + ) + response_json = response.model_dump_json(by_alias=True, exclude_none=True) + + # Create mock SSE events using httpx_sse's ServerSentEvent + async def mock_aiter_sse() -> AsyncGenerator[ServerSentEvent, None]: + # First: endpoint event + yield ServerSentEvent(event="endpoint", data="/messages/?session_id=abc123") + # Empty data keep-alive ping - this is what we're testing + yield ServerSentEvent(event="message", data="") + # Real JSON-RPC response + yield ServerSentEvent(event="message", data=response_json) + + mock_event_source = MagicMock() + mock_event_source.aiter_sse.return_value = mock_aiter_sse() + mock_event_source.response = MagicMock() + mock_event_source.response.raise_for_status = MagicMock() + + mock_aconnect_sse = MagicMock() + mock_aconnect_sse.__aenter__ = AsyncMock(return_value=mock_event_source) + mock_aconnect_sse.__aexit__ = AsyncMock(return_value=None) + + mock_client = MagicMock() + mock_client.__aenter__ = AsyncMock(return_value=mock_client) + mock_client.__aexit__ = AsyncMock(return_value=None) + mock_client.post = AsyncMock(return_value=MagicMock(status_code=200, raise_for_status=MagicMock())) + + with ( + patch("mcp.client.sse.create_mcp_http_client", return_value=mock_client), + patch("mcp.client.sse.aconnect_sse", return_value=mock_aconnect_sse), + ): + async with sse_client("http://test/sse") as (read_stream, _): + # Read the message - should skip the empty one and get the real response + msg = await read_stream.receive() + # If we get here without error, the empty message was skipped successfully + assert not isinstance(msg, Exception) + assert isinstance(msg.message.root, types.JSONRPCResponse) + assert msg.message.root.id == 1