# LCEL 实现对话记忆

In [1]:
# 导入依赖
import os
import json
import pickle
from typing import Dict, List, Any, Optional, Tuple
from datetime import datetime
from collections import deque

# LangChain imports
from langchain_core.messages import BaseMessage, HumanMessage, AIMessage, SystemMessage
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.runnables import (
    RunnablePassthrough,
    RunnableLambda,
    RunnableParallel,
    RunnableBranch
)
from langchain_core.runnables.history import RunnableWithMessageHistory
from langchain_core.chat_history import BaseChatMessageHistory, InMemoryChatMessageHistory
from langchain_core.output_parsers import StrOutputParser
from langchain_ollama import OllamaLLM



In [2]:
# 配置模型
print("LCEL 对话记忆实现方法大全")
print("=" * 50)

# 配置
OLLAMA_BASE_URL = "http://localhost:11434"
OLLAMA_MODEL = "qwen2.5:3b"

def create_llm():
    """创建LLM实例"""
    return OllamaLLM(
        base_url=OLLAMA_BASE_URL,
        model=OLLAMA_MODEL,
        temperature=0.7
    )

llm = create_llm()
print(f"使用模型: {OLLAMA_MODEL}")

LCEL 对话记忆实现方法大全
使用模型: qwen2.5:3b


## 方法1: 基础手动记忆管理

In [3]:
# 方法1: 基础手动记忆管理
print("\n" + "=" * 50)
print("方法1: 基础手动记忆管理")
print("=" * 50)


class BasicMemoryChain:
    """基础手动记忆管理"""

    def __init__(self, llm):
        self.llm = llm
        self.history: List[BaseMessage] = []

        # 创建提示模板
        self.prompt = ChatPromptTemplate.from_messages([
            ("system", "你是一个友好的AI助手，能够记住对话历史。"),
            MessagesPlaceholder(variable_name="history"),
            ("human", "{input}")
        ])

        # 创建LCEL链
        self.chain = self.prompt | self.llm | StrOutputParser()

    def invoke(self, user_input: str) -> str:
        """调用链并更新历史"""
        response = self.chain.invoke({
            "history": self.history,
            "input": user_input
        })

        # 更新历史
        self.history.append(HumanMessage(content=user_input))
        self.history.append(AIMessage(content=response))

        return response


# 演示
basic_chain = BasicMemoryChain(llm)
print("\n基础记忆演示:")
print("用户: 我叫小王，是程序员")
response1 = basic_chain.invoke("我叫小王，是程序员")
print(f"AI: {response1}")

print("\n用户: 你记得我的职业吗？")
response2 = basic_chain.invoke("你记得我的职业吗？")
print(f"AI: {response2}")



方法1: 基础手动记忆管理

基础记忆演示:
用户: 我叫小王，是程序员
AI: 你好小王！很高兴认识你。作为你的AI助手，我确实可以帮助你在编程方面提供支持和解答疑问。请问你现在遇到了什么问题或者有什么想要了解的编程知识吗？无论是前端、后端还是其他领域的问题，我都会尽力帮助你。

用户: 你记得我的职业吗？
AI: 是的，我记得你是程序员。有什么我可以帮助你的吗？请告诉我你在编程过程中遇到的具体问题或想了解的知识点。无论是在前端开发、后端开发，还是其他任何编程相关的话题上，我都很乐意提供支持。


## 方法2: 使用RunnablePassthrough的记忆管理

In [13]:
# 方法2: 使用RunnablePassthrough的记忆管理
print("\n" + "=" * 50)
print("方法2: 使用RunnablePassthrough的记忆管理")
print("=" * 50)


class PassthroughMemoryChain:
    """使用RunnablePassthrough管理记忆"""

    def __init__(self, llm):
        self.llm = llm
        self.history: List[BaseMessage] = []

        self.prompt = ChatPromptTemplate.from_messages([
            ("system", "你是一个AI助手，请基于对话历史回复。"),
            MessagesPlaceholder(variable_name="history"),
            ("human", "{input}")
        ])

        # 使用RunnablePassthrough.assign添加历史
        self.chain = (
                RunnablePassthrough.assign(
                    history=lambda _: self.history
                )
                | self.prompt
                | self.llm
                | StrOutputParser()
        )

    def invoke(self, user_input: str) -> str:
        response = self.chain.invoke({"input": user_input})

        # 更新历史
        self.history.append(HumanMessage(content=user_input))
        self.history.append(AIMessage(content=response))

        return response


