From a6667759f92f61c25dfe2d9ce2ef4e02792705ee Mon Sep 17 00:00:00 2001 From: ARJUN-TS1 Date: Tue, 9 Dec 2025 19:35:28 +0530 Subject: [PATCH 1/3] Skip empty SSE data to avoid parsing errors --- src/mcp/client/sse.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/mcp/client/sse.py b/src/mcp/client/sse.py index 5d57cc5a5..5666f471d 100644 --- a/src/mcp/client/sse.py +++ b/src/mcp/client/sse.py @@ -105,6 +105,9 @@ async def sse_reader( task_status.started(endpoint_url) case "message": + # Skip empty data (keep-alive pings) + if not sse.data: + return False try: message = types.JSONRPCMessage.model_validate_json( # noqa: E501 sse.data From 19199b356b6a46359bdbc7797082aab057a29d35 Mon Sep 17 00:00:00 2001 From: ARJUN-TS1 Date: Tue, 9 Dec 2025 20:04:53 +0530 Subject: [PATCH 2/3] UT Skip empty SSE data to avoid parsing errors --- src/mcp/client/sse.py | 2 +- tests/shared/test_sse.py | 18 ++++++++++++++++++ 2 files changed, 19 insertions(+), 1 deletion(-) diff --git a/src/mcp/client/sse.py b/src/mcp/client/sse.py index 5666f471d..b2ac67744 100644 --- a/src/mcp/client/sse.py +++ b/src/mcp/client/sse.py @@ -107,7 +107,7 @@ async def sse_reader( case "message": # Skip empty data (keep-alive pings) if not sse.data: - return False + continue try: message = types.JSONRPCMessage.model_validate_json( # noqa: E501 sse.data diff --git a/tests/shared/test_sse.py b/tests/shared/test_sse.py index fcad12707..265ca5583 100644 --- a/tests/shared/test_sse.py +++ b/tests/shared/test_sse.py @@ -532,3 +532,21 @@ def test_sse_server_transport_endpoint_validation(endpoint: str, expected_result sse = SseServerTransport(endpoint) assert sse._endpoint == expected_result assert sse._endpoint.startswith("/") + + +@pytest.mark.anyio +async def test_sse_client_handles_empty_keepalive_pings(server: None, server_url: str) -> None: + """Test that SSE client properly handles empty data lines (keep-alive pings).""" + async with sse_client(server_url + "/sse") as streams: + async with ClientSession(*streams) as session: + # Initialize the session + result = await session.initialize() + assert isinstance(result, InitializeResult) + assert result.serverInfo.name == SERVER_NAME + + # Test that we can still make requests after receiving keep-alive pings + # The server may send empty data lines between actual messages + response = await session.read_resource(uri=AnyUrl("foobar://test")) + assert len(response.contents) == 1 + assert isinstance(response.contents[0], TextResourceContents) + assert response.contents[0].text == "Read test" From efd779e56f07ff71d8e4c50c5d1bf1888634e90f Mon Sep 17 00:00:00 2001 From: Max Isbey <224885523+maxisbey@users.noreply.github.com> Date: Tue, 9 Dec 2025 15:10:24 +0000 Subject: [PATCH 3/3] test: improve SSE empty data test to properly mock SSE events The original test didn't actually exercise the `if not sse.data: continue` code path. This rewrite: - Mocks the SSE event stream to include an empty "message" event - Uses httpx_sse.ServerSentEvent for accurate event simulation - Uses proper type serialization instead of hardcoded JSON strings - Validates that the client skips empty data and receives the real response --- tests/shared/test_sse.py | 82 ++++++++++++++++++++++++++++++++-------- 1 file changed, 67 insertions(+), 15 deletions(-) diff --git a/tests/shared/test_sse.py b/tests/shared/test_sse.py index 265ca5583..7604450f8 100644 --- a/tests/shared/test_sse.py +++ b/tests/shared/test_sse.py @@ -4,12 +4,13 @@ import time from collections.abc import AsyncGenerator, Generator from typing import Any -from unittest.mock import Mock +from unittest.mock import AsyncMock, MagicMock, Mock, patch import anyio import httpx import pytest import uvicorn +from httpx_sse import ServerSentEvent from inline_snapshot import snapshot from pydantic import AnyUrl from starlette.applications import Starlette @@ -28,8 +29,11 @@ from mcp.types import ( EmptyResult, ErrorData, + Implementation, InitializeResult, + JSONRPCResponse, ReadResourceResult, + ServerCapabilities, TextContent, TextResourceContents, Tool, @@ -534,19 +538,67 @@ def test_sse_server_transport_endpoint_validation(endpoint: str, expected_result assert sse._endpoint.startswith("/") +# ResourceWarning filter: When mocking aconnect_sse, the sse_client's internal task +# group doesn't receive proper cancellation signals, so the sse_reader task's finally +# block (which closes read_stream_writer) doesn't execute. This is a test artifact - +# the actual code path (`if not sse.data: continue`) IS exercised and works correctly. +# Production code with real SSE connections cleans up properly. +@pytest.mark.filterwarnings("ignore::ResourceWarning") @pytest.mark.anyio -async def test_sse_client_handles_empty_keepalive_pings(server: None, server_url: str) -> None: - """Test that SSE client properly handles empty data lines (keep-alive pings).""" - async with sse_client(server_url + "/sse") as streams: - async with ClientSession(*streams) as session: - # Initialize the session - result = await session.initialize() - assert isinstance(result, InitializeResult) - assert result.serverInfo.name == SERVER_NAME +async def test_sse_client_handles_empty_keepalive_pings() -> None: + """Test that SSE client properly handles empty data lines (keep-alive pings). - # Test that we can still make requests after receiving keep-alive pings - # The server may send empty data lines between actual messages - response = await session.read_resource(uri=AnyUrl("foobar://test")) - assert len(response.contents) == 1 - assert isinstance(response.contents[0], TextResourceContents) - assert response.contents[0].text == "Read test" + Per the MCP spec (Streamable HTTP transport): "The server SHOULD immediately + send an SSE event consisting of an event ID and an empty data field in order + to prime the client to reconnect." + + This test mocks the SSE event stream to include empty "message" events and + verifies the client skips them without crashing. + """ + # Build a proper JSON-RPC response using types (not hardcoded strings) + init_result = InitializeResult( + protocolVersion="2024-11-05", + capabilities=ServerCapabilities(), + serverInfo=Implementation(name="test", version="1.0"), + ) + response = JSONRPCResponse( + jsonrpc="2.0", + id=1, + result=init_result.model_dump(by_alias=True, exclude_none=True), + ) + response_json = response.model_dump_json(by_alias=True, exclude_none=True) + + # Create mock SSE events using httpx_sse's ServerSentEvent + async def mock_aiter_sse() -> AsyncGenerator[ServerSentEvent, None]: + # First: endpoint event + yield ServerSentEvent(event="endpoint", data="/messages/?session_id=abc123") + # Empty data keep-alive ping - this is what we're testing + yield ServerSentEvent(event="message", data="") + # Real JSON-RPC response + yield ServerSentEvent(event="message", data=response_json) + + mock_event_source = MagicMock() + mock_event_source.aiter_sse.return_value = mock_aiter_sse() + mock_event_source.response = MagicMock() + mock_event_source.response.raise_for_status = MagicMock() + + mock_aconnect_sse = MagicMock() + mock_aconnect_sse.__aenter__ = AsyncMock(return_value=mock_event_source) + mock_aconnect_sse.__aexit__ = AsyncMock(return_value=None) + + mock_client = MagicMock() + mock_client.__aenter__ = AsyncMock(return_value=mock_client) + mock_client.__aexit__ = AsyncMock(return_value=None) + mock_client.post = AsyncMock(return_value=MagicMock(status_code=200, raise_for_status=MagicMock())) + + with ( + patch("mcp.client.sse.create_mcp_http_client", return_value=mock_client), + patch("mcp.client.sse.aconnect_sse", return_value=mock_aconnect_sse), + ): + async with sse_client("http://test/sse") as (read_stream, _): + # Read the message - should skip the empty one and get the real response + msg = await read_stream.receive() + # If we get here without error, the empty message was skipped successfully + assert not isinstance(msg, Exception) + assert isinstance(msg.message.root, types.JSONRPCResponse) + assert msg.message.root.id == 1