# 第 5 章：AI 智能体的记忆系统

> 本笔记文件需要与《LangGraph实战》的第 5 章的内容配套使用。

在 AI 智能体不断演进的征程中，我们已然认识到，赋予它们类似人类的记忆能力是构建真正智能和实用系统的核心所在。本章将深入探讨 AI 智能体记忆系统的关键概念和实践方法，为你揭示如何利用 LangGraph 框架赋予你的智能体持久的知识和情境感知能力。

### 🚀 环境准备

首先加载必要的环境变量配置和基础模块：

In [1]:
from dotenv import load_dotenv

load_dotenv()

True

## 5.1 短期记忆 vs. 长期记忆：平衡语境与持久性

要创建真正智能且有助力的 AI 智能体，就需要为其配备能够模仿人类认知细微之处的记忆系统。正如人类利用短期记忆和长期记忆来进行有效的互动和学习一样，AI 智能体也能从类似的二分法中获益。

### 记忆系统的重要性

- **短期记忆**：对话的即时性至关重要，使智能体能够在正在进行的对话中处理当前的用户输入，理解细微之处，并保持对话流程
- **长期记忆**：提供连续性和成长性，使智能体能够超越个别互动，建立持久的身份，从累积的经验中学习

一个健壮的 AI 智能体记忆架构需要一种区分对待的方法，策略性地同时使用短期记忆和长期记忆。

### 5.1.1 短期记忆：维持对话的连贯性

短期记忆是 AI 智能体进行连贯、回合式对话的基石。它使智能体能够在整个对话过程中保持语境理解。

##### 示例 5-1：在 LangGraph 中通过对话历史记录管理短期记忆

首先定义 AgentState 结构来存储对话历史记录：

In [2]:
from typing import TypedDict, List, Dict, Any
from langchain_core.messages import BaseMessage, HumanMessage, AIMessage

class AgentState(TypedDict):
    messages: List[BaseMessage]  # 消息列表，作为对话历史记录
    intermediate_results: Dict[str, Any]  # 短期记忆中的其他数据

**💡 短期记忆核心概念**：

- `AgentState`：定义智能体的状态结构，包含消息列表和中间结果
- `messages`：存储对话历史的核心字段，支持用户和AI消息
- `intermediate_results`：存储会话期间的临时数据，如任务状态、中间计算结果等

##### 示例 5-2：在 LangGraph 中通过中间结果管理短期记忆

实现添加消息到对话历史的函数：

In [3]:
def add_user_message(state: AgentState, user_message: str) -> Dict[str, List[BaseMessage]]:
    """向对话历史记录添加用户消息"""
    new_message = HumanMessage(content=user_message)
    return {"messages": state["messages"] + [new_message]}

def add_ai_message(state: AgentState, ai_response: str) -> Dict[str, List[BaseMessage]]:
    """向对话历史记录添加 AI 响应"""
    new_message = AIMessage(content=ai_response)
    return {"messages": state["messages"] + [new_message]}

# 测试短期记忆管理
initial_state = AgentState(messages=[], intermediate_results={})
state_with_user_msg = add_user_message(initial_state, "你好，我是用户")
print("添加用户消息后的状态：", len(state_with_user_msg["messages"]), "条消息")

final_state = add_ai_message(state_with_user_msg, "你好！我是AI助手，很高兴为您服务")
print("添加AI消息后的状态：", len(final_state["messages"]), "条消息")

添加用户消息后的状态： 1 条消息
添加AI消息后的状态： 2 条消息


**💡 消息管理机制**：

- **消息累积**：新消息通过列表连接的方式添加到历史记录中
- **类型区分**：`HumanMessage`和`AIMessage`分别表示用户和AI的消息
- **状态更新**：返回字典格式的状态更新，符合LangGraph的状态管理机制

##### 示例 5-3：在 LangGraph 中通过截断管理短期记忆

随着对话的延长，需要管理内存和成本。这里演示两种截断策略：

In [4]:
# 注意：这里模拟 trim_messages 功能，实际使用时需要安装完整的 LangChain
def truncate_history(state: AgentState, max_messages: int) -> Dict[str, List[BaseMessage]]:
    """截断对话历史记录，仅保留最近的消息"""
    truncated_messages = state["messages"][-max_messages:]
    return {"messages": truncated_messages}

def simulate_trim_message_history_by_token(state: AgentState, max_tokens: int) -> Dict[str, List[BaseMessage]]:
    """模拟根据 token 计数修剪消息历史记录"""
    if not state["messages"]:
        return {"messages": []}
    
    # 简化的 token 计算（实际中会使用真实的 tokenizer）
    messages = state["messages"]
    total_tokens = 0
    kept_messages = []
    
    # 从后往前保留消息，直到达到 token 限制
    for message in reversed(messages):
        # 简化：假设每个字符约等于 0.5 个 token
        message_tokens = len(message.content) * 0.5
        if total_tokens + message_tokens <= max_tokens:
            kept_messages.insert(0, message)
            total_tokens += message_tokens
        else:
            break
    
    return {"messages": kept_messages}

# 创建一个包含多条消息的测试状态
test_state = AgentState(
    messages=[
        HumanMessage(content="第一条用户消息"),
        AIMessage(content="第一条AI回复"),
        HumanMessage(content="第二条用户消息"),
        AIMessage(content="第二条AI回复"),
        HumanMessage(content="第三条用户消息"),
        AIMessage(content="第三条AI回复")
    ],
    intermediate_results={}
)

