## 2.2 数据插入与管理 [![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/richzw/milvus-workshop/blob/main/ch2/ch2_2.ipynb) 

在上一节中，我们学习了如何创建和管理 Collection。现在，我们将学习如何向这些 Collection 中插入数据，并对数据进行基本管理，如删除。

### 概念：Entity (实体)

在 Milvus 中，**Entity (实体)** 是数据的基本单元，它代表了存储在 Collection 中的单个对象或记录。

- **结构**: 每个实体都包含一组字段 (Fields)，这些字段的定义遵循其所属 Collection 的 Schema。
- **组成**: 一个实体至少包含一个主键字段 (Primary Key) 和一个或多个向量字段 (Vector Field)，通常还会包含其他标量字段 (Scalar Fields) 作为元数据或过滤条件。
- **类比**: 如果将 Collection 比作数据库中的“表”，那么一个实体就相当于表中的“一行数据”或 NoSQL 数据库中的一个“文档”。
- **唯一性**: 每个实体由其主键唯一标识。

### 概念：Partition (分区)

**Partition (分区)** 是 Collection 内部的一种可选的数据划分机制。它允许您将一个大的 Collection 分割成多个更小、更易于管理的部分。

- **目的**:
    - **提高搜索效率**: 在搜索时，可以指定在特定的一个或多个分区内进行搜索，从而减少搜索范围，加快查询速度。
    - **数据管理**: 可以对特定分区进行加载 (Load)、释放 (Release) 或删除 (Drop) 操作，方便对不同数据集进行生命周期管理。例如，按日期、类别等创建分区。
    - **数据隔离**: 不同分区的数据在物理上可以被组织得更紧凑。
- **特点**:
    - 每个 Collection 可以包含多个分区。
    - 每个分区都有一个唯一的名称。
    - 如果不创建分区，所有数据默认存放在一个名为 `_default` 的分区中。
    - 一个实体只能属于一个分区。
- **操作**: 可以创建分区、删除分区、列出分区、检查分区是否存在等。

### 实操：准备数据进行插入

在将数据插入 Milvus 之前，我们需要按照 Collection 的 Schema 准备好数据。数据通常是以 Python 列表的形式组织的，列表中的每个元素可以是一个字典 (推荐，更直观) 或一个元组 (字段顺序必须严格对应 Schema)。

我们将为之前在 Hands-on Exercise 1 中创建的 `book_search` Collection (或者如果它不存在，我们会重新创建它) 准备一些模拟数据。

`book_search` 的 Schema:
1.  `book_id`: `INT64`, 主键, 自动ID
2.  `book_title`: `VARCHAR`, max_length=512
3.  `publication_year`: `INT32`
4.  `book_embedding`: `FLOAT_VECTOR`, dim=768

In [None]:
!pip install numpy==1.26.4

In [1]:
# 导入必要的库
import random
import numpy as np
from pymilvus import MilvusClient, DataType, FieldSchema, CollectionSchema

MILVUS_URI = "http://localhost:19530"
client = MilvusClient(uri=MILVUS_URI)

# 定义 Collection 名称 
EXERCISE_COLLECTION_NAME = "book_search"

# 检查并重新创建 Collection
if client.has_collection(collection_name=EXERCISE_COLLECTION_NAME):
    print(f"发现已存在的 Collection '{EXERCISE_COLLECTION_NAME}', 将其删除。")
    client.drop_collection(collection_name=EXERCISE_COLLECTION_NAME)

field_book_id = FieldSchema(name="book_id", dtype=DataType.INT64, is_primary=True, auto_id=True)
field_book_title = FieldSchema(name="book_title", dtype=DataType.VARCHAR, max_length=512)
field_publication_year = FieldSchema(name="publication_year", dtype=DataType.INT32)
field_book_embedding = FieldSchema(name="book_embedding", dtype=DataType.FLOAT_VECTOR, dim=768)
book_schema_def = CollectionSchema(
    fields=[field_book_id, field_book_title, field_publication_year, field_book_embedding],
    description="Collection for storing book information and embeddings (MilvusClient)",
    enable_dynamic_field=False
)
client.create_collection(
    collection_name=EXERCISE_COLLECTION_NAME,
    schema=book_schema_def,
    consistency_level="Strong"
)
print(f"Collection '{EXERCISE_COLLECTION_NAME}' 已创建。")


