In [1]:
#!pip install "pymilvus[model]==2.5.10" openai==1.82.0 requests==2.32.3 tqdm==4.67.1 torch==2.7.0

---

In [2]:
import os

# 从环境变量获取 DeepSeek API Key
api_key = os.getenv("SILICON_API_KEY")
if not api_key:
    raise ValueError("请设置 SILICON_API_KEY 环境变量")

### 准备数据

我们使用 Milvus 文档 2.4.x 中的 FAQ 页面作为我们 RAG 中的私有知识库，这是一个简单 RAG 管道的良好数据源。

下载 zip 文件并将文档解压到 `milvus_docs` 文件夹。

**建议在命令行执行下面命令**

In [3]:
#!wget https://github.com/milvus-io/milvus-docs/releases/download/v2.4.6-preview/milvus_docs_2.4.x_en.zip
#!unzip -q milvus_docs_2.4.x_en.zip -d milvus_docs

我们从 `milvus_docs/en/faq` 文件夹加载所有 markdown 文件。对于每个文档，我们简单地使用 "# " 来分割文件中的内容，这样可以大致分离出 markdown 文件中每个主要部分的内容。

In [4]:
from glob import glob

text_lines = []

for file_path in glob("./milvus_docs/en/faq/*.md", recursive=True):
    with open(file_path, "r") as file:
        file_text = file.read()

    text_lines += file_text.split("# ")

In [5]:
len(text_lines)

72

### 准备 LLM 和 Embedding 模型

DeepSeek 支持 OpenAI 风格的 API，您可以使用相同的 API 进行微小调整来调用 LLM。

In [6]:
from openai import OpenAI

deepseek_client = OpenAI(
    api_key=api_key,
    base_url="https://api.siliconflow.cn/v1",  # DeepSeek API 的基地址
)

定义一个 embedding 模型，使用 `milvus_model` 来生成文本嵌入。我们以 `DefaultEmbeddingFunction` 模型为例，这是一个预训练的轻量级嵌入模型。

In [7]:
# from pymilvus import model as milvus_model

# embedding_model = milvus_model.DefaultEmbeddingFunction()

from pymilvus import model as milvus_model

# OpenAI国内代理 https://api.apiyi.com/token 
embedding_model = milvus_model.dense.OpenAIEmbeddingFunction(
    model_name='BAAI/bge-m3', # Specify the model name
    api_key=api_key, # Provide your OpenAI API key
    base_url='https://api.siliconflow.cn/v1',
    dimensions=512
)

  from .autonotebook import tqdm as notebook_tqdm


生成一个测试嵌入并打印其维度和前几个元素。

In [8]:
test_embedding = embedding_model.encode_queries(["This is a test"])[0]
embedding_dim = len(test_embedding)
print(embedding_dim)
print(test_embedding[:10])

1024
[-0.01914445  0.02160134 -0.0458619  -0.01061761 -0.01767996 -0.01904811
  0.03992683  0.03857796  0.0173042   0.00452116]


In [9]:
test_embedding_0 = embedding_model.encode_queries(["That is a test"])[0]
print(test_embedding_0[:10])

[-0.00315623  0.02990664 -0.0430564  -0.01652786 -0.01064959 -0.0133597
  0.04141507  0.03630021  0.01143209  0.00224968]


## 将数据加载到 Milvus

### 创建 Collection

In [10]:
from pymilvus import MilvusClient

milvus_client = MilvusClient(uri="./milvus_demo.db")

collection_name = "my_rag_collection"

  from pkg_resources import DistributionNotFound, get_distribution


关于 `MilvusClient` 的参数：

*   将 `uri` 设置为本地文件，例如 `./milvus.db`，是最方便的方法，因为它会自动利用 Milvus Lite 将所有数据存储在此文件中。
*   如果您有大规模数据，可以在 Docker 或 Kubernetes 上设置性能更高的 Milvus 服务器。在此设置中，请使用服务器 URI，例如 `http://localhost:19530`，作为您的 `uri`。
*   如果您想使用 Zilliz Cloud（Milvus 的完全托管云服务），请调整 `uri` 和 `token`，它们对应 Zilliz Cloud 中的 Public Endpoint 和 Api key。

检查 collection 是否已存在，如果存在则删除它。

In [11]:
if milvus_client.has_collection(collection_name):
    milvus_client.drop_collection(collection_name)

创建一个具有指定参数的新 collection。

如果我们不指定任何字段信息，Milvus 将自动创建一个默认的 `id` 字段作为主键，以及一个 `vector` 字段来存储向量数据。一个保留的 JSON 字段用于存储非 schema 定义的字段及其值。

