# 案例一（stream）

In [None]:
from typing import TypedDict
from langchain_core.runnables import RunnableConfig
from langgraph.graph import StateGraph
from langgraph.constants import START, END
from langchain_core.messages import AIMessage


# ======================================================
# 1. 定义 State（图的状态结构）
# ------------------------------------------------------
# 这里只有一个字段 count，节点会把 count + 1。
# ======================================================
class State(TypedDict):
    count: int


# ======================================================
# 2. 定义一个简单节点 node1
# ------------------------------------------------------
# 每次执行 node1：
#   - count + 1
#   - 返回 {"count": 新值}
#
# 它也会输出一条 AIMessage，用于 messages 模式演示。
# ======================================================
def node1(state: State, config: RunnableConfig):
    new_count = state["count"] + 1
    msg = AIMessage(content=f"Node executed, count = {new_count}")
    return {
        "count": new_count,
        "messages": [msg]     # 用于 stream_mode="messages"
    }


# ======================================================
# 3. 构造图
# ------------------------------------------------------
# 流程：
#   START → node1 → END
# ======================================================
builder = StateGraph(State)
builder.add_node("node1", node1)
builder.add_edge(START, "node1")
builder.add_edge("node1", END)

graph = builder.compile()


# ======================================================
# 4. 演示四种 stream_mode
# ======================================================

print("\n===================== 1) stream_mode = 'values' =====================")
# 输出最终 state，不展示中间过程
for value in graph.stream({"count": 1}, stream_mode="values"):
    print(value)
# 输出示例： {'count': 2}


print("\n===================== 2) stream_mode = 'updates' =====================")
# 每次节点更新状态时输出增量（diff）
for update in graph.stream({"count": 1}, stream_mode="updates"):
    print(update)
# 输出示例：
# {'count': 2}


print("\n===================== 3) stream_mode = 'debug' =====================")
# 输出最详细调试信息，例如：
#   - 执行的节点名
#   - 输入状态
#   - 输出状态
#   - 路由信息
#   - 缓存命中情况
for info in graph.stream({"count": 1}, stream_mode="debug"):
    print(info)
# 输出类似：
# {
#   'node': 'node1',
#   'input': {'count': 1},
#   'output': {'count': 2, 'messages': [...]},
#   'metadata': {...}
# }


print("\n===================== 4) stream_mode = 'messages' =====================")
# 用于 LLM 消息流式输出
# node1 返回了 AIMessage，因此这里会 token-by-token 输出
for msg in graph.stream({"count": 1}, stream_mode="messages"):
    print(msg)
# 输出类似：
# AIMessageChunk(content="Node executed, count = 2")

# 可视化图结构
from IPython.display import Image, display
display(Image(graph.get_graph(xray=True).draw_mermaid_png()))



{'count': 1}
{'count': 2}

{'node1': {'count': 2}}

{'step': 1, 'timestamp': '2025-11-29T06:57:33.935174+00:00', 'type': 'task', 'payload': {'id': 'fe5fafb5-c0ef-ef6f-9e5b-9d00c37b55f1', 'name': 'node1', 'input': {'count': 1}, 'triggers': ('branch:to:node1',)}}
{'step': 1, 'timestamp': '2025-11-29T06:57:33.935222+00:00', 'type': 'task_result', 'payload': {'id': 'fe5fafb5-c0ef-ef6f-9e5b-9d00c37b55f1', 'name': 'node1', 'error': None, 'result': {'count': 2}, 'interrupts': []}}

(AIMessage(content='Node executed, count = 2', additional_kwargs={}, response_metadata={}, id='2be20e4a-1a48-4e54-a62c-c24c9d33a316'), {'langgraph_step': 1, 'langgraph_node': 'node1', 'langgraph_triggers': ('branch:to:node1',), 'langgraph_path': ('__pregel_pull', 'node1'), 'langgraph_checkpoint_ns': 'node1:35e8c7e0-b45d-0642-c4b2-2d68740d2220'})


# 案例二（stream： custom）

