# 🤖 LangChain智能代理系统入门

## 🎯 学习目标

在本课程中，你将学习：
- Agent（智能代理）的核心概念
- 工具系统的设计和实现
- 不同类型的Agent架构
- ReAct推理模式的应用
- 自定义工具的开发
- Agent的调试和优化技巧

---

In [None]:
# 初始化学习环境
import sys
import os
sys.path.append('../utils')

from progress_tracker import ProgressTracker
import warnings
warnings.filterwarnings('ignore')

# 初始化进度追踪器
tracker = ProgressTracker()
lesson_id = "02_agents_basics"
tracker.start_lesson(lesson_id, "LangChain智能代理系统入门")

print("🚀 欢迎来到LangChain智能代理系统课程！")
print("📊 正在初始化学习环境...")

# 显示学习进度
tracker.display_progress_summary()

## 1. 🧠 什么是智能代理（Agent）？

### 1.1 基本概念

智能代理是能够使用工具、进行推理并执行复杂任务的AI系统。它不仅能生成文本，还能与外部世界交互。

In [None]:
# 环境配置检查
try:
    from langchain.agents import AgentType, initialize_agent, create_react_agent
    from langchain.agents import AgentExecutor
    from langchain_core.tools import Tool, BaseTool
    from langchain_core.prompts import PromptTemplate, ChatPromptTemplate
    from langchain_openai import ChatOpenAI
    from langchain.schema import AgentAction, AgentFinish
    from dotenv import load_dotenv
    import json
    
    # 加载环境变量
    load_dotenv()
    
    print("✅ LangChain Agent模块加载成功")
    
    # 检查API密钥
    if os.getenv('OPENAI_API_KEY'):
        print("✅ OpenAI API密钥已配置")
        has_api_key = True
        # 初始化LLM
        llm = ChatOpenAI(
            model="gpt-3.5-turbo",
            temperature=0.1,  # 降低随机性，提高推理准确性
            max_tokens=1000
        )
    else:
        print("⚠️ 未检测到OpenAI API密钥，将使用模拟模式")
        has_api_key = False
        
        # 创建模拟LLM类
        class MockLLM:
            def invoke(self, prompt):
                return f"[模拟响应] 基于提示词的Agent推理..."
            
            def predict(self, text):
                return self.invoke(text)
        
        llm = MockLLM()
        
except ImportError as e:
    print(f"❌ 导入错误: {e}")
    print("💡 请运行: pip install langchain langchain-openai")
    has_api_key = False

### 1.2 Agent vs Chain 的区别

In [None]:
print("🔍 Agent vs Chain 对比：")
print("="*60)

comparison = {
    "特征": ["执行方式", "决策能力", "工具使用", "复杂度", "适用场景"],
    "Chain（链）": [
        "预定义的固定步骤",
        "无动态决策",
        "不能使用外部工具",
        "相对简单",
        "结构化、可预测的任务"
    ],
    "Agent（代理）": [
        "动态选择执行路径",
        "具备推理和决策能力",
        "可以使用多种工具",
        "较为复杂",
        "开放式、需要推理的任务"
    ]
}

for i, feature in enumerate(comparison["特征"]):
    print(f"\n📋 {feature}:")
    print(f"   🔗 Chain: {comparison['Chain（链）'][i]}")
    print(f"   🤖 Agent: {comparison['Agent（代理）'][i]}")

print("\n💡 Agent的核心组件：")
components = [
    "🧠 LLM (大语言模型) - 推理引擎",
    "🛠️ Tools (工具集) - 执行具体操作",
    "📝 Prompt Template - 指导推理过程",
    "🔄 Agent Executor - 协调执行流程",
    "💭 Memory (可选) - 存储对话历史"
]

for component in components:
    print(f"   {component}")

# 记录完成状态
tracker.complete_exercise(lesson_id, "agent_concepts")

## 2. 🛠️ 工具系统基础

### 2.1 创建基础工具

In [None]:
from langchain_core.tools import Tool
import math
import datetime
import requests

# 工具1：计算器
def calculate(expression: str) -> str:
    """执行基础数学计算
    
    Args:
        expression: 数学表达式，如 "2+3*4" 或 "sqrt(16)"
    
    Returns:
        计算结果的字符串
    """
    try:
        # 安全的数学计算（限制可用函数）
        allowed_names = {
            k: v for k, v in math.__dict__.items() if not k.startswith("__")
        }
        allowed_names.update({"abs": abs, "round": round})
        
        # 替换常用函数名
        expression = expression.replace("sqrt", "math.sqrt")
        expression = expression.replace("sin", "math.sin")
        expression = expression.replace("cos", "math.cos")
        expression = expression.replace("tan", "math.tan")
        expression = expression.replace("log", "math.log")
        
        result = eval(expression, {"__builtins__": {}, "math": math}, allowed_names)
        return f"计算结果: {result}"
    
    except Exception as e:
        return f"计算错误: {str(e)}"

calculator_tool = Tool(
    name="Calculator",
    description="用于执行数学计算，支持基础运算和数学函数",
    func=calculate
)

# 工具2：日期时间查询
def get_current_time(format_type: str = "datetime") -> str:
    """获取当前日期时间
    
    Args:
        format_type: 格式类型，可选 "datetime", "date", "time"
    
    Returns:
        格式化的日期时间字符串
    """
    now = datetime.datetime.now()
    
    if format_type == "date":
        return now.strftime("%Y-%m-%d")
    elif format_type == "time":
        return now.strftime("%H:%M:%S")
    else:
        return now.strftime("%Y-%m-%d %H:%M:%S")

datetime_tool = Tool(
    name="DateTime",
    description="获取当前日期和时间信息",
    func=get_current_time
)

