验证向量数据库是否支持存储稠密向量并且基于稠密向量进行距离计算和近似检索。

In [None]:
import logging
from pymilvus import connections, FieldSchema, CollectionSchema, DataType, Collection, utility
import numpy as np

# Logging setup: timestamped, INFO-level records for each verification step.
logging.basicConfig(
    datefmt='%Y-%m-%d %H:%M:%S',
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
try:
    # Step 1 - open a connection to the local Milvus server.
    logging.info("Connecting to Milvus server...")
    connections.connect(host='127.0.0.1', port='19530')
    logging.info("Successfully connected to Milvus")

    # Step 2 - describe the two fields, then create the collection from them.
    logging.info("Creating collection...")
    dim = 768
    metric_type = "L2"  # "IP" is the alternative metric
    id_field = FieldSchema("id", DataType.INT64, is_primary=True)
    vector_field = FieldSchema("vector", DataType.FLOAT_VECTOR, dim=dim)
    schema = CollectionSchema([id_field, vector_field])
    collection = Collection("dense_test", schema, consistency_level="Strong")
    logging.info(f"Collection created: {collection.name}")

    # Step 3 - build an IVF_FLAT index on the vector field and load the collection.
    logging.info("Creating index...")
    index_params = {
        "index_type": "IVF_FLAT",
        "metric_type": metric_type,
        "params": {"nlist": 16},
    }
    collection.create_index("vector", index_params)
    collection.load()
    logging.info("Index created and collection loaded")

    # Step 4 - insert 100 random float32 vectors with ids 0..99 (column-based insert).
    logging.info("Generating and inserting vectors...")
    vectors = np.random.randn(100, dim).astype(np.float32)
    ids = list(range(100))
    collection.insert([ids, vectors.tolist()])
    logging.info(f"Inserted {len(vectors)} vectors")

    # Step 5 - scalar query: fetch the stored vector for id 0.
    logging.info("Running query...")
    res = collection.query("id == 0", output_fields=["vector"])
    logging.info("Query results:")
    logging.info(f"Query returned {len(res)} results:")
    for idx, result in enumerate(res):
        # Only the first five components are logged to keep the output short.
        logging.info(f"Result {idx}: ID={result['id']}, Vector={result['vector'][:5]}...")

    # Step 6 - vector search using the first inserted vector as the query.
    logging.info("Running search...")
    search_param = {"metric_type": metric_type, "params": {"nprobe": 10}}
    search_result = collection.search(
        data=vectors[:1].tolist(),
        anns_field="vector",
        param=search_param,
        limit=3,
        output_fields=["vector"],
    )

    logging.info("Search results:")
    logging.info(f"Search returned {len(search_result[0])} results:")
    for idx, hit in enumerate(search_result[0]):
        logging.info(f"Rank {idx+1}: ID={hit.id}, Distance={hit.distance:.4f}")
        logging.info(f"Vector: {hit.entity.fields['vector'][:5]}...")

    # Final step - remove the test collection.
    logging.info("Cleaning up...")
    utility.drop_collection("dense_test")
    logging.info("Collection dropped successfully")

except Exception as e:
    # Log the failure with its traceback, then let it propagate.
    logging.error(f"Error occurred: {str(e)}", exc_info=True)
    raise

验证向量数据库是否支持存储稀疏向量并且基于稀疏向量进行距离计算和近似检索。

In [None]:
import logging
from pymilvus import connections, FieldSchema, CollectionSchema, DataType, Collection, utility
import numpy as np
from scipy.sparse import random as sparse_random

# Configure logging: INFO level, timestamped "time - level - message" records.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'
)
try:
    # 1. Connect to Milvus
    logging.info("Connecting to Milvus server...")
    connections.connect(host='127.0.0.1', port='19530')
    logging.info("Successfully connected to Milvus")

    # Drop any collection left over from an earlier run so the test starts clean.
    utility.drop_collection("sparse_test")

    # 2. Create sparse vector collection (sparse fields take no `dim`)
    logging.info("Creating collection...")
    metric_type = "IP"
    collection = Collection(
        "sparse_test",
        CollectionSchema([
            FieldSchema("id", DataType.INT64, is_primary=True),
            FieldSchema("vector", DataType.SPARSE_FLOAT_VECTOR)
        ]),
        consistency_level="Strong"
    )
    logging.info(f"Collection created: {collection.name}")

    # 3. Create index and load
    # NOTE(review): "nlist" is an IVF-style parameter; confirm whether
    # SPARSE_INVERTED_INDEX uses it at all - it may be safe to remove.
    logging.info("Creating index...")
    collection.create_index(
        "vector",
        {"index_type": "SPARSE_INVERTED_INDEX", "metric_type": metric_type, "params": {"nlist": 16, "inverted_index_algo": "DAAT_MAXSCORE"}}
    )
    collection.load()
    logging.info("Index created and collection loaded")

    # 4. Insert sparse vectors (row-based insert; each vector is {dimension index: value})
    logging.info("Generating and inserting vectors...")
    sparse_vectors = [
        {"id": 0, "vector": {1: 0.5, 100: 0.3, 500: 0.8}},
        {"id": 1, "vector": {10: 0.1, 200: 0.7, 500: 0.9}},
    ]

    collection.insert(sparse_vectors)
    logging.info(f"Inserted {len(sparse_vectors)} vectors")

    # 5. Verify Query
    logging.info("Running query...")
    res = collection.query(expr="id == 0", output_fields=["vector"])
    logging.info("Query results:")
    logging.info(f"Query returned {len(res)} results:")
    for idx, result in enumerate(res):
        logging.info(f"Result {idx}: ID={result['id']}, Vector={result['vector']}")

    # 6. Verify Search, using the second inserted vector as the query
    logging.info("Running search...")
    search_result = collection.search(
        [sparse_vectors[1]["vector"]],
        "vector",
        {"metric_type": metric_type, "params": {"drop_ratio_search": 0.2}},
        limit=3,
        output_fields=["vector"]
    )

    logging.info("Search results:")
    logging.info(f"Search returned {len(search_result[0])} results:")
    for idx, hit in enumerate(search_result[0]):
        # Report each hit's own id, distance and vector. Logging `result` here
        # would only repeat the last row of the query loop in step 5.
        logging.info(
            f"Rank {idx+1}: ID={hit.id}, Distance={hit.distance:.4f}, "
            f"Vector={hit.entity.fields['vector']}"
        )

    # Cleanup
    logging.info("Cleaning up...")
    utility.drop_collection("sparse_test")
    logging.info("Collection dropped successfully")

except Exception as e:
    # Log with traceback and re-raise so the failure stays visible.
    logging.error(f"Error occurred: {str(e)}", exc_info=True)
    raise

验证向量数据库是否支持2~4096维度的稠密向量；是否支持2~4096维度的索引创建和查询。

In [None]:
import logging
from pymilvus import connections, FieldSchema, CollectionSchema, DataType, Collection, utility
import numpy as np
from scipy.sparse import random as sparse_random

# Configure logging: INFO level, timestamped "time - level - message" records.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'
)
try:
    # Connect to Milvus
    logging.info("Connecting to Milvus server...")
    connections.connect(host='127.0.0.1', port='19530')
    logging.info("Successfully connected to Milvus")

    # Dimensions under test, spanning the required 2..4096 range.
    dimensions = [2, 64, 256, 1024, 4096]
    metric_type = "L2"  # or "IP"

    for dim in dimensions:
        logging.info(f"{'='*30} Testing Dimension: {dim} {'='*30}")

        # A collection left behind by an aborted run may have a different
        # dimension, so remove it before creating this iteration's schema.
        if utility.has_collection("dim_test"):
            utility.drop_collection("dim_test")

        # Create dense vector collection. `dim` must stay the loop variable:
        # reassigning it here would make every iteration test the same size.
        logging.info("Creating collection...")
        collection = Collection(
            "dim_test",
            CollectionSchema([
                FieldSchema("id", DataType.INT64, is_primary=True),
                FieldSchema("vector", DataType.FLOAT_VECTOR, dim=dim)
            ]),
            consistency_level="Strong"
        )
        logging.info(f"Collection created: {collection.name}")

        # Create index and load
        logging.info("Creating index...")
        collection.create_index(
            "vector",
            {"index_type": "IVF_FLAT", "metric_type": metric_type, "params": {"nlist": 16}}
        )
        collection.load()
        logging.info("Index created and collection loaded")

        # Insert 100 random dense vectors of the current dimension
        logging.info("Generating and inserting vectors...")
        vectors = np.random.randn(100, dim).astype(np.float32)
        collection.insert([list(range(100)), vectors.tolist()])
        logging.info(f"Inserted {len(vectors)} vectors")

        # Verify Query
        logging.info("Running query...")
        res = collection.query(expr="id == 0", output_fields=["vector"])
        logging.info("Query results:")
        logging.info(f"Query returned {len(res)} results:")
        for idx, result in enumerate(res):
            # Show at most the first 5 elements (fewer when dim < 5)
            logging.info(f"Result {idx}: ID={result['id']}, Vector={result['vector'][:5]}...")

        # Verify Search, using the first inserted vector as the query
        logging.info("Running search...")
        search_result = collection.search(
            vectors[:1].tolist(),
            "vector",
            {"metric_type": metric_type, "params": {"nprobe": 10}},
            limit=3,
            output_fields=["vector"]
        )

        logging.info("Search results:")
        logging.info(f"Search returned {len(search_result[0])} results:")
        for idx, hit in enumerate(search_result[0]):
            logging.info(f"Rank {idx+1}: ID={hit.id}, Distance={hit.distance:.4f}")
            logging.info(f"Vector: {hit.entity.fields['vector'][:5]}...")

        # Cleanup so the next dimension starts from a fresh collection
        logging.info("Cleaning up...")
        utility.drop_collection("dim_test")
        logging.info("Collection dropped successfully")

except Exception as e:
    # Log with traceback and re-raise so the failure stays visible.
    logging.error(f"Error occurred: {str(e)}", exc_info=True)
    raise

验证向量数据库是否支持字符串、浮点数、整数、布尔值、时间戳等基础标量数值类型。

In [None]:
import logging
from pymilvus import connections, FieldSchema, CollectionSchema, DataType, Collection, utility
import numpy as np

