# 第 7 章：AI 智能体系统的架构与范式

> 本笔记文件需要与《LangGraph实战》的第 7 章的内容配套使用。

在本章中，我们将深入探索构建复杂、高效且可扩展的 AI 智能体系统的核心要素：架构设计与模式应用。随着我们不断提升 AI 智能体的能力，使其能够处理日益复杂的任务，仅仅依赖单一、线性的智能体模型已显得力不从心。如同城市规划需要蓝图、软件开发需要架构设计，构建强大的 AI 智能体系统同样需要精心设计的架构作为支撑。

我们将从基础但至关重要的智能体工作流模式开始，例如提示链、路由、并行化、协调器-工作者和评估器-优化器，这些模式构成了构建复杂智能体行为的基石。随后，我们将逐步深入多智能体架构的世界，重点介绍主管架构和分层架构，揭示如何通过组织和协调多个专业智能体来提升系统的整体性能和可管理性。最后，我们将前瞻性地审视情境感知智能体架构，这种架构代表了 AI 智能体发展的新方向。

通过本章的学习，您将不仅了解各种智能体系统的架构蓝图，更将掌握在 LangGraph 中实践这些架构模式的关键技术和方法。

### 🚀 环境准备

首先加载必要的环境变量配置：

In [None]:
from dotenv import load_dotenv

load_dotenv()

## 7.1 常见的智能体系统工作流

在人工智能体开发领域，尤其是在 AI 智能体生态系统中，理解不同类型的智能体系统之间的细微差别至关重要。正如 Anthropic 在其对智能体构建模式的富有洞察力的分析中所强调的那样，我们可以更好地驾驭 AI 系统的复杂性。

Anthropic 的研究突出了工作流（Workflow）和智能体（Agent）之间的一个关键区别：

- **工作流**：LLM 和相关工具通过显式预定义的代码路径进行编排的系统
- **智能体**：大语言模型（LLM）动态指导自身流程的系统，实时决策工具的使用以及实现目标所需的步骤

在 LangGraph 的背景下，工作流使用其状态图架构优雅地实现，允许开发人员以可视化和编程方式定义系统中不同组件之间的信息和控制流。

### 7.1.1 工作流的基础构建模块：增强型 LLM

现代 LLM 不仅仅是独立的模型，它们通过一系列功能得到增强，这些功能使它们能够与世界互动并执行超出简单文本生成的复杂任务。核心增强功能通常包括：

- **检索（Retrieval）**：允许 LLM 访问和整合来自外部来源的信息
- **工具（Tools）**：使 LLM 能够与外部系统交互并在现实世界中执行操作
- **记忆（Memory）**：允许 LLM 保留和利用来自过去交互或工作流步骤的信息

这些增强功能与核心 LLM 协同工作，构成了构建复杂工作流和智能体的基础。

### 7.1.2 提示链（Prompt Chaining）

提示链是一种基本的工作流模式，专注于将复杂任务分解为一系列更简单、相互关联的步骤。在这种工作流中，一个 LLM 调用的输出成为后续调用的输入，从而创建一系列处理阶段。

提示链的主要优势在于它能够通过简化每个 LLM 调用来提高准确性。通过将复杂任务分解为更小、更易于管理的子任务，每个 LLM 调用都会获得更集中且更明确的提示。

##### 示例 7-1：基于 Graph API 的提示链工作流实现

首先导入必要的 LangGraph 和 LangChain 组件，并实现一个笑话生成和改进的提示链：

In [6]:
from typing_extensions import TypedDict

from langgraph.graph import StateGraph, START, END
from llm_utils import llm

# 使用 TypedDict 定义图状态，用于类型提示和状态管理
class State(TypedDict):
    topic: str
    joke: str
    improved_joke: str
    final_joke: str

# 图中的节点，每个节点代表提示链中的一个步骤
def generate_joke(state: State):
    """第一个 LLM 调用，根据主题生成初始笑话"""
    msg = llm.invoke(f"写一个关于 {state['topic']} 的简短笑话") # 使用状态中的主题调用 LLM
    return {"joke": msg.content} # 返回生成的笑话，更新状态中的 'joke' 键

def check_punchline(state: State):
    """门控函数，检查笑话是否有妙语"""
    # 简单检查 - 笑话是否包含 "?" 或 "!" 作为妙语存在的代理
    if "?" in state["joke"] or "!" in state["joke"]:
        return "Fail" # 笑话未能通过妙语检查
    return "Pass" # 笑话通过妙语检查

def improve_joke(state: State):
    """第二个 LLM 调用，通过添加文字游戏来改进笑话"""
    msg = llm.invoke(f"通过添加文字游戏使这个笑话更有趣：{state['joke']}") # 调用 LLM 来改进笑话
    return {"improved_joke": msg.content} # 返回改进后的笑话，更新状态中的 'improved_joke'

def polish_joke(state: State):
    """第三个 LLM 调用，用于最终润色，添加令人惊讶的转折"""
    msg = llm.invoke(f"为这个笑话添加一个令人惊讶的转折：{state['improved_joke']}") # 调用 LLM 来润色笑话
    return {"final_joke": msg.content} # 返回润色后的笑话，更新状态中的 'final_joke'

# 使用 StateGraph 构建工作流，使用定义的状态进行初始化
workflow = StateGraph(State)

# 将节点添加到工作流图中，将它们与定义的函数关联起来
workflow.add_node("generate_joke", generate_joke)
workflow.add_node("improve_joke", improve_joke)
workflow.add_node("polish_joke", polish_joke)

# 定义边缘以连接节点并建立工作流序列
workflow.add_edge(START, "generate_joke") # 开始节点连接到 'generate_joke' 节点
workflow.add_conditional_edges(
    "generate_joke", check_punchline, {"Pass": "improve_joke", "Fail": END} # 'generate_joke' 之后的条件边缘，基于 'check_punchline' 输出
)
workflow.add_edge("improve_joke", "polish_joke") # 'improve_joke' 节点连接到 'polish_joke' 节点
workflow.add_edge("polish_joke", END) # 'polish_joke' 节点连接到结束节点

# 将工作流图编译为可执行链
chain = workflow.compile()


# 使用初始状态（主题："cats"）调用编译链
state = chain.invoke({"topic": "cats"})
print("初始笑话：")
print(state["joke"])

if "improved_joke" in state: # 检查 'improved_joke' 是否存在于状态中，指示妙语检查失败
    print("\n改进后的笑话：")
    print(state["improved_joke"])
    print("\n最终笑话：")
    print(state["final_joke"])

# 保存图像
from draw import save_graph_as_png
save_graph_as_png(chain, "./graphs/c7/augumented_chain.png")


初始笑话：
为什么猫咪不喜欢上网？

因为它们怕“鼠”标！

改进后的笑话：
为什么猫咪不喜欢上网？

因为它们怕“鼠”标，但它们更喜欢“点”心，尤其是“喵”点心！

最终笑话：
为什么猫咪不喜欢上网？

因为它们怕“鼠”标，但它们更喜欢“点”心，尤其是“喵”点心！然而，有一天，一只猫咪意外打开了一个关于“养生”的网站，结果它发现只要虚拟点击就能获得无限的“鱼”干和“奶”瓶，顿时就成了网上购物的狂欢者，彻底抛弃了对“鼠”标的恐惧！


**💡 核心概念解析**：

在这段代码中，我们展示了提示链的关键特性：

- **顺序处理**：每个步骤的输出成为下一个步骤的输入
- **门控机制**：`check_punchline` 函数作为质量控制检查点
- **条件分支**：根据门控结果决定是直接结束还是继续改进
- **状态管理**：使用 TypedDict 清晰定义数据流

##### 示例 7-2：基于 Functional API 的提示链工作流实现

接下来让我们看看如何使用 LangGraph 的 Functional API 实现相同的提示链逻辑：

