In [1]:
#!/usr/bin/env python
# -*- coding: utf-8 -*-

"""
使用 LCEL (LangChain Expression Language) 实现对话记忆
展示多种在 LCEL 中管理对话历史的方法
"""

import os
from typing import Dict, List, Any, Optional
from datetime import datetime

# LangChain imports
from langchain_core.messages import BaseMessage, HumanMessage, AIMessage, SystemMessage
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.runnables import RunnablePassthrough, RunnableLambda
from langchain_core.runnables.history import RunnableWithMessageHistory
from langchain_core.chat_history import BaseChatMessageHistory, InMemoryChatMessageHistory
from langchain_core.output_parsers import StrOutputParser
from langchain_ollama import OllamaLLM

print("LCEL 对话记忆实现演示")
print("=" * 40)


LCEL 对话记忆实现演示


In [2]:
# ===================== 配置部分 =====================

OLLAMA_BASE_URL = "http://localhost:11434"
OLLAMA_MODEL = "gemma3:4b"

def create_llm():
    """创建LLM实例"""
    return OllamaLLM(
        base_url=OLLAMA_BASE_URL,
        model=OLLAMA_MODEL,
        temperature=0.7
    )

llm = create_llm()
print(f"使用模型: {OLLAMA_MODEL}")


使用模型: gemma3:4b


In [None]:
# ===================== 方法1: 手动管理对话历史 =====================

print("\n1. 手动管理对话历史")
print("-" * 30)

class ManualMemoryChain:
    """手动管理对话历史的链"""
    
    def __init__(self, llm):
        self.llm = llm
        self.history: List[BaseMessage] = []
        
        # 创建提示模板
        self.prompt = ChatPromptTemplate.from_messages([
            ("system", "你是一个友好的AI助手，能够记住对话历史。"),
            MessagesPlaceholder(variable_name="history"),
            ("human", "{input}")
        ])
        
        # 创建链
        self.chain = self.prompt | self.llm | StrOutputParser()
    
    def invoke(self, user_input: str) -> str:
        """调用链并更新历史"""
        # 调用链
        response = self.chain.invoke({
            "history": self.history,
            "input": user_input
        })
        
        # 更新历史
        self.history.append(HumanMessage(content=user_input))
        self.history.append(AIMessage(content=response))
        
        return response
    
    def clear_history(self):
        """清空历史"""
        self.history = []
    
    def get_history(self) -> List[BaseMessage]:
        """获取历史"""
        return self.history.copy()

# 演示手动记忆管理
def demo_manual_memory():
    """演示手动记忆管理"""
    print("\n手动记忆管理演示:")
    
    chain = ManualMemoryChain(llm)
    
    conversations = [
        "你好，我叫小明，是一名程序员",
        "我在上海工作，主要做Python开发",
        "你还记得我的名字吗？",
        "我在哪个城市工作？"
    ]
    
    for i, user_input in enumerate(conversations, 1):
        try:
            response = chain.invoke(user_input)
            print(f"\n第{i}轮:")
            print(f"用户: {user_input}")
            print(f"AI: {response}")
        except Exception as e:
            print(f"错误: {e}")
    
    print(f"\n历史记录数量: {len(chain.get_history())} 条消息")

demo_manual_memory()


In [3]:
# ===================== 方法2: 使用 RunnableWithMessageHistory =====================

print("\n\n2. 使用 RunnableWithMessageHistory")
print("-" * 40)

# 存储会话历史的字典
store = {}

def get_session_history(session_id: str) -> BaseChatMessageHistory:
    """获取会话历史"""
    if session_id not in store:
        store[session_id] = InMemoryChatMessageHistory()
    return store[session_id]

# 创建基础链
prompt = ChatPromptTemplate.from_messages([
    ("system", "你是一个友好的AI助手。请根据对话历史进行回复。"),
    MessagesPlaceholder(variable_name="history"),
    ("human", "{input}")
])

base_chain = prompt | llm | StrOutputParser()

# 添加消息历史
chain_with_history = RunnableWithMessageHistory(
    base_chain,
    get_session_history,
    input_messages_key="input",
    history_messages_key="history",
)

