### OpenAI 函数调用（tools Schema）回顾

在 Chat Completions API 中，通过 `tools` 字段向模型暴露可调用的函数（工具）。每个工具定义如下：

- `type`: 固定为 `function`
- `function`:
  - `name` (string): 工具名称（小写、下划线风格更稳妥，长度 ≤ 64 常见做法）
  - `description` (string): 工具用途的自然语言描述（有助于模型选择正确工具，建议简洁清晰）
  - `parameters` (object): 满足 JSON Schema Draft-07 的参数定义，用于指导模型正确组装入参
    - `type`: 通常为 `object`
    - `properties`: 各字段的类型与描述
    - `required`: 必填字段名列表
    - `additionalProperties`: 是否允许未声明字段（建议 `false` 以提高鲁棒性）

示例（与本笔记 `Action` 转换保持一致）：
```json
{
  "type": "function",
  "function": {
    "name": "read_project_file",
    "description": "Reads a file from the project.",
    "parameters": {
      "type": "object",
      "properties": {
        "name": { "type": "string", "description": "The file path to read" }
      },
      "required": ["name"],
      "additionalProperties": false
    }
  }
}
```

模型返回时会在 `message.tool_calls` 中给出调用的 `function.name` 与 `function.arguments`（JSON 字符串）。你的代码需：
- 解析 `arguments`（`json.loads`）
- 路由到本地实现函数执行
- 将执行结果写回对话历史（便于下一轮推理）

可选高级配置（按实际接口版本支持情况使用）：
- `tool_choice`: 强制使用某个工具或允许模型自由选择
- `parallel_tool_calls`: 是否允许并行调用（若可用）
- `response_format`: 强制 JSON 输出等（如需要结构化）