# 测试消息数量截断
truncated_state = truncate_history(test_state, 3)
print(f"原始消息数量: {len(test_state['messages'])}")
print(f"截断后消息数量: {len(truncated_state['messages'])}")

# 显示截断后的消息内容
for i, msg in enumerate(truncated_state['messages']):
    print(f"消息 {i+1}: {type(msg).__name__} - {msg.content}")

原始消息数量: 6
截断后消息数量: 3
消息 1: AIMessage - 第二条AI回复
消息 2: HumanMessage - 第三条用户消息
消息 3: AIMessage - 第三条AI回复


**💡 记忆优化策略**：

- **数量截断**：`truncate_history`保留最新的N条消息，简单有效
- **Token截断**：考虑实际Token消耗，更精确地控制上下文长度
- **策略选择**：根据模型上下文窗口限制和成本考虑选择合适的截断策略
- **消息完整性**：确保截断后的对话历史仍然保持逻辑完整性

##### 示例 5-4：在 LangGraph 中通过摘要管理短期记忆

更精细的策略是使用 LLM 总结对话历史：

In [5]:
# 模拟摘要功能
def simulate_summarize_history(state: AgentState) -> Dict[str, List[BaseMessage]]:
    """模拟使用 LLM 总结对话历史记录"""
    messages = state["messages"]
    if len(messages) <= 2:  # 如果消息太少，不需要摘要
        return {"messages": messages}
    
    # 模拟摘要生成（实际中会调用 LLM）
    conversation_content = []
    for m in messages[:-1]:  # 排除最后一条消息
        role = "用户" if isinstance(m, HumanMessage) else "AI助手"
        conversation_content.append(f"{role}: {m.content}")
    
    # 简化的摘要生成
    summary = f"对话摘要：讨论了{len(messages)-1}轮交互，涉及用户查询和AI回复。"
    
    # 将历史记录替换为摘要和最新的用户消息
    summary_message = AIMessage(content=summary)
    last_message = messages[-1]  # 保留最后一条消息
    new_messages = [summary_message, last_message]
    
    return {"messages": new_messages}

# 测试摘要功能
rich_conversation_state = AgentState(
    messages=[
        HumanMessage(content="我想学习Python编程"),
        AIMessage(content="很好！Python是一门优秀的编程语言。我建议从基础语法开始学习。"),
        HumanMessage(content="我应该先学什么？"),
        AIMessage(content="建议先学习变量、数据类型、条件语句和循环。"),
        HumanMessage(content="能给我推荐一些学习资源吗？"),
        AIMessage(content="推荐《Python编程：从入门到实践》这本书，还有官方文档和在线教程。"),
        HumanMessage(content="现在我想了解面向对象编程")
    ],
    intermediate_results={}
)

print(f"摘要前消息数量: {len(rich_conversation_state['messages'])}")

# 使用摘要功能
summarized_state = simulate_summarize_history(rich_conversation_state)
print(f"摘要后消息数量: {len(summarized_state['messages'])}")

print("\n摘要后的消息内容：")
for i, msg in enumerate(summarized_state['messages']):
    print(f"{i+1}. {type(msg).__name__}: {msg.content}")

摘要前消息数量: 7
摘要后消息数量: 2

摘要后的消息内容：
1. AIMessage: 对话摘要：讨论了6轮交互，涉及用户查询和AI回复。
2. HumanMessage: 现在我想了解面向对象编程


**💡 智能摘要机制**：

- **语义保留**：LLM摘要能够保留对话的核心语义信息，而不仅仅是截断
- **上下文压缩**：将长对话压缩为简洁的摘要，大幅减少Token使用
- **错误处理**：包含异常处理机制，确保摘要失败时有备用方案
- **最新信息保留**：总是保留最新的用户消息，确保当前上下文不丢失

## 5.2 深入探索记忆存储：设置和管理长期知识

在明确了长期记忆的重要作用后，我们现在将注意力转向在 LangGraph 中实现此类记忆的实用机制。本节将深入探讨 LangGraph 记忆存储，这是一个多功能的抽象，旨在管理智能体的持久化、跨线程知识。

### 5.2.1 核心记忆存储操作：设置和基本用法

LangGraph 记忆存储以 `BaseStore` 类及其实现为代表，为 LangGraph 应用程序中的长期记忆管理提供了基本抽象。

##### 示例 5-5 & 5-6：使用 InMemoryStore 实例化记忆存储

In [6]:
import uuid
from langgraph.store.memory import InMemoryStore

in_memory_store = InMemoryStore()

# 定义用户特定数据的命名空间
user_id = "example_user"
namespace_for_user_data = (user_id, "user_info")

# 为记忆条目生成唯一键
memory_key = str(uuid.uuid4())

# 创建一个字典来保存用户姓名作为记忆值
memory_value = {"user_name": "Example User"}

# 使用 put 将记忆存储在 InMemoryStore 中
in_memory_store.put(namespace_for_user_data, memory_key, memory_value)

print(f"记忆已保存，键为：{memory_key}，命名空间为：{namespace_for_user_data}")

记忆已保存，键为：8ee2fce6-145b-42b3-953d-40dea32e6f26，命名空间为：('example_user', 'user_info')


**💡 记忆存储架构**：

- **命名空间设计**：使用元组作为逻辑容器，支持层次化组织
- **键值存储**：键作为唯一标识符，值为字典格式提供灵活性
- **时间戳管理**：自动添加创建和更新时间戳
- **可扩展性**：可以轻松扩展以支持更复杂的查询和索引功能

