diff --git a/_MCP_CLIENT_ARCHITECTURE.md b/_MCP_CLIENT_ARCHITECTURE.md new file mode 100644 index 000000000..f77b17da5 --- /dev/null +++ b/_MCP_CLIENT_ARCHITECTURE.md @@ -0,0 +1,145 @@ +# MCP Client Architecture + +## Overview + +The MCPClient enables developers to use MCP tools in Strands agents without dealing with async complexity. Since MCP requires async operations but Strands aims for simple synchronous usage (`agent = Agent(); agent("Do something")`), the client uses a background thread with its own event loop to handle MCP communication. This creates challenges around thread synchronization, hanging prevention, and connection stability that this architecture addresses. + +## Background Thread Flow + +```mermaid +sequenceDiagram + participant Dev as Developer + participant Main as Main Thread + participant BG as Background Thread + participant MCP as MCP Server + + Dev->>Main: with MCPClient() as client: + Main->>BG: start() - create thread + BG->>BG: _background_task() - setup event loop + BG->>BG: _async_background_thread() - establish transport + BG->>MCP: ClientSession.initialize() + MCP-->>BG: initialization response + BG->>Main: _init_future.set_result() - signal ready + Dev->>Main: client.call_tool_sync() + Main->>BG: tool calls via _invoke_on_background_thread() + BG->>MCP: tool requests + MCP-->>BG: tool responses + + alt Normal response + BG-->>Main: tool response via Future.set_result() + Main-->>Dev: return tool result + else Fatal error in tool response + BG->>BG: _handle_error_message() - raise exception + BG->>BG: Background thread exits + Note over BG: Connection collapses + BG-->>Main: exception via Future.set_exception() + Main-->>Dev: raise exception + end + + Note over MCP,BG: Separate flow - server can send unexpected messages anytime + MCP-->>BG: orphaned response (unknown request id) + BG->>BG: _handle_error_message() - log & continue + Note over BG: Connection stays alive (non-fatal error) + + Dev->>Main: exit context manager + Main->>BG: stop() - signal close + BG->>BG: _close_future.set_result() - cleanup +``` + +## Thread Synchronization & Event Loop Management + +### Why Two Different Future Types? + +**The challenge is synchronizing between the main thread (no event loop) and background thread (with event loop).** + +**Main Thread Problem**: +```python +self._init_future: futures.Future[None] = futures.Future() +``` +When `MCPClient.__init__()` runs, no event loop exists yet. The background thread hasn't started, so we cannot use `asyncio.Future`. We must use `concurrent.futures.Future` which works without an event loop. This allows the main thread to block safely on `self._init_future.result(timeout=startup_timeout)` until the background thread signals readiness. + +**Background Thread Solution**: +```python +self._close_future: asyncio.futures.Future[None] | None = None +# Later in _async_background_thread: +self._close_future = asyncio.Future() # Created inside event loop +``` +Once the background thread's event loop is running, we can create `asyncio.Future` objects. The background thread needs to `await self._close_future` to stay alive because we want to keep the MCP connection running on this dedicated event loop. The session must remain active to handle incoming messages and process tool calls. We cannot use `concurrent.futures.Future` here because blocking on `.result()` would freeze the event loop, preventing it from processing MCP messages. Using `asyncio.Future` with `await` keeps the event loop responsive while waiting for the shutdown signal. + +## Exception Handling, Hanging, & Connection Termination + +### Hanging Scenarios & Defenses + +**Hanging Scenario 1: Silent Exception Swallowing** ([PR #922](https://github.com/strands-agents/sdk-python/pull/922)) + +*Problem*: MCP SDK silently swallows server exceptions (HTTP timeouts, connection errors) without a message handler. Tool calls timeout on server side but client waits indefinitely for responses that never arrive. + +*Defense*: `message_handler=self._handle_error_message` in ClientSession +```python +async with ClientSession( + read_stream, + write_stream, + message_handler=self._handle_error_message, # Prevents hanging + elicitation_callback=self._elicitation_callback, +) as session: +``` + +*How it works in Strands' threaded setup*: + +1. **Main thread calls** `client.call_tool_sync()` and blocks on `invoke_future.result()` +2. **Background thread** submits the tool request to MCP server via `asyncio.run_coroutine_threadsafe()` +3. **Server times out** and sends an exception message back to the MCP client +4. **Without message handler**: MCP SDK silently ignores the exception, never calls `Future.set_result()` or `Future.set_exception()` +5. **Main thread hangs forever** waiting for `invoke_future.result()` that will never complete +6. **With `_handle_error_message`**: Exception is raised in background thread, propagates to `Future.set_exception()`, unblocks main thread + +The threading architecture makes this particularly problematic because the main thread has no way to detect that the background thread received an error - it can only wait for the Future to complete. Without the message handler, that Future never gets resolved. + +**Hanging Scenario 2: 5xx Server Errors** ([PR #1169](https://github.com/strands-agents/sdk-python/pull/1169)) + +*Problem*: When MCP servers return 5xx errors, the underlying client raises an exception that cancels all TaskGroup tasks and tears down the entire asyncio background thread. Pending tool calls hang forever waiting for responses from a dead connection. + +*Defense*: Session closure detection in `_invoke_on_background_thread` +```python +async def run_async() -> T: + invoke_event = asyncio.create_task(coro) + tasks = [invoke_event, close_future] + done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED) + + if done.pop() == close_future: + raise RuntimeError("Connection to the MCP server was closed") + else: + return await invoke_event +``` + +*How it works*: All tool calls race against `close_future`. When the background thread dies from 5xx errors, `close_future` completes and pending operations immediately fail with a clear error message instead of hanging. + +### Defense Against Premature Connection Collapse + +Before [PR #922](https://github.com/strands-agents/sdk-python/pull/922), the MCP client would never collapse connections because exceptions were silently ignored. After adding `_handle_error_message`, we introduced the risk of collapsing connections on client-side errors that should be recoverable. The challenge is ensuring we: + +1. **DO collapse** when we want to (fatal server errors) +2. **DO NOT collapse** when we don't want to (client-side errors, orphaned responses) +3. **DO notify users** when collapse occurs ([PR #1169](https://github.com/strands-agents/sdk-python/pull/1169) detection) + +**Non-Fatal Error Patterns**: +```python +# Errors that should NOT terminate the connection +_NON_FATAL_ERROR_PATTERNS = ["unknown request id"] +``` + +**Why "unknown request id" is Non-Fatal**: +Client receives a response from server with an ID it doesn't recognize (orphaned response). This happens when responses arrive after their corresponding requests have timed out or been cancelled. More broadly, once a connection is established, the server can send whatever it wants - the client should generally remain stable and not collapse the connection over unexpected messages. "Unknown request id" is just one example of server behavior that shouldn't terminate an otherwise healthy connection. + +**Connection Decision Flow**: +1. MCP server sends error message to client +2. `ClientSession` calls `message_handler=self._handle_error_message` +3. **Decision point**: Is error in `_NON_FATAL_ERROR_PATTERNS`? + - **Yes**: Log and continue (connection stays alive) + - **No**: Raise exception (connection collapses) +4. If collapsed: Exception propagates to `_async_background_thread` +5. Background thread exits, `_close_exception` set for main thread +6. Pending operations detect collapse via `close_future` and fail with clear error + +**Why This Strategy Works**: +We get the benefits of [PR #922](https://github.com/strands-agents/sdk-python/pull/922) (no hanging) while avoiding unnecessary connection collapse from recoverable client-side errors. When collapse does occur, [PR #1169](https://github.com/strands-agents/sdk-python/pull/1169) ensures users get clear error messages instead of hanging. \ No newline at end of file diff --git a/src/strands/tools/mcp/mcp_client.py b/src/strands/tools/mcp/mcp_client.py index b16b9c2b4..bb5dca19c 100644 --- a/src/strands/tools/mcp/mcp_client.py +++ b/src/strands/tools/mcp/mcp_client.py @@ -75,6 +75,14 @@ class ToolFilters(TypedDict, total=False): "https://strandsagents.com/latest/user-guide/concepts/tools/mcp-tools/#mcpclientinitializationerror" ) +# Non-fatal error patterns that should not cause connection collapse +_NON_FATAL_ERROR_PATTERNS = [ + # Occurs when client receives response with unrecognized ID + # Can occur after a client-side timeout + # See: https://github.com/modelcontextprotocol/python-sdk/blob/c51936f61f35a15f0b1f8fb6887963e5baee1506/src/mcp/shared/session.py#L421 + "unknown request id", +] + class MCPClient(ToolProvider): """Represents a connection to a Model Context Protocol (MCP) server. @@ -558,13 +566,6 @@ def _handle_tool_result(self, tool_use_id: str, call_tool_result: MCPCallToolRes return result - # Raise an exception if the underlying client raises an exception in a message - # This happens when the underlying client has an http timeout error - async def _handle_error_message(self, message: Exception | Any) -> None: - if isinstance(message, Exception): - raise message - await anyio.lowlevel.checkpoint() - async def _async_background_thread(self) -> None: """Asynchronous method that runs in the background thread to manage the MCP connection. @@ -616,6 +617,17 @@ async def _async_background_thread(self) -> None: "encountered exception on background thread after initialization %s", str(e) ) + # Raise an exception if the underlying client raises an exception in a message + # This happens when the underlying client has an http timeout error + async def _handle_error_message(self, message: Exception | Any) -> None: + if isinstance(message, Exception): + error_msg = str(message).lower() + if any(pattern in error_msg for pattern in _NON_FATAL_ERROR_PATTERNS): + self._log_debug_with_thread("ignoring non-fatal MCP session error", message) + else: + raise message + await anyio.lowlevel.checkpoint() + def _background_task(self) -> None: """Sets up and runs the event loop in the background thread. diff --git a/tests/strands/tools/mcp/test_mcp_client.py b/tests/strands/tools/mcp/test_mcp_client.py index 130a4703e..ec77b48a2 100644 --- a/tests/strands/tools/mcp/test_mcp_client.py +++ b/tests/strands/tools/mcp/test_mcp_client.py @@ -688,3 +688,38 @@ def __init__(self): mock_session.call_tool.assert_called_once_with("get_file_contents", {}, None) assert result["status"] == "success" assert len(result["content"]) == 0 # Unknown resource type should be dropped + + +@pytest.mark.asyncio +async def test_handle_error_message_non_fatal_error(): + """Test that _handle_error_message ignores non-fatal errors and logs them.""" + client = MCPClient(MagicMock()) + + # Test the message handler directly with a non-fatal error + with patch.object(client, "_log_debug_with_thread") as mock_log: + # This should not raise an exception + await client._handle_error_message(Exception("unknown request id: abc123")) + + # Verify the non-fatal error was logged as ignored + assert mock_log.called + call_args = mock_log.call_args[0] + assert "ignoring non-fatal MCP session error" in call_args[0] + + +@pytest.mark.asyncio +async def test_handle_error_message_fatal_error(): + """Test that _handle_error_message raises fatal errors.""" + client = MCPClient(MagicMock()) + + # This should raise the exception + with pytest.raises(Exception, match="connection timeout"): + await client._handle_error_message(Exception("connection timeout")) + + +@pytest.mark.asyncio +async def test_handle_error_message_non_exception(): + """Test that _handle_error_message handles non-exception messages.""" + client = MCPClient(MagicMock()) + + # This should not raise an exception + await client._handle_error_message("normal message") diff --git a/tests_integ/mcp/test_mcp_client.py b/tests_integ/mcp/test_mcp_client.py index 35cfd7e86..5c3baeba8 100644 --- a/tests_integ/mcp/test_mcp_client.py +++ b/tests_integ/mcp/test_mcp_client.py @@ -487,3 +487,38 @@ def transport_callback() -> MCPTransport: assert result["status"] == "error" assert result["content"][0]["text"] == "Tool execution failed: Connection to the MCP server was closed" + + +def test_mcp_client_connection_stability_with_client_timeout(): + """Integration test to verify connection remains stable with very small timeouts.""" + from datetime import timedelta + from unittest.mock import patch + + stdio_mcp_client = MCPClient( + lambda: stdio_client(StdioServerParameters(command="python", args=["tests_integ/mcp/echo_server.py"])) + ) + + with stdio_mcp_client: + # Spy on the logger to capture non-fatal error messages + with patch.object(stdio_mcp_client, "_log_debug_with_thread") as mock_log: + # Make multiple calls with very small timeout to trigger "unknown request id" errors + for i in range(3): + try: + result = stdio_mcp_client.call_tool_sync( + tool_use_id=f"test_{i}", + name="echo", + arguments={"to_echo": f"test_{i}"}, + read_timeout_seconds=timedelta(milliseconds=0), # Very small timeout + ) + except Exception: + pass # Ignore exceptions, we're testing connection stability + + # Verify connection is still alive by making a successful call + result = stdio_mcp_client.call_tool_sync( + tool_use_id="final_test", name="echo", arguments={"to_echo": "connection_alive"} + ) + assert result["status"] == "success" + assert result["content"][0]["text"] == "connection_alive" + + # Verify that non-fatal error messages were logged + assert any("ignoring non-fatal MCP session error" in str(call) for call in mock_log.call_args_list)