Context-Engineering: RAG 检索增强生成的实践配方
=============================================

本模块演示了检索增强生成（RAG）模式的实际实现，用于通过外部知识增强大语言模型（LLM）的上下文。
我们专注于最小化、高效的实现，突出关键概念，无需复杂的基础设施。

涵盖的关键概念：
1. 基本 RAG 流水线构建
2. 上下文窗口管理与分块策略
3. 嵌入与检索技术
4. 检索质量与相关性评估
5. 上下文集成模式
6. 高级 RAG 变体

用法：
    # 在 Jupyter 或 Colab 中：
    %run 04_rag_recipes.py
    # 或
    from rag_recipes import SimpleRAG, ChunkedRAG, HybridRAG


In [1]:
import os
import re
import json
import time
import numpy as np
import logging
import tiktoken
from typing import Dict, List, Tuple, Any, Optional, Union, Callable, TypeVar
from dataclasses import dataclass
import matplotlib.pyplot as plt
from IPython.display import display, Markdown, HTML

# 配置日志
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# 检查必需的库
try:
    from openai import OpenAI
    OPENAI_AVAILABLE = True
except ImportError:
    OPENAI_AVAILABLE = False
    logger.warning("未找到 OpenAI 包。请安装：pip install openai")

try:
    import dotenv
    dotenv.load_dotenv()
    ENV_LOADED = True
except ImportError:
    ENV_LOADED = False
    logger.warning("未找到 python-dotenv。请安装：pip install python-dotenv")

try:
    from sklearn.metrics.pairwise import cosine_similarity
    SKLEARN_AVAILABLE = True
except ImportError:
    SKLEARN_AVAILABLE = False
    logger.warning("未找到 scikit-learn。请安装：pip install scikit-learn")

try:
    import numpy as np
    NUMPY_AVAILABLE = True
except ImportError:
    NUMPY_AVAILABLE = False
    logger.warning("未找到 NumPy。请安装：pip install numpy")

try:
    import faiss
    FAISS_AVAILABLE = True
except ImportError:
    FAISS_AVAILABLE = False
    logger.warning("未找到 FAISS。请安装：pip install faiss-cpu 或 faiss-gpu")

# 常量
DEFAULT_MODEL = "openai/gpt-4.1"
DEFAULT_EMBEDDING_MODEL = "openai/text-embedding-3-small"
DEFAULT_TEMPERATURE = 0.7
DEFAULT_MAX_TOKENS = 500
DEFAULT_CHUNK_SIZE = 1000
DEFAULT_CHUNK_OVERLAP = 200
DEFAULT_TOP_K = 3


2025-07-12 09:54:06,086 - faiss.loader - INFO - Loading faiss with AVX512 support.
2025-07-12 09:54:06,086 - faiss.loader - INFO - Could not load library with AVX512 support due to:
ModuleNotFoundError("No module named 'faiss.swigfaiss_avx512'")
2025-07-12 09:54:06,086 - faiss.loader - INFO - Loading faiss with AVX2 support.
2025-07-12 09:54:06,086 - faiss.loader - INFO - Could not load library with AVX512 support due to:
ModuleNotFoundError("No module named 'faiss.swigfaiss_avx512'")
2025-07-12 09:54:06,086 - faiss.loader - INFO - Loading faiss with AVX2 support.
2025-07-12 09:54:06,118 - faiss.loader - INFO - Successfully loaded faiss with AVX2 support.
2025-07-12 09:54:06,129 - faiss - INFO - Failed to load GPU Faiss: name 'GpuIndexIVFFlat' is not defined. Will not load constructor refs for GPU indexes. This is only an error if you're trying to use GPU Faiss.


In [2]:
# 基础数据结构
# =====================

@dataclass
class Document:
    """表示一个文档或文本块及其元数据。"""
    content: str
    metadata: Dict[str, Any] = None
    embedding: Optional[List[float]] = None
    id: Optional[str] = None
    
    def __post_init__(self):
        """如果未提供，初始化默认值。"""
        if self.metadata is None:
            self.metadata = {}
        
        if self.id is None:
            # 基于内容哈希生成简单ID
            import hashlib
            self.id = hashlib.md5(self.content.encode()).hexdigest()[:8]


# 辅助函数
# ===============

def setup_client(api_key=None, model=DEFAULT_MODEL):
    """
    设置用于 LLM 交互的 API 客户端。

    参数：
        api_key: API 密钥（如果为 None，将在环境变量中查找 OPENAI_API_KEY）
        model: 要使用的模型名称

    返回：
        tuple: (客户端, 模型名称)
    """
    if api_key is None:
        api_key = os.environ.get("GITHUB_TOKEN")
        if api_key is None and not ENV_LOADED:
            logger.warning("未找到 API 密钥。请设置 OPENAI_API_KEY 环境变量或传递 api_key 参数。")
    
    if OPENAI_AVAILABLE:
        client = OpenAI(
            base_url="https://models.github.ai/inference",
            api_key=api_key,
        )
        return client, model
    else:
        logger.error("需要 OpenAI 包。请安装：pip install openai")
        return None, model