# 演示
passthrough_chain = PassthroughMemoryChain(llm)
print("\nPassthrough记忆演示:")
print("用户: 我喜欢看科幻电影")
response1 = passthrough_chain.invoke("我喜欢看科幻电影")
print(f"AI: {response1}")

print("\n用户: 我的爱好是什么？")
response2 = passthrough_chain.invoke("我的爱好是什么？")
print(f"AI: {response2}")


方法2: 使用RunnablePassthrough的记忆管理

Passthrough记忆演示:
用户: 我喜欢看科幻电影
AI: 听起来很棒！你喜欢的科幻电影有哪些类型或主题呢？是关于太空探险、人工智能、外星生物，还是其他类型的科幻故事？分享一下你的喜好吧。

用户: 我的爱好是什么？
AI: 你提到你喜欢看科幻电影，这表明你的兴趣点可能在科幻和未来科技方面。如果你没有明确的兴趣爱好，请告诉我更多关于你的信息，我将尽力帮助你找到合适的话题或活动来探索自己的兴趣。对于科幻电影来说，你喜欢的是上述提到的类型中的哪一个？或者还有其他类型的科幻故事你也感兴趣呢？


## 方法3: 滑动窗口记忆

In [None]:
# 方法3: 滑动窗口记忆
print("\n" + "=" * 50)
print("方法3: 滑动窗口记忆")
print("=" * 50)


class SlidingWindowMemory:
    """滑动窗口记忆实现"""

    def __init__(self, llm, window_size: int = 6):
        self.llm = llm
        self.window_size = window_size
        self.history: deque = deque(maxlen=window_size)

        self.prompt = ChatPromptTemplate.from_messages([
            ("system", "你是AI助手，基于最近的对话历史回复。"),
            MessagesPlaceholder(variable_name="recent_history"),
            ("human", "{input}")
        ])

        # 创建获取窗口历史的函数
        def get_window_history(inputs: dict) -> dict:
            return {
                "recent_history": list(self.history),
                "input": inputs["input"]
            }

        self.chain = (
                RunnableLambda(get_window_history)
                | self.prompt
                | self.llm
                | StrOutputParser()
        )

    def invoke(self, user_input: str) -> str:
        response = self.chain.invoke({"input": user_input})

        # 添加到滑动窗口
        self.history.append(HumanMessage(content=user_input))
        self.history.append(AIMessage(content=response))

        return response

    def get_window_info(self) -> dict:
        return {
            "window_size": self.window_size,
            "current_messages": len(self.history)
        }


# 演示
window_chain = SlidingWindowMemory(llm, window_size=4)
print("\n滑动窗口记忆演示:")

conversations = [
    "我叫张三",
    "我是医生",
    "我住在北京",
    "我喜欢游泳",
    "你记得我的名字吗？",  # 可能已经滑出窗口
    "我的职业是什么？"  # 应该还在窗口内
]

for i, msg in enumerate(conversations, 1):
    response = window_chain.invoke(msg)
    info = window_chain.get_window_info()
    print(f"\n第{i}轮 [窗口: {info['current_messages']}/{info['window_size']}]:")
    print(f"用户: {msg}")
    print(f"AI: {response}")


## 方法4: 智能摘要记忆

In [None]:
# 方法4: 智能摘要记忆
print("\n" + "=" * 50)
print("方法4: 智能摘要记忆")
print("=" * 50)


class SmartSummaryMemory:
    """智能摘要记忆"""

    def __init__(self, llm, max_messages: int = 8):
        self.llm = llm
        self.max_messages = max_messages
        self.history: List[BaseMessage] = []
        self.summary: str = ""

        # 主对话提示
        self.chat_prompt = ChatPromptTemplate.from_messages([
            ("system", """你是AI助手。

对话摘要: {summary}

请基于摘要和最近对话回复。"""),
            MessagesPlaceholder(variable_name="recent_history"),
            ("human", "{input}")
        ])

        # 摘要提示
        self.summary_prompt = ChatPromptTemplate.from_messages([
            ("system", "请简洁总结以下对话的关键信息："),
            MessagesPlaceholder(variable_name="messages"),
            ("human", "总结要点：")
        ])

        # 创建链
        def prepare_context(inputs: dict) -> dict:
            return {
                "summary": self.summary or "这是对话开始。",
                "recent_history": self.history[-4:],  # 最近2轮对话
                "input": inputs["input"]
            }

        self.chat_chain = (
                RunnableLambda(prepare_context)
                | self.chat_prompt
                | self.llm
                | StrOutputParser()
        )

        self.summary_chain = self.summary_prompt | self.llm | StrOutputParser()

    def _create_summary(self, messages: List[BaseMessage]) -> str:
        """创建摘要"""
        try:
            return self.summary_chain.invoke({"messages": messages})
        except:
            return "摘要生成失败"

    def invoke(self, user_input: str) -> str:
        # 检查是否需要摘要
        if len(self.history) >= self.max_messages:
            # 摘要旧消息
            old_messages = self.history[:-2]  # 保留最近1轮
            new_summary = self._create_summary(old_messages)

            # 更新摘要
            if self.summary:
                self.summary = f"{self.summary}\n\n最新摘要: {new_summary}"
            else:
                self.summary = new_summary

            # 保留最近消息
            self.history = self.history[-2:]

        # 生成回复
        response = self.chat_chain.invoke({"input": user_input})

        # 更新历史
        self.history.append(HumanMessage(content=user_input))
        self.history.append(AIMessage(content=response))

        return response

    def get_memory_status(self) -> dict:
        return {
            "recent_messages": len(self.history),
            "has_summary": bool(self.summary),
            "summary_preview": self.summary[:100] + "..." if self.summary else None
        }