In [None]:
#stream_mode="custom" 的含义
#自定义流式输出如何工作
#StateGraph 如何流式推送数据
#这是 LangGraph 自定义流模式（custom stream mode） 的最小可复现示例。

from typing import TypedDict
from langgraph.config import get_stream_writer
from langgraph.graph import StateGraph, START


# ======================================================
# 1. 定义状态结构（State）
# ------------------------------------------------------
# query : 输入的查询
# answer: 节点执行后生成的回答
# State 是 LangGraph 中每个节点接受 / 返回的状态
# ======================================================
class State(TypedDict):
    query: str
    answer: str


# ======================================================
# 2. 定义一个节点 node
# ------------------------------------------------------
# 在 node 内部使用 get_stream_writer():
#   - 可以向外推送自定义的“流式消息”
#   - 不依赖 LLM，不依赖 AIMessage
#   - 你可以自己定义输出结构，例如 dict、string
#
# writer({"key": "value"}) 会在 stream_mode="custom" 时被捕获
# 并立即被发送给客户端（用户）
# ======================================================
def node(state: State):
    # 获取一个 writer，该 writer 只在 stream_mode="custom" 时生效
    writer = get_stream_writer()

    # 推送一条自定义流信息（类似“中间进度消息”）
    writer({"自定义key": "在节点内部自定义输出信息"})

    # 节点最终返回新的 State 增量
    return {"answer": "some data"}


# ======================================================
# 3. 构建 Graph
# ------------------------------------------------------
# 图结构：
#   START → node → END
# ======================================================
graph = (
    StateGraph(State)
    .add_node("node", node)
    .add_edge(START, "node")
    .compile()
)


# ======================================================
# 4. 输入数据
# ======================================================
inputs = {"query": "example"}


# ======================================================
# 5. 使用 stream_mode = "custom"
# ------------------------------------------------------
# 图执行时会有两类流输出：
#
# (1) writer(...) 推送的自定义 streaming 内容
#     → 会被作为 chunk 输出
#
# (2) 节点最终返回的 State 更新结果
#
# 你可以看到整个执行过程中的“实时信息”
# ======================================================
print("\n===== Streaming Output (custom) =====")
for chunk in graph.stream(inputs, stream_mode="custom"):
    print(chunk)

# 可视化图结构
from IPython.display import Image, display
display(Image(graph.get_graph(xray=True).draw_mermaid_png()))



===== Streaming Output (custom) =====
{'自定义key': '在节点内部自定义输出信息'}


# 案例三（Stream：messages + LLMs）

In [None]:
from typing import TypedDict
from getpass import getpass
from langchain_openai import ChatOpenAI
from langgraph.graph import StateGraph, MessagesState, START


# 1. 获取 API Key
OPENAI_API_KEY = getpass("请输入你的 OpenAI API Key： ")


# 2. 初始化 GPT-5-mini（流式输出）
llm = ChatOpenAI(
    model="gpt-5-mini",
    api_key=OPENAI_API_KEY,
    streaming=True
)


# 3. 定义节点
def call_model(state: MessagesState):
    response = llm.invoke(state["messages"])
    return {"messages": response}


# 4. 创建 LangGraph
builder = StateGraph(MessagesState)
builder.add_node("call_model", call_model)
builder.add_edge(START, "call_model")
graph = builder.compile()


# 5. 输入消息
inputs = {
    "messages": [{"role": "user", "content": "湖南的省会是哪里？"}]
}


# 6. 流式输出大模型 Token（重点）
print("\n=== Streaming tokens (gpt-5-mini) ===")
for chunk in graph.stream(inputs, stream_mode="messages"):
    print(chunk)

# 可视化图结构
from IPython.display import Image, display
display(Image(graph.get_graph(xray=True).draw_mermaid_png()))