def count_tokens(text: str, model: str = DEFAULT_MODEL) -> int:
    """
    使用合适的分词器计算文本字符串中的令牌数。

    参数：
        text: 要分词的文本
        model: 用于分词的模型名称

    返回：
        int: 令牌数量
    """
    try:
        encoding = tiktoken.encoding_for_model(model)
        return len(encoding.encode(text))
    except Exception as e:
        # 当 tiktoken 不支持该模型时的备用方案
        logger.warning(f"无法为 {model} 使用 tiktoken：{e}")
        # 粗略近似：英语中 1 个令牌 ≈ 4 个字符
        return len(text) // 4


def generate_embedding(
    text: str,
    client=None,
    model: str = DEFAULT_EMBEDDING_MODEL
) -> List[float]:
    """
    为给定文本生成嵌入向量。

    参数：
        text: 要嵌入的文本
        client: API 客户端（如果为 None，将创建一个）
        model: 嵌入模型名称

    返回：
        list: 嵌入向量
    """
    if client is None:
        client, _ = setup_client()
        if client is None:
            # 如果没有可用的客户端，返回虚拟嵌入
            return [0.0] * 1536  # 许多嵌入模型的默认大小
    
    try:
        response = client.embeddings.create(
            model=model,
            input=[text]
        )
        return response.data[0].embedding
    except Exception as e:
        logger.error(f"生成嵌入时出错：{e}")
        # 出错时返回虚拟嵌入
        return [0.0] * 1536


def generate_response(
    prompt: str,
    client=None,
    model: str = DEFAULT_MODEL,
    temperature: float = DEFAULT_TEMPERATURE,
    max_tokens: int = DEFAULT_MAX_TOKENS,
    system_message: str = "你是一个有用的助手。"
) -> Tuple[str, Dict[str, Any]]:
    """
    从 LLM 生成响应并返回元数据。

    参数：
        prompt: 要发送的提示
        client: API 客户端（如果为 None，将创建一个）
        model: 模型名称
        temperature: 温度参数
        max_tokens: 生成的最大令牌数
        system_message: 要使用的系统消息

    返回：
        tuple: (响应文本, 元数据)
    """
    if client is None:
        client, model = setup_client(model=model)
        if client is None:
            return "错误：没有可用的 API 客户端", {"error": "没有 API 客户端"}
    
    prompt_tokens = count_tokens(prompt, model)
    system_tokens = count_tokens(system_message, model)
    
    metadata = {
        "prompt_tokens": prompt_tokens,
        "system_tokens": system_tokens,
        "model": model,
        "temperature": temperature,
        "max_tokens": max_tokens,
        "timestamp": time.time()
    }
    
    try:
        start_time = time.time()
        response = client.chat.completions.create(
            model=model,
            messages=[
                {"role": "system", "content": system_message},
                {"role": "user", "content": prompt}
            ],
            temperature=temperature,
            max_tokens=max_tokens
        )
        latency = time.time() - start_time
        
        response_text = response.choices[0].message.content
        response_tokens = count_tokens(response_text, model)
        
        metadata.update({
            "latency": latency,
            "response_tokens": response_tokens,
            "total_tokens": prompt_tokens + system_tokens + response_tokens,
            "token_efficiency": response_tokens / (prompt_tokens + system_tokens) if (prompt_tokens + system_tokens) > 0 else 0,
            "tokens_per_second": response_tokens / latency if latency > 0 else 0
        })
        
        return response_text, metadata
    
    except Exception as e:
        logger.error(f"生成响应时出错：{e}")
        metadata["error"] = str(e)
        return f"错误：{str(e)}", metadata


def format_metrics(metrics: Dict[str, Any]) -> str:
    """
    将指标字典格式化为可读字符串。
    
    参数：
        metrics: 指标字典
        
    返回：
        str: 格式化的指标字符串
    """
    # 选择要显示的最重要指标
    key_metrics = {
        "提示令牌": metrics.get("prompt_tokens", 0),
        "响应令牌": metrics.get("response_tokens", 0),
        "总令牌": metrics.get("total_tokens", 0),
        "延迟": f"{metrics.get('latency', 0):.2f}秒",
        "令牌效率": f"{metrics.get('token_efficiency', 0):.2f}"
    }
    
    return " | ".join([f"{k}: {v}" for k, v in key_metrics.items()])