def demo_runnable_with_history():
    """演示 RunnableWithMessageHistory"""
    print("\nRunnableWithMessageHistory 演示:")
    
    # 配置会话
    config = {"configurable": {"session_id": "demo_session"}}
    
    conversations = [
        "你好，我是李华，我喜欢阅读",
        "我最喜欢的书是《三体》",
        "你还记得我的名字吗？",
        "我喜欢什么书？"
    ]
    
    for i, user_input in enumerate(conversations, 1):
        try:
            response = chain_with_history.invoke(
                {"input": user_input},
                config=config
            )
            print(f"\n第{i}轮:")
            print(f"用户: {user_input}")
            print(f"AI: {response}")
        except Exception as e:
            print(f"错误: {e}")
    
    # 显示历史
    history = get_session_history("demo_session")
    print(f"\n会话历史: {len(history.messages)} 条消息")

demo_runnable_with_history()




2. 使用 RunnableWithMessageHistory
----------------------------------------

RunnableWithMessageHistory 演示:

第1轮:
用户: 你好，我是李华，我喜欢阅读
AI: 你好李华，很高兴认识你！ 很高兴知道你喜欢阅读，那真是太棒了！ 你平时喜欢读什么类型的书呢？ 😊


第2轮:
用户: 我最喜欢的书是《三体》
AI: AI: 《三体》！哇，刘慈欣的《三体》绝对是科幻经典！ 你喜欢它的什么呢？ 是刘慈欣的想象力，还是故事的深度，还是整个宇宙的宏大叙事？ 😊


第3轮:
用户: 你还记得我的名字吗？
AI: AI: 当然记得！李华，很高兴再次和你聊天 😊 你今天过得怎么样？

第4轮:
用户: 我喜欢什么书？
AI: AI: 你喜欢《三体》啊，看来你对科幻小说非常感兴趣！ 😊  之前我们聊过你喜欢阅读，而且你特别喜欢《三体》。  你觉得《三体》的什么吸引了你呢？ 还是说，你还记得我们之前聊过你喜欢阅读这件事呢？ 😉

会话历史: 8 条消息


In [None]:
# ===================== 方法3: 带有记忆窗口的LCEL链 =====================

print("\n\n3. 带有记忆窗口的LCEL链")
print("-" * 35)

from langchain_core.runnables import RunnableConfig

class WindowMemoryChain:
    """带有窗口记忆的链"""
    
    def __init__(self, llm, window_size: int = 6):
        self.llm = llm
        self.window_size = window_size  # 保留的消息数量
        self.history: List[BaseMessage] = []
        
        # 创建提示模板
        self.prompt = ChatPromptTemplate.from_messages([
            ("system", "你是一个友好的AI助手。基于最近的对话历史进行回复。"),
            MessagesPlaceholder(variable_name="recent_history"),
            ("human", "{input}")
        ])
        
        # 创建处理函数
        def get_recent_history(inputs: dict) -> dict:
            """获取最近的历史记录"""
            recent = self.history[-self.window_size:] if len(self.history) > self.window_size else self.history
            return {
                "recent_history": recent,
                "input": inputs["input"]
            }
        
        # 创建链
        self.chain = (
            RunnableLambda(get_recent_history) |
            self.prompt |
            self.llm |
            StrOutputParser()
        )
    
    def invoke(self, user_input: str) -> str:
        """调用链"""
        response = self.chain.invoke({"input": user_input})
        
        # 更新历史
        self.history.append(HumanMessage(content=user_input))
        self.history.append(AIMessage(content=response))
        
        return response
    
    def get_window_info(self) -> dict:
        """获取窗口信息"""
        return {
            "total_messages": len(self.history),
            "window_size": self.window_size,
            "messages_in_window": min(len(self.history), self.window_size)
        }

