# 第15章：Persistence基础（Checkpointing）

## 学习目标

本章将学习：
1. LangGraph的持久化（Persistence）概念
2. Checkpointer原理和作用
3. InMemorySaver的使用
4. thread_id的概念和用法
5. 短期记忆（Thread-level Persistence）
6. 状态检查和更新
7. 实战：构建带记忆的对话Agent

---

## 为什么需要Persistence？

### 无状态Agent的局限

默认情况下，Agent的每次调用都是**独立的、无状态的**：

```python
agent.invoke({"messages": [...]})  # 第1次调用
agent.invoke({"messages": [...]})  # 第2次调用（不记得第1次）
```

**问题**：
- ❌ Agent无法记住之前的对话
- ❌ 用户必须每次提供完整上下文
- ❌ 无法实现多轮对话
- ❌ 无法实现Human-in-the-loop

### Persistence解决方案

通过**Checkpointer**持久化Agent的状态：

```python
agent = create_agent(
    model=model,
    tools=[...],
    checkpointer=InMemorySaver()  # 启用持久化
)

# 使用thread_id标识会话
config = {"configurable": {"thread_id": "conversation-1"}}

agent.invoke({"messages": [...]}, config)  # 第1次
agent.invoke({"messages": [...]}, config)  # 第2次（记得第1次）
```

**优势**：
- ✅ Agent记住对话历史
- ✅ 支持多轮对话
- ✅ 可以暂停和恢复对话
- ✅ 支持Human-in-the-loop
- ✅ 实现故障恢复

---

## 核心概念

### 1. Checkpointer

**定义**：Checkpointer是LangGraph的持久化层，在每个执行步骤（super-step）自动保存状态快照。

**工作原理**：
```
User Input
    ↓
Agent执行 Step 1 → [Checkpoint 1保存]
    ↓
Agent执行 Step 2 → [Checkpoint 2保存]
    ↓
Agent执行 Step 3 → [Checkpoint 3保存]
    ↓
Final Response
```

### 2. Thread（线程/会话）

**定义**：Thread是一个唯一的会话标识符（thread_id），用于组织和隔离不同的对话。

**类比**：就像邮件中的"对话线程"，将相关的消息组织在一起。

```python
# Thread 1: 与用户A的对话
config_a = {"configurable": {"thread_id": "user-a-chat"}}

# Thread 2: 与用户B的对话
config_b = {"configurable": {"thread_id": "user-b-chat"}}

# 两个Thread的状态完全独立
```

### 3. 短期记忆 vs 长期记忆

| 类型 | 作用域 | 实现方式 | 用途 |
|------|--------|---------|------|
| **短期记忆** | Thread内 | Checkpointer | 单次对话的历史 |
| **长期记忆** | 跨Thread | Store | 用户偏好、知识库 |

**本章重点：短期记忆（Thread-level Persistence）**

In [1]:
# 环境配置
import os
import sys

_project_root = os.path.abspath(os.path.join(os.getcwd(), '..'))
sys.path.append(_project_root)

from config import config
from langchain_openai import ChatOpenAI
from langchain.tools import tool
from langchain.agents import create_agent
from langgraph.checkpoint.memory import InMemorySaver

# 初始化模型
model = ChatOpenAI(
    model="gpt-4.1-mini",
    temperature=0,
    api_key=config.CLOUD_API_KEY,
    base_url=config.CLOUD_BASE_URL,
)

print("环境配置完成！")
print(f"模型: {model.model_name}")

环境配置完成！
模型: gpt-4.1-mini


## 1. 基础使用：添加短期记忆

### 对比：无状态 vs 有状态

In [2]:
print("【演示1：无状态Agent】")
print()

# 创建无状态Agent（没有checkpointer）
agent_stateless = create_agent(
    model=model,
    tools=[],
    system_prompt="你是一个友好的助手。"
)

# 第1轮对话
response1 = agent_stateless.invoke({
    "messages": [{"role": "user", "content": "你好，我叫Alice"}]
})
print(f"用户: 你好，我叫Alice")
print(f"Agent: {response1['messages'][-1].content}")
print()