发现已存在的 Collection 'book_search', 将其删除。
Collection 'book_search' 已创建。


In [2]:
# 准备模拟数据
NUM_ENTITIES = 100
DIMENSION = 768 # 必须与 Collection Schema 中定义的维度一致

# 生成模拟数据
data_to_insert = []
for i in range(NUM_ENTITIES):
    entity = {
        "book_title": f"The Amazing Book Title {i+1}",
        "publication_year": random.randint(1980, 2023),
        "book_embedding": np.random.rand(DIMENSION).astype(np.float32).tolist() # 生成随机向量
    }
    data_to_insert.append(entity)

print(f"成功生成 {len(data_to_insert)} 条模拟数据。")
print("第一条数据示例:")
print(data_to_insert[0])

成功生成 100 条模拟数据。
第一条数据示例:
{'book_title': 'The Amazing Book Title 1', 'publication_year': 1984, 'book_embedding': [0.7314586043357849, 0.4912657141685486, 0.8054483532905579, 0.1915837675333023, 0.5356130003929138, 0.47961628437042236, 0.5584157705307007, 0.36631351709365845, 0.35549741983413696, 0.2609621584415436, 0.917695939540863, 0.7205129861831665, 0.629404604434967, 0.23459409177303314, 0.041139811277389526, 0.781905472278595, 0.1988876312971115, 0.28417158126831055, 0.6814543008804321, 0.20588260889053345, 0.8389808535575867, 0.655585765838623, 0.05121138319373131, 0.19016890227794647, 0.0843825712800026, 0.28124430775642395, 0.49163511395454407, 0.1423206776380539, 0.7293863892555237, 0.2718774676322937, 0.39181455969810486, 0.6644443273544312, 0.5318488478660583, 0.9179801940917969, 0.4088197946548462, 0.2085767537355423, 0.016186945140361786, 0.7079914212226868, 0.11504505574703217, 0.9050011038780212, 0.9991667866706848, 0.7568649053573608, 0.4680970013141632, 0.1977389454841

### 实操：创建和管理 Partition (可选)

虽然我们可以直接向 Collection 的默认分区 (`_default`) 插入数据，但这里我们先演示如何创建和管理分区。

In [3]:
PARTITION_NAME_FICTION = "fiction_books"
PARTITION_NAME_NON_FICTION = "non_fiction_books"

# 1. 创建分区
try:
    if not client.has_partition(collection_name=EXERCISE_COLLECTION_NAME, partition_name=PARTITION_NAME_FICTION):
        client.create_partition(collection_name=EXERCISE_COLLECTION_NAME, partition_name=PARTITION_NAME_FICTION)
        print(f"分区 '{PARTITION_NAME_FICTION}' 在 Collection '{EXERCISE_COLLECTION_NAME}' 中创建成功。")
    else:
        print(f"分区 '{PARTITION_NAME_FICTION}' 已存在。")

    if not client.has_partition(collection_name=EXERCISE_COLLECTION_NAME, partition_name=PARTITION_NAME_NON_FICTION):
        client.create_partition(collection_name=EXERCISE_COLLECTION_NAME, partition_name=PARTITION_NAME_NON_FICTION)
        print(f"分区 '{PARTITION_NAME_NON_FICTION}' 在 Collection '{EXERCISE_COLLECTION_NAME}' 中创建成功。")
    else:
        print(f"分区 '{PARTITION_NAME_NON_FICTION}' 已存在。")
except Exception as e:
    print(f"创建分区失败: {e}")

# 2. 列出所有分区
try:
    partitions = client.list_partitions(collection_name=EXERCISE_COLLECTION_NAME)
    print(f"\nCollection '{EXERCISE_COLLECTION_NAME}' 中的分区: {partitions}")
except Exception as e:
    print(f"列出分区失败: {e}")

# 3. 检查分区是否存在
try:
    has_fiction = client.has_partition(collection_name=EXERCISE_COLLECTION_NAME, partition_name=PARTITION_NAME_FICTION)
    print(f"分区 '{PARTITION_NAME_FICTION}' 是否存在: {has_fiction}")
    has_scifi = client.has_partition(collection_name=EXERCISE_COLLECTION_NAME, partition_name="sci_fi_books") # 一个不存在的分区
    print(f"分区 'sci_fi_books' 是否存在: {has_scifi}")
except Exception as e:
    print(f"检查分区是否存在失败: {e}")

# 4. 删除分区 (如果需要，通常在测试后清理)
# try:
#     if client.has_partition(collection_name=EXERCISE_COLLECTION_NAME, partition_name=PARTITION_NAME_NON_FICTION):
#         client.drop_partition(collection_name=EXERCISE_COLLECTION_NAME, partition_name=PARTITION_NAME_NON_FICTION)
#         print(f"分区 '{PARTITION_NAME_NON_FICTION}' 已删除。")
# except Exception as e:
#     print(f"删除分区失败: {e}")

分区 'fiction_books' 在 Collection 'book_search' 中创建成功。
分区 'non_fiction_books' 在 Collection 'book_search' 中创建成功。

Collection 'book_search' 中的分区: ['_default', 'fiction_books', 'non_fiction_books']
分区 'fiction_books' 是否存在: True
分区 'sci_fi_books' 是否存在: False


 ### 实操：将数据插入 Collection (或指定 Partition)

 使用 `client.insert()` 方法可以将数据插入到指定的 Collection 中。

 - **数据格式要求**:
     - `data`: 一个列表，其中每个元素代表一个实体。
         - 如果是字典列表 (推荐): `[{"field1": value1, "field2": value2, ...}, ...]`。字段名必须与 Schema 中的名称匹配。对于自动生成ID的主键，**不需要**在数据中提供该主键字段。
         - 如果是元组列表 (或列表的列表): `[(value_field1, value_field2, ...), ...]`。值的顺序必须严格按照 `CollectionSchema.fields` 中定义的顺序。同样，自动ID主键字段不应包含值。
 - **`partition_name` (可选)**: 如果要将数据插入特定分区，请指定此参数。如果省略，数据将插入到 `_default` 分区。
 - **返回值**: `insert()` 方法返回一个 `MutationResult` 对象，其中包含 `insert_count` (成功插入的实体数量) 和 `primary_keys` (新插入实体的主键列表，对于自动ID尤其重要)。

 - **批量插入的技巧**:
     - `client.insert()` 本身就支持批量插入 (传递一个包含多个实体的列表)。
     - 相比于单条循环插入，一次性插入一个较大的批次通常效率更高，因为可以减少网络通信开销。
     - Milvus 对一次插入的数据量有限制 (通常由 gRPC 消息大小限制，约为 32MB-64MB，具体取决于 Milvus 版本和配置)。如果数据量非常大，需要应用程序层面进行分批。PyMilvus 内部也会做一些批处理。

In [15]:
# 将前一半数据插入 'fiction_books' 分区，后一半插入默认分区
num_to_fiction = NUM_ENTITIES // 2
data_for_fiction = data_to_insert[:num_to_fiction]
data_for_default = data_to_insert[num_to_fiction:]

inserted_pks = [] # 用于存储所有插入数据的主键，方便后续删除操作

# 1. 插入到指定分区 'fiction_books'
try:
    print(f"\n准备向分区 '{PARTITION_NAME_FICTION}' 插入 {len(data_for_fiction)} 条数据...")
    res_fiction = client.insert(
        collection_name=EXERCISE_COLLECTION_NAME,
        data=data_for_fiction,
        partition_name=PARTITION_NAME_FICTION
    )
    print(f"成功向分区 '{PARTITION_NAME_FICTION}' 插入 {res_fiction['insert_count']} 条数据。")
    print(f"返回的主键 (前5个): {res_fiction['ids'][:5]}")
    inserted_pks.extend(res_fiction['ids'])
except Exception as e:
    print(f"向分区 '{PARTITION_NAME_FICTION}' 插入数据失败: {e}")

# 2. 插入到默认分区 (_default)
try:
    print(f"\n准备向默认分区插入 {len(data_for_default)} 条数据...")
    res_default = client.insert(
        collection_name=EXERCISE_COLLECTION_NAME,
        data=data_for_default
        # partition_name 省略，则插入到 _default
    )
    print(f"成功向默认分区插入 {res_default['insert_count']} 条数据。")
    print(f"返回的主键 (前5个): {res_default['ids'][:5]}")
    inserted_pks.extend(res_default['ids'])
except Exception as e:
    print(f"向默认分区插入数据失败: {e}")



准备向分区 'fiction_books' 插入 50 条数据...
成功向分区 'fiction_books' 插入 50 条数据。
返回的主键 (前5个): [457888763904009644, 457888763904009645, 457888763904009646, 457888763904009647, 457888763904009648]

准备向默认分区插入 50 条数据...
成功向默认分区插入 50 条数据。
返回的主键 (前5个): [457888763904009695, 457888763904009696, 457888763904009697, 457888763904009698, 457888763904009699]


 **重要：Flush 数据**

 插入数据后，这些数据首先位于内存缓冲区中。为了确保数据持久化到磁盘并能被后续操作（如索引构建、搜索）正确处理，需要执行 `flush()` 操作。
 `flush()` 会将内存中的数据段（growing segments）刷写（flush）到磁盘，形成持久化的数据段（sealed segments）。

 虽然 Milvus 有自动 flush 机制，但在进行重要操作（如构建索引、大量删除后希望立即看到 `num_entities` 变化、或确保数据完全持久化）之前，手动调用 `client.flush()` 是一个好习惯。

In [5]:
try:
    print(f"\n正在 Flush Collection '{EXERCISE_COLLECTION_NAME}'...")
    client.flush(collection_name=EXERCISE_COLLECTION_NAME) # Milvus 2.3+
    # 对于 PyMilvus < 2.3.2 (大致), 如果 client 是通过 MilvusClient 创建的，
    # flush 可能需要通过 utility.flush([EXERCISE_COLLECTION_NAME]) 或 Collection 对象
    # 但 MilvusClient 2.3+ 应该可以直接 client.flush()
    print("Flush 操作已请求。这可能需要一些时间来完成。")
    
    # 检查实体数量 (flush 后 num_entities 应该更新)
    # 注意：num_entities 的更新可能不是瞬时的，取决于 flush 完成和元数据同步
    # 多次查询或稍作等待可以观察到变化
    import time
    time.sleep(2) # 稍作等待，让 flush 和元数据同步有机会完成
    
    stats_after_insert = client.get_collection_stats(collection_name=EXERCISE_COLLECTION_NAME)
    print(f"Flush 后 Collection '{EXERCISE_COLLECTION_NAME}' 的统计信息: {stats_after_insert}")
    # 'row_count' 字段通常反映了实体数量
    current_num_entities = int(stats_after_insert.get('row_count', 0)) # Milvus 2.x
    # 或者 desc = client.describe_collection(...); current_num_entities = desc['num_entities']

    print(f"Collection '{EXERCISE_COLLECTION_NAME}' 当前实体数量: {current_num_entities}")
    if current_num_entities == NUM_ENTITIES:
        print("实体数量与预期一致！")
    else:
        print(f"实体数量与预期不符 (预期: {NUM_ENTITIES}, 实际: {current_num_entities})。Flush 可能仍在后台进行或存在其他问题。")

except Exception as e:
    print(f"Flush Collection 或获取统计信息失败: {e}")


正在 Flush Collection 'book_search'...
Flush 操作已请求。这可能需要一些时间来完成。
Flush 后 Collection 'book_search' 的统计信息: {'row_count': 100}
Collection 'book_search' 当前实体数量: 100
实体数量与预期一致！


 ### 实操：删除数据 (按 ID 或过滤条件)

 Milvus 支持基于主键 ID 或标量字段的过滤条件来删除实体。

 - **`client.delete(collection_name, pks, filter, partition_name)`**:
     - `pks`: 一个主键值列表，用于删除具有这些 ID 的实体。
     - `filter`: 一个字符串形式的布尔表达式，用于删除满足条件的实体 (例如: `"publication_year < 2000"` 或 `"book_title like 'The Amazing%'"`)。
     - `pks` 和 `filter` 至少提供一个。如果都提供，则删除满足任一条件的实体。
     - `partition_name` (可选): 将删除操作限制在指定分区内。
 - **逻辑删除与 Compaction**:
     - Milvus 中的删除操作是**逻辑删除 (Soft Delete)**。这意味着数据并没有立即从磁盘上物理移除，而是被标记为已删除。这些被标记的数据在搜索时会被过滤掉。
     - 物理删除和空间回收是通过后续的 **Compaction (压缩)** 操作完成的。Compaction 会合并数据段，移除已标记为删除的数据，并整理数据以优化存储和查询性能。
     - `client.compact(collection_name)`: 手动触发 Compaction。这是一个异步操作。
     - `client.get_compaction_state(compaction_id)`: 查询 Compaction 状态。
     - `client.wait_for_compaction_completed(compaction_id)`: 等待 Compaction 完成。
 - **删除后 `num_entities` 的变化**:
     - 逻辑删除后，立即查询 `num_entities` 可能不会立即反映删除。
     - 执行 `flush()` 后，`num_entities` 通常会更新以反映逻辑删除的数量。
     - 只有在 Compaction 完成后，已删除数据占用的磁盘空间才会被真正回收。

In [22]:
# 在加载之前，需要先为向量字段创建索引 (后续详细介绍索引相关内容)
index_params = MilvusClient.prepare_index_params()

index_params.add_index(
    field_name="book_embedding",
    metric_type="COSINE",
    index_type="IVF_FLAT",
    index_name="vector_index",
    params={ "nlist": 128 }
)

client.create_index(
    collection_name=EXERCISE_COLLECTION_NAME,
    index_params=index_params,
    sync=False 
)

In [23]:
# 准备要删除的数据
pks_to_delete = []
if len(inserted_pks) >= 5:
    pks_to_delete = inserted_pks[:3] # 删除前3条插入的数据 (通过ID)
    print(f"准备通过 ID 删除以下主键: {pks_to_delete}")
else:
    print("没有足够的主键用于演示按 ID 删除。")

filter_expr_delete = "publication_year < 1990" # 删除1990年以前出版的书籍
print(f"准备通过过滤条件删除: '{filter_expr_delete}'")

# 0. 确保 Collection 已加载 
try:
    print(f"\n确保 Collection '{EXERCISE_COLLECTION_NAME}' 已加载...")
    # 检查加载状态 (可选，但有助于调试)
    load_state_before_delete = client.get_load_state(collection_name=EXERCISE_COLLECTION_NAME)
    print(f"删除操作前，Collection 加载状态: {load_state_before_delete}")
    
    client.load_collection(collection_name=EXERCISE_COLLECTION_NAME)
    print(f"Collection '{EXERCISE_COLLECTION_NAME}' 加载指令已发送/确认。")
    # MilvusClient.load_collection() 是阻塞的，直到加载完成（或超时）
    # 对于小型或空集合，它会很快返回
except Exception as e:
    print(f"加载 Collection '{EXERCISE_COLLECTION_NAME}' 失败: {e}")
    raise

# 1. 按 ID 删除
if pks_to_delete:
    try:
        print(f"\n正在通过 ID 删除数据...")
        del_res_ids = client.delete(
            collection_name=EXERCISE_COLLECTION_NAME,
            pks=pks_to_delete
        )
        print(f"通过 ID 删除操作完成。删除计数: {del_res_ids['delete_count']}") # delete_count 是匹配到的数量
    except Exception as e:
        print(f"通过 ID 删除数据失败: {e}")

# 2. 按过滤条件删除
try:
    print(f"\n正在通过过滤条件 '{filter_expr_delete}' 删除数据...")
    # 先查询一下有多少符合条件的，以便对比
    query_before_delete_count = client.query(collection_name=EXERCISE_COLLECTION_NAME, 
                                             filter=filter_expr_delete, 
                                             output_fields=["book_id"])
    print(f"删除前，符合条件 '{filter_expr_delete}' 的实体数量: {len(query_before_delete_count)}")

    del_res_filter = client.delete(
        collection_name=EXERCISE_COLLECTION_NAME,
        filter=filter_expr_delete
        # partition_name="fiction_books" # 也可以指定分区
    )
    print(f"通过过滤条件删除操作完成。删除计数: {del_res_filter['delete_count']}")
except Exception as e:
    print(f"通过过滤条件删除数据失败: {e}")

# 3. Flush Collection 以使删除生效 (更新 num_entities)
try:
    print(f"\n删除操作后，正在 Flush Collection '{EXERCISE_COLLECTION_NAME}'...")
    client.flush(collection_name=EXERCISE_COLLECTION_NAME)
    print("Flush 操作已请求。")
    
    time.sleep(2) # 等待
    stats_after_delete = client.get_collection_stats(collection_name=EXERCISE_COLLECTION_NAME)
    current_num_entities_after_delete = int(stats_after_delete.get('row_count', 0))
    print(f"Flush 后 Collection '{EXERCISE_COLLECTION_NAME}' 当前实体数量: {current_num_entities_after_delete}")
except Exception as e:
    print(f"Flush Collection 或获取统计信息失败: {e}")

# 4. (可选演示) 手动触发 Compaction
# Compaction 可能需要较长时间，在 workshop 中可能只演示触发，不等待完成
try:
    print(f"\n手动触发 Collection '{EXERCISE_COLLECTION_NAME}' 的 Compaction...")
    compaction_id = client.compact(collection_name=EXERCISE_COLLECTION_NAME)
    print(f"Compaction 已触发，Compaction ID: {compaction_id}") 
    # print(f"Compaction ID: {compaction_id}") # Older versions might return ID directly

    # 检查 Compaction 状态 (通常需要轮询)
    # state = client.get_compaction_state(compaction_id=compaction_id.compaction_id)
    # print(f"Compaction 状态: {state}")
    
    # 如果想等待完成 (可能耗时较长):
    # client.wait_for_compaction_completed(compaction_id=compaction_id.compaction_id)
    # print("Compaction 已完成。")
    # stats_after_compaction = client.get_collection_stats(collection_name=EXERCISE_COLLECTION_NAME)
    # print(f"Compaction 完成后 Collection 统计: {stats_after_compaction}")

except Exception as e:
    print(f"Compaction 操作失败: {e}")

准备通过 ID 删除以下主键: [457888763904009644, 457888763904009645, 457888763904009646]
准备通过过滤条件删除: 'publication_year < 1990'

确保 Collection 'book_search' 已加载...
删除操作前，Collection 加载状态: {'state': <LoadState: Loaded>}
Collection 'book_search' 加载指令已发送/确认。

正在通过 ID 删除数据...
通过 ID 删除操作完成。删除计数: 3

正在通过过滤条件 'publication_year < 1990' 删除数据...
删除前，符合条件 'publication_year < 1990' 的实体数量: 0
通过过滤条件删除操作完成。删除计数: 0

删除操作后，正在 Flush Collection 'book_search'...
Flush 操作已请求。
Flush 后 Collection 'book_search' 当前实体数量: 600

手动触发 Collection 'book_search' 的 Compaction...
Compaction 已触发，Compaction ID: 457888763904754600


 ### Hands-on Exercise 2: 插入与删除数据

 **任务**:
 1.  为 `book_search` Collection 生成一批新的模拟数据 (例如，50条)。
 2.  将这批新数据插入到 `book_search` 的 `_default` 分区。
 3.  记录插入后返回的主键。
 4.  Flush Collection。
 5.  验证 `num_entities` 是否增加了对应数量。
 6.  从插入的新数据中，随机选择 5 个主键，并使用这些主键删除对应的实体。
 7.  定义一个过滤条件 (例如: `publication_year > 2010`)，并删除所有满足此条件的实体。
 8.  再次 Flush Collection。
 9.  验证 `num_entities` 是否相应减少。
 10. (可选) 删除用于练习的分区 `fiction_books` 和 `non_fiction_books` (如果之前创建了)。