# MCP Multi-Agent Example: Multiagent multiple MCP Server.


this notebook is an example of a multi-agent orchestration where you can run multiple MCP server via an agent tool call.


In [3]:
import os
from contextlib import asynccontextmanager
from datetime import timedelta
from pathlib import Path
from typing import Any, AsyncIterator, Literal

import dotenv
import nest_asyncio
from mcp.client.session import ClientSession
from mcp.client.sse import sse_client
from mcp.client.stdio import StdioServerParameters, stdio_client

from autogen import LLMConfig
from autogen.agentchat import AssistantAgent, ConversableAgent, initiate_group_chat
from autogen.agentchat.group import (
    AgentTarget,
    ReplyResult,
)
from autogen.agentchat.group.patterns import AutoPattern
from autogen.mcp import create_toolkit

nest_asyncio.apply()

define LLMConfig for the agent. make sure to set the OPENAI_API_KEY in your environment variables. you can use a .env file or set it directly in your environment


In [None]:
dotenv.load_dotenv()
llm_config = LLMConfig(
    model="o3-mini",
    api_type="openai",
    api_key=os.getenv("OPENAI_API_KEY"),
)

define server file paths for stdio client and server url for sse client.


In [5]:
FILESYSTEM_SERVER_PATH = Path("mcp/mcp_filesystem.py")
ARXIV_SERVER_PATH = Path("mcp/mcp_archive.py")
WIKIPEDIA_SERVER_URL = "http://localhost:8000/sse"

In [6]:
EncodingErrorHandlerType = Literal["strict", "ignore", "replace"]

DEFAULT_TEXT_ENCODING = "utf-8"
DEFAULT_TEXT_ENCODING_ERROR_HANDLER: EncodingErrorHandlerType = "strict"
DEFAULT_HTTP_REQUEST_TIMEOUT = 5
DEFAULT_SSE_EVENT_READ_TIMEOUT = 60 * 5
DEFAULT_STREAMABLE_HTTP_REQUEST_TIMEOUT = timedelta(seconds=30)
DEFAULT_STREAMABLE_HTTP_SSE_EVENT_READ_TIMEOUT = timedelta(seconds=60 * 5)

define connection params


In [7]:
# add MCP server params
mcp_connections: dict = {
    "FilesystemServer": {
        "command": "python3",
        "args": [FILESYSTEM_SERVER_PATH],
        "transport": "stdio",
    },
    "ArxivServer": {
        "command": "python3",
        "args": [ARXIV_SERVER_PATH],
        "transport": "stdio",
    },
    "WikipediaServer": {
        "url": WIKIPEDIA_SERVER_URL,
        "transport": "sse",
    },
}

add methods to create client sessions


In [17]:
@asynccontextmanager
async def create_stdio_mcp_session(
    *,
    command: str,
    arguments: list[str],
    environment: dict[str, str] | None = None,
    working_dir: str | Path | None = None,
    encoding: str = DEFAULT_TEXT_ENCODING,
    encoding_error_handler: EncodingErrorHandlerType = DEFAULT_TEXT_ENCODING_ERROR_HANDLER,
    session_options: dict[str, Any] | None = None,
) -> AsyncIterator[ClientSession]:
    """
    Create a new session to an MCP server using stdio

    Args:
        command: Command to execute
        arguments: Arguments for the command
        environment: Environment variables for the command
        working_dir: Working directory for the command
        encoding: Character encoding
        encoding_error_handler: How to handle encoding errors
        session_options: Additional keyword arguments to pass to the ClientSession
    """

    environment = environment or {}
    if "PATH" not in environment:
        environment["PATH"] = os.environ.get("PATH", "")

    server_params = StdioServerParameters(
        command=command,
        args=arguments,
        env=environment,
        cwd=working_dir,
        encoding=encoding,
        encoding_error_handler=encoding_error_handler,
    )

    async with (
        stdio_client(server_params) as (reader, writer),
        ClientSession(reader, writer, **(session_options or {})) as session,
    ):
        yield session


async def create_sse_mcp_session(
    *,
    url: str,
    headers: dict[str, Any] | None = None,
    timeout: float = DEFAULT_HTTP_REQUEST_TIMEOUT,
    sse_read_timeout: float = DEFAULT_SSE_EVENT_READ_TIMEOUT,
    session_options: dict[str, Any] | None = None,
    httpx_client_factory: Any = None,
) -> AsyncIterator[ClientSession]:
    """Create a new session to an MCP server using SSE

    Args:
        url: URL of the SSE server
        headers: HTTP headers to send to the SSE endpoint
        timeout: HTTP timeout
        sse_read_timeout: SSE read timeout
        session_options: Additional keyword arguments to pass to the ClientSession
        httpx_client_factory: Custom factory for httpx.AsyncClient (optional)
    """
    kwargs = {}
    if httpx_client_factory is not None:
        kwargs["httpx_client_factory"] = httpx_client_factory

    async with (
        sse_client(url, headers, timeout, sse_read_timeout, **kwargs) as (reader, writer),
        ClientSession(reader, writer, **(session_options or {})) as session,
    ):
        yield session

open a mcp session


