# 使用Semantic Kernel实现完整的RAG系统

本综合性笔记将演示如何使用微软的 **Semantic Kernel** 框架来构建一个检索增强生成（RAG）系统。我们将从展示没有特定数据访问权限的AI模型的局限性开始，然后逐步构建一个包含分块策略、向量数据库和评估方法的完整RAG系统。

## 安装与设置

在本模块中，我们将重点利用 **Semantic Kernel**。

**Semantic Kernel** 是一个开源的SDK，旨在让开发者能轻松构建企业级的AI应用。它支持C#、Python和Java，已经为生产环境做好了准备，并被许多大型企业所采用。它的核心设计理念是**模块化**，这意味着你可以轻松地更换底层AI模型而无需重写整个代码库。

像Semantic Kernel这样的SDK之所以流行，是因为LLM本身只能处理文本和生成响应，它无法直接访问你的数据库、调用你的API、执行代码或与外部系统交互。Semantic Kernel正是为了解决这个问题而生，它负责：
- **管理与AI服务的连接**（如OpenAI、Azure OpenAI）。
- 提供一个**插件（Plugin）系统**，让你可以编写供AI调用的自定义函数。
- **管理对话历史和上下文**。

Semantic Kernel的核心是**内核（Kernel）协调器**。在复杂的AI应用中，你需要协调多个活动部件，如AI服务、数据库、API、日志系统等。Kernel就像一个中央大脑，将所有这些部分整合在一起。它包含**服务（Services）**（如AI服务、日志服务）和**插件（Plugins）**（AI可以调用的自定义函数，如访问数据库）。想象一个真实的企业场景：一个AI助手需要查询CRM、检查库存、生成报价单，并为合规性记录所有交互。没有Kernel，每一段代码都需要知道如何连接所有这些服务。有了Kernel，所有配置只需一次。因为所有的AI操作都流经Kernel，你就拥有了一个用于日志记录和管理的单一控制点。

**Semantic Kernel的关键组件:**

1. **AI服务连接器**: 在一个多模型的世界里，不同的AI模型有不同的API和认证方法。SK中的连接器是一个抽象层，可以防止供应商锁定，让你在不同模型间轻松切换。
2. **向量存储连接器**: 这是RAG的核心，是连接向量存储和Kernel的桥梁。
3. **函数和插件**: 插件系统让LLM能够使用“工具”。一个**函数**是你暴露给LLM的单个能力（例如一个Python函数），而一个**插件**是一组相关函数的集合（例如，`DatabasePlugin`可能包含`GetUser`、`UpdateUser`等函数）。
4. **提示模板**: 在代码中用多行字符串编写复杂的提示会变得混乱且难以维护。提示模板解决了这个问题，它允许你混合静态指令和动态占位符。
5. **过滤器 (Filters)**: 一段代码，用于在Kernel执行的关键时刻进行拦截，例如在函数调用前或提示渲染后。你可以用它来过滤PII（个人身份信息），确保用户的敏感数据永远不会被发送到外部LLM。

**首先，安装所需的软件包：**

In [None]:
# 运行此单元格以安装所有必需的包
!pip install semantic-kernel openai numpy scikit-learn faiss-cpu python-dotenv

## 环境设置

In [None]:
from typing import List, Dict
import numpy as np
from dataclasses import dataclass
from dotenv import load_dotenv

# 加载环境变量
load_dotenv()

# 导入Semantic Kernel核心库
import semantic_kernel as sk
from semantic_kernel import Kernel
from semantic_kernel.connectors.ai.open_ai import OpenAIChatCompletion, OpenAITextEmbedding
from semantic_kernel.contents import ChatHistory
from semantic_kernel.connectors.ai.open_ai import OpenAIChatPromptExecutionSettings

# 导入用于向量存储的库 - 我们将构建自己的简单系统
import faiss # Facebook AI Similarity Search
from sklearn.metrics.pairwise import cosine_similarity

print("Semantic Kernel 环境设置完成！")

