In [1]:
import asyncio
import logging
import os
from contextlib import AsyncExitStack
from typing import List, Optional

from dotenv import load_dotenv
from langchain_core.messages import SystemMessage 
from langchain_core.tools import BaseTool
from langchain_mcp_adapters.tools import load_mcp_tools
from langchain_openai import ChatOpenAI
from langgraph.prebuilt import create_react_agent
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client


In [2]:
class Configuration:
    """配置管理类，负责管理和验证环境变量"""
    
    def __init__(self) -> None:
        """初始化配置并加载环境变量"""
        self.load_env()
        self._validate_env()
        
    @staticmethod
    def load_env() -> None:
        """从.env文件加载环境变量"""
        load_dotenv()
        
    def _validate_env(self) -> None:
        """验证必需的环境变量是否存在"""
        required_vars = ["DEEPSEEK_API_KEY"]
        missing_vars = [var for var in required_vars if not os.getenv(var)]
        if missing_vars:
            raise ValueError(f"缺少必需的环境变量: {', '.join(missing_vars)}")
    
    @property
    def api_key(self) -> str:
        """获取 DeepSeek API 密钥"""
        return os.getenv("DEEPSEEK_API_KEY", "")
    
    @property
    def base_url(self) -> str:
        """获取 DeepSeek API 基础 URL"""
        return os.getenv("DEEPSEEK_BASE_URL", "https://api.deepseek.com")
    
    @property
    def model(self) -> str:
        """获取 DeepSeek 模型名称"""
        return os.getenv("DEEPSEEK_MODEL", "deepseek-chat")


In [None]:
class MCPServer:
    """MCP 服务器管理类，处理服务器连接和工具执行"""
    def __init__(self, server_path: str) -> None:
        """
        初始化服务器管理器
        
        Args:
            server_path: 服务器脚本路径
        """
        self.server_path = server_path
        self.session: Optional[ClientSession] = None
        self.exit_stack = AsyncExitStack()
        self._cleanup_lock = asyncio.Lock()
        

    async def initialize(self) -> None:
        """初始化服务器连接，包含重试机制"""
        max_retries = 3
        retry_delay = 1.0
        
        for attempt in range(max_retries):
            try:
                if not os.path.exists(self.server_path):
                    raise FileNotFoundError(f"找不到服务器文件: {self.server_path}")
                #这里可以增加使用在线MCP服务器的处理逻辑，参考https://github.com/langchain-ai/langchain-mcp-adapters
                
                server_params = StdioServerParameters(
                    command='python',
                    args=[self.server_path],
                    env=None
                )
                
                stdio_transport = await self.exit_stack.enter_async_context(
                    stdio_client(server_params)
                )
                stdio, write = stdio_transport
                
                self.session = await self.exit_stack.enter_async_context(
                    ClientSession(stdio, write)
                )
                await self.session.initialize()
                logger.info("成功连接到 MCP 服务器")
                break
                
            except Exception as e:
                logger.error(f"第 {attempt + 1}/{max_retries} 次尝试失败: {str(e)}")
                if attempt < max_retries - 1:
                    await asyncio.sleep(retry_delay)
                else:
                    raise

    async def list_tools(self) -> List[BaseTool]:
        """获取服务器提供的可用工具列表"""
        if not self.session:
            raise RuntimeError("服务器未初始化")
        # LangChain方式获取可用工具列表
        tools = await load_mcp_tools(self.session)
        logger.info(f"成功加载工具: {[tool.name for tool in tools]}")
        return tools

    async def cleanup(self) -> None:
        """清理服务器资源"""
        async with self._cleanup_lock:
            try:
                await self.exit_stack.aclose()
                self.session = None
                logger.info("服务器资源清理完成")
            except Exception as e:
                logger.error(f"清理过程中出错: {str(e)}")


In [None]:
class MCPClient:
    """MCP 客户端实现，集成了 DeepSeek API"""
    
    def __init__(self, config: Configuration) -> None:
        """
        初始化 MCP 客户端
        
        Args:
            config: 配置对象
        """
        self.config = config
        self.server: Optional[MCPServer] = None
        self.llm_client = ChatOpenAI(
            api_key=config.api_key,
            base_url=config.base_url,
            model=config.model
        )
        
    async def initialize(self) -> None:
        """初始化客户端并连接到服务器"""
        server_path = os.path.join(
            os.getcwd(),
            "server",
            "weather_server.py"
        )
        self.server = MCPServer(server_path)
        await self.server.initialize()
        
    async def process_query(self, query: str):
        """
        处理用户查询，集成工具调用，支持多轮工具交互

        Args:
            query: 用户查询字符串

        Returns:
            处理后的响应结果
        """
        if not self.server:
            raise RuntimeError("客户端未初始化")

        # 创建提示模板
        prompt = SystemMessage(content="""你是一个专注于天气信息的助手...（详细系统提示内容）""")
        
        # 获取工具
        tools = await self.server.list_tools()

        # 创建ReAct Agent
        logger.info("正在创建agent...")
        agent = create_react_agent(
            model=self.llm_client,  
            tools=tools,
            prompt=prompt
        )
        logger.info("Agent创建成功")

        # 发送查询
        logger.info("正在发送天气查询...")
        agent_response = await agent.ainvoke({
            "messages": query
        })

        # 返回响应
        return agent_response


In [11]:
logging.basicConfig(level=logging.INFO)
global logger
logger = logging.getLogger(__name__)

config = Configuration()
client = MCPClient(config)

await client.initialize()

query = "请告诉我今天北京的天气"
response = await client.process_query(query)

logger.info(f"查询响应: {response}")

INFO:__main__:成功连接到 MCP 服务器
INFO:__main__:成功加载工具: ['get_weather']
INFO:__main__:正在创建agent...
INFO:__main__:Agent创建成功
INFO:__main__:正在发送天气查询...
INFO:httpx:HTTP Request: POST https://api.deepseek.com/chat/completions "HTTP/1.1 200 OK"
INFO:httpx:HTTP Request: POST https://api.deepseek.com/chat/completions "HTTP/1.1 200 OK"
  response = await client.process_query(query)
INFO:__main__:查询响应: {'messages': [HumanMessage(content='请告诉我今天北京的天气', additional_kwargs={}, response_metadata={}, id='ab1677bb-2614-478a-b02d-826b6533e71d'), AIMessage(content='', additional_kwargs={'tool_calls': [{'id': 'call_0_3cb78a8b-2f5c-4b32-ac64-32419124d0d5', 'function': {'arguments': '{"city":"北京"}', 'name': 'get_weather'}, 'type': 'function', 'index': 0}], 'refusal': None}, response_metadata={'token_usage': {'completion_tokens': 20, 'prompt_tokens': 114, 'total_tokens': 134, 'completion_tokens_details': None, 'prompt_tokens_details': {'audio_tokens': None, 'cached_tokens': 64}, 'prompt_cache_hit_tokens': 64, 'pro