##### 示例 5-7：使用 search 和 get 检索记忆

In [7]:
# 检索 user_info 命名空间中的所有记忆（没有查询或过滤器）
all_user_memories = in_memory_store.search(namespace_for_user_data)
print("命名空间中的所有用户记忆：")
for record in all_user_memories:
    print(record.dict()) # 打印 MemoryRecord 字典表示

# 使用 'get' 通过键检索记忆
retrieved_memory_record = in_memory_store.get(namespace_for_user_data, memory_key)
print(f"\n使用键 '{memory_key}' 检索到的记忆：")
print(retrieved_memory_record.dict()) # 打印 MemoryRecord 字典表示

命名空间中的所有用户记忆：
{'namespace': ['example_user', 'user_info'], 'key': '8ee2fce6-145b-42b3-953d-40dea32e6f26', 'value': {'user_name': 'Example User'}, 'created_at': '2025-08-27T12:15:04.250800+00:00', 'updated_at': '2025-08-27T12:15:04.250815+00:00', 'score': None}

使用键 '8ee2fce6-145b-42b3-953d-40dea32e6f26' 检索到的记忆：
{'namespace': ['example_user', 'user_info'], 'key': '8ee2fce6-145b-42b3-953d-40dea32e6f26', 'value': {'user_name': 'Example User'}, 'created_at': '2025-08-27T12:15:04.250800+00:00', 'updated_at': '2025-08-27T12:15:04.250815+00:00'}


### 5.2.2 通过语义搜索增强记忆检索

LangGraph 记忆存储的语义搜索功能显著提升了记忆检索的有效性，使智能体能够基于语义相似性而非简单的关键词匹配来检索信息。这种机制通过向量嵌入技术实现，能够捕捉文本的深层语义特征，从而获得更符合语境的检索结果。

##### 示例 5-8：使用 BGE-M3 向量化模型配置 InMemoryStore 以进行语义搜索

In [11]:
from langchain_openai import OpenAIEmbeddings
from langgraph.store.memory import InMemoryStore

# 初始化 BGE-M3 向量化模型
embeddings = OpenAIEmbeddings(model="BAAI/bge-m3") # 或其他向量化模型

# 使用语义搜索索引配置 InMemoryStore
store_with_semantic_search = InMemoryStore(
    index={
        "embed": embeddings.embed_documents, 
        "dims": 1024, # BGE-M3 的向量维度
        "fields": ["memory_content"] # 仅向量化 "memory_content" 字段（可选）
    }
)

##### 示例 5-9：使用 put 执行记忆保存

In [12]:
# 保存将为语义搜索索引的记忆（默认行为）
store_with_semantic_search.put(
    ("user_789", "food_memories"),
    "memory_1",
    {"memory_content": "我真的很喜欢辛辣的印度咖喱。"},
)

# 保存另一个记忆，显式禁用此条目的索引
store_with_semantic_search.put(
    ("user_789", "system_metadata"),
    "memory_2",
    {"memory_content": "用户入职已完成。", "status": "completed"},
    index=False, # 禁用此记忆的索引
)

# 保存一个记忆，覆盖默认索引字段并仅索引 "context"
store_with_semantic_search.put(
    ("user_789", "restaurant_reviews"),
    "memory_3",
    {"memory_content": "服务很慢，但食物很好。", "context": "对 'The Italian Place' 餐厅的评论"},
    index=["context"] # 仅索引 "context" 字段
)

##### 示例 5-10：使用 search 执行语义记忆检索

In [13]:
# 语义搜索食物偏好
search_query = "该用户喜欢哪种食物？"
semantic_memory_results = store_with_semantic_search.search(
    ("user_789", "food_memories"), query=search_query, limit=2
)

print("查询的语义搜索结果：", search_query)
for record in semantic_memory_results:
    print(f"记忆键：{record.key}，相似度评分：{record.score}")
    print(f"记忆内容：{record.value}")
    print("=" * 30)

查询的语义搜索结果： 该用户喜欢哪种食物？
记忆键：memory_1，相似度评分：0.39143762261467724
记忆内容：{'memory_content': '我真的很喜欢辛辣的印度咖喱。'}


## 5.3 记忆系统的实际应用

在系统阐述短期记忆与长期记忆的核心概念，并深入解析 LangGraph 记忆存储机制后，我们将聚焦实际应用场景。本节重点探讨记忆系统如何显著增强 AI 智能体的功能表现，针对每个典型用例，我们将详细说明记忆应用价值分析、概念性代码示例，以及实际集成实施方案。其中，我们会特别关注记忆数据更新时机选择、信息提取技术，以及如何通过 TrustCall 机制提升记忆数据提取的可靠性。

### 5.3.1 个性化推荐

长期记忆最具价值的应用场景之一是实现个性化服务。通过持续记录用户偏好、兴趣点及历史交互数据，AI 智能体能够提供精准定制的推荐内容，从而显著提升用户参与度与满意度。以下示例主要依托语义记忆实现用户画像和偏好的跨会话持久化存储。

##### 示例 5-12：个性化推荐的示例用法

In [15]:
import json

from langchain_core.messages import AIMessage, HumanMessage
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnableConfig
from langchain_openai import ChatOpenAI

from langgraph.graph import END, START, StateGraph, MessagesState
from langgraph.store.memory import BaseStore, InMemoryStore