# Logging setup: INFO level with a timestamped "time - level - message" layout.
logging.basicConfig(
    datefmt='%Y-%m-%d %H:%M:%S',
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
try:
    # Step 1 - connect to the local Milvus server.
    logging.info("Connecting to Milvus server...")
    connections.connect(host='127.0.0.1', port='19530')
    logging.info("Successfully connected to Milvus")

    # Step 2 - collection with one vector field plus one field per scalar type.
    logging.info("Creating collection...")
    dim = 768
    metric_type = "L2"  # "IP" is the alternative metric
    fields = [
        FieldSchema("id", DataType.INT64, is_primary=True),
        FieldSchema("vector", DataType.FLOAT_VECTOR, dim=dim),
        FieldSchema("str_field", DataType.VARCHAR, max_length=200),
        FieldSchema("float_field", DataType.FLOAT),
        FieldSchema("int_field", DataType.INT32),
        FieldSchema("bool_field", DataType.BOOL),
        FieldSchema("ts_field", DataType.INT64),  # timestamp stored as INT64
    ]
    collection = Collection(
        "scalar_test",
        CollectionSchema(fields),
        consistency_level="Strong",
    )
    logging.info(f"Collection created: {collection.name}")

    # Step 3 - index the vector field and load the collection.
    logging.info("Creating index...")
    index_params = {
        "index_type": "IVF_FLAT",
        "metric_type": metric_type,
        "params": {"nlist": 16},
    }
    collection.create_index("vector", index_params)
    collection.load()
    logging.info("Index created and collection loaded")

    # Step 4 - build one column per field (100 rows each) and insert them.
    logging.info("Generating and inserting vectors...")
    vectors = np.random.randn(100, dim).astype(np.float32)
    ids = list(range(100))
    str_values = [f"str_{i}" + "a"*(i%10) for i in range(100)]
    float_values = np.random.rand(100).tolist()
    int_values = list(np.random.randint(0, 10000, 100))
    bool_values = [bool(i % 2) for i in range(100)]
    ts_values = [int(1672531200 + i*3600) for i in range(100)]  # hourly steps
    data = [
        ids,
        vectors,
        str_values,
        float_values,
        int_values,
        bool_values,
        ts_values,
    ]
    collection.insert(data)
    logging.info(f"Inserted {len(vectors)} vectors")

    # Step 5 - filter on scalar fields. Other expressions tried by the author:
    #   "id == 0"
    #   "bool_field == True"
    #   "str_field like 'str_1%' and float_field > 0.5 and bool_field == True"
    logging.info("Querying with scalar conditions...")
    scalar_filter = "float_field > 0.5 and bool_field == True"
    res = collection.query(expr=scalar_filter, output_fields=["*"])
    logging.info("Scalar query results:")
    for r in res[:3]:  # only the first three rows are logged
        logging.info(f"ID:{r['id']} | str:{r['str_field']} | float:{r['float_field']:.2f} | "
              f"int:{r['int_field']} | bool:{r['bool_field']} | ts:{r['ts_field']}")

    # Final step - remove the test collection.
    logging.info("Cleaning up...")
    utility.drop_collection("scalar_test")
    logging.info("Collection dropped successfully")

except Exception as e:
    # Log the failure with its traceback, then let it propagate.
    logging.error(f"Error occurred: {str(e)}", exc_info=True)
    raise

验证向量数据库是否支持浮点数向量数据类型。

In [None]:
import logging
from pymilvus import connections, FieldSchema, CollectionSchema, DataType, Collection, utility
import numpy as np

# Logging setup: timestamped INFO records for each step of the float-vector check.
logging.basicConfig(
    datefmt='%Y-%m-%d %H:%M:%S',
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
try:
    # Step 1 - connect to the local Milvus server.
    logging.info("Connecting to Milvus server...")
    connections.connect(host='127.0.0.1', port='19530')
    logging.info("Successfully connected to Milvus")

    # Step 2 - create a collection whose vector field is FLOAT_VECTOR.
    logging.info("Creating collection...")
    dim = 768
    metric_type = "L2"  # "IP" is the alternative metric
    schema = CollectionSchema([
        FieldSchema("id", DataType.INT64, is_primary=True),
        FieldSchema("vector", DataType.FLOAT_VECTOR, dim=dim),
    ])
    collection = Collection(
        "float_vector_test",
        schema,
        consistency_level="Strong",
    )
    logging.info(f"Collection created: {collection.name}")

    # Step 3 - IVF_FLAT index on the vector field, then load.
    logging.info("Creating index...")
    ivf_flat_index = {
        "index_type": "IVF_FLAT",
        "metric_type": metric_type,
        "params": {"nlist": 16},
    }
    collection.create_index("vector", ivf_flat_index)
    collection.load()
    logging.info("Index created and collection loaded")

    # Step 4 - insert 100 random float32 vectors keyed by ids 0..99.
    logging.info("Generating and inserting vectors...")
    vectors = np.random.randn(100, dim).astype(np.float32)
    id_column = list(range(100))
    vector_column = vectors.tolist()
    collection.insert([id_column, vector_column])
    logging.info(f"Inserted {len(vectors)} vectors")

    # Step 5 - read back the vector stored under id 0.
    logging.info("Running query...")
    res = collection.query("id == 0", output_fields=["vector"])
    logging.info("Query results:")
    logging.info(f"Query returned {len(res)} results:")
    for idx, result in enumerate(res):
        # Only the first five components are logged.
        logging.info(f"Result {idx}: ID={result['id']}, Vector={result['vector'][:5]}...")

    # Step 6 - nearest-neighbour search using the first inserted vector.
    logging.info("Running search...")
    query_vectors = vectors[:1].tolist()
    search_result = collection.search(
        data=query_vectors,
        anns_field="vector",
        param={"metric_type": metric_type, "params": {"nprobe": 10}},
        limit=3,
        output_fields=["vector"],
    )

    logging.info("Search results:")
    logging.info(f"Search returned {len(search_result[0])} results:")
    for idx, hit in enumerate(search_result[0]):
        logging.info(f"Rank {idx+1}: ID={hit.id}, Distance={hit.distance:.4f}")
        logging.info(f"Vector: {hit.entity.fields['vector'][:5]}...")

    # Final step - remove the test collection.
    logging.info("Cleaning up...")
    utility.drop_collection("float_vector_test")
    logging.info("Collection dropped successfully")

except Exception as e:
    # Log the failure with its traceback, then let it propagate.
    logging.error(f"Error occurred: {str(e)}", exc_info=True)
    raise

验证向量数据库是否支持JSON等至少一种半结构化数据类型。

In [None]:
import logging
from pymilvus import connections, FieldSchema, CollectionSchema, DataType, Collection, utility
import numpy as np

# Logging setup: timestamped INFO records for each step of the JSON-field check.
logging.basicConfig(
    datefmt='%Y-%m-%d %H:%M:%S',
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
try:
    # Step 1 - connect to the local Milvus server.
    logging.info("Connecting to Milvus server...")
    connections.connect(host='127.0.0.1', port='19530')
    logging.info("Successfully connected to Milvus")

    # Start from a clean slate: drop the collection before recreating it.
    utility.drop_collection("json_test")

    # Step 2 - collection with a vector field and a JSON "metadata" field.
    logging.info("Creating collection...")
    dim = 768
    metric_type = "L2"  # "IP" is the alternative metric
    schema = CollectionSchema([
        FieldSchema("id", DataType.INT64, is_primary=True),
        FieldSchema("vector", DataType.FLOAT_VECTOR, dim=dim),
        FieldSchema("metadata", DataType.JSON),
    ])
    collection = Collection("json_test", schema, consistency_level="Strong")
    logging.info(f"Collection created: {collection.name}")

    # Step 3 - index the vector field and load the collection.
    logging.info("Creating index...")
    index_params = {
        "index_type": "IVF_FLAT",
        "metric_type": metric_type,
        "params": {"nlist": 16},
    }
    collection.create_index("vector", index_params)
    collection.load()
    logging.info("Index created and collection loaded")

    # Step 4 - 100 rows: id, random vector, and a nested JSON document.
    logging.info("Generating and inserting vectors...")
    vectors = np.random.randn(100, dim).astype(np.float32)
    metadata_rows = [
        {
            "title": f"doc_{i}",
            # Odd ids carry two tags, even ids a single one.
            "tags": ["tag1", "tag2"] if i % 2 else ["tag3"],
            "stats": {"views": i*10, "rating": round(np.random.uniform(1, 5), 1)},
        }
        for i in range(100)
    ]
    data = [list(range(100)), vectors, metadata_rows]
    collection.insert(data)

    logging.info(f"Inserted {len(vectors)} vectors")

    # Step 5 - filter on a top-level JSON key and on a nested JSON key.
    logging.info("Querying JSON data...")
    json_filter = "metadata['title'] == 'doc_0' and metadata['stats']['rating'] > 0"
    res = collection.query(expr=json_filter, output_fields=["metadata", "id"])
    logging.info("JSON query results:")
    for r in res:
        logging.info(f"ID:{r['id']} | Metadata:{r['metadata']}")

    # Step 6 - vector search that returns the JSON field alongside each hit.
    search_result = collection.search(
        data=vectors[:1],  # the first inserted vector is the query
        anns_field="vector",
        param={"metric_type": "L2", "params": {"nprobe": 10}},
        limit=3,
        output_fields=["metadata"],
    )
    logging.info("JSON in search results:")
    for hit in search_result[0]:
        logging.info(f"ID:{hit.id} | Metadata:{hit.entity.fields['metadata']}")

    # Final step - remove the test collection.
    logging.info("Cleaning up...")
    utility.drop_collection("json_test")
    logging.info("Collection dropped successfully")

except Exception as e:
    # Log the failure with its traceback, then let it propagate.
    logging.error(f"Error occurred: {str(e)}", exc_info=True)
    raise

验证向量数据库是否能在单行数据中支持多个向量字段。

In [None]:
import logging
from pymilvus import connections, FieldSchema, CollectionSchema, DataType, Collection, utility
import numpy as np

# Configure logging: INFO level, timestamped "time - level - message" records.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'
)
try:
    # 1. Connect to Milvus
    logging.info("Connecting to Milvus server...")
    connections.connect(host='127.0.0.1', port='19530')
    logging.info("Successfully connected to Milvus")

    # 2. Create a collection with two vector fields of different dimensions
    logging.info("Creating collection...")
    collection = Collection(
        "multi_vector_test",
        CollectionSchema([
            FieldSchema("id", DataType.INT64, is_primary=True),
            FieldSchema("vector1", DataType.FLOAT_VECTOR, dim=256),
            FieldSchema("vector2", DataType.FLOAT_VECTOR, dim=128),
            FieldSchema("metadata", DataType.JSON)
        ]),
        consistency_level="Strong"
    )
    logging.info(f"Collection created: {collection.name}")

    # 3. Create one index per vector field (each with its own metric) and load
    logging.info("Creating index...")
    index_params = {
        "vector1": {"index_type": "IVF_FLAT", "metric_type": "L2", "params": {"nlist": 128}},
        "vector2": {"index_type": "IVF_FLAT", "metric_type": "IP", "params": {"nlist": 64}},
    }

    for field, params in index_params.items():
        collection.create_index(field, params)
    collection.load()
    logging.info("Index created and collection loaded")

    # 4. Insert dense vectors. The float32 arrays generated here are the data
    #    that is actually inserted, and later reused as the search queries.
    logging.info("Generating and inserting vectors...")
    vectors1 = np.random.randn(100, 256).astype(np.float32)
    vectors2 = np.random.randn(100, 128).astype(np.float32)
    data = [
        list(range(100)),
        vectors1.tolist(),  # Vector1
        vectors2.tolist(),  # Vector2
        [{"description": f"item_{i}"} for i in range(100)]
    ]
    collection.insert(data)

    logging.info(f"Inserted {len(vectors1)} vectors")

    # 5. Verify query returns both vectors
    res = collection.query(
        expr="id == 0",
        output_fields=["vector1", "vector2"]
    )
    logging.info("Multi-vector query results:")
    for r in res:
        logging.info(f"ID:{r['id']} | Vector1 Head:{r['vector1'][:5]} | Vector2 Head:{r['vector2'][:5]}")

    # 6. Verify search on both vector fields. The query is the first inserted
    #    vector of that field; the metric comes from the field's index
    #    definition so the two cannot drift apart.
    query_vectors = {
        "vector1": vectors1[:1].tolist(),
        "vector2": vectors2[:1].tolist(),
    }
    for vec_field in ["vector1", "vector2"]:
        logging.info(f"Searching with {vec_field}...")
        search_result = collection.search(
            query_vectors[vec_field],
            vec_field,
            param={"metric_type": index_params[vec_field]["metric_type"], "params": {"nprobe": 10}},
            limit=3,
            output_fields=["metadata"]
        )
        logging.info(f"{vec_field} search results:")
        for hit in search_result[0]:
            logging.info(f"ID:{hit.id} | Distance:{hit.distance:.4f} | Metadata:{hit.entity.fields['metadata']}")

    # Cleanup
    logging.info("Cleaning up...")
    utility.drop_collection("multi_vector_test")
    logging.info("Collection dropped successfully")

except Exception as e:
    # Log with traceback and re-raise so the failure stays visible.
    logging.error(f"Error occurred: {str(e)}", exc_info=True)
    raise

验证向量数据库是否支持向量数据的压缩能力，减少存储空间开销，压缩算法如PQ、float2、zstd等。

In [None]:
import logging
from pymilvus import connections, FieldSchema, CollectionSchema, DataType, Collection, utility
import numpy as np

# Configure logging: INFO level, timestamped "time - level - message" records.
# This cell exercises reduced-size vector storage: a FLOAT16_VECTOR field and
# PQ-based index types (IVF_PQ, HNSW_PQ).
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'
)
try:
    # 1. Connect to Milvus
    logging.info("Connecting to Milvus server...")
    connections.connect(host='127.0.0.1', port='19530')
    logging.info("Successfully connected to Milvus")
    
    # Drop a leftover collection of the same name before recreating it.
    # NOTE(review): the name "multi_vector_test" is shared with the
    # multi-vector-field test; presumably copy-pasted - confirm it is intended.
    if utility.has_collection("multi_vector_test"):
        utility.drop_collection("multi_vector_test")

    # 2. Create the collection: one float16 vector field, one float32 vector
    #    field, and a JSON field
    logging.info("Creating collection...")
    metric_type = "L2"  # or "IP"
    collection = Collection(
        "multi_vector_test",
        CollectionSchema([
            FieldSchema("id", DataType.INT64, is_primary=True),
            FieldSchema("vector1", DataType.FLOAT16_VECTOR, dim=256), # float16 storage
            FieldSchema("vector2", DataType.FLOAT_VECTOR, dim=128),
            FieldSchema("metadata", DataType.JSON)
        ]),
        consistency_level="Strong"
    )
    logging.info(f"Collection created: {collection.name}")

    # 3. Create a PQ-based index on each vector field, then load
    logging.info("Creating index...")
    index_params = [
        ("vector1", {"index_type": "IVF_PQ", "metric_type": "L2", "params": {"m": 4}}),
        ("vector2", {"index_type": "HNSW_PQ", "metric_type": "L2", "params": {"M": 64, "efConstruction": 128}})
    ]
    
    for field, params in index_params:
        collection.create_index(field, params)
    collection.load()
    logging.info("Index created and collection loaded")

    # 4. Insert dense vectors (modified for float16)
    logging.info("Generating and inserting vectors...")
    # raw_vectors keeps the full-precision values; it is not read again in this cell.
    raw_vectors = []
    fp16_vectors = []
    # float16, little endian
    fp16_little = np.dtype('e').newbyteorder('<')
    for _ in range(100):
        # 256 uniform random values per row, matching vector1's dim
        raw_vector = [np.random.random() for _ in range(256)]
        raw_vectors.append(raw_vector)
        fp16_vector = np.array(raw_vector, dtype=fp16_little)
        fp16_vectors.append(fp16_vector)
    vectors2 = np.random.randn(100, 128).astype(np.float32)
    
    # Column-based insert: ids, float16 arrays, float32 lists, JSON documents
    data = [
        list(range(100)),
        fp16_vectors,  # float16 numpy arrays built above
        vectors2.tolist(),
        [{"description": f"item_{i}"} for i in range(100)]
    ]
    collection.insert(data)

    # 6. Verify search on both vector fields (modified for float16)
    for vec_field in ["vector1", "vector2"]:
        logging.info(f"Searching with {vec_field}...")
        # Query with the first inserted vector of the field being searched
        search_vector = fp16_vectors[0] if vec_field == "vector1" else vectors2[0].tolist()
        
        # NOTE(review): "nprobe" is sent for both fields, including the HNSW_PQ
        # one - confirm which search parameters that index type expects.
        search_result = collection.search(
            [search_vector],
            vec_field,
            param={"metric_type": "L2", "params": {"nprobe": 10}},
            limit=3,
            output_fields=["metadata", "vector1", "vector2"]
        )
        
        logging.info(f"{vec_field} search results:")
        for r in search_result[0]:
            # The returned vector1 value is decoded with np.frombuffer as little-endian float16
            logging.info(f"ID:{r.id} | Vector1 Head:{np.frombuffer(r.entity.fields['vector1'], dtype=fp16_little)[:5]} | Vector2 Head:{r.entity.fields['vector2'][:5]}")

    # Cleanup
    logging.info("Cleaning up...")
    utility.drop_collection("multi_vector_test")
    logging.info("Collection dropped successfully")

except Exception as e:
    # Log with traceback and re-raise so the failure stays visible.
    logging.error(f"Error occurred: {str(e)}", exc_info=True)
    raise

验证向量数据库是否支持至少一种近似最近邻检索算法（ANNS），如HNSW、IVF等。

In [None]:
import logging
from pymilvus import connections, FieldSchema, CollectionSchema, DataType, Collection, utility
import numpy as np

# Logging setup: timestamped INFO records for each step of the ANN-index check.
logging.basicConfig(
    datefmt='%Y-%m-%d %H:%M:%S',
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
try:
    # Step 1 - connect to the local Milvus server.
    logging.info("Connecting to Milvus server...")
    connections.connect(host='127.0.0.1', port='19530')
    logging.info("Successfully connected to Milvus")

    # Start from a clean slate: drop the collection before recreating it.
    utility.drop_collection("multi_index_test")

    # Step 2 - one vector field per index type under test, named after the index.
    logging.info("Creating collection...")
    dim = 768
    metric_type = "L2"  # "IP" is the alternative metric
    schema = CollectionSchema([
        FieldSchema("id", DataType.INT64, is_primary=True),
        FieldSchema("ivf_flat", DataType.FLOAT_VECTOR, dim=dim),
        FieldSchema("ivf_pq", DataType.FLOAT_VECTOR, dim=dim),
        FieldSchema("hnsw", DataType.FLOAT_VECTOR, dim=dim),
        FieldSchema("diskann", DataType.FLOAT_VECTOR, dim=dim),
    ])
    collection = Collection(
        "multi_index_test",
        schema,
        consistency_level="Strong",
    )
    logging.info(f"Collection created: {collection.name}")

    # Step 3 - build a different index type on each field, then load.
    logging.info("Creating index...")
    index_configs = {
        "ivf_flat": {
            "index_type": "IVF_FLAT",
            "metric_type": "L2",
            "params": {"nlist": 1024},
        },
        "ivf_pq": {
            "index_type": "IVF_PQ",
            "metric_type": "L2",
            "params": {"nlist": 512, "m": 16, "nbits": 8},
        },
        "hnsw": {
            "index_type": "HNSW",
            "metric_type": "L2",
            "params": {"M": 24, "efConstruction": 200},
        },
        "diskann": {
            "index_type": "DISKANN",
            "metric_type": "L2",
            "params": {"search_cache_size": 2, "build_cache_size": 4},
        },
    }
    for field, config in index_configs.items():
        collection.create_index(field, config)
    collection.load()

    logging.info("Index created and collection loaded")

    # Step 4 - an independent random matrix for every vector field.
    logging.info("Generating and inserting vectors...")
    vectors1 = np.random.randn(100, dim).astype(np.float32)  # ivf_flat
    vectors2 = np.random.randn(100, dim).astype(np.float32)  # ivf_pq
    vectors3 = np.random.randn(100, dim).astype(np.float32)  # hnsw
    vectors4 = np.random.randn(100, dim).astype(np.float32)  # diskann
    data = [list(range(100)), vectors1, vectors2, vectors3, vectors4]
    collection.insert(data)

    logging.info(f"Inserted {len(vectors1)} vectors")

    # Step 5 - search every field with its own index-specific parameters,
    # using the first inserted vector of that field as the query.
    search_params = {
        "ivf_flat": {"nprobe": 32},
        "ivf_pq": {"nprobe": 64},
        "hnsw": {"ef": 128},
        "diskann": {"search_list": 100},
    }
    field_vectors = {
        "ivf_flat": vectors1,
        "ivf_pq": vectors2,
        "hnsw": vectors3,
        "diskann": vectors4,
    }

    for field in search_params:
        logging.info(f"Testing {field.upper()} search...")
        results = collection.search(
            data=field_vectors[field][:1],
            anns_field=field,
            param={"metric_type": "L2", "params": search_params[field]},
            limit=5,
            output_fields=["id"],
        )
        logging.info(f"{field} Top 3 results:")
        for hit in results[0][:3]:
            logging.info(f"ID: {hit.id} | Distance: {hit.distance:.4f}")

    # Final step - remove the test collection.
    logging.info("Cleaning up...")
    utility.drop_collection("multi_index_test")
    logging.info("Collection dropped successfully")

except Exception as e:
    # Log the failure with its traceback, then let it propagate.
    logging.error(f"Error occurred: {str(e)}", exc_info=True)
    raise

验证向量数据库是否支持向量的精确检索。

In [None]:
import logging
from pymilvus import connections, FieldSchema, CollectionSchema, DataType, Collection, utility
import numpy as np

# Logging setup: timestamped INFO records for each step of the exact-search check.
logging.basicConfig(
    datefmt='%Y-%m-%d %H:%M:%S',
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
try:
    # Step 1 - connect to the local Milvus server.
    logging.info("Connecting to Milvus server...")
    connections.connect(host='127.0.0.1', port='19530')
    logging.info("Successfully connected to Milvus")

    # Step 2 - plain (id, vector) collection.
    logging.info("Creating collection...")
    dim = 768
    metric_type = "L2"  # "IP" is the alternative metric
    schema = CollectionSchema([
        FieldSchema("id", DataType.INT64, is_primary=True),
        FieldSchema("vector", DataType.FLOAT_VECTOR, dim=dim),
    ])
    collection = Collection(
        "exact_search_test",
        schema,
        consistency_level="Strong",
    )
    logging.info(f"Collection created: {collection.name}")

    # Step 3 - FLAT index (no build parameters), then load.
    logging.info("Creating index...")
    flat_index = {"index_type": "FLAT", "metric_type": metric_type}
    collection.create_index("vector", flat_index)
    collection.load()
    logging.info("Index created and collection loaded")

    # Step 4 - insert 100 random float32 vectors keyed by ids 0..99.
    logging.info("Generating and inserting vectors...")
    vectors = np.random.randn(100, dim).astype(np.float32)
    id_column = list(range(100))
    collection.insert([id_column, vectors.tolist()])
    logging.info(f"Inserted {len(vectors)} vectors")

    # Step 5 - search with empty params, asking for as many hits as rows inserted.
    logging.info("Running exact search...")
    exact_param = {"metric_type": metric_type, "params": {}}
    search_result = collection.search(
        data=vectors[:1].tolist(),
        anns_field="vector",
        param=exact_param,
        limit=100,
        output_fields=["vector"],
    )

    # Report how many hits came back and show the three closest.
    logging.info(f"Exact search returned {len(search_result[0])} results")
    logging.info("Top 3 matches:")
    for hit in search_result[0][:3]:
        logging.info(f"ID: {hit.id} | Distance: {hit.distance:.4f}")

    # Final step - remove the test collection.
    logging.info("Cleaning up...")
    utility.drop_collection("exact_search_test")
    logging.info("Collection dropped successfully")

except Exception as e:
    # Log the failure with its traceback, then let it propagate.
    logging.error(f"Error occurred: {str(e)}", exc_info=True)
    raise

验证向量数据库是否支持向量索引（包括近似索引和精确索引）和标量过滤同时检索，标量过滤应支持常用的运算符，包含比较运算符（=,<>,>,>=,<,<=）和逻辑运算符（与、或、非）等。

In [None]:
import logging
from pymilvus import connections, FieldSchema, CollectionSchema, DataType, Collection, utility
import numpy as np

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'
)
try:
    # 1. Connect to Milvus
    logging.info("Connecting to Milvus server...")
    connections.connect(host='127.0.0.1', port='19530')
    logging.info("Successfully connected to Milvus")

    # 2. Create dense vector collection
    logging.info("Creating collection...")
    dim = 768
    metric_type = "L2"  # or "IP"
    collection = Collection(
        "hybrid_search_test",
        CollectionSchema([
            FieldSchema("id", DataType.INT64, is_primary=True),
            FieldSchema("vector", DataType.FLOAT_VECTOR, dim=dim),
            FieldSchema("str_field", DataType.VARCHAR, max_length=200),
            FieldSchema("float_field", DataType.FLOAT),
            FieldSchema("int_field", DataType.INT32),
            FieldSchema("bool_field", DataType.BOOL),
            FieldSchema("ts_field", DataType.INT64)  # Timestamp stored as INT64
        ]),
        consistency_level="Strong"
    )
    logging.info(f"Collection created: {collection.name}")

    # 3. Create index and load
    logging.info("Creating index...")
    collection.create_index(
        "vector",
        {"index_type": "IVF_FLAT", "metric_type": metric_type, "params": {"nlist": 16}}
    )
    collection.load()
    logging.info("Index created and collection loaded")

    # 4. Insert dense vectors together with one column per scalar field.
    # Columns are listed in schema order. Each column is converted to plain
    # Python values before insertion, matching how the other cells hand data
    # to pymilvus, so the payload carries no NumPy arrays or NumPy scalars.
    # `vectors` itself is kept as an ndarray because the search step reuses it.
    logging.info("Generating and inserting vectors...")
    num_rows = 100
    vectors = np.random.randn(num_rows, dim).astype(np.float32)
    data = [
        list(range(num_rows)),  # IDs
        vectors.tolist(),  # Vectors
        [f"str_{i}" + "a" * (i % 10) for i in range(num_rows)],  # VARCHAR
        np.random.rand(num_rows).tolist(),  # FLOAT values
        np.random.randint(0, 10000, num_rows).tolist(),  # INT32
        [bool(i % 2) for i in range(num_rows)],  # BOOL
        [1672531200 + i * 3600 for i in range(num_rows)],  # INT64 timestamps (hourly)
    ]
    collection.insert(data)
    logging.info(f"Inserted {len(vectors)} vectors")

    # 5. Enhanced query validation with various operators
    logging.info("Testing scalar filtering operators...")
    
    # Test cases with different operators
    test_cases = [
        ("Basic comparison", "int_field > 5000 and float_field <= 0.8"),
        ("Equality check", "str_field == 'str_5aaaaa' or bool_field == false"),
        ("Range filter", "ts_field >= 1672531200 and ts_field < 1672617600"),
        ("Negation", "not (int_field in [100, 200, 300])"),
        ("Complex logic", "(0.3 <= float_field <= 0.7) and (str_field like 'str_2%')")
    ]
    
    for desc, expr in test_cases:
        logging.info(f"Testing: {desc}")
        res = collection.query(
            expr=expr,
            output_fields=["*"]
        )
        logging.info(f"Expression: {expr}")
        logging.info(f"Matched {len(res)} records")
        if res:
            logging.info(f"First result - ID:{res[0]['id']} int:{res[0]['int_field']} float:{res[0]['float_field']:.2f}")

    # 6. Validate vector search with scalar filtering.
    # The search metric is taken from `metric_type` so it always agrees with
    # the metric the index was built with; a hard-coded value would break the
    # search as soon as the cell is switched to another metric.
    logging.info("Testing combined vector search and scalar filtering...")
    search_result = collection.search(
        vectors[:1].tolist(),  # Use first vector, as a plain nested list
        "vector",
        param={"metric_type": metric_type, "params": {"nprobe": 10}},
        expr="int_field < 5000 and bool_field == true",
        limit=5,
        output_fields=["*"]
    )

    logging.info("Vector search with scalar filter results:")
    for hit in search_result[0]:
        logging.info(f"ID:{hit.id} | Distance:{hit.distance:.4f} | int:{hit.entity.fields['int_field']} | bool:{hit.entity.fields['bool_field']}")

    # Cleanup
    logging.info("Cleaning up...")
    utility.drop_collection("hybrid_search_test")
    logging.info("Collection dropped successfully")

except Exception as e:
    logging.error(f"Error occurred: {str(e)}", exc_info=True)
    raise

验证向量数据库是否支持对标量数据增删改查操作的原子性。

In [None]:
import logging
from pymilvus import connections, FieldSchema, CollectionSchema, DataType, Collection, utility
import numpy as np

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'
)
try:
    # 1. Connect to Milvus
    logging.info("Connecting to Milvus server...")
    connections.connect(host='127.0.0.1', port='19530')
    logging.info("Successfully connected to Milvus")

    # 2. Create dense vector collection
    logging.info("Creating collection...")
    dim = 64
    metric_type = "L2"  # or "IP"
    collection = Collection(
        "insert_test",
        CollectionSchema([
            FieldSchema("id", DataType.INT64, is_primary=True),
            FieldSchema("vector", DataType.FLOAT_VECTOR, dim=dim),
            FieldSchema("price", DataType.INT64)
        ]),
        consistency_level="Strong" 
    )
    logging.info(f"Collection created: {collection.name}")

    # 3. Create index and load
    logging.info("Creating index...")
    collection.create_index(
        "vector",
        {"index_type": "IVF_FLAT", "metric_type": metric_type, "params": {"nlist": 16}}
    )
    collection.load()
    logging.info("Index created and collection loaded")


    # 1. Test atomic insert
    try:
        insert_ids = [10001]
        insert_vectors = [np.random.randn(dim).tolist()]
        insert_prices = [999]
        collection.insert([insert_ids, insert_vectors, insert_prices])
        res = collection.query(expr="id == 10001", output_fields=["price"])
        assert res[0]["price"] == 999, "Insert atomicity failed"
        logging.info("Atomic insert validated")
    except Exception as e:
        logging.error(f"Insert atomicity test failed: {str(e)}")

    # 2. Test atomic update
    try:
        original_price = collection.query(expr="id == 10001", output_fields=["price"])[0]["price"]
        
        # Update price and vector simultaneously
        collection.upsert(
            data=[{
                "id": 10001,
                "vector": np.random.randn(dim).tolist(),
                "price": original_price + 100
            }]
        )
        
        updated = collection.query(expr="id == 10001", output_fields=["price", "vector"])
        assert updated[0]["price"] == original_price + 100, "Update atomicity failed"
        logging.info("Atomic update validated")
    except Exception as e:
        logging.error(f"Update atomicity test failed: {str(e)}")

    # 3. Test atomic delete
    try:
        # Delete record and check both scalar/vector removal
        collection.delete(expr="id == 10001")
        res = collection.query(expr="id == 10001")
        assert len(res) == 0, "Delete atomicity failed"
        logging.info("Atomic delete validated")
    except Exception as e:
        logging.error(f"Delete atomicity test failed: {str(e)}")

    # 4. Test failed operation rollback.
    # The upsert below omits the required "vector" field and is expected to
    # be rejected. After the rejection, the primary key must not be visible.
    try:
        # Attempt invalid upsert
        collection.upsert(data=[{"id": 10001, "price": 100}])
    except Exception as e:
        logging.info(f"Expected error occurred: {str(e)}")
        res = collection.query(expr="id == 10001")
        assert len(res) == 0, "Failed operation should rollback"
        logging.info("Operation rollback validated")
    else:
        # Without this branch an accepted invalid upsert would pass silently.
        logging.error("Rollback test failed: invalid upsert was accepted without an error")

    # Cleanup
    logging.info("Cleaning up...")
    utility.drop_collection("insert_test")
    logging.info("Collection dropped successfully")

except Exception as e:
    logging.error(f"Error occurred: {str(e)}", exc_info=True)
    raise

验证向量数据库是否支持对向量数据增删改查操作的原子性。

In [None]:
import logging
from pymilvus import connections, FieldSchema, CollectionSchema, DataType, Collection, utility
import numpy as np

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'
)
try:
    # 1. Connect to Milvus
    logging.info("Connecting to Milvus server...")
    connections.connect(host='127.0.0.1', port='19530')
    logging.info("Successfully connected to Milvus")

    # 2. Create dense vector collection
    logging.info("Creating collection...")
    dim = 64
    metric_type = "L2"  # or "IP"
    collection = Collection(
        "insert_test",
        CollectionSchema([
            FieldSchema("id", DataType.INT64, is_primary=True),
            FieldSchema("vector", DataType.FLOAT_VECTOR, dim=dim),
        ]),
        consistency_level="Strong" 
    )
    logging.info(f"Collection created: {collection.name}")

    # 3. Create index and load
    logging.info("Creating index...")
    collection.create_index(
        "vector",
        {"index_type": "IVF_FLAT", "metric_type": metric_type, "params": {"nlist": 16}}
    )
    collection.load()
    logging.info("Index created and collection loaded")


    # Test data for the atomicity checks: one primary key and two constant
    # vectors that are easy to tell apart.
    test_id = 9999
    original_vector = [0.1] * dim
    updated_vector = [0.9] * dim

    # 1. Test atomic insert
    try:
        # Insert a single vector under the test primary key
        collection.insert([[test_id], [original_vector]])

        # Immediate query verification. The field is FLOAT_VECTOR (32-bit
        # floats), so the stored values are compared with a tolerance instead
        # of exact equality against the Python float literals.
        res = collection.query(expr=f"id == {test_id}", output_fields=["vector"])
        assert np.allclose(res[0]["vector"], original_vector), "Inserted vector mismatch"
        logging.info("Atomic insert validated")
    except Exception as e:
        logging.error(f"Insert atomicity failed: {str(e)}")

    # 2. Test atomic update
    try:
        # Replace the vector stored under the same primary key
        collection.upsert([[test_id], [updated_vector]])

        # Verify complete replacement (tolerance-based, see the insert check)
        res = collection.query(expr=f"id == {test_id}", output_fields=["vector"])
        assert np.allclose(res[0]["vector"], updated_vector), "Vector update incomplete"
        logging.info("Atomic update validated")
    except Exception as e:
        logging.error(f"Update atomicity failed: {str(e)}")

    # 3. Test atomic delete
    try:
        # Delete and verify removal
        collection.delete(expr=f"id == {test_id}")
        res = collection.query(expr=f"id == {test_id}")
        assert len(res) == 0, "Vector still exists after deletion"
        logging.info("Atomic delete validated")
    except Exception as e:
        logging.error(f"Delete atomicity failed: {str(e)}")

    # 4. Test bulk operation atomicity.
    # The batch has one column per schema field (id, vector). The first row
    # is valid and the second row carries a vector of the wrong dimension, so
    # a rejection is caused by the bad vector itself and the follow-up query
    # can detect a partial insert of the valid row.
    try:
        invalid_data = [
            [10001, 10002],
            [
                np.random.rand(dim).tolist(),      # valid row
                np.random.rand(dim * 2).tolist(),  # wrong dimension
            ],
        ]
        collection.insert(invalid_data)
    except Exception as e:
        logging.info(f"Expected error occurred: {str(e)}")
        res = collection.query(expr="id in [10001, 10002]")
        assert len(res) == 0, "Partial insert detected"
        logging.info("Bulk operation rollback validated")
    else:
        # Without this branch an accepted invalid batch would pass silently.
        logging.error("Bulk rollback test failed: invalid batch was accepted without an error")

    # Cleanup
    logging.info("Cleaning up...")
    utility.drop_collection("insert_test")
    logging.info("Collection dropped successfully")