In [18]:
@asynccontextmanager
async def open_mcp_session(
    config,
) -> AsyncIterator[ClientSession]:
    """
    Open a new session to an MCP server.
    Args:
        config: Configuration dictionary for the connection
    Raises:
        KeyError: If required parameters for the specified transport are missing
        NotImplementedError: If transport is not recognized

    Yields:
        A ClientSession
    """
    transport_type = config["transport"]
    if transport_type == "sse":
        if "url" not in config:
            raise KeyError("'url' parameter is required for SSE connection")
        async with create_sse_mcp_session(
            url=config["url"],
            headers=config.get("headers"),
            timeout=config.get("timeout", DEFAULT_HTTP_REQUEST_TIMEOUT),
            sse_read_timeout=config.get("sse_read_timeout", DEFAULT_SSE_EVENT_READ_TIMEOUT),
            session_options=config.get("session_options"),
            httpx_client_factory=config.get("httpx_client_factory"),
        ) as session:
            yield session
    elif transport_type == "stdio":
        if "command" not in config:
            raise KeyError("'command' parameter is required for stdio connection")
        if "arguments" not in config:
            raise KeyError("'arguments' parameter is required for stdio connection")
        async with create_stdio_mcp_session(
            command=config["command"],
            arguments=config["arguments"],
            environment=config.get("environment"),
            working_dir=config.get("working_dir"),
            encoding=config.get("encoding", DEFAULT_TEXT_ENCODING),
            encoding_error_handler=config.get("encoding_error_handler", DEFAULT_TEXT_ENCODING_ERROR_HANDLER),
            session_options=config.get("session_options"),
        ) as session:
            yield session
    else:
        raise NotImplementedError(f"Unsupported transport: {transport_type}. Must be one of: 'stdio', 'sse'")

define agent orchestration pattern where mcp_agent is responsible for responding to mcp queries.


In [10]:
system_message = """
You are a mcp agent that can use mcp tools to analyze the content of the file and return the result.
you have two mcp servers connected to you.
1. FilesystemServer
2. WikipediaServer
3. ArxivServer
You can use the FilesystemServer to read files from the filesystem, and you can use the WikipediaServer to search for information on Wikipedia.
You can also use the ArxivServer to search for scientific papers on arXiv.
you can use the tools of both the servers to analyze the content of the file and return the result.
# Note:
    - Identify the server name by analyzing the human question/initial message.
"""
with llm_config:
    mcp_agent = ConversableAgent(
        name="mcp_agent",
        system_message=system_message,
    )
    assistant = ConversableAgent(
        name="assistant",
        system_message="""You are an intelligent assistant agent. Your job is to analyze user queries and decide which MCP server/tool is most appropriate to answer the question:
        - Use 'FilesystemServer' for file reading, file content analysis, or anything related to local files.
        - Use 'WikipediaServer' for general knowledge, facts, or information that can be found on Wikipedia.
        - Use 'ArxivServer' for scientific literature, research papers, or academic topics.
        Route the query to the correct server/tool and provide clear, concise responses based on the tool's output.
        """,
        max_consecutive_auto_reply=3,
    )
    arxiv_summary_agent = ConversableAgent(
        name="arxiv_summary_agent",
        system_message="""You are an agent specializing in summarizing scientific papers and arXiv responses.
        When you receive information from the assistant that originates from ArxivServer, extract the main contributions, summarize the findings,
        and present them in a clear, accessible way for a general audience.""",
        max_consecutive_auto_reply=1,
    )
    file_analysis_agent = ConversableAgent(
        name="file_analysis_agent",
        system_message="""You are an expert in file content analysis.
        When provided with file data (from FilesystemServer), extract key information, summarize the main points,
        and highlight any anomalies or important findings. Present your analysis in a structured and easy-to-understand format.""",
        max_consecutive_auto_reply=1,
    )


# define orchestrate a pattern
pattern = AutoPattern(
    initial_agent=mcp_agent,
    agents=[mcp_agent, assistant, arxiv_summary_agent, file_analysis_agent],
    group_manager_args={"llm_config": llm_config},
)

In [11]:
tool_prompt = """
    Run the MCP agent for the given server name and query.
    This function executes the MCP agent with the specified query and server name,
    returning the result of the MCP agent's processing.
    Args:
        query (str): The query to be processed by the MCP agent.
        server_name (str): The name of the server that is connected to the MCP agent.
    Returns:
        ReplyResult: The result of the MCP agent's processing.
"""


@mcp_agent.register_for_llm(description=tool_prompt)
@assistant.register_for_execution(description=tool_prompt)
async def run_mcp_agent_to_client(query: str, server_name: str) -> ReplyResult:
    if server_name not in mcp_connections:
        raise KeyError(
            f"Couldn't find a server with name '{server_name}', expected one of '{list(mcp_connections.keys())}'"
        )
    async with open_mcp_session(mcp_connections[server_name]) as session:
        await session.initialize()
        toolkit = await create_toolkit(session=session)
        agent = AssistantAgent(
            name="assistant",
            llm_config=llm_config,
            human_input_mode="NEVER",
        )
        toolkit.register_for_llm(agent)
        # Make a request using the MCP tool
        result = await agent.a_run(
            message=query,
            tools=toolkit.tools,
            max_turns=2,
        )
        res = await result.process()
        return ReplyResult(
            message=str(res),
            target=AgentTarget(assistant),
        )

In [None]:
result, context_variables, last_agent = initiate_group_chat(
    messages="perform a wikipedia search for the term 'domain-driven design' show me at least 10 results",
    max_rounds=5,
    pattern=pattern,
)