In [None]:
import torch
import os
import re
from typing import List, Dict, Optional
from transformers import (
    AutoTokenizer, 
    AutoModelForCausalLM, 
    pipeline,
    BitsAndBytesConfig  # 用于模型量化，降低显存占用
)
from langchain_huggingface import HuggingFaceEmbeddings
from langchain.vectorstores import Chroma
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.document_loaders import TextLoader

# 1. GPU配置模块
# --------------------------


In [13]:

def setup_environment() -> torch.device:
    """
    初始化运行环境，优先使用GPU并检查硬件配置
    DeepSeek-R1-Distill-Qwen-14B需要至少16GB显存的GPU
    """
    # 检查是否有可用GPU
    if torch.cuda.is_available():
        device = torch.device("cuda")
        # 检查GPU显存是否足够
        gpu_memory = torch.cuda.get_device_properties(0).total_memory / (1024 **3)  # 转换为GB
        if gpu_memory < 16:
            print(f"⚠️ 警告：检测到GPU显存为{gpu_memory:.1f}GB，可能不足以运行模型，建议使用16GB以上显存的GPU")
        print(f"✅ 使用GPU加速：{torch.cuda.get_device_name(0)}")
        print(f"📊 初始GPU显存占用：{torch.cuda.memory_allocated()/1024**2:.2f}MB")
        return device
    else:
        raise RuntimeError("❌ 未检测到可用GPU，DeepSeek-R1-Distill-Qwen-14B需要GPU运行")


# 2. LLM核心模块
# --------------------------

In [14]:
def load_deepseek_llm(device: torch.device) -> tuple[pipeline, AutoTokenizer]:
    """
    加载DeepSeek-R1-Distill-Qwen-14B模型和分词器
    使用4bit量化减少显存占用，同时保持较好性能
        generation_pipeline: 文本生成管道
    """
    # 模型名称，需要在HuggingFace上获取访问权限
    model_name = "./models/DeepSeek-R1-Distill-Qwen-14B"
    
    # 配置4bit量化参数，这是能够在16GB显存中运行14B模型的关键
    quantization_config = BitsAndBytesConfig(
        load_in_4bit=True,                  # 启用4bit量化：INT4（4位整数），0.5字节
        bnb_4bit_use_double_quant=True,     # 使用双量化，进一步减少显存占用
        bnb_4bit_quant_type="nf4",          # 使用归一化浮点4bit类型，更适合LLM
        bnb_4bit_compute_dtype=torch.bfloat16  # 计算时使用bfloat16精度
    )
    
    # 加载分词器，Qwen系列模型原生支持pad_token
    tokenizer = AutoTokenizer.from_pretrained(model_name)
    tokenizer.padding_side = "right"  # 设置右填充，避免影响生成结果
    
    # 加载模型，使用量化配置和自动设备映射
    model = AutoModelForCausalLM.from_pretrained(
        model_name,
        quantization_config=quantization_config,
        device_map="auto",  # 自动将模型层分配到可用设备
        torch_dtype=torch.bfloat16,
        trust_remote_code=True  # 需要信任远程代码，因为模型使用了自定义架构
    )
    
    # 创建文本生成管道，设置适合14B模型的生成参数
    generation_pipeline = pipeline(
        task="text-generation",
        model=model,
        tokenizer=tokenizer,
        max_new_tokens=150,        # 生成回答的最大长度
        temperature=0.5,           # 控制生成的随机性，0.5表示中等随机性
        repetition_penalty=1.1,    # 重复惩罚，减少重复生成相同内容
        top_p=0.9,                 # 核采样参数，控制生成的多样性
        do_sample=True,            # 启用采样生成
        pad_token_id=tokenizer.pad_token_id,
        eos_token_id=tokenizer.eos_token_id
    )
    
    print("✅ DeepSeek-R1-Distill-Qwen-14B模型加载完成（4bit量化）")
    print(f"📊 模型加载后显存占用：{torch.cuda.memory_allocated()/1024**2:.2f}MB")
    return generation_pipeline, tokenizer


