# 使用 Milvus 和 DeepSeek 构建 RAG

DeepSeek 帮助开发者使用高性能语言模型构建和扩展 AI 应用。它提供高效的推理、灵活的 API 以及先进的专家混合 (MoE) 架构，用于强大的推理和检索任务。

在本教程中，我们将展示如何使用 Milvus 和 DeepSeek 构建一个检索增强生成 (RAG) 管道。

## 准备工作

### 依赖与环境

In [None]:
!pip install "pymilvus[model]==2.5.10" openai==1.82.0 requests==2.32.3 tqdm==4.67.1 torch==2.7.0

---

In [1]:
import os

# 从环境变量获取 DeepSeek API Key
api_key = os.getenv("DEEPSEEK_API_KEY")

### 准备数据

我们使用 Milvus 文档 2.4.x 中的 FAQ 页面作为我们 RAG 中的私有知识库，这是一个简单 RAG 管道的良好数据源。

下载 zip 文件并将文档解压到 `milvus_docs` 文件夹。

**建议在命令行执行下面命令**

In [2]:
#!wget https://github.com/milvus-io/milvus-docs/releases/download/v2.4.6-preview/milvus_docs_2.4.x_en.zip
#!unzip -q milvus_docs_2.4.x_en.zip -d milvus_docs

我们从 `milvus_docs/en/faq` 文件夹加载所有 markdown 文件。对于每个文档，我们简单地使用 "# " 来分割文件中的内容，这样可以大致分离出 markdown 文件中每个主要部分的内容。

In [5]:
from glob import glob
import re

text_lines = []

def split_civil_code():
    # 使用正则表达式匹配标题和内容
    # 读取文件内容
    with open("mfd.md", "r", encoding="utf-8") as file:
        content = file.read()

    # 使用正则表达式按 `**第xxx条**` 分割
    sections = re.split(r'(\*\*第[零一二三四五六七八九十百千]+条\*\*)', content)

    # 将分割后的内容组合成列表
    result = []
    for i in range(1, len(sections), 2):
        result.append(sections[i] + sections[i+1].strip())

    return result

text_lines=split_civil_code()

In [6]:
len(text_lines)

387

In [93]:
text_lines = all_lines[0:20]

In [94]:
text_lines