# 假设 'fetch_product_recommendations'、'format_recommendation_message'、'UserProfile' 已在其他地方定义

recommendation_prompt = ChatPromptTemplate.from_messages([
    ("system", "你是一个乐于助人的推荐引擎。根据用户资料，提供个性化的产品推荐。"),
    ("human", "{user_profile_summary}")
])
recommendation_chain = recommendation_prompt | ChatOpenAI(model="Qwen/Qwen3-8B") |  (lambda x: {"messages": [AIMessage(content=x.content)]})

def recommend_products(state: MessagesState, config: RunnableConfig, store: BaseStore):
    """根据用户存储的偏好向用户推荐产品。"""
    user_id = config["configurable"]["user_id"]
    namespace = ("user_profiles", user_id)
    user_profile_record = store.get(namespace, "profile")
    user_profile = user_profile_record.value if user_profile_record else {}

    user_profile_summary = format_user_profile_summary(user_profile) # 用于格式化提示的用户资料字典的函数

    # 调用推荐链
    result = recommendation_chain.invoke({"user_profile_summary": user_profile_summary})
    return result

def format_user_profile_summary(user_profile: dict) -> str:
    """将用户资料字典格式化为字符串以进行提示注入。"""
    name = user_profile.get("preferred_name", "用户")
    categories = ", ".join(user_profile.get("preferred_product_categories", ["产品"]))
    return f"用户名为 {name}。他们偏好的产品类别是：{categories}。"


def extract_preference_updates(state: MessagesState) -> dict:
    """从最新的用户消息中提取用户偏好更新。"""
    latest_message_content = state["messages"][-2].content
    # 示例：使用 LLM 提取偏好 - 替换为实际的提取逻辑
    extraction_prompt = ChatPromptTemplate.from_messages([
        ("system", "从用户消息中提取用户的产品类别偏好。以 JSON 字典形式返回，外层不要包裹 ```json```， 键为 'preferred_product_categories'，值为类别列表。如果没有表达偏好，则返回一个空字典。"),
        ("human", "{user_message}")
    ])
    extraction_chain = extraction_prompt | ChatOpenAI(model="Qwen/Qwen2.5-7B-Instruct") # 如果需要结构化输出，请替换为合适的链

    preferences_json = extraction_chain.invoke({"user_message": latest_message_content})
    try:
        preferences = json.loads(preferences_json.content) # 假设 LLM 返回 JSON 字符串
        return preferences
    except json.JSONDecodeError:
        return {} # 如果提取失败，则返回空字典

def update_user_profile_node(state: MessagesState, config: RunnableConfig, store: BaseStore):
    """在"热路径"中触发的记忆存储中更新用户资料。"""
    user_id = config["configurable"]["user_id"]
    namespace = ("user_profiles", user_id)
    user_profile_record = store.get(namespace, "profile")
    user_profile = user_profile_record.value if user_profile_record else {}

    preference_updates = extract_preference_updates(state) # 从当前轮次提取偏好
    
    updated_profile = user_profile.copy() # 创建副本以避免修改原始字典
    if "preferred_product_categories" in preference_updates: # 合并或更新偏好
        updated_profile["preferred_product_categories"] = list(set(updated_profile.get("preferred_product_categories", []) + preference_updates["preferred_product_categories"])) # 示例：合并列表
    
    store.put(namespace, "profile", updated_profile) # 保存更新后的资料

    return {} # 节点应返回字典


# LangGraph 中的示例用法
memory_store = InMemoryStore()

builder = StateGraph(MessagesState)
builder.add_node("recommend_products", recommend_products)
builder.add_node("update_profile", update_user_profile_node) # 在"热路径"中更新资料的节点
builder.add_edge(START, "recommend_products")
builder.add_edge("recommend_products", "update_profile") # 在推荐后更新资料
builder.add_edge("update_profile", END)

graph = builder.compile(store=memory_store)

# 初始化用户资料
user_id = "user_123"
memory_store.put(
    ("user_profiles", user_id),
    "profile",
    {
        "preferred_name": "张三",
        "preferred_product_categories": ["电子产品", "书籍"]
    }
)

# 执行图 - 第一次交互（获取推荐）
config = {"configurable": {"user_id": user_id}}
result = graph.invoke({"messages": [HumanMessage(content="你好")]}, config=config)
print("初始推荐:")
print(result["messages"][-1].content)
print("=" * 50)

# 模拟用户表达新的偏好
user_message = "我最近对户外装备和运动鞋很感兴趣。"
result = graph.invoke(
    {"messages": result["messages"] + [HumanMessage(content=user_message)]},
    config=config
)

# 检查更新后的用户资料
updated_profile = memory_store.get(("user_profiles", user_id), "profile").value
print("更新后的用户资料:")
print(json.dumps(updated_profile, ensure_ascii=False, indent=2))
print("=" * 50)

# 再次获取推荐，应该包含新的偏好
result = graph.invoke({"messages": [HumanMessage(content="我又来了")]}, config=config)
print("基于更新后资料的推荐:")
print(result["messages"][-1].content)

初始推荐:


根据张三对**电子产品**和**书籍**的偏好，以下是为您量身推荐的产品列表：

---

### 📱 **电子产品推荐**  
1. **Apple iPhone 15 Pro**  
   - **特点**: 高性能A17芯片、Pro级相机系统、灵动岛设计、钛金属机身  
   - **适用场景**: 适合追求科技前沿、拍照和性能的用户  
   - **价格**: ¥7999起  