# 3. 对话记忆模块
# --------------------------

In [None]:

class ConversationMemory:
    """
    对话记忆管理模块，负责存储和管理对话历史
    特点：
        - 支持超长对话历史（最高50000Token）
        - 自动截断超长历史，保留最新对话
        - 生成包含历史上下文的提示词

        初始化对话记忆
        参数:
            tokenizer: 用于计算Token长度的分词器
            max_token_len: 最大记忆长度（Token数）
        """
    def __init__(self, tokenizer: AutoTokenizer, max_token_len: int = 50000):
        self.tokenizer = tokenizer
        self.max_token_len = max_token_len  # 适配14B模型的32K上下文窗口
        self.history: List[Dict[str, str]] = []  # 存储对话历史

    """
        添加一轮对话到记忆中，并在必要时截断历史
        """
    def add_conversation(self, user_query: str, agent_response: str) -> None:
        self.history.append({"user": user_query, "agent": agent_response})
        self._truncate_history()  # 确保历史不超过最大长度

     """
        截断超长对话历史，从最旧的对话开始删除
        确保总长度不超过max_token_len
        """
    def _truncate_history(self) -> None:
        while True:
            # 1、拼接所有历史对话为文本
            history_text = "\n".join([
                f"用户：{item['user']}\n助手：{item['agent']}" 
                for item in self.history
            ])
            
            # 2、计算当前历史的Token长度
            token_len = len(self.tokenizer.encode(history_text, add_special_tokens=False))
            
            # 3、如果长度在限制范围内，或只剩最后一轮对话，则停止截断
            if token_len <= self.max_token_len or len(self.history) <= 1:
                break
            
            # 4、删除最旧的一轮对话
            self.history.pop(0)
        
        # 打印当前记忆长度（调试用）
        # current_len = len(self.tokenizer.encode(
        #     "\n".join([f"用户：{i['user']}\n助手：{i['agent']}" for i in self.history]),
        #     add_special_tokens=False
        # ))
        # print(f"💾 当前对话记忆长度：{current_len} Token")
        
    """
        生成包含历史对话的提示词，用于模型生成回答
        参数:
            current_query: 当前用户查询
        返回:
            包含历史上下文的提示词
        """
    def get_context_prompt(self, current_query: str) -> str:
        
        # 如果没有历史对话，直接返回当前问题
        if not self.history:
            return f"用户问：{current_query}\n助手回答："
        
        # 拼接历史对话和当前问题
        history_text = "\n".join([
            f"用户：{item['user']}\n助手：{item['agent']}" 
            for item in self.history
        ])
        
        # 生成提示词，指导模型基于历史和当前问题生成回答
        return f"""基于以下历史对话和当前问题，生成准确、简洁、有用的回答：
{history_text}
用户现在问：{current_query}
助手回答："""

# 4. 决策模块

In [None]:

def agent_decision(llm_pipeline: pipeline, query: str, memory: ConversationMemory) -> Dict[str, str]:
    """
    Agent决策逻辑：判断是否需要调用工具，以及调用哪个工具
    
    参数:
        llm_pipeline: 文本生成管道
        query: 当前用户查询
        
    返回:
        决策结果字典：包含action和可选的tool_name
    """
    # 生成决策提示词
    decision_prompt = f"""
任务：判断是否需要调用工具回答以下问题，并选择合适的工具。

判断标准：
1. 如果是数学计算问题（包含数字和运算符）→ 调用计算器工具，输出：tool:calculator
2. 如果是询问本Agent的功能、使用方法 → 调用文档检索工具，输出：tool:document_retrieval
3. 如果是需要实时信息的问题（如天气、新闻、实时数据等）→ 调用搜索工具，输出：tool:search
4. 其他情况 → 直接回答，输出：direct

需要分析的问题：{query}

参考历史对话（最近2轮）：{memory.history[-2:] if len(memory.history)>=2 else "无"}

请严格按照上述格式输出决策结果，仅输出决策内容，不添加额外信息。
决策：
"""
    
    # 调用模型生成决策
    decision_result = llm_pipeline(decision_prompt)[0]["generated_text"]
    # 提取决策结果（取"决策："后面的内容）
    decision_result = decision_result.split("决策：")[-1].strip().lower()
    
    # 解析决策结果
    if "tool:calculator" in decision_result:
        return {"action": "tool", "tool_name": "calculator"}
    elif "tool:document_retrieval" in decision_result:
        return {"action": "tool", "tool_name": "document_retrieval"}
    elif "tool:search" in decision_result:
        return {"action": "tool", "tool_name": "search"}
    else:
        # 如果决策不明确，默认直接回答
        return {"action": "direct"}