## 创建我们的文档模型和向量存储

我们将使用 **FAISS** 作为我们的向量数据库。FAISS 是一个完全在本地机器上运行的向量数据库库。在生产场景中，我们通常会使用云端的向量数据库，如 Azure AI Search。但其核心概念是相同的：向量数据库允许我们高效地存储和检索向量。

**代码解释:** 

1.  我们将创建一个**类（Class）** `DocumentChunk` 来表示文档的每一个分块。类中的注释将详细解释每个字段的含义。
2.  然后，我们将创建另一个类 `SimpleVectorStore`，它包含使用向量搜索（而非精确的关键词匹配）来查找这些文本块的**方法（函数）**。
    -   **初始化函数 (`__init__`)**: 用于初始化我们的向量存储。我们将使用以下属性对其进行初始化：
        -   **嵌入维度 (Embedding Dimension)**: 每个文档块在转换为向量时将包含多少个数字。OpenAI的文本嵌入模型（我们将使用 `text-embedding-3-small`）会将任何文本转换为一个包含1536个数字的向量。
        -   **索引 (Index)**: 就像书的目录一样，向量数据库中的索引是一个数据结构，它组织向量以便快速找到相似的向量。没有索引，查找相似向量将需要将你的查询与存储的每个向量逐一比较。有了索引，FAISS会预先组织向量，从而可以快速定位到最相似的那些。
        -   **文档 (Documents)**: 在一个常规的Python列表中存储实际的文档块（原始文本加上元数据）。
        -   **ID到索引的映射 (ID to Index)**: 这是一个字典，将文档块的ID映射到其在列表中的位置，以便我们可以通过ID快速找到它。
    -   **添加文档函数 (`add_documents`)**: 接收一批文档块并将其存储到我们的向量存储中。我们处理每个已经向量化的文档，并将其添加到FAISS索引中以进行快速相似性搜索。
    -   **搜索函数 (`search`)**: 接收一个用户的查询（已转换为1536维的向量），并找到最相似的文档块。
3.  现在我们有了自定义的向量存储。接下来，我们需要将一个LLM连接到它，这就是Semantic Kernel发挥作用的地方。我们将使用Semantic Kernel来利用其**嵌入能力**（将文本转换为向量）和**聊天补全能力**（LLM的推理引擎）。

In [None]:
@dataclass
# 我们创建一个DocumentChunk类来表示文档的每个片段
class DocumentChunk:
    # 必需字段 - 每个分块都必须有这些
    id: str                    # 此分块的唯一标识符，如 "policy_doc_chunk_1"
    content: str               # 此分块的实际文本内容
    source_doc_id: str         # 此分块来自哪个原始文档
    title: str                 # 原始文档的可读标题
    chunk_index: int           # 这是第几块？ (0=第一块, 1=第二块, 等等)
   
    # 可选字段 - 这些有默认值
    department: str = ""       # 哪个团队拥有此文档 (可选)
    doc_type: str = ""         # 这是什么类型的文档 - 政策、指南等 (可选)
    embedding: List[float] = None  # 此文本的向量表示 (数字列表)