# 第2轮对话
response2 = agent_stateless.invoke({
    "messages": [{"role": "user", "content": "我叫什么名字？"}]
})
print(f"用户: 我叫什么名字？")
print(f"Agent: {response2['messages'][-1].content}")
print()
print("问题: Agent不记得用户名字（无状态）")

【演示1：无状态Agent】

用户: 你好，我叫Alice
Agent: 你好，Alice！很高兴认识你。有什么我可以帮忙的吗？

用户: 我叫什么名字？
Agent: 你好！你没有告诉我你的名字，所以我不知道你叫什么名字。你愿意告诉我吗？

问题: Agent不记得用户名字（无状态）


In [3]:
print("【演示2：有状态Agent】")
print()

# 创建有状态Agent（使用InMemorySaver）
checkpointer = InMemorySaver()  # 创建Checkpointer

agent_stateful = create_agent(
    model=model,
    tools=[],
    system_prompt="你是一个友好的助手。",
    checkpointer=checkpointer  # 启用持久化
)

# 定义thread_id
config = {"configurable": {"thread_id": "conversation-alice"}}

# 第1轮对话
response1 = agent_stateful.invoke(
    {"messages": [{"role": "user", "content": "你好，我叫Alice"}]},
    config  # 传入config
)
print(f"用户: 你好，我叫Alice")
print(f"Agent: {response1['messages'][-1].content}")
print()

# 第2轮对话（使用相同的thread_id）
response2 = agent_stateful.invoke(
    {"messages": [{"role": "user", "content": "我叫什么名字？"}]},
    config  # 相同的thread_id
)
print(f"用户: 我叫什么名字？")
print(f"Agent: {response2['messages'][-1].content}")
print()
print("✓ Agent记住了用户名字（有状态）")

【演示2：有状态Agent】

用户: 你好，我叫Alice
Agent: 你好，Alice！很高兴认识你。有什么我可以帮忙的吗？

用户: 我叫什么名字？
Agent: 你叫Alice。有什么我可以帮你的吗，Alice？

✓ Agent记住了用户名字（有状态）


### 关键要点

1. **创建Checkpointer**：`InMemorySaver()`
2. **传给Agent**：`create_agent(..., checkpointer=checkpointer)`
3. **指定thread_id**：`{"configurable": {"thread_id": "xxx"}}`
4. **每次调用都传config**：确保状态持久化到正确的Thread

---

## 2. Thread隔离：多用户场景

In [4]:
print("【演示：Thread隔离】")
print()
print("同一个Agent，不同的thread_id，状态完全独立")
print()

# Thread 1: Alice
config_alice = {"configurable": {"thread_id": "user-alice"}}

response = agent_stateful.invoke(
    {"messages": [{"role": "user", "content": "我叫Alice，我喜欢Python"}]},
    config_alice
)
print("[Thread: user-alice]")
print(f"用户: 我叫Alice，我喜欢Python")
print(f"Agent: {response['messages'][-1].content}")
print()

# Thread 2: Bob
config_bob = {"configurable": {"thread_id": "user-bob"}}

response = agent_stateful.invoke(
    {"messages": [{"role": "user", "content": "我叫Bob，我喜欢JavaScript"}]},
    config_bob
)
print("[Thread: user-bob]")
print(f"用户: 我叫Bob，我喜欢JavaScript")
print(f"Agent: {response['messages'][-1].content}")
print()

# Alice询问自己的信息
response = agent_stateful.invoke(
    {"messages": [{"role": "user", "content": "我叫什么名字？我喜欢什么？"}]},
    config_alice  # 使用Alice的thread
)
print("[Thread: user-alice]")
print(f"用户: 我叫什么名字？我喜欢什么？")
print(f"Agent: {response['messages'][-1].content}")
print()