In [None]:
from langchain_openai import ChatOpenAI
from langgraph.func import entrypoint, task

llm = ChatOpenAI(model="Qwen/Qwen3-8B")

# 使用 @task 装饰器定义的任务，代表工作流中的步骤
@task
def generate_joke(topic: str):
    """第一个 LLM 调用，生成初始笑话"""
    msg = llm.invoke(f"写一个关于 {topic} 的简短笑话") # 调用 LLM 以根据主题生成笑话
    return msg.content # 返回生成的笑话

def check_punchline(joke: str):
    """门控函数，检查笑话是否有妙语"""
    # 简单检查 - 笑话是否包含 "?" 或 "!"
    if "?" in joke or "!" in joke:
        return "Fail" # 笑话未能通过妙语检查
    return "Pass" # 笑话通过妙语检查

@task
def improve_joke(joke: str):
    """第二个 LLM 调用，改进笑话"""
    msg = llm.invoke(f"通过添加文字游戏使这个笑话更有趣：{joke}") # 调用 LLM 以改进笑话
    return msg.content # 返回改进后的笑话

@task
def polish_joke(joke: str):
    """第三个 LLM 调用，用于最终润色"""
    msg = llm.invoke(f"为这个笑话添加一个令人惊讶的转折：{joke}") # 调用 LLM 以润色笑话
    return msg.content # 返回润色后的笑话

# 入口点装饰函数使用 Functional API 定义工作流
@entrypoint()
def workflow(topic: str):
    original_joke = generate_joke(topic).result() # 执行 'generate_joke' 任务
    if check_punchline(original_joke) == "Pass": # 基于 'check_punchline' 输出的条件检查
        return original_joke # 如果妙语检查通过，则返回原始笑话

    improved_joke = improve_joke(original_joke).result() # 如果妙语检查失败，则执行 'improve_joke' 任务
    return polish_joke(improved_joke).result() # 执行 'polish_joke' 任务并返回最终结果

# 调用工作流
state = workflow.invoke("cats")
print(state)

**💡 Functional API 的优势**：

- **更简洁的代码**：使用 `@task` 装饰器和函数式编程风格
- **类型安全**：函数参数提供清晰的类型提示
- **易于测试**：每个任务都是独立的函数，便于单元测试
- **灵活的控制流**：使用 Python 的原生控制结构

### 7.1.3 路由 (Routing)

路由工作流旨在通过对输入进行分类并将其定向到专门的下游任务来处理各种输入。当处理需要处理各种输入类型的复杂应用程序时，此模式尤其有价值，每种输入类型都需要不同的处理方法。

路由背后的核心思想是实施决策步骤，该步骤分析输入并确定最合适的后续处理路径。这允许关注点分离，从而可以为每个输入类别开发更集中和优化的提示和流程。

##### 示例 7-3：基于 Graph API 的路由工作流实现

让我们实现一个智能内容路由器，能够根据用户输入生成故事、笑话或诗歌：

In [7]:
import json

from langchain_core.messages import HumanMessage, SystemMessage
from langchain_openai import ChatOpenAI
from langgraph.graph import StateGraph, START, END
from langchain_core.output_parsers import StrOutputParser
from llm_utils import llm

# 路由工作流的状态定义
class State(TypedDict):
    input: str
    decision: str
    output: str

# 图中的节点，每个节点处理特定的路由 (story, joke, poem)
def llm_call_1(state: State):
    """写一个故事"""
    result = llm.invoke(state["input"]) # 调用 LLM 根据输入写一个故事
    return {"output": result.content} # 返回故事，更新状态中的 'output'

def llm_call_2(state: State):
    """写一个笑话"""
    result = llm.invoke(state["input"]) # 调用 LLM 根据输入写一个笑话
    return {"output": result.content} # 返回笑话，更新状态中的 'output'

def llm_call_3(state: State):
    """写一首诗"""
    result = llm.invoke(state["input"]) # 调用 LLM 根据输入写一首诗
    return {"output": result.content} # 返回诗歌，更新状态中的 'output'

def llm_call_router(state: State):
    """使用结构化输出将输入路由到适当的节点"""
    # 使用结构化输出调用增强型 LLM，以充当路由逻辑
    
    # 创建链式调用
    chain = llm | StrOutputParser()
    
    response = chain.invoke([
        SystemMessage(
            content="You are a router that directs user input to the appropriate handler. Return a JSON object with a 'step' key and one of these values: 'story', 'joke', or 'poem'. For example: {'step': 'joke'}"
        ),
        HumanMessage(content=state["input"]),
    ])
    
    # 解析 JSON 字符串
    try:
        decision = json.loads(response) # 这行代码的作用是将 JSON 格式的字符串解析为 Python 对象（通常是字典或列表）
        return {"decision": decision["step"]}
    except (json.JSONDecodeError, KeyError):
        # 如果解析失败，默认返回 joke 路由
        return {"decision": "joke"}

# 条件边缘函数，根据决策路由到适当的节点
def route_decision(state: State):
    # 根据状态中的 'decision' 返回您想要访问的下一个节点名称
    if state["decision"] == "story":
        return "llm_call_1"
    elif state["decision"] == "joke":
        return "llm_call_2"
    elif state["decision"] == "poem":
        return "llm_call_3"

# 使用 StateGraph 构建路由工作流
router_builder = StateGraph(State)

# 将节点添加到图中
router_builder.add_node("llm_call_1", llm_call_1)
router_builder.add_node("llm_call_2", llm_call_2)
router_builder.add_node("llm_call_3", llm_call_3)
router_builder.add_node("llm_call_router", llm_call_router)

# 定义边缘以连接节点并建立路由逻辑
router_builder.add_edge(START, "llm_call_router") # 开始节点连接到路由器节点
router_builder.add_conditional_edges(
    "llm_call_router",
    route_decision,
    {  # 由 route_decision 返回的名称：要访问的下一个节点的名称
        "llm_call_1": "llm_call_1",
        "llm_call_2": "llm_call_2",
        "llm_call_3": "llm_call_3",
    },
) # 从路由器到专用节点的条件边缘，基于路由决策
router_builder.add_edge("llm_call_1", END) # 专用节点连接到结束节点
router_builder.add_edge("llm_call_2", END)
router_builder.add_edge("llm_call_3", END)

# 编译路由工作流图
router_workflow = router_builder.compile()

# 使用示例输入调用路由工作流
state = router_workflow.invoke({"input": "给我写一个关于猫的笑话"})
print(state["output"]) 
# 保存图像
# 保存 Mermaid 生成的 PNG 图像到文件
from draw import save_graph_as_png
save_graph_as_png(router_workflow, "./graphs/c7/7-3router_workflow.png")

为什么猫总是喜欢坐在电脑键盘上？

因为它们想要掌控“鼠标”!


**💡 路由机制关键特性**：

- **智能分类**：路由器节点使用 LLM 的理解能力来分析输入意图
- **结构化输出**：使用 JSON 格式确保路由决策的可靠性
- **专门处理**：每个路由目标都有专门的处理逻辑
- **可扩展性**：易于添加新的路由目标和处理器

##### 示例 7-4：基于 Functional API 的路由工作流实现

让我们用 Functional API 实现相同的路由逻辑：

In [None]:
import json

from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, SystemMessage
from langgraph.func import entrypoint, task

llm = ChatOpenAI(model="Qwen/Qwen2.5-7B-Instruct")

@task
def llm_call_1(input: str):
    """写一个故事"""
    result = llm.invoke(input) # 调用 LLM 根据输入写一个故事
    return result.content # 返回故事

@task
def llm_call_2(input: str):
    """写一个笑话"""
    result = llm.invoke(input) # 调用 LLM 根据输入写一个笑话
    return result.content # 返回笑话

@task
def llm_call_3(input: str):
    """写一首诗"""
    result = llm.invoke(input) # 调用 LLM 根据输入写一首诗
    return result.content # 返回诗歌

