# 使用 Milvus 和 DeepSeek 构建 RAG

DeepSeek 帮助开发者使用高性能语言模型构建和扩展 AI 应用。它提供高效的推理、灵活的 API 以及先进的专家混合 (MoE) 架构，用于强大的推理和检索任务。

在本教程中，我们将展示如何使用 Milvus 和 DeepSeek 构建一个检索增强生成 (RAG) 管道。

## 准备工作

### 依赖与环境

In [59]:
!pip install "pymilvus[model]==2.5.10" openai==1.82.0 requests==2.32.3 tqdm==4.67.1 torch==2.7.0

huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks...
	- Avoid using `tokenizers` before the fork if possible
	- Explicitly set the environment variable TOKENIZERS_PARALLELISM=(true | false)


Looking in indexes: https://pypi.tuna.tsinghua.edu.cn/simple12


---

In [60]:
import os

# 从环境变量获取 DeepSeek API Key
api_key = os.getenv("DEEPSEEK_API_KEY")
print(api_key)

sk-f201ca717fa5442eb57bbe33eb894672


### 准备数据

我们使用 Milvus 文档 2.4.x 中的 FAQ 页面作为我们 RAG 中的私有知识库，这是一个简单 RAG 管道的良好数据源。

下载 zip 文件并将文档解压到 `milvus_docs` 文件夹。

**建议在命令行执行下面命令**

In [61]:
import re

level1 = re.compile(r"^## (.*?)$")
level2 = re.compile(r"^### (.*?)$")
level3 = re.compile(r"^#### (.*?)$")
article = re.compile(r"^\*\*(第.{1,10}条)\*\* (.+)$")  # 匹配条文


我们从 `milvus_docs/en/faq` 文件夹加载所有 markdown 文件。对于每个文档，我们简单地使用 "# " 来分割文件中的内容，这样可以大致分离出 markdown 文件中每个主要部分的内容。

In [62]:
from glob import glob

title_path = []
chunks = []

for file_path in glob("mfd.md", recursive=True):
    with open(file_path, 'r', encoding='utf-8') as f:
        lines = f.readlines()

    article_pattern = re.compile(r"\*\*(第.{1,10}条)\*\* (.+)")
    
    for line in lines:
        line = line.strip()
        if line.startswith("## "):  # 一级标题
            title_path = [line[3:].strip()]
        elif line.startswith("### "):  # 二级标题
            if len(title_path) >= 1:
                if len(title_path) == 1:
                    title_path.append(line[4:].strip())
                else:
                    title_path[1] = line[4:].strip()
        elif line.startswith("#### "):  # 三级标题
            if len(title_path) >= 2:
                if len(title_path) == 2:
                    title_path.append(line[5:].strip())
                else:
                    title_path[2] = line[5:].strip()
        else:
            match = article_pattern.match(line)
            if match:
                article = match.group(1)
                content = match.group(2)
                chunks.append({
                    "title_path": title_path.copy(),
                    "article": article,
                    "content": content
                })


In [63]:
len(chunks)

387

In [64]:
chunks[:5]

[{'title_path': ['中华人民共和国民法典', '（二）物权编', '第一章 一般规定'],
  'article': '第二百零四条',
  'content': '为了明确物的归属，充分发挥物的效用，保护权利人的合法权益，维护社会经济秩序，制定本编。'},
 {'title_path': ['中华人民共和国民法典', '（二）物权编', '第一章 一般规定'],
  'article': '第二百零五条',
  'content': '本编调整因物的归属和利用产生的民事关系。'},
 {'title_path': ['中华人民共和国民法典', '（二）物权编', '第一章 一般规定'],
  'article': '第二百零六条',
  'content': '国家坚持和完善社会主义公有制为主体、多种所有制经济共同发展的基本经济制度。'},
 {'title_path': ['中华人民共和国民法典', '（二）物权编', '第一章 一般规定'],
  'article': '第二百零七条',
  'content': '国家、集体、私人的物权和其他权利人的物权受法律平等保护，任何组织或者个人不得侵犯。'},
 {'title_path': ['中华人民共和国民法典', '（二）物权编', '第一章 一般规定'],
  'article': '第二百零八条',
  'content': '不动产权利的设立、变更、转让和消灭，应当依照法律规定登记。动产物权的设立和转让，应当依照法律规定交付。'}]