2. **Sony WH-1000XM5 无线降噪耳机**  
   - **特点**: 顶级降噪效果、30小时续航、高清音质、轻巧设计  
   - **适用场景**: 旅途、办公、居家使用，需要沉浸式音效  
   - **价格**: ¥1999  

3. **Amazon Echo Dot (5th Gen)**  
   - **特点**: 智能语音助手、音乐播放、智能家居控制、性价比高  
   - **适用场景**: 智能家居爱好者、需要语音控制家电的用户  
   - **价格**: ¥299  

4. **DJI Osmo Pocket 3**  
   - **特点**: 轻巧便携、专业运动相机功能、AI自动追踪、4K视频  
   - **适用场景**: 影视爱好者、旅行记录或Vlog拍摄  
   - **价格**: ¥2499  

5. **小米扫地机器人 Pro 2**  
   - **特点**: 超薄设计、LDS激光导航、自动集尘、兼容多品牌吸尘器  
   - **适用场景**: 希望解放双手、保持家居清洁的用户  
   - **价格**: ¥1499  

---

### 📚 **书籍推荐**  
1. **《时间的秩序》——卡洛·罗韦利**  
   - **类型**: 科普、物理学  
   - **亮点**: 用通俗语言解释复杂物理概念，适合对宇宙和时间感兴趣的读者  
   - **价格**: ¥45  

2. **《深度工作》——卡尔·纽波特**  
   - **类型**: 自我提升、效率管理  
   - **亮点**: 提供专注力训练和深度工作的实用策略，适合职场人士  
   - **价格**: ¥39  

3. **《三体》系列（刘慈欣）**  
   - **类型**: 科幻、文学  
   -

### 5.3.2 多步骤的情境化任务

对于需要引导用户完成多步骤任务或复杂工作流的 AI 智能体来说，记忆至关重要。短期记忆在跟踪任务的当前阶段、记住先前步骤中的用户输入，以及在整个过程中保持语境方面起着至关重要的作用。长期记忆可用于存储模板或成功的任务完成示例，以指导 AI 智能体的行为。

##### 示例 5-13：情境化任务完成的示例用法

In [1]:
import json
from typing import Dict, Any, List
from langgraph.graph import StateGraph, END, START
from langgraph.store.memory import InMemoryStore, BaseStore
from langchain_core.messages import BaseMessage, HumanMessage, AIMessage
from langchain_core.runnables import RunnableConfig
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI

# 定义任务状态类型
class TaskState(dict):
    """任务状态管理"""
    def __init__(self, **kwargs):
        super().__init__(kwargs)
        if 'messages' not in self:
            self['messages'] = []
        if 'task_data' not in self:
            self['task_data'] = {}
        if 'current_step' not in self:
            self['current_step'] = 0
        if 'task_type' not in self:
            self['task_type'] = None
        if 'task_complete' not in self:
            self['task_complete'] = False

class TaskCompletionAgent:
    """多步骤任务完成智能体"""
    
    def __init__(self):
        # 定义不同类型的任务模板
        self.task_templates = {
            "flight_booking": {
                "steps": [
                    {"field": "departure_city", "prompt": "请告诉我您的出发城市：", "required": True},
                    {"field": "arrival_city", "prompt": "请告诉我您的目的地城市：", "required": True},
                    {"field": "departure_date", "prompt": "请提供出发日期（格式：YYYY-MM-DD）：", "required": True},
                    {"field": "passengers", "prompt": "请告诉我乘客人数：", "required": True}
                ],
                "completion_message": "✈️ 航班信息已收集完毕！正在为您查找最佳航班..."
            },
            "restaurant_reservation": {
                "steps": [
                    {"field": "cuisine_type", "prompt": "您想预订哪种类型的餐厅？（如：中餐、西餐、日料等）", "required": True},
                    {"field": "date_time", "prompt": "请提供用餐日期和时间：", "required": True},
                    {"field": "party_size", "prompt": "请告诉我用餐人数：", "required": True}
                ],
                "completion_message": "🍽️ 餐厅预订信息已收集完毕！正在为您查找合适的餐厅..."
            }
        }

