
Possible resource leak / race condition in streamable_http_client #1805

@h-filzer

Description


Observed Issue

When using the MCP SDK with the streamable-http transport, CPU usage spikes and never goes back down after sending multiple requests and exiting the client context. I observed this behavior especially in Google adk-python when the mcp-toolset tries to close the client.

When running the test code below to reproduce, I get:

Client-Side: Test Code Shows Thread Leaks

Iteration 1: 1 thread  → 2 threads
Iteration 2: 2 threads → 7 threads
Iteration 3: 7 threads → 7 threads
Final: 7 threads (6 leaked asyncio_* threads)

Server-Side: Exceptions

Session crashed: unhandled errors in a TaskGroup (1 sub-exception)

ClosedResourceError at session.py:349
  → _write_stream.send() fails - stream already closed

BrokenResourceError at streamable_http.py:638
  → SSE writer has no receiver

Possible Cause

  1. Client sends multiple requests via write_stream.send()
  2. Client exits async with streamable_http_client context
  3. tg.cancel_scope.cancel() is called during cleanup
  4. Server is still processing requests (e.g., slow_echo with delay)
  5. Server tries to send responses via _write_stream.send()
  6. Stream is already closed → ClosedResourceError
  7. Background asyncio threads handling responses become orphaned
  8. Threads never terminate → memory/resource leak
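The failure mode in steps 5–6 can be reproduced in isolation with anyio memory object streams (the same primitive the SDK uses for its read/write streams). A minimal sketch, independent of MCP itself:

```python
import anyio


async def demo() -> list[str]:
    """Reproduce the two stream errors from the traceback with bare anyio streams."""
    seen: list[str] = []

    # Case 1: sending on a stream whose send side was already closed during
    # cleanup (what session.py hits) -> ClosedResourceError
    send1, recv1 = anyio.create_memory_object_stream(1)
    await send1.aclose()
    try:
        await send1.send("late response")
    except anyio.ClosedResourceError as exc:
        seen.append(type(exc).__name__)
    await recv1.aclose()

    # Case 2: sending when the receive side is gone (the SSE writer with
    # no receiver) -> BrokenResourceError
    send2, recv2 = anyio.create_memory_object_stream(1)
    await recv2.aclose()
    try:
        await send2.send("response nobody will read")
    except anyio.BrokenResourceError as exc:
        seen.append(type(exc).__name__)
    await send2.aclose()

    return seen


if __name__ == "__main__":
    print(anyio.run(demo))  # ['ClosedResourceError', 'BrokenResourceError']
```

This only demonstrates the stream semantics; in the SDK the closes happen implicitly when `tg.cancel_scope.cancel()` tears down the client context while server-side tasks still hold the send ends.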

Expected Behavior

  • No thread leaks after exiting streamable_http_client context
  • Graceful handling of client disconnection on server side
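One possible shape for that graceful handling at the server's send sites — a sketch only; `safe_send` is a hypothetical helper, not SDK API:

```python
import anyio
from anyio.streams.memory import MemoryObjectSendStream


async def safe_send(stream: MemoryObjectSendStream, message) -> bool:
    """Send a response, returning False instead of crashing the TaskGroup
    when the client has already disconnected."""
    try:
        await stream.send(message)
        return True
    except (anyio.ClosedResourceError, anyio.BrokenResourceError):
        # Client closed its end mid-request; drop the response quietly so
        # background tasks can finish and be reaped instead of leaking.
        return False
```

Whether the fix belongs at the send sites or in the cancellation/close ordering of `streamable_http_client`'s cleanup is the open question.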

Impact

  • Memory leak from accumulated threads
  • Resource exhaustion in long-running applications

Affected Code Paths

  • mcp/client/streamable_http.py - streamable_http_client() cleanup
  • mcp/shared/session.py:237-238 - cancel_scope.cancel() and __aexit__
  • mcp/server/streamable_http.py:638 - SSE response handling

Example Code

"""
This script demonstrates a possible bug in the MCP SDK where cleanup of
streamable_http_client causes BrokenResourceError and leaks threads.

Bug?: When tg.cancel_scope.cancel() is called during cleanup, child tasks
spawned by post_writer (like _handle_json_response) are still trying to
use read_stream_writer. The stream gets closed before they finish,
causing BrokenResourceError and preventing proper httpx client cleanup.
"""

import asyncio
import threading
import httpx
from mcp.client.streamable_http import streamable_http_client
from mcp.shared.message import SessionMessage
from mcp.types import JSONRPCRequest, JSONRPCMessage


async def make_mcp_request(url: str, trigger_race: bool = False) -> None:
    """Make an MCP request and observe cleanup behavior."""

    print(f"[Before] Active threads: {threading.active_count()}")
    print(f"[Before] Thread names: {[t.name for t in threading.enumerate()]}")

    # Create custom httpx client with short timeouts
    http_client = httpx.AsyncClient(
        headers={"Authorization": "test"},
        timeout=httpx.Timeout(5.0, read=10.0),
    )

    try:
        async with streamable_http_client(
            url=url,
            http_client=http_client,
        ) as (read_stream, write_stream, get_session_id):
            print(f"[Connected] Session ID: {get_session_id()}")
            print(f"[Connected] Active threads: {threading.active_count()}")

            # Send an initialize request
            init_request = JSONRPCRequest(
                jsonrpc="2.0",
                id="1",
                method="initialize",
                params={
                    "protocolVersion": "2024-11-05",
                    "capabilities": {},
                    "clientInfo": {"name": "test-client", "version": "1.0.0"},
                },
            )
            await write_stream.send(SessionMessage(JSONRPCMessage(init_request)))
            print("[Sent] Initialize request")

            # Read the response
            async for message in read_stream:
                print(f"[Received] {type(message).__name__}")
                if isinstance(message, Exception):
                    print(f"[Error in stream] {message}")
                break

            if trigger_race:
                # Send multiple requests quickly to create race during cleanup
                for i in range(5):
                    tool_request = JSONRPCRequest(
                        jsonrpc="2.0",
                        id=f"tool-{i}",
                        method="tools/call",
                        params={
                            "name": "slow_echo",
                            "arguments": {"message": f"test-{i}"},
                        },
                    )
                    await write_stream.send(SessionMessage(JSONRPCMessage(tool_request)))
                print("[Sent] 5 tool requests - exiting immediately to trigger race")
                # Exit immediately without reading responses - this should trigger the race

    except Exception as e:
        import traceback
        print(f"[Error] {type(e).__name__}: {e}")
        traceback.print_exc()

    print(f"[After cleanup] Active threads: {threading.active_count()}")
    print(f"[After cleanup] Thread names: {[t.name for t in threading.enumerate()]}")

    # Wait and check if threads persist
    await asyncio.sleep(5)
    print(f"[After 5s] Active threads: {threading.active_count()}")
    print(f"[After 5s] Thread names: {[t.name for t in threading.enumerate()]}")


async def main():
    # Replace with your MCP server URL
    MCP_URL = "http://localhost:8000/mcp"

    print("=" * 60)
    print("MCP SDK Cleanup Race Condition Reproduction")
    print("=" * 60)

    # Run multiple iterations to accumulate leaked threads
    for i in range(3):
        print(f"\n--- Iteration {i + 1} ---")
        # Trigger race condition on iterations 2 and 3
        await make_mcp_request(MCP_URL, trigger_race=(i > 0))
        await asyncio.sleep(2)

    print("\n" + "=" * 60)
    print("Final State")
    print("=" * 60)
    print(f"Active threads: {threading.active_count()}")
    for t in threading.enumerate():
        print(f"  - {t.name} (daemon={t.daemon})")


if __name__ == "__main__":
    asyncio.run(main())


-----

"""Minimal MCP server for race condition testing."""

from mcp.server.fastmcp import FastMCP

mcp = FastMCP("Test Server")


@mcp.tool()
def echo(message: str) -> str:
    """Echo the message back."""
    return f"Echo: {message}"


@mcp.tool()
def slow_echo(message: str) -> str:
    """Echo with a delay to simulate work."""
    import time

    time.sleep(0.5)
    return f"Slow Echo: {message}"


if __name__ == "__main__":
    mcp.run(transport="streamable-http")

Python & MCP Python SDK

- Python: 3.13
- MCP SDK: 1.25.0
- Transport: streamable-http