except Exception as e:
    logging.error(f"Error occurred: {str(e)}", exc_info=True)
    raise

验证向量数据库是否支持根据主键查询对应的标量和向量数据。

In [None]:
import logging
from pymilvus import connections, FieldSchema, CollectionSchema, DataType, Collection, utility
import numpy as np

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'
)
try:
    # 1. Connect to Milvus
    logging.info("Connecting to Milvus server...")
    connections.connect(host='127.0.0.1', port='19530')
    logging.info("Successfully connected to Milvus")

    # 2. Create dense vector collection
    logging.info("Creating collection...")
    dim = 768
    metric_type = "L2"  # or "IP"
    collection = Collection(
        "pk_query_test",
        CollectionSchema([
            FieldSchema("id", DataType.INT64, is_primary=True),
            FieldSchema("vector", DataType.FLOAT_VECTOR, dim=dim),
            FieldSchema("title", DataType.VARCHAR, max_length=200),
            FieldSchema("price", DataType.DOUBLE)
        ]),
        consistency_level="Strong"
    )
    logging.info(f"Collection created: {collection.name}")

    # 3. Create index and load
    logging.info("Creating index...")
    collection.create_index(
        "vector",
        {"index_type": "FLAT", "metric_type": metric_type} 
    )
    collection.load()
    logging.info("Index created and collection loaded")

    # 4. Insert dense vectors
    logging.info("Generating and inserting vectors...")
    vectors = np.random.randn(100, dim).astype(np.float32)
    data = [
        list(range(100)),  # IDs
        vectors.tolist(),  # Vectors
        [f"Product_{i}" for i in range(100)],  # Titles
        np.random.uniform(1, 1000, 100).tolist()  # Prices
    ]
    collection.insert(data)

    logging.info(f"Inserted {len(vectors)} vectors")

    # 5. Look up a few primary keys and log the scalar and vector fields
    # returned for each of them.
    logging.info("Testing primary key lookup...")
    test_ids = [0, 50, 99]

    for pk in test_ids:
        # "*" requests every field of the matching entity
        result = collection.query(expr=f"id == {pk}", output_fields=["*"])
        if not result:
            logging.warning(f"No result found for ID {pk}")
            continue

        item = result[0]
        logging.info(f"ID {pk} details: Title: {item['title']} | Price: ${item['price']:.2f} | Vector: {item['vector'][:5]}...")

    # Cleanup
    logging.info("Cleaning up...")
    utility.drop_collection("pk_query_test")
    logging.info("Collection dropped successfully")