def demo_window_memory():
    """演示窗口记忆"""
    print("\n窗口记忆演示 (窗口大小=4):")
    
    chain = WindowMemoryChain(llm, window_size=4)
    
    conversations = [
        "我叫张三，是医生",
        "我在北京工作",
        "我喜欢游泳",
        "我有一只猫",
        "我的猫叫小白",
        "我还喜欢看电影",
        "你还记得我的名字吗？",  # 可能已经超出窗口
        "我的宠物是什么？",      # 应该还记得
    ]
    
    for i, user_input in enumerate(conversations, 1):
        try:
            response = chain.invoke(user_input)
            print(f"\n第{i}轮:")
            print(f"用户: {user_input}")
            print(f"AI: {response}")
            
            # 显示窗口信息
            info = chain.get_window_info()
            print(f"窗口状态: {info['messages_in_window']}/{info['window_size']} (总计: {info['total_messages']})")
        except Exception as e:
            print(f"错误: {e}")

demo_window_memory()


In [None]:
# ===================== 方法4: 带有摘要功能的LCEL链 =====================

print("\n\n4. 带有摘要功能的LCEL链")
print("-" * 35)

class SummaryMemoryChain:
    """带有摘要功能的记忆链"""
    
    def __init__(self, llm, max_messages: int = 8):
        self.llm = llm
        self.max_messages = max_messages
        self.history: List[BaseMessage] = []
        self.summary: str = ""
        
        # 主对话提示
        self.chat_prompt = ChatPromptTemplate.from_messages([
            ("system", """你是一个友好的AI助手。

{summary_context}

请基于对话摘要和最近的对话历史进行回复。"""),
            MessagesPlaceholder(variable_name="recent_history"),
            ("human", "{input}")
        ])
        
        # 摘要提示
        self.summary_prompt = ChatPromptTemplate.from_messages([
            ("system", "请简洁地总结以下对话的要点，保留重要信息："),
            MessagesPlaceholder(variable_name="messages_to_summarize"),
            ("human", "请提供对话摘要：")
        ])
        
        # 创建链
        self.chat_chain = self.chat_prompt | self.llm | StrOutputParser()
        self.summary_chain = self.summary_prompt | self.llm | StrOutputParser()
    
    def _create_summary(self, messages: List[BaseMessage]) -> str:
        """创建对话摘要"""
        try:
            return self.summary_chain.invoke({
                "messages_to_summarize": messages
            })
        except:
            return "对话摘要生成失败"
    
    def invoke(self, user_input: str) -> str:
        """调用链"""
        # 如果历史太长，创建摘要
        if len(self.history) >= self.max_messages:
            # 摘要前面的消息
            messages_to_summarize = self.history[:-4]  # 保留最近2轮对话
            new_summary = self._create_summary(messages_to_summarize)
            
            # 更新摘要和历史
            if self.summary:
                self.summary = f"{self.summary}\n\n{new_summary}"
            else:
                self.summary = new_summary
            
            # 只保留最近的消息
            self.history = self.history[-4:]
        
        # 准备上下文
        summary_context = f"对话摘要: {self.summary}" if self.summary else "这是对话的开始。"
        
        # 调用聊天链
        response = self.chat_chain.invoke({
            "summary_context": summary_context,
            "recent_history": self.history,
            "input": user_input
        })
        
        # 更新历史
        self.history.append(HumanMessage(content=user_input))
        self.history.append(AIMessage(content=response))
        
        return response
    
    def get_memory_info(self) -> dict:
        """获取记忆信息"""
        return {
            "recent_messages": len(self.history),
            "has_summary": bool(self.summary),
            "summary_length": len(self.summary) if self.summary else 0
        }

def demo_summary_memory():
    """演示摘要记忆"""
    print("\n摘要记忆演示 (最大消息数=6):")
    
    chain = SummaryMemoryChain(llm, max_messages=6)
    
    conversations = [
        "我叫王五，是一名老师",
        "我在上海的小学教数学",
        "我有10年的教学经验",
        "我喜欢和学生互动",
        "我的班级有30个学生",
        "我最近在准备期末考试",
        "我还负责学校的数学竞赛",
        "你还记得我的职业吗？",
        "我在哪里工作？",
        "我有多少年经验？"
    ]
    
    for i, user_input in enumerate(conversations, 1):
        try:
            response = chain.invoke(user_input)
            print(f"\n第{i}轮:")
            print(f"用户: {user_input}")
            print(f"AI: {response}")
            
            # 显示记忆信息
            info = chain.get_memory_info()
            print(f"记忆状态: {info['recent_messages']} 条最近消息, 摘要: {'有' if info['has_summary'] else '无'}")
            
            if info['has_summary'] and i % 3 == 0:  # 每3轮显示一次摘要
                print(f"当前摘要: {chain.summary[:100]}...")
        except Exception as e:
            print(f"错误: {e}")