# 这是一个包含搜索文档方法的类。它不是进行精确的词匹配，而是找到含义相似的文档。
class SimpleVectorStore:
   # 初始化我们的向量存储。我们将使用FAISS，它可以快速存储和搜索大量的文档分块。
   def __init__(self, embedding_dimension: int = 1536):
       # 每个嵌入向量中有多少个数字？这意味着每个文档块将由一个包含1536个数字的列表来表示其含义。
       # OpenAI的 text-embedding-3-small 模型为每段文本提供1536个数字。
       self.embedding_dimension = embedding_dimension
       
       # 创建一个FAISS索引来存储我们的文档嵌入。IndexFlatIP意味着我们将使用内积相似度（类似于余弦相似度）来查找相似的文档。
       self.index = faiss.IndexFlatIP(embedding_dimension)
       
       # 在一个名为documents的列表中存储实际的文档块（文本和元数据）
       self.documents: List[DocumentChunk] = []
       
       # 这个字典将文档ID映射到其在documents列表中的位置，以便我们能通过ID快速找到文档。
       self.id_to_index = {}
   
   # 此函数将一批文档添加到我们的向量存储中。
   def add_documents(self, documents: List[DocumentChunk]):
       embeddings = []  # 这里我们将存储每个文档块的向量表示（嵌入）
       
       # 逐个处理每个文档
       for doc in documents:
           if doc.embedding is None:
               raise ValueError(f"文档 {doc.id} 缺少嵌入向量")
           
           # 关键步骤：规范化嵌入向量
           # 为什么？这样我们就可以使用余弦相似度（比较角度，而不是长度）
           embedding_array = np.array(doc.embedding)  # 将列表转换为numpy数组以便进行数学运算
           faiss.normalize_L2(embedding_array.reshape(1, -1)) # FAISS的L2规范化，使其长度为1
           embeddings.append(embedding_array)
           
           doc_index = len(self.documents)           # 这个文档将处于什么位置？
           self.documents.append(doc)                # 将文档添加到我们的存储中
           self.id_to_index[doc.id] = doc_index      # 记住：这个ID在这个位置
       
       # 将所有规范化的向量添加到FAISS中以进行闪电般的快速搜索
       embeddings_array = np.vstack(embeddings).astype('float32')  # FAISS需要float32类型
       self.index.add(embeddings_array)
       
       print(f"已向向量存储中添加 {len(documents)} 个文档。")
   
   # 此函数搜索与用户问题相似的文档。
   def search(self, query_embedding: List[float], k: int = 3, score_threshold: float = 0.0):
       if self.index.ntotal == 0:
           return []
       
       query_array = np.array(query_embedding).reshape(1, -1).astype('float32')
       faiss.normalize_L2(query_array)
       
       scores, indices = self.index.search(query_array, k)
       
       results = []
       for score, idx in zip(scores[0], indices[0]):
           if idx != -1 and score >= score_threshold:
               document = self.documents[idx]
               results.append((document, float(score)))
       
       return results

# 设置我们将使用的AI服务
kernel = Kernel()  # Semantic Kernel是我们的AI编排框架

# 服务1：聊天补全（生成对问题的回答）
chat_service = OpenAIChatCompletion(
   ai_model_id="gpt-4o"
)
kernel.add_service(chat_service)

# 服务2：文本嵌入（将文本转换为向量表示）
embedding_service = OpenAITextEmbedding(
   ai_model_id="text-embedding-3-small"
)
kernel.add_service(embedding_service)

print("Semantic Kernel已使用OpenAI服务进行初始化。")
print("使用FAISS的简单向量存储进行文档存储。")

---

# 第一部分：演示问题——模型无法访问私有数据

让我们首先展示当向一个AI模型询问它未曾训练过的信息时会发生什么。

下面的代码很简单，我们在一系列数组中存储了“文档”。我们将对这些文档提出问题（但我们实际上不会实现一个完整的RAG系统），所以我们预期模型会不知道我们在说什么。