# Bob询问自己的信息
response = agent_stateful.invoke(
    {"messages": [{"role": "user", "content": "我叫什么名字？我喜欢什么？"}]},
    config_bob  # 使用Bob的thread
)
print("[Thread: user-bob]")
print(f"用户: 我叫什么名字？我喜欢什么？")
print(f"Agent: {response['messages'][-1].content}")
print()
print("✓ 不同Thread的状态完全隔离")

【演示：Thread隔离】

同一个Agent，不同的thread_id，状态完全独立

[Thread: user-alice]
用户: 我叫Alice，我喜欢Python
Agent: 你好，Alice！很高兴认识你。你喜欢Python，真棒！你是喜欢用Python做什么呢？编程、数据分析还是其他方面？需要我帮你学习Python或者解决什么问题吗？

[Thread: user-bob]
用户: 我叫Bob，我喜欢JavaScript
Agent: 你好，Bob！很高兴认识你。你喜欢JavaScript，真棒！你是刚开始学习，还是已经有一些项目经验了呢？需要我帮你解答关于JavaScript的问题吗？

[Thread: user-alice]
用户: 我叫什么名字？我喜欢什么？
Agent: 你叫Alice，你喜欢Python。有什么我可以帮你的吗？

[Thread: user-bob]
用户: 我叫什么名字？我喜欢什么？
Agent: 你叫Bob，你喜欢JavaScript。有什么我可以帮你的吗？

✓ 不同Thread的状态完全隔离


## 3. 状态检查：查看和理解Checkpoints

### 查看Thread的当前状态

In [5]:
print("【查看Thread状态】")
print()

# get_state()返回Thread的当前状态
state = agent_stateful.get_state(config_alice)

print("State对象包含：")
print(f"  - values: {type(state.values)}")
print(f"  - messages数量: {len(state.values.get('messages', []))}")
print()

# 显示对话历史
print("对话历史：")
for i, msg in enumerate(state.values.get('messages', []), 1):
    role = msg.__class__.__name__
    content = msg.content[:50] + "..." if len(msg.content) > 50 else msg.content
    print(f"  {i}. [{role}] {content}")

print()
print("说明: get_state()可以查看Thread的完整状态，包括对话历史")

【查看Thread状态】

State对象包含：
  - values: <class 'dict'>
  - messages数量: 4

对话历史：
  1. [HumanMessage] 我叫Alice，我喜欢Python
  2. [AIMessage] 你好，Alice！很高兴认识你。你喜欢Python，真棒！你是喜欢用Python做什么呢？编程、数据...
  3. [HumanMessage] 我叫什么名字？我喜欢什么？
  4. [AIMessage] 你叫Alice，你喜欢Python。有什么我可以帮你的吗？

说明: get_state()可以查看Thread的完整状态，包括对话历史


### Checkpointer的工作机制

```
User: "我叫Alice"
    ↓
[Checkpoint 1] messages = [HumanMessage("我叫Alice")]
    ↓
Agent处理
    ↓
[Checkpoint 2] messages = [HumanMessage(...), AIMessage("你好Alice")]
    ↓
User: "我叫什么？"
    ↓
[Checkpoint 3] messages = [HumanMessage(...), AIMessage(...), HumanMessage("我叫什么？")]
    ↓
Agent查看历史 → 找到"我叫Alice" → 回答"你叫Alice"
```

**关键**：每次Agent执行都会自动保存状态到Checkpointer，下次invoke时自动加载。

---

## 4. 带工具的有状态Agent

In [6]:
print("【演示：带工具的有状态Agent】")
print()

# 定义工具
@tool
def add_to_cart(item: str) -> str:
    """将商品添加到购物车"""
    return f"已添加 {item} 到购物车"

@tool
def remove_from_cart(item: str) -> str:
    """从购物车移除商品"""
    return f"已从购物车移除 {item}"

@tool
def view_cart() -> str:
    """查看购物车（简化版）"""
    return "购物车当前为空（简化演示）"

# 创建有状态的购物助手
shopping_agent = create_agent(
    model=model,
    tools=[add_to_cart, remove_from_cart, view_cart],
    checkpointer=InMemorySaver(),
    system_prompt="""你是购物助手。帮助用户管理购物车。
    
记住用户的购物历史和偏好。"""
)