except Exception as e:
    logging.error(f"Error occurred: {str(e)}", exc_info=True)
    raise

验证向量数据库是否支持按单条与批量导入方式。

In [None]:
import logging
from pymilvus import connections, FieldSchema, CollectionSchema, DataType, Collection, utility
import numpy as np

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'
)
try:
    # 1. Connect to Milvus
    logging.info("Connecting to Milvus server...")
    connections.connect(host='127.0.0.1', port='19530')
    logging.info("Successfully connected to Milvus")

    # 2. Create dense vector collection
    logging.info("Creating collection...")
    dim = 768
    metric_type = "L2"  # or "IP"
    collection = Collection(
        "insert_test",
        CollectionSchema([
            FieldSchema("id", DataType.INT64, is_primary=True),
            FieldSchema("vector", DataType.FLOAT_VECTOR, dim=dim)
        ]),
        consistency_level="Strong" 
    )
    logging.info(f"Collection created: {collection.name}")

    # 3. Create index and load
    logging.info("Creating index...")
    collection.create_index(
        "vector",
        {"index_type": "IVF_FLAT", "metric_type": metric_type, "params": {"nlist": 16}}
    )
    collection.load()
    logging.info("Index created and collection loaded")

    # 4. Insert dense vectors
    logging.info("Generating and inserting vectors...")
    # Batch insert (100 vectors)
    batch_ids = list(range(100))
    batch_vectors = np.random.randn(100, dim).astype(np.float32).tolist()
    collection.insert([batch_ids, batch_vectors])
    logging.info("Inserted 100 vectors in batch")

    # Single insert
    single_id = [100]
    single_vector = [np.random.randn(dim).astype(np.float32).tolist()]
    collection.insert([single_id, single_vector])
    logging.info("Inserted 1 vector individually")
    
    # Small batch insert
    small_batch_ids = [101, 102, 103]
    small_batch_vectors = np.random.randn(3, dim).astype(np.float32).tolist()
    collection.insert([small_batch_ids, small_batch_vectors])
    logging.info("Inserted 3 vectors in small batch")

    # Verify counts
    res = collection.query(
        expr="id >= 0",  # Match all records
        output_fields=["count(*)"],
        count=True
    )
    actual_count = res[0]["count(*)"]
    logging.info(f"Count(*) result: {actual_count} (Expected: 104)")

    # 5. Spot-check one ID from each insertion path: the 100-row batch (0),
    # the single insert (100) and the 3-row batch (101).
    test_ids = [0, 100, 101]
    for tid in test_ids:
        res = collection.query(expr=f"id == {tid}", output_fields=["vector"])
        if res:
            status = "Found"
        else:
            status = "Missing"
        logging.info(f"Verification: ID {tid} - {status}")

    # Cleanup
    logging.info("Cleaning up...")
    utility.drop_collection("insert_test")
    logging.info("Collection dropped successfully")

except Exception as e:
    logging.error(f"Error occurred: {str(e)}", exc_info=True)
    raise

In [None]:
import logging
from pymilvus import connections, FieldSchema, CollectionSchema, DataType, Collection, utility
import numpy as np
from pymilvus.bulk_writer import bulk_import, list_import_jobs, RemoteBulkWriter, BulkFileType
import json, time

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'
)
try:
    # 1. Connect to Milvus
    logging.info("Connecting to Milvus server...")
    connections.connect(host='127.0.0.1', port='19530')
    logging.info("Successfully connected to Milvus")
    
    collection_name = "bulk_import_test"
    if utility.has_collection(collection_name):
        utility.drop_collection(collection_name)

    # 2. Create dense vector collection
    logging.info("Creating collection...")
    metric_type = "L2"  # or "IP"
    
    
    schema = CollectionSchema([
        FieldSchema("id", DataType.INT64, is_primary=True),
        FieldSchema("vector", DataType.FLOAT_VECTOR, dim=256),
    ])
    collection = Collection(
        collection_name,
        schema,
        consistency_level="Strong"
    )
    logging.info(f"Collection created: {collection.name}")
    
    # Third-party constants
    ACCESS_KEY="minioadmin"
    SECRET_KEY="minioadmin"
    BUCKET_NAME="a-bucket"
    
    minio_endpoint = "localhost:9000"  # the default MinIO service started along with Milvus
    remote_path = "/bulk_data/" + time.strftime("%Y-%m-%d-%H-%M-%S")
    url = f"http://127.0.0.1:19530"

    # Connections parameters to access the remote bucket
    conn = RemoteBulkWriter.S3ConnectParam(
        endpoint=minio_endpoint,
        access_key=ACCESS_KEY,
        secret_key=SECRET_KEY,
        bucket_name=BUCKET_NAME,
        secure=False
    )

    writer = RemoteBulkWriter(
        schema=schema,
        remote_path=remote_path,
        connect_param=conn,
        file_type=BulkFileType.PARQUET
    )
    print('bulk writer created.')
    
    # Append the rows and commit a batch file every `commit_every` rows.
    # The parentheses matter: `i+1 % 1000` parses as `i + (1 % 1000)`, which
    # is never 0 here, so the periodic commit would never run.
    total_rows = 10000
    commit_every = 1000
    for i in range(total_rows):
        writer.append_row({
            "id": i,
            "vector": np.random.randn(256).astype(np.float32).tolist(),
        })

        if (i + 1) % commit_every == 0:
            writer.commit()
            print(f'bulk writer flushed {i + 1} rows.')

    # Commit the remaining rows when the total is not a multiple of the batch size.
    if total_rows % commit_every != 0:
        writer.commit()
    print('bulk writer flushed all rows.')
    print(writer.batch_files)
    
    resp = bulk_import(
        url,
        collection_name,
        files=writer.batch_files,
    )

    job_id = resp.json()['data']['jobId']
    print(f'bulk import job id: {job_id}')
    
    # Poll the import job until it reports 100% progress. The record is
    # looked up by job id because the listing may contain other jobs for the
    # same collection name. The loop sleeps between requests, stops on a
    # failed job and gives up after a deadline instead of spinning forever.
    progress = 0
    poll_interval = 1       # seconds between status requests
    timeout_seconds = 600   # upper bound on the total wait
    deadline = time.monotonic() + timeout_seconds
    while True:
        resp = list_import_jobs(
            url=url,
            collection_name=collection_name,
        )
        payload = resp.json()
        job = next(
            (rec for rec in payload['data']['records'] if rec['jobId'] == job_id),
            None,
        )

        if job is not None:
            # NOTE(review): the 'state'/'reason' keys follow the Milvus import
            # job listing format - confirm against the deployed server version.
            if job.get('state') == 'Failed':
                raise RuntimeError(
                    f"Bulk import job {job_id} failed: {job.get('reason', 'no reason reported')}"
                )

            new_progress = job['progress']
            if new_progress > progress:
                progress = new_progress
                print(json.dumps(payload, indent=4))

            if new_progress == 100:
                break

        if time.monotonic() >= deadline:
            raise TimeoutError(
                f"Bulk import job {job_id} did not finish within {timeout_seconds} seconds"
            )
        time.sleep(poll_interval)

    # 3. Create index and load
    logging.info("Creating index...")
    index_params = [
        ("vector", {"index_type": "HNSW", "metric_type": "L2", "params": {"M": 64, "efConstruction": 128}})
    ]
    
    for field, params in index_params:
        collection.create_index(field, params)
    collection.load()
    logging.info("Index created and collection loaded")
    
    # verify count(*) result
    res = collection.query(
        expr="id >= 0",  # Match all records
        output_fields=["count(*)"],
        count=True
    )
    actual_count = res[0]["count(*)"]
    logging.info(f"Count(*) result: {actual_count} (Expected: 10000)") 

    # Cleanup
    logging.info("Cleaning up...")
    utility.drop_collection(collection_name)
    logging.info("Collection dropped successfully")

except Exception as e:
    logging.error(f"Error occurred: {str(e)}", exc_info=True)
    raise

验证向量数据库是否支持按批量等至少一种导出方式。

In [None]:
import logging
from pymilvus import connections, FieldSchema, CollectionSchema, DataType, Collection, utility
import numpy as np

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'
)
try:
    # 1. Connect to Milvus
    logging.info("Connecting to Milvus server...")
    connections.connect(host='127.0.0.1', port='19530')
    logging.info("Successfully connected to Milvus")

    # 2. Create dense vector collection
    logging.info("Creating collection...")
    dim = 64
    metric_type = "L2"  # or "IP"
    collection = Collection(
        "insert_test",
        CollectionSchema([
            FieldSchema("id", DataType.INT64, is_primary=True),
            FieldSchema("vector", DataType.FLOAT_VECTOR, dim=dim)
        ]),
        consistency_level="Strong" 
    )
    logging.info(f"Collection created: {collection.name}")

    # 3. Create index and load
    logging.info("Creating index...")
    collection.create_index(
        "vector",
        {"index_type": "IVF_FLAT", "metric_type": metric_type, "params": {"nlist": 16}}
    )
    collection.load()
    logging.info("Index created and collection loaded")

    # 4. Insert dense vectors
    logging.info("Generating and inserting vectors...")
    # Batch insert. The row count is held in one variable so the log message
    # cannot drift from the amount of data actually inserted.
    num_vectors = 10000
    batch_ids = list(range(num_vectors))
    batch_vectors = np.random.randn(num_vectors, dim).astype(np.float32).tolist()
    collection.insert([batch_ids, batch_vectors])
    logging.info(f"Inserted {num_vectors} vectors in batch")

    # Add search iterator validation
    logging.info("\n=== Testing Search Iterator ===")
    export_results = []
    batch_size = 1000
    limit = 10000
    
    
    iterator = collection.search_iterator(
            data=[np.random.randn(dim).tolist()],  # Random query vector
            anns_field="vector",
            param={"metric_type": metric_type, "params": {"nprobe": 10}},
            batch_size=batch_size,
            limit=limit,
            output_fields=["vector"],
            expr="id >= 0"  # Export all data
        )
    
    while True:
        result = iterator.next()
        if not result:
            iterator.close()
            logging.info("Search iterator closed")
            break
        
        for hit in result:
            export_results.append(hit.to_dict())
        logging.info(f"Exported {len(result)} records in batch {len(export_results)//batch_size}")

    logging.info(f"Total exported records: {len(export_results)}")
    logging.info(f"Sample exported data: {export_results[:1]}")

    # Cleanup
    logging.info("Cleaning up...")
    utility.drop_collection("insert_test")
    logging.info("Collection dropped successfully")

except Exception as e:
    logging.error(f"Error occurred: {str(e)}", exc_info=True)
    raise

验证向量数据库是否支持内积、欧氏距离、余弦等至少三种基础相似距离。