def display_response(
    prompt: str,
    response: str,
    retrieved_context: Optional[str] = None,
    metrics: Dict[str, Any] = None,
    show_prompt: bool = True,
    show_context: bool = True
) -> None:
    """
    在笔记本中显示提示-响应对和指标。
    
    参数：
        prompt: 提示文本
        response: 响应文本
        retrieved_context: 检索到的上下文（可选）
        metrics: 指标字典（可选）
        show_prompt: 是否显示提示文本
        show_context: 是否显示检索到的上下文
    """
    if show_prompt:
        display(HTML("<h4>查询：</h4>"))
        display(Markdown(f"```\n{prompt}\n```"))
    
    if retrieved_context and show_context:
        display(HTML("<h4>检索到的上下文：</h4>"))
        display(Markdown(f"```\n{retrieved_context}\n```"))
    
    display(HTML("<h4>响应：</h4>"))
    display(Markdown(response))
    
    if metrics:
        display(HTML("<h4>指标：</h4>"))
        display(Markdown(f"```\n{format_metrics(metrics)}\n```"))


In [None]:
# 文档处理函数
# ============================

def text_to_chunks(
    text: str,
    chunk_size: int = DEFAULT_CHUNK_SIZE,
    chunk_overlap: int = DEFAULT_CHUNK_OVERLAP,
    model: str = DEFAULT_MODEL
) -> List[Document]:
    """
    将文本分割成指定标记大小的重叠块。
    
    Args:
        text: 要分割的文本
        chunk_size: 每个块的最大标记数
        chunk_overlap: 块之间重叠的标记数
        model: 用于标记化的模型
        
    Returns:
        list: Document对象列表
    """
    if not text:
        return []
    
    # 获取标记器
    try:
        encoding = tiktoken.encoding_for_model(model)
    except:
        logger.warning(f"无法获取{model}的标记器。使用近似分块。")
        return _approximate_text_to_chunks(text, chunk_size, chunk_overlap)
    
    # 对文本进行标记化
    tokens = encoding.encode(text)
    
    # 创建块
    chunks = []
    i = 0
    while i < len(tokens):
        # 提取块标记
        chunk_end = min(i + chunk_size, len(tokens))
        chunk_tokens = tokens[i:chunk_end]
        
        # 解码回文本
        chunk_text = encoding.decode(chunk_tokens)
        
        # 创建文档
        chunks.append(Document(
            content=chunk_text,
            metadata={
                "start_idx": i,
                "end_idx": chunk_end,
                "chunk_size": len(chunk_tokens)
            }
        ))
        
        # 移动到下一个块，考虑重叠
        i += max(1, chunk_size - chunk_overlap)
    
    return chunks


def _approximate_text_to_chunks(
    text: str,
    chunk_size: int = DEFAULT_CHUNK_SIZE,
    chunk_overlap: int = DEFAULT_CHUNK_OVERLAP
) -> List[Document]:
    """
    使用简单的基于字符的近似方法将文本分割成块。
    
    Args:
        text: 要分割的文本
        chunk_size: 每个块的近似字符数（假设约4个字符/标记）
        chunk_overlap: 重叠的近似字符数
        
    Returns:
        list: Document对象列表
    """
    # 将标记大小转换为字符大小（近似）
    char_size = chunk_size * 4
    char_overlap = chunk_overlap * 4
    
    # 首先按段落分割（如果可能的话，避免在段落中间分割）
    paragraphs = text.split('\n\n')
    
    chunks = []
    current_chunk = []
    current_size = 0
    
    for paragraph in paragraphs:
        paragraph_size = len(paragraph)
        
        # 如果添加此段落会超过块大小
        if current_size + paragraph_size > char_size and current_chunk:
            # 从当前文本创建一个块
            chunk_text = '\n\n'.join(current_chunk)
            chunks.append(Document(
                content=chunk_text,
                metadata={"approx_size": current_size}
            ))
            
            # 开始一个新块，带有重叠
            # 找到应该包含在重叠中的段落
            overlap_size = 0
            overlap_paragraphs = []
            
            for p in reversed(current_chunk):
                p_size = len(p)
                if overlap_size + p_size <= char_overlap:
                    overlap_paragraphs.insert(0, p)
                    overlap_size += p_size
                else:
                    break
            
            current_chunk = overlap_paragraphs
            current_size = overlap_size
        
        # 添加当前段落
        current_chunk.append(paragraph)
        current_size += paragraph_size
    
    # 如果还有剩余内容，添加最后一个块
    if current_chunk:
        chunk_text = '\n\n'.join(current_chunk)
        chunks.append(Document(
            content=chunk_text,
            metadata={"approx_size": current_size}
        ))
    
    return chunks


def extract_document_batch_embeddings(
    documents: List[Document],
    client=None,
    model: str = DEFAULT_EMBEDDING_MODEL,
    batch_size: int = 10
) -> List[Document]:
    """
    高效地为一批文档生成向量嵌入。
    
    Args:
        documents: 要嵌入的文档对象列表
        client: API客户端（如果为None，将创建一个）
        model: 使用的嵌入模型
        batch_size: 每次API调用中嵌入的文档数量
        
    Returns:
        list: 带有嵌入的更新文档对象
    """
    if not documents:
        return []
    
    if client is None:
        client, _ = setup_client()
        if client is None:
            logger.error("没有可用的API客户端进行嵌入")
            return documents
    
    # 分批处理
    for i in range(0, len(documents), batch_size):
        batch = documents[i:i+batch_size]
        batch_texts = [doc.content for doc in batch]
        
        try:
            # 为批次生成嵌入
            response = client.embeddings.create(
                model=model,
                input=batch_texts
            )
            
            # 使用嵌入更新文档
            for j, doc in enumerate(batch):
                if j < len(response.data):
                    doc.embedding = response.data[j].embedding
                else:
                    logger.warning(f"文档 {i+j} 缺少嵌入")
        except Exception as e:
            logger.error(f"生成批次嵌入时出错: {e}")
    
    return documents