['**第二百零四条**为了明确物的归属，充分发挥物的效用，保护权利人的合法权益，维护社会经济秩序，制定本编。',
 '**第二百零五条**本编调整因物的归属和利用产生的民事关系。',
 '**第二百零六条**国家坚持和完善社会主义公有制为主体、多种所有制经济共同发展的基本经济制度。\n国家巩固和发展公有制经济，鼓励、支持和引导非公有制经济的发展。\n国家实行社会主义市场经济，保障一切市场主体的平等法律地位和发展权利。',
 '**第二百零七条**国家、集体、私人的物权和其他权利人的物权受法律平等保护，任何组织或者个人不得侵犯。',
 '**第二百零八条**不动产权利的设立、变更、转让和消灭，应当依照法律规定登记。动产物权的设立和转让，应当依照法律规定交付。',
 '**第二百零九条**不动产物权的设立、变更、转让和消灭，经依法登记，发生效力；未经登记，不发生效力，但是法律另有规定的除外。\n依法属于国家所有的自然资源，所有权可以不登记。',
 '**第二百一十条**不动产登记，由不动产所在地的登记机构办理。\n国家对不动产实行统一登记制度。统一登记的范围、登记机构和登记办法，由法律、行政法规规定。',
 '**第二百一十一条**当事人申请登记，应当根据不同登记事项提供材料。\n申请登记材料以及登记事项相关信息，可以公开查询。',
 '**第二百一十二条**登记机构应当履行下列职责：\n（一）审查申请人提供的材料；\n（二）询问申请人；\n（三）如实、及时登记；\n（四）法律、行政法规规定的其他职责。\n申请登记的不动产存在尚未解决的权属争议的，登记机构应当不予登记，并书面告知申请人。',
 '**第二百一十三条**登记机构不得有下列行为：\n（一）要求对不动产进行评估；\n（二）以不动产登记为条件收取其他费用；\n（三）超出登记职责范围的其他行为。',
 '**第二百一十四条**不动产物权的设立、变更、转让和消灭，依照法律规定应当登记的，自记载于不动产登记簿时发生效力。',
 '**第二百一十五条**不动产登记簿由登记机构管理。\n不动产登记簿应当采用纸质形式或者电子形式。\n不动产登记簿采用电子形式的，应当备份。',
 '**第二百一十六条**不动产登记簿是物权归属和内容的根据。\n不动产登记簿记载的事项与不动产权属证书记载的事项不一致的，除有证据证明不动产登记

### 准备 LLM 和 Embedding 模型

DeepSeek 支持 OpenAI 风格的 API，您可以使用相同的 API 进行微小调整来调用 LLM。

In [7]:
from openai import OpenAI

deepseek_client = OpenAI(
    api_key=api_key,
    base_url="https://api.deepseek.com/v1",  # DeepSeek API 的基地址
)

定义一个 embedding 模型，使用 `milvus_model` 来生成文本嵌入。我们以 `DefaultEmbeddingFunction` 模型为例，这是一个预训练的轻量级嵌入模型。

In [8]:
from pymilvus import model as milvus_model

embedding_model = milvus_model.DefaultEmbeddingFunction()


生成一个测试嵌入并打印其维度和前几个元素。

In [46]:
test_embedding = embedding_model.encode_queries(["This is a test"])[0]
embedding_dim = len(test_embedding)
print(embedding_dim)
print(test_embedding[:10])

768
[-0.04836059  0.07163021 -0.01130063 -0.03789341 -0.03320651 -0.01318453
 -0.03041721 -0.02269495 -0.02317858 -0.00426026]


In [10]:
test_embedding_0 = embedding_model.encode_queries(["That is a test"])[0]
print(test_embedding_0[:10])

[-0.02752976  0.0608853   0.00388525 -0.00215193 -0.02774976 -0.0118618
 -0.04020916 -0.06023417 -0.03813156  0.0100272 ]


## 将数据加载到 Milvus

### 创建 Collection

In [47]:
from pymilvus import MilvusClient

milvus_client = MilvusClient(uri="./milvus_civil_code_demo.db")

collection_name = "rag_civil_code_collection"

关于 `MilvusClient` 的参数：

*   将 `uri` 设置为本地文件，例如 `./milvus.db`，是最方便的方法，因为它会自动利用 Milvus Lite 将所有数据存储在此文件中。
*   如果您有大规模数据，可以在 Docker 或 Kubernetes 上设置性能更高的 Milvus 服务器。在此设置中，请使用服务器 URI，例如 `http://localhost:19530`，作为您的 `uri`。
*   如果您想使用 Zilliz Cloud（Milvus 的完全托管云服务），请调整 `uri` 和 `token`，它们对应 Zilliz Cloud 中的 Public Endpoint 和 Api key。

检查 collection 是否已存在，如果存在则删除它。

In [85]:
if milvus_client.has_collection(collection_name):
    milvus_client.drop_collection(collection_name)

创建一个具有指定参数的新 collection。

如果我们不指定任何字段信息，Milvus 将自动创建一个默认的 `id` 字段作为主键，以及一个 `vector` 字段来存储向量数据。一个保留的 JSON 字段用于存储非 schema 定义的字段及其值。

`metric_type` (距离度量类型):
     作用：定义如何计算向量之间的相似程度。
     例如：`IP` (内积) - 值越大通常越相似；`L2` (欧氏距离) - 值越小越相似；`COSINE` (余弦相似度) - 通常转换为距离，值越小越相似。
     选择依据：根据你的嵌入模型的特性和期望的相似性定义来选择。

 `consistency_level` (一致性级别):
     作用：定义数据写入后，读取操作能多快看到这些新数据。
     例如：
         `Strong` (强一致性): 总是读到最新数据，可能稍慢。
         `Bounded` (有界过期): 可能读到几秒内旧数据，性能较好 (默认)。
         `Session` (会话一致性): 自己写入的自己能立刻读到。
         `Eventually` (最终一致性): 最终会读到新数据，但没时间保证，性能最好。
     选择依据：在数据实时性要求和系统性能之间做权衡。

简单来说：
 `metric_type`：怎么算相似。
 `consistency_level`：新数据多久能被读到。

In [86]:
milvus_client.create_collection(
    collection_name=collection_name,
    dimension=embedding_dim,
    metric_type="COSINE",  # 内积距离
    consistency_level="Strong",  # 支持的值为 (`"Strong"`, `"Session"`, `"Bounded"`, `"Eventually"`)。更多详情请参见 https://milvus.io/docs/consistency.md#Consistency-Level。
)

### 插入数据

遍历文本行，创建嵌入，然后将数据插入 Milvus。

这里有一个新字段 `text`，它是在 collection schema 中未定义的字段。它将自动添加到保留的 JSON 动态字段中，该字段在高级别上可以被视为普通字段。

In [71]:
###
#对以上条件做总结，和输出5个相关的标签， 用JSON格式输出
#{ 'summary': <内容>, 
#   'tags': [ '<tag1>',...] 
#}
import json 

system_prompt_summary = """
对每个user input做总结，和输出5个相关的标签， 用JSON格式输出。
## 限制
1. 只需要输出生成的JSON。
2. 不要输出任何解释。
3. 不要输出任何其他内容。
4. 总结内容不要超过20字。
5. 标签不要超过5个字。

## 格式
{ 
'summary': <内容>, 
'tags': [ '<tag1>',...] 
}
"""

def get_civil_code_summary(text):
    response = deepseek_client.chat.completions.create(
            model="deepseek-chat",
            messages=[
            {"role": "system", "content": system_prompt_summary},
            {"role": "user", "content": text},            
        ],
        response_format={
            'type': 'json_object'
        }
    )
    jsonText = response.choices[0].message.content.replace("```json", "").replace("```", "")
    jsonText
    return json.loads(jsonText)

In [96]:
text_lines_summary = []
text_lines_tags = []

for line in text_lines:
    summary = get_civil_code_summary(line)
    text_lines_summary.append(summary['summary'])
    text_lines_tags.append(summary['tags'])

In [98]:
print(len(text_lines_summary))
print((text_lines_tags))

20
[['物权', '权益保护', '经济秩序', '法律', '归属'], ['物权', '民事关系', '归属', '利用', '法律'], ['社会主义', '公有制', '非公有制', '市场经济', '法律平等'], ['物权', '法律', '平等', '保护', '权利'], ['不动产', '动产物权', '法律登记', '物权转让', '法律规定'], ['物权法', '不动产', '登记', '法律', '自然资源'], ['不动产', '登记', '法律', '行政法规', '统一制度'], ['登记', '材料', '公开查询', '申请', '事项'], ['登记机构', '职责', '不动产', '权属争议', '法律'], ['不动产', '登记', '禁止', '评估', '收费'], ['物权', '不动产', '登记', '法律', '效力'], ['不动产', '登记簿', '管理', '纸质', '电子'], ['不动产', '登记簿', '物权归属', '权属证书', '法律依据'], ['不动产', '权属证书', '物权', '登记簿', '法律'], ['不动产', '查询', '复制', '登记', '权利'], ['不动产', '登记', '查询', '利害关系人', '法律'], ['不动产', '更正登记', '异议登记', '权利人', '损害赔偿'], ['物权', '预告登记', '不动产', '债权', '登记失效'], ['赔偿', '虚假材料', '登记错误', '法律责任', '追偿'], ['物权', '动产', '转让', '法律', '效力']]


In [91]:
len(text_lines)

387

In [99]:
from tqdm import tqdm
import re 

pattern = r'\*\*第[零一二三四五六七八九十百千]+条\*\*\s*'

data = []

clean_text_lines = [re.sub(pattern, '', line) for line in text_lines]

doc_embeddings = embedding_model.encode_documents(clean_text_lines)

for i, line in enumerate(tqdm(clean_text_lines, desc="Creating embeddings")):
    data.append({"id": i, 
                 "vector": doc_embeddings[i], 
                 "text": line, 
                 "summary": text_lines_summary[i], 
                 "tags": ",".join(text_lines_tags[i])})

milvus_client.insert(collection_name=collection_name, data=data)

Creating embeddings: 100%|██████████| 20/20 [00:00<00:00, 19691.57it/s]


{'insert_count': 20, 'ids': [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19], 'cost': 0}

## 构建 RAG

### 检索查询数据

我们指定一个关于 Milvus 的常见问题。

In [15]:
question = "​​什么是“预告登记”？"

在 collection 中搜索该问题，并检索语义上最匹配的前3个结果。

In [135]:
def search(question):
    query = [question] if isinstance(question, str) else question
    search_res = milvus_client.search(
        collection_name=collection_name,
        data=embedding_model.encode_queries(
        query
    ),  # 将问题转换为嵌入向量
    limit=10,  # 返回前3个结果
    search_params={"metric_type": "COSINE", "params": {}},  # 内积距离
        output_fields=["text","tags","summary"],  # 返回 text 字段
    )
    return search_res   


In [212]:
question = "不动产"
search_res = search(question)

In [213]:
# 查询所有匹配的实体
fuzzy_query = 'text like "%{q}%"'
# fuzzy_query = "summary like '%预告登记%'"
print(fuzzy_query)
query_result = milvus_client.query(
    collection_name=collection_name,
    filter=fuzzy_query,
    output_fields=["text","tags","summary"],
    search_params={"metric_type": "COSINE", "params": {"search_list": 100}},
    limit=5,
    filter_params = {"q": "不动产"}
)

text like "%{q}%"


In [214]:
query_result

data: []

In [215]:
for res in search_res[0]:
    print(res)

{'id': 1, 'distance': 1.0000001192092896, 'entity': {'text': '本编调整因物的归属和利用产生的民事关系。', 'summary': '调整物权民事关系', 'tags': '物权,民事关系,归属,利用,法律'}}
{'id': 11, 'distance': 0.8316565155982971, 'entity': {'text': '不动产登记簿由登记机构管理。\n不动产登记簿应当采用纸质形式或者电子形式。\n不动产登记簿采用电子形式的，应当备份。', 'summary': '不动产登记簿管理规范', 'tags': '不动产,登记簿,管理,纸质,电子'}}
{'id': 12, 'distance': 0.7050387263298035, 'entity': {'text': '不动产登记簿是物权归属和内容的根据。\n不动产登记簿记载的事项与不动产权属证书记载的事项不一致的，除有证据证明不动产登记簿确有错误外，以不动产登记簿为准。', 'summary': '不动产登记簿为物权归属依据', 'tags': '不动产,登记簿,物权归属,权属证书,法律依据'}}
{'id': 2, 'distance': 0.6966318488121033, 'entity': {'text': '国家坚持和完善社会主义公有制为主体、多种所有制经济共同发展的基本经济制度。\n国家巩固和发展公有制经济，鼓励、支持和引导非公有制经济的发展。\n国家实行社会主义市场经济，保障一切市场主体的平等法律地位和发展权利。', 'summary': '国家坚持多种所有制经济共同发展', 'tags': '社会主义,公有制,非公有制,市场经济,法律平等'}}
{'id': 7, 'distance': 0.6711220145225525, 'entity': {'text': '当事人申请登记，应当根据不同登记事项提供材料。\n申请登记材料以及登记事项相关信息，可以公开查询。', 'summary': '登记申请需提供材料', 'tags': '登记,材料,公开查询,申请,事项'}}
{'id': 6, 'distance': 0.6711220145225525, 'entity': {'text': '不动产登记，由不动产所在地

让我们看一下查询的搜索结果

In [216]:
import json

retrieved_lines_with_distances = [
    (res["entity"]["text"], res["distance"]) for res in search_res[0]
]
print(json.dumps(retrieved_lines_with_distances, indent=4))

[
    [
        "\u672c\u7f16\u8c03\u6574\u56e0\u7269\u7684\u5f52\u5c5e\u548c\u5229\u7528\u4ea7\u751f\u7684\u6c11\u4e8b\u5173\u7cfb\u3002",
        1.0000001192092896
    ],
    [
        "\u4e0d\u52a8\u4ea7\u767b\u8bb0\u7c3f\u7531\u767b\u8bb0\u673a\u6784\u7ba1\u7406\u3002\n\u4e0d\u52a8\u4ea7\u767b\u8bb0\u7c3f\u5e94\u5f53\u91c7\u7528\u7eb8\u8d28\u5f62\u5f0f\u6216\u8005\u7535\u5b50\u5f62\u5f0f\u3002\n\u4e0d\u52a8\u4ea7\u767b\u8bb0\u7c3f\u91c7\u7528\u7535\u5b50\u5f62\u5f0f\u7684\uff0c\u5e94\u5f53\u5907\u4efd\u3002",
        0.8316565155982971
    ],
    [
        "\u4e0d\u52a8\u4ea7\u767b\u8bb0\u7c3f\u662f\u7269\u6743\u5f52\u5c5e\u548c\u5185\u5bb9\u7684\u6839\u636e\u3002\n\u4e0d\u52a8\u4ea7\u767b\u8bb0\u7c3f\u8bb0\u8f7d\u7684\u4e8b\u9879\u4e0e\u4e0d\u52a8\u4ea7\u6743\u5c5e\u8bc1\u4e66\u8bb0\u8f7d\u7684\u4e8b\u9879\u4e0d\u4e00\u81f4\u7684\uff0c\u9664\u6709\u8bc1\u636e\u8bc1\u660e\u4e0d\u52a8\u4ea7\u767b\u8bb0\u7c3f\u786e\u6709\u9519\u8bef\u5916\uff0c\u4ee5\u4e0d\u52a8\u4ea7\u767b\u8bb0\u7

### 使用 LLM 获取 RAG 响应

将检索到的文档转换为字符串格式。

In [217]:
context = "\n".join(
    [line_with_distance[0] for line_with_distance in retrieved_lines_with_distances]
)

In [218]:
context

'本编调整因物的归属和利用产生的民事关系。\n不动产登记簿由登记机构管理。\n不动产登记簿应当采用纸质形式或者电子形式。\n不动产登记簿采用电子形式的，应当备份。\n不动产登记簿是物权归属和内容的根据。\n不动产登记簿记载的事项与不动产权属证书记载的事项不一致的，除有证据证明不动产登记簿确有错误外，以不动产登记簿为准。\n国家坚持和完善社会主义公有制为主体、多种所有制经济共同发展的基本经济制度。\n国家巩固和发展公有制经济，鼓励、支持和引导非公有制经济的发展。\n国家实行社会主义市场经济，保障一切市场主体的平等法律地位和发展权利。\n当事人申请登记，应当根据不同登记事项提供材料。\n申请登记材料以及登记事项相关信息，可以公开查询。\n不动产登记，由不动产所在地的登记机构办理。\n国家对不动产实行统一登记制度。统一登记的范围、登记机构和登记办法，由法律、行政法规规定。\n利害关系人可以申请查询不动产登记资料。申请查询的，登记机构应当提供。\n权利人、利害关系人可以申请查询、复制不动产登记资料，登记机构应当提供。\n国家、集体、私人的物权和其他权利人的物权受法律平等保护，任何组织或者个人不得侵犯。\n不动产物权的设立、变更、转让和消灭，依照法律规定应当登记的，自记载于不动产登记簿时发生效力。'

In [219]:
question

'不动产'

为语言模型定义系统和用户提示。此提示是使用从 Milvus 检索到的文档组装而成的。

In [220]:
SYSTEM_PROMPT = """
Human: 你是一个 AI 助手。你能够从提供的上下文段落片段中找到问题的答案。
"""
USER_PROMPT = f"""
请使用以下用 <context> 标签括起来的信息片段来回答用 <question> 标签括起来的问题。最后追加原始回答的中文翻译，并用 <translated>和</translated> 标签标注。
<context>
{context}
</context>
<question>
{question}
</question>
<translated>
</translated>
"""

In [108]:
USER_PROMPT

"\n请使用以下用 <context> 标签括起来的信息片段来回答用 <question> 标签括起来的问题。最后追加原始回答的中文翻译，并用 <translated>和</translated> 标签标注。\n<context>\n本编调整因物的归属和利用产生的民事关系。\n不动产登记簿由登记机构管理。\n不动产登记簿应当采用纸质形式或者电子形式。\n不动产登记簿采用电子形式的，应当备份。\n不动产登记簿是物权归属和内容的根据。\n不动产登记簿记载的事项与不动产权属证书记载的事项不一致的，除有证据证明不动产登记簿确有错误外，以不动产登记簿为准。\n国家坚持和完善社会主义公有制为主体、多种所有制经济共同发展的基本经济制度。\n国家巩固和发展公有制经济，鼓励、支持和引导非公有制经济的发展。\n国家实行社会主义市场经济，保障一切市场主体的平等法律地位和发展权利。\n当事人申请登记，应当根据不同登记事项提供材料。\n申请登记材料以及登记事项相关信息，可以公开查询。\n</context>\n<question>\n['什么', '是', '预告', '登记']\n</question>\n<translated>\n</translated>\n"

使用 DeepSeek 提供的 `deepseek-chat` 模型根据提示生成响应。

In [221]:
response = deepseek_client.chat.completions.create(
    model="deepseek-chat",
    messages=[
        {"role": "system", "content": SYSTEM_PROMPT},
        {"role": "user", "content": USER_PROMPT},
    ],
)
print(f"Question: {question}")
print(f"Answer: \n{response.choices[0].message.content}")

Question: 不动产
Answer: 
不动产是指土地、建筑物等不可移动的财产。根据提供的上下文，不动产的相关规定包括：

1. 不动产登记簿由登记机构管理，可以采用纸质或电子形式（电子形式需备份）。
2. 不动产登记簿是物权归属和内容的法定依据，当与权属证书不一致时，以登记簿为准（除非证明登记簿有误）。
3. 国家实行统一登记制度，登记由不动产所在地的登记机构办理。
4. 不动产物权的设立、变更等，依法需登记的，自记载于登记簿时生效。
5. 权利人及利害关系人可查询、复制登记资料，登记机构应提供。
6. 国家、集体、私人的物权受法律平等保护。

<translated>
不动产指土地、房屋等无法移动的财产。根据上下文，其管理规定包括：登记簿由官方管理（纸质/电子），是物权法定依据；实行统一登记制度，物权变动以登记簿记载为准；相关方可依法查询登记信息；各类主体的物权均受法律平等保护。
</translated>


In [54]:
import jieba.posseg as pseg

# 待分词的句子
sentence = "什么是预告登记？"

# 分词
words = pseg.lcut(sentence)
print(words)
nouns = [word for word, flag in words if flag.startswith('n') or flag.startswith('v')]

# 输出结果
print(nouns)

[pair('什么', 'r'), pair('是', 'v'), pair('预告', 'v'), pair('登记', 'v'), pair('？', 'x')]
['是', '预告', '登记']