# 5. 工具模块
# --------------------------

In [None]:

class ToolManager:
    """
    工具管理模块，定义Agent可以调用的工具
    
    目前支持的工具：
    - 计算器：处理数学计算
    - 文档检索：从本地文档中查找信息
    - 模拟搜索：模拟网络搜索功能
    """
    def __init__(self, device: torch.device, doc_path: str = "agent_knowledge.txt"):
        """
        初始化工具管理器
        
        参数:
            device: 运行设备
            doc_path: 文档检索的知识库路径
        """
        self.device = device
        
        # 工具注册表，键为工具名称，值为工具函数
        self.tools = {
            "calculator": self._calculator_tool,
            "document_retrieval": self._document_retrieval_tool,
            "search": self._simulate_search_tool
        }
        
        # 初始化文档检索工具
        self._init_document_retrieval(doc_path)
        print("✅ 工具模块初始化完成（支持计算器、文档检索、模拟搜索）")
    
    def _init_document_retrieval(self, doc_path: str) -> None:
        """
        初始化文档检索工具（RAG：检索增强生成）
        
        步骤：
        1. 生成样本文档（如果不存在）
        2. 加载文档并分割为小块
        3. 初始化嵌入模型
        4. 创建向量数据库存储文档向量
        """
        # 生成样本文档（Agent的知识库）
        if not os.path.exists(doc_path):
            with open(doc_path, "w", encoding="utf-8") as f:
                f.write("本Agent基于DeepSeek-R1-Distill-Qwen-14B大语言模型开发，具备以下能力：\n")
                f.write("1. 自然语言对话：可以进行多轮对话，理解上下文语境\n")
                f.write("2. 数学计算：能够处理加减乘除等基本运算，支持括号和小数\n")
                f.write("3. 文档检索：可以回答关于自身功能和使用方法的问题\n")
                f.write("4. 模拟搜索：对于需要实时信息的问题，会提供模拟搜索结果\n")
                f.write("使用方法：直接输入您的问题即可，Agent会自动判断是否需要调用工具来回答\n")
        
        # 1、加载文档
        loader = TextLoader(doc_path, encoding="utf-8")
        documents = loader.load()

        # 将非字符串内容转换为字符串
        for i, doc in enumerate(documents):
            if not isinstance(doc.page_content, str):
                documents[i].page_content = str(doc.page_content)
                print(f"⚠️  修复文档 {i} 内容类型：非字符串→字符串")
        # 2、分割文档为适合嵌入的小块，     递归分割
        text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=100,      # 每个文本块的大小
            chunk_overlap=30,    # 文本块之间的重叠部分
            separators = ["\n\n", "\n", " ", ""]  # 中文分割符优化
        )
        split_docs = text_splitter.split_documents(documents)
        
        # 初始化嵌入模型（将文本转换为向量）
        embeddings = HuggingFaceEmbeddings(
            model_name="all-MiniLM-L6-v2",  # 轻量级嵌入模型
            model_kwargs={"device": self.device.type}  # 使用GPU加速
        )
        
        # 初始化Chroma向量数据库，存储文档向量
        self.vector_db = Chroma.from_documents(
            documents=split_docs,
            embedding=embeddings,
            persist_directory="./deepseek_agent_chroma_db"  # 向量数据持久化路径
        )
        self.vector_db.persist()  # 保存向量数据库
    
    def _calculator_tool(self, query: str) -> str:
        """
        计算器工具：处理数学计算
        
        参数:
            query: 包含数学表达式的查询
            
        返回:
            计算结果或错误信息
        """
        # 提取查询中的数学表达式
        expr_match = re.search(r"[\d\(\)\+\-\×\*\/\.\s]+", query)
        if not expr_match:
            return "未检测到有效的数学表达式，请输入包含数字和运算符(+、-、*、/、×、÷)的问题"
        
        # 提取并清理表达式
        expr = expr_match.group(0).strip()
        expr = expr.replace("×", "*").replace("÷", "/")  # 统一运算符
        
        # 安全检查：只允许数字、运算符和括号
        safe_chars = set("0123456789.+-*/() ")
        if not all(c in safe_chars for c in expr):
            return "表达式包含不安全字符，仅支持数字和基本运算符(+、-、*、/、()、.)"
        
        # 计算表达式
        try:
            result = eval(expr)  # 使用eval计算表达式
            return f"计算结果：{expr} = {float(result):.4g}"
        except ZeroDivisionError:
            return "计算错误：除数不能为0"
        except Exception as e:
            return f"计算错误：无法计算表达式 '{expr}'，错误信息：{str(e)}"
    
    def _document_retrieval_tool(self, query: str) -> str:
        """
        文档检索工具：从知识库中检索相关信息
        
        参数:
            query: 检索查询
            
        返回:
            检索到的相关信息
        """
        # 从向量数据库中检索最相关的3个文档片段
        retriever = self.vector_db.as_retriever(search_kwargs={"k": 3})
        relevant_docs = retriever.get_relevant_documents(query)
        
        if not relevant_docs:
            return "未检索到相关文档信息"
        
        # 拼接检索结果
        context = "\n".join([
            f"[相关片段{i+1}] {doc.page_content}" 
            for i, doc in enumerate(relevant_docs)
        ])
        return f"从知识库中检索到以下相关信息：\n{context}"
    
    def _simulate_search_tool(self, query: str) -> str:
        """
        模拟搜索工具：模拟网络搜索功能
        
        参数:
            query: 搜索查询
            
        返回:
            模拟的搜索结果
        """
        return f"模拟搜索结果（非实时数据）：关于'{query}'的信息\n" \
               f"1. 基本概述：这是一个关于'{query}'的示例搜索结果\n" \
               f"2. 提示：要获取最新信息，请使用浏览器访问搜索引擎"
    
    def call_tool(self, tool_name: str, query: str) -> str:
        """
        调用指定工具
        
        参数:
            tool_name: 工具名称
            query: 工具输入查询
            
        返回:
            工具执行结果
        """
        if tool_name not in self.tools:
            return f"不支持的工具：{tool_name}，当前支持的工具包括：{list(self.tools.keys())}"
        
        try:
            return self.tools[tool_name](query)
        except Exception as e:
            return f"工具调用出错：{str(e)}"