def process_task(state: TaskState, config: RunnableConfig, store: BaseStore):
    """处理任务的主要逻辑"""
    print(f"🔄 处理任务 - 当前状态: step={state.get('current_step', 0)}, type={state.get('task_type')}")
    
    agent = TaskCompletionAgent()
    
    # 如果还没有任务类型，先检测任务类型
    if not state.get('task_type'):
        if not state['messages']:
            return state
        
        last_message = state['messages'][-1].content.lower()
        
        # 检测任务类型
        if any(keyword in last_message for keyword in ['机票', '航班', '飞机', '预订机票']):
            task_type = "flight_booking"
        elif any(keyword in last_message for keyword in ['餐厅', '预订', '吃饭', '订餐']):
            task_type = "restaurant_reservation"
        else:
            task_type = "flight_booking"
        
        print(f"📋 检测到任务类型: {task_type}")
        
        # 初始化任务状态
        state['task_type'] = task_type
        state['current_step'] = 0
        state['task_data'] = {}
        state['task_complete'] = False
        
        # 添加确认消息和第一个问题
        template = agent.task_templates[task_type]
        first_step = template['steps'][0]
        
        confirmation_messages = {
            "flight_booking": f"我来帮您预订航班。{first_step['prompt']}",
            "restaurant_reservation": f"我来帮您预订餐厅。{first_step['prompt']}"
        }
        
        ai_message = AIMessage(content=confirmation_messages[task_type])
        state['messages'].append(ai_message)
        return state
    
    # 如果已经完成任务，直接返回
    if state.get('task_complete'):
        return state
    
    # 处理信息收集
    task_type = state['task_type']
    template = agent.task_templates[task_type]
    steps = template['steps']
    current_step = state['current_step']
    
    # 检查是否还有步骤需要完成
    if current_step >= len(steps):
        # 所有信息已收集完毕
        state['task_complete'] = True
        response = template['completion_message']
        response += "\n\n📋 收集到的信息：\n"
        for key, value in state['task_data'].items():
            response += f"• {key}: {value}\n"
        
        ai_message = AIMessage(content=response)
        state['messages'].append(ai_message)
        return state
    
    # 检查是否有新的用户消息需要处理
    if len(state['messages']) >= 2 and isinstance(state['messages'][-1], HumanMessage):
        # 获取当前步骤信息
        step_info = steps[current_step]
        field_name = step_info['field']
        user_message = state['messages'][-1].content
        
        print(f"🎯 处理用户输入: {user_message} -> 提取字段: {field_name}")
        
        # 尝试提取信息
        extracted_value = extract_field_value(user_message, field_name)
        
        if extracted_value and extracted_value != "NOT_FOUND":
            # 成功提取信息
            state['task_data'][field_name] = extracted_value
            state['current_step'] += 1
            
            print(f"✅ 成功提取 {field_name}: {extracted_value}")
            
            # 检查是否还有更多步骤
            if state['current_step'] < len(steps):
                next_step = steps[state['current_step']]
                next_prompt = next_step['prompt']
                response = f"已记录您的{field_name}：{extracted_value}。{next_prompt}"
            else:
                # 所有信息已收集完毕
                state['task_complete'] = True
                response = template['completion_message']
                response += "\n\n📋 收集到的信息：\n"
                for key, value in state['task_data'].items():
                    response += f"• {key}: {value}\n"
        else:
            # 提取失败，重新询问
            response = f"抱歉，我没能理解您提供的{field_name}信息。{step_info['prompt']}"
        
        # 添加AI响应
        ai_message = AIMessage(content=response)
        state['messages'].append(ai_message)
    
    return state

def extract_field_value(user_message: str, field_name: str) -> str:
    """简化的信息提取函数"""
    message_lower = user_message.lower()
    
    if field_name == "departure_city":
        cities = ["北京", "上海", "广州", "深圳", "杭州", "成都", "重庆", "西安", "南京", "武汉"]
        for city in cities:
            if city in user_message:
                return city
    
    elif field_name == "arrival_city":
        cities = ["北京", "上海", "广州", "深圳", "杭州", "成都", "重庆", "西安", "南京", "武汉", "大连", "青岛"]
        for city in cities:
            if city in user_message:
                return city
    
    elif field_name == "departure_date":
        import re
        date_patterns = [r'\d{4}-\d{2}-\d{2}', r'\d{1,2}月\d{1,2}日', r'\d{1,2}/\d{1,2}']
        for pattern in date_patterns:
            match = re.search(pattern, user_message)
            if match:
                return match.group()
    
    elif field_name == "passengers":
        import re
        number_match = re.search(r'(\d+)人?', user_message)
        if number_match:
            return number_match.group(1)
    
    elif field_name == "cuisine_type":
        cuisine_types = ["中餐", "西餐", "日料", "韩料", "意大利", "法餐", "泰餐", "印度"]
        for cuisine in cuisine_types:
            if cuisine in user_message:
                return cuisine
    
    elif field_name == "party_size":
        import re
        number_match = re.search(r'(\d+)人?', user_message)
        if number_match:
            return number_match.group(1)
    
    elif field_name == "date_time":
        if any(word in message_lower for word in ["明天", "今天", "后天"]):
            return user_message
        import re
        if re.search(r'\d+点|\d+:\d+', user_message):
            return user_message
    
    return "NOT_FOUND"

def save_task_memory(state: TaskState, config: RunnableConfig, store: BaseStore):
    """保存任务完成记录到长期记忆"""
    print("💾 保存任务记录到记忆...")
    
    if not state['task_data'] or not state['task_type']:
        return state
    
    user_id = config.get("configurable", {}).get("user_id", "default_user")
    
    # 创建任务记录
    task_record = {
        "task_type": state['task_type'],
        "task_data": state['task_data'],
        "completion_time": "2024-02-15T10:30:00Z",
        "status": "completed"
    }
    
    # 保存到记忆存储
    import uuid
    record_key = str(uuid.uuid4())
    namespace = (user_id, "completed_tasks")
    
    store.put(namespace, record_key, task_record)
    print(f"✅ 任务记录已保存: {record_key}")
    
    # 添加确认消息
    confirmation_message = AIMessage(content="✨ 任务已完成并保存到您的记录中！如需查看历史记录或开始新任务，请随时告诉我。")
    state['messages'].append(confirmation_message)
    
    return state