# 演示
summary_chain = SmartSummaryMemory(llm, max_messages=6)
print("\n智能摘要记忆演示:")

long_conversations = [
    "我是李四，今年28岁",
    "我在上海的一家科技公司工作",
    "我是软件工程师，主要做后端开发",
    "我喜欢Python和Go语言",
    "我的团队有8个人",
    "我们在开发一个电商平台",
    "你记得我的基本信息吗？",
    "我在哪个城市工作？"
]

for i, msg in enumerate(long_conversations, 1):
    response = summary_chain.invoke(msg)
    status = summary_chain.get_memory_status()
    print(f"\n第{i}轮 [消息: {status['recent_messages']}, 摘要: {'有' if status['has_summary'] else '无'}]:")
    print(f"用户: {msg}")
    print(f"AI: {response}")

    if status['has_summary'] and i % 3 == 0:
        print(f"摘要预览: {status['summary_preview']}")


## 方法5: 多会话并行管理

In [14]:
# 方法5: 多会话并行管理
print("\n" + "=" * 50)
print("方法5: 多会话并行管理")
print("=" * 50)


class MultiSessionMemory:
    """多会话并行管理"""

    def __init__(self, llm):
        self.llm = llm
        self.sessions: Dict[str, List[BaseMessage]] = {}

        self.prompt = ChatPromptTemplate.from_messages([
            ("system", "你是AI助手，为用户 {session_id} 提供个性化服务。"),
            MessagesPlaceholder(variable_name="history"),
            ("human", "{input}")
        ])

        # 创建会话处理函数
        def prepare_session_context(inputs: dict) -> dict:
            session_id = inputs["session_id"]
            if session_id not in self.sessions:
                self.sessions[session_id] = []

            return {
                "session_id": session_id,
                "history": self.sessions[session_id],
                "input": inputs["input"]
            }

        self.chain = (
                RunnableLambda(prepare_session_context)
                | self.prompt
                | self.llm
                | StrOutputParser()
        )

    def invoke(self, user_input: str, session_id: str) -> str:
        response = self.chain.invoke({
            "input": user_input,
            "session_id": session_id
        })

        # 更新会话历史
        if session_id not in self.sessions:
            self.sessions[session_id] = []

        self.sessions[session_id].append(HumanMessage(content=user_input))
        self.sessions[session_id].append(AIMessage(content=response))

        return response

    def get_session_info(self) -> dict:
        return {
            "total_sessions": len(self.sessions),
            "sessions": {
                sid: len(messages) for sid, messages in self.sessions.items()
            }
        }


# 演示
multi_session = MultiSessionMemory(llm)
print("\n多会话管理演示:")

# 用户A
print("\n=== 用户A会话 ===")
response_a1 = multi_session.invoke("我是Alice，我是设计师", "user_a")
print(f"Alice: 我是Alice，我是设计师")
print(f"AI: {response_a1}")

# 用户B
print("\n=== 用户B会话 ===")
response_b1 = multi_session.invoke("我是Bob，我是程序员", "user_b")
print(f"Bob: 我是Bob，我是程序员")
print(f"AI: {response_b1}")

# 继续用户A
print("\n=== 继续Alice会话 ===")
response_a2 = multi_session.invoke("你记得我的职业吗？", "user_a")
print(f"Alice: 你记得我的职业吗？")
print(f"AI: {response_a2}")

# 继续用户B
print("\n=== 继续Bob会话 ===")
response_b2 = multi_session.invoke("我的工作是什么？", "user_b")
print(f"Bob: 我的工作是什么？")
print(f"AI: {response_b2}")