def similarity_search(
    query_embedding: List[float],
    documents: List[Document],
    top_k: int = DEFAULT_TOP_K
) -> List[Tuple[Document, float]]:
    """
    找到与查询嵌入最相似的文档。
    
    Args:
        query_embedding: 查询嵌入向量
        documents: 带有嵌入的文档对象列表
        top_k: 返回的结果数量
        
    Returns:
        list: (文档, 相似度分数) 元组列表
    """
    if not NUMPY_AVAILABLE:
        logger.error("相似度搜索需要NumPy")
        return []
    
    # 过滤掉没有嵌入的文档
    docs_with_embeddings = [doc for doc in documents if doc.embedding is not None]
    
    if not docs_with_embeddings:
        logger.warning("找不到带有嵌入的文档")
        return []
    
    # 将嵌入转换为numpy数组
    query_embedding_np = np.array(query_embedding).reshape(1, -1)
    doc_embeddings = np.array([doc.embedding for doc in docs_with_embeddings])
    
    # 计算余弦相似度
    if SKLEARN_AVAILABLE:
        similarities = cosine_similarity(query_embedding_np, doc_embeddings)[0]
    else:
        # 回退到手动余弦相似度计算
        norm_query = np.linalg.norm(query_embedding_np)
        norm_docs = np.linalg.norm(doc_embeddings, axis=1)
        dot_products = np.dot(query_embedding_np, doc_embeddings.T)[0]
        similarities = dot_products / (norm_query * norm_docs)
    
    # 创建(文档, 相似度)对
    doc_sim_pairs = list(zip(docs_with_embeddings, similarities))
    
    # 按相似度排序（降序）并取前top_k个
    sorted_pairs = sorted(doc_sim_pairs, key=lambda x: x[1], reverse=True)
    return sorted_pairs[:top_k]


def create_faiss_index(documents: List[Document]) -> Any:
    """
    从文档嵌入创建FAISS索引以进行高效的相似度搜索。
    
    Args:
        documents: 带有嵌入的文档对象列表
        
    Returns:
        object: FAISS索引，如果FAISS不可用则返回None
    """
    if not FAISS_AVAILABLE:
        logger.error("索引需要FAISS")
        return None
    
    # 过滤掉没有嵌入的文档
    docs_with_embeddings = [doc for doc in documents if doc.embedding is not None]
    
    if not docs_with_embeddings:
        logger.warning("找不到带有嵌入的文档")
        return None
    
    # 从第一个文档获取嵌入维度
    embedding_dim = len(docs_with_embeddings[0].embedding)
    
    # 创建FAISS索引
    index = faiss.IndexFlatL2(embedding_dim)
    
    # 将嵌入添加到索引中
    embeddings = np.array([doc.embedding for doc in docs_with_embeddings], dtype=np.float32)
    index.add(embeddings)
    
    return index, docs_with_embeddings


def faiss_similarity_search(
    query_embedding: List[float],
    faiss_index: Any,
    documents: List[Document],
    top_k: int = DEFAULT_TOP_K
) -> List[Tuple[Document, float]]:
    """
    使用FAISS索引找到最相似的文档。
    
    Args:
        query_embedding: 查询嵌入向量
        faiss_index: FAISS索引（来自create_faiss_index）
        documents: 对应索引的文档对象列表
        top_k: 返回的结果数量
        
    Returns:
        list: (文档, 相似度分数) 元组列表
    """
    if not FAISS_AVAILABLE:
        logger.error("相似度搜索需要FAISS")
        return []
    
    if faiss_index is None:
        logger.error("FAISS索引为None")
        return []
    
    # 如果从create_faiss_index返回，解包索引和文档
    if isinstance(faiss_index, tuple):
        index, docs_with_embeddings = faiss_index
    else:
        index = faiss_index
        docs_with_embeddings = documents
    
    # 将查询转换为numpy数组
    query_np = np.array([query_embedding], dtype=np.float32)
    
    # 搜索索引
    distances, indices = index.search(query_np, top_k)
    
    # 创建(文档, 相似度)对
    # 将L2距离转换为相似度分数（越高越好）
    results = []
    for i in range(len(indices[0])):
        idx = indices[0][i]
        if idx < len(docs_with_embeddings):
            # 将L2距离转换为相似度（1 / (1 + distance)）
            similarity = 1.0 / (1.0 + distances[0][i])
            results.append((docs_with_embeddings[idx], similarity))
    
    return results