# 工具3：文本处理
def text_processor(operation: str) -> str:
    """处理文本的各种操作
    
    Args:
        operation: 操作指令，格式为 "action:text"，如 "upper:hello" 或 "count:hello world"
    
    Returns:
        处理结果
    """
    try:
        if ":" not in operation:
            return "错误：请使用格式 'action:text'，如 'upper:hello'"
        
        action, text = operation.split(":", 1)
        action = action.strip().lower()
        
        if action == "upper":
            return text.upper()
        elif action == "lower":
            return text.lower()
        elif action == "count":
            return f"字符数: {len(text)}, 单词数: {len(text.split())}"
        elif action == "reverse":
            return text[::-1]
        elif action == "title":
            return text.title()
        else:
            return f"不支持的操作: {action}。支持的操作：upper, lower, count, reverse, title"
    
    except Exception as e:
        return f"处理错误: {str(e)}"

text_tool = Tool(
    name="TextProcessor",
    description="处理文本，支持大小写转换、字符统计、反转等操作",
    func=text_processor
)

# 工具4：文件信息查询（模拟）
def file_info(file_path: str) -> str:
    """获取文件信息（模拟）
    
    Args:
        file_path: 文件路径
    
    Returns:
        文件信息描述
    """
    # 模拟文件信息
    import os
    
    if os.path.exists(file_path):
        try:
            stat = os.stat(file_path)
            size = stat.st_size
            modified = datetime.datetime.fromtimestamp(stat.st_mtime)
            
            return f"文件: {file_path}\n大小: {size} 字节\n修改时间: {modified.strftime('%Y-%m-%d %H:%M:%S')}"
        except Exception as e:
            return f"获取文件信息失败: {str(e)}"
    else:
        return f"文件不存在: {file_path}"

file_tool = Tool(
    name="FileInfo",
    description="获取文件的基本信息，如大小、修改时间等",
    func=file_info
)

# 创建工具列表
tools = [calculator_tool, datetime_tool, text_tool, file_tool]

print("🛠️ 已创建的工具列表：")
for i, tool in enumerate(tools, 1):
    print(f"   {i}. {tool.name}: {tool.description}")

# 测试工具
print("\n🧪 工具测试：")
test_cases = [
    ("Calculator", "2 + 3 * 4"),
    ("DateTime", "datetime"),
    ("TextProcessor", "upper:hello world"),
    ("TextProcessor", "count:LangChain Agent系统")
]

for tool_name, test_input in test_cases:
    tool = next((t for t in tools if t.name == tool_name), None)
    if tool:
        result = tool.func(test_input)
        print(f"   {tool_name}({test_input}) = {result}")

# 记录完成状态
tracker.complete_exercise(lesson_id, "basic_tools")

### 2.2 自定义高级工具

In [None]:
from langchain_core.tools import BaseTool
from typing import Optional, Type
from pydantic import BaseModel, Field

# 定义工具输入模式
class WeatherInput(BaseModel):
    city: str = Field(description="城市名称")
    country: Optional[str] = Field(default="CN", description="国家代码，默认为CN")

class DataAnalysisInput(BaseModel):
    data: str = Field(description="要分析的数据，JSON格式的数字列表")
    analysis_type: str = Field(description="分析类型：basic, statistical, trend")

# 自定义工具1：天气查询（模拟）
class WeatherTool(BaseTool):
    name = "weather_query"
    description = "查询指定城市的天气信息"
    args_schema: Type[BaseModel] = WeatherInput
    
    def _run(self, city: str, country: str = "CN") -> str:
        """执行天气查询"""
        # 模拟天气数据
        weather_data = {
            "北京": {"temperature": "15°C", "condition": "多云", "humidity": "65%"},
            "上海": {"temperature": "18°C", "condition": "小雨", "humidity": "80%"},
            "广州": {"temperature": "25°C", "condition": "晴朗", "humidity": "55%"},
            "深圳": {"temperature": "24°C", "condition": "多云", "humidity": "70%"},
        }
        
        if city in weather_data:
            data = weather_data[city]
            return f"{city}天气: {data['condition']}, 温度: {data['temperature']}, 湿度: {data['humidity']}"
        else:
            return f"抱歉，暂时无法获取{city}的天气信息。支持的城市：北京、上海、广州、深圳"
    
    async def _arun(self, city: str, country: str = "CN") -> str:
        """异步执行（可选实现）"""
        return self._run(city, country)