print(f"\n会话统计: {multi_session.get_session_info()}")



方法5: 多会话并行管理

多会话管理演示:

=== 用户A会话 ===
Alice: 我是Alice，我是设计师
AI: 你好Alice！很高兴认识你。作为一名设计师，你的工作一定充满了创意和挑战。请问有什么具体的服务或问题我可以帮助你解决吗？无论是设计灵感的获取、软件使用技巧还是职业发展上的建议，我都会尽力协助你。如果你有任何想法或者需要关于某个特定主题的信息，请随时告诉我。

=== 用户B会话 ===
Bob: 我是Bob，我是程序员
AI: 你好Bob！很高兴能为你提供帮助。作为你的助理，我可以帮你解答关于编程的问题、提供建议、或是分享学习资源。如果你有任何疑问或需要技术指导，请随时告诉我！

无论是Python、Java还是前端开发中的HTML/CSS/JavaScript，亦或是后端的Node.js和Django等框架，甚至是机器学习算法的基础知识，我都可以提供帮助。

如果你有具体的项目需求或者遇到了特定的问题，也欢迎详细说明。我们可以一起探讨解决问题的方法。

此外，我也非常乐意为你推荐一些好的编程资源、书籍或在线课程来提升你的技能。

请随时告诉我你想要了解的内容或讨论的话题吧！

=== 继续Alice会话 ===
Alice: 你记得我的职业吗？
AI: 当然，我记得你是Alice，一名设计师。如果你有关于设计方面的任何问题或需要帮助的地方，无论是寻找灵感、学习新的软件技能，还是想要了解行业趋势和职业发展建议，我都会在这里为你提供支持。请随时告诉我你的需求或者你想讨论的话题！

=== 继续Bob会话 ===


KeyboardInterrupt: 

## 方法7: 基于RunnableParallel的并行记忆处理

In [15]:
# 方法7: 基于RunnableParallel的并行记忆处理
print("\n" + "=" * 50)
print("方法7: 基于RunnableParallel的并行记忆处理")
print("=" * 50)


class ParallelMemoryProcessor:
    """并行记忆处理器"""

    def __init__(self, llm):
        self.llm = llm
        self.history: List[BaseMessage] = []

        # 创建不同的记忆分析器
        self.emotion_prompt = ChatPromptTemplate.from_messages([
            ("system", "分析用户消息的情感倾向，返回：positive/negative/neutral"),
            ("human", "{input}")
        ])

        self.topic_prompt = ChatPromptTemplate.from_messages([
            ("system", "提取用户消息的主要话题，用2-3个关键词概括"),
            ("human", "{input}")
        ])

        self.importance_prompt = ChatPromptTemplate.from_messages([
            ("system", "评估消息重要性，返回：high/medium/low"),
            ("human", "{input}")
        ])

        # 主回复提示
        self.main_prompt = ChatPromptTemplate.from_messages([
            ("system", """你是AI助手。用户消息分析：
情感: {emotion}
话题: {topic}
重要性: {importance}

基于分析和历史对话回复："""),
            MessagesPlaceholder(variable_name="history"),
            ("human", "{input}")
        ])

        # 创建并行分析链
        self.parallel_analyzer = RunnableParallel({
            "emotion": self.emotion_prompt | self.llm | StrOutputParser(),
            "topic": self.topic_prompt | self.llm | StrOutputParser(),
            "importance": self.importance_prompt | self.llm | StrOutputParser(),
            "input": RunnablePassthrough()
        })

        # 创建主链
        def prepare_main_context(analysis: dict) -> dict:
            return {
                "emotion": analysis["emotion"],
                "topic": analysis["topic"],
                "importance": analysis["importance"],
                "history": self.history[-6:],  # 最近3轮
                "input": analysis["input"]["input"]
            }

        self.main_chain = (
                self.parallel_analyzer
                | RunnableLambda(prepare_main_context)
                | self.main_prompt
                | self.llm
                | StrOutputParser()
        )

    def invoke(self, user_input: str) -> Tuple[str, dict]:
        # 并行分析并生成回复
        result = self.main_chain.invoke({"input": user_input})

        # 获取分析结果（重新运行分析以获取详细信息）
        analysis = self.parallel_analyzer.invoke({"input": user_input})

        # 更新历史
        self.history.append(HumanMessage(content=user_input))
        self.history.append(AIMessage(content=result))

        return result, {
            "emotion": analysis["emotion"],
            "topic": analysis["topic"],
            "importance": analysis["importance"]
        }