In [None]:
# RAG 系统基类
# ============

class RAGSystem:
    """
    检索增强生成系统的基类。
    提供通用功能和接口。
    """
    
    def __init__(
        self,
        client=None,
        model: str = DEFAULT_MODEL,
        embedding_model: str = DEFAULT_EMBEDDING_MODEL,
        system_message: str = "你是一个基于检索上下文来回答问题的有用助手。",
        max_tokens: int = DEFAULT_MAX_TOKENS,
        temperature: float = DEFAULT_TEMPERATURE,
        verbose: bool = False
    ):
        """
        初始化 RAG 系统。
        
        参数:
            client: API 客户端（如果为 None，将创建一个新的）
            model: 用于生成的模型名称
            embedding_model: 用于嵌入的模型名称
            system_message: 要使用的系统消息
            max_tokens: 最大生成令牌数
            temperature: 温度参数
            verbose: 是否打印调试信息
        """
        self.client, self.model = setup_client(model=model) if client is None else (client, model)
        self.embedding_model = embedding_model
        self.system_message = system_message
        self.max_tokens = max_tokens
        self.temperature = temperature
        self.verbose = verbose
        
        # 初始化文档存储
        self.documents = []
        
        # 初始化历史记录和指标跟踪
        self.history = []
        self.metrics = {
            "total_prompt_tokens": 0,
            "total_response_tokens": 0,
            "total_tokens": 0,
            "total_latency": 0,
            "retrieval_latency": 0,
            "queries": 0
        }
    
    def _log(self, message: str) -> None:
        """
        如果启用了详细模式，则记录消息。
        
        参数:
            message: 要记录的消息
        """
        if self.verbose:
            logger.info(message)
    
    def add_documents(self, documents: List[Document]) -> None:
        """
        将文档添加到文档存储中。
        
        参数:
            documents: 要添加的文档对象列表
        """
        self.documents.extend(documents)
    
    def add_texts(
        self,
        texts: List[str],
        metadatas: Optional[List[Dict[str, Any]]] = None
    ) -> None:
        """
        将文本添加到文档存储中，可选择添加元数据。
        
        参数:
            texts: 要添加的文本字符串列表
            metadatas: 元数据字典列表（可选）
        """
        if metadatas is None:
            metadatas = [{} for _ in texts]
        
        # 创建 Document 对象
        documents = [
            Document(content=text, metadata=metadata)
            for text, metadata in zip(texts, metadatas)
        ]
        
        self.add_documents(documents)
    
    def _retrieve(
        self,
        query: str,
        top_k: int = DEFAULT_TOP_K
    ) -> List[Tuple[Document, float]]:
        """
        检索与查询相关的文档。
        
        参数:
            query: 查询字符串
            top_k: 返回的结果数量
            
        返回:
            list: (文档, 相似度分数) 元组列表
        """
        # 这是一个占位符 - 子类应该实现这个方法
        raise NotImplementedError("子类必须实现 _retrieve 方法")
    
    def _format_context(
        self,
        retrieved_documents: List[Tuple[Document, float]]
    ) -> str:
        """
        将检索到的文档格式化为上下文字符串。
        
        参数:
            retrieved_documents: (文档, 相似度分数) 元组列表
            
        返回:
            str: 格式化的上下文字符串
        """
        context_parts = []
        
        for i, (doc, score) in enumerate(retrieved_documents):
            # 格式化文档和元数据
            source_info = ""
            if doc.metadata:
                # 如果可用，提取来源信息
                source = doc.metadata.get("source", "")
                if source:
                    source_info = f" (来源: {source})"
            
            context_parts.append(f"[文档 {i+1}{source_info}]\n{doc.content}\n")
        
        return "\n".join(context_parts)
    
    def _create_prompt(
        self,
        query: str,
        context: str
    ) -> str:
        """
        创建结合查询和检索上下文的提示。
        
        参数:
            query: 用户查询
            context: 检索到的上下文
            
        返回:
            str: 格式化的提示
        """
        return f"""基于检索到的上下文回答以下问题。如果上下文不包含相关信息，请如实说明而不是编造答案。

检索到的上下文:
{context}

问题: {query}

答案:"""
    
    def query(
        self,
        query: str,
        top_k: int = DEFAULT_TOP_K
    ) -> Tuple[str, Dict[str, Any]]:
        """
        通过 RAG 流水线处理查询。
        
        参数:
            query: 查询字符串
            top_k: 返回的结果数量
            
        返回:
            tuple: (响应, 详细信息)
        """
        self._log(f"正在处理查询: {query}")
        
        # 检索相关文档
        start_time = time.time()
        retrieved_docs = self._retrieve(query, top_k)
        retrieval_latency = time.time() - start_time
        
        # 从检索到的文档格式化上下文
        context = self._format_context(retrieved_docs)
        
        # 创建提示
        prompt = self._create_prompt(query, context)
        
        # 生成响应
        response, metadata = generate_response(
            prompt=prompt,
            client=self.client,
            model=self.model,
            temperature=self.temperature,
            max_tokens=self.max_tokens,
            system_message=self.system_message
        )
        
        # 更新指标
        self.metrics["total_prompt_tokens"] += metadata.get("prompt_tokens", 0)
        self.metrics["total_response_tokens"] += metadata.get("response_tokens", 0)
        self.metrics["total_tokens"] += metadata.get("total_tokens", 0)
        self.metrics["total_latency"] += metadata.get("latency", 0)
        self.metrics["retrieval_latency"] += retrieval_latency
        self.metrics["queries"] += 1
        
        # 添加到历史记录
        query_record = {
            "query": query,
            "retrieved_docs": [(doc.content, score) for doc, score in retrieved_docs],
            "context": context,
            "prompt": prompt,
            "response": response,
            "metrics": {
                **metadata,
                "retrieval_latency": retrieval_latency
            },
            "timestamp": time.time()
        }
        self.history.append(query_record)
        
        # 创建详细信息字典
        details = {
            "query": query,
            "retrieved_docs": retrieved_docs,
            "context": context,
            "response": response,
            "metrics": {
                **metadata,
                "retrieval_latency": retrieval_latency
            }
        }
        
        return response, details
    
    def get_summary_metrics(self) -> Dict[str, Any]:
        """
        获取所有查询的摘要指标。
        
        返回:
            dict: 摘要指标
        """
        summary = self.metrics.copy()
        
        # 添加派生指标
        if summary["queries"] > 0:
            summary["avg_latency_per_query"] = summary["total_latency"] / summary["queries"]
            summary["avg_retrieval_latency"] = summary["retrieval_latency"] / summary["queries"]
            
        if summary["total_prompt_tokens"] > 0:
            summary["overall_efficiency"] = (
                summary["total_response_tokens"] / summary["total_prompt_tokens"]
            )
        
        return summary
    
    def display_query_results(self, details: Dict[str, Any], show_context: bool = True) -> None:
        """
        在笔记本中显示查询结果。
        
        参数:
            details: 来自 query() 的查询详细信息
            show_context: 是否显示检索到的上下文
        """
        display(HTML("<h2>RAG 查询结果</h2>"))
        
        # 显示查询
        display(HTML("<h3>查询</h3>"))
        display(Markdown(details["query"]))
        
        # 显示检索到的文档
        if show_context and "retrieved_docs" in details:
            display(HTML("<h3>检索到的文档</h3>"))
            
            for i, (doc, score) in enumerate(details["retrieved_docs"]):
                display(HTML(f"<h4>文档 {i+1} (分数: {score:.4f})</h4>"))
                
                # 如果可用，显示元数据
                if doc.metadata:
                    display(HTML("<p><em>元数据:</em></p>"))
                    display(Markdown(f"```json\n{json.dumps(doc.metadata, indent=2)}\n```"))
                
                # 显示内容
                display(Markdown(f"```\n{doc.content}\n```"))
        
        # 显示响应
        display(HTML("<h3>响应</h3>"))
        display(Markdown(details["response"]))
        
        # 显示指标
        if "metrics" in details:
            display(HTML("<h3>指标</h3>"))
            metrics = details["metrics"]
            
            # 格式化指标
            display(Markdown(f"""
            - 提示令牌数: {metrics.get('prompt_tokens', 0)}
            - 响应令牌数: {metrics.get('response_tokens', 0)}
            - 总令牌数: {metrics.get('total_tokens', 0)}
            - 生成延迟: {metrics.get('latency', 0):.2f}秒
            - 检索延迟: {metrics.get('retrieval_latency', 0):.2f}秒
            - 总延迟: {metrics.get('latency', 0) + metrics.get('retrieval_latency', 0):.2f}秒
            """))
    
    def visualize_metrics(self) -> None:
        """
        创建跨查询的指标可视化。
        """
        if not self.history:
            logger.warning("没有历史记录可视化")
            return
        
        # 提取绘图数据
        queries = list(range(1, len(self.history) + 1))
        prompt_tokens = [h["metrics"].get("prompt_tokens", 0) for h in self.history]
        response_tokens = [h["metrics"].get("response_tokens", 0) for h in self.history]
        generation_latencies = [h["metrics"].get("latency", 0) for h in self.history]
        retrieval_latencies = [h["metrics"].get("retrieval_latency", 0) for h in self.history]
        total_latencies = [g + r for g, r in zip(generation_latencies, retrieval_latencies)]
        
        # 创建图表
        fig, axes = plt.subplots(2, 2, figsize=(14, 10))
        fig.suptitle("RAG 系统各查询指标", fontsize=16)

        plt.rcParams['font.sans-serif'] = ['SimHei', 'Arial Unicode MS', 'DejaVu Sans']
        plt.rcParams['axes.unicode_minus'] = False

        # 图表1: 令牌使用情况
        axes[0, 0].bar(queries, prompt_tokens, label="提示令牌", color="blue", alpha=0.7)
        axes[0, 0].bar(queries, response_tokens, bottom=prompt_tokens, 
                       label="响应令牌", color="green", alpha=0.7)
        axes[0, 0].set_title("令牌使用情况")
        axes[0, 0].set_xlabel("查询")
        axes[0, 0].set_ylabel("令牌数")
        axes[0, 0].legend()
        axes[0, 0].grid(alpha=0.3)
        
        # 图表2: 延迟分解
        axes[0, 1].bar(queries, retrieval_latencies, label="检索", color="orange", alpha=0.7)
        axes[0, 1].bar(queries, generation_latencies, bottom=retrieval_latencies, 
                      label="生成", color="red", alpha=0.7)
        axes[0, 1].set_title("延迟分解")
        axes[0, 1].set_xlabel("查询")
        axes[0, 1].set_ylabel("秒")
        axes[0, 1].legend()
        axes[0, 1].grid(alpha=0.3)
        
        # 图表3: 检索数量
        if any("retrieved_docs" in h for h in self.history):
            doc_counts = [len(h.get("retrieved_docs", [])) for h in self.history]
            axes[1, 0].plot(queries, doc_counts, marker='o', color="purple", alpha=0.7)
            axes[1, 0].set_title("检索文档数量")
            axes[1, 0].set_xlabel("查询")
            axes[1, 0].set_ylabel("数量")
            axes[1, 0].grid(alpha=0.3)
        
        # 图表4: 累积令牌
        cumulative_tokens = np.cumsum([h["metrics"].get("total_tokens", 0) for h in self.history])
        axes[1, 1].plot(queries, cumulative_tokens, marker='^', color="brown", alpha=0.7)
        axes[1, 1].set_title("累积令牌使用量")
        axes[1, 1].set_xlabel("查询")
        axes[1, 1].set_ylabel("总令牌数")
        axes[1, 1].grid(alpha=0.3)
        
        plt.tight_layout()
        plt.subplots_adjust(top=0.9)
        plt.show()

