Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/mcp/client/sse.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,9 @@ async def sse_reader(
task_status.started(endpoint_url)

case "message":
# Skip empty data (keep-alive pings)
if not sse.data:
continue
try:
message = types.JSONRPCMessage.model_validate_json( # noqa: E501
sse.data
Expand Down
72 changes: 71 additions & 1 deletion tests/shared/test_sse.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,13 @@
import time
from collections.abc import AsyncGenerator, Generator
from typing import Any
from unittest.mock import Mock
from unittest.mock import AsyncMock, MagicMock, Mock, patch

import anyio
import httpx
import pytest
import uvicorn
from httpx_sse import ServerSentEvent
from inline_snapshot import snapshot
from pydantic import AnyUrl
from starlette.applications import Starlette
Expand All @@ -28,8 +29,11 @@
from mcp.types import (
EmptyResult,
ErrorData,
Implementation,
InitializeResult,
JSONRPCResponse,
ReadResourceResult,
ServerCapabilities,
TextContent,
TextResourceContents,
Tool,
Expand Down Expand Up @@ -532,3 +536,69 @@ def test_sse_server_transport_endpoint_validation(endpoint: str, expected_result
sse = SseServerTransport(endpoint)
assert sse._endpoint == expected_result
assert sse._endpoint.startswith("/")


# ResourceWarning filter: When mocking aconnect_sse, the sse_client's internal task
# group doesn't receive proper cancellation signals, so the sse_reader task's finally
# block (which closes read_stream_writer) doesn't execute. This is a test artifact -
# the actual code path (`if not sse.data: continue`) IS exercised and works correctly.
# Production code with real SSE connections cleans up properly.
@pytest.mark.filterwarnings("ignore::ResourceWarning")
@pytest.mark.anyio
async def test_sse_client_handles_empty_keepalive_pings() -> None:
"""Test that SSE client properly handles empty data lines (keep-alive pings).

Per the MCP spec (Streamable HTTP transport): "The server SHOULD immediately
send an SSE event consisting of an event ID and an empty data field in order
to prime the client to reconnect."

This test mocks the SSE event stream to include empty "message" events and
verifies the client skips them without crashing.
"""
# Build a proper JSON-RPC response using types (not hardcoded strings)
init_result = InitializeResult(
protocolVersion="2024-11-05",
capabilities=ServerCapabilities(),
serverInfo=Implementation(name="test", version="1.0"),
)
response = JSONRPCResponse(
jsonrpc="2.0",
id=1,
result=init_result.model_dump(by_alias=True, exclude_none=True),
)
response_json = response.model_dump_json(by_alias=True, exclude_none=True)

# Create mock SSE events using httpx_sse's ServerSentEvent
async def mock_aiter_sse() -> AsyncGenerator[ServerSentEvent, None]:
# First: endpoint event
yield ServerSentEvent(event="endpoint", data="/messages/?session_id=abc123")
# Empty data keep-alive ping - this is what we're testing
yield ServerSentEvent(event="message", data="")
# Real JSON-RPC response
yield ServerSentEvent(event="message", data=response_json)

mock_event_source = MagicMock()
mock_event_source.aiter_sse.return_value = mock_aiter_sse()
mock_event_source.response = MagicMock()
mock_event_source.response.raise_for_status = MagicMock()

mock_aconnect_sse = MagicMock()
mock_aconnect_sse.__aenter__ = AsyncMock(return_value=mock_event_source)
mock_aconnect_sse.__aexit__ = AsyncMock(return_value=None)

mock_client = MagicMock()
mock_client.__aenter__ = AsyncMock(return_value=mock_client)
mock_client.__aexit__ = AsyncMock(return_value=None)
mock_client.post = AsyncMock(return_value=MagicMock(status_code=200, raise_for_status=MagicMock()))

with (
patch("mcp.client.sse.create_mcp_http_client", return_value=mock_client),
patch("mcp.client.sse.aconnect_sse", return_value=mock_aconnect_sse),
):
async with sse_client("http://test/sse") as (read_stream, _):
# Read the message - should skip the empty one and get the real response
msg = await read_stream.receive()
# If we get here without error, the empty message was skipped successfully
assert not isinstance(msg, Exception)
assert isinstance(msg.message.root, types.JSONRPCResponse)
assert msg.message.root.id == 1