Merged
145 changes: 145 additions & 0 deletions _MCP_CLIENT_ARCHITECTURE.md
@@ -0,0 +1,145 @@
# MCP Client Architecture

## Overview

The MCPClient enables developers to use MCP tools in Strands agents without dealing with async complexity. Since MCP requires async operations but Strands aims for simple synchronous usage (`agent = Agent(); agent("Do something")`), the client uses a background thread with its own event loop to handle MCP communication. This creates challenges around thread synchronization, hanging prevention, and connection stability that this architecture addresses.

## Background Thread Flow

```mermaid
sequenceDiagram
participant Dev as Developer
participant Main as Main Thread
participant BG as Background Thread
participant MCP as MCP Server

Dev->>Main: with MCPClient() as client:
Main->>BG: start() - create thread
BG->>BG: _background_task() - setup event loop
BG->>BG: _async_background_thread() - establish transport
BG->>MCP: ClientSession.initialize()
MCP-->>BG: initialization response
BG->>Main: _init_future.set_result() - signal ready
Dev->>Main: client.call_tool_sync()
Main->>BG: tool calls via _invoke_on_background_thread()
BG->>MCP: tool requests
MCP-->>BG: tool responses

alt Normal response
BG-->>Main: tool response via Future.set_result()
Main-->>Dev: return tool result
else Fatal error in tool response
BG->>BG: _handle_error_message() - raise exception
BG->>BG: Background thread exits
Note over BG: Connection collapses
BG-->>Main: exception via Future.set_exception()
Main-->>Dev: raise exception
end

Note over MCP,BG: Separate flow - server can send unexpected messages anytime
MCP-->>BG: orphaned response (unknown request id)
BG->>BG: _handle_error_message() - log & continue
Note over BG: Connection stays alive (non-fatal error)

Dev->>Main: exit context manager
Main->>BG: stop() - signal close
BG->>BG: _close_future.set_result() - cleanup
```

## Thread Synchronization & Event Loop Management

### Why Two Different Future Types?

**The challenge is synchronizing between the main thread (no event loop) and background thread (with event loop).**

**Main Thread Problem**:
```python
self._init_future: futures.Future[None] = futures.Future()
```
When `MCPClient.__init__()` runs, no event loop exists yet because the background thread hasn't started. An `asyncio.Future` is bound to an event loop and is not thread-safe, so we cannot use it here. We use `concurrent.futures.Future` instead, which works without an event loop. This allows the main thread to block safely on `self._init_future.result(timeout=startup_timeout)` until the background thread signals readiness.
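
The readiness handshake described above can be sketched in isolation. This is a minimal illustration, not the actual `MCPClient` code: `start_background_loop` and `background_task` are hypothetical names, and the real client also establishes a transport and session before signalling.

```python
import asyncio
import threading
from concurrent import futures


def start_background_loop(startup_timeout: float = 5.0) -> tuple[asyncio.AbstractEventLoop, threading.Thread]:
    """Start an event loop on a daemon thread and block until it reports ready."""
    # concurrent.futures.Future needs no event loop, so it can be created here.
    init_future: futures.Future[asyncio.AbstractEventLoop] = futures.Future()

    def background_task() -> None:
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        # The callback runs on the first loop iteration, i.e. once the loop is running.
        loop.call_soon(init_future.set_result, loop)
        loop.run_forever()
        loop.close()

    thread = threading.Thread(target=background_task, daemon=True)
    thread.start()
    # Blocking is safe: the calling thread has no event loop to starve.
    loop = init_future.result(timeout=startup_timeout)
    return loop, thread


loop, thread = start_background_loop()
is_running = loop.is_running()
loop.call_soon_threadsafe(loop.stop)  # ask the loop to stop from the main thread
thread.join(timeout=5.0)
```

If the background thread never signals, `result(timeout=...)` raises a timeout error instead of blocking forever, which is the property the startup timeout relies on.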

**Background Thread Solution**:
```python
self._close_future: asyncio.futures.Future[None] | None = None
# Later in _async_background_thread:
self._close_future = asyncio.Future() # Created inside event loop
```
Once the background thread's event loop is running, we can create `asyncio.Future` objects. The background thread needs to `await self._close_future` to stay alive because we want to keep the MCP connection running on this dedicated event loop. The session must remain active to handle incoming messages and process tool calls. We cannot use `concurrent.futures.Future` here because blocking on `.result()` would freeze the event loop, preventing it from processing MCP messages. Using `asyncio.Future` with `await` keeps the event loop responsive while waiting for the shutdown signal.
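
A small standalone sketch of this keep-alive pattern follows. It assumes a simplified setup: `keep_session_alive`, `signal_close`, `ready`, and `events` are illustrative names, and the future is created with `loop.create_future()` rather than `asyncio.Future()`. Because `asyncio.Future` is not thread-safe, the main thread resolves it through `loop.call_soon_threadsafe()`.

```python
import asyncio
import threading
from concurrent import futures

events: list = []
# Hands the loop and the close future back to the main thread.
ready: futures.Future = futures.Future()


async def keep_session_alive() -> None:
    loop = asyncio.get_running_loop()
    close_future = loop.create_future()  # created inside the running loop
    ready.set_result((loop, close_future))
    events.append("session open")
    await close_future  # suspends this coroutine; the loop stays responsive
    events.append("session closed")


thread = threading.Thread(target=lambda: asyncio.run(keep_session_alive()), daemon=True)
thread.start()

loop, close_future = ready.result(timeout=5.0)


def signal_close() -> None:
    # Runs on the background loop, so touching the asyncio.Future is safe.
    if not close_future.done():
        close_future.set_result(None)


loop.call_soon_threadsafe(signal_close)
thread.join(timeout=5.0)
```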

## Exception Handling, Hanging, & Connection Termination

### Hanging Scenarios & Defenses

**Hanging Scenario 1: Silent Exception Swallowing** ([PR #922](https://github.com/strands-agents/sdk-python/pull/922))

*Problem*: Without a message handler, the MCP SDK silently swallows server exceptions (HTTP timeouts, connection errors). Tool calls time out on the server side, but the client waits indefinitely for responses that never arrive.

*Defense*: `message_handler=self._handle_error_message` in ClientSession
```python
async with ClientSession(
    read_stream,
    write_stream,
    message_handler=self._handle_error_message,  # Prevents hanging
    elicitation_callback=self._elicitation_callback,
) as session:

*How it works in Strands' threaded setup*:

1. **Main thread calls** `client.call_tool_sync()` and blocks on `invoke_future.result()`
2. **The tool call is scheduled** on the background thread's event loop via `asyncio.run_coroutine_threadsafe()`, and the background thread sends the request to the MCP server
3. **Server times out** and sends an exception message back to the MCP client
4. **Without message handler**: The MCP SDK silently ignores the exception, so neither `Future.set_result()` nor `Future.set_exception()` is ever called
5. **Main thread hangs forever** waiting for `invoke_future.result()`, which will never complete
6. **With `_handle_error_message`**: The exception is raised in the background thread, propagates to `Future.set_exception()`, and unblocks the main thread

The threading architecture makes this particularly problematic because the main thread has no way to detect that the background thread received an error - it can only wait for the Future to complete. Without the message handler, that Future never gets resolved.
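
The propagation mechanism in step 6 can be seen with plain `asyncio` primitives. The sketch below is an assumption-laden stand-in: `failing_tool_call` is a hypothetical coroutine that simulates the background side raising, and no MCP session is involved. It shows that an exception raised on the background loop reaches the blocked caller through the `concurrent.futures.Future` returned by `asyncio.run_coroutine_threadsafe()`.

```python
import asyncio
import threading

# Background thread with its own event loop, as in the architecture above.
loop = asyncio.new_event_loop()
thread = threading.Thread(target=loop.run_forever, daemon=True)
thread.start()


async def failing_tool_call() -> str:
    await asyncio.sleep(0)
    # Stand-in for an error surfaced on the background thread.
    raise ConnectionError("server timed out")


# Returns a concurrent.futures.Future that the main thread can block on.
invoke_future = asyncio.run_coroutine_threadsafe(failing_tool_call(), loop)
try:
    invoke_future.result(timeout=5.0)
    outcome = "returned"
except ConnectionError as exc:
    # The exception set on the future is re-raised in the calling thread.
    outcome = f"raised: {exc}"

loop.call_soon_threadsafe(loop.stop)
thread.join(timeout=5.0)
loop.close()
```

If nothing ever resolves the future, only the `timeout` argument stands between the caller and an indefinite wait, which is why an unresolved future is the failure mode described here.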

**Hanging Scenario 2: 5xx Server Errors** ([PR #1169](https://github.com/strands-agents/sdk-python/pull/1169))

*Problem*: When MCP servers return 5xx errors, the underlying client raises an exception that cancels all TaskGroup tasks and tears down the entire asyncio background thread. Pending tool calls hang forever waiting for responses from a dead connection.

*Defense*: Session closure detection in `_invoke_on_background_thread`
```python
async def run_async() -> T:
    invoke_event = asyncio.create_task(coro)
    tasks = [invoke_event, close_future]
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)

    if done.pop() == close_future:
        raise RuntimeError("Connection to the MCP server was closed")
    else:
        return await invoke_event

*How it works*: All tool calls race against `close_future`. When the background thread dies from 5xx errors, `close_future` completes and pending operations immediately fail with a clear error message instead of hanging.
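
The race can be exercised without an MCP server. The following is a runnable sketch under stated assumptions: `race_against_close`, `fast_call`, and `slow_call` are hypothetical names, and the closed connection is simulated by resolving the future after a short delay. It is a variation on the snippet above, not the SDK's code: it checks set membership instead of `done.pop()`, so a call that finished in the same iteration as the close signal still returns its result.

```python
import asyncio
from typing import Awaitable, TypeVar

T = TypeVar("T")


async def race_against_close(coro: Awaitable[T], close_future: "asyncio.Future[None]") -> T:
    """Await coro, but fail fast if close_future completes first."""
    invoke_task = asyncio.ensure_future(coro)
    done, _pending = await asyncio.wait({invoke_task, close_future}, return_when=asyncio.FIRST_COMPLETED)
    if close_future in done and invoke_task not in done:
        invoke_task.cancel()  # do not leave the abandoned call running
        raise RuntimeError("Connection to the MCP server was closed")
    return await invoke_task


async def demo() -> tuple[str, str]:
    loop = asyncio.get_running_loop()

    async def fast_call() -> str:
        return "ok"

    async def slow_call() -> str:
        await asyncio.sleep(60)  # stands in for a response that never arrives
        return "never"

    close_future = loop.create_future()
    first = await race_against_close(fast_call(), close_future)

    # Simulate the background connection collapsing shortly afterwards.
    loop.call_later(0.01, close_future.set_result, None)
    try:
        await race_against_close(slow_call(), close_future)
        second = "returned"
    except RuntimeError as exc:
        second = str(exc)
    return first, second


results = asyncio.run(demo())
```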

### Defense Against Premature Connection Collapse

Before [PR #922](https://github.com/strands-agents/sdk-python/pull/922), the MCP client would never collapse connections because exceptions were silently ignored. After adding `_handle_error_message`, we introduced the risk of collapsing connections on client-side errors that should be recoverable. The challenge is ensuring we:

1. **DO collapse** when we want to (fatal server errors)
2. **DO NOT collapse** when we don't want to (client-side errors, orphaned responses)
3. **DO notify users** when collapse occurs ([PR #1169](https://github.com/strands-agents/sdk-python/pull/1169) detection)

**Non-Fatal Error Patterns**:
```python
# Errors that should NOT terminate the connection
_NON_FATAL_ERROR_PATTERNS = ["unknown request id"]
```

**Why "unknown request id" is Non-Fatal**:
The client receives a response from the server with an ID it doesn't recognize (an orphaned response). This happens when a response arrives after its corresponding request has timed out or been cancelled. More broadly, once a connection is established, the server can send whatever it wants, and the client should generally remain stable rather than collapse the connection over unexpected messages. "Unknown request id" is just one example of server behavior that shouldn't terminate an otherwise healthy connection.

**Connection Decision Flow**:
1. MCP server sends error message to client
2. `ClientSession` calls `message_handler=self._handle_error_message`
3. **Decision point**: Is error in `_NON_FATAL_ERROR_PATTERNS`?
   - **Yes**: Log and continue (connection stays alive)
   - **No**: Raise exception (connection collapses)
4. If collapsed: Exception propagates to `_async_background_thread`
5. Background thread exits, `_close_exception` set for main thread
6. Pending operations detect collapse via `close_future` and fail with clear error
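
The decision point in step 3 can be illustrated with a standalone handler. This is a simplified sketch, not the client's method: `is_non_fatal`, `handle_message`, and the module-level `logger` are illustrative names, and `asyncio.sleep(0)` stands in for the `anyio.lowlevel.checkpoint()` call used in the real handler.

```python
import asyncio
import logging
from typing import Any

logger = logging.getLogger(__name__)

# Substrings (lowercase) of error messages that must not end the connection.
NON_FATAL_ERROR_PATTERNS = ["unknown request id"]


def is_non_fatal(error: Exception) -> bool:
    message = str(error).lower()  # case-insensitive substring match
    return any(pattern in message for pattern in NON_FATAL_ERROR_PATTERNS)


async def handle_message(message: Any) -> None:
    if isinstance(message, Exception):
        if is_non_fatal(message):
            logger.debug("ignoring non-fatal MCP session error: %s", message)
        else:
            raise message  # fatal: let the session collapse
    await asyncio.sleep(0)  # yield control back to the event loop


async def demo() -> list:
    outcomes = []
    messages = [
        Exception("Unknown request ID: abc123"),  # orphaned response
        "normal message",  # non-exception messages pass through
        Exception("connection timeout"),  # anything else is fatal
    ]
    for message in messages:
        try:
            await handle_message(message)
            outcomes.append("continued")
        except Exception as exc:
            outcomes.append(f"collapsed: {exc}")
    return outcomes


outcomes = asyncio.run(demo())
```

Matching on message substrings is brittle if the upstream wording changes, so a list of patterns like this one is best kept short and documented with a link to where each message originates.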

**Why This Strategy Works**:
We get the benefits of [PR #922](https://github.com/strands-agents/sdk-python/pull/922) (no hanging) while avoiding unnecessary connection collapse from recoverable client-side errors. When collapse does occur, [PR #1169](https://github.com/strands-agents/sdk-python/pull/1169) ensures users get clear error messages instead of hanging.
26 changes: 19 additions & 7 deletions src/strands/tools/mcp/mcp_client.py
@@ -75,6 +75,14 @@ class ToolFilters(TypedDict, total=False):
    "https://strandsagents.com/latest/user-guide/concepts/tools/mcp-tools/#mcpclientinitializationerror"
)

# Non-fatal error patterns that should not cause connection collapse
_NON_FATAL_ERROR_PATTERNS = [
    # Occurs when client receives response with unrecognized ID
    # Can occur after a client-side timeout
    # See: https://github.com/modelcontextprotocol/python-sdk/blob/c51936f61f35a15f0b1f8fb6887963e5baee1506/src/mcp/shared/session.py#L421
    "unknown request id",
]


class MCPClient(ToolProvider):
    """Represents a connection to a Model Context Protocol (MCP) server.
@@ -558,13 +566,6 @@ def _handle_tool_result(self, tool_use_id: str, call_tool_result: MCPCallToolRes

        return result

    # Raise an exception if the underlying client raises an exception in a message
    # This happens when the underlying client has an http timeout error
    async def _handle_error_message(self, message: Exception | Any) -> None:
        if isinstance(message, Exception):
            raise message
        await anyio.lowlevel.checkpoint()

    async def _async_background_thread(self) -> None:
        """Asynchronous method that runs in the background thread to manage the MCP connection.

@@ -616,6 +617,17 @@ async def _async_background_thread(self) -> None:
                    "encountered exception on background thread after initialization %s", str(e)
                )

    # Raise an exception if the underlying client raises an exception in a message
    # This happens when the underlying client has an http timeout error
    async def _handle_error_message(self, message: Exception | Any) -> None:
        if isinstance(message, Exception):
            error_msg = str(message).lower()
            if any(pattern in error_msg for pattern in _NON_FATAL_ERROR_PATTERNS):
                self._log_debug_with_thread("ignoring non-fatal MCP session error", message)
            else:
                raise message
        await anyio.lowlevel.checkpoint()

    def _background_task(self) -> None:
        """Sets up and runs the event loop in the background thread.

35 changes: 35 additions & 0 deletions tests/strands/tools/mcp/test_mcp_client.py
@@ -688,3 +688,38 @@ def __init__(self):
mock_session.call_tool.assert_called_once_with("get_file_contents", {}, None)
assert result["status"] == "success"
assert len(result["content"]) == 0 # Unknown resource type should be dropped


@pytest.mark.asyncio
async def test_handle_error_message_non_fatal_error():
    """Test that _handle_error_message ignores non-fatal errors and logs them."""
    client = MCPClient(MagicMock())

    # Test the message handler directly with a non-fatal error
    with patch.object(client, "_log_debug_with_thread") as mock_log:
        # This should not raise an exception
        await client._handle_error_message(Exception("unknown request id: abc123"))

        # Verify the non-fatal error was logged as ignored
        assert mock_log.called
        call_args = mock_log.call_args[0]
        assert "ignoring non-fatal MCP session error" in call_args[0]


@pytest.mark.asyncio
async def test_handle_error_message_fatal_error():
    """Test that _handle_error_message raises fatal errors."""
    client = MCPClient(MagicMock())

    # This should raise the exception
    with pytest.raises(Exception, match="connection timeout"):
        await client._handle_error_message(Exception("connection timeout"))


@pytest.mark.asyncio
async def test_handle_error_message_non_exception():
    """Test that _handle_error_message handles non-exception messages."""
    client = MCPClient(MagicMock())

    # This should not raise an exception
    await client._handle_error_message("normal message")
35 changes: 35 additions & 0 deletions tests_integ/mcp/test_mcp_client.py
@@ -487,3 +487,38 @@ def transport_callback() -> MCPTransport:

assert result["status"] == "error"
assert result["content"][0]["text"] == "Tool execution failed: Connection to the MCP server was closed"


def test_mcp_client_connection_stability_with_client_timeout():
    """Integration test to verify connection remains stable with very small timeouts."""
    from datetime import timedelta
    from unittest.mock import patch

    stdio_mcp_client = MCPClient(
        lambda: stdio_client(StdioServerParameters(command="python", args=["tests_integ/mcp/echo_server.py"]))
    )

    with stdio_mcp_client:
        # Spy on the logger to capture non-fatal error messages
        with patch.object(stdio_mcp_client, "_log_debug_with_thread") as mock_log:
            # Make multiple calls with very small timeout to trigger "unknown request id" errors
            for i in range(3):
                try:
                    result = stdio_mcp_client.call_tool_sync(
                        tool_use_id=f"test_{i}",
                        name="echo",
                        arguments={"to_echo": f"test_{i}"},
                        read_timeout_seconds=timedelta(milliseconds=0),  # Very small timeout
                    )
                except Exception:
                    pass  # Ignore exceptions, we're testing connection stability

            # Verify connection is still alive by making a successful call
            result = stdio_mcp_client.call_tool_sync(
                tool_use_id="final_test", name="echo", arguments={"to_echo": "connection_alive"}
            )
            assert result["status"] == "success"
            assert result["content"][0]["text"] == "connection_alive"

            # Verify that non-fatal error messages were logged
            assert any("ignoring non-fatal MCP session error" in str(call) for call in mock_log.call_args_list)