# 6. 执行模块
# --------------------------


In [18]:


class DeepSeekAgentAssistant:
    """
    基于DeepSeek-R1-Distill-Qwen-14B的Agent助手
    
    核心功能：
    - 接收用户输入
    - 决定是否调用工具
    - 执行决策（调用工具或直接回答）
    - 存储对话历史
    - 返回最终回答
    """
    def __init__(self):
        """初始化Agent的所有组件"""
        # 1. 初始化运行环境
        self.device = setup_environment()
        
        # 2. 加载LLM模型和分词器
        self.llm_pipeline, self.tokenizer = load_deepseek_llm(self.device)
        
        # 3. 初始化对话记忆
        self.memory = ConversationMemory(self.tokenizer)
        
        # 4. 初始化工具管理器
        self.tool_manager = ToolManager(self.device)
        
        print("🎉 DeepSeek-R1-Distill-Qwen-14B Agent助手初始化完成， ready to use!")
    
    def run(self, user_query: str) -> str:
        """
        执行Agent的完整工作流程
        
        参数:
            user_query: 用户输入的查询
            
        返回:
            Agent生成的回答
        """
        print(f"\n📥 用户输入：{user_query}")
        
        # 步骤1：Agent决策（是否调用工具）
        decision = agent_decision(self.llm_pipeline, user_query, self.memory)
        print(f"🔍 Agent决策：{decision}")
        
        # 步骤2：执行决策
        try:
            if decision["action"] == "tool":
                # 调用工具并获取结果
                tool_name = decision["tool_name"]
                tool_result = self.tool_manager.call_tool(tool_name, user_query)
                print(f"🛠️  工具执行结果：{tool_result[:200]}..." if len(tool_result) > 200 else f"🛠️  工具执行结果：{tool_result}")
                
                # 结合工具结果生成最终回答
                final_prompt = self.memory.get_context_prompt(user_query)
                final_prompt += f"请根据以下工具结果回答问题：{tool_result}"
                agent_response = self.llm_pipeline(final_prompt)[0]["generated_text"]
                
            else:
                # 直接生成回答（不调用工具）
                final_prompt = self.memory.get_context_prompt(user_query)
                agent_response = self.llm_pipeline(final_prompt)[0]["generated_text"]
            
            # 提取并清理回答（去掉提示词部分和特殊标记）
            agent_response = agent_response.split("助手回答：")[-1].strip().replace("</s>", "")
        
        except Exception as e:
            # 处理可能的错误
            agent_response = f"处理您的问题时出错：{str(e)}"
            print(f"❌ 执行错误：{str(e)}")
        
        # 步骤3：将本轮对话添加到记忆
        self.memory.add_conversation(user_query, agent_response)
        
        print(f"📤 Agent回答：{agent_response}")
        return agent_response