def llm_call_router(input: str):
    """使用结构化输出将输入路由到适当的节点"""
    # 使用结构化输出调用增强型 LLM，以充当路由逻辑
    model = ChatOpenAI(model="Qwen/Qwen2.5-7B-Instruct", model_kwargs={ "response_format": { "type": "json_object" } })
    ai_msg = model.invoke(
        [
            SystemMessage(
                content="You are a router that directs user input to the appropriate handler. Return a JSON object with a 'step' key and one of these values: 'story', 'joke', or 'poem'. For example: {'step': 'joke'}" # 路由 LLM 的系统消息
            ),
            HumanMessage(content=input), # 用户输入消息
        ]
    )
    decision = json.loads(ai_msg.content)
    return {"decision": decision["step"]} # 返回路由决策

# 入口点装饰函数定义路由工作流
@entrypoint()
def router_workflow(input: str):
    next_step = llm_call_router(input)["decision"] # 获取路由决策的 'decision' 值
    llm_call = None # 初始化 llm_call 变量
    
    if next_step == "story": # 基于路由器决策的条件路由
        llm_call = llm_call_1 # 如果路由是 'story'，则分配 'llm_call_1' 任务
    elif next_step == "joke":
        llm_call = llm_call_2 # 如果路由是 'joke'，则分配 'llm_call_2' 任务
    elif next_step == "poem":
        llm_call = llm_call_3 # 如果路由是 'poem'，则分配 'llm_call_3' 任务
        
    if llm_call is None:
        raise ValueError(f"Invalid routing decision: {next_step}")
        
    return llm_call(input) # 执行选定的 LLM 调用任务并返回结果

# 调用路由工作流
for step in router_workflow.stream("给我写一个关于猫的笑话", stream_mode="updates"):
    print(step)
    print("\n")

### 7.1.4 并行化 (Parallelization)

并行化是另一种工作流模式，它利用增强型 LLM 同时处理任务不同方面的能力。并行化不是按顺序处理任务，而是允许同时进行 LLM 调用，其输出稍后以编程方式聚合。

这种方法可以体现在两个主要变体中：
- **分段 (Sectioning)**：将任务分解为可以并行执行的独立子任务
- **投票 (Voting)**：多次运行相同的任务以获得更稳健和可靠的结果

并行化的主要好处是提高了效率，尤其是在子任务真正独立并且可以并发处理的情况下。

##### 示例 7-5：基于 Graph API 的并行化工作流实现

让我们实现一个并行内容生成工作流，同时生成故事、笑话和诗歌：

In [None]:
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END

from llm_utils import llm
# 并行工作流的图状态定义
class State(TypedDict):
    topic: str
    joke: str
    story: str
    poem: str
    combined_output: str



# 图中的节点，每个节点并行生成不同类型的内容
def call_llm_1(state: State):
    """第一个 LLM 调用，生成初始笑话"""
    msg = llm.invoke(f"写一个关于 {state['topic']} 的笑话") # 调用 LLM 以根据主题写一个笑话
    return {"joke": msg.content} # 返回笑话，更新状态中的 'joke'

def call_llm_2(state: State):
    """第二个 LLM 调用，生成故事"""
    msg = llm.invoke(f"写一个关于 {state['topic']} 的故事") # 调用 LLM 以根据主题写一个故事
    return {"story": msg.content} # 返回故事，更新状态中的 'story'

def call_llm_3(state: State):
    """第三个 LLM 调用，生成诗歌"""
    msg = llm.invoke(f"写一首关于 {state['topic']} 的诗歌") # 调用 LLM 以根据主题写一首诗
    return {"poem": msg.content} # 返回诗歌，更新状态中的 'poem'

def aggregator(state: State):
    """将笑话、故事和诗歌组合成单个输出"""
    combined = f"这是一个关于 {state['topic']} 的故事、笑话和诗歌！\n\n" # 开始组合输出
    combined += f"故事：\n{state['story']}\n\n" # 将故事添加到组合输出
    combined += f"笑话：\n{state['joke']}\n\n" # 将笑话添加到组合输出
    combined += f"诗歌：\n{state['poem']}" # 将诗歌添加到组合输出
    return {"combined_output": combined} # 返回组合输出，更新状态中的 'combined_output'

# 使用 StateGraph 构建并行工作流
parallel_builder = StateGraph(State)

# 将节点添加到图中
parallel_builder.add_node("call_llm_1", call_llm_1)
parallel_builder.add_node("call_llm_2", call_llm_2)
parallel_builder.add_node("call_llm_3", call_llm_3)
parallel_builder.add_node("aggregator", aggregator)

# 定义边缘以连接节点并建立并行执行
parallel_builder.add_edge(START, "call_llm_1") # 开始节点连接到 'call_llm_1' 以进行并行执行
parallel_builder.add_edge(START, "call_llm_2") # 开始节点连接到 'call_llm_2' 以进行并行执行
parallel_builder.add_edge(START, "call_llm_3") # 开始节点连接到 'call_llm_3' 以进行并行执行
parallel_builder.add_edge("call_llm_1", "aggregator") # 'call_llm_1' 节点在完成后连接到聚合器
parallel_builder.add_edge("call_llm_2", "aggregator") # 'call_llm_2' 节点在完成后连接到聚合器
parallel_builder.add_edge("call_llm_3", "aggregator") # 'call_llm_3' 节点在完成后连接到聚合器
parallel_builder.add_edge("aggregator", END) # 聚合器节点连接到结束节点

# 编译并行工作流图
parallel_workflow = parallel_builder.compile()

# 使用示例输入调用并行工作流
state = parallel_workflow.invoke({"topic": "cats"})
print(state["combined_output"]) 

# 保存图像
from draw import save_graph_as_png
save_graph_as_png(parallel_workflow, "./graphs/c7/Parallelization_workflow.png")

这是一个关于 cats 的故事、笑话和诗歌！

故事：
在一个宁静的小镇上，住着一只名叫小白的猫。小白是一只可爱的白色长毛猫，眼睛像两颗绿宝石，她的毛发在阳光下闪闪发光。小白每天都在小镇的街道上漫步，探索每一个角落，跟镇上的人们打招呼。

小白最喜欢的地方是镇中心的一家小咖啡馆，咖啡馆的老板是一位和蔼的老奶奶，她每次看到小白都会给她一小碗牛奶。小白不仅喜欢牛奶，更喜欢坐在咖啡馆的窗边，观察街上来来往往的人和车，听人们的欢声笑语。

有一天，小白在咖啡馆的窗边，突然注意到一个小女孩坐在街角，神情沮丧。小白心里一动，决定去看看发生了什么事。她轻巧地跳下窗台，走到小女孩的身边。小女孩抬起头，看到小白，脸上露出了微微的笑容。

“你好，小猫咪。”小女孩轻声说道，“你知道吗？我丢了我的风筝。”她指着天空，几朵白云正悠悠荡荡，风筝早已飞得不见踪影。

小白听了，心里想：“要是我能帮助她找到风筝该多好！”于是，她开始在周围四处探索，试图找到风筝的踪迹。小白穿过花坛、爬上了矮墙，把目光投向更高的地方，希望能找到那只风筝。

就在这时，小白在一棵大树的树枝上发现了那个五彩斑斓的风筝！她兴奋地“喵喵”叫了起来，吸引了路过的行人。人们看到小白在树下仰望，纷纷停下脚步，跟随她的目光。

小女孩听到小白的叫声，抬头一看，立刻惊喜地跳了起来：“我的风筝！谢谢你，小猫咪！”她连忙跑到树下，和周围的人一起想办法把风筝拿下来。最后，几位好心的邻居用长杆将风筝轻松地取了下来。

小女孩高兴地抱住了风筝，脸上洋溢着灿烂的笑容。她蹲下身，轻轻抚摸着小白的头：“谢谢你，小白，你真是一只了不起的猫咪！”小白也开心地用头蹭了蹭小女孩的手，心里暖暖的。