In [None]:
# RAG 系统实现
# =========================

class SimpleRAG(RAGSystem):
    """
    使用嵌入进行相似性搜索的简单 RAG 系统。
    """
    
    def __init__(self, **kwargs):
        """初始化简单 RAG 系统。"""
        super().__init__(**kwargs)
        
        # 文档是否已嵌入
        self.documents_embedded = False
    
    def add_documents(self, documents: List[Document]) -> None:
        """
        向文档存储添加文档并重置嵌入标志。
        
        Args:
            documents: 要添加的 Document 对象列表
        """
        super().add_documents(documents)
        self.documents_embedded = False
    
    def _ensure_documents_embedded(self) -> None:
        """确保所有文档都有嵌入。"""
        if self.documents_embedded:
            return
        
        docs_to_embed = [doc for doc in self.documents if doc.embedding is None]
        
        if docs_to_embed:
            self._log(f"为 {len(docs_to_embed)} 个文档生成嵌入")
            extract_document_batch_embeddings(
                docs_to_embed, 
                client=self.client,
                model=self.embedding_model
            )
        
        self.documents_embedded = True
    
    def _retrieve(
        self,
        query: str,
        top_k: int = DEFAULT_TOP_K
    ) -> List[Tuple[Document, float]]:
        """
        使用嵌入相似性检索查询的相关文档。
        
        Args:
            query: 查询字符串
            top_k: 返回的结果数量
            
        Returns:
            list: (文档, 相似度分数) 元组列表
        """
        # 确保文档已嵌入
        self._ensure_documents_embedded()
        
        if not self.documents:
            self._log("文档存储中没有文档")
            return []
        
        # 生成查询嵌入
        query_embedding = generate_embedding(
            query,
            client=self.client,
            model=self.embedding_model
        )
        
        # 执行相似性搜索
        results = similarity_search(
            query_embedding,
            self.documents,
            top_k
        )
        
        return results