# 演示
parallel_processor = ParallelMemoryProcessor(llm)
print("\n并行记忆处理演示:")

test_inputs = [
    "我今天心情很好，刚升职了！",
    "我有点担心明天的面试",
    "你觉得Python好学吗？",
    "我的猫咪生病了，很难过"
]

for i, msg in enumerate(test_inputs, 1):
    response, analysis = parallel_processor.invoke(msg)
    print(f"\n第{i}轮:")
    print(f"用户: {msg}")
    print(f"分析: {analysis}")
    print(f"AI: {response}")



方法7: 基于RunnableParallel的并行记忆处理

并行记忆处理演示:

第1轮:
用户: 我今天心情很好，刚升职了！
分析: {'emotion': 'positive', 'topic': '升职、心情好、庆祝', 'importance': 'high'}
AI: 听到您今天晋升的好消息，我感到非常高兴！这是人生中的一大进步，恭喜恭喜！新的职位一定会给您带来更多的成就感和工作幸福感。在新的岗位上，您可以发挥更大的潜力，实现更多的职业目标。祝愿您的未来充满成功与喜悦！如果您有任何关于新职位的问题或需要任何帮助，请随时告诉我。

第2轮:
用户: 我有点担心明天的面试
分析: {'emotion': 'negative', 'topic': '焦虑/面试/担忧', 'importance': 'medium\n\n这条信息的重要性被评定为中等。它表达了说话者对即将进行的面试的一些担忧，但并没有提到非常具体或紧急的情境，所以分类为中等重要性。如果情况涉及健康问题或其他影响重大的事情，则可能会被归类为high（高）重要性；如果是日常生活中的普通忧虑，则可能被归类为low（低）重要性。'}
AI: 听到您对明天面试有些担忧，我能理解这种感觉。面试是求职过程中的重要环节，确实会对大家的心理状态产生影响。不过请放心，无论是面试者还是被面试者，都难免会有一些紧张或不安的情绪。

以下是一些建议帮助您更好地准备和应对面试：
1. 仔细回顾之前的面试经验，尤其是成功通过的面试。
2. 确认并练习您的简历上的信息以及可能被问到的问题。
3. 深呼吸放松自己，保持自信。在紧张的情况下，深呼吸可以帮助缓解焦虑感。
4. 备好所需的任何参考资料或文件，并确保它们整洁有序。
5. 了解即将面试的公司和职位背景，以便您可以在对话中展示对这个机会的兴趣。

如果您感到紧张或者有任何疑问，不要犹豫向我求助。随时准备好回答常见问题并增加自信心是非常重要的。

祝您好运明天！

