# 项目实战：RAG企业知识库（上）

## 课程目标
通过本课程，学员将掌握基于检索增强生成（RAG）技术的智能问答系统的核心原理与实现，包括RAG架构设计、文档向量化、FastAPI后端开发、大模型集成及会话管理。

## 课程内容
1. RAG架构与基础知识回顾
2. 向量嵌入与文档处理
3. FastAPI后端开发
4. 大模型接入与提示工程
5. 数据持久化与会话管理

## 前提条件
- Python 3.9+
- 安装必要的库：`fastapi`, `sentence-transformers`, `faiss-cpu`, `openai`, `sqlite3`, `uvicorn`

## 1. RAG架构与基础知识回顾

### 1.1 RAG技术概述
检索增强生成（Retrieval-Augmented Generation, RAG）是一种结合了信息检索和生成式模型的AI技术，核心原理是通过检索相关文档作为上下文，增强大语言模型的回答质量和准确性。RAG适用于知识密集型任务，如知识库问答、文档查询等。

**核心流程**：
- 用户提问 → 检索相关文档 → 构建上下文 → 语言模型生成回答

<img src="./rag.png" style="margin-left: 0px" width=800px>


**应用场景**：
- 企业知识库问答
- 学术研究辅助
- 客户支持自动化

### 1.2 向量数据库基础
FAISS（Facebook AI Similarity Search）是一个高效的向量相似度搜索库，广泛用于RAG系统中存储和检索文档嵌入向量。

**FAISS特性**：
- 支持多种索引类型（如FlatL2、IVF、HNSW）
- 高性能，适合大规模向量搜索
- 易于与Python集成

**代码示例**：初始化FAISS索引

In [1]:
import faiss
import numpy as np

# 初始化FAISS索引
dimension = 768  # 嵌入向量维度（m3e-base模型）
index = faiss.IndexFlatL2(dimension)

# 假设有一些嵌入向量
embeddings = np.random.random((100, dimension)).astype('float32')
index.add(embeddings)  # 添加向量到索引

# 搜索
query_embedding = np.random.random((1, dimension)).astype('float32')
k = 3  # 返回前3个最相似的向量
distances, indices = index.search(query_embedding, k)
print("最近的向量索引：", indices)
print("距离：", distances)

最近的向量索引： [[20 46 51]]
距离： [[110.19388  111.065475 114.94035 ]]


### 1.3 语言模型集成
RAG系统中，语言模型（如GLM-4-plus）负责根据检索到的上下文生成回答。通过API集成，可以轻松调用大模型。

**关键点**：
- 使用OpenAI兼容的API接口
- 配置API密钥和端点
- 确保上下文格式清晰

**代码示例**：调用大语言模型

In [2]:
from openai import OpenAI

# 初始化OpenAI客户端
client = OpenAI(
    api_key="your api key",
    base_url="https://open.bigmodel.cn/api/paas/v4/"
)

# 调用GLM-4-plus模型
response = client.chat.completions.create(
    model="glm-4-plus",
    messages=[
        {"role": "system", "content": "你是一个专业的问答助手。"},
        {"role": "user", "content": "什么是RAG技术？"}
    ]
)
print(response.choices[0].message.content)

RAG技术，全称为"Retrieval-Augmented Generation"（检索增强生成），是一种结合信息检索和生成模型的技术，广泛应用于自然语言处理（NLP）领域，特别是在问答系统、机器翻译和文本生成等任务中。

### 核心思想
RAG技术的核心思想是通过检索相关文档或知识库中的信息来增强生成模型的能力。具体来说，它包含两个主要组件：
1. **检索模块**：负责从大量文档或知识库中检索与输入查询最相关的信息。
2. **生成模块**：基于检索到的信息和原始查询生成最终输出。

### 工作流程
1. **输入查询**：用户或系统提供一个查询或问题。
2. **信息检索**：检索模块从预定义的文档集合或知识库中找到与查询最相关的信息。
3. **信息融合**：将检索到的信息与原始查询结合，提供给生成模块。
4. **生成输出**：生成模块基于融合后的信息生成最终回答或文本。

### 优势
- **提高准确性**：通过引入外部知识，生成的回答或文本更准确、更有信息量。
- **灵活性**：能够处理更广泛的问题类型，特别是那些需要特定领域知识的问题。
- **可解释性**：生成的结果可以追溯到具体的文档或知识来源，增加了系统的透明度。