# 自定义工具2：数据分析
class DataAnalysisTool(BaseTool):
    name = "data_analysis"
    description = "对数字数据进行统计分析"
    args_schema: Type[BaseModel] = DataAnalysisInput
    
    def _run(self, data: str, analysis_type: str = "basic") -> str:
        """执行数据分析"""
        try:
            # 解析JSON数据
            import json
            numbers = json.loads(data)
            
            if not isinstance(numbers, list) or not all(isinstance(x, (int, float)) for x in numbers):
                return "错误：数据必须是数字列表，如 [1, 2, 3, 4, 5]"
            
            if not numbers:
                return "错误：数据列表不能为空"
            
            # 基础分析
            count = len(numbers)
            total = sum(numbers)
            mean = total / count
            minimum = min(numbers)
            maximum = max(numbers)
            
            if analysis_type == "basic":
                return f"基础统计：数量={count}, 总和={total}, 平均值={mean:.2f}, 最小值={minimum}, 最大值={maximum}"
            
            elif analysis_type == "statistical":
                # 计算方差和标准差
                variance = sum((x - mean) ** 2 for x in numbers) / count
                std_dev = variance ** 0.5
                
                # 计算中位数
                sorted_numbers = sorted(numbers)
                n = len(sorted_numbers)
                if n % 2 == 0:
                    median = (sorted_numbers[n//2-1] + sorted_numbers[n//2]) / 2
                else:
                    median = sorted_numbers[n//2]
                
                return f"统计分析：平均值={mean:.2f}, 中位数={median:.2f}, 标准差={std_dev:.2f}, 方差={variance:.2f}"
            
            elif analysis_type == "trend":
                # 简单趋势分析
                if count < 2:
                    return "趋势分析需要至少2个数据点"
                
                # 计算线性趋势
                x_values = list(range(count))
                n = count
                sum_x = sum(x_values)
                sum_y = sum(numbers)
                sum_xy = sum(x * y for x, y in zip(x_values, numbers))
                sum_x2 = sum(x * x for x in x_values)
                
                slope = (n * sum_xy - sum_x * sum_y) / (n * sum_x2 - sum_x * sum_x)
                
                if slope > 0.1:
                    trend = "上升趋势"
                elif slope < -0.1:
                    trend = "下降趋势"
                else:
                    trend = "稳定趋势"
                
                return f"趋势分析：{trend} (斜率={slope:.3f}), 变化幅度={maximum-minimum:.2f}"
            
            else:
                return "不支持的分析类型。支持：basic, statistical, trend"
        
        except json.JSONDecodeError:
            return "错误：无法解析JSON数据，请确保格式正确，如 [1, 2, 3]"
        except Exception as e:
            return f"分析错误：{str(e)}"
    
    async def _arun(self, data: str, analysis_type: str = "basic") -> str:
        return self._run(data, analysis_type)

# 创建自定义工具实例
weather_tool = WeatherTool()
data_analysis_tool = DataAnalysisTool()

# 扩展工具列表
advanced_tools = [weather_tool, data_analysis_tool]
all_tools = tools + advanced_tools

print("🔧 自定义高级工具：")
for tool in advanced_tools:
    print(f"   • {tool.name}: {tool.description}")

# 测试自定义工具
print("\n🧪 高级工具测试：")

# 测试天气工具
weather_result = weather_tool._run("北京")
print(f"   天气查询(北京): {weather_result}")

# 测试数据分析工具
test_data = "[10, 15, 12, 18, 20, 14, 16, 22, 19, 17]"
analysis_results = [
    data_analysis_tool._run(test_data, "basic"),
    data_analysis_tool._run(test_data, "statistical"),
    data_analysis_tool._run(test_data, "trend")
]

for i, result in enumerate(analysis_results, 1):
    print(f"   数据分析{i}: {result}")

# 记录完成状态
tracker.complete_exercise(lesson_id, "advanced_tools")

## 3. 🧠 ReAct推理模式

### 3.1 理解ReAct模式

In [None]:
print("🧠 ReAct推理模式详解")
print("="*50)

print("\n📚 ReAct = Reasoning + Acting")
print("ReAct是一种将推理(Reasoning)和行动(Acting)结合的AI推理框架")

print("\n🔄 ReAct循环流程：")
react_steps = [
    "1. 🤔 Thought (思考): 分析当前情况，制定计划",
    "2. 🎯 Action (行动): 选择并执行一个工具",
    "3. 📊 Observation (观察): 获取工具执行结果",
    "4. 🔄 重复: 根据观察结果继续思考或结束"
]

for step in react_steps:
    print(f"   {step}")

print("\n💡 ReAct的优势：")
advantages = [
    "🎯 目标导向：每一步都朝着解决问题的方向",
    "🔍 透明度：推理过程清晰可见",
    "🛠️ 工具整合：能够有效使用外部工具",
    "🔄 自适应：可以根据结果调整策略",
    "🧪 可调试：便于分析和优化"
]

for advantage in advantages:
    print(f"   {advantage}")

print("\n📝 ReAct提示词模板示例：")
react_template = """
你是一个智能助手，可以使用以下工具来解决问题：

{tools}

使用以下格式进行推理：

Question: 用户的问题
Thought: 我需要思考如何解决这个问题
Action: 工具名称
Action Input: 工具的输入参数
Observation: 工具的执行结果
... (重复 Thought/Action/Action Input/Observation)
Thought: 我现在知道了最终答案
Final Answer: 最终答案

开始!

Question: {input}
Thought: {agent_scratchpad}
"""

print(react_template[:300] + "...")

# 记录完成状态
tracker.complete_exercise(lesson_id, "react_concepts")

### 3.2 创建ReAct Agent

In [None]:
from langchain.agents import create_react_agent, AgentExecutor
from langchain_core.prompts import PromptTemplate

# 创建ReAct提示词模板
react_prompt = PromptTemplate.from_template("""
你是一个智能助手，具备使用工具解决问题的能力。

你可以使用以下工具：
{tools}

使用以下格式进行推理和行动：

Question: 用户的问题
Thought: 我需要分析这个问题并决定下一步行动
Action: 选择一个工具
Action Input: 工具需要的输入参数
Observation: 工具执行的结果
... (可以重复 Thought/Action/Action Input/Observation)
Thought: 现在我有足够的信息给出最终答案
Final Answer: 对用户问题的完整回答

重要提示：
- 仔细思考每一步
- 选择最合适的工具
- 根据观察结果调整策略
- 确保最终答案完整准确

开始!

Question: {input}
Thought: {agent_scratchpad}
""")

class ReactAgentSimulator:
    """ReAct Agent模拟器（用于无API密钥时的演示）"""
    
    def __init__(self, tools):
        self.tools = {tool.name: tool for tool in tools}
        self.max_iterations = 5
    
    def simulate_reasoning(self, question: str) -> str:
        """模拟ReAct推理过程"""
        print(f"🤖 Question: {question}")
        print("\n🧠 开始ReAct推理过程：")
        print("="*50)
        
        # 模拟不同类型问题的推理路径
        if "计算" in question or "数学" in question or any(op in question for op in ["+", "-", "*", "/", "="]):
            return self._simulate_math_reasoning(question)
        elif "天气" in question or "温度" in question:
            return self._simulate_weather_reasoning(question)
        elif "时间" in question or "日期" in question:
            return self._simulate_time_reasoning(question)
        elif "分析" in question and any(char.isdigit() for char in question):
            return self._simulate_data_analysis_reasoning(question)
        else:
            return self._simulate_general_reasoning(question)
    
    def _simulate_math_reasoning(self, question: str) -> str:
        print("💭 Thought: 这是一个数学计算问题，我需要使用Calculator工具")
        print("🎯 Action: Calculator")
        
        # 提取数学表达式
        import re
        math_pattern = r'[0-9+\-*/()\s.]+'
        matches = re.findall(math_pattern, question)
        if matches:
            expression = max(matches, key=len).strip()
        else:
            expression = "2+2"  # 默认表达式
        
        print(f"📥 Action Input: {expression}")
        
        # 执行计算
        result = self.tools["Calculator"].func(expression)
        print(f"👁️ Observation: {result}")
        
        print("💭 Thought: 我已经得到了计算结果，可以给出最终答案")
        final_answer = f"根据计算，{result}"
        print(f"✅ Final Answer: {final_answer}")
        
        return final_answer
    
    def _simulate_weather_reasoning(self, question: str) -> str:
        print("💭 Thought: 用户询问天气信息，我需要使用weather_query工具")
        print("🎯 Action: weather_query")
        
        # 提取城市名
        cities = ["北京", "上海", "广州", "深圳"]
        city = "北京"  # 默认城市
        for c in cities:
            if c in question:
                city = c
                break
        
        print(f"📥 Action Input: city={city}")
        
        # 执行天气查询
        result = self.tools["weather_query"]._run(city)
        print(f"👁️ Observation: {result}")
        
        print("💭 Thought: 我已经获取了天气信息，可以回答用户")
        final_answer = f"根据查询，{result}"
        print(f"✅ Final Answer: {final_answer}")
        
        return final_answer
    
    def _simulate_time_reasoning(self, question: str) -> str:
        print("💭 Thought: 用户询问时间信息，我需要使用DateTime工具")
        print("🎯 Action: DateTime")
        print("📥 Action Input: datetime")
        
        # 执行时间查询
        result = self.tools["DateTime"].func("datetime")
        print(f"👁️ Observation: 当前时间是 {result}")
        
        print("💭 Thought: 我已经获取了当前时间，可以回答用户")
        final_answer = f"当前时间是 {result}"
        print(f"✅ Final Answer: {final_answer}")
        
        return final_answer
    
    def _simulate_data_analysis_reasoning(self, question: str) -> str:
        print("💭 Thought: 这是一个数据分析问题，我需要使用data_analysis工具")
        print("🎯 Action: data_analysis")
        
        # 使用示例数据
        test_data = "[10, 15, 12, 18, 20, 14, 16, 22, 19, 17]"
        analysis_type = "statistical" if "统计" in question else "basic"
        
        print(f"📥 Action Input: data={test_data}, analysis_type={analysis_type}")
        
        # 执行数据分析
        result = self.tools["data_analysis"]._run(test_data, analysis_type)
        print(f"👁️ Observation: {result}")
        
        print("💭 Thought: 数据分析完成，我可以基于结果回答用户")
        final_answer = f"数据分析结果：{result}"
        print(f"✅ Final Answer: {final_answer}")
        
        return final_answer
    
    def _simulate_general_reasoning(self, question: str) -> str:
        print("💭 Thought: 这是一个一般性问题，我需要分析是否需要使用工具")
        print("💭 Thought: 看起来我可以直接回答这个问题，不需要使用额外的工具")
        
        final_answer = f"关于您的问题 '{question}'，这是一个需要基于知识和推理的问题。我会尽力为您提供有用的信息和建议。"
        print(f"✅ Final Answer: {final_answer}")
        
        return final_answer

# 创建Agent
if has_api_key:
    try:
        # 创建真实的ReAct Agent
        react_agent = create_react_agent(llm, all_tools, react_prompt)
        agent_executor = AgentExecutor(
            agent=react_agent,
            tools=all_tools,
            verbose=True,
            max_iterations=5,
            handle_parsing_errors=True
        )
        print("✅ 真实ReAct Agent创建成功")
        use_real_agent = True
    except Exception as e:
        print(f"⚠️ 真实Agent创建失败，使用模拟器: {e}")
        agent_simulator = ReactAgentSimulator(all_tools)
        use_real_agent = False
else:
    # 使用模拟器
    agent_simulator = ReactAgentSimulator(all_tools)
    use_real_agent = False
    print("🤖 使用ReAct Agent模拟器")

# 记录完成状态
tracker.complete_exercise(lesson_id, "react_agent")

### 3.3 Agent推理演示

In [None]:
print("🎭 ReAct Agent推理演示")
print("="*60)

# 准备测试问题
test_questions = [
    "计算 25 * 4 + 18 的结果",
    "现在几点了？",
    "北京的天气怎么样？",
    "分析数据 [5, 10, 15, 20, 25] 的统计特征",
    "将文本 'Hello World' 转换为大写"
]

for i, question in enumerate(test_questions, 1):
    print(f"\n{'='*60}")
    print(f"🧪 测试 {i}: {question}")
    print(f"{'='*60}")
    
    try:
        if use_real_agent:
            # 使用真实Agent
            result = agent_executor.invoke({"input": question})
            print(f"\n🎯 最终结果: {result.get('output', '执行失败')}")
        else:
            # 使用模拟器
            result = agent_simulator.simulate_reasoning(question)
            
    except Exception as e:
        print(f"❌ 执行失败: {e}")
    
    # 添加分隔
    if i < len(test_questions):
        print("\n" + "🔄 继续下一个测试..." + "\n")

print("\n" + "="*60)
print("✅ ReAct Agent演示完成！")
print("\n💡 观察要点：")
observations = [
    "🧠 每次推理都有明确的思考过程",
    "🎯 根据问题类型选择合适的工具",
    "📊 工具执行结果会影响后续决策",
    "🔄 可以进行多轮推理直到得出答案",
    "✅ 最终答案基于工具执行结果"
]

for obs in observations:
    print(f"   {obs}")

# 记录完成状态
tracker.complete_exercise(lesson_id, "agent_demonstration")

## 4. 🎯 实战练习

### 练习1：构建智能数据分析助手

In [None]:
print("🎯 练习1：智能数据分析助手")
print("="*50)

# 任务：创建一个专门用于数据分析的智能助手
# 功能包括：数据验证、统计分析、可视化建议、报告生成

from typing import List, Dict, Any
import json
import math

class DataValidationTool(BaseTool):
    name = "data_validator"
    description = "验证数据格式和质量，检查异常值"
    
    def _run(self, data: str) -> str:
        try:
            numbers = json.loads(data)
            
            if not isinstance(numbers, list):
                return "错误：数据必须是列表格式"
            
            # 验证数据类型
            valid_numbers = []
            invalid_count = 0
            
            for i, item in enumerate(numbers):
                if isinstance(item, (int, float)) and not math.isnan(item) and math.isfinite(item):
                    valid_numbers.append(item)
                else:
                    invalid_count += 1
            
            if not valid_numbers:
                return "错误：没有有效的数值数据"
            
            # 检查异常值（使用IQR方法）
            if len(valid_numbers) >= 4:
                sorted_data = sorted(valid_numbers)
                n = len(sorted_data)
                q1 = sorted_data[n//4]
                q3 = sorted_data[3*n//4]
                iqr = q3 - q1
                lower_bound = q1 - 1.5 * iqr
                upper_bound = q3 + 1.5 * iqr
                
                outliers = [x for x in valid_numbers if x < lower_bound or x > upper_bound]
                outlier_count = len(outliers)
            else:
                outlier_count = 0
            
            result = {
                "total_items": len(numbers),
                "valid_numbers": len(valid_numbers),
                "invalid_items": invalid_count,
                "outliers": outlier_count,
                "data_quality": len(valid_numbers) / len(numbers) * 100
            }
            
            return f"数据验证结果：总数据{result['total_items']}个，有效数据{result['valid_numbers']}个，无效数据{result['invalid_items']}个，异常值{result['outliers']}个，数据质量{result['data_quality']:.1f}%"
            
        except json.JSONDecodeError:
            return "错误：无法解析JSON格式数据"
        except Exception as e:
            return f"验证错误：{str(e)}"

class VisualizationAdvisorTool(BaseTool):
    name = "visualization_advisor"
    description = "根据数据特征推荐合适的可视化方法"
    
    def _run(self, data_info: str) -> str:
        try:
            # 分析数据信息
            if "数据量" in data_info:
                # 提取数据量信息
                import re
                numbers = re.findall(r'\d+', data_info)
                if numbers:
                    data_count = int(numbers[0])
                else:
                    data_count = 10  # 默认值
            else:
                data_count = 10
            
            recommendations = []
            
            # 基于数据量推荐
            if data_count <= 10:
                recommendations.append("柱状图 - 适合少量数据的比较")
                recommendations.append("饼图 - 显示数据占比关系")
            elif data_count <= 50:
                recommendations.append("折线图 - 显示数据趋势变化")
                recommendations.append("散点图 - 观察数据分布")
            else:
                recommendations.append("直方图 - 显示数据分布")
                recommendations.append("箱线图 - 显示数据统计特征")
            
            # 通用推荐
            recommendations.append("统计摘要表 - 数值型数据概览")
            
            if "异常值" in data_info:
                recommendations.append("箱线图 - 突出显示异常值")
            
            if "趋势" in data_info:
                recommendations.append("趋势线图 - 显示变化趋势")
            
            return f"可视化建议：\n" + "\n".join(f"• {rec}" for rec in recommendations)
            
        except Exception as e:
            return f"建议生成错误：{str(e)}"

class ReportGeneratorTool(BaseTool):
    name = "report_generator"
    description = "基于分析结果生成数据分析报告"
    
    def _run(self, analysis_results: str) -> str:
        try:
            # 生成结构化报告
            report = f"""
📊 数据分析报告
{'='*40}

📋 执行摘要
本次分析基于提供的数据集进行了全面的统计分析和质量评估。

📈 分析结果
{analysis_results}

💡 关键发现
• 数据质量评估完成，识别了数据中的异常值和无效项
• 统计特征分析提供了数据的分布和中心趋势信息
• 趋势分析揭示了数据的变化模式

🎯 建议和后续行动
• 定期监控数据质量，及时处理异常值
• 根据数据特征选择合适的可视化方法
• 持续收集更多数据以提高分析的可靠性

📅 报告生成时间: {datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
"""
            return report
            
        except Exception as e:
            return f"报告生成错误：{str(e)}"

# 创建数据分析助手工具集
data_analysis_tools = [
    DataValidationTool(),
    data_analysis_tool,  # 之前创建的数据分析工具
    VisualizationAdvisorTool(),
    ReportGeneratorTool()
]

# 创建数据分析助手模拟器
class DataAnalysisAssistant:
    """智能数据分析助手"""
    
    def __init__(self, tools):
        self.tools = {tool.name: tool for tool in tools}
    
    def analyze_dataset(self, data: str, analysis_request: str) -> str:
        """执行完整的数据分析流程"""
        print(f"🔍 分析请求: {analysis_request}")
        print(f"📊 数据: {data}")
        print("\n🤖 开始智能数据分析流程：")
        print("="*50)
        
        # 步骤1：数据验证
        print("💭 Thought: 首先需要验证数据质量")
        print("🎯 Action: data_validator")
        print(f"📥 Action Input: {data}")
        
        validation_result = self.tools["data_validator"]._run(data)
        print(f"👁️ Observation: {validation_result}")
        
        # 步骤2：统计分析
        print("\n💭 Thought: 数据验证完成，现在进行统计分析")
        print("🎯 Action: data_analysis")
        print(f"📥 Action Input: data={data}, analysis_type=statistical")
        
        stats_result = self.tools["data_analysis"]._run(data, "statistical")
        print(f"👁️ Observation: {stats_result}")
        
        # 步骤3：可视化建议
        print("\n💭 Thought: 基于数据特征提供可视化建议")
        print("🎯 Action: visualization_advisor")
        data_info = f"数据量{len(json.loads(data))}个，{validation_result}，{stats_result}"
        print(f"📥 Action Input: {data_info[:100]}...")
        
        viz_result = self.tools["visualization_advisor"]._run(data_info)
        print(f"👁️ Observation: {viz_result}")
        
        # 步骤4：生成报告
        print("\n💭 Thought: 所有分析完成，现在生成综合报告")
        print("🎯 Action: report_generator")
        all_results = f"验证结果：{validation_result}\n统计分析：{stats_result}\n可视化建议：{viz_result}"
        print(f"📥 Action Input: 综合分析结果")
        
        report = self.tools["report_generator"]._run(all_results)
        print(f"👁️ Observation: 报告已生成")
        
        print("\n💭 Thought: 数据分析流程完成，可以提供最终答案")
        
        final_answer = f"""
✅ 数据分析完成！

🔍 数据质量：{validation_result}
📊 统计特征：{stats_result}
🎨 可视化建议：{viz_result}

{report}
"""
        
        print(f"✅ Final Answer: {final_answer[:200]}...")
        return final_answer

# 创建数据分析助手
data_assistant = DataAnalysisAssistant(data_analysis_tools)

# 测试数据分析助手
test_dataset = "[12, 15, 18, 14, 20, 16, 13, 17, 19, 15, 100, 14, 16, 18, 15]"
analysis_request = "请分析这个销售数据的统计特征，并提供可视化建议"

print("\n🧪 测试智能数据分析助手：")
print("="*60)

result = data_assistant.analyze_dataset(test_dataset, analysis_request)

print("\n✅ 练习1完成！你已经学会了如何构建专业的数据分析Agent。")

# 记录练习完成
tracker.complete_exercise(lesson_id, "exercise_data_assistant")

### 练习2：多工具协作Agent

In [None]:
print("🎯 练习2：多工具协作Agent")
print("="*50)

# 任务：创建一个能够协调多个工具完成复杂任务的Agent
# 场景：智能办公助手，能处理日程、计算、文档和通知等任务

import uuid
from datetime import datetime, timedelta

class ScheduleTool(BaseTool):
    name = "schedule_manager"
    description = "管理日程安排，可以添加、查询和删除日程"
    
    def __init__(self):
        super().__init__()
        self.schedule = {}  # 简单的内存存储
    
    def _run(self, operation: str) -> str:
        try:
            # 操作格式：action:details
            if ":" not in operation:
                return "错误：请使用格式 'action:details'，如 'add:会议,2024-01-15 14:00'"
            
            action, details = operation.split(":", 1)
            action = action.strip().lower()
            
            if action == "add":
                # 添加日程：title,datetime
                if "," not in details:
                    return "错误：添加日程格式为 'add:标题,时间'"
                
                title, time_str = details.split(",", 1)
                event_id = str(uuid.uuid4())[:8]
                
                self.schedule[event_id] = {
                    "title": title.strip(),
                    "time": time_str.strip(),
                    "created": datetime.now().strftime('%Y-%m-%d %H:%M:%S')
                }
                
                return f"日程已添加：{title} - {time_str} (ID: {event_id})"
            
            elif action == "list":
                # 列出所有日程
                if not self.schedule:
                    return "当前没有安排任何日程"
                
                result = "当前日程安排：\n"
                for event_id, event in self.schedule.items():
                    result += f"• {event['title']} - {event['time']} (ID: {event_id})\n"
                
                return result.strip()
            
            elif action == "delete":
                # 删除日程
                event_id = details.strip()
                if event_id in self.schedule:
                    deleted_event = self.schedule.pop(event_id)
                    return f"已删除日程：{deleted_event['title']} - {deleted_event['time']}"
                else:
                    return f"未找到ID为 {event_id} 的日程"
            
            else:
                return f"不支持的操作：{action}。支持的操作：add, list, delete"
        
        except Exception as e:
            return f"日程管理错误：{str(e)}"

class DocumentTool(BaseTool):
    name = "document_processor"
    description = "处理文档相关任务，如创建摘要、格式转换等"
    
    def _run(self, operation: str) -> str:
        try:
            parts = operation.split(":", 1)
            if len(parts) != 2:
                return "错误：请使用格式 'action:content'"
            
            action, content = parts
            action = action.strip().lower()
            
            if action == "summarize":
                # 简单的文本摘要
                sentences = content.split("。")
                word_count = len(content.split())
                char_count = len(content)
                
                # 提取关键信息
                key_sentences = sentences[:2] if len(sentences) >= 2 else sentences
                summary = "。".join(key_sentences)
                
                return f"文档摘要：\n{summary}\n\n统计信息：{len(sentences)}句，{word_count}词，{char_count}字符"
            
            elif action == "format":
                # 格式化文本
                formatted = content.replace("\n\n", "\n").replace("  ", " ").strip()
                lines = formatted.split("\n")
                
                result = "格式化文档：\n"
                for i, line in enumerate(lines, 1):
                    if line.strip():
                        result += f"{i}. {line.strip()}\n"
                
                return result.strip()
            
            elif action == "extract":
                # 提取关键信息
                import re
                
                # 提取数字
                numbers = re.findall(r'\b\d+(?:\.\d+)?\b', content)
                # 提取日期
                dates = re.findall(r'\b\d{4}-\d{2}-\d{2}\b|\b\d{1,2}/\d{1,2}/\d{4}\b', content)
                # 提取邮箱
                emails = re.findall(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b', content)
                
                result = "提取的关键信息：\n"
                if numbers:
                    result += f"数字：{', '.join(numbers)}\n"
                if dates:
                    result += f"日期：{', '.join(dates)}\n"
                if emails:
                    result += f"邮箱：{', '.join(emails)}\n"
                
                if not any([numbers, dates, emails]):
                    result += "未找到数字、日期或邮箱信息"
                
                return result.strip()
            
            else:
                return f"不支持的操作：{action}。支持的操作：summarize, format, extract"
        
        except Exception as e:
            return f"文档处理错误：{str(e)}"

class NotificationTool(BaseTool):
    name = "notification_system"
    description = "发送通知和提醒"
    
    def __init__(self):
        super().__init__()
        self.notifications = []
    
    def _run(self, operation: str) -> str:
        try:
            parts = operation.split(":", 1)
            if len(parts) != 2:
                return "错误：请使用格式 'action:content'"
            
            action, content = parts
            action = action.strip().lower()
            
            if action == "send":
                # 发送通知
                notification = {
                    "id": len(self.notifications) + 1,
                    "content": content.strip(),
                    "timestamp": datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
                    "status": "sent"
                }
                
                self.notifications.append(notification)
                return f"通知已发送：{content} (ID: {notification['id']})"
            
            elif action == "list":
                # 列出通知
                if not self.notifications:
                    return "当前没有通知记录"
                
                result = "通知记录：\n"
                for notif in self.notifications:
                    result += f"• [{notif['timestamp']}] {notif['content']} (ID: {notif['id']})\n"
                
                return result.strip()
            
            elif action == "reminder":
                # 设置提醒
                reminder = f"⏰ 提醒：{content}"
                notification = {
                    "id": len(self.notifications) + 1,
                    "content": reminder,
                    "timestamp": datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
                    "status": "reminder"
                }
                
                self.notifications.append(notification)
                return f"提醒已设置：{content} (ID: {notification['id']})"
            
            else:
                return f"不支持的操作：{action}。支持的操作：send, list, reminder"
        
        except Exception as e:
            return f"通知系统错误：{str(e)}"

# 创建办公助手工具集
office_tools = [
    ScheduleTool(),
    DocumentTool(),
    NotificationTool(),
    calculator_tool,  # 复用之前的计算器
    datetime_tool    # 复用之前的时间工具
]

# 创建智能办公助手
class OfficeAssistant:
    """智能办公助手"""
    
    def __init__(self, tools):
        self.tools = {tool.name: tool for tool in tools}
    
    def handle_complex_request(self, request: str) -> str:
        """处理复杂的办公请求"""
        print(f"💼 办公请求: {request}")
        print("\n🤖 智能办公助手开始处理：")
        print("="*50)
        
        # 分析请求类型并制定执行计划
        if "日程" in request and "计算" in request:
            return self._handle_schedule_calculation(request)
        elif "文档" in request and "通知" in request:
            return self._handle_document_notification(request)
        elif "会议" in request and "提醒" in request:
            return self._handle_meeting_reminder(request)
        else:
            return self._handle_general_request(request)
    
    def _handle_schedule_calculation(self, request: str) -> str:
        """处理涉及日程和计算的请求"""
        print("💭 Thought: 这个请求涉及日程管理和计算，我需要先处理计算，然后安排日程")
        
        # 步骤1：执行计算
        print("🎯 Action: Calculator")
        print("📥 Action Input: 8*60")
        calc_result = self.tools["Calculator"].func("8*60")
        print(f"👁️ Observation: {calc_result}")
        
        # 步骤2：添加日程
        print("\n💭 Thought: 计算完成，现在添加工作日程")
        print("🎯 Action: schedule_manager")
        print("📥 Action Input: add:工作日程,2024-01-15 09:00")
        schedule_result = self.tools["schedule_manager"]._run("add:工作日程,2024-01-15 09:00")
        print(f"👁️ Observation: {schedule_result}")
        
        # 步骤3：发送确认通知
        print("\n💭 Thought: 日程已安排，发送确认通知")
        print("🎯 Action: notification_system")
        print("📥 Action Input: send:工作日程已安排，总计480分钟")
        notif_result = self.tools["notification_system"]._run("send:工作日程已安排，总计480分钟")
        print(f"👁️ Observation: {notif_result}")
        
        final_answer = f"✅ 任务完成！\n计算结果：{calc_result}\n{schedule_result}\n{notif_result}"
        print(f"\n✅ Final Answer: {final_answer}")
        return final_answer
    
    def _handle_document_notification(self, request: str) -> str:
        """处理文档和通知的请求"""
        print("💭 Thought: 需要处理文档并发送相关通知")
        
        # 步骤1：处理文档
        sample_doc = "项目进展报告：本周完成了3个重要功能的开发。预计下周将完成测试并发布版本1.0。团队表现优秀，提前2天完成任务。"
        print("🎯 Action: document_processor")
        print(f"📥 Action Input: summarize:{sample_doc[:50]}...")
        doc_result = self.tools["document_processor"]._run(f"summarize:{sample_doc}")
        print(f"👁️ Observation: 文档已处理")
        
        # 步骤2：发送通知
        print("\n💭 Thought: 文档处理完成，现在发送通知")
        print("🎯 Action: notification_system")
        print("📥 Action Input: send:项目报告已完成摘要处理")
        notif_result = self.tools["notification_system"]._run("send:项目报告已完成摘要处理")
        print(f"👁️ Observation: {notif_result}")
        
        final_answer = f"✅ 文档处理和通知任务完成！\n{doc_result}\n{notif_result}"
        print(f"\n✅ Final Answer: {final_answer[:150]}...")
        return final_answer
    
    def _handle_meeting_reminder(self, request: str) -> str:
        """处理会议安排和提醒"""
        print("💭 Thought: 需要安排会议并设置提醒")
        
        # 步骤1：获取当前时间
        print("🎯 Action: DateTime")
        print("📥 Action Input: datetime")
        time_result = self.tools["DateTime"].func("datetime")
        print(f"👁️ Observation: {time_result}")
        
        # 步骤2：安排会议
        print("\n💭 Thought: 现在安排明天的会议")
        print("🎯 Action: schedule_manager")
        print("📥 Action Input: add:团队会议,2024-01-16 14:00")
        meeting_result = self.tools["schedule_manager"]._run("add:团队会议,2024-01-16 14:00")
        print(f"👁️ Observation: {meeting_result}")
        
        # 步骤3：设置提醒
        print("\n💭 Thought: 会议已安排，设置提前提醒")
        print("🎯 Action: notification_system")
        print("📥 Action Input: reminder:团队会议将在明天14:00开始")
        reminder_result = self.tools["notification_system"]._run("reminder:团队会议将在明天14:00开始")
        print(f"👁️ Observation: {reminder_result}")
        
        final_answer = f"✅ 会议和提醒已设置！\n当前时间：{time_result}\n{meeting_result}\n{reminder_result}"
        print(f"\n✅ Final Answer: {final_answer}")
        return final_answer
    
    def _handle_general_request(self, request: str) -> str:
        """处理一般性请求"""
        print("💭 Thought: 这是一个一般性请求，我来分析最适合的处理方式")
        
        if "时间" in request:
            time_result = self.tools["DateTime"].func("datetime")
            return f"当前时间：{time_result}"
        elif "日程" in request:
            schedule_result = self.tools["schedule_manager"]._run("list:")
            return f"日程查询结果：{schedule_result}"
        else:
            return "我理解您的请求，我会继续学习以便更好地为您服务。"

# 创建办公助手
office_assistant = OfficeAssistant(office_tools)

# 测试复杂的办公场景
test_scenarios = [
    "我需要安排一个8小时的工作日程，并计算总共多少分钟",
    "请处理项目文档并通知团队",
    "安排明天的团队会议并设置提醒"
]

print("\n🧪 测试智能办公助手：")
for i, scenario in enumerate(test_scenarios, 1):
    print(f"\n{'='*60}")
    print(f"🧪 场景 {i}:")
    print(f"{'='*60}")
    
    result = office_assistant.handle_complex_request(scenario)
    
    if i < len(test_scenarios):
        print("\n🔄 继续下一个场景...")

print("\n" + "="*60)
print("✅ 练习2完成！你已经学会了如何构建多工具协作的Agent系统。")

# 记录练习完成
tracker.complete_exercise(lesson_id, "exercise_office_assistant")

## 5. 📊 学习总结与进度回顾

In [None]:
# 完成课程并生成学习报告
completion_time = tracker.complete_lesson(lesson_id)

print("🎓 LangChain智能代理系统入门 - 课程完成！")
print("="*60)

print("\n📚 本课程核心知识点：")
key_concepts = [
    "Agent智能代理的基本概念",
    "ReAct推理模式的原理和应用",
    "工具系统的设计和实现",
    "基础工具和自定义工具开发",
    "Agent执行器的配置",
    "多步骤推理和决策过程",
    "工具协作和流程编排",
    "Agent调试和错误处理",
    "数据分析Agent的构建",
    "办公自动化Agent系统"
]

for i, concept in enumerate(key_concepts, 1):
    print(f"   {i:2d}. {concept}")

print("\n🛠️ 实践技能收获：")
skills = [
    "掌握Agent的核心架构和组件",
    "能够创建各种类型的工具",
    "理解ReAct推理循环机制",
    "构建复杂的多工具协作系统",
    "开发专业的AI助手应用"
]

for i, skill in enumerate(skills, 1):
    print(f"   ⚡ {skill}")

print("\n🎯 Agent设计最佳实践：")
best_practices = [
    "🎯 明确定义每个工具的职责和功能",
    "🧠 设计清晰的推理提示词模板",
    "🔄 实现合理的错误处理和重试机制",
    "📊 添加执行日志和调试信息",
    "⚡ 优化工具调用的性能和成本",
    "🛡️ 确保工具执行的安全性",
    "🔍 提供透明的推理过程"
]

for practice in best_practices:
    print(f"   {practice}")

print("\n💡 Agent vs Chain 选择指南：")
selection_guide = [
    "🔗 使用Chain：任务流程固定、步骤可预测",
    "🤖 使用Agent：需要动态决策、工具选择",
    "🎯 混合使用：复杂应用中Chain和Agent协作"
]

for guide in selection_guide:
    print(f"   {guide}")

print("\n🎯 下一步学习建议：")
next_steps = [
    "学习记忆系统和上下文管理（03_memory_systems.ipynb）",
    "探索高级Agent架构模式",
    "深入多Agent协作系统",
    "实践更复杂的业务场景应用"
]

for i, step in enumerate(next_steps, 1):
    print(f"   📈 {step}")

# 显示整体学习进度
print("\n" + "="*60)
tracker.display_progress_summary()

print("\n🎉 恭喜完成LangChain智能代理系统入门课程！")
print("💪 你已经掌握了构建智能Agent的核心技术！")
print("🚀 继续学习Memory系统，让AI应用更加智能！")