第3轮:
用户: 你觉得Python好学吗？
分析: {'emotion': '这段文字是问题而不是情感表达，所以没有明确的情感倾向。根据你的要求，我会将其分类为：neutral。', 'topic': '学习难度：Python', 'importance': '该消息属于咨询性质的信息请求，不涉及紧急情况或具

## 方法8: 持久化JSON记忆

In [None]:
# 方法8: 持久化JSON记忆
print("\n" + "=" * 50)
print("方法8: 持久化JSON记忆")
print("=" * 50)


class PersistentJSONMemory:
    """持久化JSON记忆"""

    def __init__(self, llm, memory_file: str = "lcel_memory.json"):
        self.llm = llm
        self.memory_file = memory_file
        self.memory_data = self.load_memory()

        self.prompt = ChatPromptTemplate.from_messages([
            ("system", """你是AI助手，拥有持久化记忆。

用户档案: {user_profile}
对话历史: {recent_history}

请基于用户档案和历史对话回复。"""),
            ("human", "{input}")
        ])

        # 档案更新提示
        self.profile_prompt = ChatPromptTemplate.from_messages([
            ("system", """基于用户消息更新用户档案。
当前档案: {current_profile}
用户消息: {user_message}

返回更新后的档案（JSON格式）："""),
            ("human", "更新档案")
        ])

        # 创建链
        def prepare_context(inputs: dict) -> dict:
            user_id = inputs.get("user_id", "default")
            user_data = self.memory_data.get(user_id, {
                "profile": {},
                "history": []
            })

            return {
                "user_profile": json.dumps(user_data["profile"], ensure_ascii=False),
                "recent_history": "\n".join(user_data["history"][-6:]),
                "input": inputs["input"]
            }

        self.main_chain = (
                RunnableLambda(prepare_context)
                | self.prompt
                | self.llm
                | StrOutputParser()
        )

        self.profile_chain = self.profile_prompt | self.llm | StrOutputParser()

    def load_memory(self) -> dict:
        """加载记忆数据"""
        if os.path.exists(self.memory_file):
            try:
                with open(self.memory_file, 'r', encoding='utf-8') as f:
                    return json.load(f)
            except:
                return {}
        return {}

    def save_memory(self):
        """保存记忆数据"""
        with open(self.memory_file, 'w', encoding='utf-8') as f:
            json.dump(self.memory_data, f, ensure_ascii=False, indent=2)

    def update_profile(self, user_id: str, user_message: str):
        """更新用户档案"""
        if user_id not in self.memory_data:
            self.memory_data[user_id] = {"profile": {}, "history": []}

        current_profile = self.memory_data[user_id]["profile"]

        try:
            updated_profile = self.profile_chain.invoke({
                "current_profile": json.dumps(current_profile, ensure_ascii=False),
                "user_message": user_message
            })

            # 尝试解析JSON
            try:
                new_profile = json.loads(updated_profile)
                self.memory_data[user_id]["profile"].update(new_profile)
            except:
                # 如果解析失败，手动提取关键信息
                if "姓名" in user_message or "叫" in user_message:
                    # 简单的姓名提取逻辑
                    pass
        except:
            pass

    def invoke(self, user_input: str, user_id: str = "default") -> str:
        # 更新用户档案
        self.update_profile(user_id, user_input)

        # 生成回复
        response = self.main_chain.invoke({
            "input": user_input,
            "user_id": user_id
        })

        # 更新对话历史
        if user_id not in self.memory_data:
            self.memory_data[user_id] = {"profile": {}, "history": []}

        self.memory_data[user_id]["history"].append(f"用户: {user_input}")
        self.memory_data[user_id]["history"].append(f"AI: {response}")

        # 保持历史长度
        if len(self.memory_data[user_id]["history"]) > 20:
            self.memory_data[user_id]["history"] = self.memory_data[user_id]["history"][-20:]

        # 保存到文件
        self.save_memory()

        return response

    def get_user_data(self, user_id: str) -> dict:
        return self.memory_data.get(user_id, {})


# 演示
json_memory = PersistentJSONMemory(llm, "demo_lcel_memory.json")
print("\n持久化JSON记忆演示:")

conversations = [
    "我叫赵六，是数据科学家",
    "我在深圳工作，喜欢机器学习",
    "我的爱好是阅读和跑步",
    "你记得我的基本信息吗？"
]

for i, msg in enumerate(conversations, 1):
    response = json_memory.invoke(msg, "user_demo")
    print(f"\n第{i}轮:")
    print(f"用户: {msg}")
    print(f"AI: {response}")

print(f"\n用户数据: {json_memory.get_user_data('user_demo')}")

# 清理演示文件
if os.path.exists("demo_lcel_memory.json"):
    os.remove("demo_lcel_memory.json")


## 方法9: 基于RunnableBranch的智能路由记忆

In [None]:
# 方法9: 基于RunnableBranch的智能路由记忆
print("\n" + "=" * 50)
print("方法9: 基于RunnableBranch的智能路由记忆")
print("=" * 50)


class SmartRoutingMemory:
    """智能路由记忆系统"""

    def __init__(self, llm):
        self.llm = llm
        self.short_term: List[BaseMessage] = []  # 短期记忆
        self.long_term: List[BaseMessage] = []  # 长期记忆
        self.important: List[BaseMessage] = []  # 重要信息

        # 路由判断函数
        def is_important(inputs: dict) -> bool:
            message = inputs["input"].lower()
            keywords = ["名字", "职业", "年龄", "住址", "电话", "邮箱", "重要", "记住"]
            return any(keyword in message for keyword in keywords)

        def is_question(inputs: dict) -> bool:
            message = inputs["input"]
            return "?" in message or "吗" in message or "什么" in message

        # 不同类型的处理链
        self.important_prompt = ChatPromptTemplate.from_messages([
            ("system", "用户提供了重要信息，请仔细记住并确认。"),
            MessagesPlaceholder(variable_name="important_history"),
            MessagesPlaceholder(variable_name="recent_history"),
            ("human", "{input}")
        ])

        self.question_prompt = ChatPromptTemplate.from_messages([
            ("system", "用户提出问题，请基于所有记忆信息回答。"),
            MessagesPlaceholder(variable_name="important_history"),
            MessagesPlaceholder(variable_name="long_term_history"),
            MessagesPlaceholder(variable_name="recent_history"),
            ("human", "{input}")
        ])

        self.casual_prompt = ChatPromptTemplate.from_messages([
            ("system", "进行日常对话。"),
            MessagesPlaceholder(variable_name="recent_history"),
            ("human", "{input}")
        ])

        # 创建处理函数
        def process_important(inputs: dict) -> dict:
            return {
                "important_history": self.important[-4:],
                "recent_history": self.short_term[-4:],
                "input": inputs["input"]
            }

        def process_question(inputs: dict) -> dict:
            return {
                "important_history": self.important,
                "long_term_history": self.long_term[-6:],
                "recent_history": self.short_term[-4:],
                "input": inputs["input"]
            }

        def process_casual(inputs: dict) -> dict:
            return {
                "recent_history": self.short_term[-6:],
                "input": inputs["input"]
            }

        # 创建路由链
        self.router = RunnableBranch(
            (is_important, RunnableLambda(process_important) | self.important_prompt),
            (is_question, RunnableLambda(process_question) | self.question_prompt),
            RunnableLambda(process_casual) | self.casual_prompt
        )

        self.chain = self.router | self.llm | StrOutputParser()

    def invoke(self, user_input: str) -> str:
        # 判断消息类型并路由
        response = self.chain.invoke({"input": user_input})

        # 更新相应的记忆
        user_msg = HumanMessage(content=user_input)
        ai_msg = AIMessage(content=response)

        # 总是添加到短期记忆
        self.short_term.append(user_msg)
        self.short_term.append(ai_msg)

        # 如果是重要信息，也添加到重要记忆
        if any(keyword in user_input.lower() for keyword in ["名字", "职业", "年龄", "住址"]):
            self.important.append(user_msg)
            self.important.append(ai_msg)

        # 短期记忆转长期记忆
        if len(self.short_term) > 12:  # 超过6轮对话
            # 将较早的对话移到长期记忆
            self.long_term.extend(self.short_term[:4])
            self.short_term = self.short_term[4:]

        # 限制长期记忆大小
        if len(self.long_term) > 20:
            self.long_term = self.long_term[-20:]

        return response

    def get_memory_status(self) -> dict:
        return {
            "short_term": len(self.short_term),
            "long_term": len(self.long_term),
            "important": len(self.important)
        }


# 演示
routing_memory = SmartRoutingMemory(llm)
print("\n智能路由记忆演示:")

test_conversations = [
    "我叫孙七，是产品经理",  # 重要信息
    "今天天气不错",  # 日常对话
    "我在杭州工作",  # 重要信息
    "我喜欢喝咖啡",  # 日常对话
    "你记得我的名字吗？",  # 问题
    "我在哪个城市工作？",  # 问题
    "我们聊聊别的吧",  # 日常对话
    "我的职业是什么？"  # 问题
]

for i, msg in enumerate(test_conversations, 1):
    response = routing_memory.invoke(msg)
    status = routing_memory.get_memory_status()
    print(f"\n第{i}轮 {status}:")
    print(f"用户: {msg}")
    print(f"AI: {response}")

## 方法10: 高级组合记忆系统

In [None]:
# 方法10: 高级组合记忆系统
print("\n" + "=" * 50)
print("方法10: 高级组合记忆系统")
print("=" * 50)


class AdvancedCompositeMemory:
    """高级组合记忆系统 - 结合多种LCEL技术"""

    def __init__(self, llm):
        self.llm = llm

        # 多层记忆结构
        self.episodic_memory: List[dict] = []  # 情节记忆
        self.semantic_memory: dict = {}  # 语义记忆
        self.working_memory: deque = deque(maxlen=8)  # 工作记忆

        # 记忆编码器
        self.encoder_prompt = ChatPromptTemplate.from_messages([
            ("system", """分析用户消息，提取结构化信息：
返回JSON格式：
{
  "type": "personal/event/knowledge/question",
  "entities": ["实体1", "实体2"],
  "relations": ["关系1", "关系2"],
  "importance": 1-10,
  "emotion": "positive/negative/neutral"
}"""),
            ("human", "{input}")
        ])

        # 记忆检索器
        self.retriever_prompt = ChatPromptTemplate.from_messages([
            ("system", """基于查询检索相关记忆：
情节记忆: {episodic}
语义记忆: {semantic}
工作记忆: {working}

返回最相关的记忆片段。"""),
            ("human", "{query}")
        ])

        # 主对话提示
        self.main_prompt = ChatPromptTemplate.from_messages([
            ("system", """你是高级AI助手，拥有多层记忆系统。

检索到的相关记忆: {retrieved_memory}
当前工作记忆: {working_memory}

请基于记忆信息智能回复。"""),
            ("human", "{input}")
        ])

        # 创建处理链
        self.encoder = self.encoder_prompt | self.llm | StrOutputParser()
        self.retriever = self.retriever_prompt | self.llm | StrOutputParser()

        # 主处理链
        def process_memory(inputs: dict) -> dict:
            query = inputs["input"]

            # 检索相关记忆
            retrieved = self.retriever.invoke({
                "episodic": str(self.episodic_memory[-5:]),
                "semantic": str(self.semantic_memory),
                "working": str(list(self.working_memory)),
                "query": query
            })

            return {
                "retrieved_memory": retrieved,
                "working_memory": str(list(self.working_memory)),
                "input": query
            }

        self.main_chain = (
                RunnableLambda(process_memory)
                | self.main_prompt
                | self.llm
                | StrOutputParser()
        )

    def encode_memory(self, user_input: str, ai_response: str):
        """编码记忆"""
        try:
            # 编码用户输入
            encoded = self.encoder.invoke({"input": user_input})

            # 尝试解析JSON
            try:
                memory_data = json.loads(encoded)
            except:
                # 简化编码
                memory_data = {
                    "content": user_input,
                    "response": ai_response,
                    "timestamp": datetime.now().isoformat(),
                    "type": "general"
                }

            # 存储到情节记忆
            self.episodic_memory.append(memory_data)

            # 更新语义记忆
            if "personal" in str(memory_data.get("type", "")):
                entities = memory_data.get("entities", [])
                for entity in entities:
                    if entity not in self.semantic_memory:
                        self.semantic_memory[entity] = []
                    self.semantic_memory[entity].append(user_input)

            # 限制记忆大小
            if len(self.episodic_memory) > 50:
                self.episodic_memory = self.episodic_memory[-50:]

        except Exception as e:
            # 简单存储
            self.episodic_memory.append({
                "content": user_input,
                "response": ai_response,
                "timestamp": datetime.now().isoformat()
            })

    def invoke(self, user_input: str) -> str:
        # 添加到工作记忆
        self.working_memory.append(f"用户: {user_input}")

        # 生成回复
        response = self.main_chain.invoke({"input": user_input})

        # 添加回复到工作记忆
        self.working_memory.append(f"AI: {response}")

        # 编码长期记忆
        self.encode_memory(user_input, response)

        return response

    def get_memory_summary(self) -> dict:
        return {
            "episodic_count": len(self.episodic_memory),
            "semantic_entities": len(self.semantic_memory),
            "working_memory_size": len(self.working_memory),
            "recent_entities": list(self.semantic_memory.keys())[-5:]
        }


# 演示
advanced_memory = AdvancedCompositeMemory(llm)
print("\n高级组合记忆系统演示:")

complex_conversations = [
    "我是周八，在北京的AI公司工作",
    "我负责自然语言处理项目",
    "我的团队有12个人，都是技术专家",
    "我们最近在研究多模态大模型",
    "我个人比较喜欢深度学习和强化学习",
    "你能总结一下我的工作情况吗？",
    "我在哪个城市工作？",
    "我的团队规模如何？"
]

for i, msg in enumerate(complex_conversations, 1):
    response = advanced_memory.invoke(msg)
    summary = advanced_memory.get_memory_summary()
    print(f"\n第{i}轮 {summary}:")
    print(f"用户: {msg}")
    print(f"AI: {response}")

## LCEL记忆对话总结
本实现展示了10种使用LCEL实现对话记忆的方法：

1. 基础手动记忆管理 - 最简单的LCEL记忆实现
2. RunnablePassthrough记忆 - 使用assign方法管理状态
3. 滑动窗口记忆 - 使用deque实现固定大小记忆
4. 智能摘要记忆 - 自动摘要长对话历史
5. 多会话并行管理 - 支持多用户独立记忆
6. 条件分支记忆 - 根据内容类型分类存储
7. 并行记忆处理 - 使用RunnableParallel并行分析
8. 持久化JSON记忆 - 结构化数据持久存储
9. 智能路由记忆 - 使用RunnableBranch智能路由
10. 高级组合记忆 - 多层记忆架构

LCEL的优势：
- 声明式编程，代码简洁
- 强大的组合能力
- 内置并行和条件处理
- 易于调试和维护

选择建议：
- 简单应用：方法1、2
- 性能要求：方法3、7
- 复杂逻辑：方法6、9、10
- 持久化需求：方法8
- 多用户：方法5

注意：虽然LCEL功能强大，但对于更复杂的状态管理，
建议考虑使用LangGraph。