### 应用场景
- **问答系统**：如智能客服、搜索引擎的问答功能。
- **机器翻译**：通过检索双语对齐的文档来提高翻译质量。
- **文本生成**：如自动写作助手、新闻生成等。

### 挑战
- **检索效率**：在大规模文档集合中高效检索相关信息是一个挑战。
- **信息融合**：如何有效融合检索到的信息和原始查询仍需深入研究。
- **数据质量**：检索到的信息质量直接影响生成结果的质量。

### 代表性模型
- **DPR（Dense Passage Retrieval）**：一种高效的密集检索方法。
- **T5（Text-to-Text Transfer Transformer）**：可以用于多种NLP任务的通用生成模型。

RAG技术通过结合检索和生成，有效提升了NLP系统的性能和实用性，是当前自然语言处理领域的一个重要研究方向。


## 2. 向量嵌入与文档处理

### 2.1 文档解析技术
文档解析是将上传的文档（如TXT、PDF、DOCX）转换为纯文本，以便后续向量化处理。

**关键点**：
- 支持多种编码（如UTF-8、GBK）
- 处理文件格式多样性
- 确保内容提取的完整性

**代码示例**：读取TXT文件

In [3]:
def read_text_file(file_path):
    try:
        with open(file_path, "r", encoding="utf-8") as f:
            content = f.read()
    except UnicodeDecodeError:
        with open(file_path, "r", encoding="gbk", errors="ignore") as f:
            content = f.read()
    return content

# 示例
content = read_text_file("docs/sample.txt")
print(content[:100])  # 打印前100个字符


　　　　　　　　　　　　老子道德经


　　　　　　　　　　　　　～　·※·　～

一　　章　［道，可道，非恒道］　　　　　二　　章　［天下皆知美之为美］
三　　章　［不尚贤］　　　　　　　　　　四


### 2.2 向量化模型选择
SentenceTransformer（如m3e-base）用于将文本转换为固定维度的嵌入向量，适合语义检索。

**m3e-base特性**：
- 维度：768
- 支持多语言
- 高效且易于部署

**代码示例**：生成嵌入向量

In [None]:
pip install  sentence-transformers

In [None]:
from sentence_transformers import SentenceTransformer

# 加载m3e-base模型
model = SentenceTransformer('moka-ai/m3e-base')

# 文本向量化
texts = ["这是一个示例文本", "另一个文档内容"]
embeddings = model.encode(texts, normalize_embeddings=True)
print("嵌入向量形状：", embeddings.shape)
print("第一个向量：", embeddings[0][:5])  # 打印前5个值

### 2.3 文本分块策略
将长文档分割为小块（chunks）以提高检索精度和效率。

**策略**：
- 固定长度分块
- 按语义分块（如句子、段落）
- 确保块之间有一定重叠

**代码示例**：简单分块

In [None]:
def chunk_text(text, max_length=500):
    chunks = []
    words = text.split()
    current_chunk = []
    current_length = 0
    
    for word in words:
        current_chunk.append(word)
        current_length += len(word) + 1
        if current_length >= max_length:
            chunks.append(" ".join(current_chunk))
            current_chunk = []
            current_length = 0
    
    if current_chunk:
        chunks.append(" ".join(current_chunk))
    return chunks

# 示例
text = "这是一个很长的文档内容..." * 100
chunks = chunk_text(text)
print(f"分块数量：{len(chunks)}")
print("第一个分块：", chunks[0][:50])

**代码示例**：重叠分块