# 模拟一系列对话
shopping_thread = {"configurable": {"thread_id": "shopping-session-1"}}

queries = [
    "我想买一本Python书",
    "再加一个鼠标",
    "我刚才买了什么？"  # Agent需要记住之前的对话
]

for i, query in enumerate(queries, 1):
    print(f"\n【第{i}轮对话】")
    print(f"用户: {query}")
    
    response = shopping_agent.invoke(
        {"messages": [{"role": "user", "content": query}]},
        shopping_thread
    )
    
    print(f"Agent: {response['messages'][-1].content}")

print()
print("✓ Agent记住了整个购物过程")

【演示：带工具的有状态Agent】


【第1轮对话】
用户: 我想买一本Python书
Agent: 我已经帮你把一本Python书添加到购物车了。你还需要其他商品吗？

【第2轮对话】
用户: 再加一个鼠标
Agent: 鼠标也已经添加到购物车了。你还想买别的吗？

【第3轮对话】
用户: 我刚才买了什么？
Agent: 抱歉，购物车当前显示为空，可能是系统暂时没有更新。根据之前的操作，你添加了Python书和鼠标到购物车。你还需要我帮你确认或添加其他商品吗？

✓ Agent记住了整个购物过程


### 多轮对话的价值

有了Persistence，Agent可以：
1. **理解上下文**："再加一个鼠标" → Agent知道是添加到购物车
2. **回答历史问题**："我刚才买了什么？" → Agent查看对话历史
3. **保持连贯性**：整个对话过程自然流畅
4. **记住用户偏好**：如果用户说"我喜欢黑色"，后续推荐会考虑

---

## 5. 实战项目：智能客服系统

In [7]:
print("【实战：智能客服系统】")
print()

# 模拟的订单数据库
ORDER_DB = {
    "ORDER001": {"status": "已发货", "items": ["iPhone 15", "AirPods"], "total": 8999},
    "ORDER002": {"status": "配送中", "items": ["MacBook Pro"], "total": 12999}
}

# 定义客服工具
@tool
def query_order(order_id: str) -> str:
    """查询订单状态"""
    order = ORDER_DB.get(order_id)
    if not order:
        return f"未找到订单 {order_id}"
    return f"订单{order_id}：{order['status']}，包含 {', '.join(order['items'])}，总价 ¥{order['total']}"

@tool
def cancel_order(order_id: str) -> str:
    """取消订单"""
    if order_id in ORDER_DB:
        return f"订单 {order_id} 已提交取消申请"
    return f"未找到订单 {order_id}"

@tool
def submit_refund(order_id: str, reason: str) -> str:
    """提交退款申请"""
    return f"已提交订单 {order_id} 的退款申请，原因：{reason}"

# 创建客服Agent
customer_service = create_agent(
    model=model,
    tools=[query_order, cancel_order, submit_refund],
    checkpointer=InMemorySaver(),
    system_prompt="""你是友好的客服助手。
    
职责：
1. 查询订单状态
2. 处理取消和退款请求
3. 记住客户的问题和需求
4. 提供耐心、专业的服务

注意：
- 记住客户之前提到的信息（如订单号）
- 如果客户没有提供必要信息，主动询问
- 保持礼貌和专业"""
)

print("✓ 智能客服系统创建完成")

【实战：智能客服系统】

✓ 智能客服系统创建完成


### 测试场景：完整的客服对话

In [8]:
print("【模拟客服对话】")
print("="*70)

# 客户会话
customer_thread = {"configurable": {"thread_id": "customer-zhang-san"}}

# 模拟多轮对话
conversation = [
    "你好，我想查询我的订单",
    "订单号是ORDER001",
    "什么时候能到？",
    "我想取消这个订单"
]

for i, user_input in enumerate(conversation, 1):
    print(f"\n【第{i}轮】")
    print(f"客户: {user_input}")
    
    response = customer_service.invoke(
        {"messages": [{"role": "user", "content": user_input}]},
        customer_thread
    )
    
    print(f"客服: {response['messages'][-1].content}")

