# 导包

In [3]:
import os
print(os.getcwd())

e:\ai\02项目\Agent_Memory


In [None]:
import json
from llm_client import LLMClient  # 导入我们的通信专家
import prompts  
from sentence_transformers import SentenceTransformer
from scipy.spatial.distance import cosine
import numpy as np
MY_API_KEY = "xxx"
MY_BASE_URL = "xxx"

In [2]:
class AgentMemory:
    def __init__ (self,api_key,base_url):
        self.client = LLMClient(
            api_key = api_key,
            base_url = base_url
        )
        self.embedding_model = SentenceTransformer('BAAI/bge-small-zh-v1.5')
        self.knowledge_base = {}
        self.episodic_memory = []
        

    
    def extract_fact(self, user_statement: str) -> dict:
        """
        输入用户的话，提取其中的fact
        """
        messages = [
            {"role": "system", "content": prompts.EXTRACT_PROMPT_TEMPLATE},
            {"role": "user", "content": user_statement}
        ]
        
        response_content = self.client.call_for_json(messages)
        
        if response_content:
            try:
                return json.loads(response_content)
            except json.JSONDecodeError:
                print(f"JSON解析失败: {response_content}")
                return {}
        return {}
    
    def select_fact(self, user_statement: str) -> dict:
        """
        输入用户的话，输出fact或semantic
        """
        messages = [
            {"role": "system", "content": prompts.SELECT_PROMPT_TEMPLATE},
            {"role": "user", "content": user_statement}
        ]
        
        response_content = self.client.call_for_json(messages)
        
        if response_content:
            try:
                return json.loads(response_content)
            except json.JSONDecodeError:
                print(f"JSON解析失败: {response_content}")
                return {}
        return {}
    
    def add_statement_to_memory(self, user_statement: str, agent_response: str):
        full_turn = f"User: {user_statement}\nAgent: {agent_response}"
        embedding = self.embedding_model.encode([full_turn])[0]
        self.episodic_memory.append({
                'text': full_turn,
                'vector': embedding
            })
    
        extracted_fact = self.extract_fact(user_statement)
        if extracted_fact:
            print(f"成功提取到事实: {extracted_fact}")
            self.knowledge_base.update(extracted_fact)
            print("知识库已更新！")
        else:
            print("未提取到事实，知识库保持不变。")
        print(f"当前知识库: {self.knowledge_base}\n")

    def _translate_question_to_key(self, user_question: str, available_keys: list):
        prompt = prompts.TRANS_PROMPT_TEMPLATE.format(
            available_keys=available_keys,
            user_question=user_question
        )
        messages = [{"role": "system", "content": prompt}]
        
        # 我们期望得到纯文本，所以调用一个不强制JSON的LLM函数
        response_content = self.client.call_for_text(messages)
        
        if response_content and response_content != "null":
            return response_content.strip() # 去掉多余空格
        return None # 如果没有找到或者出错了，返回None
    


    def pre_select(self,question:str,history):
        prompt = prompts.PRE_PROMPT_TEMPLATE.format(question=question,
                                                 history=history)
        messages = [{"role":"system", "content":prompt}]
 

        response_content = self.client.call_for_json(messages)
        
        if response_content:
            try:
                return json.loads(response_content)
            except json.JSONDecodeError:
                print(f"JSON解析失败: {response_content}")
                return {}
        return {}
    
    def _rewrite_query(self, question: str, history: list) -> str:
        """
        根据对话历史，将可能依赖上下文的问题，改写为独立问题。
        """
        print("--- 启动查询重写 ---")
        # 使用 f-string 填充模板
        prompt = prompts.REWRITE_PROMPT_TEMPLATE.format(history=history, question=question)
        messages = [{"role": "system", "content": prompt}]
        
        # 我们期望得到纯文本，所以调用 text 版本的LLM函数
        rewritten_query = self.client.call_for_text(messages)
        
        if rewritten_query:
            print(f"原始问题: '{question}'")
            print(f"重写后的问题: '{rewritten_query}'")
            return rewritten_query
        
        # 如果重写失败，就返回原始问题作为保底
        print("查询重写失败，将使用原始问题。")
        return question
    
    def _perform_semantic_search(self, query: str) -> str:
        """
        执行一次完整的语义搜索并返回最终答案。
        这是一个可被复用的核心组件。
        """
        print(f"--- (Fallback) 启动语义搜索来回答: '{query}' ---")
        retrieved_info = self._simple_rag_search(query)
        
        if not retrieved_info:
            # 如果RAG没找到东西，就让LLM直接回答
            final_prompt = f"你是一个乐于助人的助手。请直接回答用户的这个开放性问题。\n\n用户问题: {query}\n\n你的回答:"
        else:
            # 如果RAG找到了东西，就用上下文来回答
            context_str = "\n---\n".join(retrieved_info)
            final_prompt = f"""你是一个知识渊博且善于总结的助手。请根据以下可能相关的历史对话，来回答用户的最终问题。请不要直接复述历史对话，而是要从中提炼和总结信息。

    # 相关历史对话:
    {context_str}

    # 最终用户问题:
    {query}

    # 你的回答:
    """
        messages = [{"role": "system", "content": final_prompt}]
        answer = self.client.call_for_text(messages)
        return answer or "抱歉，我找到了信息但组织语言时出错了。"


    def pipeline(self, query: str) -> str:
        category_data = self.select_fact(query)
        if not category_data:
            return "抱歉，我没判断出你的问题类型"
        
        category = category_data.get('type')
        print(f"--- 问题被分类为: {category} ---")

        # --- 路径一：尝试事实查找 ---
        if category == 'fact_lookup':
            available_keys = list(self.knowledge_base.keys())
            key = self._translate_question_to_key(query, available_keys)
            print(f"问题映射到的Key是: {key}")

            if key:
                information = self.knowledge_base.get(key)
                if information:
                    print("--- 在事实记忆中找到答案！ ---")
                    final_prompt = f"你是一个乐于助人的助手。请根据以下信息，用自然语言回答用户的问题。\n\n已知信息: '{key}' 是 '{information}'.\n\n用户问题: {query}\n\n你的回答:"
                    messages = [{"role": "system", "content": final_prompt}]
                    answer = self.client.call_for_text(messages)
                    return answer or "抱歉，我找到了信息但组织语言时出错了。"

           
            # 如果 key 没有找到，或者 key 找到了但 information 是空的
            # 我们不直接放弃，而是自动降级，去尝试语义搜索
            print("--- 事实查找失败，自动降级为语义搜索 ---")
            return self._perform_semantic_search(query)

        # --- 路径二：直接进行语义搜索 ---
        elif category == 'semantic_search':
            return self._perform_semantic_search(query)
            
       
        else:
            return "抱歉，我无法识别这个问题类型。"
        
    def answer_question(self,user_question: str,history:list):
        """
        接收用户问题，经过完整的记忆回忆流程，返回最终答案。
        """
        pre_data = self.pre_select(user_question,history)
        pre = pre_data.get('type')
        if pre == 'context_dependent':
            print("这是情景回答，需要查询重写，然后进入流水线")
            query = self._rewrite_query(user_question, history)
            return self.pipeline(query)
        elif pre == 'standalone':
            print('情景独立，直接进入流水线')
            return self.pipeline(user_question)

    def _simple_rag_search(self,query, top_k:int = 3):
        if not self.episodic_memory:
            return [] 
        
        query_embedding =self.embedding_model.encode([query])[0]
        docs = [item['text'] for item in self.episodic_memory]
        doc_embeddings = np.array([item['vector'] for item in self.episodic_memory])
        
        # 计算相似度
        similarities = []
        for i, doc_embedding in enumerate(doc_embeddings):
            similarity = 1 - cosine(query_embedding, doc_embedding)
            similarities.append((similarity, docs[i]))

        sorted_similarities = sorted(similarities,key = lambda x:x[0], reverse = True)
        return [doc for similarity, doc in sorted_similarities[:top_k]]
        