In [None]:
import logging
from pymilvus import connections, FieldSchema, CollectionSchema, DataType, Collection, utility
import numpy as np

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'
)
try:
    # 1. Connect to Milvus
    logging.info("Connecting to Milvus server...")
    connections.connect(host='127.0.0.1', port='19530')
    logging.info("Successfully connected to Milvus")

    # Metric types under test. COSINE is requested from Milvus directly so
    # the server-side cosine metric is what gets exercised; emulating it with
    # L2 over normalized vectors would only test L2 a second time.
    metrics = ["L2", "IP", "COSINE"]
    dim = 768
    for metric_type in metrics:
        coll_name = f"metric_test_{metric_type}"
        logging.info(f"Creating {metric_type} collection...")

        # Drop leftovers from an earlier interrupted run so the fixed IDs
        # inserted below are not duplicated.
        if utility.has_collection(coll_name):
            utility.drop_collection(coll_name)

        # Create collection
        coll = Collection(
            coll_name,
            CollectionSchema([
                FieldSchema("id", DataType.INT64, is_primary=True),
                FieldSchema("vector", DataType.FLOAT_VECTOR, dim=dim)
            ]),
            consistency_level="Strong"
        )

        # Generate and insert random vectors; no normalization is needed
        # because each metric is computed by the server.
        vectors = np.random.randn(100, dim).astype(np.float32)
        coll.insert([list(range(100)), vectors.tolist()])
        logging.info(f"Inserted {len(vectors)} vectors")

        # Create index with the metric under test and load the collection
        coll.create_index(
            "vector",
            {"index_type": "IVF_FLAT", "metric_type": metric_type, "params": {"nlist": 16}}
        )
        coll.load()

        # Search validation: query with the first inserted vector
        logging.info(f"Testing {metric_type} similarity...")
        search_vec = [vectors[0].tolist()]
        results = coll.search(
            search_vec,
            "vector",
            {"metric_type": metric_type, "params": {"nprobe": 10}},
            limit=3,
            output_fields=["vector"]
        )

        # Log the top result distances
        logging.info(f"{metric_type} results:")
        for hit in results[0]:
            logging.info(f"ID: {hit.id} | Distance: {hit.distance:.4f}")

        # Cleanup
        utility.drop_collection(coll_name)

    logging.info("All metric tests completed")

except Exception as e:
    logging.error(f"Error occurred: {str(e)}", exc_info=True)
    raise

验证向量数据库是否支持汉明距离、Jaccard相似系数、曼哈顿距离等至少一种二进制相似度计算。

In [None]:
import logging
from pymilvus import connections, FieldSchema, CollectionSchema, DataType, Collection, utility
import numpy as np

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'
)
try:
    # 1. Connect to Milvus
    logging.info("Connecting to Milvus server...")
    connections.connect(host='127.0.0.1', port='19530')
    logging.info("Successfully connected to Milvus")

    # Binary vector validation: for each binary metric, create a collection,
    # insert random binary vectors, index them, search, and drop it again.
    bin_dim = 1024  # 1024 bits = 128 bytes
    byte_len = bin_dim // 8  # bytes per binary vector
    binary_metrics = ["HAMMING", "JACCARD"]

    for b_metric in binary_metrics:
        coll_name = f"binary_test_{b_metric}"
        logging.info(f"=== Testing {b_metric} ===")

        # Collection with an INT64 primary key and one binary vector field
        bin_schema = CollectionSchema([
            FieldSchema("id", DataType.INT64, is_primary=True),
            FieldSchema("vector", DataType.BINARY_VECTOR, dim=bin_dim)
        ])
        bin_col = Collection(coll_name, bin_schema, consistency_level="Strong")

        # 100 random binary vectors of byte_len bytes each, with IDs 0..99
        vectors = [bytes(np.random.bytes(byte_len)) for _ in range(100)]
        ids = list(range(100))
        bin_col.insert([ids, vectors])

        # Index with the metric under test, then load for searching
        index_spec = {
            "index_type": "BIN_IVF_FLAT",
            "metric_type": b_metric,
            "params": {"nlist": 16},
        }
        bin_col.create_index("vector", index_spec)
        bin_col.load()

        # Search with the first inserted vector as the query
        search_spec = {"metric_type": b_metric, "params": {"nprobe": 10}}
        results = bin_col.search([vectors[0]], "vector", search_spec, limit=3)

        logging.info(f"{b_metric} Results:")
        for hit in results[0]:
            logging.info(f"ID: {hit.id} | Distance: {hit.distance}")

        # Cleanup
        utility.drop_collection(coll_name)


except Exception as e:
    logging.error(f"Error occurred: {str(e)}", exc_info=True)
    raise

验证向量数据库是否支持至少一种索引的构建与删除，标量索引如布尔、字符、数字标量索引，向量索引如HNSW、IVF、LSH等。

In [None]:
import logging
from pymilvus import MilvusClient, DataType
import numpy as np

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'
)

try:
    # 1. Initialize MilvusClient
    logging.info("Initializing MilvusClient...")
    client = MilvusClient(uri="http://127.0.0.1:19530")
    logging.info("Client initialized successfully")

    # 2. Collection management
    collection_name = "index_test"
    dim = 768
    
    # Cleanup existing collection
    if client.has_collection(collection_name):
        client.drop_collection(collection_name)
    
    # Create schema explicitly
    schema = client.create_schema(
        auto_id=False,
        enable_dynamic_field=False
    )
    schema.add_field(field_name="id", datatype=DataType.INT64, is_primary=True)
    schema.add_field(field_name="vector", datatype=DataType.FLOAT_VECTOR, dim=dim)
    schema.add_field(field_name="bool_field", datatype=DataType.BOOL)
    schema.add_field(field_name="string_field", datatype=DataType.VARCHAR, max_length=100)
    schema.add_field(field_name="int_field", datatype=DataType.INT32)

    # Create collection with schema
    client.create_collection(
        collection_name=collection_name,
        schema=schema,
    )

    # 3. Index management
    # Vector indexes: (field name, index type, metric type, build parameters)
    vector_indexes = [
        ("vector", "IVF_FLAT", "L2", {"nlist": 16}),
        ("vector", "HNSW", "L2", {"M": 16, "efConstruction": 200})
    ]

    for field_name, idx_name, metric, params in vector_indexes:
        logging.info(f"Testing {idx_name} vector index:")
        index_name = f"vector_{idx_name}_index"

        # Prepare index parameters. The build parameters from the table above
        # are passed through explicitly; otherwise they are silently unused.
        index_params = client.prepare_index_params()
        index_params.add_index(
            field_name=field_name,
            index_type=idx_name,
            metric_type=metric,
            index_name=index_name,
            params=params,
        )

        # Create index
        client.create_index(
            collection_name=collection_name,
            index_params=index_params,
            sync=False
        )
        logging.info(f"Created {idx_name} index")

        # Log the index description
        desc = client.describe_index(
            collection_name=collection_name,
            index_name=index_name
        )
        logging.info(f"Vector index description: {desc}")

        # Drop the index so the next index type can be built on the same field
        client.drop_index(
            collection_name=collection_name,
            index_name=index_name
        )
        logging.info(f"Dropped {idx_name} index")

    # Scalar indexes
    scalar_fields = [
        ("bool_field", "INVERTED"),
        ("string_field", "TRIE"), 
        ("int_field", "STL_SORT")
    ]

    for field, idx_type in scalar_fields:
        logging.info(f"Testing {idx_type} index for {field}:")
        
        # Prepare scalar index parameters
        index_params = client.prepare_index_params()
        index_params.add_index(
            field_name=field,
            index_type=idx_type,
            index_name=f"{field}_index"
        )
        
        client.create_index(
            collection_name=collection_name,
            index_params=index_params
        )
        logging.info(f"Created {idx_type} index")
        
        # Add scalar index description logging
        desc = client.describe_index(
            collection_name=collection_name,
            index_name=f"{field}_index"
        )
        logging.info(f"Scalar index description: {desc}")
        
        client.drop_index(
            collection_name=collection_name,
            index_name=f"{field}_index"
        )
        logging.info(f"Dropped {idx_type} index")

    # Cleanup
    logging.info("Cleaning up...")
    client.drop_collection(collection_name)
    logging.info("Collection dropped successfully")

except Exception as e:
    logging.error(f"Error occurred: {str(e)}", exc_info=True)
    raise

验证向量数据库是否支持增量索引功能。

import logging
from pymilvus import connections, FieldSchema, CollectionSchema, DataType, Collection, utility
import numpy as np

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'
)

# Incremental-index check: build an IVF_FLAT index on a loaded collection,
# insert a base batch, then insert a second batch and confirm the new rows are
# returned by a filtered search without the index being rebuilt.
# Any failure is logged with its traceback and re-raised.
try:
    # 1. Connect to Milvus
    logging.info("Connecting to Milvus server...")
    connections.connect(host='127.0.0.1', port='19530')
    logging.info("Successfully connected to Milvus")

    # 2. Create dense vector collection
    logging.info("Creating collection...")
    collection_name = "dense_test"
    dim = 768
    metric_type = "L2"  # or "IP"
    base_count = 100   # rows in the initial batch (ids/payloads 0..99)
    extra_count = 10   # rows in the incremental batch (ids/payloads 100..109)

    # The name is fixed, so a leftover collection from an earlier (possibly
    # failed) run could carry a different schema or stale rows. Start clean.
    if utility.has_collection(collection_name):
        utility.drop_collection(collection_name)

    collection = Collection(
        collection_name,
        CollectionSchema([
            FieldSchema("id", DataType.INT64, is_primary=True),
            FieldSchema("vector", DataType.FLOAT_VECTOR, dim=dim),
            FieldSchema("payload", DataType.INT64)
        ]),
        consistency_level="Strong"
    )
    logging.info(f"Collection created: {collection.name}")

    # 3. Create index and load
    logging.info("Creating index...")
    collection.create_index(
        "vector",
        {"index_type": "IVF_FLAT", "metric_type": metric_type, "params": {"nlist": 16}}
    )
    collection.load()
    logging.info("Index created and collection loaded")

    # 4. Insert dense vectors (payload mirrors the id)
    logging.info("Generating and inserting vectors...")
    vectors = np.random.randn(base_count, dim).astype(np.float32)
    base_ids = list(range(base_count))
    collection.insert([base_ids, vectors.tolist(), base_ids])
    logging.info(f"Inserted {len(vectors)} vectors")

    # 5. Verify Query
    logging.info("Running query...")
    res = collection.query(expr="id == 0", output_fields=["vector"])
    logging.info("Query results:")
    logging.info(f"Query returned {len(res)} results:")
    for idx, result in enumerate(res):
        logging.info(f"Result {idx}: ID={result['id']}, Vector={result['vector'][:5]}...")  # Show first 5 elements

    # 6. Verify Search
    logging.info("Running search...")
    search_result = collection.search(
        vectors[:1].tolist(),
        "vector",
        {"metric_type": metric_type, "params": {"nprobe": 10}},
        limit=3,
        output_fields=["vector", "payload"]
    )

    logging.info("Search results:")
    logging.info(f"Search returned {len(search_result[0])} results:")
    for idx, hit in enumerate(search_result[0]):
        logging.info(f"Rank {idx+1}: ID={hit.id}, Distance={hit.distance:.4f}, Payload={hit.entity.fields['payload']}, Vector: {hit.entity.fields['vector'][:5]}...")

    # 7. Insert additional vectors for incremental index test
    logging.info("Inserting additional vectors for incremental index test...")
    new_vectors = np.random.randn(extra_count, dim).astype(np.float32).tolist()
    new_ids = list(range(base_count, base_count + extra_count))
    collection.insert([new_ids, new_vectors, new_ids])
    logging.info(f"Inserted {len(new_vectors)} additional vectors")

    # 8. Verify incremental index functionality
    # The filter keeps only rows from the second batch. It must be inclusive:
    # the first new row has payload == base_count and is the query vector
    # itself, so a strict ">" would drop the one guaranteed exact match.
    logging.info("Running search after incremental insertion...")
    incremental_search_result = collection.search(
        new_vectors[:1],
        "vector",
        {"metric_type": metric_type, "params": {"nprobe": 10}},
        limit=3,
        expr=f"payload >= {base_count}",
        output_fields=["vector", "payload"]
    )

    logging.info("Incremental search results:")
    for idx, hit in enumerate(incremental_search_result[0]):
        logging.info(f"Rank {idx+1}: ID={hit.id}, Distance={hit.distance:.4f}, Payload={hit.entity.fields['payload']},  Vector: {hit.entity.fields['vector'][:5]}...")

    # Cleanup
    logging.info("Cleaning up...")
    utility.drop_collection(collection_name)
    logging.info("Collection dropped successfully")

except Exception as e:
    logging.error(f"Error occurred: {str(e)}", exc_info=True)
    raise

验证向量数据库是否支持如强一致性、最终一致性、会话一致性、有界一致性等至少一种数据一致性；应提供数据一致性的明确说明，最终一致性需提供数据延迟的明确说明。

In [None]:
import logging
from pymilvus import MilvusClient, DataType
import numpy as np
import time

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'
)

# Consistency check: insert one row, then poll a search at the chosen
# consistency level and report how long the row takes to become visible.
# Any failure is logged with its traceback and re-raised.
try:
    # 1. Initialize MilvusClient
    logging.info("Initializing MilvusClient...")
    client = MilvusClient(uri="http://127.0.0.1:19530")
    logging.info("Client initialized successfully")

    # 2. Collection management
    collection_name = "consistency_test"
    dim = 768

    # Cleanup existing collection
    if client.has_collection(collection_name):
        client.drop_collection(collection_name)

    # Create schema explicitly
    schema = client.create_schema(
        auto_id=False,
        enable_dynamic_field=False
    )
    schema.add_field(field_name="id", datatype=DataType.INT64, is_primary=True)
    schema.add_field(field_name="vector", datatype=DataType.FLOAT_VECTOR, dim=dim)
    schema.add_field(field_name="bool_field", datatype=DataType.BOOL)
    schema.add_field(field_name="string_field", datatype=DataType.VARCHAR, max_length=100)
    schema.add_field(field_name="int_field", datatype=DataType.INT32)

    # Create collection with schema
    client.create_collection(
        collection_name=collection_name,
        schema=schema,
    )

    # 3. Index management
    # Vector indexes: (field, index type, metric, build params)
    vector_indexes = [
        ("vector", "HNSW", "L2", {"M": 16, "efConstruction": 200})
    ]

    index_params = client.prepare_index_params()
    for field_name, idx_type, metric, params in vector_indexes:
        # Forward the field name and build params from the tuple; previously
        # they were unpacked but never handed to add_index.
        index_params.add_index(
            field_name=field_name,
            index_type=idx_type,
            metric_type=metric,
            index_name=f"vector_{idx_type}_index",
            params=params,
        )

    # Scalar indexes
    scalar_fields = [
        ("bool_field", "INVERTED"),
        ("string_field", "TRIE"),
        ("int_field", "STL_SORT")
    ]

    for field, idx_type in scalar_fields:
        index_params.add_index(
            field_name=field,
            index_type=idx_type,
            index_name=f"{field}_index"
        )

    client.create_index(
        collection_name=collection_name,
        index_params=index_params
    )
    client.load_collection(collection_name)

    # Insert test data for consistency validation
    test_vector = np.random.rand(dim).tolist()
    test_data = [{
        "id": 1001,
        "vector": test_vector,
        "bool_field": True,
        "string_field": "consistency_test",
        "int_field": 2024
    }]
    insert_res = client.insert(
        collection_name=collection_name,
        data=test_data
    )
    logging.info(f"Inserted test data with ID: {insert_res['ids'][0]}")

    # Validate data consistency
    logging.info("\n=== Data Consistency Validation ===")
    # Swap in "Eventually", "Strong" or "Session" to exercise the other levels.
    consistency_level = "Bounded"
    max_attempts = 9
    poll_started = time.monotonic()
    for attempt in range(1, max_attempts + 1):
        result = client.search(
            collection_name=collection_name,
            data=[test_vector],
            limit=1,
            consistency_level=consistency_level,
            output_fields=["id"]
        )
        # Report measured wall-clock delay, not the loop counter: the first
        # attempt runs immediately after the insert, not one second later.
        elapsed = time.monotonic() - poll_started
        if result and result[0]:
            logging.info(f"consistency achieved after {elapsed:.2f} seconds (attempt {attempt}): {result}")
            break
        logging.info(f"consistency not achieved after {elapsed:.2f} seconds (attempt {attempt})")
        time.sleep(1)
    else:
        # Loop ran out without ever seeing the row.
        logging.warning(
            f"Inserted row still not visible after {max_attempts} attempts "
            f"at consistency level {consistency_level}"
        )

    # Cleanup
    logging.info("Cleaning up...")
    client.drop_collection(collection_name)
    logging.info("Collection dropped successfully")

