In [None]:
from typing import Annotated

from typing_extensions import TypedDict

from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages

In [None]:
# 这是一个state schema
class State(TypedDict):
    # Messages have the type "list". The `add_messages` function
    # in the annotation defines how this state key should be updated
    # (in this case, it appends messages to the list, rather than overwriting them)
    # add_message是一个reducer, 未指定reducer默认采用覆盖的策略
    # https://langchain-ai.github.io/langgraph/concepts/low_level/#default-reducer
    messages: Annotated[list, add_messages]

graph_builder = StateGraph(State)

In [None]:
import os
from langchain.chat_models import init_chat_model
from dotenv import load_dotenv

# 加载 .env 文件
load_dotenv()

llm = init_chat_model("openai:gpt-4.1")

In [None]:
# 这是一个node，接受一个state作为参数，将聊天模型合并到一个简单的节点中
# State 中的 add_messages 函数将把 LLM 的响应消息附加到状态中已有的消息list中
def chatbot(state: State):
    return {"messages": [llm.invoke(state["messages"])]}


# The first argument is the unique node name
# The second argument is the function or object that will be called whenever
# the node is used.
graph_builder.add_node("chatbot", chatbot)

In [None]:
# 添加一个 entry 点来告诉图表每次运行时从哪里开始工作
graph_builder.add_edge(START, "chatbot")
# 添加 exit 点，指示图表应在何处结束执行
graph_builder.add_edge("chatbot", END)

In [None]:
# 编译graph，在运行图表之前，我们需要编译它。这将创建一个 CompiledGraph ，我们可以用state调用它
graph = graph_builder.compile()

In [None]:
# 可视化图表
# 以使用 get_graph 方法和其中一种“draw”方法，例如 draw_ascii 或 draw_png 来可视化图形。每种 draw 方法都需要额外的依赖项
# from IPython.display import Image, display
# display(Image(graph.get_graph().draw_mermaid_png()))

In [None]:
# 运行聊天机器人
def stream_graph_updates(user_input: str):
    for event in graph.stream(
            {
                "messages": [
                    {"role": "user", "content": user_input}
                ]
            }
        ):
        for value in event.values():
            print("Assistant:", value["messages"][-1].content)


while True:
    try:
        user_input = input("User: ")
        if user_input.lower() in ["quit", "exit", "q"]:
            print("Goodbye!")
            break
        stream_graph_updates(user_input)
    except:
        # fallback if input() is not available
        user_input = "What do you know about LangGraph?"
        print("User: " + user_input)
        stream_graph_updates(user_input)
        break

In [None]:
# 让我们创建一个更详细的版本来演示stream的工作过程
def detailed_stream_demo(user_input: str):
    print(f"🔹 用户输入: {user_input}")
    print("🔹 开始流式执行图...")

    # 构造输入数据
    input_data = {
        "messages": [
            {"role": "user", "content": user_input}
        ]
    }
    print(f"🔹 输入数据结构: {input_data}")

    # 开始流式执行
    # event是一个字典，key为节点的name，value是一个message list
    for i, event in enumerate(graph.stream(input_data), 1):
        print(f"\n📦 事件 {i}:")
        print(f"   事件类型: {type(event)}")
        print(f"   事件键: {list(event.keys())}")

        # 遍历事件中的每个节点输出
        for node_name, node_output in event.items():
            print(f"   📝 节点 '{node_name}' 的输出:")
            print(f"      消息总数: {len(node_output['messages'])}")

            # 遍历message list 中的所有message对象（不是字典），因此只能使用.xx来获得xx属性
            for j, msg in enumerate(node_output['messages']):
                print(f"      消息 {j + 1}: {msg.type} -> {msg.content[:50]}...")

            # 获取最新的AI回复
            if node_output['messages']:
                latest_msg = node_output['messages'][-1]
                if latest_msg.type == 'ai':
                    print(f"   🤖 AI回复: {latest_msg.content}")