# 测试环节

## 初始化更新agent

In [5]:
api_key = MY_API_KEY
base_url = MY_BASE_URL
memory_system = AgentMemory(api_key=api_key,base_url=base_url)

### 提取测试

In [9]:


statement1 = "我是kk。"
statement2 = "我感觉今天有点累。" 


fact1 = memory_system.extract_fact(statement1)
fact2 = memory_system.extract_fact(statement2)


print(f"从 '{statement1}' 提取的事实是: {fact1}")
print(f"从 '{statement2}' 提取的事实是: {fact2}")

从 '我是kk。' 提取的事实是: {'user_name': 'kk'}
从 '我感觉今天有点累。' 提取的事实是: {}


### 仲裁测试

In [20]:
statement3 = '我不知道这是啥意思'
statement4 = '我的朋友什么时候到'

select1 = memory_system.select_fact(statement3)
select2 = memory_system.select_fact(statement4)

print(f" '{statement3}' 的仲裁是: {select1}")
print(f" '{statement4}' 的仲裁是: {select2}")

 '我不知道这是啥意思' 的仲裁是: {'type': 'semantic_search'}
 '我的朋友什么时候到' 的仲裁是: {'type': 'fact_lookup'}


### 添加memory测试

In [48]:
memory_system.add_statement_to_memory("我叫kk。")
memory_system.add_statement_to_memory("今天天气真不错，不是吗？")
memory_system.add_statement_to_memory("我最喜欢的编程语言是Python。")
memory_system.add_statement_to_memory("对了，我的邮箱是 explorer@adventure.com。")

--- 正在处理新陈述: '我叫kk。' ---
成功提取到事实: {'user_name': 'kk'}
知识库已更新！
当前知识库: {'user_name': 'kk', 'favorite_programming_language': 'Python'}