参考文档（官方）：
- OpenAI 工具/函数调用总览（Chat Completions）: [Function calling & tools](https://platform.openai.com/docs/guides/function-calling)
- JSON Schema 规范（参考）: [JSON Schema](https://json-schema.org/)
- Chat Completions 消息与工具调用字段说明: [Chat Completions API](https://platform.openai.com/docs/api-reference/chat)



# GAME框架：AI智能体设计架构

本笔记实现了一个“可复用的智能体（Agent）最小框架”，用来演示如何基于 GAME 设计法将智能体拆分为四个核心部件，并以可插拔方式组织：

- **G（Goals / Instructions）**：目标与指令。描述智能体要实现的结果，以及实现策略/规则。
- **A（Actions）**：动作/工具。定义智能体可以调用的能力（如读取文件、列目录、结束会话等）。
- **M（Memory）**：记忆。跨回合保留上下文（用户输入、助手决策、工具执行结果），支持后续回合继续推理。
- **E（Environment）**：环境。动作在真实世界中的执行载体，负责真正“落地执行”动作并返回结果（含时间戳与错误信息）。

本框架通过一个统一的 **Agent** 循环（Loop）把 G/A/M/E 串起来：
1. 构造 Prompt（包含 Goals、可用 Actions 的函数调用Schema、Memory 历史）。
2. 发送给 LLM，得到“选择的动作以及参数”（函数调用）。」
3. 在 **Environment** 中执行该动作，得到结果（或错误）。
4. 将决策与结果写入 **Memory**，进入下一轮。
5. 如果动作为终止类动作（如 `terminate`），则结束循环。

你可以把 **Actions** 看成“能力接口”，把 **Environment** 看成“执行实现”。这种解耦使得：
- 你可以替换不同环境（本地、云端、GitHub Actions、容器等），而无需修改智能体决策逻辑；
- 你可以更换一组 Actions（比如从文件工具换成 Web API 工具），而无需修改主循环；
- 你可以替换/扩展 **AgentLanguage**（Prompt 格式与解析逻辑），以适配“函数调用/纯文本解析”等不同LLM交互方式。

本笔记下半部分提供了一个最小示例：
- 定义了 3 个动作：`list_project_files`、`read_project_file`、`terminate`
- 目标：读取项目文件并在结束时输出 README 内容（示例运行环境为空目录时会直接终止）
- 使用 OpenAI 调用 `gpt-4o`，但可轻松替换为任意 LLM 提供商

### GAME 智能体业务架构
```mermaid
graph TB
    subgraph "初始化阶段"
        A[设置环境变量] --> B[OpenAI API Key]
        A --> C[OpenAI Base URL]
    end
    
    subgraph "核心组件定义 - GAME"
        D[Goals 目标定义] --> |包含优先级/名称/描述| D1[Goal对象]
        E[Actions 动作定义] --> |包含函数/描述/参数Schema| E1[Action对象]
        F[Memory 记忆管理] --> |存储user/assistant/environment消息| F1[Memory对象]
        G[Environment 环境执行] --> |执行动作/捕获异常/格式化结果| G1[Environment对象]
    end
    
    subgraph "动作注册"
        E1 --> H[ActionRegistry]
        H --> H1[list_project_files]
        H --> H2[read_project_file]
        H --> H3[terminate]
    end
    
    subgraph "语言适配层"
        I[AgentLanguage] --> J[AgentFunctionCallingActionLanguage]
        J --> J1[构造Prompt: Goals+Memory+Tools]
        J --> J2[解析Response: 提取工具调用]
    end
    
    subgraph "Agent 主控制器"
        K[Agent实例化] --> |注入G/A/M/E| L[Agent.run方法]
    end
    
    B --> K
    C --> K
    D1 --> K
    H --> K
    F1 --> K
    G1 --> K
    J --> K
    
    L --> M[执行Agent循环]
    M --> N[返回最终Memory]
```

In [1]:
# 安装必要的依赖包
!!pip install openai==1.107.0

['Looking in indexes: https://pypi.tuna.tsinghua.edu.cn/simple',
 '\x1b[0m']

In [2]:
# 导入必要的模块
import os, getpass

def _set_env(var: str):
    """
    设置环境变量的辅助函数

    参数:
        var (str): 要设置的环境变量名称

    功能:
        - 检查环境变量是否已存在
        - 如果不存在，则提示用户输入并设置
    """
    if not os.environ.get(var):  # 检查环境变量是否已设置
        os.environ[var] = getpass.getpass(f"{var}: ")  # 安全地获取用户输入

# 设置 OpenAI API 密钥
# 这是使用 OpenAI 模型所必需的
_set_env("OPENAI_API_KEY")
# 设置 OpenAI API代理地址 (例如：https://api.apiyi.com/v1）
_set_env("OPENAI_BASE_URL")

In [3]:
# =============================== 核心框架：导入与类型定义 ===============================
# 说明：以下代码实现了一个最小可复用的智能体框架（面向函数调用工具）。
# - 不修改任何原有逻辑，仅通过中文注释解释设计意图与用法。
# - 关键模块：Prompt 数据结构、LLM 响应函数、Goal/Action/ActionRegistry、Memory、Environment、AgentLanguage、Agent。

import json
import time
import traceback
from openai import OpenAI # 用于调用OpenAI API
from dataclasses import dataclass, field
from typing import List, Callable, Dict, Any
from pathlib import Path


# 大语言模型
client=OpenAI(
    base_url=os.environ['OPENAI_BASE_URL'],
    api_key=os.environ['OPENAI_API_KEY']
)

# Prompt：封装要发给 LLM 的消息与工具定义
# - messages：对话上下文（系统/用户/助手三类）
# - tools：工具（函数）调用的 JSON Schema 描述（让 LLM 能“看见”可用的动作）
# - metadata：元数据（可选扩展，用 dict 保存）
@dataclass
class Prompt:
    messages: List[Dict] = field(default_factory=list)
    tools: List[Dict] = field(default_factory=list)
    metadata: dict = field(default_factory=dict)


# generate_response：统一的 LLM 调用入口
# - 入参是 Prompt，内部自动根据是否提供 tools 来决定是否启用函数调用能力
# - 目标：把模型提供商与主循环解耦；将来切换模型时无需改 Agent 逻辑
# - 返回：
#   * 无工具时：直接返回助手文本
#   * 有工具时：优先解析 tool_calls（并转为 {tool, args} 的 JSON 字符串）
#               若无工具调用，则退化为普通文本回复
def generate_response(prompt: Prompt) -> str:
    """调用大语言模型（LLM）生成响应：
    - 当未提供 tools（函数调用能力）时，作为普通对话返回文本
    - 当提供 tools 时，优先解析函数调用的结构化结果；若无函数调用则退化为普通文本
    """

    messages = prompt.messages
    tools = prompt.tools

    result = None

    if not tools:
        # 无工具：普通对话
        response = client.chat.completions.create(
            model="gpt-4o",   # 指定使用的模型
            messages=messages,  # 发送消息历史
            max_tokens=1024   # 限制响应长度
        )
        result = response.choices[0].message.content
    else:
        # 有工具：提示模型按函数调用格式返回 tool_calls
        response = client.chat.completions.create(
            model="gpt-4o",
            messages=messages,
            tools=tools,
            max_tokens=1024
        )

        if response.choices[0].message.tool_calls:
            # 这里仅取第一个工具调用作为最小可运行演示
            tool = response.choices[0].message.tool_calls[0]
            result = {
                "tool": tool.function.name,
                "args": json.loads(tool.function.arguments),
            }
            # 将 dict 序列化为字符串，便于统一处理与存入记忆
            result = json.dumps(result)
        else:
            # 即使提供了 tools，也可能返回纯文本（例如模型策略判断不调用工具）
            result = response.choices[0].message.content


    return result


# Goal：目标对象
# - priority：目标优先级（便于排序/裁剪）
# - name/description：目标名称和详细说明（同时涵盖“要做什么/如何做”）
# 使用 @dataclass 装饰器定义 Goal 为一个不可变的数据类（frozen=True），这意味着其实例一旦创建，其属性值就不能被修改，有助于保证数据安全和可靠性。
@dataclass(frozen=True)
class Goal:
    priority: int
    name: str
    description: str


# Action：动作/工具的抽象
# - name：动作名（作为工具名暴露给 LLM）
# - function：实际执行的 Python 函数
# - description：工具说明，帮助 LLM 选择正确工具
# - parameters：JSON Schema（决定 LLM 该如何拼好参数）
# - terminal：是否为“终止型”动作（被选中后终止主循环）
class Action:
    def __init__(self,
                 name: str,
                 function: Callable,
                 description: str,
                 parameters: Dict,
                 terminal: bool = False):
        self.name = name
        self.function = function
        self.description = description
        self.terminal = terminal
        self.parameters = parameters

    def execute(self, **args) -> Any:
        """执行该动作所绑定的底层函数，参数通过关键字形式解包传入"""
        # 解包参数并调用底层实现函数
        return self.function(**args)


# ActionRegistry：动作/工具注册表
# - 负责集中管理动作/工具对象，支持按名称检索与批量导出供 AgentLanguage 生成工具Schema
class ActionRegistry:
    def __init__(self):
        self.actions = {}

    def register(self, action: Action):
        self.actions[action.name] = action

    def get_action(self, name: str) -> [Action, None]:
        return self.actions.get(name, None)

    def get_actions(self) -> List[Action]:
        """获取所有已注册的动作，按注册顺序返回列表"""
        return list(self.actions.values())


# Memory：回合记忆
# - items：统一存储“用户/助手/环境”等事件，形成对话历史
# - 通过 get_memories 提供最近N条消息给提示构造使用
# - 通过 copy_without_system_memories 可过滤掉系统消息（某些场景需要）
class Memory:
    def __init__(self):
        self.items = []  # Basic conversation histor

    def add_memory(self, memory: dict):
        """将一条记忆事件追加到工作记忆，用于后续提示词构造与推理"""
        self.items.append(memory)

    def get_memories(self, limit: int = None) -> List[Dict]:
        """获取用于提示词的对话历史；可通过 limit 限制条数以控制上下文长度"""
        return self.items[:limit]

    def copy_without_system_memories(self):
        """返回一份不包含系统类型（type==system）记忆的副本，用于部分提示场景"""
        filtered_items = [m for m in self.items if m["type"] != "system"]
        memory = Memory()
        memory.items = filtered_items
        return memory


# Environment：环境层（动作的真实执行者）
# - execute_action：捕获执行异常，统一返回结构（是否执行成功/错误/traceback/时间戳）
# - format_result：为成功结果补充元数据（时间戳），便于记录与日志化
class Environment:
    def execute_action(self, action: Action, args: dict) -> dict:
        """执行指定动作并返回标准化结果；捕获异常并提供错误与追踪信息"""
        try:
            result = action.execute(**args)
            return self.format_result(result)
        except Exception as e:
            return {
                "tool_executed": False,
                "error": str(e),
                "traceback": traceback.format_exc()
            }

    def format_result(self, result: Any) -> dict:
        """为执行结果补充元数据（如时间戳）并统一为标准结构"""
        return {
            "tool_executed": True,
            "result": result,
            "timestamp": time.strftime("%Y-%m-%dT%H:%M:%S%z")
        }


# AgentLanguage：语言适配层
# - 负责把（Goals/Actions/Memory）格式化为 LLM 需要的 Prompt
# - 负责从 LLM 的原始输出中解析出“要调用的工具与参数”
class AgentLanguage:
    def __init__(self):
        pass

    def construct_prompt(self,
                         actions: List[Action],
                         environment: Environment,
                         goals: List[Goal],
                         memory: Memory) -> Prompt:
        raise NotImplementedError("Subclasses must implement this method")


    def parse_response(self, response: str) -> dict:
        raise NotImplementedError("Subclasses must implement this method")



# AgentFunctionCallingActionLanguage：基于“函数调用”范式的语言适配实现
# - 将 Goals 拼接为 system 消息
# - 将 Memory 规范化映射为 user/assistant 消息
# - 将 Actions 转换为符合 OpenAI 函数调用的 tools Schema
class AgentFunctionCallingActionLanguage(AgentLanguage):

    def __init__(self):
        super().__init__()

    def format_goals(self, goals: List[Goal]) -> List:
        # 把所有目标拼接为一个 system 消息，便于集中表达“要做什么/如何做”
        sep = "\n-------------------\n"
        goal_instructions = "\n\n".join([f"{goal.name}:{sep}{goal.description}{sep}" for goal in goals])
        return [
            {"role": "system", "content": goal_instructions}
        ]

    def format_memory(self, memory: Memory) -> List:
        """将 Memory 转换为对话消息格式，供 LLM 上下文使用"""
        # 记忆格式化策略：
        # - environment 的输出也作为 assistant 角色加入（让模型能“看到”工具执行结果）
        # - user/assistant 原样映射
        items = memory.get_memories()
        mapped_items = []
        for item in items:

            content = item.get("content", None)
            if not content:
                content = json.dumps(item, indent=4)

            if item["type"] == "assistant":
                mapped_items.append({"role": "assistant", "content": content})
            elif item["type"] == "environment":
                mapped_items.append({"role": "assistant", "content": content})
            else:
                mapped_items.append({"role": "user", "content": content})

        return mapped_items

    def format_actions(self, actions: List[Action]) -> [List,List]:
        """将已注册的动作转换为 OpenAI 函数调用所需的 tools Schema"""

        # 将注册的 Action 转为 OpenAI 函数调用工具的 Schema 数组
        tools = [
            {
                "type": "function",
                "function": {
                    "name": action.name,
                    # 描述过长可能无效，限制到 1024 字符
                    "description": action.description[:1024],
                    "parameters": action.parameters,
                },
            } for action in actions
        ]

        return tools

    def construct_prompt(self,
                         actions: List[Action],
                         environment: Environment,
                         goals: List[Goal],
                         memory: Memory) -> Prompt:

        # 构造最终 Prompt：Goals（system）+ Memory（历史消息）+ Tools（函数Schema）
        prompt = []
        prompt += self.format_goals(goals)
        prompt += self.format_memory(memory)

        tools = self.format_actions(actions)

        return Prompt(messages=prompt, tools=tools)

    def adapt_prompt_after_parsing_error(self,
                                         prompt: Prompt,
                                         response: str,
                                         traceback: str,
                                         error: Any,
                                         retries_left: int) -> Prompt:
        # 解析失败后的“自适应 Prompt”策略（此处保留扩展点，演示版不做修改）
        return prompt

    def parse_response(self, response: str) -> dict:
        """将 LLM 的响应解析为结构化格式（优先尝试 JSON 解析，失败则回退为终止工具）"""

        # 期望 LLM 返回 JSON 字符串：{"tool": 工具名, "args": {...}}
        try:
            return json.loads(response)

        except Exception as e:
            # 若无法解析，则将内容作为 message 交给终止工具，友好退出
            return {
                "tool": "terminate",
                "args": {"message":response}
            }


# Agent：智能体主循环
# - 维护并协调 G/A/M/E（目标/动作/记忆/环境）
# - 统一的 prompt 构造、响应解析、动作执行、记忆更新、终止判断
class Agent:
    def __init__(self,
                 goals: List[Goal],
                 agent_language: AgentLanguage,
                 action_registry: ActionRegistry,
                 generate_response: Callable[[Prompt], str],
                 environment: Environment):
        """
        使用核心的 GAME 组件初始化智能体：
        - goals：目标与指令集合
        - agent_language：语言适配层（提示词构造与解析）
        - generate_response：LLM 调用函数
        - environment：动作执行环境
        """
        self.goals = goals
        self.generate_response = generate_response
        self.agent_language = agent_language
        self.actions = action_registry
        self.environment = environment

    def construct_prompt(self, goals: List[Goal], memory: Memory, actions: ActionRegistry) -> Prompt:
        """基于当前目标、记忆与动作集合构造提示词（Prompt）"""
        return self.agent_language.construct_prompt(
            actions=actions.get_actions(),
            environment=self.environment,
            goals=goals,
            memory=memory
        )

    def get_action(self, response):
        # 解析 LLM 的返回，得到动作名与参数（invocation）
        invocation = self.agent_language.parse_response(response)
        action = self.actions.get_action(invocation["tool"])
        return action, invocation

    def should_terminate(self, response: str) -> bool:
        # 若当前选择的动作被标记为 terminal，则结束主循环
        action_def, _ = self.get_action(response)
        return action_def.terminal

    def set_current_task(self, memory: Memory, task: str):
        # 将用户输入写入记忆，作为本轮起始任务语境
        memory.add_memory({"type": "user", "content": task})

    def update_memory(self, memory: Memory, response: str, result: dict):
        """
        使用“决策 + 执行结果”更新记忆：
        - 将助手的决策（response）作为 assistant 事件存入
        - 将环境执行结果（result）序列化为 JSON，作为 environment 事件存入
        """
        # 统一把“助手的决策（response）”与“环境执行结果（result）”写入记忆
        new_memories = [
            {"type": "assistant", "content": response},
            {"type": "environment", "content": json.dumps(result)}
        ]
        for m in new_memories:
            memory.add_memory(m)

    def prompt_llm_for_action(self, full_prompt: Prompt) -> str:
        # 将 Prompt 发送给 LLM，得到“下一步动作/或文本回复”
        response = self.generate_response(full_prompt)
        return response

    def run(self, user_input: str, memory=None, max_iterations: int = 50) -> Memory:
        """
        执行该智能体的 GAME 主循环，可设置最大迭代次数：
        - 每轮：构造 Prompt -> 让 LLM 决策 -> 解析动作 -> 环境执行 -> 写回记忆 -> 终止判断
        """
        # 初始化记忆并写入用户任务
        memory = memory or Memory()
        self.set_current_task(memory, user_input)

        for _ in range(max_iterations):
            # 1) 用当前 Goals/Actions/Memory 构造 Prompt
            prompt = self.construct_prompt(self.goals, memory, self.actions)

            print("Agent thinking...")
            # 2) 发送给 LLM，得到“将要调用的动作及其参数”或普通文本
            response = self.prompt_llm_for_action(prompt)
            print(f"Agent Decision: {response}")

            # 3) 解析动作与参数
            action, invocation = self.get_action(response)

            # 4) 在环境中真实执行动作
            result = self.environment.execute_action(action, invocation["args"])
            print(f"Action Result: {result}")

            # 5) 将“决策 + 结果”写回记忆，形成闭环
            self.update_memory(memory, response, result)

            # 6) 终止判断：如果动作为终止型，则跳出循环
            if self.should_terminate(response):
                break

        return memory


In [4]:
# =============================== 示例：最小可运行 Agent ===============================
# 1) 定义智能体目标（Goals）：
#    - 读取项目中的每个文件
#    - 当已读取完毕时调用 terminate，并在消息中提供 README 的内容（示例环境如为空目录会直接终止）
goals = [
    Goal(priority=1, name="Gather Information", description="Read each file in the project"),
    Goal(priority=1, name="Terminate", description="Call the terminate call when you have read all the files "
                                                   "and provide the content of the README in the terminate message")
]

# 2) 指定语言适配器（基于函数调用的 Prompt/解析策略）
agent_language = AgentFunctionCallingActionLanguage()

# 3) 实现底层动作：读取文件
def read_project_file(name: str) -> str:
    with open(name, "r") as f:
        return f.read()

# 4) 实现底层动作：列出当前目录及子目录下的 .py 文件（最小示例）
def list_project_files() -> List[str]:
    """
    使用 pathlib.Path.rglob 递归查找所有 .py 文件。
    """
    # 1. Path(".") 代表当前目录
    # 2. .rglob("*.py") 递归地 (r) 查找 (glob) 所有匹配 "*.py" 的文件
    
    # rglob 会返回一个生成器，包含所有匹配的 Path 对象
    py_files_paths = Path(".").rglob("*.py")
    
    # 3. 将 Path 对象转换为字符串，并确保它们是文件（排除目录）
    #    str(p) 会自动生成正确的相对路径（例如 'my_app/helpers.py'）
    files = [str(p) for p in py_files_paths if p.is_file()]
    
    return sorted(files)



# 5) 注册动作：将 Python 函数“暴露”为可被 LLM 选择的工具
action_registry = ActionRegistry()
action_registry.register(Action(
    name="list_project_files",
    function=list_project_files,
    description="Lists all files in the project.",
    parameters={},
    terminal=False
))
action_registry.register(Action(
    name="read_project_file",
    function=read_project_file,
    description="Reads a file from the project.",
    parameters={
        "type": "object",
        "properties": {
            "name": {"type": "string"}
        },
        "required": ["name"]
    },
    terminal=False
))
action_registry.register(Action(
    name="terminate",
    function=lambda message: f"{message}\nTerminating...",
    description="Terminates the session and prints the message to the user.",
    parameters={
        "type": "object",
        "properties": {
            "message": {"type": "string"}
        },
        "required": []
    },
    terminal=True
))

# 6) 准备环境（负责真实执行动作并返回标准化结果）
environment = Environment()

# 7) 构建 Agent 实例（组装 G/A/M/E 与 LLM 响应函数）
agent = Agent(goals, agent_language, action_registry, generate_response, environment)

# 8) 运行智能体（输入一个自然语言任务），内部会进入循环直到触发终止或达到最大轮数
user_input = "Write a README for this project."
final_memory = agent.run(user_input)

# 9) 输出最终的记忆（包含用户任务、助手决策、环境执行结果等）
print(final_memory.get_memories())

Agent thinking...
Agent Decision: {"tool": "list_project_files", "args": {}}
Action Result: {'tool_executed': True, 'result': ['mcp-demo/client/mcp_client.py', 'mcp-demo/client/mcp_client_deepseek.py', 'mcp-demo/client/mcp_client_langchain_chat.py', 'mcp-demo/server/weather_server.py'], 'timestamp': '2025-11-09T00:16:12+0800'}
Agent thinking...
Agent Decision: {"tool": "read_project_file", "args": {"name": "mcp-demo/client/mcp_client.py"}}
Agent thinking...
Agent Decision: {"tool": "read_project_file", "args": {"name": "mcp-demo/client/mcp_client_deepseek.py"}}
Agent thinking...
Agent Decision: {"tool": "read_project_file", "args": {"name": "mcp-demo/client/mcp_client_langchain_chat.py"}}
Agent thinking...
Agent Decision: {"tool": "read_project_file", "args": {"name": "mcp-demo/server/weather_server.py"}}
Agent thinking...


In [5]:
# 将智能体运行结果以 Markdown 形式美化展示
from IPython.display import display, Markdown
import json

def _format_env_result(env_json_str: str) -> str:
    try:
        obj = json.loads(env_json_str)
    except Exception:
        return env_json_str
    if isinstance(obj, dict) and obj.get("tool_executed") is True:
        result = obj.get("result", "")
        ts = obj.get("timestamp", "")
        if isinstance(result, list):
            body = "\n".join([f"- {x}" for x in result])
            return f"执行成功（{ts}）\n\n可用文件列表：\n{body}"
        if isinstance(result, str):
            if result.strip().startswith("# ") or "\n## " in result:
                return f"执行成功（{ts}）\n\n生成内容：\n\n```markdown\n{result}\n```"
            return f"执行成功（{ts}）\n\n```text\n{result}\n```"
        return f"执行成功（{ts}）\n\n```json\n{json.dumps(result, ensure_ascii=False, indent=2)}\n```"
    if isinstance(obj, dict) and obj.get("tool_executed") is False:
        err = obj.get("error", "")
        tb = obj.get("traceback", "")
        return f"执行失败\n\n错误：`{err}`\n\n<details><summary>Traceback</summary>\n\n```text\n{tb}\n```\n\n</details>"
    return f"```json\n{json.dumps(obj, ensure_ascii=False, indent=2)}\n```"

md_lines = [
    "# 智能体执行报告",
]

if 'final_memory' not in globals():
    display(Markdown("> 未检测到 final_memory 变量，请先运行上方示例执行智能体。"))
else:
    memories = final_memory.get_memories()

    # 概览
    md_lines.append("## 概览")
    md_lines.append(f"- 总事件数：{len(memories)}")

    # 逐步展示
    md_lines.append("\n## 交互明细\n")
    for idx, item in enumerate(memories, 1):
        typ = item.get("type", "unknown")
        content = item.get("content", "")
        if typ == "user":
            md_lines.append(f"### 步骤 {idx} · 用户输入")
            md_lines.append(f"> {content}")
        elif typ == "assistant":
            md_lines.append(f"### 步骤 {idx} · 助手决策（工具调用）")
            try:
                call = json.loads(content)
                tool = call.get("tool", "?")
                args = call.get("args", {})
                md_lines.append(f"- 工具：`{tool}`")
                md_lines.append("- 参数：")
                md_lines.append(f"```json\n{json.dumps(args, ensure_ascii=False, indent=2)}\n```")
            except Exception:
                md_lines.append("- 文本回复：")
                md_lines.append(f"```text\n{content}\n```")
        elif typ == "environment":
            md_lines.append(f"### 步骤 {idx} · 环境执行结果")
            md_lines.append(_format_env_result(content))
        else:
            md_lines.append(f"### 步骤 {idx} · 其他")
            md_lines.append(f"```text\n{content}\n```")

    # 摘取 README 内容（若存在 terminate 消息）
    md_lines.append("\n## 生成的 README（若已终止并返回）\n")
    readme_blocks = []
    for item in memories[::-1]:
        if item.get("type") == "environment":
            try:
                obj = json.loads(item["content"]) if isinstance(item.get("content"), str) else item["content"]
                if obj.get("tool_executed") and isinstance(obj.get("result"), str) and obj["result"].lstrip().startswith("# "):
                    readme_blocks.append(obj["result"].replace("\nTerminating...", "").strip())
                    break
            except Exception:
                pass
    if readme_blocks:
        md_lines.append("```markdown\n" + readme_blocks[0] + "\n```")
    else:
        md_lines.append("> 本次执行未生成 README 内容或未调用终止工具。")

    display(Markdown("\n\n".join(md_lines)))


# 智能体执行报告

## 概览

- 总事件数：13


## 交互明细


### 步骤 1 · 用户输入

> Write a README for this project.

### 步骤 2 · 助手决策（工具调用）

- 工具：`list_project_files`

- 参数：

```json
{}
```

### 步骤 3 · 环境执行结果

执行成功（2025-11-09T00:16:12+0800）

可用文件列表：
- mcp-demo/client/mcp_client.py
- mcp-demo/client/mcp_client_deepseek.py
- mcp-demo/client/mcp_client_langchain_chat.py
- mcp-demo/server/weather_server.py

### 步骤 4 · 助手决策（工具调用）

- 工具：`read_project_file`

- 参数：

```json
{
  "name": "mcp-demo/client/mcp_client.py"
}
```

### 步骤 5 · 环境执行结果

执行成功（2025-11-09T00:16:14+0800）

```text
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

"""
MCP 客户端示例

用于与 MCP 服务器交互，调用工具并处理结果

Author: FlyAIBox
Date: 2025.10.11
"""

import asyncio
import json
import os
import subprocess
import sys
from typing import Dict, Any, List, Optional
from contextlib import AsyncExitStack

# 使用正确的 MCP 导入
from mcp import ClientSession, StdioServerParameters, Tool
from mcp.client.stdio import stdio_client

class SimpleClientApp:
    """简单的 MCP 客户端应用"""
    
    def __init__(self, server_command: List[str]):
        """
        初始化 MCP 客户端应用
        
        参数:
            server_command: 启动服务器的命令列表
        """
        self.server_command = server_command
        self.server_process = None
        self.client = None
        self.tool_definitions = []
        self.exit_stack = AsyncExitStack()
    
    async def start(self):
        """启动 MCP 客户端并连接到服务器"""
        print("启动 MCP 服务器进程...")
        
        # 配置服务器参数
        server_params = StdioServerParameters(
            command=self.server_command[0],
            args=self.server_command[1:],
            env=None  # 使用默认环境变量
        )
        
        # 启动服务器并获取通信流
        # 通过 stdio_client 协程，以异步方式建立与服务器进程的标准输入/输出通道
        # enter_async_context 可以确保后续资源能自动释放（类似 try...finally）
        # stdio_client(server_params) 会启动子进程并返回 (reader, writer) 的元组
        read_write = await self.exit_stack.enter_async_context(stdio_client(server_params))
        # 解包得到 reader（读取服务器输出）、writer（写入服务器输入），用于后续通信
        # read_write 是一个包含两个元素的元组，分别对应于与服务器通信的 “读取” (reader) 和 “写入” (writer) 流对象。
        read, write = read_write
        
        # 创建 MCP 客户端会话
        self.client = await self.exit_stack.enter_async_context(ClientSession(read, write))
        
        # 初始化连接
        await self.client.initialize()
        
        # 获取工具定义
        response = await self.client.list_tools()
        self.tool_definitions = response.tools
        
        print(f"已连接到服务器，可用工具: {len(self.tool_definitions)}")
        
        for tool in self.tool_definitions:
            print(f"  - {tool.name}: {tool.description}")
        
        print("\n使用 'help' 查看帮助，使用 'exit' 退出")
    
    async def execute_tool(self, tool_name: str, params: Dict[str, Any]) -> Optional[str]:
        """
        执行工具调用
        
        参数:
            tool_name: 要调用的工具名称
            params: 工具参数字典
            
        返回:
            工具执行结果或错误消息
        """
        # 检查工具是否存在
        # 遍历工具定义列表，查找名称与 tool_name 匹配的工具对象
        # next() 用于返回第一个匹配的工具（如未找到则返回 None）
        tool_def = next((t for t in self.tool_definitions if t.name == tool_name), None)
        if not tool_def:
            return f"错误: 未找到工具 '{tool_name}'"
        
        try:
            # 调用工具并等待结果
            result = await self.client.call_tool(tool_name, arguments=params)
            
            # 处理结果
            if result and hasattr(result, 'content'):
                return result.content
            else:
                return "工具执行未返回任何结果"
        
        except Exception as e:
            return f"执行过程中出错: {str(e)}"
    
    async def stop(self):
        """停止客户端和服务器"""
        print("正在关闭 MCP 客户端...")
        await self.exit_stack.aclose()
        print("已关闭")
    
    def print_help(self):
        """打印帮助信息"""
        print("\n可用命令:")
        print("  help - 显示此帮助信息")
        print("  list - 列出可用工具")
        print("  call <工具名> <参数JSON> - 调用工具")
        print("  exit - 退出程序")
        print("\n示例:")
        print("  call get_weather_warning {\"location\": \"101010100\"}")
        print("  call get_daily_forecast 116.41,39.92")
        print("  call get_daily_forecast 101010100 7")
    
    def print_tools(self):
        """打印工具列表和描述"""
        print("\n可用工具:")
        for tool in self.tool_definitions:
            print(f"  {tool.name} - {tool.description}")
            if hasattr(tool, 'inputSchema') and tool.inputSchema:
                print("    参数:")
                props = tool.inputSchema.get('properties', {})
                for param_name, param_info in props.items():
                    param_type = param_info.get('type', 'unknown')
                    param_desc = param_info.get('description', '')
                    print(f"      {param_name} ({param_type}): {param_desc}")

async def main():
    """主函数"""
    # 确定服务器路径
    server_path = os.path.join(
        os.path.dirname(os.path.dirname(os.path.abspath(__file__))),
        "server",
        "weather_server.py"
    )
    
    # 检查服务器文件是否存在
    if not os.path.exists(server_path):
        print(f"错误: 服务器文件不存在: {server_path}")
        return
    
    # 创建客户端应用
    app = SimpleClientApp(["python", server_path])
    
    try:
        # 启动客户端
        await app.start()
        
        # 主循环
        while True:
            try:
                # 获取用户输入
                command = input("\n> ").strip()
                
                # 处理命令
                if command == "exit":
                    break
                
                elif command == "help":
                    app.print_help()
                
                elif command == "list":
                    app.print_tools()
                
                elif command.startswith("call "):
                    # 解析命令
                    parts = command[5:].strip().split(" ", 1)
                    if len(parts) < 1:
                        print("错误: 缺少工具名")
                        continue
                    
                    tool_name = parts[0]
                    
                    # 解析参数
                    params = {}
                    if len(parts) > 1:
                        params_str = parts[1]
                        
                        # 尝试解析为 JSON
                        try:
                            params = json.loads(params_str)
                        except json.JSONDecodeError:
                            # 如果不是 JSON，尝试简单解析
                            try:
                                # 对于 get_weather_warning，只需要一个位置参数
                                if tool_name == "get_weather_warning":
                                    params = {"location": params_str}
                                
                                # 对于 get_daily_forecast，需要位置和可选的天数
                                elif tool_name == "get_daily_forecast":
                                    forecast_args = params_str.split()
                                    if len(forecast_args) < 1:
                                        print("错误: get_daily_forecast 至少需要一个位置参数")
                                        continue
                                    
                                    params = {"location": forecast_args[0]}
                                    
                                    # 如果提供了天数参数
                                    if len(forecast_args) > 1:
                                        try:
                                            params["days"] = int(forecast_args[1])
                                        except ValueError:
                                            print("错误: days 参数必须是整数")
                                            continue
                                else:
                                    print(f"错误: 无法解析 {tool_name} 的参数")
                                    continue
                            
                            except Exception as e:
                                print(f"参数解析错误: {str(e)}")
                                continue
                    
                    # 执行工具调用
                    print("正在调用工具...")
                    result = await app.execute_tool(tool_name, params)
                    print("\n结果:")
                    print(result)
                
                else:
                    print(f"未知命令: {command}")
                    print("使用 'help' 查看可用命令")
            
            except KeyboardInterrupt:
                print("\n操作被中断")
                break
            
            except Exception as e:
                print(f"错误: {str(e)}")
    
    except Exception as e:
        print(f"客户端错误: {str(e)}")
    
    finally:
        # 停止客户端
        await app.stop()

if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print("\n程序被中断")
        sys.exit(0) 
```

### 步骤 6 · 助手决策（工具调用）

- 工具：`read_project_file`

- 参数：

```json
{
  "name": "mcp-demo/client/mcp_client_deepseek.py"
}
```

### 步骤 7 · 环境执行结果

执行成功（2025-11-09T00:16:16+0800）

```text
"""
# 集成 DeepSeek 的 MCP 客户端
#参考官方案例：https://github.com/modelcontextprotocol/python-sdk/blob/main/examples/clients/simple-chatbot/mcp_simple_chatbot/main.py


# 本模块实现了一个模型上下文协议（MCP）客户端，该客户端使用 DeepSeek 的 API
# 来处理查询并与 MCP 工具进行交互。它演示了如何：
# 1. 连接到 MCP 服务器
# 2. 使用 DeepSeek 的 API 来处理查询
# 3. 处理工具调用和响应
# 4. 维护一个交互式聊天循环

# 所需环境变量：
# DEEPSEEK_API_KEY：DeepSeek API 密钥 (格式：sk-xxxx...)
# DEEPSEEK_BASE_URL：DeepSeek API 基础 URL (https://api.deepseek.com)
# DEEPSEEK_MODEL：DeepSeek 模型名称 (例如 deepseek-chat)

Author: FlyAIBox
Date: 2025.10.11
"""

import json
import asyncio
import logging
import os
from typing import Optional, Dict, Any, List, Tuple
from contextlib import AsyncExitStack

from openai import OpenAI
from dotenv import load_dotenv

from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client

# 配置日志记录
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)

load_dotenv()


class Configuration:
    """配置管理类，负责管理和验证环境变量"""
    
    def __init__(self) -> None:
        """初始化配置并加载环境变量"""
        self.load_env()
        self._validate_env()
        
    @staticmethod
    def load_env() -> None:
        """从.env文件加载环境变量"""
        load_dotenv()
        
    def _validate_env(self) -> None:
        """验证必需的环境变量是否存在"""
        required_vars = ["DEEPSEEK_API_KEY"]
        missing_vars = [var for var in required_vars if not os.getenv(var)]
        if missing_vars:
            raise ValueError(f"缺少必需的环境变量: {', '.join(missing_vars)}")
    
    @property
    def api_key(self) -> str:
        """获取 DeepSeek API 密钥"""
        return os.getenv("DEEPSEEK_API_KEY", "")
    
    @property
    def base_url(self) -> str:
        """获取 DeepSeek API 基础 URL"""
        return os.getenv("DEEPSEEK_BASE_URL", "https://api.deepseek.com")
    
    @property
    def model(self) -> str:
        """获取 DeepSeek 模型名称"""
        return os.getenv("DEEPSEEK_MODEL", "deepseek-chat")

class Tool:
    """MCP 工具类，表示一个具有属性的工具"""
    
    def __init__(self, name: str, description: str, input_schema: Dict[str, Any]) -> None:
        """
        初始化工具
        
        Args:
            name: 工具名称
            description: 工具描述
            input_schema: 输入参数模式
        """
        self.name = name
        self.description = description
        self.input_schema = input_schema
        
    def to_openai_format(self) -> Dict[str, Any]:
        """将工具转换为 OpenAI API 格式"""
        return {
            "type": "function",
            "function": {
                "name": self.name,
                "description": self.description,
                "parameters": self.input_schema
            }
        }

class MCPServer:
    """MCP 服务器管理类，处理服务器连接和工具执行"""
    
    def __init__(self, server_path: str) -> None:
        """
        初始化服务器管理器
        
        Args:
            server_path: 服务器脚本路径
        """
        self.server_path = server_path
        self.session: Optional[ClientSession] = None
        self.exit_stack = AsyncExitStack()
        self._cleanup_lock = asyncio.Lock()
        
    async def initialize(self) -> None:
        """初始化服务器连接，包含重试机制"""
        max_retries = 3
        retry_delay = 1.0
        
        for attempt in range(max_retries):
            try:
                if not os.path.exists(self.server_path):
                    raise FileNotFoundError(f"找不到服务器文件: {self.server_path}")
                
                server_params = StdioServerParameters(
                    command='python',
                    args=[self.server_path],
                    env=None
                )
                
                stdio_transport = await self.exit_stack.enter_async_context(
                    stdio_client(server_params)
                )
                stdio, write = stdio_transport
                
                self.session = await self.exit_stack.enter_async_context(
                    ClientSession(stdio, write)
                )
                await self.session.initialize()
                logger.info("成功连接到 MCP 服务器")
                break
                
            except Exception as e:
                logger.error(f"第 {attempt + 1}/{max_retries} 次尝试失败: {str(e)}")
                if attempt < max_retries - 1:
                    await asyncio.sleep(retry_delay)
                else:
                    raise
                    
    async def list_tools(self) -> List[Tool]:
        """获取服务器提供的可用工具列表"""
        if not self.session:
            raise RuntimeError("服务器未初始化")
            
        response = await self.session.list_tools()
        return [
            Tool(tool.name, tool.description, tool.inputSchema)
            for tool in response.tools
        ]
        
    async def execute_tool(
        self,
        tool_name: str,
        arguments: Dict[str, Any],
        retries: int = 2,
        delay: float = 1.0
    ) -> Any:
        """
        执行工具，包含重试机制
        
        Args:
            tool_name: 工具名称
            arguments: 工具参数
            retries: 重试次数
            delay: 重试延迟时间（秒）
            
        Returns:
            工具执行结果
        """
        if not self.session:
            raise RuntimeError("服务器未初始化")
            
        for attempt in range(retries):
            try:
                logger.info(f"执行工具 {tool_name}，参数: {arguments}")
                result = await self.session.call_tool(tool_name, arguments)
                return result
                
            except Exception as e:
                logger.error(f"工具执行失败 (第 {attempt + 1}/{retries} 次尝试): {str(e)}")
                if attempt < retries - 1:
                    await asyncio.sleep(delay)
                else:
                    raise
                    
    async def cleanup(self) -> None:
        """清理服务器资源"""
        async with self._cleanup_lock:
            try:
                await self.exit_stack.aclose()
                self.session = None
                logger.info("服务器资源清理完成")
            except Exception as e:
                logger.error(f"清理过程中出错: {str(e)}")

class MCPClient:
    """MCP 客户端实现，集成了 DeepSeek API"""
    
    def __init__(self, config: Configuration) -> None:
        """
        初始化 MCP 客户端
        
        Args:
            config: 配置对象
        """
        self.config = config
        self.server: Optional[MCPServer] = None
        self.client = OpenAI(
            api_key=config.api_key,
            base_url=config.base_url
        )
        
    async def initialize(self) -> None:
        """初始化客户端并连接到服务器"""
        server_path = os.path.join(
            os.path.dirname(os.path.dirname(os.path.abspath(__file__))),
            "server",
            "weather_server.py"
        )
        self.server = MCPServer(server_path)
        await self.server.initialize()
        

    async def process_query(self, query: str) -> str:
        """
        处理用户查询，集成工具调用，支持多轮工具交互

        Args:
            query: 用户查询字符串

        Returns:
            处理后的响应结果
        """
        if not self.server:
            raise RuntimeError("客户端未初始化")

        system_prompt = (
            "You are a helpful assistant specializing in weather information.\n"
            "You have access to the MCP Weather Server tool with the following functions:\n"
            "- get_weather_warning(city_id=None, latitude=None, longitude=None): Retrieves weather disaster warnings for a specified city ID or coordinates.\n"
            "- get_daily_forecast(city_id=None, latitude=None, longitude=None): Retrieves the multi-day weather forecast for a specified city ID or coordinates.\n"
            "\n"
            "Core Instructions:\n"
            "1.  **Carefully analyze the user's request**: Understand all components of the user's query. Determine if the user needs weather warning information, weather forecast information, or both.\n"
            "2.  **Identify Information Needs**:\n"
            "    * If the user only asks for warnings (e.g., \"Are there any warnings in Beijing?\"), only use `get_weather_warning`.\n"
            "    * If the user only asks for the forecast (e.g., \"What's the weather like in Beijing tomorrow?\"), only use `get_daily_forecast`.\n"
            "    * **If the user's question includes multiple aspects**, such as asking about **warning status** and also asking **if it's suitable for a certain activity** (which implies a query about future weather, like \"Have there been high temperature warnings in Beijing in the last week? Is it suitable for outdoor activities?\"), you need to **call both tools sequentially**.\n"
            "3.  **Call Tools as Needed**:\n"
            "    * **Prioritize getting warning information**: If warning information is needed, first call `get_weather_warning`.\n"
            "    * **Get the weather forecast**: If the user mentions a specific time period (e.g., \"weekend\", \"next three days\", \"next week\") or asks about activity suitability (which typically concerns the next few days), call `get_daily_forecast` to get the forecast for the corresponding period. For vague phrases like \"last week\" or \"recently\", interpret it as asking about *current* conditions and the *upcoming* few days (covered by the forecast). For questions like \"Is it suitable for outdoor activities?\", you should get the forecast for at least the next 2-3 days (e.g., today, tomorrow, the day after tomorrow, or the upcoming weekend) to support your judgment.\n"
            "    * **Ensure tool call order**: When multiple tools need to be called, they should be called in a logical sequence. For example, first get the warning, then get the forecast. Wait for one tool to finish executing before deciding whether to call the next tool or generate a response.\n"
            "4.  **Information Integration and Response**:\n"
            "    * After obtaining all necessary information (warning, forecast), you **must synthesize and analyze this information**.\n"
            "    * **Completely answer the user's question**: Ensure you answer all parts of the user's query.\n"
            "    * **Provide advice**: If the user asks about activity suitability, based on the retrieved warning status and forecast information (temperature, weather condition - clear/rainy, wind strength, etc.), provide a clear, data-supported recommendation (e.g., \"Currently there are no high temperature warnings, but it's expected to rain this weekend, so it's not very suitable for outdoor activities,\" or \"It will be sunny for the next few days with no warnings, suitable for outdoor activities.\").\n"
            "5.  **Tool Usage Details**:\n"
            "    * When using the tools, retain the full context of the user's original question.\n"
            "    * Unless explicitly requested by the user, do not insert specific times of day (e.g., \"3 PM\") into the search query or your response.\n"
            "    * When city information is needed, if the user provides a city name (e.g., \"Beijing\"), use the corresponding `city_id` (e.g., Beijing's city_id might be '101010100').\n"
        )

        messages = [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": query}
        ]

        tools = await self.server.list_tools()
        available_tools = [tool.to_openai_format() for tool in tools]

        max_tool_turns = 5 # 设置一个最大工具调用回合数，防止无限循环

        for _ in range(max_tool_turns):
            try:
                response = self.client.chat.completions.create(
                    model=self.config.model,
                    messages=messages,
                    tools=available_tools,
                    tool_choice="auto" # 允许模型决定是否调用工具
                )

                content = response.choices[0].message
                finish_reason = response.choices[0].finish_reason

                if finish_reason == "stop":
                    # 模型生成了最终的文本回复
                    messages.append({"role": "assistant", "content": content.content})
                    return content.content
                elif finish_reason == "tool_calls":
                    # 模型预测了工具调用
                    messages.append(content.model_dump()) # 将 assistant tool_calls 消息添加到历史

                    tool_outputs = []
                    for tool_call in content.tool_calls:
                        tool_name = tool_call.function.name
                        tool_args = json.loads(tool_call.function.arguments)

                        try:
                            # 执行工具
                            result = await self.server.execute_tool(tool_name, tool_args)
                            logger.info(f"工具执行完成: {tool_name}")
                            # 将工具结果添加到 tool_outputs 列表
                            tool_outputs.append({
                                "tool_call_id": tool_call.id,
                                # MCP tool results have 'content' which is a list of Parts
                                # Assuming text content is the first part
                                "output": result.content[0].text
                            })
                        except Exception as tool_exec_error:
                            logger.error(f"工具执行失败 {tool_name}: {str(tool_exec_error)}")
                            # 如果工具执行失败，也需要向模型反馈失败信息
                            tool_outputs.append({
                                "tool_call_id": tool_call.id,
                                "output": f"Error executing tool {tool_name}: {str(tool_exec_error)}"
                            })

                    # 将所有工具结果作为 tool 消息添加到历史
                    # 注意：OpenAI API 要求每个 tool_call_id 对应一个 tool 消息
                    # 如果模型预测了多个工具调用，这里需要为每个结果添加 tool 消息
                    for output in tool_outputs:
                        messages.append({
                            "role": "tool",
                            "content": output["output"],
                            "tool_call_id": output["tool_call_id"],
                        })

                    # 继续下一轮循环，将完整的消息历史发送给模型，让它根据工具结果继续
                    logger.info(f"工具调用回合完成，继续与模型交互...")
                    continue # 继续循环，发送包含工具结果的消息历史给模型
                else:
                    # 发生未知情况或错误
                    logger.error(f"模型返回意外的 finish_reason: {finish_reason}")
                    return f"处理你的查询时遇到意外情况 ({finish_reason})"

            except Exception as e:
                logger.error(f"处理查询时出错: {str(e)}")
                return f"处理你的查询时发生错误: {str(e)}"

        # 如果循环达到最大次数仍未得到最终回复
        logger.warning("达到最大工具调用回合数，未能获得最终回复")
        return "抱歉，处理你的请求时出现问题，未能获得最终结果。"

    async def chat_loop(self) -> None:
        """运行交互式聊天循环"""
        logger.info("开始聊天会话...")
        
        while True:
            try:
                query = input("\n请输入你的问题 (输入 'quit' 或 'exit' 退出): ").strip()
                
                if query.lower() in ['quit', 'exit']:
                    logger.info("结束聊天会话...")
                    break
                    
                response = await self.process_query(query)
                print(f"\n助手: {response}")
                
            except KeyboardInterrupt:
                logger.info("\n收到键盘中断，结束会话...")
                break
            except Exception as e:
                logger.error(f"聊天循环中出错: {str(e)}")
                print(f"\n发生错误: {str(e)}")
                
    async def cleanup(self) -> None:
        """清理客户端资源"""
        if self.server:
            await self.server.cleanup()

async def main() -> None:
    """主程序入口"""
    try:
        config = Configuration()
        client = MCPClient(config)
        
        await client.initialize()
        await client.chat_loop()
        
    except Exception as e:
        logger.error(f"致命错误: {str(e)}")
        raise
        
    finally:
        if 'client' in locals():
            await client.cleanup()

if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        logger.info("程序被用户终止")
    except Exception as e:
        logger.error(f"程序因错误终止: {str(e)}")
        raise

```

### 步骤 8 · 助手决策（工具调用）

- 工具：`read_project_file`

- 参数：

```json
{
  "name": "mcp-demo/client/mcp_client_langchain_chat.py"
}
```

### 步骤 9 · 环境执行结果

执行成功（2025-11-09T00:16:18+0800）

```text
"""
# 集成 DeepSeek 的 MCP 客户端[LangChain版]
#更详细的使用方法请参考：https://github.com/langchain-ai/langchain-mcp-adapters

# 所需环境变量：
# DEEPSEEK_API_KEY：DeepSeek API 密钥 (格式：sk-xxxx...)
# DEEPSEEK_BASE_URL：DeepSeek API 基础 URL (https://api.deepseek.com)
# DEEPSEEK_MODEL：DeepSeek 模型名称 (例如 deepseek-chat)

Author: FlyAIBox
Date: 2025.10.11
"""
import asyncio
import logging
import os
from contextlib import AsyncExitStack
from typing import List, Optional

from dotenv import load_dotenv
from langchain_core.messages import SystemMessage
from langchain_core.tools import BaseTool
from langchain_mcp_adapters.tools import load_mcp_tools
from langchain_openai import ChatOpenAI
from langgraph.prebuilt import create_react_agent
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client

# 配置日志
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)


# 优先从环境变量加载API密钥等信息
load_dotenv()

class Configuration:
    """配置管理类，负责管理和验证环境变量"""
    
    def __init__(self) -> None:
        """初始化配置并加载环境变量"""
        self.load_env()
        self._validate_env()
        
    @staticmethod
    def load_env() -> None:
        """从.env文件加载环境变量"""
        load_dotenv()
        
    def _validate_env(self) -> None:
        """验证必需的环境变量是否存在"""
        required_vars = ["DEEPSEEK_API_KEY"]
        missing_vars = [var for var in required_vars if not os.getenv(var)]
        if missing_vars:
            raise ValueError(f"缺少必需的环境变量: {', '.join(missing_vars)}")
    
    @property
    def api_key(self) -> str:
        """获取 DeepSeek API 密钥"""
        return os.getenv("DEEPSEEK_API_KEY", "")
    
    @property
    def base_url(self) -> str:
        """获取 DeepSeek API 基础 URL"""
        return os.getenv("DEEPSEEK_BASE_URL", "https://api.deepseek.com")
    
    @property
    def model(self) -> str:
        """获取 DeepSeek 模型名称"""
        return os.getenv("DEEPSEEK_MODEL", "deepseek-chat")

class MCPServer:
    """MCP 服务器管理类，处理服务器连接和工具执行"""
    def __init__(self, server_path: str) -> None:
        """
        初始化服务器管理器
        
        Args:
            server_path: 服务器脚本路径
        """
        self.server_path = server_path
        self.session: Optional[ClientSession] = None
        self.exit_stack = AsyncExitStack()
        self._cleanup_lock = asyncio.Lock()


    async def initialize(self) -> None:
        """初始化服务器连接，包含重试机制"""
        max_retries = 3
        retry_delay = 1.0
        
        for attempt in range(max_retries):
            try:
                if not os.path.exists(self.server_path):
                    raise FileNotFoundError(f"找不到服务器文件: {self.server_path}")
                
                server_params = StdioServerParameters(
                    command='python',
                    args=[self.server_path],
                    env=None
                )
                
                stdio_transport = await self.exit_stack.enter_async_context(
                    stdio_client(server_params)
                )
                stdio, write = stdio_transport
                
                self.session = await self.exit_stack.enter_async_context(
                    ClientSession(stdio, write)
                )
                await self.session.initialize()
                logger.info("成功连接到 MCP 服务器")
                break
                
            except Exception as e:
                logger.error(f"第 {attempt + 1}/{max_retries} 次尝试失败: {str(e)}")
                if attempt < max_retries - 1:
                    await asyncio.sleep(retry_delay)
                else:
                    raise

    async def list_tools(self) -> List[BaseTool]:
        """获取服务器提供的可用工具列表"""
        if not self.session:
            raise RuntimeError("服务器未初始化")
        # LangChain方式获取可用工具列表

        # 自动化的工具处理：
        # LangChain版本使用load_mcp_tools函数
        # 自动适配MCP工具，省去了工具格式转换的工作
        tools = await load_mcp_tools(self.session)
        logger.info(f"成功加载工具: {[tool.name for tool in tools]}")
        return tools


    async def cleanup(self) -> None:
        """清理服务器资源"""
        async with self._cleanup_lock:
            try:
                await self.exit_stack.aclose()
                self.session = None
                logger.info("服务器资源清理完成")
            except Exception as e:
                logger.error(f"清理过程中出错: {str(e)}")
    

class MCPClient:
    """MCP 客户端实现，集成了 DeepSeek API"""
    
    def __init__(self, config: Configuration) -> None:
        """
        初始化 MCP 客户端
        
        Args:
            config: 配置对象
        """
        self.config = config
        self.server: Optional[MCPServer] = None
        self.llm_client = ChatOpenAI(
            api_key=config.api_key,
            base_url=config.base_url,
            model=config.model
        )
        
    async def initialize(self) -> None:
        """初始化客户端并连接到服务器"""
        server_path = os.path.join(
            os.path.dirname(os.path.dirname(os.path.abspath(__file__))),
            "server",
            "weather_server.py"
        )
        self.server = MCPServer(server_path)
        await self.server.initialize()
        

    async def process_query(self, query: str):
        """
        处理用户查询，集成工具调用，支持多轮工具交互

        Args:
            query: 用户查询字符串

        Returns:
            处理后的响应结果
        """
        if not self.server:
            raise RuntimeError("客户端未初始化")

        # 创建提示模板
        prompt = SystemMessage(content=""""You are a helpful assistant specializing in weather information.\n
                            You have access to the MCP Weather Server tool with the following functions:\n
                            - get_weather_warning(city_id=None, latitude=None, longitude=None): Retrieves weather disaster warnings for a specified city ID or coordinates.\n
                            - get_daily_forecast(city_id=None, latitude=None, longitude=None): Retrieves the multi-day weather forecast for a specified city ID or coordinates.\n
                            \n
                            Core Instructions:\n
                            1.  **Carefully analyze the user's request**: Understand all components of the user's query. Determine if the user needs weather warning information, weather forecast information, or both.\n
                            2.  **Identify Information Needs**:\n
                                * If the user only asks for warnings (e.g., \Are there any warnings in Beijing?\), only use `get_weather_warning`.\n
                                * If the user only asks for the forecast (e.g., \What's the weather like in Beijing tomorrow?\), only use `get_daily_forecast`.\n
                                * **If the user's question includes multiple aspects**, such as asking about **warning status** and also asking **if it's suitable for a certain activity** (which implies a query about future weather, like \Have there been high temperature warnings in Beijing in the last week? Is it suitable for outdoor activities?\), you need to **call both tools sequentially**.\n
                            3.  **Call Tools as Needed**:\n
                                * **Prioritize getting warning information**: If warning information is needed, first call `get_weather_warning`.\n
                                * **Get the weather forecast**: If the user mentions a specific time period (e.g., \weekend\, \next three days\, \next week\) or asks about activity suitability (which typically concerns the next few days), call `get_daily_forecast` to get the forecast for the corresponding period. For vague phrases like \last week\ or \recently\, interpret it as asking about *current* conditions and the *upcoming* few days (covered by the forecast). For questions like \Is it suitable for outdoor activities?\, you should get the forecast for at least the next 2-3 days (e.g., today, tomorrow, the day after tomorrow, or the upcoming weekend) to support your judgment.\n
                                * **Ensure tool call order**: When multiple tools need to be called, they should be called in a logical sequence. For example, first get the warning, then get the forecast. Wait for one tool to finish executing before deciding whether to call the next tool or generate a response.\n
                            4.  **Information Integration and Response**:\n
                                * After obtaining all necessary information (warning, forecast), you **must synthesize and analyze this information**.\n
                                * **Completely answer the user's question**: Ensure you answer all parts of the user's query.\n
                                * **Provide advice**: If the user asks about activity suitability, based on the retrieved warning status and forecast information (temperature, weather condition - clear/rainy, wind strength, etc.), provide a clear, data-supported recommendation (e.g., \Currently there are no high temperature warnings, but it's expected to rain this weekend, so it's not very suitable for outdoor activities,\ or \It will be sunny for the next few days with no warnings, suitable for outdoor activities.\).\n
                            5.  **Tool Usage Details**:\n
                                * When using the tools, retain the full context of the user's original question.\n
                                * Unless explicitly requested by the user, do not insert specific times of day (e.g., \3 PM\) into the search query or your response.\n
                                * When city information is needed, if the user provides a city name (e.g., \Beijing\), use the corresponding `city_id` (e.g., Beijing's city_id might be '101010100').\n                      
                            """)
        
        
        ## 列举工具
        tools = await self.server.list_tools()

        ## 更简洁的代理创建流程：
        # LangChain版本使用create_react_agent函数
        # 直接创建代理，并指定模型、工具和提示
        # 简化了代码复杂度
        
        # LangChain方式创建和运行agent
        logger.info("正在创建agent...")
        agent = create_react_agent(
            model=self.llm_client,  
            tools=tools,
            prompt=prompt
        )
        logger.info("Agent创建成功")

        # 发送查询
        logger.info("正在发送天气查询...")
        agent_response = await agent.ainvoke({
            "messages": query
        })

        # 打印响应
        logger.info("\nAgent Response:")
        print(agent_response)

        # 遍历消息列表并打印 ToolMessage 的 content
        if 'messages' in agent_response:
            print("\n--- Tool Message Contents ---")
            for message in agent_response['messages']:
                print(f"\nTool: {message.name}") # 可以选择打印工具名称
                print(f"Content:\n{message.content}")
                print("-" * 20) # 分隔不同 ToolMessage 的内容
             

    async def chat_loop(self) -> None:
        """运行交互式聊天循环"""
        logger.info("开始聊天会话...")
        
        while True:
            try:
                query = input("\n请输入你的问题 (输入 'quit' 或 'exit' 退出): ").strip()
                
                if query.lower() in ['quit', 'exit']:
                    logger.info("结束聊天会话...")
                    break
                    
                await self.process_query(query)
                
                
            except KeyboardInterrupt:
                logger.info("\n收到键盘中断，结束会话...")
                break
            except Exception as e:
                logger.error(f"聊天循环中出错: {str(e)}")
                print(f"\n发生错误: {str(e)}")
                
    async def cleanup(self) -> None:
        """清理客户端资源"""
        if self.server:
            await self.server.cleanup()

async def main() -> None:
    """主程序入口"""
    try:
        config = Configuration()
        client = MCPClient(config)
        
        await client.initialize()
        await client.chat_loop()
        
    except Exception as e:
        logger.error(f"致命错误: {str(e)}")
        raise
        
    finally:
        if 'client' in locals():
            await client.cleanup()

if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        logger.info("程序被用户终止")
    except Exception as e:
        logger.error(f"程序因错误终止: {str(e)}")
        raise


```

### 步骤 10 · 助手决策（工具调用）

- 工具：`read_project_file`

- 参数：

```json
{
  "name": "mcp-demo/server/weather_server.py"
}
```

### 步骤 11 · 环境执行结果

执行成功（2025-11-09T00:16:19+0800）

```text
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

"""
MCP 服务器可以提供三种主要类型的功能：

资源：客户端可以读取的类似文件的数据（例如 API 响应或文件内容）
工具：可由 LLM 调用的函数（经用户批准）
提示：预先编写的模板，帮助用户完成特定任务

######################################

MCP 天气服务器

提供两个工具：
1. get_weather_warning: 获取指定城市ID或经纬度的天气灾害预警
2. get_daily_forecast: 获取指定城市ID或经纬度的天气预报

Author: FlyAIBox
Date: 2025.10.11
"""

from typing import Any, Dict, List, Optional, Union
import asyncio
import httpx
import os
from urllib.parse import urljoin
from mcp.server.fastmcp import FastMCP
from dotenv import load_dotenv
from pathlib import Path

# 加载 .env 文件中的环境变量
dotenv_path = Path(__file__).resolve().parents[1] / '.env'
load_dotenv(dotenv_path)

# 初始化 FastMCP 服务器
mcp = FastMCP("weather",  # 服务器名称
              debug=True,  # 启用调试模式，会输出详细日志
              host="0.0.0.0") # 监听所有网络接口，允许远程连接

# 从环境变量中读取常量
QWEATHER_API_BASE = os.getenv("QWEATHER_API_BASE")
QWEATHER_API_KEY = os.getenv("QWEATHER_API_KEY")

def _normalize_base_url(raw_base: Optional[str]) -> str:
    """
    确保基础 URL 包含协议并以单个斜杠结尾，兼容 .env 中未写协议的情况
    """
    if not raw_base:
        raise RuntimeError("未配置 QWEATHER_API_BASE 环境变量")

    base = raw_base.strip()
    if not base.startswith(("http://", "https://")):
        base = f"https://{base.lstrip('/')}"

    # urljoin 要求目录风格以斜杠结尾，避免 'v7/weather/7d' 被覆盖
    if not base.endswith("/"):
        base = f"{base}/"

    return base

try:
    _QWEATHER_BASE_URL = _normalize_base_url(QWEATHER_API_BASE)
except RuntimeError as err:
    print(f"[配置错误] {err}")
    _QWEATHER_BASE_URL = None

async def make_qweather_request(endpoint: str, params: Dict[str, Any]) -> Optional[Dict[str, Any]]:
    """
    向和风天气 API 发送请求
    
    参数:
        endpoint: API 端点路径（不包含基础 URL）
        params: API 请求的参数
        
    返回:
        成功时返回 JSON 响应，失败时返回 None
    """
    if not _QWEATHER_BASE_URL:
        print("QWEATHER_API_BASE 未正确配置，已跳过请求。")
        return None

    if not QWEATHER_API_KEY:
        print("QWEATHER_API_KEY 未设置，已跳过请求。")
        return None

    safe_endpoint = endpoint.lstrip("/")
    url = urljoin(_QWEATHER_BASE_URL, safe_endpoint)

    # 使用 Header 方式认证（和风天气的新版本API）
    headers = {
        "X-QW-Api-Key": QWEATHER_API_KEY
    }
    
    async with httpx.AsyncClient() as client:
        try:
            print(f"请求 URL: {url}")
            print(f"请求参数: {params}")
            response = await client.get(url, params=params, headers=headers, timeout=30.0)
            print(f"响应状态码: {response.status_code}")
            response.raise_for_status()
            result = response.json()
            print(f"响应内容: {result}")
            return result
        except httpx.HTTPStatusError as e:
            print(f"HTTP 状态错误: {e.response.status_code} - {e.response.text}")
            return None
        except Exception as e:
            print(f"API 请求错误: {type(e).__name__}: {e}")
            return None

def format_warning(warning: Dict[str, Any]) -> str:
    """
    将天气预警数据格式化为可读字符串
    
    参数:
        warning: 天气预警数据对象
        
    返回:
        格式化后的预警信息
    """
    return f"""
预警ID: {warning.get('id', '未知')}
标题: {warning.get('title', '未知')}
发布时间: {warning.get('pubTime', '未知')}
开始时间: {warning.get('startTime', '未知')}
结束时间: {warning.get('endTime', '未知')}
预警类型: {warning.get('typeName', '未知')}
预警等级: {warning.get('severity', '未知')} ({warning.get('severityColor', '未知')})
发布单位: {warning.get('sender', '未知')}
状态: {warning.get('status', '未知')}
详细信息: {warning.get('text', '无详细信息')}
"""

@mcp.tool()
async def get_weather_warning(location: Union[str, int]) -> str:
    """
    获取指定位置的天气灾害预警
    
    参数:
        location: 城市ID或经纬度坐标（经度,纬度）
                例如：'101010100'（北京）或 '116.41,39.92'
                也可以直接传入数字ID，如 101010100
        
    返回:
        格式化的预警信息字符串
    """
    # 确保 location 为字符串类型
    location = str(location)
    
    params = {
        "location": location,
        "lang": "zh"
    }
    
    data = await make_qweather_request("v7/warning/now", params)
    
    if not data:
        return "无法获取预警信息或API请求失败。"
    
    if data.get("code") != "200":
        return f"API 返回错误: {data.get('code')}"
    
    warnings = data.get("warning", [])
    
    if not warnings:
        return f"当前位置 {location} 没有活动预警。"
    
    formatted_warnings = [format_warning(warning) for warning in warnings]
    return "\n---\n".join(formatted_warnings)

def format_daily_forecast(daily: Dict[str, Any]) -> str:
    """
    将天气预报数据格式化为可读字符串
    
    参数:
        daily: 天气预报数据对象
        
    返回:
        格式化后的预报信息
    """
    return f"""
日期: {daily.get('fxDate', '未知')}
日出: {daily.get('sunrise', '未知')}  日落: {daily.get('sunset', '未知')}
最高温度: {daily.get('tempMax', '未知')}°C  最低温度: {daily.get('tempMin', '未知')}°C
白天天气: {daily.get('textDay', '未知')}  夜间天气: {daily.get('textNight', '未知')}
白天风向: {daily.get('windDirDay', '未知')} {daily.get('windScaleDay', '未知')}级 ({daily.get('windSpeedDay', '未知')}km/h)
夜间风向: {daily.get('windDirNight', '未知')} {daily.get('windScaleNight', '未知')}级 ({daily.get('windSpeedNight', '未知')}km/h)
相对湿度: {daily.get('humidity', '未知')}%
降水量: {daily.get('precip', '未知')}mm
紫外线指数: {daily.get('uvIndex', '未知')}
能见度: {daily.get('vis', '未知')}km
"""

@mcp.tool()
async def get_daily_forecast(location: Union[str, int], days: int = 3) -> str:
    """
    获取指定位置的天气预报
    
    参数:
        location: 城市ID或经纬度坐标（经度,纬度）
                例如：'101010100'（北京）或 '116.41,39.92'
                也可以直接传入数字ID，如 101010100
        days: 预报天数，可选值为 3、7、10、15、30，默认为 3
        
    返回:
        格式化的天气预报字符串
    """
    # 确保 location 为字符串类型
    location = str(location)
    
    # 确保 days 参数有效
    valid_days = [3, 7, 10, 15, 30]
    if days not in valid_days:
        days = 3  # 默认使用3天预报
    
    params = {
        "location": location,
        "lang": "zh"
    }
    # 和风天气API文档 https://dev.qweather.com/docs/api/weather/weather-daily-forecast/
    endpoint = f"v7/weather/{days}d"
    data = await make_qweather_request(endpoint, params)
    
    if not data:
        return "无法获取天气预报或API请求失败。"
    
    if data.get("code") != "200":
        return f"API 返回错误: {data.get('code')}"
    
    daily_forecasts = data.get("daily", [])
    
    if not daily_forecasts:
        return f"无法获取 {location} 的天气预报数据。"
    
    formatted_forecasts = [format_daily_forecast(daily) for daily in daily_forecasts]
    return "\n---\n".join(formatted_forecasts)

if __name__ == "__main__":
    print("正在启动 MCP 天气服务器...")
    print("提供工具: get_weather_warning, get_daily_forecast")
    print("请确保环境变量 QWEATHER_API_KEY 已设置")
    print("使用 Ctrl+C 停止服务器")
    
    # 初始化并运行服务器
    mcp.run(transport='stdio') 

```

### 步骤 12 · 助手决策（工具调用）

- 工具：`terminate`

- 参数：

```json
{
  "message": "# MCP Weather Client and Server Project\n\n## Overview:\n\nThis project consists of a client-server application designed to interact with the Model Context Protocol (MCP), a protocol that facilitates standardized communication among various AI models and tools. The MCP Weather application provides weather-related functions by utilizing external APIs and integrates these capabilities into a client-server model.\n\n## Project Structure:\n\n- **Client:** \n  - The client contains multiple implementations demonstrating different integrations:\n    - `mcp_client.py`: A simple client application showcasing MCP client-server interactions, calling tools, and handling responses.\n    - `mcp_client_deepseek.py`: Integration with DeepSeek API in the MCP client, handling user queries and processing them via DeepSeek.\n    - `mcp_client_langchain_chat.py`: Integration with DeepSeek API using a LangChain version, illustrating streamlined tool handling and agent creation.\n\n- **Server:**\n  - `weather_server.py`: An MCP server providing weather information services. It includes tools for retrieving weather warnings and forecasts from a weather API and formats the responses for client use.\n\n## Key Features:\n\n- **MCP Client-Server Communication**: Demonstrates how the client communicates with the server using standard input/output, with asynchronous handling for efficiency.\n- **Tool Integration**: Features tools that gather weather data from external APIs and make it easily accessible through standardized MCP protocols.\n- **Flexible Implementations**: Shows different ways to integrate with AI functionalities - starting from a basic model to more sophisticated methods using third-party libraries like DeepSeek and LangChain.\n\n## Getting Started:\n\n### Prerequisites:\n- Install Python 3.7 or later.\n- Install necessary packages using pip (e.g., `asyncio`, `httpx`, `openai`, `langchain_openai`, `dotenv`, and others mentioned in client implementations).\n\n### Environment Configuration:\n- Create a `.env` file at the root of the project to store essential environment variables.\n- **Sample `.env` Configuration**:\n  ```\n  DEEPSEEK_API_KEY=your_deepseek_api_key\n  DEEPSEEK_BASE_URL=https://api.deepseek.com\n  DEEPSEEK_MODEL=deepseek-chat\n  QWEATHER_API_BASE=your_qweather_api_base\n  QWEATHER_API_KEY=your_qweather_api_key\n  ```\n\n### Running the Server:\n1. Navigate to the server directory (e.g., `mcp-demo/server`).\n2. Execute `python weather_server.py` to start the server.\n\n### Running the Client:\n1. Choose a client implementation from the `mcp-demo/client` directory.\n2. Execute the script using Python (e.g., `python mcp_client.py`).\n3. Follow the command prompts to interact with the MCP server.\n\n## Usage:\n- The client applications prompt for user input commands to perform various operations, including fetching weather warnings and forecasts.\n- Server responses are processed and displayed to the user through formatted text, providing valuable weather insights.\n\n## Future Improvements:\n\n- Expand tool capabilities to include more advanced MCP features and integrate additional APIs.\n- Enhance error handling and logging to provide deeper insights into application behavior.\n\n---\nThis project offers a well-structured demonstration of how MCP can be utilized for tool integration and provides practical examples for weather-based applications using modern AI frameworks and protocols."
}
```

### 步骤 13 · 环境执行结果

执行成功（2025-11-09T00:16:27+0800）

生成内容：

```markdown
# MCP Weather Client and Server Project

## Overview:

This project consists of a client-server application designed to interact with the Model Context Protocol (MCP), a protocol that facilitates standardized communication among various AI models and tools. The MCP Weather application provides weather-related functions by utilizing external APIs and integrates these capabilities into a client-server model.

## Project Structure:

- **Client:** 
  - The client contains multiple implementations demonstrating different integrations:
    - `mcp_client.py`: A simple client application showcasing MCP client-server interactions, calling tools, and handling responses.
    - `mcp_client_deepseek.py`: Integration with DeepSeek API in the MCP client, handling user queries and processing them via DeepSeek.
    - `mcp_client_langchain_chat.py`: Integration with DeepSeek API using a LangChain version, illustrating streamlined tool handling and agent creation.

- **Server:**
  - `weather_server.py`: An MCP server providing weather information services. It includes tools for retrieving weather warnings and forecasts from a weather API and formats the responses for client use.

## Key Features:

- **MCP Client-Server Communication**: Demonstrates how the client communicates with the server using standard input/output, with asynchronous handling for efficiency.
- **Tool Integration**: Features tools that gather weather data from external APIs and make it easily accessible through standardized MCP protocols.
- **Flexible Implementations**: Shows different ways to integrate with AI functionalities - starting from a basic model to more sophisticated methods using third-party libraries like DeepSeek and LangChain.

## Getting Started:

### Prerequisites:
- Install Python 3.7 or later.
- Install necessary packages using pip (e.g., `asyncio`, `httpx`, `openai`, `langchain_openai`, `dotenv`, and others mentioned in client implementations).

### Environment Configuration:
- Create a `.env` file at the root of the project to store essential environment variables.
- **Sample `.env` Configuration**:
  ```
  DEEPSEEK_API_KEY=your_deepseek_api_key
  DEEPSEEK_BASE_URL=https://api.deepseek.com
  DEEPSEEK_MODEL=deepseek-chat
  QWEATHER_API_BASE=your_qweather_api_base
  QWEATHER_API_KEY=your_qweather_api_key
  ```

### Running the Server:
1. Navigate to the server directory (e.g., `mcp-demo/server`).
2. Execute `python weather_server.py` to start the server.

### Running the Client:
1. Choose a client implementation from the `mcp-demo/client` directory.
2. Execute the script using Python (e.g., `python mcp_client.py`).
3. Follow the command prompts to interact with the MCP server.

## Usage:
- The client applications prompt for user input commands to perform various operations, including fetching weather warnings and forecasts.
- Server responses are processed and displayed to the user through formatted text, providing valuable weather insights.

## Future Improvements:

- Expand tool capabilities to include more advanced MCP features and integrate additional APIs.
- Enhance error handling and logging to provide deeper insights into application behavior.

---
This project offers a well-structured demonstration of how MCP can be utilized for tool integration and provides practical examples for weather-based applications using modern AI frameworks and protocols.
Terminating...
```


## 生成的 README（若已终止并返回）


```markdown
# MCP Weather Client and Server Project

## Overview:

This project consists of a client-server application designed to interact with the Model Context Protocol (MCP), a protocol that facilitates standardized communication among various AI models and tools. The MCP Weather application provides weather-related functions by utilizing external APIs and integrates these capabilities into a client-server model.

## Project Structure:

- **Client:** 
  - The client contains multiple implementations demonstrating different integrations:
    - `mcp_client.py`: A simple client application showcasing MCP client-server interactions, calling tools, and handling responses.
    - `mcp_client_deepseek.py`: Integration with DeepSeek API in the MCP client, handling user queries and processing them via DeepSeek.
    - `mcp_client_langchain_chat.py`: Integration with DeepSeek API using a LangChain version, illustrating streamlined tool handling and agent creation.

- **Server:**
  - `weather_server.py`: An MCP server providing weather information services. It includes tools for retrieving weather warnings and forecasts from a weather API and formats the responses for client use.

## Key Features:

- **MCP Client-Server Communication**: Demonstrates how the client communicates with the server using standard input/output, with asynchronous handling for efficiency.
- **Tool Integration**: Features tools that gather weather data from external APIs and make it easily accessible through standardized MCP protocols.
- **Flexible Implementations**: Shows different ways to integrate with AI functionalities - starting from a basic model to more sophisticated methods using third-party libraries like DeepSeek and LangChain.

## Getting Started:

### Prerequisites:
- Install Python 3.7 or later.
- Install necessary packages using pip (e.g., `asyncio`, `httpx`, `openai`, `langchain_openai`, `dotenv`, and others mentioned in client implementations).

### Environment Configuration:
- Create a `.env` file at the root of the project to store essential environment variables.
- **Sample `.env` Configuration**:
  ```
  DEEPSEEK_API_KEY=your_deepseek_api_key
  DEEPSEEK_BASE_URL=https://api.deepseek.com
  DEEPSEEK_MODEL=deepseek-chat
  QWEATHER_API_BASE=your_qweather_api_base
  QWEATHER_API_KEY=your_qweather_api_key
  ```

### Running the Server:
1. Navigate to the server directory (e.g., `mcp-demo/server`).
2. Execute `python weather_server.py` to start the server.

### Running the Client:
1. Choose a client implementation from the `mcp-demo/client` directory.
2. Execute the script using Python (e.g., `python mcp_client.py`).
3. Follow the command prompts to interact with the MCP server.

## Usage:
- The client applications prompt for user input commands to perform various operations, including fetching weather warnings and forecasts.
- Server responses are processed and displayed to the user through formatted text, providing valuable weather insights.

## Future Improvements:

- Expand tool capabilities to include more advanced MCP features and integrate additional APIs.
- Enhance error handling and logging to provide deeper insights into application behavior.

---
This project offers a well-structured demonstration of how MCP can be utilized for tool integration and provides practical examples for weather-based applications using modern AI frameworks and protocols.
```

### 执行流程回顾
```mermaid
flowchart TD
    Start([用户输入任务]) --> Init[初始化Memory]
    Init --> AddTask[将用户任务添加到Memory]
    
    AddTask --> LoopStart{开始循环<br/>max_iterations=50}
    
    LoopStart --> Step1[步骤1: 构造Prompt]
    
    Step1 --> Step1a[格式化Goals为system消息]
    Step1 --> Step1b[格式化Memory为对话历史]
    Step1 --> Step1c[格式化Actions为tools Schema]
    
    Step1a --> Step2
    Step1b --> Step2
    Step1c --> Step2
    
    Step2[步骤2: 调用LLM] --> Step2a[发送Prompt到OpenAI API]
    Step2a --> Step2b{是否有tools?}
    
    Step2b -->|有| Step2c[启用函数调用模式]
    Step2b -->|无| Step2d[普通对话模式]
    
    Step2c --> Step3
    Step2d --> Step3
    
    Step3[步骤3: 获取响应] --> Step3a[LLM返回tool_calls或文本]
    Step3a --> Step4[步骤4: 解析响应]
    
    Step4 --> Step4a[解析JSON获取tool和args]
    Step4a --> Step4b{解析成功?}
    
    Step4b -->|成功| Step5
    Step4b -->|失败| Step4c[降级为terminate工具]
    Step4c --> Step5
    
    Step5[步骤5: 获取Action对象] --> Step5a[从ActionRegistry查找]
    Step5a --> Step6[步骤6: 执行动作]
    
    Step6 --> Step6a[在Environment中执行]
    Step6a --> Step6b{执行成功?}
    
    Step6b -->|成功| Step6c[格式化结果+时间戳]
    Step6b -->|失败| Step6d[捕获异常+traceback]
    
    Step6c --> Step7
    Step6d --> Step7
    
    Step7[步骤7: 更新Memory] --> Step7a[添加assistant消息<br/>包含决策内容]
    Step7a --> Step7b[添加environment消息<br/>包含执行结果]
    
    Step7b --> Step8{步骤8: 判断是否终止}
    
    Step8 -->|action.terminal=True| End([结束循环])
    Step8 -->|继续| IterCheck{迭代次数<br/>是否达到上限?}
    
    IterCheck -->|否| LoopStart
    IterCheck -->|是| End
    
    End --> Return[返回final_memory]
    
    style Start fill:#e1f5e1
    style End fill:#ffe1e1
    style Return fill:#e1e5ff
    style Step2 fill:#fff4e1
    style Step6 fill:#fff4e1
```