# 创建简化的任务完成图
def create_task_completion_graph():
    """创建任务完成流程图"""
    store = InMemoryStore()
    
    # 创建状态图
    workflow = StateGraph(dict)
    
    # 添加节点
    workflow.add_node("process_task", process_task)
    workflow.add_node("save_memory", save_task_memory)
    
    # 条件函数：检查是否需要保存记忆
    def should_save_memory(state):
        if state.get('task_complete', False) and state.get('task_data'):
            return "save_memory"
        else:
            return "end"
    
    # 添加边
    workflow.add_edge(START, "process_task")
    workflow.add_conditional_edges(
        "process_task",
        should_save_memory,
        {
            "save_memory": "save_memory",
            "end": END
        }
    )
    workflow.add_edge("save_memory", END)
    
    # 编译图
    app = workflow.compile(store=store)
    return app, store

# 演示任务完成系统
print("=== 情境化任务完成系统演示 ===")

# 创建应用和存储
app, store = create_task_completion_graph()

# 配置
config = {"configurable": {"user_id": "user_456"}}

print("\n--- 航班预订演示 ---")

# 演示完整的多轮对话
conversation_steps = [
    "我想预订一张机票",
    "我从北京出发", 
    "我要去上海",
    "2024-03-15",
    "1人"
]

current_state = TaskState()

for i, user_input in enumerate(conversation_steps):
    print(f"\n👤 用户: {user_input}")
    
    # 添加用户消息
    current_state['messages'].append(HumanMessage(content=user_input))
    
    # 处理任务
    result = app.invoke(current_state, config=config)
    
    # 更新状态
    current_state = result
    
    # 显示AI响应
    if current_state['messages'] and isinstance(current_state['messages'][-1], AIMessage):
        print(f"🤖 助手: {current_state['messages'][-1].content}")
    
    # 显示当前任务状态
    print(f"📊 当前步骤: {current_state.get('current_step', 0)}")
    print(f"🏁 任务完成: {current_state.get('task_complete', False)}")
    if current_state.get('task_data'):
        print(f"📋 已收集数据: {current_state['task_data']}")

# 检查保存的记忆
print("\n--- 检查保存的任务记忆 ---")
saved_memories = store.search(("user_456", "completed_tasks"))
print(f"📋 用户完成的任务数量: {len(saved_memories)}")

for memory in saved_memories:
    task_data = memory.value
    print(f"✅ {task_data['task_type']}: {task_data['task_data']}")

=== 情境化任务完成系统演示 ===

--- 航班预订演示 ---

👤 用户: 我想预订一张机票
🔄 处理任务 - 当前状态: step=0, type=None
📋 检测到任务类型: flight_booking
🤖 助手: 我来帮您预订航班。请告诉我您的出发城市：
📊 当前步骤: 0
🏁 任务完成: False

👤 用户: 我从北京出发
🔄 处理任务 - 当前状态: step=0, type=flight_booking
🎯 处理用户输入: 我从北京出发 -> 提取字段: departure_city
✅ 成功提取 departure_city: 北京
🤖 助手: 已记录您的departure_city：北京。请告诉我您的目的地城市：
📊 当前步骤: 1
🏁 任务完成: False
📋 已收集数据: {'departure_city': '北京'}

👤 用户: 我要去上海
🔄 处理任务 - 当前状态: step=1, type=flight_booking
🎯 处理用户输入: 我要去上海 -> 提取字段: arrival_city
✅ 成功提取 arrival_city: 上海
🤖 助手: 已记录您的arrival_city：上海。请提供出发日期（格式：YYYY-MM-DD）：
📊 当前步骤: 2
🏁 任务完成: False
📋 已收集数据: {'departure_city': '北京', 'arrival_city': '上海'}

👤 用户: 2024-03-15
🔄 处理任务 - 当前状态: step=2, type=flight_booking
🎯 处理用户输入: 2024-03-15 -> 提取字段: departure_date
✅ 成功提取 departure_date: 2024-03-15
🤖 助手: 已记录您的departure_date：2024-03-15。请告诉我乘客人数：
📊 当前步骤: 3
🏁 任务完成: False
📋 已收集数据: {'departure_city': '北京', 'arrival_city': '上海', 'departure_date': '2024-03-15'}

👤 用户: 1人
🔄 处理任务 - 当前状态: step=3, type=flight_booking
🎯 处理用户输入: 1人 

### 5.3.3 TrustCall 信息提取模拟

TrustCall 是一个开源库，旨在简化 AI 智能体的基于模式的信息提取和更新。本节演示如何使用结构化方法提取和管理记忆信息。

##### 示例 5-14：构建一个 TrustCall 可用的数据结构

定义用于信息提取的结构化数据模型：

In [2]:
from pydantic import BaseModel, Field
from typing import List

class UserProfile(BaseModel):
    """具有类型化字段的用户资料模式"""
    user_name: str = Field(description="用户的首选姓名")
    interests: List[str] = Field(description="用户兴趣列表")

##### 示例 5-15：使用 TrustCall 进行基本信息提取

In [3]:
from trustcall import create_extractor
from langchain_openai import ChatOpenAI

model = ChatOpenAI(model="Qwen/Qwen2.5-7B-Instruct", temperature=0) # 初始化 LLM
trustcall_extractor = create_extractor(
    model,
    tools=[UserProfile], # 将我们的 Pydantic 模式作为工具传递
    tool_choice="UserProfile" # 强制 TrustCall 使用 UserProfile 工具进行输出
)

##### 示例 5-16：访问提取的结构化输出

In [4]:
from langchain_core.messages import HumanMessage, AIMessage, SystemMessage

conversation = [
    HumanMessage(content="嗨，我是 Alice。"),
    AIMessage(content="很高兴认识你，Alice！"),
    HumanMessage(content="我的爱好包括远足和阅读科幻小说。")
]