--- 正在处理新陈述: '今天天气真不错，不是吗？' ---
未提取到事实，知识库保持不变。
当前知识库: {'user_name': 'kk', 'favorite_programming_language': 'Python'}

--- 正在处理新陈述: '我最喜欢的编程语言是Python。' ---
成功提取到事实: {'favorite_programming_language': 'Python'}
知识库已更新！
当前知识库: {'user_name': 'kk', 'favorite_programming_language': 'Python'}

--- 正在处理新陈述: '对了，我的邮箱是 explorer@adventure.com。' ---
速率限制错误，将在 1 秒后重试... (第 1/5 次)
速率限制错误，将在 2 秒后重试... (第 2/5 次)
速率限制错误，将在 4 秒后重试... (第 3/5 次)
速率限制错误，将在 8 秒后重试... (第 4/5 次)
成功提取到事实: {'email': 'explorer@adventure.com'}
知识库已更新！
当前知识库: {'user_name': 'kk', 'favorite_programming_language': 'Python', 'email': 'explorer@adventure.com'}



## 端到端测试

### 窗口版本

In [4]:
memory_system = AgentMemory(api_key=MY_API_KEY, base_url=MY_BASE_URL)
print("--- Agent记忆系统已启动，请输入指令 ---")
print("指令格式:")
print("  'learn: [一句话]'  -> 让Agent学习一个新知识")
print("  'ask: [一个问题]'   -> 向Agent提问")
print("  'exit'             -> 退出程序")


# 初始化一个列表，用来记录message
chat_history = []

while True:
    
    user_input = input("\n> ")

    if user_input.lower() == 'exit':
        print("Agent已关闭。")
        print("\n--- 本次会话的短期记忆 ---")
        import json
        print(json.dumps(chat_history, indent=2, ensure_ascii=False))
        break
        
    
    # 解析指令
    if user_input.startswith("learn:"):
        statement = user_input[len("learn:"):].strip()
        
        # 准备好 learn 指令的用户部分和Agent部分
        user_part = user_input # 我们记录完整的指令
        agent_part = f"好的，我已经记住了关于'{statement[:10]}...'的信息。"
        
        print(f"Agent: {agent_part}")
        
        
        memory_system.add_statement_to_memory(statement, agent_part)
        
        # 更新短期记忆
        chat_history.append({"role": "user", "content": user_part})
        chat_history.append({"role": "assistant", "content": agent_part})
    
    elif user_input.startswith("ask:"):
        question = user_input[len("ask:"):].strip()
        chat_history.append({"role": "user", "content": question})
        answer = memory_system.answer_question(question, history=chat_history)
        print(f"Agent: {answer}")
        memory_system.add_statement_to_memory(statement, answer)
        chat_history.append({"role": "assistant", "content": answer})
    
    else:
        print("指令无法识别，请使用 'learn: ' 或 'ask: '")

--- Agent记忆系统已启动，请输入指令 ---
指令格式:
  'learn: [一句话]'  -> 让Agent学习一个新知识
  'ask: [一个问题]'   -> 向Agent提问
  'exit'             -> 退出程序
Agent: 好的，我已经记住了关于'这几天是国庆节...'的信息。
未提取到事实，知识库保持不变。
当前知识库: {}

Agent: 好的，我已经记住了关于'我喜欢的饮料是冰红茶...'的信息。
成功提取到事实: {'favorite_drink': '冰红茶'}
知识库已更新！
当前知识库: {'favorite_drink': '冰红茶'}

这是情景回答，需要查询重写，然后进入流水线
--- 启动查询重写 ---
原始问题: '我最喜欢的饮料是？'
重写后的问题: '我最喜欢的饮料是什么？'
--- 问题被分类为: fact_lookup ---
问题映射到的Key是: favorite_drink
--- 在事实记忆中找到答案！ ---
Agent: 根据已知信息，您最喜欢的饮料是冰红茶。
成功提取到事实: {'favorite_drink': '冰红茶'}
知识库已更新！
当前知识库: {'favorite_drink': '冰红茶'}

这是情景回答，需要查询重写，然后进入流水线
--- 启动查询重写 ---
原始问题: '你知道我为什么在家里休息吗？'
重写后的问题: '根据已知信息，你知道我为什么在家里休息吗？'
--- 问题被分类为: semantic_search ---
--- (Fallback) 启动语义搜索来回答: '根据已知信息，你知道我为什么在家里休息吗？' ---
Agent: 根据我们之前的对话记录，您提到过“这几天是国庆节”。国庆节通常是法定节假日，因此您可能因为假期而在家休息。不过，关于您休息的具体原因，目前已知信息中并没有更详细的说明。
成功提取到事实: {'favorite_drink': '冰红茶'}
知识库已更新！
当前知识库: {'favorite_drink': '冰红茶'}

Agent已关闭。

--- 本次会话的短期记忆 ---
[
  {
    "role": "user",
    "content": "learn: 这几天是国庆节"
  },
 