# 测试一下这个详细版本
print("=" * 60)
print("详细流式执行演示")
print("=" * 60)
detailed_stream_demo("你好，请介绍一下自己")

In [None]:
'''
============================================================
详细流式执行演示
============================================================
🔹 用户输入: 你好，请介绍一下自己
🔹 开始流式执行图...
🔹 输入数据结构: {'messages': [{'role': 'user', 'content': '你好，请介绍一下自己'}]}

📦 事件 1:
   事件类型: <class 'dict'>
   事件键: ['chatbot']
   📝 节点 'chatbot' 的输出:
      消息总数: 1
      消息 1: ai -> 你好！我是 ChatGPT，一款由 OpenAI 开发的人工智能语言模型，基于 GPT-4 架构。我...
   🤖 AI回复: 你好！我是 ChatGPT，一款由 OpenAI 开发的人工智能语言模型，基于 GPT-4 架构。我的主要功能是通过自然语言与用户进行交流，并根据提供的信息或问题，给出尽量准确和有帮助的回答。我可以帮助你处理各种任务，比如查找信息、撰写文本、翻译语言、解答疑难、提供学习建议、甚至闲聊娱乐。

无论你有什么问题或需求，都可以随时问我哦！很高兴认识你~
'''

In [None]:
# 让我们演示不同消息类型的结构
def demonstrate_message_types():
    print("🔍 消息类型演示:")
    
    # 创建输入数据
    input_data = {"messages": [{"role": "user", "content": "简短回复测试"}]}
    
    # 执行一次来获取结果
    for event in graph.stream(input_data):
        for node_name, node_output in event.items():
            messages = node_output['messages']
            
            print(f"\n📋 节点 '{node_name}' 中的消息类型分析:")
            
            for i, msg in enumerate(messages):
                print(f"\n  消息 {i+1}:")
                print(f"    类型: {type(msg).__name__}")
                print(f"    是否为字典: {isinstance(msg, dict)}")
                print(f"    有 'role' 属性: {hasattr(msg, 'role')}")
                print(f"    有 'content' 属性: {hasattr(msg, 'content')}")
                
                # 安全地获取内容
                if isinstance(msg, dict):
                    role = msg.get('role', 'unknown')
                    content = msg.get('content', 'no content')
                    print(f"    内容 (字典方式): {role} -> {content[:30]}...")
                elif hasattr(msg, 'content'):
                    role = getattr(msg, 'role', 'AI')  # AIMessage 通常没有 role 属性
                    content = msg.content
                    print(f"    内容 (属性方式): {role if role else 'AI'} -> {content[:30]}...")
                
                # 显示对象的所有属性 (前几个)
                attrs = [attr for attr in dir(msg) if not attr.startswith('_')][:5]
                print(f"    主要属性: {attrs}")
            
            break  # 只看第一个节点
        break  # 只看第一个事件

# 运行演示
print("=" * 60)
print("消息类型结构分析")
print("=" * 60)
demonstrate_message_types()

In [None]:
# 让我们验证一下 event 的 key 数量
def analyze_event_keys(user_input: str):
    print(f"🔍 分析输入 '{user_input}' 的事件结构:")
    
    for i, event in enumerate(graph.stream({"messages": [{"role": "user", "content": user_input}]}), 1):
        keys = list(event.keys())
        print(f"\n事件 {i}:")
        print(f"  - Key 数量: {len(keys)}")
        print(f"  - Key 列表: {keys}")
        print(f"  - 是否只有一个key: {len(keys) == 1}")
        
        # 验证key的内容
        for key in keys:
            print(f"  - Key '{key}' 对应的数据类型: {type(event[key])}")
            if 'messages' in event[key]:
                msg_count = len(event[key]['messages'])
                print(f"  - Key '{key}' 包含 {msg_count} 条消息")

# 测试几个不同的输入
test_inputs = ["测试1", "这是一个较长的测试输入"]

for test_input in test_inputs:
    analyze_event_keys(test_input)
    print("-" * 50)