### 准备 LLM 和 Embedding 模型

DeepSeek 支持 OpenAI 风格的 API，您可以使用相同的 API 进行微小调整来调用 LLM。

In [65]:
from openai import OpenAI

deepseek_client = OpenAI(
    api_key=api_key,
    base_url="https://api.deepseek.com/v1",  # DeepSeek API 的基地址
)
api_key

'sk-f201ca717fa5442eb57bbe33eb894672'

定义一个 embedding 模型，使用 `milvus_model` 来生成文本嵌入。我们以 `DefaultEmbeddingFunction` 模型为例，这是一个预训练的轻量级嵌入模型。

In [89]:
from pymilvus import model as milvus_model
from sentence_transformers import SentenceTransformer


#embedding_model = milvus_model.DefaultEmbeddingFunction()
embedding_model = SentenceTransformer('shibing624/text2vec-base-chinese')  # 中文效果较好

modules.json:   0%|          | 0.00/230 [00:00<?, ?B/s]

README.md:   0%|          | 0.00/13.7k [00:00<?, ?B/s]

sentence_bert_config.json:   0%|          | 0.00/54.0 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/856 [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/409M [00:00<?, ?B/s]

tokenizer_config.json:   0%|          | 0.00/319 [00:00<?, ?B/s]

vocab.txt:   0%|          | 0.00/110k [00:00<?, ?B/s]

special_tokens_map.json:   0%|          | 0.00/112 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/74.0 [00:00<?, ?B/s]

生成一个测试嵌入并打印其维度和前几个元素。

In [93]:
test_embedding = embedding_model.encode(["第二百零四条"])[0]
embedding_dim = len(test_embedding)
print(embedding_dim)
print(test_embedding[:10])

768
[-0.31876534  0.65547967 -0.14721599  0.9130001   0.06470051 -0.6080847
  0.72726095 -0.02721348  0.23516041 -0.37115   ]


In [95]:
test_embedding_0 = embedding_model.encode(["第二百零七条"])[0]
print(test_embedding_0[:10])

[-0.53187436  0.6951853   0.09212895  0.979648    0.58372724 -0.8735608
  0.5928255   0.00845217  0.04227739 -0.42507032]


## 将数据加载到 Milvus

### 创建 Collection

In [96]:
from pymilvus import MilvusClient

milvus_client = MilvusClient(uri="./milvus_mfd.db")

collection_name = "mfd_rag_collection"

关于 `MilvusClient` 的参数：

*   将 `uri` 设置为本地文件，例如 `./milvus.db`，是最方便的方法，因为它会自动利用 Milvus Lite 将所有数据存储在此文件中。
*   如果您有大规模数据，可以在 Docker 或 Kubernetes 上设置性能更高的 Milvus 服务器。在此设置中，请使用服务器 URI，例如 `http://localhost:19530`，作为您的 `uri`。
*   如果您想使用 Zilliz Cloud（Milvus 的完全托管云服务），请调整 `uri` 和 `token`，它们对应 Zilliz Cloud 中的 Public Endpoint 和 Api key。

检查 collection 是否已存在，如果存在则删除它。

In [97]:
if milvus_client.has_collection(collection_name):
    milvus_client.drop_collection(collection_name)


创建一个具有指定参数的新 collection。

如果我们不指定任何字段信息，Milvus 将自动创建一个默认的 `id` 字段作为主键，以及一个 `vector` 字段来存储向量数据。一个保留的 JSON 字段用于存储非 schema 定义的字段及其值。

`metric_type` (距离度量类型):
     作用：定义如何计算向量之间的相似程度。
     例如：`IP` (内积) - 值越大通常越相似；`L2` (欧氏距离) - 值越小越相似；`COSINE` (余弦相似度) - 通常转换为距离，值越小越相似。
     选择依据：根据你的嵌入模型的特性和期望的相似性定义来选择。

 `consistency_level` (一致性级别):
     作用：定义数据写入后，读取操作能多快看到这些新数据。
     例如：
         `Strong` (强一致性): 总是读到最新数据，可能稍慢。
         `Bounded` (有界过期): 可能读到几秒内旧数据，性能较好 (默认)。
         `Session` (会话一致性): 自己写入的自己能立刻读到。
         `Eventually` (最终一致性): 最终会读到新数据，但没时间保证，性能最好。
     选择依据：在数据实时性要求和系统性能之间做权衡。

简单来说：
 `metric_type`：怎么算相似。
 `consistency_level`：新数据多久能被读到。

In [98]:
milvus_client.create_collection(
    collection_name=collection_name,
    dimension=embedding_dim,
    metric_type="IP",  # 内积距离
    consistency_level="Strong",  # 支持的值为 (`"Strong"`, `"Session"`, `"Bounded"`, `"Eventually"`)。更多详情请参见 https://milvus.io/docs/consistency.md#Consistency-Level。
)

### 插入数据

遍历文本行，创建嵌入，然后将数据插入 Milvus。

这里有一个新字段 `text`，它是在 collection schema 中未定义的字段。它将自动添加到保留的 JSON 动态字段中，该字段在高级别上可以被视为普通字段。

In [100]:
from tqdm import tqdm

data = []

text_lines = [f"{c['article']} {c['content']}" for c in chunks]
metadatas = [c for c in chunks]
doc_embeddings = embedding_model.encode(text_lines)

for i, line in enumerate(tqdm(text_lines, desc="Creating embeddings")):
    data.append({"id": i, "vector": doc_embeddings[i], "text": line})

milvus_client.insert(collection_name=collection_name, data=data)

Creating embeddings: 100%|███████████████████████████████████████████████████████| 387/387 [00:00<00:00, 2302405.17it/s]


{'insert_count': 387, 'ids': [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100, 101, 102, 103, 104, 105, 106, 107, 108, 109, 110, 111, 112, 113, 114, 115, 116, 117, 118, 119, 120, 121, 122, 123, 124, 125, 126, 127, 128, 129, 130, 131, 132, 133, 134, 135, 136, 137, 138, 139, 140, 141, 142, 143, 144, 145, 146, 147, 148, 149, 150, 151, 152, 153, 154, 155, 156, 157, 158, 159, 160, 161, 162, 163, 164, 165, 166, 167, 168, 169, 170, 171, 172, 173, 174, 175, 176, 177, 178, 179, 180, 181, 182, 183, 184, 185, 186, 187, 188, 189, 190, 191, 192, 193, 194, 195, 196, 197, 198, 199, 200, 201, 202, 203, 204, 205, 206, 207, 208, 209, 210, 211, 212, 213, 214, 215, 

In [101]:
text_lines[:5]

['第二百零四条 为了明确物的归属，充分发挥物的效用，保护权利人的合法权益，维护社会经济秩序，制定本编。',
 '第二百零五条 本编调整因物的归属和利用产生的民事关系。',
 '第二百零六条 国家坚持和完善社会主义公有制为主体、多种所有制经济共同发展的基本经济制度。',
 '第二百零七条 国家、集体、私人的物权和其他权利人的物权受法律平等保护，任何组织或者个人不得侵犯。',
 '第二百零八条 不动产权利的设立、变更、转让和消灭，应当依照法律规定登记。动产物权的设立和转让，应当依照法律规定交付。']

## 构建 RAG

### 检索查询数据

我们指定一个关于 Milvus 的常见问题。

In [105]:
#question = "第二百零四条规定了什么？"
question = "国家、集体和私人所有的物权是否受到法律平等保护？"

在 collection 中搜索该问题，并检索语义上最匹配的前3个结果。

In [106]:
search_res = milvus_client.search(
    collection_name=collection_name,
    data=embedding_model.encode(
        [question]
    ),  # 将问题转换为嵌入向量
    limit=3,  # 返回前3个结果
    search_params={"metric_type": "IP", "params": {}},  # 内积距离
    output_fields=["text"],  # 返回 text 字段
)
search_res

data: [[{'id': 121, 'distance': 207.38560485839844, 'entity': {'text': '第三百二十四条 共有人对共有物享有共有和共同管理的权利。'}}, {'id': 3, 'distance': 207.08399963378906, 'entity': {'text': '第二百零七条 国家、集体、私人的物权和其他权利人的物权受法律平等保护，任何组织或者个人不得侵犯。'}}, {'id': 61, 'distance': 206.47080993652344, 'entity': {'text': '第二百六十四条 国家、集体、私人的物权和其他权利人的物权受法律平等保护，任何组织或者个人不得侵犯。'}}]]

让我们看一下查询的搜索结果

In [107]:
import json

retrieved_lines_with_distances = [
    (res["entity"]["text"], res["distance"]) for res in search_res[0]
]
print(json.dumps(retrieved_lines_with_distances, indent=4))

[
    [
        "\u7b2c\u4e09\u767e\u4e8c\u5341\u56db\u6761 \u5171\u6709\u4eba\u5bf9\u5171\u6709\u7269\u4eab\u6709\u5171\u6709\u548c\u5171\u540c\u7ba1\u7406\u7684\u6743\u5229\u3002",
        207.38560485839844
    ],
    [
        "\u7b2c\u4e8c\u767e\u96f6\u4e03\u6761 \u56fd\u5bb6\u3001\u96c6\u4f53\u3001\u79c1\u4eba\u7684\u7269\u6743\u548c\u5176\u4ed6\u6743\u5229\u4eba\u7684\u7269\u6743\u53d7\u6cd5\u5f8b\u5e73\u7b49\u4fdd\u62a4\uff0c\u4efb\u4f55\u7ec4\u7ec7\u6216\u8005\u4e2a\u4eba\u4e0d\u5f97\u4fb5\u72af\u3002",
        207.08399963378906
    ],
    [
        "\u7b2c\u4e8c\u767e\u516d\u5341\u56db\u6761 \u56fd\u5bb6\u3001\u96c6\u4f53\u3001\u79c1\u4eba\u7684\u7269\u6743\u548c\u5176\u4ed6\u6743\u5229\u4eba\u7684\u7269\u6743\u53d7\u6cd5\u5f8b\u5e73\u7b49\u4fdd\u62a4\uff0c\u4efb\u4f55\u7ec4\u7ec7\u6216\u8005\u4e2a\u4eba\u4e0d\u5f97\u4fb5\u72af\u3002",
        206.47080993652344
    ]
]


### 使用 LLM 获取 RAG 响应

将检索到的文档转换为字符串格式。

In [108]:
context = "\n".join(
    [line_with_distance[0] for line_with_distance in retrieved_lines_with_distances]
)

In [109]:
context

'第三百二十四条 共有人对共有物享有共有和共同管理的权利。\n第二百零七条 国家、集体、私人的物权和其他权利人的物权受法律平等保护，任何组织或者个人不得侵犯。\n第二百六十四条 国家、集体、私人的物权和其他权利人的物权受法律平等保护，任何组织或者个人不得侵犯。'

In [110]:
question

'国家、集体和私人所有的物权是否受到法律平等保护？'

为语言模型定义系统和用户提示。此提示是使用从 Milvus 检索到的文档组装而成的。

In [111]:
SYSTEM_PROMPT = """
Human: 你是一个 AI 助手。你能够从提供的上下文段落片段中找到问题的答案。
"""

USER_PROMPT = f"""请根据以下《民法典》内容回答问题：

{context}

问题：{question}
回答："""

In [112]:
USER_PROMPT

'请根据以下《民法典》内容回答问题：\n\n第三百二十四条 共有人对共有物享有共有和共同管理的权利。\n第二百零七条 国家、集体、私人的物权和其他权利人的物权受法律平等保护，任何组织或者个人不得侵犯。\n第二百六十四条 国家、集体、私人的物权和其他权利人的物权受法律平等保护，任何组织或者个人不得侵犯。\n\n问题：国家、集体和私人所有的物权是否受到法律平等保护？\n回答：'

使用 DeepSeek 提供的 `deepseek-chat` 模型根据提示生成响应。

In [113]:
response = deepseek_client.chat.completions.create(
    model="deepseek-chat",
    messages=[
        {"role": "system", "content": SYSTEM_PROMPT},
        {"role": "user", "content": USER_PROMPT},
    ],
)
print(response.choices[0].message.content)

是的，根据《民法典》第二百零七条和第二百六十四条的规定，国家、集体、私人的物权和其他权利人的物权受法律平等保护，任何组织或者个人不得侵犯。