从那天起，小白和小女孩成了好朋友。每天，小女孩都会带着小白一起到咖啡馆，和她分享美味的点心和牛奶。小白也变得更加开朗，陪伴着小女孩度过了许多个快乐的日子。

小镇上的人们都知道了小白和小女孩的故事，他们都说：“这只猫咪不仅是我们的朋友，还是小女孩的幸运星！”小白在小镇上留下了一段美好的回忆，也明白了友谊的珍贵。

笑话：
为什么猫咪总是坐在电脑键盘上？

因为它们喜欢“按猫”键！

诗歌：
在静谧的夜晚，月光柔和，  
小猫悄然漫步，似乎在歌唱。  
它们的眼睛明亮，璀璨如星，  
在黑幕中闪烁，若隐若现的灵。

毛发如丝绸般，温暖又轻柔，  
每个轻巧的步伐，

**💡 并行化的关键优势**：

- **效率提升**：多个 LLM 调用同时执行，减少总体等待时间
- **独立处理**：每个任务专注于特定方面，避免上下文混乱
- **结果聚合**：通过聚合器节点组合所有结果
- **可扩展性**：易于添加更多并行处理分支

##### 示例 7-6：基于 Functional API 的并行化工作流实现

让我们用 Functional API 实现相同的并行处理逻辑：

In [None]:
from langchain_openai import ChatOpenAI
from langgraph.func import entrypoint, task

llm = ChatOpenAI(model="Qwen/Qwen3-8B")

@task
def call_llm_1(topic: str):
    """第一个 LLM 调用，生成初始笑话"""
    msg = llm.invoke(f"写一个关于 {topic} 的笑话") # 调用 LLM 以根据主题写一个笑话
    return msg.content # 返回笑话

@task
def call_llm_2(topic: str):
    """第二个 LLM 调用，生成故事"""
    msg = llm.invoke(f"写一个关于 {topic} 的故事") # 调用 LLM 以根据主题写一个故事
    return msg.content # 返回故事

@task
def call_llm_3(topic):
    """第三个 LLM 调用，生成诗歌"""
    msg = llm.invoke(f"写一首关于 {topic} 的诗歌") # 调用 LLM 以根据主题写一首诗
    return msg.content # 返回诗歌

@task
def aggregator(topic, joke, story, poem):
    """将笑话和故事组合成单个输出"""
    combined = f"这是一个关于 {topic} 的故事、笑话和诗歌！\n\n" # 开始组合输出
    combined += f"故事：\n{story}\n\n" # 将故事添加到组合输出
    combined += f"笑话：\n{joke}\n\n" # 将笑话添加到组合输出
    combined += f"诗歌：\n{poem}" # 将诗歌添加到组合输出
    return combined # 返回组合输出

# 入口点装饰函数定义并行工作流
@entrypoint()
def parallel_workflow(topic: str):
    joke_fut = call_llm_1(topic) # 执行 'call_llm_1' 任务并获取期货以进行并行执行
    story_fut = call_llm_2(topic) # 执行 'call_llm_2' 任务并获取期货以进行并行执行
    poem_fut = call_llm_3(topic) # 执行 'call_llm_3' 任务并获取期货以进行并行执行
    return aggregator(
        topic, joke_fut.result(), story_fut.result(), poem_fut.result() # 在所有并行任务完成后执行 'aggregator' 任务
    ).result() # 从聚合器获取最终结果

# 调用并行工作流
for step in parallel_workflow.stream("cats", stream_mode="updates"):
    print(step)
    print("\n")

**💡 Functional API 中的并行处理**：

- **Future 对象**：通过 `.result()` 方法等待异步任务完成
- **简洁语法**：并行执行通过同时启动多个任务实现
- **自动同步**：聚合器会等待所有前置任务完成

### 7.1.5 协调器-工作者 (Orchestrator-Worker)

协调器-工作者工作流模式专为子任务需求事先未知且需要在执行期间动态确定的复杂任务而设计。在此模式中，中央增强型 LLM 充当"协调器 (Orchestrator)"，负责将初始任务分解为更小、更易于管理的子任务，并将这些子任务委派给"工作者 (Worker)"增强型 LLM。

此工作流特别适用于难以或不可能预先预测必要子任务的复杂场景，例如编码任务、复杂搜索任务等。

##### 示例 7-7：基于 Graph API 的"协调器-工作者"工作流实现

让我们实现一个报告生成的协调器-工作者系统：

In [None]:
import operator
from typing import Annotated, List, TypedDict
from pydantic import BaseModel, Field

from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, SystemMessage
from langgraph.graph import StateGraph, START, END
from langgraph.types import Send


# 用于结构化输出的模式，用于规划报告章节
class Section(BaseModel):
    name: str = Field(
        description="报告章节的名称", # 报告章节的名称
    )
    description: str = Field(
        description="本章节中要涵盖的主要主题和概念的简要概述", # 章节内容的描述
    )

class Sections(BaseModel):
    sections: List[Section] = Field(
        description="报告的章节", # 报告章节列表
    )

# 用于规划报告章节的增强型 LLM，使用结构化输出
llm = ChatOpenAI(model="Qwen/Qwen2.5-7B-Instruct")
planner = llm.with_structured_output(Sections, method="function_calling")

# 协调器-工作者工作流的图状态定义
class State(TypedDict):
    topic: str  # 报告主题
    sections: list[Section]  # 由协调器规划的报告章节列表
    completed_sections: Annotated[
        list, operator.add
    ]  # 所有工作者并行写入此键，使用 operator.add 进行列表连接
    final_report: str  # 最终合成报告


# 工作者状态定义，特定于工作者节点
class WorkerState(TypedDict):
    section: Section
    completed_sections: Annotated[list, operator.add] # 工作者也写入共享的 'completed_sections' 键

# 图中的节点
def orchestrator(state: State):
    """协调器，使用结构化输出生成报告计划"""
    # 使用 planner LLM 和结构化输出生成报告章节计划
    report_sections = planner.invoke(
        [
            SystemMessage(content="生成报告计划。"), # planner LLM 的系统消息
            HumanMessage(content=f"这是报告主题：{state['topic']}"), # 包含报告主题的用户输入消息
        ]
    )
    return {"sections": report_sections.sections} # 返回计划的章节，更新状态中的 'sections'

def llm_call(state: WorkerState):
    """工作者根据分配的章节详细信息编写报告章节"""
    # 使用 LLM 根据章节名称和描述生成报告章节内容
    section = llm.invoke(
        [
            SystemMessage(
                content="按照提供的名称和描述编写报告章节。每节不包含序言。使用 markdown 格式。" # 工作者 LLM 的系统消息
            ),
            HumanMessage(
                content=f"这是章节名称：{state['section'].name} 和描述：{state['section'].description}" # 包含章节详细信息的用户消息
            ),
        ]
    )
    # 将生成的章节内容写入共享的 'completed_sections' 键
    return {"completed_sections": [section.content]}

def synthesizer(state: State):
    """从各个章节输出合成完整报告"""
    # 从共享状态检索已完成章节的列表
    completed_sections = state["completed_sections"]

    # 将已完成章节格式化为单个字符串以用于最终报告
    completed_report_sections = "\n\n---\n\n".join(completed_sections)

    return {"final_report": completed_report_sections} # 返回最终报告，更新状态中的 'final_report'

# 条件边缘函数，用于将工作者动态分配给计划中的每个章节
def assign_workers(state: State):
    """使用 Send API 将工作者分配给计划中的每个章节，以实现动态工作者创建"""
    # 使用 Send API 为每个章节动态创建和发送 'llm_call' 工作者节点
    return [Send("llm_call", {"section": s}) for s in state["sections"]]

# 使用 StateGraph 构建协调器-工作者工作流
orchestrator_worker_builder = StateGraph(State)