print()
print("="*70)
print("【对话分析】")
print("1. 第2轮：客户提供了订单号")
print("2. 第3轮：客户询问'什么时候能到？'，Agent记住了ORDER001")
print("3. 第4轮：客户说'这个订单'，Agent知道是ORDER001")
print("\n✓ Persistence让对话自然流畅，无需重复信息")

【模拟客服对话】

【第1轮】
客户: 你好，我想查询我的订单
客服: 您好！请问您能提供一下您的订单号吗？我帮您查询订单状态。

【第2轮】
客户: 订单号是ORDER001
客服: 您的订单ORDER001已经发货，包含iPhone 15和AirPods，总价为¥8999。请问您还需要了解其他信息吗？或者有其他需要帮忙的地方？

【第3轮】
客户: 什么时候能到？
客服: 您的订单ORDER001已经发货，通常发货后1-3个工作日内送达。具体的送达时间可能会根据您的收货地址有所不同。您方便告诉我您的收货地址吗？我帮您确认更准确的预计送达时间。

【第4轮】
客户: 我想取消这个订单
客服: 您的订单ORDER001已经提交取消申请。请问您还需要我帮您提交退款申请吗？

【对话分析】
1. 第2轮：客户提供了订单号
2. 第3轮：客户询问'什么时候能到？'，Agent记住了ORDER001
3. 第4轮：客户说'这个订单'，Agent知道是ORDER001

✓ Persistence让对话自然流畅，无需重复信息


## 6. InMemorySaver的局限与生产环境方案

### InMemorySaver的特点

**优点**：
- ✅ 简单易用，无需配置
- ✅ 适合开发和测试
- ✅ 性能快（内存操作）

**局限**：
- ❌ 数据存在内存中，进程重启后丢失
- ❌ 无法跨进程共享
- ❌ 不适合生产环境

### 生产环境Checkpointer

对于生产环境，使用数据库支持的Checkpointer：

```python
# PostgreSQL Checkpointer
from langgraph.checkpoint.postgres import PostgresSaver

DB_URI = "postgresql://user:pass@localhost:5432/dbname"
checkpointer = PostgresSaver.from_conn_string(DB_URI)

agent = create_agent(
    model=model,
    tools=[...],
    checkpointer=checkpointer
)
```

其他选项：
- `SQLiteSaver`：适合小型应用
- `RedisSaver`：适合高性能场景
- 自定义Checkpointer：实现`BaseCheckpointSaver`接口

---

## 7. 核心概念总结

### Persistence的工作流程

```python
# 1. 创建Checkpointer
checkpointer = InMemorySaver()

# 2. 创建Agent并传入Checkpointer
agent = create_agent(
    model=model,
    tools=[...],
    checkpointer=checkpointer  # 关键
)

# 3. 调用时指定thread_id
config = {"configurable": {"thread_id": "unique-id"}}
response = agent.invoke({"messages": [...]}, config)

# 4. 状态自动保存和加载
# - 每次invoke前，Checkpointer自动加载该Thread的历史状态
# - 每个执行步骤后，Checkpointer自动保存新状态
```

---

## 8. 常见问题

### Q1: 必须每次都传config吗？

**是的**。如果不传`config`，Agent将无法加载历史状态，每次调用都是独立的。

```python
# ❌ 错误：没有传config
agent.invoke({"messages": [...]})

# ✅ 正确：传入config
agent.invoke({"messages": [...]}, {"configurable": {"thread_id": "xxx"}})
```

### Q2: thread_id可以是任意字符串吗？

**是的**。但建议使用有意义的ID：
- `user-{user_id}`：按用户隔离
- `session-{session_id}`：按会话隔离
- `{user_id}-{timestamp}`：按用户+时间隔离

### Q3: 如何清理旧的Thread？

对于`InMemorySaver`，进程重启后自动清理。

对于数据库Checkpointer，需要定期清理：
```python
# 删除指定Thread
checkpointer.delete_thread(thread_id)

# 或在数据库层面设置TTL（Time-To-Live）
```