`metric_type` (距离度量类型):
     作用：定义如何计算向量之间的相似程度。
     例如：`IP` (内积) - 值越大通常越相似；`L2` (欧氏距离) - 值越小越相似；`COSINE` (余弦相似度) - 通常转换为距离，值越小越相似。
     选择依据：根据你的嵌入模型的特性和期望的相似性定义来选择。

 `consistency_level` (一致性级别):
     作用：定义数据写入后，读取操作能多快看到这些新数据。
     例如：
         `Strong` (强一致性): 总是读到最新数据，可能稍慢。
         `Bounded` (有界过期): 可能读到几秒内旧数据，性能较好 (默认)。
         `Session` (会话一致性): 自己写入的自己能立刻读到。
         `Eventually` (最终一致性): 最终会读到新数据，但没时间保证，性能最好。
     选择依据：在数据实时性要求和系统性能之间做权衡。

简单来说：
 `metric_type`：怎么算相似。
 `consistency_level`：新数据多久能被读到。

In [12]:
milvus_client.create_collection(
    collection_name=collection_name,
    dimension=embedding_dim,
    metric_type="IP",  # 内积距离
    consistency_level="Strong",  # 支持的值为 (`"Strong"`, `"Session"`, `"Bounded"`, `"Eventually"`)。更多详情请参见 https://milvus.io/docs/consistency.md#Consistency-Level。
)

### 插入数据

遍历文本行，创建嵌入，然后将数据插入 Milvus。

这里有一个新字段 `text`，它是在 collection schema 中未定义的字段。它将自动添加到保留的 JSON 动态字段中，该字段在高级别上可以被视为普通字段。

In [13]:
from tqdm import tqdm

data = []

def chunked(list_, batch_size):
    for i in range(0, len(list_), batch_size):
        yield list_[i:i+batch_size]

batch_size = 64
all_embeddings = []
for chunk in chunked(text_lines, batch_size):
    emb = embedding_model.encode_documents(chunk)
    all_embeddings.extend(emb)

# doc_embeddings = embedding_model.encode_documents(text_lines_64)

for i, line in enumerate(tqdm(text_lines, desc="Creating embeddings")):
    data.append({"id": i, "vector": all_embeddings[i], "text": line})

milvus_client.insert(collection_name=collection_name, data=data)

Creating embeddings: 100%|██████████| 72/72 [00:00<00:00, 901462.35it/s]


{'insert_count': 72, 'ids': [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71], 'cost': 0}

## 构建 RAG

### 检索查询数据

我们指定一个关于 Milvus 的常见问题。

In [14]:
question = "How is data stored in milvus?"

在 collection 中搜索该问题，并检索语义上最匹配的前3个结果。

In [15]:
search_res = milvus_client.search(
    collection_name=collection_name,
    data=embedding_model.encode_queries(
        [question]
    ),  # 将问题转换为嵌入向量
    limit=3,  # 返回前3个结果
    search_params={"metric_type": "IP", "params": {}},  # 内积距离
    output_fields=["text"],  # 返回 text 字段
)

让我们看一下查询的搜索结果

In [16]:
import json

retrieved_lines_with_distances = [
    (res["entity"]["text"], res["distance"]) for res in search_res[0]
]
print(json.dumps(retrieved_lines_with_distances, indent=4))