# 将节点添加到图中
orchestrator_worker_builder.add_node("orchestrator", orchestrator)
orchestrator_worker_builder.add_node("llm_call", llm_call) # 工作者节点
orchestrator_worker_builder.add_node("synthesizer", synthesizer)

# 定义边缘以连接节点并建立协调器-工作者流程
orchestrator_worker_builder.add_edge(START, "orchestrator") # 开始节点连接到协调器
orchestrator_worker_builder.add_conditional_edges(
    "orchestrator", assign_workers, ["llm_call"] # 从协调器到使用 Send API 动态创建的工作者节点的条件边缘
)
orchestrator_worker_builder.add_edge("llm_call", "synthesizer") # 工作者节点在完成后连接到合成器
orchestrator_worker_builder.add_edge("synthesizer", END) # 合成器节点连接到结束节点

# 编译协调器-工作者工作流图
orchestrator_worker = orchestrator_worker_builder.compile()

# 使用示例报告主题调用协调器-工作者工作流
state = orchestrator_worker.invoke({"topic": "创建关于 LLM 缩放定律的报告"})

from IPython.display import Markdown
Markdown(state["final_report"]) # 以 Markdown 格式显示最终报告

**💡 协调器-工作者的关键特性**：

- **动态任务分解**：协调器根据输入动态生成子任务
- **Send API**：允许运行时动态创建工作者节点
- **共享状态**：使用 `operator.add` 聚合多个工作者的输出
- **结构化输出**：使用 Pydantic 模型确保任务规划的一致性
- **灵活扩展**：可以根据任务复杂性创建任意数量的工作者

##### 示例 7-8：基于 Functional API 的"协调器-工作者"工作流实现

让我们用 Functional API 实现相同的协调器-工作者逻辑：

In [None]:
from typing import List
from pydantic import BaseModel, Field
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, SystemMessage
from langgraph.func import entrypoint, task


# 用于结构化输出的模式，用于规划报告章节
class Section(BaseModel):
    name: str = Field(
        description="报告章节的名称", # 报告章节的名称
    )
    description: str = Field(
        description="本章节中要涵盖的主要主题和概念的简要概述", # 章节内容的描述
    )

class Sections(BaseModel):
    sections: List[Section] = Field(
        description="报告的章节", # 报告章节列表
    )

# 用于规划报告章节的增强型 LLM，使用结构化输出
llm = ChatOpenAI(model="Qwen/Qwen2.5-7B-Instruct")
planner = llm.with_structured_output(Sections, method="function_calling")

@task
def orchestrator(topic: str):
    """协调器，使用结构化输出生成报告计划"""
    # 使用 planner LLM 和结构化输出生成报告章节计划
    report_sections = planner.invoke(
        [
            SystemMessage(content="生成报告计划。"), # planner LLM 的系统消息
            HumanMessage(content=f"这是报告主题：{topic}"), # 包含报告主题的用户消息
        ]
    )
    return report_sections.sections # 返回计划的章节

@task
def llm_call(section: Section):
    """工作者根据分配的章节详细信息编写报告章节"""
    # 使用 LLM 根据章节名称和描述生成报告章节内容
    result = llm.invoke(
        [
            SystemMessage(content="编写报告章节。"), # 工作者 LLM 的系统消息
            HumanMessage(
                content=f"这是章节名称：{section.name} 和描述：{section.description}" # 包含章节详细信息的用户消息
            ),
        ]
    )
    return result.content # 返回生成的章节内容

@task
def synthesizer(completed_sections: list[str]):
    """从各个章节输出合成完整报告"""
    # 将已完成章节格式化为单个字符串以用于最终报告
    final_report = "\n\n---\n\n".join(completed_sections)
    return final_report # 返回最终报告

# 入口点装饰函数定义协调器-工作者工作流
@entrypoint()
def orchestrator_worker(topic: str):
    sections = orchestrator(topic).result() # 执行协调器任务以获取报告章节计划
    section_futures = [llm_call(section) for section in sections] # 并行动态创建和执行每个章节的工作者任务
    final_report = synthesizer(
        [section_fut.result() for section_fut in section_futures] # 在所有工作者任务完成后执行合成器任务
    ).result() # 获取最终合成报告

    return final_report # 返回最终报告

# 使用示例报告主题调用协调器-工作者工作流
report = orchestrator_worker.invoke("创建关于 LLM 缩放定律的报告")

from IPython.display import Markdown
Markdown(report) # 以 Markdown 格式显示最终报告

### 7.1.6 评估器-优化器 (Evaluator-Optimizer)

"评估器-优化器"工作流体现了一种迭代改进过程，模仿了人类通常通过反馈和修订来改进其工作的方式。在此模式中，一个增强型 LLM 调用负责生成初始响应，而另一个增强型 LLM 调用（"评估器 (Evaluator)"）的任务是提供对此响应的反馈。

可以重复进行生成、评估和反馈的循环多次，直到获得令人满意的结果或达到预定义的迭代次数。当存在可以明确表达和评估的明确评估标准，并且迭代改进能够显著增加输出价值时，评估器-优化器工作流尤其有效。

##### 示例 7-9：基于 Graph API 的"评估器-优化器"工作流实现

让我们实现一个笑话生成和改进的评估器-优化器系统：

In [9]:
from typing_extensions import TypedDict, Literal
from pydantic import BaseModel, Field
from llm_utils import llm

from langgraph.graph import StateGraph, START, END


# 用于结构化输出的模式，用于评估，定义反馈结构
class Feedback(BaseModel):
    grade: Literal["funny", "not funny"] = Field(
        description="判断笑话是否有趣。", # 评估等级：有趣或不好笑
    )
    feedback: str = Field(
        description="如果笑话不好笑，请提供有关如何改进它的反馈。", # 如果笑话不好笑，则提供关于如何改进它的反馈
    )

# 用于评估的增强型 LLM，配置为输出 Feedback 模式
evaluator = llm.with_structured_output(Feedback)

# 评估器-优化器工作流的图状态定义
class State(TypedDict):
    joke: str
    topic: str
    feedback: str
    funny_or_not: str

# 图中的节点
def llm_call_generator(state: State):
    """LLM 生成笑话，可能会结合之前评估的反馈"""
    if state.get("feedback"): # 检查状态中是否存在反馈，指示之前的评估
        msg = llm.invoke(
            f"写一个关于 {state['topic']} 的笑话，但要考虑反馈：{state['feedback']}" # 调用 LLM 生成笑话，结合反馈
        )
    else:
        msg = llm.invoke(f"写一个关于 {state['topic']} 的笑话") # 调用 LLM 生成初始笑话，不带反馈
    return {"joke": msg.content} # 返回生成的笑话，更新状态中的 'joke'

def llm_call_evaluator(state: State):
    """LLM 使用结构化输出评估生成的笑话"""
    grade = evaluator.invoke(f"评价笑话 {state['joke']}") # 调用评估器 LLM 来评价笑话
    return {"funny_or_not": grade.grade, "feedback": grade.feedback} # 返回评估等级和反馈，更新状态中的 'funny_or_not' 和 'feedback'

# 条件边缘函数，用于根据评估结果进行路由，创建反馈循环
def route_joke(state: State):
    """根据评估器的反馈，路由回笑话生成器或结束"""
    if state["funny_or_not"] == "funny": # 检查笑话是否被评估为有趣
        return "Accepted" # 如果笑话被接受，则路由到结束
    elif state["funny_or_not"] == "not funny": # 检查笑话是否被评估为不好笑
        return "Rejected + Feedback" # 如果笑话被拒绝，则路由回生成器以结合反馈

# 构建评估器-优化器工作流，使用 StateGraph
optimizer_builder = StateGraph(State)

# 将节点添加到图中
optimizer_builder.add_node("llm_call_generator", llm_call_generator)
optimizer_builder.add_node("llm_call_evaluator", llm_call_evaluator)