In [None]:
# 模型不知道的示例公司数据。
# 这代表了私有的、“地面真实”的信息。
company_documents = [
    {
        "id": "product_001",
        "title": "CloudSync Pro企业版计划",
        "content": """CloudSync Pro企业版提供无限存储、高级加密、最多500用户的实时协作、优先支持和自定义集成。定价：每用户每月49美元，需年度承诺。功能包括：自动备份、版本控制、审计日志、SSO集成和99.9%的正常运行时间SLA。""",
        "metadata": {"department": "产品部", "type": "定价"}
    },
    {
        "id": "policy_001", 
        "title": "2024年远程工作政策",
        "content": """自2024年1月起生效：所有员工每周最多可远程工作3天。远程工作需经直接经理批准。每年提供500美元的设备津贴用于家庭办公室设置。团队会议必须使用视频通话。核心协作时间：当地时间上午10点至下午3点。""",
        "metadata": {"department": "人力资源部", "type": "政策"}
    },
    {
        "id": "process_001",
        "title": "客户退款流程",
        "content": """步骤1：客户通过支持门户提交退款请求。步骤2：支持代理在24小时内审核。步骤3：金额低于100美元的，自动批准。步骤4：金额超过100美元的，需要经理批准。步骤5：退款将在3-5个工作日内处理至原支付方式。购买后30天内可全额退款。""",
        "metadata": {"department": "支持部", "type": "流程"}
    },
    {
        "id": "guide_001",
        "title": "新员工入职清单",
        "content": """第一天：IT设置和系统访问。第二天：部门介绍和导师分配。第一周：完成强制性培训模块（安全、合规、公司文化）。第二周：跟随团队成员并审查项目文档。第一个月：完成试用期审查并设定90天目标。""",
        "metadata": {"department": "人力资源部", "type": "指南"}
    }
]

# 我们想用来测试模型基础知识的问题。
test_questions = [
    "CloudSync Pro企业版的价格是多少？",
    "员工每周可以远程工作几天？",
    "购买金额超过100美元的退款审批流程是怎样的？",
    "员工入职第一周会发生什么？"
]

async def run_direct_to_model_test():
    """
    直接向基础AI模型测试问题，以证明其对我们私有公司数据缺乏了解。
    """
    print("测试无RAG的模型 - 关于私有公司数据的问题:")
    print("=" * 70)

    chat_service = kernel.get_service(type=OpenAIChatCompletion)

    execution_settings = OpenAIChatPromptExecutionSettings(model_id="gpt-4o")

    for i, question in enumerate(test_questions, 1):
        print(f"\n问题 {i}: {question}")
        
        chat_history = ChatHistory()
        chat_history.add_user_message(question)

        response = await chat_service.get_chat_message_content(
            chat_history=chat_history,
            settings=execution_settings
        )
        
        print(f"模型响应: {str(response)}")
        print("-" * 50)

print("✅ 开始测试...")
await run_direct_to_model_test()

## 我们刚才观察到了什么

模型要么：
1. **无法回答**，因为它无法访问这家特定公司的信息。
2. **提供通用性回答**，可能与你的实际政策不符。
3. **做出假设**，这些假设在你的特定情境下可能是错误的。

这正是我们需要RAG的原因——在保留其推理能力的同时，赋予模型访问你特定数据的能力。

---

# 第二部分：文档分块策略

In [None]:
# 在本节中，我们将探讨不同的文本分块策略。
# 我们可以进行简单的基于字符的分割，或者更智能的、尊重段落和句子的语义分割。

def simple_text_splitter(text: str, chunk_size: int = 300, overlap: int = 50) -> List[str]:
    """简单的基于字符的文本分割器，带重叠功能"""
    chunks = []
    start = 0
    while start < len(text):
        end = min(start + chunk_size, len(text))
        chunk = text[start:end].strip()
        if chunk:
            chunks.append(chunk)
        start += chunk_size - overlap
        if start >= len(text): break
    return chunks