### Q4: Checkpointer会保存什么？

默认保存`AgentState`中的所有字段，包括：
- `messages`：对话历史（最常用）
- 自定义状态字段（如果扩展了AgentState）

### Q5: 对话历史太长怎么办？

长对话会导致Token消耗过多和性能下降。解决方案：
1. **Trim消息**：只保留最近N条消息
2. **总结历史**：用LLM总结早期对话
3. **混合策略**：保留系统消息+最近消息+总结

（详见第17章：消息管理）

---

## 9. 总结与最佳实践

### 核心要点

#### Persistence三要素

1. **Checkpointer**：持久化层，自动保存和加载状态
2. **Thread_id**：会话标识符，隔离不同对话
3. **Config**：每次invoke时传入，指定使用哪个Thread

#### 短期记忆（Thread-level）

- **作用域**：单个Thread内
- **内容**：对话历史 + AgentState
- **生命周期**：Thread存在期间
- **用途**：多轮对话、上下文理解

---

### 最佳实践

#### 1. 合理设计thread_id

```python
# ✅ 好的设计
thread_id = f"user-{user_id}-session-{session_id}"
thread_id = f"{user_id}-{datetime.now().strftime('%Y%m%d')}"

# ❌ 不好的设计
thread_id = "global"  # 所有用户共享，会混乱
thread_id = str(uuid.uuid4())  # 每次都新建，无法追踪
```

#### 2. 开发用InMemorySaver，生产用数据库Checkpointer

```python
import os

if os.getenv("ENV") == "production":
    checkpointer = PostgresSaver.from_conn_string(DB_URI)
else:
    checkpointer = InMemorySaver()
```

#### 3. 为不同场景使用不同Thread

```python
# 场景1：客服对话（按会话隔离）
thread_id = f"support-{ticket_id}"

# 场景2：个人助手（按用户+日期隔离）
thread_id = f"assistant-{user_id}-{date}"

# 场景3：临时咨询（随机ID，用后即弃）
thread_id = f"temp-{uuid.uuid4()}"
```

#### 4. 监控Thread数量和大小

```python
# 定期检查
state = agent.get_state(config)
message_count = len(state.values.get('messages', []))

if message_count > 50:
    print(f"警告: Thread {thread_id} 消息数过多")
    # 考虑总结或trim
```

#### 5. 错误处理

```python
try:
    response = agent.invoke(
        {"messages": [...]},
        {"configurable": {"thread_id": thread_id}}
    )
except Exception as e:
    # 记录错误，但不影响用户
    logger.error(f"Agent调用失败: {e}")
    # 可以尝试用新Thread重试
```

---

### 与其他概念的关系

| 概念 | 作用域 | 实现 | 关系 |
|------|--------|------|------|
| **Checkpointer** | 持久化层 | InMemorySaver等 | 底层实现 |
| **Thread** | 会话隔离 | thread_id | 逻辑概念 |
| **短期记忆** | Thread内 | Checkpointer | 应用层面 |
| **长期记忆** | 跨Thread | Store（下章） | 补充功能 |
| **消息管理** | Thread内 | Trim/Summarize | 优化策略 |

---

## 下一步学习

完成Persistence基础后，建议学习：

1. **第16章：Cross-Thread Memory（Store）** 
   - 跨Thread的长期记忆
   - 用户偏好持久化
   - InMemoryStore使用

2. **第17章：消息管理**
   - Trim策略：移除旧消息
   - Summarize策略：总结历史
   - 混合策略

3. **LangGraph深入**
   - Human-in-the-loop（基于Checkpointer）
   - Time Travel：回溯到历史状态
   - 自定义State管理

---

## 关键收获

✅ **Persistence让Agent有了记忆**，实现真正的多轮对话

✅ **Checkpointer自动保存和加载**，开发者无需手动管理状态

✅ **Thread_id隔离不同会话**，支持多用户并发

✅ **InMemorySaver适合开发**，生产环境需用数据库Checkpointer

✅ **这是LangChain Agent的标准能力**，基于LangGraph实现