# 定义边缘以连接节点并建立反馈循环
optimizer_builder.add_edge(START, "llm_call_generator") # 开始节点连接到笑话生成器
optimizer_builder.add_edge("llm_call_generator", "llm_call_evaluator") # 生成器节点连接到评估器节点
optimizer_builder.add_conditional_edges(
    "llm_call_evaluator",
    route_joke,
    {  # 由 route_joke 返回的名称：要访问的下一个节点的名称
        "Accepted": END, # 如果笑话被接受，则路由到结束
        "Rejected + Feedback": "llm_call_generator", # 如果笑话被拒绝，则路由回生成器，创建反馈循环
    },
)

# 编译评估器-优化器工作流图
optimizer_workflow = optimizer_builder.compile()

# 使用示例主题调用评估器-优化器工作流
state = optimizer_workflow.invoke({"topic": "sex"})
print(state["joke"]) 

# 保存 Mermaid 生成的 PNG 图像到文件
# from draw import save_graph_as_png
# save_graph_as_png(optimizer_workflow, "./graphs/c7/evaluator_optimizer_workflow.png")

好的，以下是一个关于性的话题笑话，结合了比喻和双关语：

为什么健身房的情侣总是很少？

因为他们一直在“锻炼感情”，但每次都把“重心”放在了举重上，没时间去“抬头”看看彼此的心意！


**💡 评估器-优化器的关键机制**：

- **迭代改进**：通过反馈循环不断优化输出质量
- **结构化评估**：使用 Pydantic 模型确保评估的一致性
- **条件路由**：根据评估结果决定是否继续优化
- **状态记忆**：保持反馈信息以指导后续改进
- **质量门控**：只有达到标准的输出才会被接受

##### 示例 7-10：基于 Functional API 的"评估器-优化器"工作流实现

让我们用 Functional API 实现相同的评估器-优化器逻辑：

In [None]:
from typing_extensions import Literal
from pydantic import BaseModel, Field
from langgraph.func import entrypoint, task


# 用于结构化输出的模式，用于评估，定义反馈结构
class Feedback(BaseModel):
    grade: Literal["funny", "not funny"] = Field(
        description="判断笑话是否有趣。", # 评估等级：有趣或不好笑
    )
    feedback: str = Field(
        description="如果笑话不好笑，请提供有关如何改进它的反馈。", # 如果笑话不好笑，则提供关于如何改进它的反馈
    )

# 用于评估的增强型 LLM，使用结构化输出
evaluator = llm.with_structured_output(Feedback, method="function_calling")

# 工作流中的节点，定义为任务
@task
def llm_call_generator(topic: str, feedback: str = None):
    """LLM 生成笑话，可能会结合反馈"""
    if feedback: # 检查是否提供了反馈
        msg = llm.invoke(
            f"写一个关于 {topic} 的笑话，但要考虑反馈：{feedback}" # 调用 LLM 生成笑话，结合反馈
        )
    else:
        msg = llm.invoke(f"写一个关于 {topic} 的笑话") # 调用 LLM 生成初始笑话，不带反馈
    return msg.content # 返回生成的笑话

@task
def llm_call_evaluator(joke: str):
    """LLM 使用结构化输出评估生成的笑话"""
    feedback = evaluator.invoke(f"评价笑话 {joke}") # 调用评估器 LLM 来评价笑话
    return feedback # 返回评估反馈

# 入口点装饰函数定义评估器-优化器工作流
@entrypoint()
def optimizer_workflow(topic: str):
    feedback = None # 将反馈初始化为 None，用于第一次迭代
    max_iterations = 3 # 设置最大迭代次数以避免无限循环
    iteration = 0
    
    while iteration < max_iterations: # 迭代反馈循环的开始
        joke = llm_call_generator(topic, feedback).result() # 执行生成器任务以创建笑话
        evaluation = llm_call_evaluator(joke).result() # 执行评估器任务以评价笑话并获取反馈
        
        if evaluation.grade == "funny": # 检查笑话是否被评估为有趣
            return joke # 如果笑话有趣，则返回笑话
        
        feedback = evaluation.feedback # 更新反馈用于下一次迭代
        iteration += 1
        
    return joke # 如果达到最大迭代次数，返回最后的笑话

# 调用评估器-优化器工作流
result = optimizer_workflow.invoke("Cats")
print(result)

## 7.2 LangGraph 中的多智能体架构

随着我们在构建 AI 智能体方面不断进步，我们旨在解决的任务的复杂性通常需要超越单一的、单片式的智能体设计。多智能体系统的概念因此变得非常宝贵。我们不再依赖单个智能体来处理所有事情，而是将我们的 AI 应用程序分解为一组更小、更专注的智能体，每个智能体都具有特定的职责和专业知识。

多智能体系统的核心优势在于：

- **模块化**：将复杂的智能体分解为更小的、独立的智能体简化了开发、测试和维护
- **专业化**：使我们能够创建针对特定领域或任务量身定制的专家智能体
- **受控通信**：允许开发人员显式定义和管理智能体如何通信、交换信息以及协调其行动

LangGraph 为构建和编排多智能体系统提供了强大的框架。在本节中，我们将探索几种关键多智能体架构。

### 7.2.1 主管架构

主管（Supervisor）架构是多智能体系统中的核心模式，当需要明确的编排点来管理和指导任务流时，它尤其有效。在这种架构中，指定的主管智能体充当中央协调员的角色，监督和指导多个专业智能体的活动。

LangChain 团队新推出的 LangGraph Supervisor 类库提供了一种简化的方式来创建基于主管的多智能体系统。

##### 示例 7-11：使用 LangGraph Supervisor 类库搭建具有数学和研究智能体的主管架构

首先安装必要的库，然后构建一个包含数学专家和研究专家的主管系统：

In [None]:
# 首先安装 langgraph-supervisor 库（如果尚未安装）
# !pip install langgraph-supervisor

from langchain_openai import ChatOpenAI
from langgraph_supervisor import create_supervisor
from langgraph.prebuilt import create_react_agent
from llm_utils import llm

# 1. 为专业智能体定义工具：
# 定义表示每个专业智能体工具的函数。
# 'add' 和 'multiply' 用于数学智能体，'web_search' 用于研究智能体。
def add(a: float, b: float) -> float:
    """添加两个数字。"""
    return a + b

def multiply(a: float, b: float) -> float:
    """乘两个数字。"""
    return a * b

def web_search(query: str) -> str:
    """在网络上搜索信息。"""
    return (
        "以下是 FAANG 公司 2024 年的员工人数：\n"
        "1. **Facebook (Meta)**: 67,317 名员工。\n"
        "2. **Apple**: 164,000 名员工。\n"
        "3. **Amazon**: 1,551,000 名员工。\n"
        "4. **Netflix**: 14,000 名员工。\n"
        "5. **Google (Alphabet)**: 181,269 名员工。"
    )

# 2. 使用 create_react_agent 创建专业智能体：
# 使用 LangGraph 的预构建 create_react_agent 创建每个专业智能体。
# 每个智能体都配置了特定的模型、工具、名称和提示。
math_agent = create_react_agent(
    model=llm,
    tools=[add, multiply], # 数学智能体的工具是 'add' 和 'multiply'
    name="math_expert", # 标识数学智能体的名称
    prompt="你是一名数学专家。始终一次使用一个工具。" # 指导数学智能体行为的提示
)

research_agent = create_react_agent(
    model=llm,
    tools=[web_search], # 研究智能体的工具是 'web_search'
    name="research_expert", # 标识研究智能体的名称
    prompt="你是一名世界一流的研究员，可以访问网络搜索。不要做任何数学运算。" # 指导研究智能体行为的提示
)

