# 简历项目3：Ollama构建本地轻量级RAG系统
## 1. 项目简介

## 2. 数据集 & RAG系统
&emsp;&emsp;RAG基础阅读：[16种RAG方案一站式攻略](https://blog.csdn.net/l01011_/article/details/149929414)、[七大Agentic RAG框架技术解析](https://zhuanlan.zhihu.com/p/19229901774) <br>
&emsp;&emsp;[RAGEval数据集文献](https://arxiv.org/abs/2408.01262): Scenario Specific RAG Evaluation Dataset Generation Framework  <br>
&emsp;&emsp;[LightRAG文献](https://arxiv.org/abs/2410.05779)：Simple and Fast Retrieval-Augmented Generation <br>
&emsp;&emsp;本地配置：Ollama推理Qwen3-4B-Q8_0.gguf大模型和bge-m3-FP16.gguf嵌入模型，可以在魔搭社区上提前下载好再使用Modelfile配置一下，vllm推理bge-reranker重排序模型，在运行LightRAG之前在终端开启以上服务。<br>
&emsp;&emsp;扩展阅读：[大语言模型检索增强生成优化技术研究综述](https://kns.cnki.net/kcms2/article/abstract?v=VYuoLtjwl8MYmE9GqmY8xf1O2fecUU_GoKLRuNKU-AXEtLiOvAqt73fsdoc1L62K0gVUoqEl1fMU-PHeQi0fQ6PzmGkmjb8x483Ljy4UjeCjK8ZShmTxtWYBFmURFJ7SMPVCL0qVDVoAeS8RPrixwtQyrFdVs9hRv1CoClwDQ6U1lr5rIsd0xw==&uniplatform=NZKPT&language=CHS)、[检索增强生成技术研究综述](http://cea.ceaj.org/CN/10.3778/j.issn.1002-8331.2501-0061)、[检索增强生成在软件工程中的应用综述](https://kns.cnki.net/kcms2/article/abstract?v=VYuoLtjwl8NUINSF6UaZuKpxXpAOW_G5nb95hxqf2qltz5ibMQ0VuR90boRWvM6LrL-5K9nO9vmn_kF02NmRcRceWKeEjFeQZ688uocBK6oarvPseNdVa__ulkWomPWOeOtO0ZRDKgSfrqQyExy3hGEN1-oIJmy_F4R48dvLnBbHAOoQJWAfdg==&uniplatform=NZKPT&language=CHS)

## 3. LightRAG解读
<div align="center">
   <img src="./pics/rp3_rag_1.png" width="1000" />
</div>

&emsp;&emsp;参考阅读:[深度解析：LightRAG知识图谱实现于应用分析](https://mp.weixin.qq.com/s/scZ1WZRS3v1QBOYV_KYFTw)、[LightRAG技术框架解读](https://zhuanlan.zhihu.com/p/13261291813)

&emsp;&emsp;知识图谱是 LightRAG 区别于传统 RAG 系统的核心特性，它不仅将文档转换为向量，还构建了一个结构化的知识网络，让系统能够理解实体之间的关系，进行推理和发现。

### 3.1 LightRAG管道初始化
1. LightRAG通过异步函数进行初始化。异步函数的好处有：
   1. **非阻塞执行**：可以避免阻塞主线程，在等待 I/O 操作（如文件读写、网络请求、数据库连接）时，程序可以继续执行其他任务。
   1. **资源密集型操作优化**：初始化RAG实例通常设计大量资源操作（创建存储目录、连接LLM模型服务、初始化嵌入函数、设置向量数据库连接），异步处理可以让这些操作不阻塞其他程序逻辑。
   1. **异步和同步**
1. LightRAG初始化采用了一种延迟初始化的模式，与一般对象初始化有显著区别。
   1. **时机差异**：传统初始化会在init方法中立即初始化所有资源，而LightRAG初始化时将存储初始化推迟到显示调用。
   1. **资源管理**：传统构造函数立即占用所有资源，而LighRAG允许在创建对象后、初始化存储前进行配置调整。

In [3]:
# 1. 异步函数同步执行（等待完成）
import asyncio
import time

async def sync_execution_example():
    print("开始同步执行异步任务...")
    
    # 这里会等待 task1 完全完成后才继续执行
    result1 = await long_running_task("任务1")
    print(f"任务1完成，结果: {result1}")
    
    # 必须等待任务1完成后才会开始任务2
    result2 = await long_running_task("任务2") 
    print(f"任务2完成，结果: {result2}")
    
    print("所有任务按顺序完成")

async def long_running_task(name):
    print(f"{name} 开始执行")
    # 模拟耗时操作 - 这里会释放控制权给事件循环
    await asyncio.sleep(2)  
    print(f"{name} 执行完毕")
    return f"{name}的结果"

# 同步执行 - 等待每个任务完成
await sync_execution_example()

开始同步执行异步任务...
任务1 开始执行
任务1 执行完毕
任务1完成，结果: 任务1的结果
任务2 开始执行
任务2 执行完毕
任务2完成，结果: 任务2的结果
所有任务按顺序完成


In [None]:
# 并发执行多个异步任务
import asyncio

async def concurrent_execution_example():
    print("开始并发执行异步任务...")
    
    # 使用 asyncio.gather 并发执行多个任务
    results = await asyncio.gather(
        long_running_task("并发任务1"),
        long_running_task("并发任务2"), 
        long_running_task("并发任务3")
    )
    
    print(f"所有并发任务完成，结果: {results}")

# 并发执行 - 多个任务同时运行
await concurrent_execution_example()

开始并发执行异步任务...
并发任务1 开始执行
并发任务2 开始执行
并发任务3 开始执行
并发任务1 执行完毕
并发任务2 执行完毕
并发任务3 执行完毕
所有并发任务完成，结果: ['并发任务1的结果', '并发任务2的结果', '并发任务3的结果']


In [None]:
# 让出控制权继续执行其他任务
import asyncio

async def yielding_control_example():
    print("开始演示让出控制权...")
    
    # 启动任务但不立即等待
    task1 = asyncio.create_task(long_running_task("后台任务1"))
    task2 = asyncio.create_task(long_running_task("后台任务2"))
    
    # 在等待期间可以执行其他操作
    print("启动了后台任务，现在执行其他操作...")
    
    # 模拟其他工作
    for i in range(3):
        print(f"执行其他工作 {i+1}/3")
        await asyncio.sleep(0.5)  # 让出控制权，允许其他协程运行
    
    print("等待后台任务完成...")
    # 现在等待所有任务完成
    result1 = await task1
    result2 = await task2
    
    print(f"后台任务结果: {result1}, {result2}")

await yielding_control_example()

开始演示让出控制权...
启动了后台任务，现在执行其他操作...
执行其他工作 1/3
后台任务1 开始执行
后台任务2 开始执行
执行其他工作 2/3
执行其他工作 3/3
等待后台任务完成...
后台任务1 执行完毕
后台任务2 执行完毕
后台任务结果: 后台任务1的结果, 后台任务2的结果


In [6]:
#  任务取消和控制权转移
import asyncio

async def cancellable_task_example():
    print("演示任务取消和控制权转移...")
    
    # 创建一个可能需要很长时间的任务
    long_task = asyncio.create_task(long_running_operation())
    
    try:
        # 等待一段时间，如果超时则取消任务
        result = await asyncio.wait_for(long_task, timeout=3.0)
        print(f"任务正常完成: {result}")
    except asyncio.TimeoutError:
        print("任务超时，取消任务...")
        long_task.cancel()  # 取消任务
        
        try:
            # 等待任务真正被取消
            await long_task
        except asyncio.CancelledError:
            print("任务已被成功取消")
    
    # 继续执行其他操作
    print("继续执行其他任务...")

async def long_running_operation():
    for i in range(10):
        print(f"长时间操作步骤 {i+1}/10")
        await asyncio.sleep(1)  # 让出控制权
    return "长时间操作完成"

# asyncio.run(cancellable_task_example())
await cancellable_task_example()

演示任务取消和控制权转移...
长时间操作步骤 1/10
长时间操作步骤 2/10
长时间操作步骤 3/10
任务超时，取消任务...
任务已被成功取消
继续执行其他任务...


**异步函数核心要点总结**<br>
1. **同步执行**：使用 await 会等待任务完成后再继续。
1. **并发执行**：使用 asyncio.gather() 或 asyncio.create_task() 会同时执行多个任务。
1. **让出控制权**：await asyncio.sleep(0) 或其他异步操作会让出 CPU 时间片。
1. **任务管理**：可以创建、取消、等待任务，灵活控制执行流程。

### 3.1 文本文档预处理
&emsp;&emsp;lightrag.py中的 apipeline_enqueue_documents 异步文档入队管道函数，主要负责验证、去重、过滤和添加待处理文档到文档状态存储中。使用到的参数**input: str | list[str], ids: list[str] | None = None, file_paths: str | list[str] | None = None,track_id: str | None = None**。<br>
1. **标准化输入参数**：确保后续处理逻辑能够一致地处理列表格式的数据，并验证参数之间的逻辑关系，为后续的文档去重和ID管理做好准备。具体地，如果没有提供track_id或空字符串，则生成一个新的跟踪ID，前缀为"enqueue"。将单个字符串类型的input转换为列表格式，确保函数能处理单个或多个文档。此外，还会确保文件路径数量与输入文档数量匹配，**input参数**存储实际文本内容，**file_paths**记录文档来源，作为元数据。<br>
1. **文档去重和ID管理**：如果未提供ID，则自动生成MD5哈希作为文档ID，同时基于内容的唯一性自动去除重复文档，此外验证用户提供的ID是否唯一且数量匹配。具体地：
    1. ID管理策略：检查ID数量石佛与文档数量匹配，使用set(ids)检查是否唯一，ID必须是唯一的字符串，if len(ids) != len(set(ids))。**ids参数**作为每个输入文档的唯一ID，用于数据去重和索引，可以是多个ID。**track_id参数**跟踪一批文档的处理状态，用于查询处理进度，单个ID。
    1. 文档去重机制，**(1)内容级去重**使用清理后（清理文本以确保安全的UTF-8编码，通过移除或替换可能导致编码错误的问题字符。）的内容作为字典的键进行去重，相同内容只保留第一个出现的。**(2)存储级去重**使用filter_keys方法检查存储中是否已经存在相同ID的文档，只处理不存在的文档ID。内容去重和ID去重的双重去重机制，确保了系统中不会出现重复内容的文档，同时维护了文档ID的唯一性和系统的稳定性。
1. **文档状态初始化**：定义无文档内容字典变量new_docs={"id_1":{"status":"....",...},"id_2":{"status":"....",...}}嵌套字典，id_1为唯一标识字符。为新文档创建初始状态DocStatus.PENDING
    （class DocStatus(str, Enum):
    PENDING = "pending"
    PROCESSING = "processing"
    PREPROCESSED = "preprocessed"
    PROCESSED = "processed"
    FAILED = "failed"），其他键：content_summary: 从文档内容生成的摘要（前250个字符）；content_length: 文档内容的字符长度；created_at: 文档创建时间戳（ISO格式的UTC时间）；updated_at: 文档更新时间戳（ISO格式的UTC时间）；file_path: 文档的原始文件路径；track_id: 用于跟踪此批处理操作的ID。
1. **重复检测与过滤**：检查文档是否已存在于系统中，过滤掉已处理过的文档，避免重复处理，记录被忽略的文档ID并输出警告。
1. **数据持久化**：将文档内容存储到full_docs存储中，将文档状态存储到doc_status存储中。立即执行index_done_callback()确保数据持久化。
1. **错误处理与日志**：提供详细的日志记录，包括处理进度和警告信息。对于重复文档他提供清晰的警告信息，支持跟踪ID用于监控处理状态。

### 3.2 知识图谱构建
&emsp;&emsp;lightrag.py中的apipeline_process_enqueue_documents函数是LightRAG的文档处理管道，负责将待处理文档转换为知识图谱。这个函数本质上是一个***生产者-消费者**模式的处理管道，负责将原始文档转换为结构化知识图谱数据。主要功能如下：<br>
1. **文档状态管理**：首先获取共享的管道状态数据和相应的锁，用于协调多个进程或线程之间的文档处理任务。具体地：
    1. 获取管道状态数据：从文档状态太存储中获取所有处于pending（等待）、processing（处理中）和failed（失败）状态的文档
    1. 工作进程协调：通过共享的pipeline_status和锁机制确保同一时间只有一个进程在处理文档队列。
1. **数据一致性验证**：
    1. 数据验证：检查文档状态与实际文档内容之间的一致性
    1. 异常处理：删除不一致的数据条目，当保留失败的文档以供手动审查
    1. 状态重置：将经过一致性检查后人处于processing或failed状态的文档重置为pending状态
1. **多阶段处理流程**：。
    1. **预处理：文档分块与存储**<br>
    两种分块策略类型：<br>
    1、基于Token的分块（默认），按照token数量分割文档，默认块的大小为1200tokens，重叠大小为100tokens。<br>
    2、基于字符的分块，按照指定字符分割文档，支持仅按字符分割或结合token大小限制。**参数说明**：<font color='red'>**split_by_character: str | None**</font>，类型: str | None，说明: 指定用来分割文本的字符，如果不是 None，会先按照这个字符分割文本，然后再根据 token 限制进一步分割（除非设置了 split_by_character_only=True）；<font color='red'>**split_by_character_only: bool**</font>，类型: bool，说明:如果为 True，仅按 split_by_character 指定的字符分割，不考虑 token 限制，如果为 False，会在按字符分割后，再按 token 大小限制进行二次分割。<br>
    1. **阶段1：并行存储操作**：通过 asyncio.gather() 同时执行多个存储任务的并行处理步骤。这一步骤是为了提高处理效率，同时完成文档状态更新、向量数据库插入和文本块存储等操作。向量数据库使用基于json文件的nano-vectordb，用于存储实体向量、关系向量和文本块向量。图数据基于networkX库，序列化为json文件保存。文档的键值存储将数据存在json文件中，有文本块、完整文档、实体信息、关系信息。
    1. **阶段2：实体关系提取**：
    1. **阶段3：合并节点和边到知识图谱**
1. **并发控制**：
    1. 信号量限制：使用max_parallel_insert参数控制同时处理的文档数量
    1. 任务调度：通过异步任务和协程实现高效并发处理
1. **错误处理和取消支持**
    1. 异常捕获：完善的错误处理机制，包括用户取消操作
    1. 状态更新：处理失败时更新文档状态为 FAILED
    1. 资源清理：确保在各种情况下正确清理资源和持久化缓存
1. **资源管理**
1. **进度追踪**
     1. 日志记录：详细记录处理进度和状态变化
     1. 状态监控：通过 pipeline_status 提供处理进度的实时监控



## 4. 面试问答
1. 有没有了解过GraphRAG？[参考：完整案例透视GraphRAG工作机制：从图构建到图检索](https://zhuanlan.zhihu.com/p/1922782829256868561)<br>
    GraphRAG是一种利用图结构来增强检索增强生成的技术，其核心流程可分为两大阶段：<br>
    **(1) 索引** <br>
    &#8226;&emsp;将输入语料库切分成一系列TextUnit，并从中提取所有实体、关系<br>
    &#8226;&emsp;使用Leiden技术执行图的层次聚类，将图划分为多个社区，调用LLM为每个社区生成一份详细的报告，概述其主要事件或主题。<br>
    **(2) 查询** <br>
    &#8226;&emsp;局部搜索(Local Search)：应对具体问题。<br>
    &#8226;&emsp;全局搜索(Global Search)：探索宏观主题，使用了一种类似MapReduce的策略。<br>
    &#8226;&emsp;基本搜索：适合只进行向量搜索的基线RAG，标准top k向量搜索。<br>
    &#8226;&emsp;Drift搜索：通过扩展到其邻居和相关概念对特定实体进行推理，但增加了社区信息的上下文。<br>

1. 

#### 5. 项目涉及技术提问介绍
1. 介绍一下lighttRAG方法。 [参考：LightRAG是什么？都用在什么场景](https://blog.csdn.net/sunyuhua_keyboard/article/details/148695068) 、[LightRAG技术框架解读](https://zhuanlan.zhihu.com/p/13261291813)<br>
   LightRAG全称轻量检索增强生成，是香港大学数据科学实验室开发的开源框架，旨在提供一个高效、轻量、成本更低的RAG系统，作为微软GraphRAG的替代方案。它结合了知识图谱和向量检索，通过图结构增强文本索引和检索过程，解决了传统RAG在上下文感知、复杂关系处理和动态更新的局限性。LightRAG主要特点包括：<br>
      1. 图索引增强：通过提取文档中的实体和关系，构建知识图谱，捕捉数据间的复杂关联。
      1. 双层检索机制：支持底层和高层检索，适应不同类型的查询。
      1. 增量更新：新数据可直接合并到现有图谱，无需重建整个索引，降低计算成本。
      1. 高效检索：相比GraphRAG，LightRAG在检索阶段使用更少的token（<100 vs 600-10000）和API调用（单次和多次），显著提高效率。
      1. 去重优化：通过去重减少冗余实体和关系，优化图谱规模和性能。
      1. 多模态支持：支持文本、图像等多模态数据处理，扩展应用场景。
    