[![在 Colab 中打开](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/langchain-ai/langchain-academy/blob/main/module-4/sub-graph.ipynb) [![在 LangChain Academy 中打开](https://cdn.prod.website-files.com/65b8cd72835ceeacd4449a53/66e9eba12c7b7688aa3dbb5e_LCA-badge-green.svg)](https://academy.langchain.com/courses/take/intro-to-langgraph/lessons/58239937-lesson-2-sub-graphs)


# 子图

## 回顾

我们正在逐步构建一个多智能体研究助手，它将本课程的所有模块串联在一起。

我们刚刚介绍了并行化，这是 LangGraph 控制能力的一个关键主题。

## 目标

现在，我们要[学习子图](https://langchain-ai.github.io/langgraph/how-tos/subgraph/#simple-example)。

## 状态

子图让你可以在图的不同部分创建并管理不同的状态。

这对多智能体系统尤其有用，因为每个智能体团队通常都有自己的状态。

我们来看一个简化示例：

* 我有一个接收日志的系统
* 不同的智能体会执行两个独立的子任务（总结日志、查找失败模式）
* 我希望这两个操作在两个不同的子图中完成

理解图之间如何通信是关键！

简而言之，通信是通过**重叠的键**完成的：

* 子图可以访问父图提供的 `docs`
* 父图可以访问子图返回的 `summary/failure_report`

![subgraph.png](https://cdn.prod.website-files.com/65b8cd72835ceeacd4449a53/66dbb1abf89f2d847ee6f1ff_sub-graph1.png)

## 输入

我们先为要传入图中的日志定义一个 schema。


In [3]:
%%capture --no-stderr
%pip install -U  langgraph

我们将使用 [LangSmith](https://docs.smith.langchain.com/) 来[追踪](https://docs.smith.langchain.com/concepts/tracing)。


In [4]:
import os, getpass


def _set_env(var: str):
    if not os.environ.get(var):
        os.environ[var] = getpass.getpass(f"{var}: ")


_set_env("LANGSMITH_API_KEY")
os.environ["LANGSMITH_TRACING"] = "true"
os.environ["LANGSMITH_PROJECT"] = "langchain-academy"

In [5]:
from operator import add
from typing_extensions import TypedDict
from typing import List, Optional, Annotated


# The structure of the logs
# 结构体，作为输入
class Log(TypedDict):
    id: str
    question: str
    docs: Optional[List]
    answer: str
    grade: Optional[int]
    grader: Optional[str]
    feedback: Optional[str]

## 子图

下面是使用 `FailureAnalysisState` 的失败分析子图。


In [7]:
from IPython.display import Image, display
from langgraph.graph import StateGraph, START, END


# Failure Analysis Sub-graph
# 失败分析状态，作为中间状态
class FailureAnalysisState(TypedDict):
    cleaned_logs: List[Log]
    failures: List[Log]
    fa_summary: str
    processed_logs: List[str]


# 失败分析输出状态，作为输出状态
class FailureAnalysisOutputState(TypedDict):
    fa_summary: str
    processed_logs: List[str]


def get_failures(state):
    """获取包含失败信息的日志

    此函数从清理过的日志中筛选出包含等级（grade）信息的日志条目，
    这些条目通常表示某种形式的失败或评估结果。

    参数:
        state (dict): 包含系统状态的字典，必须包含'cleaned_logs'键

    返回:
        dict: 包含一个键为'failures'的字典，对应值为筛选出的失败日志列表
    """
    cleaned_logs = state["cleaned_logs"]  # 从状态中提取清理过的日志
    failures = [log for log in cleaned_logs if "grade" in log]  # 筛选包含'grade'的日志
    return {"failures": failures}  # 返回失败日志列表


def generate_summary(state):
    """生成失败日志的摘要

    此函数从系统状态中提取失败日志，并为这些失败生成汇总摘要，
    同时为每个失败日志创建处理记录标识。

    参数:
        state (dict): 包含系统状态的字典，必须包含'failures'键，对应值为失败日志列表

    返回:
        dict: 包含两个键的字典
            - 'fa_summary': 失败分析的汇总摘要
            - 'processed_logs': 处理过的失败日志标识符列表
    """
    failures = state["failures"]  # 从状态中提取失败日志列表
    # Add fxn: fa_summary = summarize(failures)  # 注释表明这里原本计划调用summarize函数
    fa_summary = "Poor quality retrieval of Chroma documentation."  # 失败分析的汇总摘要
    return {
        "fa_summary": fa_summary,  # 返回汇总摘要
        "processed_logs": [
            # 为每个失败日志创建处理记录标识符
            f"failure-analysis-on-log-{failure['id']}"
            for failure in failures
        ],
    }


fa_builder = StateGraph(
    state_schema=FailureAnalysisState, output_schema=FailureAnalysisOutputState
)
fa_builder.add_node("get_failures", get_failures)
fa_builder.add_node("generate_summary", generate_summary)
fa_builder.add_edge(START, "get_failures")
fa_builder.add_edge("get_failures", "generate_summary")
fa_builder.add_edge("generate_summary", END)

graph = fa_builder.compile()
# display(Image(graph.get_graph().draw_mermaid_png()))

这是使用 `QuestionSummarizationState` 的问题汇总子图。


In [9]:
# Summarization subgraph
# 问题摘要状态，作为中间状态
class QuestionSummarizationState(TypedDict):
    cleaned_logs: List[Log]
    qs_summary: str
    report: str
    processed_logs: List[str]


# 问题摘要输出状态，作为输出状态
class QuestionSummarizationOutputState(TypedDict):
    report: str
    processed_logs: List[str]


def generate_summary(state):
    """生成日志内容的总结摘要

    此函数从清理过的日志中提取信息，并生成关于问题重点的摘要，
    同时为每个处理过的日志创建唯一标识。

    参数:
        state (dict): 包含系统状态的字典，必须包含'cleaned_logs'键，对应值为清理过的日志列表

    返回:
        dict: 包含两个键的字典
            - 'qs_summary': 问题重点的总结摘要
            - 'processed_logs': 处理过的日志标识符列表
    """
    cleaned_logs = state["cleaned_logs"]  # 从状态中提取清理过的日志列表
    # Add fxn: summary = summarize(generate_summary)  # 注释表明这里原本计划调用summarize函数
    summary = "Questions focused on usage of ChatOllama and Chroma vector store."  # 生成的问题重点摘要
    return {
        "qs_summary": summary,  # 返回问题摘要
        "processed_logs": [
            # 为每个清理过的日志创建处理记录标识符
            f"summary-on-log-{log['id']}"
            for log in cleaned_logs
        ],
    }


def send_to_slack(state):
    """生成用于发送到Slack的报告

    此函数从系统状态中提取问题摘要，并基于该摘要生成一个
    可发送到Slack的报告内容。

    参数:
        state (dict): 包含系统状态的字典，必须包含'qs_summary'键，对应值为问题摘要

    返回:
        dict: 包含一个键为'report'的字典，对应值为生成的报告内容
    """
    qs_summary = state["qs_summary"]  # 从状态中提取问题摘要
    # Add fxn: report = report_generation(qs_summary)  # 注释表明这里原本计划调用report_generation函数
    report = "foo bar baz"  # 生成的报告内容（当前为示例文本）
    return {"report": report}  # 返回生成的报告


qs_builder = StateGraph(
    QuestionSummarizationState, output_schema=QuestionSummarizationOutputState
)
qs_builder.add_node("generate_summary", generate_summary)
qs_builder.add_node("send_to_slack", send_to_slack)
qs_builder.add_edge(START, "generate_summary")
qs_builder.add_edge("generate_summary", "send_to_slack")
qs_builder.add_edge("send_to_slack", END)

graph = qs_builder.compile()
# display(Image(graph.get_graph().draw_mermaid_png()))

## 将子图加入父图

现在我们可以把所有内容组合到一起。

我们使用 `EntryGraphState` 创建父图。

然后把子图作为节点加入！

```
entry_builder.add_node("question_summarization", qs_builder.compile())
entry_builder.add_node("failure_analysis", fa_builder.compile())
```


In [10]:
# Entry Graph
class EntryGraphState(TypedDict):
    raw_logs: List[Log]
    cleaned_logs: Annotated[List[Log], add]  # This will be USED BY in BOTH sub-graphs
    fa_summary: str  # This will only be generated in the FA sub-graph
    report: str  # This will only be generated in the QS sub-graph
    processed_logs: Annotated[
        List[int], add
    ]  # This will be generated in BOTH sub-graphs

但是，如果 `cleaned_logs` 只是作为输入传入每个子图而不会被修改，为什么还需要 reducer 呢？

```
cleaned_logs: Annotated[List[Log], add] # This will be USED BY in BOTH sub-graphs
```

这是因为子图的输出状态会包含**所有键**，即使它们没有被修改。

子图是并行运行的。

由于并行的子图会返回同名键，我们需要像 `operator.add` 这样的 reducer 来合并每个子图传回的值。

不过，我们可以利用之前提到的另一个概念来规避这个问题。

只要为每个子图创建输出状态 schema，并让它们输出不同的键即可。

我们其实不需要每个子图都输出 `cleaned_logs`。


In [12]:
# Entry Graph
class EntryGraphState(TypedDict):
    raw_logs: List[Log]
    cleaned_logs: List[Log]
    fa_summary: str  # This will only be generated in the FA sub-graph
    report: str  # This will only be generated in the QS sub-graph
    processed_logs: Annotated[
        List[int], add
    ]  # This will be generated in BOTH sub-graphs


def clean_logs(state):
    # Get logs
    raw_logs = state["raw_logs"]
    # Data cleaning raw_logs -> docs
    cleaned_logs = raw_logs
    return {"cleaned_logs": cleaned_logs}


entry_builder = StateGraph(EntryGraphState)
entry_builder.add_node("clean_logs", clean_logs)
entry_builder.add_node("question_summarization", qs_builder.compile())
entry_builder.add_node("failure_analysis", fa_builder.compile())

entry_builder.add_edge(START, "clean_logs")
entry_builder.add_edge("clean_logs", "failure_analysis")
entry_builder.add_edge("clean_logs", "question_summarization")
entry_builder.add_edge("failure_analysis", END)
entry_builder.add_edge("question_summarization", END)

graph = entry_builder.compile()

from IPython.display import Image, display

# Setting xray to 1 will show the internal structure of the nested graph
# display(Image(graph.get_graph(xray=1).draw_mermaid_png()))

In [13]:
# Dummy logs
question_answer = Log(
    id="1",
    question="How can I import ChatOllama?",
    answer="To import ChatOllama, use: 'from langchain_community.chat_models import ChatOllama.'",
)

question_answer_feedback = Log(
    id="2",
    question="How can I use Chroma vector store?",
    answer="To use Chroma, define: rag_chain = create_retrieval_chain(retriever, question_answer_chain).",
    grade=0,
    grader="Document Relevance Recall",
    feedback="The retrieved documents discuss vector stores in general, but not Chroma specifically",
)

raw_logs = [question_answer, question_answer_feedback]
graph.invoke({"raw_logs": raw_logs})

{'raw_logs': [{'id': '1',
   'question': 'How can I import ChatOllama?',
   'answer': "To import ChatOllama, use: 'from langchain_community.chat_models import ChatOllama.'"},
  {'id': '2',
   'question': 'How can I use Chroma vector store?',
   'answer': 'To use Chroma, define: rag_chain = create_retrieval_chain(retriever, question_answer_chain).',
   'grade': 0,
   'grader': 'Document Relevance Recall',
   'feedback': 'The retrieved documents discuss vector stores in general, but not Chroma specifically'}],
 'cleaned_logs': [{'id': '1',
   'question': 'How can I import ChatOllama?',
   'answer': "To import ChatOllama, use: 'from langchain_community.chat_models import ChatOllama.'"},
  {'id': '2',
   'question': 'How can I use Chroma vector store?',
   'answer': 'To use Chroma, define: rag_chain = create_retrieval_chain(retriever, question_answer_chain).',
   'grade': 0,
   'grader': 'Document Relevance Recall',
   'feedback': 'The retrieved documents discuss vector stores in general, 

## LangSmith

来看一下 LangSmith trace：

https://smith.langchain.com/public/f8f86f61-1b30-48cf-b055-3734dfceadf2/r