class ChunkedRAG(SimpleRAG):
    """
    在建立索引之前对文档进行分块的 RAG 系统。
    """
    
    def __init__(
        self,
        chunk_size: int = DEFAULT_CHUNK_SIZE,
        chunk_overlap: int = DEFAULT_CHUNK_OVERLAP,
        **kwargs
    ):
        """
        初始化分块 RAG 系统。
        
        Args:
            chunk_size: 每个块的最大标记数
            chunk_overlap: 块之间重叠的标记数
            **kwargs: 传递给 RAGSystem 的其他参数
        """
        super().__init__(**kwargs)
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap
        
        # 分块前的原始文档
        self.original_documents = []
        
        # 是否使用 FAISS 进行检索（如果可用）
        self.use_faiss = FAISS_AVAILABLE
        self.faiss_index = None
    
    def add_documents(self, documents: List[Document]) -> None:
        """
        将文档添加到存储中，对其进行分块，并重置嵌入标志。
        
        Args:
            documents: 要添加的 Document 对象列表
        """
        # 存储原始文档
        self.original_documents.extend(documents)
        
        # 对每个文档进行分块
        chunked_docs = []
        for doc in documents:
            chunks = text_to_chunks(
                doc.content,
                chunk_size=self.chunk_size,
                chunk_overlap=self.chunk_overlap,
                model=self.model
            )
            
            # 将元数据复制到块中并添加父引用
            for i, chunk in enumerate(chunks):
                chunk.metadata.update(doc.metadata)
                chunk.metadata["parent_id"] = doc.id
                chunk.metadata["chunk_index"] = i
                chunk.metadata["parent_content"] = doc.content[:100] + "..." if len(doc.content) > 100 else doc.content
            
            chunked_docs.extend(chunks)
        
        # 将分块文档添加到存储中
        super().add_documents(chunked_docs)
        
        # 如果使用 FAISS，重置 FAISS 索引
        if self.use_faiss:
            self.faiss_index = None
    
    def _ensure_documents_embedded(self) -> None:
        """确保所有文档都有嵌入并在需要时构建 FAISS 索引。"""
        super()._ensure_documents_embedded()
        
        # 如果使用 FAISS，构建 FAISS 索引
        if self.use_faiss and self.faiss_index is None and self.documents:
            self._log("正在构建 FAISS 索引")
            self.faiss_index = create_faiss_index(self.documents)
    
    def _retrieve(
        self,
        query: str,
        top_k: int = DEFAULT_TOP_K
    ) -> List[Tuple[Document, float]]:
        """
        使用嵌入相似性或 FAISS 检索相关文档块。
        
        Args:
            query: 查询字符串
            top_k: 返回的结果数量
            
        Returns:
            list: (文档, 相似度分数) 元组列表
        """
        # 确保文档已嵌入并在需要时构建 FAISS 索引
        self._ensure_documents_embedded()
        
        if not self.documents:
            self._log("文档存储中没有文档")
            return []
        
        # 生成查询嵌入
        query_embedding = generate_embedding(
            query,
            client=self.client,
            model=self.embedding_model
        )
        
        # 如果可用，使用 FAISS 进行检索
        if self.use_faiss and self.faiss_index is not None:
            results = faiss_similarity_search(
                query_embedding,
                self.faiss_index,
                self.documents,
                top_k
            )
        else:
            # 回退到基本相似性搜索
            results = similarity_search(
                query_embedding,
                self.documents,
                top_k
            )
        
        return results