demo_summary_memory()


In [None]:
# ===================== 方法5: 多会话管理的LCEL实现 =====================

print("\n\n5. 多会话管理的LCEL实现")
print("-" * 35)

class MultiSessionMemory:
    """多会话记忆管理"""
    
    def __init__(self, llm):
        self.llm = llm
        self.sessions: Dict[str, List[BaseMessage]] = {}
        
        # 创建提示模板
        self.prompt = ChatPromptTemplate.from_messages([
            ("system", "你是一个友好的AI助手。请基于对话历史进行个性化回复。"),
            MessagesPlaceholder(variable_name="history"),
            ("human", "{input}")
        ])
        
        # 创建链
        self.chain = self.prompt | self.llm | StrOutputParser()
    
    def invoke(self, user_input: str, session_id: str) -> str:
        """为特定会话调用链"""
        # 获取或创建会话历史
        if session_id not in self.sessions:
            self.sessions[session_id] = []
        
        history = self.sessions[session_id]
        
        # 调用链
        response = self.chain.invoke({
            "history": history,
            "input": user_input
        })
        
        # 更新会话历史
        history.append(HumanMessage(content=user_input))
        history.append(AIMessage(content=response))
        
        return response
    
    def get_session_info(self) -> dict:
        """获取会话信息"""
        return {
            "total_sessions": len(self.sessions),
            "session_ids": list(self.sessions.keys()),
            "messages_per_session": {
                sid: len(messages) for sid, messages in self.sessions.items()
            }
        }

def demo_multi_session():
    """演示多会话管理"""
    print("\n多会话管理演示:")
    
    memory = MultiSessionMemory(llm)
    
    # 用户A的对话
    print("\n=== 用户A的会话 ===")
    response1 = memory.invoke("我是Alice，我喜欢画画", "user_a")
    print(f"Alice: 我是Alice，我喜欢画画")
    print(f"AI: {response1}")
    
    # 用户B的对话
    print("\n=== 用户B的会话 ===")
    response2 = memory.invoke("我是Bob，我是程序员", "user_b")
    print(f"Bob: 我是Bob，我是程序员")
    print(f"AI: {response2}")
    
    # 继续用户A的对话
    print("\n=== 继续Alice的会话 ===")
    response3 = memory.invoke("你记得我的爱好吗？", "user_a")
    print(f"Alice: 你记得我的爱好吗？")
    print(f"AI: {response3}")
    
    # 继续用户B的对话
    print("\n=== 继续Bob的会话 ===")
    response4 = memory.invoke("我的职业是什么？", "user_b")
    print(f"Bob: 我的职业是什么？")
    print(f"AI: {response4}")
    
    # 显示会话信息
    info = memory.get_session_info()
    print(f"\n会话统计: {info}")

demo_multi_session()


In [None]:
# ===================== 总结和对比 =====================

print("\n\n" + "="*60)
print("LCEL 对话记忆实现方法总结")
print("="*60)

summary_text = """
LCEL 实现对话记忆的方法:

1. 🔧 手动管理历史
   优点: 完全控制，简单直接
   缺点: 需要手动管理状态
   适用: 简单应用，学习目的

2. 📚 RunnableWithMessageHistory
   优点: 官方支持，功能完整
   缺点: API相对复杂
   适用: 标准聊天应用

3. 🪟 窗口记忆
   优点: 内存使用固定
   缺点: 会丢失早期信息
   适用: 长对话，内存有限

4. 📝 摘要记忆
   优点: 保留重要信息，适合长对话
   缺点: 需要额外LLM调用
   适用: 超长对话，成本敏感

5. 👥 多会话管理
   优点: 支持多用户
   缺点: 需要手动实现会话隔离
   适用: 多用户应用

选择建议:
- 简单聊天: 方法1或2
- 长对话: 方法3或4
- 多用户: 方法5
- 复杂应用: 考虑迁移到LangGraph

注意: LangChain 0.3+ 推荐使用 LangGraph 来处理复杂的记忆需求！
"""