except Exception as e:
    logging.error(f"Error occurred: {str(e)}", exc_info=True)
    raise

验证向量数据库是否支持利用标量和枚举字段等方式对检索过程进行数据筛选过滤和重排序。

In [None]:
import logging
from pymilvus import MilvusClient, DataType
import numpy as np
import time

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'
)

# Scalar-filter check: insert three rows that share one vector but differ in
# their scalar fields, then run a vector search restricted by a string filter.
# Any failure is logged with its traceback and re-raised.
try:
    # 1. Initialize MilvusClient
    logging.info("Initializing MilvusClient...")
    client = MilvusClient(uri="http://127.0.0.1:19530")
    logging.info("Client initialized successfully")

    # 2. Collection management
    collection_name = "consistency_test"
    dim = 768

    # Cleanup existing collection
    if client.has_collection(collection_name):
        client.drop_collection(collection_name)

    # Create schema explicitly
    schema = client.create_schema(
        auto_id=False,
        enable_dynamic_field=False
    )
    schema.add_field(field_name="id", datatype=DataType.INT64, is_primary=True)
    schema.add_field(field_name="vector", datatype=DataType.FLOAT_VECTOR, dim=dim)
    schema.add_field(field_name="bool_field", datatype=DataType.BOOL)
    schema.add_field(field_name="string_field", datatype=DataType.VARCHAR, max_length=100)
    schema.add_field(field_name="int_field", datatype=DataType.INT32)

    # Create collection with schema
    client.create_collection(
        collection_name=collection_name,
        schema=schema,
    )

    # 3. Index management
    # Vector indexes: (field, index type, metric, build params)
    vector_indexes = [
        ("vector", "HNSW", "L2", {"M": 16, "efConstruction": 200})
    ]

    index_params = client.prepare_index_params()
    for field_name, idx_type, metric, params in vector_indexes:
        # Forward the field name and build params from the tuple; previously
        # they were unpacked but never handed to add_index.
        index_params.add_index(
            field_name=field_name,
            index_type=idx_type,
            metric_type=metric,
            index_name=f"vector_{idx_type}_index",
            params=params,
        )

    # Scalar indexes
    scalar_fields = [
        ("bool_field", "INVERTED"),
        ("string_field", "TRIE"),
        ("int_field", "STL_SORT")
    ]

    for field, idx_type in scalar_fields:
        index_params.add_index(
            field_name=field,
            index_type=idx_type,
            index_name=f"{field}_index"
        )

    client.create_index(
        collection_name=collection_name,
        index_params=index_params
    )
    client.load_collection(collection_name)

    # Insert test data for filtering/reranking
    vector = np.random.rand(dim).tolist()
    test_data = [
        {"id": 1001, "vector":  vector, "bool_field": True, "string_field": "apple", "int_field": 100},
        {"id": 1002, "vector":  vector, "bool_field": False, "string_field": "banana", "int_field": 200},
        {"id": 1003, "vector":  vector, "bool_field": True, "string_field": "cherry", "int_field": 300}
    ]
    client.insert(collection_name, test_data)
    logging.info("Inserted 3 test documents with different scalar values")

    # Filtering and reranking validation
    logging.info("=== Scalar Filtering & Sort Validation ===")
    results = client.search(
        collection_name=collection_name,
        data=[vector],
        filter="string_field in ['apple', 'cherry']",
        output_fields=["id", "string_field", "int_field"],
        # The search runs right after the insert and the collection was created
        # without an explicit consistency level, so request Strong here to make
        # sure the rows inserted above are visible to this search.
        consistency_level="Strong",
        # NOTE: server-side ordering (order_by="int_field desc") is not
        # supported for now, so hits come back in distance order.
    )

    for result in results[0]:
        logging.info(f"Result: {result}")

    # Cleanup
    logging.info("Cleaning up...")
    client.drop_collection(collection_name)
    logging.info("Collection dropped successfully")

except Exception as e:
    logging.error(f"Error occurred: {str(e)}", exc_info=True)
    raise

验证向量数据库是否支持Schemaless或强Schema。

In [None]:
import logging
from pymilvus import MilvusClient, DataType
import numpy as np
import time

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'
)

# Schema check: with dynamic fields enabled, (a) a row that omits declared
# fields is expected to be rejected, and (b) a complete row carrying extra,
# undeclared keys can be inserted and those keys read back.
# Any failure outside the rejection probe is logged and re-raised.
try:
    # 1. Initialize MilvusClient
    logging.info("Initializing MilvusClient...")
    client = MilvusClient(uri="http://127.0.0.1:19530")
    logging.info("Client initialized successfully")

    # 2. Collection management
    collection_name = "consistency_test"
    dim = 768

    # Cleanup existing collection
    if client.has_collection(collection_name):
        client.drop_collection(collection_name)

    # Create schema explicitly
    schema = client.create_schema(
        auto_id=False,
        enable_dynamic_field=True
    )
    schema.add_field(field_name="id", datatype=DataType.INT64, is_primary=True)
    schema.add_field(field_name="vector", datatype=DataType.FLOAT_VECTOR, dim=dim)
    schema.add_field(field_name="bool_field", datatype=DataType.BOOL)
    schema.add_field(field_name="string_field", datatype=DataType.VARCHAR, max_length=100)
    schema.add_field(field_name="int_field", datatype=DataType.INT32)

    # Create collection with schema
    client.create_collection(
        collection_name=collection_name,
        schema=schema,
    )

    # 3. Index management
    # Vector indexes: (field, index type, metric, build params)
    vector_indexes = [
        ("vector", "HNSW", "L2", {"M": 16, "efConstruction": 200})
    ]

    index_params = client.prepare_index_params()
    for field_name, idx_type, metric, params in vector_indexes:
        # Forward the field name and build params from the tuple; previously
        # they were unpacked but never handed to add_index.
        index_params.add_index(
            field_name=field_name,
            index_type=idx_type,
            metric_type=metric,
            index_name=f"vector_{idx_type}_index",
            params=params,
        )

    # Scalar indexes
    scalar_fields = [
        ("bool_field", "INVERTED"),
        ("string_field", "TRIE"),
        ("int_field", "STL_SORT")
    ]

    for field, idx_type in scalar_fields:
        index_params.add_index(
            field_name=field,
            index_type=idx_type,
            index_name=f"{field}_index"
        )

    client.create_index(
        collection_name=collection_name,
        index_params=index_params
    )
    client.load_collection(collection_name)

    # Insert baseline rows that match the declared schema exactly
    vector = np.random.rand(dim).tolist()
    test_data = [
        {"id": 1001, "vector":  vector, "bool_field": True, "string_field": "apple", "int_field": 100},
        {"id": 1002, "vector":  vector, "bool_field": False, "string_field": "banana", "int_field": 200},
        {"id": 1003, "vector":  vector, "bool_field": True, "string_field": "cherry", "int_field": 300}
    ]
    client.insert(collection_name, test_data)
    logging.info("Inserted 3 test documents with different scalar values")

    try:
        logging.info("Testing STRONG SCHEMA...")
        # This row leaves out bool_field/string_field/int_field and adds an
        # undeclared key. The insert is expected to be rejected.
        # NOTE(review): since dynamic fields are enabled, the rejection is
        # presumably caused by the missing declared fields rather than by
        # "undefined_field" -- confirm against the logged error text.
        client.insert(
            collection_name=collection_name,
            data=[{"id": 9999, "vector": vector, "undefined_field": "invalid"}]
        )
    except Exception as e:
        logging.info(f"Strong schema validation working: {str(e)}")
    else:
        # Make an unexpected success visible instead of passing silently.
        logging.warning("Insert of a row missing declared fields was accepted; schema was not enforced")

    # Insert document with dynamic field
    client.insert(
        collection_name,
        data=[{"id": 10000, "vector": vector, "bool_field": True, "string_field": "apple", "int_field": 100, "test_field": "test_value", "test_field2": "test_value2"}]
    )
    # Query dynamic field
    res = client.search(
        collection_name,
        data=[vector],
        filter="id == 10000",
        output_fields=["test_field", "test_field2"],
        consistency_level="Strong",
    )
    logging.info(f"Dynamic field value: {res[0][0]}")

    # Cleanup
    logging.info("Cleaning up...")
    client.drop_collection(collection_name)
    logging.info("Collection dropped successfully")

except Exception as e:
    logging.error(f"Error occurred: {str(e)}", exc_info=True)
    raise

验证向量数据库是否支持如按行、按表、按分区等至少一种粒度的数据TTL（Time to live）管理能力。

In [None]:
import logging
from pymilvus import MilvusClient, DataType
import numpy as np
import time

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'
)

# TTL check: create a collection with a short collection-level TTL, insert
# three rows, confirm they are queryable, then wait past the TTL, trigger a
# compaction and confirm the rows are no longer returned.
# Any failure is logged with its traceback and re-raised.
try:
    # 1. Initialize MilvusClient
    logging.info("Initializing MilvusClient...")
    client = MilvusClient(uri="http://127.0.0.1:19530")
    logging.info("Client initialized successfully")

    # 2. Collection management
    collection_name = "consistency_test"
    dim = 768
    ttl_seconds = 5            # collection-level TTL under test
    compaction_wait_seconds = 30  # grace period for compaction to finish

    # Cleanup existing collection
    if client.has_collection(collection_name):
        client.drop_collection(collection_name)

    # Create schema explicitly
    schema = client.create_schema(
        auto_id=False,
        enable_dynamic_field=True
    )
    schema.add_field(field_name="id", datatype=DataType.INT64, is_primary=True)
    schema.add_field(field_name="vector", datatype=DataType.FLOAT_VECTOR, dim=dim)
    schema.add_field(field_name="bool_field", datatype=DataType.BOOL)
    schema.add_field(field_name="string_field", datatype=DataType.VARCHAR, max_length=100)
    schema.add_field(field_name="int_field", datatype=DataType.INT32)

    # Create collection with TTL
    client.create_collection(
        collection_name=collection_name,
        schema=schema,
        properties={
            "collection.ttl.seconds": ttl_seconds,
        }
    )

    # 3. Index management
    # Vector indexes: (field, index type, metric, build params)
    vector_indexes = [
        ("vector", "HNSW", "L2", {"M": 16, "efConstruction": 200})
    ]

    index_params = client.prepare_index_params()
    for field_name, idx_type, metric, params in vector_indexes:
        # Forward the field name and build params from the tuple; previously
        # they were unpacked but never handed to add_index.
        index_params.add_index(
            field_name=field_name,
            index_type=idx_type,
            metric_type=metric,
            index_name=f"vector_{idx_type}_index",
            params=params,
        )

    # Scalar indexes
    scalar_fields = [
        ("bool_field", "INVERTED"),
        ("string_field", "TRIE"),
        ("int_field", "STL_SORT")
    ]

    for field, idx_type in scalar_fields:
        index_params.add_index(
            field_name=field,
            index_type=idx_type,
            index_name=f"{field}_index"
        )

    client.create_index(
        collection_name=collection_name,
        index_params=index_params,
    )
    client.load_collection(collection_name)

    # Insert the rows whose expiry is being tested
    vector = np.random.rand(dim).tolist()
    test_data = [
        {"id": 1001, "vector":  vector, "bool_field": True, "string_field": "apple", "int_field": 100},
        {"id": 1002, "vector":  vector, "bool_field": False, "string_field": "banana", "int_field": 200},
        {"id": 1003, "vector":  vector, "bool_field": True, "string_field": "cherry", "int_field": 300}
    ]
    client.insert(collection_name, test_data)
    client.flush(collection_name)
    logging.info("Inserted 3 test documents with different scalar values")

    id_filter = "id in [1001,1002, 1003]"

    # Immediate query should return all three rows
    res = client.query(collection_name, filter=id_filter)
    logging.info(f"Initial records: {len(res)} (should be 3)")

    # Wait for the TTL to elapse
    time.sleep(ttl_seconds)

    # Trigger compaction and give it time to clean up the expired data
    client.compact(collection_name)
    time.sleep(compaction_wait_seconds)

    # After expiry none of the three rows should be returned
    res = client.query(collection_name, filter=id_filter)
    logging.info(f"Post-TTL records: {len(res)} (should be 0)")

    # Cleanup
    logging.info("Cleaning up...")
    client.drop_collection(collection_name)
    logging.info("Collection dropped successfully")

except Exception as e:
    logging.error(f"Error occurred: {str(e)}", exc_info=True)
    raise

验证向量数据库是否支持建立全文索引以及支持全文检索的能力，并支持全文、向量融合检索的能力。

In [None]:
import logging
from pymilvus import MilvusClient, DataType
import numpy as np
import time

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'
)