# 7. 测试模块（交互入口）
# --------------------------


In [19]:

if __name__ == "__main__":
    # 提示用户需要模型访问权限
    print("⚠️ 注意：使用前请确保已在HuggingFace获取模型访问权限，并通过`huggingface-cli login`命令登录")
    
    try:
        # 初始化Agent
        agent = DeepSeekAgentAssistant()
        
        # 启动交互循环
        print("\n===== DeepSeek-R1-Distill-Qwen-14B Agent助手 =====")
        print("提示：输入'退出'可结束对话")
        
        while True:
            user_input = input("你：")
            # 检查是否退出
            if user_input.strip().lower() in ["退出", "quit", "exit"]:
                print("Agent：再见！有任何问题欢迎再次咨询～")
                break
            
            # 处理用户输入并获取回答
            response = agent.run(user_input)
            print(f"Agent：{response}")
    
    except Exception as e:
        print(f"初始化失败：{str(e)}")
        print("💡 解决方案：")
        print("1. 确保您的GPU显存不少于16GB")
        print("2. 确保已安装必要依赖：pip install torch transformers bitsandbytes langchain-huggingface chromadb sentence-transformers")
        print("3. 确保已获取模型访问权限并成功登录")
    

⚠️ 注意：使用前请确保已在HuggingFace获取模型访问权限，并通过`huggingface-cli login`命令登录
初始化失败：❌ 未检测到可用GPU，DeepSeek-R1-Distill-Qwen-14B需要GPU运行
💡 解决方案：
1. 确保您的GPU显存不少于16GB
2. 确保已安装必要依赖：pip install torch transformers bitsandbytes langchain-huggingface chromadb sentence-transformers
3. 确保已获取模型访问权限并成功登录


# 输出