In [None]:
# 文档分块函数
def chunk_document(text, max_chars=500, overlap=100):
    """
    将长文档切分成较小的块，使用滑动窗口确保上下文连贯性
    
    参数:
        text: 要切分的文本
        max_chars: 每个块的最大字符数
        overlap: 相邻块之间的重叠字符数
    
    返回:
        chunks: 切分后的文本块列表
    """
    # 如果文本长度小于最大长度，直接返回
    if len(text) <= max_chars:
        return [text]
    
    chunks = []
    start = 0
    last_end = 0  # 跟踪上一次的结束位置
    
    while start < len(text):
        # 确定当前块的结束位置
        end = min(start + max_chars, len(text))
        
        # 如果没有到达文本末尾且不是最后一块，尝试在句子边界切分
        if end < len(text):
            # 在结束位置查找最近的句子结束标记
            sentence_ends = [
                m.end() for m in re.finditer(r'[。！？.!?]\s*', text[start:end])
            ]
            
            if sentence_ends:  # 如果找到句子结束标记，在最后一个句子结束处切分
                end = start + sentence_ends[-1]
            else:  # 如果没有找到，尝试在单词或标点处切分
                last_space = text[start:end].rfind(' ')
                last_punct = max(text[start:end].rfind('，'), text[start:end].rfind(','))
                cut_point = max(last_space, last_punct)
                
                if cut_point > 0:  # 如果找到了合适的切分点
                    end = start + cut_point + 1
        
        # 添加当前块到结果列表
        chunks.append(text[start:end])
        
        # 检测是否有进展
        if end <= last_end:
            # 如果没有进展，强制向前移动，避免死循环
            end = min(last_end + 1, len(text))
            # 重新添加块，覆盖之前添加的
            chunks[-1] = text[start:end]
            
            # 如果已到达文本末尾，退出循环
            if end >= len(text):
                break
        
        # 记录本次结束位置
        last_end = end
        
        # 移动开始位置，考虑重叠
        start = end - overlap
        
        # 确保开始位置不会后退
        if start < 0:
            start = 0
            
        # 确保有进展，避免死循环
        if start >= end:
            start = end
            
        # 如果到达文本末尾，退出循环
        if start >= len(text):
            break
    
    return chunks

## 3. FastAPI后端开发

### 3.1 RESTful API设计
FastAPI用于构建高效的异步API，支持文档上传、检索和问答功能。

**关键API**：
- `/api/upload`：上传文档
- `/api/documents`：列出文档
- `/api/stream`：处理问答请求

**代码示例**：文档上传API

In [None]:
from fastapi import FastAPI, UploadFile, File, HTTPException
import os
import uuid

app = FastAPI()

@app.post("/api/upload")
async def upload_document(file: UploadFile = File(...)):
    if not file.filename.endswith((".txt", ".pdf", ".docx")):
        raise HTTPException(status_code=400, detail="仅支持.txt、.pdf、.docx文件")
    
    os.makedirs("docs", exist_ok=True)
    file_path = os.path.join("docs", file.filename)
    content = await file.read()
    
    with open(file_path, "wb") as f:
        f.write(content)
    
    doc_id = str(uuid.uuid4())
    return {"id": doc_id, "name": file.filename}

### 3.2 异步处理机制
FastAPI的异步特性支持高并发处理，适合实时问答系统。

**代码示例**：异步文档列表API

In [None]:
@app.get("/api/documents")
async def list_documents():
    documents = []
    for filename in os.listdir("docs"):
        documents.append({"id": str(uuid.uuid4()), "name": filename})
    return documents

### 3.3 流式响应实现
通过Server-Sent Events（SSE）实现流式输出，提升用户体验。

**code示例**：流式问答API

In [None]:
from fastapi.responses import StreamingResponse
import asyncio
import json

@app.get("/api/stream")
async def stream_response(query: str):
    async def generate():
        for i in range(5):
            yield f"data: {json.dumps({'content': f'回答部分 {i+1}', 'done': i == 4})}

"
            await asyncio.sleep(1)
    
    return StreamingResponse(
        generate(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive"
        }
    )

## 4. 大模型接入与提示工程

### 4.1 大模型API集成
通过OpenAI API集成GLM-4-plus模型，确保稳定性和高性能。

**配置**：
- API密钥：存储在环境变量或配置文件中
- 端点：`https://open.bigmodel.cn/api/paas/v4/`

**代码示例**：流式调用大模型

In [None]:
async def stream_llm_response(query: str):
    stream = client.chat.completions.create(
        model="glm-4-plus",
        messages=[
            {"role": "system", "content": "你是一个专业的问答助手。"},
            {"role": "user", "content": query}
        ],
        stream=True
    )
    
    async def generate():
        for chunk in stream:
            if chunk.choices[0].delta.content:
                yield f"data: {json.dumps({'content': chunk.choices[0].delta.content})}

"
            if chunk.choices[0].finish_reason:
                yield f"data: {json.dumps({'done': True})}

"
                break
    
    return StreamingResponse(generate(), media_type="text/event-stream")

### 4.2 提示词设计
有效的提示词可以显著提升模型输出质量。