# Text-match check: create a VARCHAR field with the analyzer and match
# enabled, insert three short documents, then run a vector search restricted
# by a TEXT_MATCH filter.
# Any failure is logged with its traceback and re-raised.
try:
    # 1. Initialize MilvusClient
    logging.info("Initializing MilvusClient...")
    client = MilvusClient(uri="http://127.0.0.1:19530")
    logging.info("Client initialized successfully")

    # 2. Collection management
    collection_name = "consistency_test"
    dim = 768

    # Cleanup existing collection
    if client.has_collection(collection_name):
        client.drop_collection(collection_name)

    # Create schema explicitly
    schema = client.create_schema(
        auto_id=False,
        enable_dynamic_field=True
    )
    schema.add_field(field_name="id", datatype=DataType.INT64, is_primary=True)
    schema.add_field(field_name="vector", datatype=DataType.FLOAT_VECTOR, dim=dim)
    schema.add_field(
        field_name="string_field",
        datatype=DataType.VARCHAR,
        max_length=1000,
        enable_analyzer=True,
        enable_match=True
    )
    schema.add_field(field_name="int_field", datatype=DataType.INT32)

    # Create collection. The consistency level is set here (it was previously
    # passed to create_index, which is an index-building call) so the search
    # below sees the rows inserted just before it.
    client.create_collection(
        collection_name=collection_name,
        schema=schema,
        consistency_level="Strong"
    )

    # 3. Index management
    # Vector indexes: (field, index type, metric, build params)
    vector_indexes = [
        ("vector", "HNSW", "L2", {"M": 16, "efConstruction": 200})
    ]

    index_params = client.prepare_index_params()
    for field_name, idx_type, metric, params in vector_indexes:
        # The HNSW build params belong to the vector index only.
        index_params.add_index(
            field_name=field_name,
            index_type=idx_type,
            metric_type=metric,
            index_name=f"vector_{idx_type}_index",
            params=params,
        )

    # Scalar indexes
    scalar_fields = [
        ("string_field", "INVERTED"),
        ("int_field", "STL_SORT")
    ]

    for field, idx_type in scalar_fields:
        # No `params` here: the original forwarded the leaked HNSW dict
        # ({"M": ..., "efConstruction": ...}) to these scalar indexes.
        index_params.add_index(
            field_name=field,
            index_type=idx_type,
            index_name=f"{field}_index",
        )

    client.create_index(
        collection_name=collection_name,
        index_params=index_params,
    )
    client.load_collection(collection_name)

    # Text match validation
    logging.info("\n=== Text Match Validation ===")
    vector = np.random.rand(dim).tolist()
    test_text_data = [
        {"id": 2001, "vector": vector, "string_field": "machine learning system", "int_field": 100},
        {"id": 2002, "vector": vector, "string_field": "deep neural network", "int_field": 200},
        {"id": 2003, "vector": vector, "string_field": "computer vision model", "int_field": 300}
    ]
    client.insert(collection_name, test_text_data)
    client.flush(collection_name)

    # Text match search: vector search restricted by a TEXT_MATCH filter on
    # the terms "machine" and "deep"
    text_results = client.search(
        collection_name=collection_name,
        data=[vector],
        filter="TEXT_MATCH(string_field, 'machine deep')",
        output_fields=["string_field"],
        limit=3
    )

    logging.info(f"Text match results: {text_results[0]}")

    # Cleanup
    logging.info("Cleaning up...")
    client.drop_collection(collection_name)
    logging.info("Collection dropped successfully")

except Exception as e:
    logging.error(f"Error occurred: {str(e)}", exc_info=True)
    raise

验证向量数据库是否支持多元化存储方式，既可以存储在磁盘或对象存储，也可以加载到内存。

In [None]:
import logging
from pymilvus import connections, FieldSchema, CollectionSchema, DataType, Collection, utility
import numpy as np

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'
)

# Storage-variety check: one collection with four vector fields, each indexed
# with a different index type (three in-memory, one on-disk), then one search
# per field. Any failure is logged with its traceback and re-raised.
try:
    # 1. Connect to Milvus
    logging.info("Connecting to Milvus server...")
    connections.connect(host='127.0.0.1', port='19530')
    logging.info("Successfully connected to Milvus")

    utility.drop_collection("multi_index_test")

    # 2. Create dense vector collection
    logging.info("Creating collection...")
    dim = 768
    metric_type = "L2"  # or "IP"
    # ivf_flat / ivf_pq / hnsw use in-memory indexes; diskann is on disk.
    vector_fields = ["ivf_flat", "ivf_pq", "hnsw", "diskann"]
    field_schemas = [FieldSchema("id", DataType.INT64, is_primary=True)]
    for name in vector_fields:
        field_schemas.append(FieldSchema(name, DataType.FLOAT_VECTOR, dim=dim))
    collection = Collection(
        "multi_index_test",
        CollectionSchema(field_schemas),
        consistency_level="Strong"
    )
    logging.info(f"Collection created: {collection.name}")

    # 3. Create index and load
    logging.info("Creating index...")
    index_settings = {
        "ivf_flat": ("IVF_FLAT", {"nlist": 1024}),
        "ivf_pq": ("IVF_PQ", {"nlist": 512, "m": 16, "nbits": 8}),
        "hnsw": ("HNSW", {"M": 24, "efConstruction": 200}),
        "diskann": ("DISKANN", {"search_cache_size": 2, "build_cache_size": 4}),
    }
    for name in vector_fields:
        index_type, build_params = index_settings[name]
        collection.create_index(
            name,
            {"index_type": index_type, "metric_type": "L2", "params": build_params},
        )
    # data is loaded into memory before accessing
    collection.load()

    logging.info("Index created and collection loaded")

    # 4. Insert dense vectors: one independent random batch per vector field
    logging.info("Generating and inserting vectors...")
    field_vectors = {}
    for name in vector_fields:
        field_vectors[name] = np.random.randn(100, dim).astype(np.float32)
    data = [list(range(100))]
    data.extend(field_vectors[name] for name in vector_fields)
    collection.insert(data)

    logging.info(f"Inserted {len(field_vectors['ivf_flat'])} vectors")

    # 5. Verify search for each index type
    search_params = {
        "ivf_flat": {"nprobe": 32},
        "ivf_pq": {"nprobe": 64},
        "hnsw": {"ef": 128},
        "diskann": {"search_list": 100}
    }

    for field in vector_fields:
        logging.info(f"Testing {field.upper()} search...")
        # Query with the first vector that was inserted into this same field.
        query_vectors = field_vectors[field][:1]
        results = collection.search(
            query_vectors,
            field,
            param={"metric_type": "L2", "params": search_params[field]},
            limit=5,
            output_fields=["id"]
        )
        logging.info(f"{field} Top 3 results:")
        top_hits = results[0][:3]
        for hit in top_hits:
            logging.info(f"ID: {hit.id} | Distance: {hit.distance:.4f}")

    # Cleanup
    logging.info("Cleaning up...")
    utility.drop_collection("multi_index_test")
    logging.info("Collection dropped successfully")

except Exception as e:
    logging.error(f"Error occurred: {str(e)}", exc_info=True)
    raise

验证向量数据库是否支持分区管理功能。

In [None]:
import logging
from pymilvus import MilvusClient, DataType
import numpy as np
import time

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'
)

# Partition-management check: create two partitions, insert one row into each,
# search a single partition and both partitions, then release and drop one
# partition and probe that it can no longer be queried or written to.
# Any failure outside the two probes is logged and re-raised.
try:
    # 1. Initialize MilvusClient
    logging.info("Initializing MilvusClient...")
    client = MilvusClient(uri="http://127.0.0.1:19530")
    logging.info("Client initialized successfully")

    # 2. Collection management
    collection_name = "consistency_test"
    dim = 768

    # Cleanup existing collection
    if client.has_collection(collection_name):
        client.drop_collection(collection_name)

    # Create schema explicitly
    schema = client.create_schema(
        auto_id=False,
        enable_dynamic_field=True
    )
    schema.add_field(field_name="id", datatype=DataType.INT64, is_primary=True)
    schema.add_field(field_name="vector", datatype=DataType.FLOAT_VECTOR, dim=dim)
    schema.add_field(field_name="string_field", datatype=DataType.VARCHAR, max_length=1000)
    schema.add_field(field_name="int_field", datatype=DataType.INT32)

    # Create collection
    client.create_collection(
        collection_name=collection_name,
        schema=schema,
        consistency_level="Strong"
    )

    # Vector indexes: (field, index type, metric, build params)
    vector_indexes = [
        ("vector", "HNSW", "L2", {"M": 16, "efConstruction": 200})
    ]

    index_params = client.prepare_index_params()
    for field_name, idx_type, metric, params in vector_indexes:
        # Forward the field name and build params from the tuple; previously
        # they were unpacked but never handed to add_index.
        index_params.add_index(
            field_name=field_name,
            index_type=idx_type,
            metric_type=metric,
            index_name=f"vector_{idx_type}_index",
            params=params,
        )

    # Scalar indexes
    scalar_fields = [
        ("string_field", "INVERTED"),
        ("int_field", "STL_SORT")
    ]

    for field, idx_type in scalar_fields:
        index_params.add_index(
            field_name=field,
            index_type=idx_type,
            index_name=f"{field}_index",
        )

    # The consistency level is already set on the collection above, so it is
    # not repeated on this index-building call.
    client.create_index(
        collection_name=collection_name,
        index_params=index_params,
    )
    client.load_collection(collection_name)

    # Create partitions
    client.create_partition(
        collection_name=collection_name,
        partition_name="partition_alpha"
    )
    client.create_partition(
        collection_name=collection_name,
        partition_name="partition_beta"
    )
    logging.info("Created two partitions: partition_alpha and partition_beta")

    # Insert data into specific partitions
    vector = np.random.rand(dim).tolist()
    alpha_data = [{"id": 3001, "vector": vector, "string_field": "alpha data", "int_field": 100}]
    beta_data = [{"id": 3002, "vector": vector, "string_field": "beta data", "int_field": 200}]

    client.insert(collection_name, alpha_data, partition_name="partition_alpha")
    client.insert(collection_name, beta_data, partition_name="partition_beta")
    logging.info("Inserted test data into different partitions")

    # Partition query validation
    logging.info("\n=== Partition Query Test ===")
    alpha_results = client.search(
        collection_name=collection_name,
        data=[vector],
        partition_names=["partition_alpha"],
        output_fields=["string_field"],
        limit=5
    )
    logging.info(f"Alpha partition documents: {alpha_results[0]}")

    # Cross-partition search
    cross_partition_results = client.search(
        collection_name=collection_name,
        data=[vector],
        partition_names=["partition_alpha", "partition_beta"],
        output_fields=["string_field"],
        limit=5
    )
    logging.info(f"Cross-partition matches: {cross_partition_results[0]}")

    # Release partition validation
    logging.info("\n=== Partition Release Test ===")
    client.release_partitions(
        collection_name=collection_name,
        partition_names=["partition_alpha"]
    )

    # Verify release by attempting query
    try:
        client.query(
            collection_name=collection_name,
            partition_names=["partition_alpha"],
            filter="int_field >= 0"
        )
    except Exception as e:
        logging.info(f"Expected error after release: {str(e)}")
    else:
        # Make an unexpected success visible instead of passing silently.
        logging.warning("Query on released partition_alpha succeeded unexpectedly")

    # Drop partition validation
    logging.info("\n=== Partition Drop Test ===")
    client.drop_partition(
        collection_name=collection_name,
        partition_name="partition_alpha"
    )

    # Verify partition list
    remaining_partitions = client.list_partitions(collection_name)
    logging.info(f"Remaining partitions: {remaining_partitions}")

    # Verify drop by attempting insert.
    # The probe must target the partition that was dropped (partition_alpha)
    # and use a schema-complete row. The original wrote an incomplete row to
    # partition_beta, so any error it saw came from the missing fields, not
    # from the dropped partition.
    try:
        client.insert(
            collection_name=collection_name,
            data=[{"id": 9999, "vector": vector, "string_field": "orphan data", "int_field": 999}],
            partition_name="partition_alpha"
        )
    except Exception as e:
        logging.info(f"Expected error after drop: {str(e)}")
    else:
        logging.warning("Insert into dropped partition_alpha succeeded unexpectedly")

    # Cleanup
    logging.info("Cleaning up...")
    client.drop_collection(collection_name)
    logging.info("Collection dropped successfully")

except Exception as e:
    logging.error(f"Error occurred: {str(e)}", exc_info=True)
    raise

验证向量数据库是否支持多个字段作为联合主键。[暂不支持]

验证向量数据库是否支持往数据表实时写入或更新一定量向量数据。

In [None]:
import logging
from pymilvus import MilvusClient, DataType
import numpy as np
import time

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'
)

# Real-time write / update validation.
#
# Builds a fresh collection with an HNSW vector index and two scalar indexes,
# inserts two rows, checks that they are searchable straight away, then
# overwrites one row by primary key and reads the new value back.
# Any failure is logged with its traceback and re-raised.
try:
    # 1. Initialize MilvusClient
    logging.info("Initializing MilvusClient...")
    client = MilvusClient(uri="http://127.0.0.1:19530")
    logging.info("Client initialized successfully")

    # 2. Collection management
    collection_name = "consistency_test"
    dim = 768

    # Start from a clean slate so a rerun never sees rows from a previous run.
    if client.has_collection(collection_name):
        client.drop_collection(collection_name)

    # Explicit schema: INT64 primary key, one dense vector, two scalar fields.
    schema = client.create_schema(
        auto_id=False,
        enable_dynamic_field=True,
    )
    schema.add_field(field_name="id", datatype=DataType.INT64, is_primary=True)
    schema.add_field(field_name="vector", datatype=DataType.FLOAT_VECTOR, dim=dim)
    schema.add_field(field_name="string_field", datatype=DataType.VARCHAR, max_length=1000)
    schema.add_field(field_name="int_field", datatype=DataType.INT32)

    client.create_collection(
        collection_name=collection_name,
        schema=schema,
        consistency_level="Strong",
    )

    # 3. Index management
    # (field, index type, metric, build params)
    vector_indexes = [
        ("vector", "HNSW", "L2", {"M": 16, "efConstruction": 200}),
    ]

    index_params = client.prepare_index_params()
    for field_name, idx_type, metric, params in vector_indexes:
        # Forward the build params; without them the declared M /
        # efConstruction values would never reach the server.
        index_params.add_index(
            field_name=field_name,
            index_type=idx_type,
            metric_type=metric,
            index_name=f"vector_{idx_type}_index",
            params=params,
        )

    # Scalar indexes: (field, index type)
    scalar_fields = [
        ("string_field", "INVERTED"),
        ("int_field", "STL_SORT"),
    ]
    for field, idx_type in scalar_fields:
        index_params.add_index(
            field_name=field,
            index_type=idx_type,
            index_name=f"{field}_index",
        )

    client.create_index(
        collection_name=collection_name,
        index_params=index_params,
        consistency_level="Strong",
    )
    client.load_collection(collection_name)

    # 4. Real-time write validation
    logging.info("\n=== Real-time Write Test ===")

    # Prepare the payload first so the timer below measures only the insert.
    vector = np.random.randn(dim).astype(np.float32).tolist()
    real_time_data = [
        {"id": 4001, "vector": vector, "string_field": "live_update", "int_field": 999},
        {"id": 4002, "vector": vector, "string_field": "streaming_data", "int_field": 888},
    ]
    start_time = time.time()
    client.insert(collection_name, real_time_data)
    logging.info(f"Real-time insert completed in {time.time()-start_time:.4f}s")

    # No flush is issued on purpose: this step checks that the rows are
    # visible to search immediately after the insert returns.
    search_res = client.search(
        collection_name=collection_name,
        data=[vector],
        filter="int_field >= 888",
        output_fields=["id"],
        limit=5,
    )
    logging.info(f"Real-time docs found: {search_res[0]}")

    # 5. In-place update test
    # upsert() replaces the row that already has this primary key. A plain
    # insert() would add a second row with id 4001, and the read-back below
    # could then return the stale int_field value.
    updated_row = {"id": 4001, "vector": vector, "string_field": "live_update", "int_field": 1000}
    start_time = time.time()
    client.upsert(collection_name, [updated_row])
    logging.info(f"Document updated in {time.time()-start_time:.4f}s")

    # Read the row back by primary key to confirm the new value is stored.
    query_res = client.query(
        collection_name=collection_name,
        filter="id == 4001",
        output_fields=["int_field"],
    )
    logging.info(f"Updated value: {query_res[0]['int_field']}")

    # Cleanup
    logging.info("Cleaning up...")
    client.drop_collection(collection_name)
    logging.info("Collection dropped successfully")