class HybridRAG(ChunkedRAG):
    """
    结合嵌入相似性与关键词搜索的 RAG 系统。
    """
    
    def __init__(
        self,
        keyword_weight: float = 0.3,
        **kwargs
    ):
        """
        初始化混合 RAG 系统。
        
        Args:
            keyword_weight: 关键词搜索的权重（0.0 到 1.0）
            **kwargs: 传递给 ChunkedRAG 的其他参数
        """
        super().__init__(**kwargs)
        self.keyword_weight = max(0.0, min(1.0, keyword_weight))
        self.embedding_weight = 1.0 - self.keyword_weight
    
    def _keyword_search(
        self,
        query: str,
        documents: List[Document],
        top_k: int = DEFAULT_TOP_K
    ) -> List[Tuple[Document, float]]:
        """
        对文档执行关键词搜索。
        
        Args:
            query: 查询字符串
            documents: Document 对象列表
            top_k: 返回的结果数量
            
        Returns:
            list: (文档, 相似度分数) 元组列表
        """
        # 简单关键词匹配
        query_terms = set(query.lower().split())
        
        results = []
        for doc in documents:
            content = doc.content.lower()
            
            # 计算匹配词汇并计算分数
            matches = sum(1 for term in query_terms if term in content)
            score = matches / len(query_terms) if query_terms else 0.0
            
            results.append((doc, score))
        
        # 按分数排序（降序）并取前 top_k 个
        sorted_results = sorted(results, key=lambda x: x[1], reverse=True)
        return sorted_results[:top_k]
    
    def _retrieve(
        self,
        query: str,
        top_k: int = DEFAULT_TOP_K
    ) -> List[Tuple[Document, float]]:
        """
        使用混合搜索检索相关文档块。
        
        Args:
            query: 查询字符串
            top_k: 返回的结果数量
            
        Returns:
            list: (文档, 相似度分数) 元组列表
        """
        # 确保文档已嵌入
        self._ensure_documents_embedded()
        
        if not self.documents:
            self._log("文档存储中没有文档")
            return []
        
        # 生成查询嵌入
        query_embedding = generate_embedding(
            query,
            client=self.client,
            model=self.embedding_model
        )
        
        # 获取语义搜索结果
        if self.use_faiss and self.faiss_index is not None:
            semantic_results = faiss_similarity_search(
                query_embedding
                