[
    [
        " Where does Milvus store data?\n\nMilvus deals with two types of data, inserted data and metadata. \n\nInserted data, including vector data, scalar data, and collection-specific schema, are stored in persistent storage as incremental log. Milvus supports multiple object storage backends, including [MinIO](https://min.io/), [AWS S3](https://aws.amazon.com/s3/?nc1=h_ls), [Google Cloud Storage](https://cloud.google.com/storage?hl=en#object-storage-for-companies-of-all-sizes) (GCS), [Azure Blob Storage](https://azure.microsoft.com/en-us/products/storage/blobs), [Alibaba Cloud OSS](https://www.alibabacloud.com/product/object-storage-service), and [Tencent Cloud Object Storage](https://www.tencentcloud.com/products/cos) (COS).\n\nMetadata are generated within Milvus. Each Milvus module has its own metadata that are stored in etcd.\n\n###",
        0.7449705600738525
    ],
    [
        "How does Milvus flush data?\n\nMilvus returns success when inserted data are loaded to t

### 使用 LLM 获取 RAG 响应

将检索到的文档转换为字符串格式。

In [17]:
context = "\n".join(
    [line_with_distance[0] for line_with_distance in retrieved_lines_with_distances]
)

In [18]:
context

" Where does Milvus store data?\n\nMilvus deals with two types of data, inserted data and metadata. \n\nInserted data, including vector data, scalar data, and collection-specific schema, are stored in persistent storage as incremental log. Milvus supports multiple object storage backends, including [MinIO](https://min.io/), [AWS S3](https://aws.amazon.com/s3/?nc1=h_ls), [Google Cloud Storage](https://cloud.google.com/storage?hl=en#object-storage-for-companies-of-all-sizes) (GCS), [Azure Blob Storage](https://azure.microsoft.com/en-us/products/storage/blobs), [Alibaba Cloud OSS](https://www.alibabacloud.com/product/object-storage-service), and [Tencent Cloud Object Storage](https://www.tencentcloud.com/products/cos) (COS).\n\nMetadata are generated within Milvus. Each Milvus module has its own metadata that are stored in etcd.\n\n###\nHow does Milvus flush data?\n\nMilvus returns success when inserted data are loaded to the message queue. However, the data are not yet flushed to the dis

In [19]:
question

'How is data stored in milvus?'

为语言模型定义系统和用户提示。此提示是使用从 Milvus 检索到的文档组装而成的。

In [20]:
SYSTEM_PROMPT = """
Human: 你是一个 AI 助手。你能够从提供的上下文段落片段中找到问题的答案。
"""
USER_PROMPT = f"""
请使用以下用 <context> 标签括起来的信息片段来回答用 <question> 标签括起来的问题。最后追加原始回答的中文翻译，并用 <translated>和</translated> 标签标注。
<context>
{context}
</context>
<question>
{question}
</question>
<translated>
</translated>
"""

In [21]:
USER_PROMPT

"\n请使用以下用 <context> 标签括起来的信息片段来回答用 <question> 标签括起来的问题。最后追加原始回答的中文翻译，并用 <translated>和</translated> 标签标注。\n<context>\n Where does Milvus store data?\n\nMilvus deals with two types of data, inserted data and metadata. \n\nInserted data, including vector data, scalar data, and collection-specific schema, are stored in persistent storage as incremental log. Milvus supports multiple object storage backends, including [MinIO](https://min.io/), [AWS S3](https://aws.amazon.com/s3/?nc1=h_ls), [Google Cloud Storage](https://cloud.google.com/storage?hl=en#object-storage-for-companies-of-all-sizes) (GCS), [Azure Blob Storage](https://azure.microsoft.com/en-us/products/storage/blobs), [Alibaba Cloud OSS](https://www.alibabacloud.com/product/object-storage-service), and [Tencent Cloud Object Storage](https://www.tencentcloud.com/products/cos) (COS).\n\nMetadata are generated within Milvus. Each Milvus module has its own metadata that are stored in etcd.\n\n###\nHow does Milvus flush data?\n\nMilvus 

使用 DeepSeek 提供的 `deepseek-chat` 模型根据提示生成响应。

In [22]:
response = deepseek_client.chat.completions.create(
    model="deepseek-ai/DeepSeek-V3",
    messages=[
        {"role": "system", "content": SYSTEM_PROMPT},
        {"role": "user", "content": USER_PROMPT},
    ],
)
print(response.choices[0].message.content)

Milvus stores data in two main categories:  

1. **Inserted Data** (including vector data, scalar data, and collection-specific schema):  
   - Stored as incremental logs in persistent storage.  
   - Supports multiple object storage backends like MinIO, AWS S3, Google Cloud Storage (GCS), Azure Blob Storage, Alibaba Cloud OSS, and Tencent Cloud COS.  

2. **Metadata**:  
   - Generated internally by Milvus and stored in **etcd**.  

When inserting data:  
- Milvus first loads data into a message queue (in memory) before flushing it to disk.  
- `flush()` forces immediate persistence to storage.  

During queries:  
- Both **incremental data** (temporarily buffered in memory) and **historical data** (sealed segments in storage) are loaded into memory for search.  

<translated>  
Milvus以两种主要方式存储数据：  

1. **插入数据**（包括向量数据、标量数据和集合特定模式）：  
   - 以增量日志形式存储在持久化存储中。  
   - 支持多种对象存储后端，如MinIO、AWS S3、Google Cloud Storage (GCS)、Azure Blob Storage、阿里云OSS和腾讯云COS。  

2. **元数据**：  
   - 由Milvus内部生成，存储

# 民法典作业实现
## 数据载入

In [23]:
from glob import glob

mfd_text_lines = []

for file_path in glob("mfd.md", recursive=True):
    with open(file_path, "r") as file:
        file_text = file.read()

    mfd_text_lines += file_text.split("\n**")

In [24]:
len(mfd_text_lines)

388

In [25]:
mfd_collection_name = "mfd_rag_collection"

if milvus_client.has_collection(mfd_collection_name):
    milvus_client.drop_collection(mfd_collection_name)

milvus_client.create_collection(
    collection_name=mfd_collection_name,
    dimension=embedding_dim,
    metric_type="IP",  # 内积距离
    consistency_level="Strong",  # 支持的值为 (`"Strong"`, `"Session"`, `"Bounded"`, `"Eventually"`)。更多详情请参见 https://milvus.io/docs/consistency.md#Consistency-Level。
)


In [26]:
milvus_client.list_collections()

['mfd_rag_collection', 'my_rag_collection']

In [27]:
from tqdm import tqdm

all_mfd_embeddings = []

def chunked(list_, batch_size):
    for i in range(0, len(list_), batch_size):
        yield list_[i:i+batch_size]

batch_size = 64

for chunk in chunked(mfd_text_lines, batch_size):
    emb = embedding_model.encode_documents(chunk)
    all_mfd_embeddings.extend(emb)

# doc_embeddings = embedding_model.encode_documents(text_lines_64)
mfd_data = []
for i, line in enumerate(tqdm(mfd_text_lines, desc="Creating mfd embeddings")):
    mfd_data.append({"id": i, "vector": all_mfd_embeddings[i], "text": line})

milvus_client.insert(collection_name=mfd_collection_name, data=mfd_data)

Creating mfd embeddings: 100%|██████████| 388/388 [00:00<00:00, 2324842.79it/s]


{'insert_count': 388, 'ids': [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100, 101, 102, 103, 104, 105, 106, 107, 108, 109, 110, 111, 112, 113, 114, 115, 116, 117, 118, 119, 120, 121, 122, 123, 124, 125, 126, 127, 128, 129, 130, 131, 132, 133, 134, 135, 136, 137, 138, 139, 140, 141, 142, 143, 144, 145, 146, 147, 148, 149, 150, 151, 152, 153, 154, 155, 156, 157, 158, 159, 160, 161, 162, 163, 164, 165, 166, 167, 168, 169, 170, 171, 172, 173, 174, 175, 176, 177, 178, 179, 180, 181, 182, 183, 184, 185, 186, 187, 188, 189, 190, 191, 192, 193, 194, 195, 196, 197, 198, 199, 200, 201, 202, 203, 204, 205, 206, 207, 208, 209, 210, 211, 212, 213, 214, 215, 

## 检索查询数据

In [28]:
mfd_question = "根据中华人民共和国民法典物权编的有关规定，登记机构不得有哪些行为？"

In [29]:
mfd_search_res = milvus_client.search(
    collection_name=mfd_collection_name,
    data=embedding_model.encode_queries(
        [mfd_question]
    ),  # 将问题转换为嵌入向量
    limit=5,  # 返回前3个结果
    search_params={"metric_type": "IP", "params": {}},  # 内积距离
    output_fields=["text"],  # 返回 text 字段
)

In [30]:
import json

mfd_retrieved_lines_with_distances = [
    (res["entity"]["text"], res["distance"]) for res in mfd_search_res[0]
]
print(json.dumps(mfd_retrieved_lines_with_distances, indent=4))

[
    [
        "\u7b2c\u4e8c\u767e\u4e00\u5341\u4e09\u6761** \u767b\u8bb0\u673a\u6784\u4e0d\u5f97\u6709\u4e0b\u5217\u884c\u4e3a\uff1a\n\uff08\u4e00\uff09\u8981\u6c42\u5bf9\u4e0d\u52a8\u4ea7\u8fdb\u884c\u8bc4\u4f30\uff1b\n\uff08\u4e8c\uff09\u4ee5\u4e0d\u52a8\u4ea7\u767b\u8bb0\u4e3a\u6761\u4ef6\u6536\u53d6\u5176\u4ed6\u8d39\u7528\uff1b\n\uff08\u4e09\uff09\u8d85\u51fa\u767b\u8bb0\u804c\u8d23\u8303\u56f4\u7684\u5176\u4ed6\u884c\u4e3a\u3002\n",
        0.7167348861694336
    ],
    [
        "\u7b2c\u4e8c\u767e\u4e00\u5341\u4e8c\u6761** \u767b\u8bb0\u673a\u6784\u5e94\u5f53\u5c65\u884c\u4e0b\u5217\u804c\u8d23\uff1a\n\uff08\u4e00\uff09\u5ba1\u67e5\u7533\u8bf7\u4eba\u63d0\u4f9b\u7684\u6750\u6599\uff1b\n\uff08\u4e8c\uff09\u8be2\u95ee\u7533\u8bf7\u4eba\uff1b\n\uff08\u4e09\uff09\u5982\u5b9e\u3001\u53ca\u65f6\u767b\u8bb0\uff1b\n\uff08\u56db\uff09\u6cd5\u5f8b\u3001\u884c\u653f\u6cd5\u89c4\u89c4\u5b9a\u7684\u5176\u4ed6\u804c\u8d23\u3002\n\u7533\u8bf7\u767b\u8bb0\u7684\u4e0d\u52a8\u4ea7\u5b58\u5728\

## 调用LLM获取RAG响应

In [31]:
mfd_context = "\n".join(
    [line_with_distance[0] for line_with_distance in mfd_retrieved_lines_with_distances]
)

In [32]:
mfd_context

'第二百一十三条** 登记机构不得有下列行为：\n（一）要求对不动产进行评估；\n（二）以不动产登记为条件收取其他费用；\n（三）超出登记职责范围的其他行为。\n\n第二百一十二条** 登记机构应当履行下列职责：\n（一）审查申请人提供的材料；\n（二）询问申请人；\n（三）如实、及时登记；\n（四）法律、行政法规规定的其他职责。\n申请登记的不动产存在尚未解决的权属争议的，登记机构应当不予登记，并书面告知申请人。\n\n第二百二十四条** 船舶、航空器和机动车等的物权的设立、变更、转让和消灭，未经登记，不得对抗善意第三人。\n\n第四百三十六条** 设立质权，当事人应当依照法律规定办理登记。\n未经登记，不发生物权效力。\n\n第三百五十四条** 设立用益物权，当事人应当依照法律规定办理登记。\n未经登记，不发生物权效力。\n'

In [33]:
mfd_question

'根据中华人民共和国民法典物权编的有关规定，登记机构不得有哪些行为？'

In [34]:
MFD_SYSTEM_PROMPT = """
Human: 你是一个 AI 助手。你能够从提供的上下文段落片段中找到问题的答案。
"""
MFD_USER_PROMPT = f"""
请使用以下用 <context> 标签括起来的信息片段来回答用 <question> 标签括起来的问题。若最后的回答不是中文，追加原始回答的中文翻译，并用 <translated>和</translated> 标签标注。
<context>
{mfd_context}
</context>
<question>
{mfd_question}
</question>
<translated>
</translated>
"""

In [35]:
MFD_USER_PROMPT

'\n请使用以下用 <context> 标签括起来的信息片段来回答用 <question> 标签括起来的问题。若最后的回答不是中文，追加原始回答的中文翻译，并用 <translated>和</translated> 标签标注。\n<context>\n第二百一十三条** 登记机构不得有下列行为：\n（一）要求对不动产进行评估；\n（二）以不动产登记为条件收取其他费用；\n（三）超出登记职责范围的其他行为。\n\n第二百一十二条** 登记机构应当履行下列职责：\n（一）审查申请人提供的材料；\n（二）询问申请人；\n（三）如实、及时登记；\n（四）法律、行政法规规定的其他职责。\n申请登记的不动产存在尚未解决的权属争议的，登记机构应当不予登记，并书面告知申请人。\n\n第二百二十四条** 船舶、航空器和机动车等的物权的设立、变更、转让和消灭，未经登记，不得对抗善意第三人。\n\n第四百三十六条** 设立质权，当事人应当依照法律规定办理登记。\n未经登记，不发生物权效力。\n\n第三百五十四条** 设立用益物权，当事人应当依照法律规定办理登记。\n未经登记，不发生物权效力。\n\n</context>\n<question>\n根据中华人民共和国民法典物权编的有关规定，登记机构不得有哪些行为？\n</question>\n<translated>\n</translated>\n'

In [36]:
mfd_response = deepseek_client.chat.completions.create(
    model="deepseek-ai/DeepSeek-V3",
    messages=[
        {"role": "system", "content": MFD_SYSTEM_PROMPT},
        {"role": "user", "content": MFD_USER_PROMPT},
    ],
)
print(mfd_response.choices[0].message.content)

根据《中华人民共和国民法典》第二百一十三条的规定，登记机构不得有以下行为：

1. 要求对不动产进行评估；
2. 以不动产登记为条件收取其他费用；
3. 超出登记职责范围的其他行为。 

<translated>
According to Article 213 of the Civil Code of the People's Republic of China, registration authorities are prohibited from the following acts:
1. Requiring an appraisal of the real estate;
2. Collecting other fees as a condition for real estate registration;
3. Other acts beyond the scope of registration duties.
</translated>