except Exception as e:
    logging.error(f"Error occurred: {str(e)}", exc_info=True)
    raise

验证向量数据库是否支持 DiskANN 等磁盘索引方案，以承载更大的数据量。

In [None]:
import logging
from pymilvus import MilvusClient, DataType
import numpy as np
import time

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'
)

# DiskANN (on-disk index) validation.
#
# Builds a fresh collection whose vector field uses a DISKANN index, logs the
# index description returned by the server, inserts and flushes two rows, and
# runs one vector search against the collection.
# Any failure is logged with its traceback and re-raised.
try:
    # 1. Initialize MilvusClient
    logging.info("Initializing MilvusClient...")
    client = MilvusClient(uri="http://127.0.0.1:19530")
    logging.info("Client initialized successfully")

    # 2. Collection management
    collection_name = "consistency_test"
    dim = 768

    # Start from a clean slate so a rerun never sees rows from a previous run.
    if client.has_collection(collection_name):
        client.drop_collection(collection_name)

    # Explicit schema: INT64 primary key, one dense vector, two scalar fields.
    schema = client.create_schema(
        auto_id=False,
        enable_dynamic_field=True,
    )
    schema.add_field(field_name="id", datatype=DataType.INT64, is_primary=True)
    schema.add_field(field_name="vector", datatype=DataType.FLOAT_VECTOR, dim=dim)
    schema.add_field(field_name="string_field", datatype=DataType.VARCHAR, max_length=1000)
    schema.add_field(field_name="int_field", datatype=DataType.INT32)

    client.create_collection(
        collection_name=collection_name,
        schema=schema,
        consistency_level="Strong",
    )

    # 3. Index management
    index_params = client.prepare_index_params()

    # The DISKANN index is requested without build params, so the server's
    # defaults apply.
    # NOTE(review): tuning values such as index_build_ram_gb,
    # search_cache_size or pq_code_budget_gb were listed next to this index
    # before but never sent to add_index; confirm which DISKANN parameters
    # the target Milvus version accepts before passing any.
    index_params.add_index(
        field_name="vector",
        index_type="DISKANN",
        metric_type="L2",
        index_name="vector_DISKANN_index",
    )

    # Scalar indexes: (field, index type)
    scalar_fields = [
        ("string_field", "INVERTED"),
        ("int_field", "STL_SORT"),
    ]
    for field, idx_type in scalar_fields:
        index_params.add_index(
            field_name=field,
            index_type=idx_type,
            index_name=f"{field}_index",
        )

    client.create_index(
        collection_name=collection_name,
        index_params=index_params,
        consistency_level="Strong",
    )

    # Log what the server reports for the DISKANN index.
    logging.info("=== Index Descriptions ===")
    diskann_info = client.describe_index(
        collection_name=collection_name,
        index_name="vector_DISKANN_index",
    )
    logging.info(f"Index Config:{diskann_info}")

    client.load_collection(collection_name)

    # 4. Write two rows and flush them before searching.
    logging.info("\n=== Real-time Write Test ===")

    vector = np.random.randn(dim).astype(np.float32).tolist()
    real_time_data = [
        {"id": 4001, "vector": vector, "string_field": "live_update", "int_field": 999},
        {"id": 4002, "vector": vector, "string_field": "streaming_data", "int_field": 888},
    ]
    client.insert(collection_name, real_time_data)
    client.flush(collection_name)

    # 5. DiskANN specific validation: search with the vector just inserted.
    # NOTE(review): with only two rows the server may not serve this search
    # from a built DISKANN index — confirm with a larger data set.
    logging.info("\n=== DiskANN Validation ===")
    diskann_results = client.search(
        collection_name=collection_name,
        data=[vector],
        limit=5,
    )
    logging.info(f"DiskANN search success: {diskann_results[0]}")

    # Cleanup
    logging.info("Cleaning up...")
    client.drop_collection(collection_name)
    logging.info("Collection dropped successfully")

except Exception as e:
    logging.error(f"Error occurred: {str(e)}", exc_info=True)
    raise

验证向量数据库是否支持在单个数据表中构建多个索引，以支持对比测试或者开发校验调优等功能。

In [None]:
import logging
from pymilvus import MilvusClient, DataType
import numpy as np
import time

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'
)

# Multiple-indexes-per-collection validation.
#
# Builds a fresh collection with four dense vector fields, gives each one a
# different index type (FLAT, IVF_FLAT, HNSW, DISKANN) plus two scalar
# indexes, logs every vector index description, inserts five rows that carry
# the same vector in all four fields, and searches each field in turn.
# Any failure is logged with its traceback and re-raised.
try:
    # 1. Initialize MilvusClient
    logging.info("Initializing MilvusClient...")
    client = MilvusClient(uri="http://127.0.0.1:19530")
    logging.info("Client initialized successfully")

    # 2. Collection management
    collection_name = "consistency_test"
    dim = 768

    # Single source of truth for the vector columns:
    # (field, index type, metric, build params).
    # NOTE(review): DISKANN is created with server defaults; confirm which
    # build parameters the target Milvus version accepts before adding any.
    vector_indexes = [
        ("vector_flat", "FLAT", "L2", {}),
        ("vector_ivf", "IVF_FLAT", "L2", {"nlist": 128}),
        ("vector_hnsw", "HNSW", "L2", {"M": 16, "efConstruction": 200}),
        ("vector_diskann", "DISKANN", "L2", {}),
    ]

    # Start from a clean slate so a rerun never sees rows from a previous run.
    if client.has_collection(collection_name):
        client.drop_collection(collection_name)

    # Explicit schema: primary key, two scalar fields, then the vector fields.
    schema = client.create_schema(
        auto_id=False,
        enable_dynamic_field=True,
    )
    schema.add_field(field_name="id", datatype=DataType.INT64, is_primary=True)
    schema.add_field(field_name="int_field", datatype=DataType.INT32)
    schema.add_field(field_name="string_field", datatype=DataType.VARCHAR, max_length=1000)

    # Four vector fields, all with the same dimension.
    for field_name, _, _, _ in vector_indexes:
        schema.add_field(
            field_name=field_name,
            datatype=DataType.FLOAT_VECTOR,
            dim=dim,
        )

    client.create_collection(
        collection_name=collection_name,
        schema=schema,
        consistency_level="Strong",
    )

    # 3. Index management
    index_params = client.prepare_index_params()
    for field_name, idx_type, metric, params in vector_indexes:
        # Forward build params only when some are declared; previously the
        # nlist / M / efConstruction values were never sent to the server.
        build_kwargs = {"params": params} if params else {}
        index_params.add_index(
            field_name=field_name,
            index_type=idx_type,
            metric_type=metric,
            index_name=f"vector_{idx_type}_index",
            **build_kwargs,
        )

    # Scalar indexes: (field, index type)
    scalar_fields = [
        ("string_field", "INVERTED"),
        ("int_field", "STL_SORT"),
    ]
    for field, idx_type in scalar_fields:
        index_params.add_index(
            field_name=field,
            index_type=idx_type,
            index_name=f"{field}_index",
        )

    client.create_index(
        collection_name=collection_name,
        index_params=index_params,
    )

    # Log what the server reports for each vector index.
    logging.info("=== Index Descriptions ===")
    for _, idx_type, _, _ in vector_indexes:
        index_info = client.describe_index(
            collection_name=collection_name,
            index_name=f"vector_{idx_type}_index",
        )
        logging.info(f"Index Config:{index_info}")

    client.load_collection(collection_name)

    # 4. Insert five rows; every vector column of a row holds the same vector
    # so the per-field searches below are directly comparable.
    test_data = []
    for i in range(5):
        vec = np.random.rand(dim).tolist()
        row = {"id": i, "string_field": f"text_{i}", "int_field": i}
        row.update({field_name: vec for field_name, _, _, _ in vector_indexes})
        test_data.append(row)
    client.insert(collection_name, test_data)
    client.flush(collection_name)

    # 5. Search each vector field with row 0's vector.
    logging.info("=== Cross Vector Column Search ===")
    for field_name, idx_type, _, _ in vector_indexes:
        # nprobe is only sent for the IVF_FLAT field; the others get no
        # search-time params.
        search_tuning = {"nprobe": 10} if idx_type == "IVF_FLAT" else {}
        results = client.search(
            collection_name=collection_name,
            data=[test_data[0][field_name]],
            anns_field=field_name,
            search_params={"metric_type": "L2", "params": search_tuning},
            limit=5,
        )
        logging.info(f"{field_name} found {len(results[0])} results")

    # Cleanup
    logging.info("Cleaning up...")
    client.drop_collection(collection_name)
    logging.info("Collection dropped successfully")

except Exception as e:
    logging.error(f"Error occurred: {str(e)}", exc_info=True)
    raise

验证向量数据库是否提供混合检索的能力，并提供不同检索方式的权重调整以适应不同类型的检索。

In [None]:
import logging
from pymilvus import MilvusClient, DataType, Function, FunctionType
import numpy as np
import time
from pymilvus import AnnSearchRequest, RRFRanker, WeightedRanker

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'
)

# Sparse + dense hybrid search validation.
#
# Builds a fresh collection with a text field, a sparse vector field filled
# by a BM25 function from that text, and a dense vector field; indexes both
# vector fields and two scalar fields; inserts five documents; then runs one
# hybrid search that fuses a dense request and a BM25 text request through a
# single ranker and logs the reranked hits.
# Any failure is logged with its traceback and re-raised.
try:
    # 1. Initialize MilvusClient
    logging.info("Initializing MilvusClient...")
    client = MilvusClient(uri="http://127.0.0.1:19530")
    logging.info("Client initialized successfully")

    # 2. Collection management
    collection_name = "consistency_test"
    dim = 768

    # Start from a clean slate so a rerun never sees rows from a previous run.
    if client.has_collection(collection_name):
        client.drop_collection(collection_name)

    schema = client.create_schema(
        auto_id=False,
        enable_dynamic_field=True,
    )
    schema.add_field(field_name="id", datatype=DataType.INT64, is_primary=True)
    # Raw text; the analyzer is enabled because this field feeds the BM25 function.
    schema.add_field(field_name="text", datatype=DataType.VARCHAR, max_length=1000, enable_analyzer=True)
    # Output field of the BM25 function below; never supplied on insert.
    schema.add_field(field_name="sparse_vec", datatype=DataType.SPARSE_FLOAT_VECTOR)
    schema.add_field(field_name="dense_vec", datatype=DataType.FLOAT_VECTOR, dim=dim)
    schema.add_field(field_name="int_field", datatype=DataType.INT32)
    schema.add_field(field_name="string_field", datatype=DataType.VARCHAR, max_length=1000)

    # BM25 function: reads "text" and writes the generated sparse embedding
    # into "sparse_vec".
    bm25_function = Function(
        name="text_bm25_emb",
        input_field_names=["text"],
        output_field_names=["sparse_vec"],
        function_type=FunctionType.BM25,
    )
    schema.add_function(bm25_function)

    client.create_collection(
        collection_name=collection_name,
        schema=schema,
        consistency_level="Strong",
    )

    # 3. Index management
    index_params = client.prepare_index_params()

    # Sparse vector index (BM25)
    index_params.add_index(
        field_name="sparse_vec",
        index_type="SPARSE_INVERTED_INDEX",
        index_name="sparse_vec_index",
        metric_type="BM25",
        params={"inverted_index_algo": "DAAT_MAXSCORE"},
    )

    # Dense vector index
    index_params.add_index(
        field_name="dense_vec",
        index_type="IVF_FLAT",
        metric_type="IP",
        index_name="dense_vec_index",
        params={"nlist": 128},
    )

    # Scalar indexes: (field, index type)
    scalar_fields = [
        ("string_field", "INVERTED"),
        ("int_field", "STL_SORT"),
    ]
    for field, idx_type in scalar_fields:
        index_params.add_index(
            field_name=field,
            index_type=idx_type,
            index_name=f"{field}_index",
        )

    client.create_index(
        collection_name=collection_name,
        index_params=index_params,
    )

    # Log what the server reports for both vector indexes.
    logging.info("=== Index Descriptions ===")
    for index_name in ["sparse_vec_index", "dense_vec_index"]:
        index_info = client.describe_index(
            collection_name=collection_name,
            index_name=index_name,
        )
        logging.info(f"Index Config:{index_info}")

    client.load_collection(collection_name)

    # 4. Insert five documents: text plus a random dense vector each.
    docs = [
        "Artificial intelligence was founded as an academic discipline in 1956.",
        "Alan Turing was the first person to conduct substantial research in AI.",
        "Born in Maida Vale, London, Turing was raised in southern England.",
        "Turing's work laid the foundation for theoretical computer science.",
        "He is also known for his contributions to cryptanalysis and computer security.",
    ]
    test_data = [
        {
            "id": i,
            "text": doc,
            "dense_vec": np.random.randn(dim).tolist(),
            "string_field": f"string_{i}",
            "int_field": i,
        }
        for i, doc in enumerate(docs)
    ]
    client.insert(collection_name, test_data)

    # 5. Hybrid search: document 0 is used as both the dense and text query.
    logging.info("\n=== Sparse+Dense Hybrid Search ===")
    dense_request = AnnSearchRequest(
        data=[test_data[0]["dense_vec"]],
        anns_field="dense_vec",
        param={"metric_type": "IP", "params": {"nprobe": 10}},
        limit=5,
    )

    # The BM25 request takes raw text, not a precomputed sparse vector.
    sparse_request = AnnSearchRequest(
        data=[test_data[0]["text"]],
        anns_field="sparse_vec",
        param={"metric_type": "BM25"},
        limit=5,
    )

    # One ranker object is the single place to switch fusion strategy.
    # Reciprocal Rank Fusion with k=60 is what the search below uses.
    # WeightedRanker(0.8, 0.3) or WeightedRanker(0.3, 0.8) can be substituted
    # to compare weighted fusion (presumably one weight per request, in the
    # order of `reqs` — confirm against the pymilvus docs).
    ranker = RRFRanker(k=60)

    hybrid_results = client.hybrid_search(
        collection_name=collection_name,
        reqs=[dense_request, sparse_request],
        ranker=ranker,
        limit=5,
    )

    logging.info("Final Reranked Results:")
    for idx, hit in enumerate(hybrid_results[0]):
        logging.info(f"Rank {idx+1}: ID={hit['id']} Score={hit['distance']:.3f}")

    # Cleanup
    logging.info("Cleaning up...")
    client.drop_collection(collection_name)
    logging.info("Collection dropped successfully")

except Exception as e:
    logging.error(f"Error occurred: {str(e)}", exc_info=True)
    raise