instruction_prompt = "从以下对话中提取用户资料。"

result = trustcall_extractor.invoke({"messages": [SystemMessage(content=instruction_prompt)] + conversation})

extracted_profile = result["responses"][0] # 访问提取的 UserProfile 对象
print(extracted_profile)

user_name='Alice' interests=['远足', '阅读科幻小说']


**💡 结构化提取核心价值**：

- **模式强制执行**：确保提取的数据严格遵循定义的结构
- **智能提取**：利用模式匹配和规则进行复杂的信息提取
- **增量更新**：支持基于新信息更新现有数据结构
- **可靠性提升**：减少手工解析带来的错误和不一致性

## 5.4 LangMem

在系统阐述 AI 智能体记忆理论基础，并全面解析 LangGraph 记忆存储架构后，本节将详细介绍如何运用 LangChain 团队最新推出的 LangMem 工具库，将先进的记忆管理功能无缝集成至 LangGraph 智能体。

##### 示例 5-17：使用记忆工具配置 ReAct 智能体

In [5]:
from langgraph.prebuilt import create_react_agent
from langgraph.store.memory import InMemoryStore
from langmem import create_manage_memory_tool, create_search_memory_tool

# 创建具有记忆功能的智能体
agent = create_react_agent(
    "openai:Qwen/Qwen3-8B", # 选择您的 LLM
    tools=[
        # 为智能体配备工具以在“热路径”中管理自己的记忆
        create_manage_memory_tool(namespace=("memories",)), # 用于创建、更新、删除记忆的工具
        create_search_memory_tool(namespace=("memories",)), # 用于搜索现有记忆的工具
    ],
    store=InMemoryStore(), # 提供记忆存储以实现持久性
)

# 执行示例：使用智能体进行简单对话
response = agent.invoke({"messages": [HumanMessage(content="请记住我喜欢编程。")]})
print("智能体响应:", response["messages"][-1].content)

# 检索记忆以验证存储
search_result = agent.invoke({"messages": [HumanMessage(content="回忆一下我喜欢什么吗？")]})
print("记忆检索结果:", search_result["messages"][-1].content)

智能体响应: 

我已经记住了您的喜好，您喜欢编程。如果之后还有其他需要记录的信息，随时告诉我！
记忆检索结果: 

根据之前的记录，您提到过喜欢编程。需要我帮您做些什么与编程相关的吗？或者您希望更新/删除这个偏好信息？


##### 示例 5-18：实现提示优化

In [8]:
from langmem import create_prompt_optimizer

optimizer = create_prompt_optimizer(
    "openai:Qwen/Qwen2.5-7B-Instruct", # 选择您的 LLM 以进行优化
    kind="prompt_memory", # 选择成功案例优化策略
    config={"max_reflection_steps": 5, "min_reflection_steps": 1}, # 配置优化行为
)

# 带有反馈的示例对话轨迹
trajectories = [
    # 没有注释的对话（仅对话内容）
    (
        [
            {"role": "user", "content": "请告诉我Python的优点"},
            {"role": "assistant", "content": "Python是一种易于学习和使用的编程语言..."},
            {"role": "user", "content": "能详细说说它的库支持吗？"},
        ],
        None,
    ),
    # 带有反馈的对话
    (
        [
            {"role": "user", "content": "Python有哪些流行的库？"},
            {
                "role": "assistant",
                "content": "Python有许多流行的库，如NumPy、Pandas、TensorFlow等...",
            },
        ],
        {
            "score": 0.8,
            "comment": "可以增加库的应用场景和更多细节",
        },
    ),
    # 注释可以是不同类型的，例如编辑/修订！
    (
        [
            {"role": "user", "content": "Python和Java相比如何？"},
            {"role": "assistant", "content": "Python和Java在语法和应用领域上有许多不同..."},
        ],
        {"revised": "Python和Java在语法、性能和应用领域上各有优劣..."},
    ),
]

# 定义写作助手的初始系统提示
initial_prompt = "您是一位乐于助人的写作助手"

# 调用优化器以根据训练数据改进初始提示
optimized_prompt = optimizer.invoke(
    {"trajectories": trajectories, "prompt": initial_prompt} # 提供轨迹和要优化的初始提示
)

print(optimized_prompt) # 输出优化的提示

您是一位乐于助人的写作助手。请确保您的回答详细且包含用户可能感兴趣的相关信息。如果可能，提供具体的例子或应用场景来增强回答的质量。


## 📚 本章总结

通过本章的学习，我们深入掌握了 AI 智能体记忆系统的核心理念和实践技术。首先系统阐述了短期记忆与长期记忆的区别与应用，理解了如何通过对话历史管理、截断策略和智能摘要来优化短期记忆，同时学会利用 LangGraph 记忆存储实现跨会话的长期知识积累。接着深入探索了记忆存储的核心操作，包括命名空间设计、语义搜索和向量嵌入技术，使智能体能够基于语义相似性进行智能检索。然后通过个性化推荐、多步骤任务完成等实际应用场景，展示了记忆系统如何显著提升智能体的功能表现，并学习了 TrustCall 等工具进行结构化信息提取。最后介绍了 LangMem 工具库的使用方法，包括记忆管理工具和提示优化功能。这些技术的结合使我们能够构建具有持续学习能力、个性化交互和智能进化特性的下一代 AI 智能体系统。