**设计原则**：
- 明确指令：清楚描述任务
- 提供上下文：包含检索到的文档内容
- 限制输出：避免无关信息

**示例提示**：

In [None]:
prompt = """
上下文信息:
{context}

问题: {query}
请基于上下文信息回答问题，仅使用提供的信息，不要添加未提及的内容。
"""

# 示例使用
context = "相关文档：示例文档.txt\n内容片段：RAG是一种结合检索和生成的AI技术。"
query = "RAG技术是什么？"
formatted_prompt = prompt.format(context=context, query=query)
print(formatted_prompt)

### 4.3 上下文管理
优化上下文窗口大小，确保相关性并避免信息过载。

**策略**：
- 限制检索文档数量（如k=3）
- 按相关性排序
- 截断过长上下文

**代码示例**：构建上下文

In [None]:
def build_context(retrieved_docs, retrieved_chunks):
    context_parts = []
    context_parts.append("相关文档:\n" + "\n".join(retrieved_docs))
    
    if retrieved_chunks:
        chunk_context = "\n\n文档内容片段:\n"
        for i, (doc_id, chunk) in enumerate(retrieved_chunks):
            chunk_context += f"[文档{i+1}] {chunk}\n"
        context_parts.append(chunk_context)
    else:
        context_parts.append("\n\n没有找到相关的文档内容。")
    
    return "\n".join(context_parts)

# 示例
docs = ["文档1.txt", "文档2.txt"]
chunks = [("doc1", "这是文档1的内容"), ("doc2", "这是文档2的内容")]
context = build_context(docs, chunks)
print(context)

## 5. 数据持久化与会话管理

### 5.1 SQLite数据库设计
使用SQLite存储聊天历史和会话信息，轻量且易于部署。

**表结构**：
- `chat_sessions`：存储会话ID、摘要、时间戳
- `messages`：存储消息内容、角色、时间戳

**代码示例**：初始化数据库

In [None]:
import sqlite3

def init_db():
    conn = sqlite3.connect('chat_history.db')
    cursor = conn.cursor()
    
    cursor.execute('''
    CREATE TABLE IF NOT EXISTS chat_sessions (
        id TEXT PRIMARY KEY,
        summary TEXT,
        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
        updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
    )
    ''')
    
    cursor.execute('''
    CREATE TABLE IF NOT EXISTS messages (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        session_id TEXT,
        role TEXT,
        content TEXT,
        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
        FOREIGN KEY (session_id) REFERENCES chat_sessions (id)
    )
    ''')
    
    conn.commit()
    conn.close()

init_db()

### 5.2 会话状态管理
支持会话创建、消息添加和历史恢复。

**代码示例**：创建新会话

In [None]:
from datetime import datetime
import uuid

async def create_new_chat_session(session_id, query, response):
    summary = query[:30] + "..." if len(query) > 30 else query
    current_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    
    conn = sqlite3.connect('chat_history.db')
    cursor = conn.cursor()
    
    cursor.execute(
        "INSERT INTO chat_sessions (id, summary, created_at, updated_at) VALUES (?, ?, ?, ?)",
        (session_id, summary, current_time, current_time)
    )
    
    cursor.execute(
        "INSERT INTO messages (session_id, role, content, created_at) VALUES (?, ?, ?, ?)",
        (session_id, "user", query, current_time)
    )
    
    cursor.execute(
        "INSERT INTO messages (session_id, role, content, created_at) VALUES (?, ?, ?, ?)",
        (session_id, "bot", response, current_time)
    )
    
    conn.commit()
    conn.close()

# 示例
session_id = str(uuid.uuid4())
await create_new_chat_session(session_id, "什么是RAG？", "RAG是一种结合检索和生成的AI技术。")

## 总结与实践

### 关键点回顾
- **RAG架构**：结合检索和生成，提升回答质量
- **向量嵌入**：使用m3e-base模型将文档转为向量
- **FastAPI**：实现高效的异步API和流式响应
- **大模型**：通过API集成大模型，优化提示词
- **会话管理**：使用SQLite存储历史记录

### 实践任务
1. 初始化一个简单的RAG系统，包含文档上传和检索功能
2. 实现一个流式问答接口，集成m3e-base和大模型API
3. 添加会话管理功能，支持历史记录保存和恢复

### 下一步
- 学习前端开发，构建响应式界面
- 集成语音交互功能
- 系统优化和功能拓展
- 系统部署和运行