# 3. 使用 create_supervisor 创建主管工作流：
# 使用 LangGraph Supervisor 的 create_supervisor 创建主管工作流。
# 传递专业智能体列表、主管模型以及主管的提示。
workflow = create_supervisor(
    [research_agent, math_agent], # 由主管管理的专业智能体列表
    model=llm, # 主管智能体的模型
    prompt="你是一名团队主管，管理着一名研究专家和一名数学专家。研究专家能够利用网络搜索工具进行查询。", # 指导主管行为的提示
)

# 4. 编译并运行工作流：
# 将工作流编译为 LangGraph 应用程序，并使用用户消息调用它。
app = workflow.compile() # 将工作流编译为可执行的 LangGraph 应用程序
result = app.invoke({ # 使用用户消息调用已编译的应用程序
    "messages": [
        {
            "role": "user",
            "content": "2024 年 FAANG 公司的总员工人数是多少？" # 用户关于 FAANG 公司员工人数的查询
        }
    ]
})
print(result["messages"][-1].content)
# 保存图
from draw import save_graph_as_png
save_graph_as_png(app, "./graphs/c7/supervisor_workflow.png")

### 7.2.2 层次化架构 (Hierarchical Architecture)

层次化架构通过多层次的智能体组织来处理复杂任务。每层都有特定的职责，上层智能体负责高级决策，下层智能体执行具体任务。

**核心特点：**
- 多层次组织结构
- 清晰的职责分工
- 层次间的通信机制
- 递归的任务分解

##### 示例 7-12：具有研究和写作团队的分层主管系统

In [None]:
from langchain_openai import ChatOpenAI
from langgraph_supervisor import create_supervisor
from langgraph.prebuilt import create_react_agent

from llm_utils import llm as model

# --- 1. 定义研究团队智能体和主管 ---
# 定义研究团队（math_expert 和 research_expert）的工具和智能体
def add(a: float, b: float) -> float:
    """添加两个数字。"""
    return a + b

def multiply(a: float, b: float) -> float:
    """乘两个数字。"""
    return a * b

def web_search(query: str) -> str:
    """在网络上搜索信息。"""
    return (
        "以下是 FAANG 公司 2024 年的员工人数：\n"
        "1. **Facebook (Meta)**: 67,317 名员工。\n"
        "2. **Apple**: 164,000 名员工。\n"
        "3. **Amazon**: 1,551,000 名员工。\n"
        "4. **Netflix**: 14,000 名员工。\n"
        "5. **Google (Alphabet)**: 181,269 名员工。"
    )

math_agent = create_react_agent(
    model=model,
    tools=[add, multiply],
    name="math_expert",
    prompt="你是一名数学专家。"
)

research_agent = create_react_agent(
    model=model,
    tools=[web_search],
    name="research_expert",
    prompt="你是一名世界一流的研究员，可以访问网络搜索。不要做任何数学运算。"
)

research_team_supervisor = create_supervisor(
    [research_agent, math_agent], # 研究团队内的智能体
    model=model,
    prompt="你正在管理一个研究团队，该团队由研究和数学专家组成。研究专家能够利用网络搜索工具进行查询。", # 研究团队主管的提示
)
research_team = research_team_supervisor.compile(name="research_team") # 编译研究团队工作流并命名

# --- 2. 定义写作团队智能体和主管 ---
# 定义写作团队（writing_expert 和 publishing_expert）的工具和智能体
def write_report(topic: str) -> str:
    """撰写关于给定主题的报告。"""
    return f"关于 {topic} 的报告：... （报告的详细内容）"

def publish_report(report: str) -> str:
    """发布报告。"""
    return f"报告已发布：{report}"

writing_agent = create_react_agent(
    model=model,
    tools=[write_report],
    name="writing_expert",
    prompt="你是一名写作专家。"
)

publishing_agent = create_react_agent(
    model=model,
    tools=[publish_report],
    name="publishing_expert",
    prompt="你是一名出版专家。"
)

writing_team_supervisor = create_supervisor(
    [writing_agent, publishing_agent], # 写作团队内的智能体
    model=model,
    prompt="你正在管理一个写作团队，该团队由写作和出版专家组成。", # 写作团队主管的提示
)
writing_team = writing_team_supervisor.compile(name="writing_team") # 编译写作团队工作流并命名

# --- 3. 定义顶层主管 ---
# 定义顶层主管以管理研究团队和写作团队
top_level_supervisor_agent = create_supervisor(
    [research_team, writing_team], # 传递已编译的研究和写作团队工作流作为智能体
    model=model,
    prompt="你是一名顶层主管，管理着研究团队和写作团队。", # 顶层主管的提示
)
top_level_supervisor = top_level_supervisor_agent.compile(name="top_level_supervisor") # 编译顶层主管工作流并命名

# --- 4. 调用顶层主管 ---
# 使用用户查询调用顶层主管
result = top_level_supervisor.invoke({
    "messages": [
        {
            "role": "user",
            "content": "2024 年 FAANG 公司的总员工人数是多少？" # 用户查询
        }
    ]
})
print(result["messages"][-1].content)
# 保存图
from draw import save_graph_as_png
save_graph_as_png(top_level_supervisor, "./graphs/c7/multi_agent_hierarchy_workflow.png")

**💡 层次化架构关键特性**：

- **清晰的层次结构**：高级管理层、中级管理层、执行层各司其职
- **自上而下的任务分解**：从高级计划到详细任务再到具体执行
- **动态工作者分配**：根据规划结果动态创建工作者节点
- **层次间通信**：每层都有明确的输入输出接口
- **递归管理结构**：可以进一步扩展为更深层次的管理结构

### 7.2.3 网络架构 (Network Architecture)

网络架构（也称为群体架构 Swarm Architecture）允许智能体之间进行更灵活、自组织的交互。在这种架构中，智能体可以直接相互通信，形成动态的协作网络。

**核心特点：**
- 去中心化的协作模式
- 智能体间的直接通信
- 动态的角色分配
- 自适应的工作流程

##### 示例 7-13：Alice 和 Bob 的简单交互网络

In [None]:
# 首先安装 langgraph-swarm 库
# !pip install langgraph-swarm

from langchain_openai import ChatOpenAI

from langgraph.checkpoint.memory import InMemorySaver
from langgraph.prebuilt import create_react_agent
from langgraph_swarm import create_handoff_tool, create_swarm

# 这里推荐使用 SiliconCloud 平台上参赛量较大且支持工具调用的付费模型
model = ChatOpenAI(model="Qwen/Qwen3-8B")

def add(a: int, b: int) -> int:
    """添加两个数字"""
    return a + b

# 1. 创建专业智能体（Alice 和 Bob）：
# 使用 create_react_agent 定义两个专业智能体 Alice 和 Bob。
# Alice 是一位数学专家，拥有 'add' 工具和一个移交给 Bob 的移交工具。
alice = create_react_agent(
    model,
    [add, create_handoff_tool(agent_name="Bob")], # Alice 拥有 'add' 工具和移交给 Bob 的移交工具
    prompt="你是 Alice，一位加法专家，使用工具完成所有加法。",
    name="Alice",
)

# Bob 说话像个海盗，并拥有一个移交给 Alice 以寻求数学帮助的移交工具。
bob = create_react_agent(
    model,
    [create_handoff_tool(agent_name="Alice", description="务必将所有数学问题请转移给 Alice，她可以帮助解决数学问题")], # Bob 拥有一个移交给 Alice 的移交工具
    prompt="你是 Bob，你说话像个海盗。",
    name="Bob",
)

# 2. 创建用于对话记忆的内存检查点：
# InMemorySaver 用于短期记忆，这对于保持对话的连续性至关重要。
checkpointer = InMemorySaver()

# 3. 使用 create_swarm 创建 Swarm 工作流：
# create_swarm 函数使用 Alice 和 Bob 设置 swarm 架构。
# default_active_agent="Alice" 将 Alice 设置为新对话的起始智能体。
workflow = create_swarm(
    [alice, bob], # swarm 中的智能体列表
    default_active_agent="Alice" # 用于启动新对话的默认智能体
)