def semantic_text_splitter(text: str, max_chunk_size: int = 400) -> List[str]:
    """尊重段落和句子边界的文本分割器"""
    paragraphs = [p.strip() for p in text.split('
') if p.strip()]
    chunks = []
    current_chunk = """
    for paragraph in paragraphs:
        if len(current_chunk) + len(paragraph) + 2 > max_chunk_size and current_chunk:
            chunks.append(current_chunk)
            current_chunk = paragraph
        else:
            current_chunk += ("\n" + paragraph) if current_chunk else paragraph
    if current_chunk:
        chunks.append(current_chunk)
    return chunks

# 测试不同的分块策略
sample_doc = company_documents[0]
print("分块策略比较:")
print("=" * 35)

print(f"原始文档: {sample_doc['title']}")
print(f"长度: {len(sample_doc['content'])} 字符")

print("\n1. 简单的基于字符的分块:")
simple_chunks = simple_text_splitter(sample_doc['content'], chunk_size=200, overlap=30)
for i, chunk in enumerate(simple_chunks):
    print(f"块 {i+1} ({len(chunk)} 字符): {chunk}")

print("\n2. 语义分块 (尊重段落):")
semantic_chunks = semantic_text_splitter(sample_doc['content'], max_chunk_size=250)
for i, chunk in enumerate(semantic_chunks):
    print(f"块 {i+1} ({len(chunk)} 字符): {chunk}")

print("\n权衡利弊:")
print("- 简单分块: 大小可预测，但可能在句子中间断开。")
print("- 语义分块: 保持意义完整，但分块大小可变。")

## 测试完整的RAG流程

现在，我们将实现一个简单的RAG系统，它使用Semantic Kernel来处理文档检索和答案生成。

1.  **semantic_chunker**: 它的工作是将一段文本分割成更小的字符串（块）。它首先根据段落（由两个换行符 `

` 分隔）来分割文本，这确保了属于一起的句子能保持在一起。然后，它遍历这些段落，根据最大块大小将它们组合成块。通过尊重文本的自然断点，它确保了相关句子保持在一起，从而创建了高质量、重点突出的信息块。这极大地提高了我们数据的“信噪比”。
2.  **ingest_documents_semantic**: 这个函数旨在解决任何RAG系统的第一个主要问题：为AI准备数据。它接收一个文档列表、一个向量存储（我们之前构建的自定义数据库）和嵌入服务。它将这些文档转换为向量，遍历每个文档，调用嵌入服务，并创建一个`DocumentChunk`对象（包含向量、原始文本和元数据）。
3.  **ask_with_semantic_rag**: RAG的核心引擎。它接收用户的问题、内核和向量存储（我们的知识库）。它通过嵌入服务传递用户问题以获得向量表示，然后使用向量搜索方法找到向量最接近的文档块。然后我们增强提示并最终生成答案。

In [None]:
# --- 语义分块的辅助函数 ---
def semantic_chunker(text: str, max_chunk_size: int = 300) -> List[str]:
    paragraphs = [p.strip() for p in text.split('

') if p.strip()]
    chunks = []
    current_chunk = """
    for paragraph in paragraphs:
        if current_chunk and (len(current_chunk) + len(paragraph) + 2) > max_chunk_size:
            chunks.append(current_chunk)
            current_chunk = paragraph
        else:
            current_chunk += ("\n\n" + paragraph) if current_chunk else paragraph
    if current_chunk:
        chunks.append(current_chunk)
    return chunks

# --- 核心RAG函数 ---
async def ingest_documents_semantic(documents: List[Dict], vector_store: SimpleVectorStore, embedding_service: OpenAITextEmbedding) -> None:
    print(f"使用语义分块处理 {len(documents)} 个文档...")
    all_chunks_to_add = []
    for doc in documents:
        text_chunks = semantic_chunker(doc["content"])
        for i, chunk_text in enumerate(text_chunks):
            if len(chunk_text) < 20: continue
            embedding = (await embedding_service.generate_embedding(chunk_text))
            chunk = DocumentChunk(id=f"{doc['id']}_chunk_{i}", content=chunk_text, source_doc_id=doc["id"], title=doc["title"], chunk_index=i, embedding=embedding)
            all_chunks_to_add.append(chunk)
    vector_store.add_documents(all_chunks_to_add)
    print(f"已向向量存储中添加 {len(all_chunks_to_add)} 个新块。")

async def ask_with_semantic_rag(question: str, kernel: Kernel, vector_store: SimpleVectorStore) -> str:
    embedding_service = kernel.get_service(type=OpenAITextEmbedding)
    chat_service = kernel.get_service(type=OpenAIChatCompletion)
    
    query_embedding = (await embedding_service.generate_embedding(question))
    search_results = vector_store.search(query_embedding, k=3, score_threshold=0.3)
    
    if not search_results:
        return "我在文档中找不到任何相关信息来回答这个问题。"
        
    context = "\n\n---\n\n".join([result.content for result, score in search_results])
    
    prompt = f"""
仅根据下面提供的上下文回答以下问题。

上下文:
---
{context}
---

问题: {question}

答案:
"""

    chat_history = ChatHistory()
    chat_history.add_user_message(prompt)
    
    settings = OpenAIChatPromptExecutionSettings(model_id="gpt-4o", max_tokens=200, temperature=0.1)
    
    response = await chat_service.get_chat_message_content(chat_history, settings)
    
    return str(response)

# --- 主执行块 ---
semantic_vector_store = SimpleVectorStore()
embedding_service = kernel.get_service(type=OpenAITextEmbedding)
await ingest_documents_semantic(company_documents, semantic_vector_store, embedding_service)

print("\n" + "="*50)
print("测试使用语义分块的RAG系统:")
question_to_ask = "CloudSync Pro企业版的价格是多少？"
answer = await ask_with_semantic_rag(question_to_ask, kernel, semantic_vector_store)

print(f"\n问: {question_to_ask}")
print(f"答: {answer}")

---

# 第四部分：高级配置与调优

我们正在测试两种简单的方法来让我们的RAG系统给出更好的答案——第一种是通过改变我们要求AI回应的方式（友好型 vs 专业型），第二种是通过调整我们对包含哪些文档的挑剔程度（严格匹配 vs 宽松匹配）。

后者被称为**相似度阈值**。相似度阈值就像是为“相关”搜索结果设定的门槛。它是一个介于0和1之间的数字，决定了一个文档块必须与你的问题有多相似，我们才会将其包含在答案中。

当阈值**过低**时：如果你用0.2的阈值问“我们的休假政策是什么？”，你可能会得到关于休假政策、员工福利、工时追踪和公司假期的结果。虽然都与人力资源相关，但这会给用户带来大量并非直接回答他们问题的信息。然后，AI不得不在所有这些额外的上下文中筛选，可能会稀释最终答案的质量。

当阈值**过高**时：如果你用0.7的阈值问“我如何申请休假？”，你可能根本得不到任何结果，因为没有文档包含那个确切的短语，即使你的休假政策文档清楚地解释了流程。当答案实际上存在于你的知识库中时，用户最终会因“未找到信息”的响应而感到沮丧。

**找到最佳点**：目标是找到一个既能提供足够相关信息又不会引入噪音的阈值。对于大多数商业文档，0.3到0.5之间的阈值效果很好——足够高以过滤掉不相关的内容，但又足够低以捕捉到可能使用不同措辞的相关信息。



## 最佳实践总结

### 文档处理
- **使用语义分块**，尊重段落和句子边界。
- **最佳块大小**: 对于大多数商业文档，300-400个字符。
- **包含有意义的重叠** (50-80个字符)以保持上下文。
- **保留丰富的元数据**用于过滤和来源归属。

### 向量搜索配置
- **从FAISS开始**进行本地开发和小型生产。
- **使用0.3左右的相似度阈值**以平衡精度/召回率。
- **检索3-5个文档**以提供足够的上下文而无噪音。
- **规范化嵌入**以进行一致的相似度计算。

### 提示工程
- **为不同用户类型创建特定角色的提示** (客户、员工、高管)。
- **包含清晰的指令**以处理信息不可用的情况。
- **使用结构化模板**将上下文与问题分开。
- **测试提示变体**以优化你的特定用例。

## 下一步

1. **从核心功能开始** - 让基本的RAG与你的文档一起工作。
2. **尽早添加监控** - 实现日志记录和指标收集。
3. **为你的领域定制** - 为你的内容量身定制提示和分块。
4. **根据反馈迭代** - 使用真实的用户互动来改进系统。
5. **为生产做计划** - 考虑可扩展性、监控和维护。