=== Streaming tokens (gpt-5-mini) ===
(AIMessageChunk(content='', additional_kwargs={}, response_metadata={}, id='run--058416bd-350f-4190-9916-8b2005548d29'), {'langgraph_step': 1, 'langgraph_node': 'call_model', 'langgraph_triggers': ('branch:to:call_model',), 'langgraph_path': ('__pregel_pull', 'call_model'), 'langgraph_checkpoint_ns': 'call_model:6232be95-0ef2-dad5-d8b4-3aa62875ca92', 'checkpoint_ns': 'call_model:6232be95-0ef2-dad5-d8b4-3aa62875ca92', 'ls_provider': 'openai', 'ls_model_name': 'gpt-5-mini', 'ls_model_type': 'chat', 'ls_temperature': None})
(AIMessageChunk(content='湖南', additional_kwargs={}, response_metadata={}, id='run--058416bd-350f-4190-9916-8b2005548d29'), {'langgraph_step': 1, 'langgraph_node': 'call_model', 'langgraph_triggers': ('branch:to:call_model',), 'langgraph_path': ('__pregel_pull', 'call_model'), 'langgraph_checkpoint_ns': 'call_model:6232be95-0ef2-dad5-d8b4-3aa62875ca92', 'checkpoint_ns': 'call_model:6232be95-0ef2-dad5-d8b4-3aa62875ca92', 'ls_provide

# 案例四（Checkpointer-短期记忆）

In [5]:
# ============================================================
# 0. 输入你的 OpenAI API Key（手动输入，不会显示）
# ============================================================
from getpass import getpass
OPENAI_API_KEY = getpass("请输入你的 OpenAI API Key：")


# ============================================================
# 1. 使用 OpenAI 的 gpt-5-mini 模型
# ------------------------------------------------------------
# ChatOpenAI 是 LangChain 对 OpenAI Chat Completions 的封装，
# 与 ChatTongyi 用法保持完全兼容。
# streaming=True 允许流式输出。
# ============================================================
from langchain_openai import ChatOpenAI
from langgraph.graph import StateGraph, MessagesState, START
from langgraph.checkpoint.memory import InMemorySaver


llm = ChatOpenAI(
    model="gpt-5-mini",           # ← 按你的要求：使用 gpt-5-mini
    api_key=OPENAI_API_KEY,       # ← 用户手动输入
    streaming=True                # ← 支持流式 Token（与原例一致）
)


# ============================================================
# 2. 定义一个 LangGraph 节点：调用大模型
# ------------------------------------------------------------
# 输入：MessagesState（包含历史对话 messages）
# 输出：MessagesState（新增模型回复）
#
# OpenAI 的 ChatGPT 模型和通义千问一样，都兼容 messages 数组格式。
# ============================================================
def call_model(state: MessagesState) -> MessagesState:
    response = llm.invoke(state["messages"])
    return {"messages": response}


# ============================================================
# 3. 定义并构建 Graph（状态结构为 MessagesState）
# ------------------------------------------------------------
builder = StateGraph(MessagesState)
builder.add_node("call_model", call_model)
builder.add_edge(START, "call_model")


# ============================================================
# 4. 初始化 Checkpointer（状态保存器）
# ------------------------------------------------------------
checkpointer = InMemorySaver()
graph = builder.compile(checkpointer=checkpointer)


# ============================================================
# 5. 配置 config（向 Graph 注入 thread_id）
# ------------------------------------------------------------
config = {
    "configurable": {
        "thread_id": "1"   # 使用同一线程 ID，实现连续对话
    }
}


# ============================================================
# 6. 第一次调用：问“湖南的省会是哪里？”
# ------------------------------------------------------------
for chunk in graph.stream(
    {
        "messages": [
            {"role": "user", "content": "湖南的省会是哪里？"}
        ]
    },
    config,
    stream_mode="values",
):
    chunk["messages"][-1].pretty_print()


# ============================================================
# 7. 第二次调用：继续问“湖北呢？”
# ------------------------------------------------------------
for chunk in graph.stream(
    {
        "messages": [
            {"role": "user", "content": "湖北呢？"}
        ]
    },
    config,
    stream_mode="values",
):
    chunk["messages"][-1].pretty_print()



湖南的省会是哪里？

湖南省的省会是长沙（长沙市）。

湖北呢？

湖北省的省会是武汉（武汉市）。