# 4. 使用检查点编译工作流：
# 编译 swarm 工作流，传递检查点以进行内存管理。
app = workflow.compile(checkpointer=checkpointer)

# 5. 在多轮对话中调用 Swarm：
# 多次调用 swarm，模拟对话线程。
# config={"configurable": {"thread_id": "1"}} 确保消息在同一对话线程中被跟踪。
config = {"configurable": {"thread_id": "1"}}
turn_1 = app.invoke( # 第一轮 - 用户想和 Bob 说话
    {"messages": [{"role": "user", "content": "我想和 Bob 说话"}]},
    config,
)
print(turn_1["messages"][-1].content) # 第一轮的输出
turn_2 = app.invoke( # 第二轮 - 用户提出一个数学问题
    {"messages": [{"role": "user", "content": "5 + 7 等于多少？"}]},
    config,
)
print(turn_2["messages"][-1].content) # 第二轮的输出

## 7.3 情境感知智能体：后台主动式 AI 架构与模式

传统的人工智能应用程序，尤其是那些利用大型语言模型 (LLM) 的应用程序，主要采用基于聊天的用户体验。虽然聊天模式的用户友好且易于实施，但它本质上将发起和管理交互的责任完全放在人类用户身上。

一种变革性的替代方案在于情境感知智能体（Ambient Agent）架构。这些架构设想 AI 智能体在后台主动且持续地运行，勤勉地监控相关信息流并自主地对重要事件做出反应。

##### 示例 7-14：LangGraph 中可用的人机环路交互结构体

让我们定义用于实现人机环路交互的数据结构：

In [None]:
from typing import TypedDict, Literal, Optional, Union

# HumanInterruptConfig：定义人工中断操作的配置选项。
class HumanInterruptConfig(TypedDict):
    allow_ignore: bool  # allow_ignore：布尔值，是否允许用户忽略中断。
    allow_respond: bool # allow_respond：布尔值，是否允许用户发送自由格式的响应。
    allow_edit: bool    # allow_edit：布尔值，是否允许用户编辑操作参数。
    allow_accept: bool  # allow_accept：布尔值，是否允许用户按原样接受操作。


# ActionRequest：定义向人类请求的操作。
class ActionRequest(TypedDict):
    action: str      # action：字符串，操作的描述性名称或标题。
    args: dict       # args：字典，与操作关联的参数（例如，工具调用参数）。


# HumanInterrupt：表示人工中断请求的主要模式。
class HumanInterrupt(TypedDict):
    action_request: ActionRequest         # action_request：ActionRequest，有关请求操作的详细信息。
    config: HumanInterruptConfig         # config：HumanInterruptConfig，人工响应的配置选项。
    description: Optional[str]          # description：可选字符串，中断的详细描述，可以是 Markdown 格式。


# HumanResponse：从 Agent Inbox UI 接收的人工响应模式。
class HumanResponse(TypedDict):
    type: Literal['accept', 'ignore', 'response', 'edit'] # type：Literal String，来自用户的响应类型（accept、ignore、response、edit）。
    args: Union[None, str, ActionRequest]                # args：Union[None, String, ActionRequest]，与响应关联的参数，根据"type"而变化。
                                                        #     - 对于"accept"和"edit"：ActionRequest，其中包含可能已修改的参数。
                                                        #     - 对于"response"：包含用户文本响应的字符串。
                                                        #     - 对于"ignore"：None。

##### 示例 7-15：在 LangGraph 图函数中使用 HumanInterrupt 和 HumanResponse

以下示例演示了如何在 LangGraph 工作流中实现人机环路交互：

In [None]:
from typing import TypedDict, Literal, Optional, Union
from langgraph.graph import StateGraph, START, END
from langgraph.types import interrupt
from langchain_core.messages import ToolMessage, HumanMessage, AIMessage

# 假设这些工具在其他地方定义
tools_by_name = {
    "hypothetical_tool": lambda **kwargs: f"工具执行结果: {kwargs}"
}

# 定义图状态，继承消息状态以用于对话历史记录
class AgentState(TypedDict):
    messages: list
    tool_calls: list

# 定义可能触发人工中断的图函数
def agent_node(state: AgentState):
    """智能体节点，决定是否调用工具或请求人工输入。"""
    messages = state["messages"]
    last_message = messages[-1] if messages else None

    # 假设智能体决定调用工具并具有工具调用详细信息
    tool_call_name = "hypothetical_tool"
    tool_call_args = {"input_arg": "example_value"}

    # 构建 HumanInterrupt 对象
    request: HumanInterrupt = {
        "action_request": {
            "action": tool_call_name, # 操作名称是工具名称
            "args": tool_call_args  # 操作参数是工具参数
        },
        "config": {
            "allow_ignore": True, # 允许用户忽略工具调用
            "allow_respond": True, # 允许用户提供自由格式的响应
            "allow_edit": True,  # 允许用户编辑工具参数
            "allow_accept": True # 允许用户按原样接受工具调用
        },
        "description": f"智能体建议使用参数：`{tool_call_args}` 调用工具：`{tool_call_name}`。 你批准吗？", # Agent Inbox UI 的描述
    }

    # 调用 interrupt 函数，在列表中传递 HumanInterrupt 请求
    response_list = interrupt([request])
    response = response_list[0] if response_list else None # 从列表中提取第一个响应

    if response:
        if response['type'] == "accept":
            # 用户接受了工具调用，继续执行工具
            tool_result = tools_by_name[tool_call_name](**response['args']['args']) # 使用（可能已修改的）参数执行工具
            output_message = ToolMessage(content=str(tool_result), tool_call_id="example_id") # 使用结果创建 ToolMessage
        elif response['type'] == "edit":
            # 用户编辑了工具调用参数，使用编辑后的参数执行工具
            edited_args = response['args']['args'] # 从 ActionRequest 中提取编辑后的参数
            tool_result = tools_by_name[tool_call_name](**edited_args) # 使用编辑后的参数执行工具
            output_message = ToolMessage(content=str(tool_result), tool_call_id="example_id") # 使用结果创建 ToolMessage
        elif response['type'] == "response":
            # 用户提供了文本响应，据此处理
            user_response_text = response['args'] # 提取用户的文本响应
            output_message = AIMessage(content=f"用户响应：{user_response_text}。 根据响应继续进行。") # 创建 AIMessage 以确认响应
        elif response['type'] == "ignore":
            # 用户忽略了中断，据此处理
            output_message = AIMessage(content="人工中断被忽略。 继续进行，不进行工具调用。") # 创建 AIMessage，指示中断被忽略
        else:
            output_message = AIMessage(content="未知的人工响应类型。") # 处理意外的响应类型
    else:
        # 未收到响应（例如，中断处理超时或错误）
        output_message = AIMessage(content="未收到人工响应，继续进行，不进行干预。") # 处理未收到响应的情况

    return {"messages": [output_message]} # 返回更新的消息状态


# 构建 LangGraph 工作流
builder = StateGraph(AgentState)
builder.add_node("agent_step", agent_node)
builder.add_edge(START, "agent_step")
builder.add_edge("agent_step", END)

# 编译工作流
workflow = builder.compile()

## 📚 本章总结

通过本章的学习，我们深入探索了 AI 智能体系统的架构设计与范式应用，从基础的智能体工作流模式（提示链、路由、并行化、协调器-工作者、评估器-优化器）到复杂的多智能体系统架构（主管、层次化、网络架构），再到前瞻性的情境感知智能体架构。我们掌握了 LangGraph 中的关键技术实现，包括 Graph API 与 Functional API 的灵活应用、Send API 的动态工作者创建、结构化输出与状态管理、以及人机环路交互机制，为构建复杂、高效且可扩展的智能体应用系统奠定了坚实的技术基础。在下一章中，我们将进一步探讨智能体的高级应用场景和优化策略。