print(summary_text)

print("\n演示完成！")
print("虽然LCEL可以实现记忆功能，但对于复杂应用建议使用LangGraph。")


In [None]:
# ===================== 交互式演示 =====================

def interactive_lcel_demo():
    """交互式LCEL记忆演示"""
    print("\n" + "="*50)
    print("交互式LCEL记忆演示")
    print("选择一种记忆实现方式进行测试")
    print("="*50)

    print("\n可用的记忆实现:")
    print("1. 手动管理历史")
    print("2. RunnableWithMessageHistory")
    print("3. 窗口记忆")
    print("4. 摘要记忆")
    print("5. 多会话管理")

    choice = input("\n请选择实现方式 (1-5): ").strip()

    if choice == "1":
        chain = ManualMemoryChain(llm)
        print("\n使用手动管理历史，输入 'quit' 退出")

        while True:
            user_input = input("\n你: ").strip()
            if user_input.lower() == 'quit':
                break
            try:
                response = chain.invoke(user_input)
                print(f"AI: {response}")
            except Exception as e:
                print(f"错误: {e}")

    elif choice == "2":
        config = {"configurable": {"session_id": "interactive"}}
        print("\n使用 RunnableWithMessageHistory，输入 'quit' 退出")

        while True:
            user_input = input("\n你: ").strip()
            if user_input.lower() == 'quit':
                break
            try:
                response = chain_with_history.invoke(
                    {"input": user_input}, config=config
                )
                print(f"AI: {response}")
            except Exception as e:
                print(f"错误: {e}")

    elif choice == "3":
        chain = WindowMemoryChain(llm, window_size=6)
        print("\n使用窗口记忆 (窗口大小=6)，输入 'quit' 退出")

        while True:
            user_input = input("\n你: ").strip()
            if user_input.lower() == 'quit':
                break
            try:
                response = chain.invoke(user_input)
                print(f"AI: {response}")
                info = chain.get_window_info()
                print(f"[窗口: {info['messages_in_window']}/{info['window_size']}]")
            except Exception as e:
                print(f"错误: {e}")

    elif choice == "4":
        chain = SummaryMemoryChain(llm, max_messages=8)
        print("\n使用摘要记忆 (最大消息=8)，输入 'quit' 退出")

        while True:
            user_input = input("\n你: ").strip()
            if user_input.lower() == 'quit':
                break
            try:
                response = chain.invoke(user_input)
                print(f"AI: {response}")
                info = chain.get_memory_info()
                print(f"[消息: {info['recent_messages']}, 摘要: {'有' if info['has_summary'] else '无'}]")
            except Exception as e:
                print(f"错误: {e}")

    elif choice == "5":
        memory = MultiSessionMemory(llm)
        session_id = input("请输入会话ID: ").strip() or "default"
        print(f"\n使用多会话管理 (会话: {session_id})，输入 'quit' 退出，输入 'switch' 切换会话")

        while True:
            user_input = input("\n你: ").strip()
            if user_input.lower() == 'quit':
                break
            elif user_input.lower() == 'switch':
                session_id = input("请输入新的会话ID: ").strip() or "default"
                print(f"切换到会话: {session_id}")
                continue
            try:
                response = memory.invoke(user_input, session_id)
                print(f"AI: {response}")
                info = memory.get_session_info()
                print(f"[会话: {session_id}, 总会话数: {info['total_sessions']}]")
            except Exception as e:
                print(f"错误: {e}")
    else:
        print("无效选择")

# 如果直接运行此文件，启动交互式演示
if __name__ == "__main__":
    try:
        # 测试连接
        test_response = llm.invoke("Hello")
        print("✓ Ollama 连接成功")
        interactive_lcel_demo()
    except Exception as e:
        print(f"✗ Ollama 连接失败: {e}")
        print("请确保 Ollama 正在运行: ollama serve")
        print("并安装模型: ollama pull gemma:3b")
