## Milvus使用範例

In [7]:
# from pymilvus import model, MilvusClient, DataType, FieldSchema, CollectionSchema
# import pandas as pd
from pymilvus import model, MilvusClient, DataType, FieldSchema, CollectionSchema, connections
import pandas as pd

In [8]:
# def init_embedding():
#     # 初始化嵌入模型
#     ef = model.DefaultEmbeddingFunction()  # 正確引用model內的方法
#     client = MilvusClient("./milvus_data.db")
#     return ef, client

In [9]:
'''
to-do:
    1.Adding whether dbName contain '.db' extension
    2.Checking whether collectionName has exist.
'''
    # 初始化组件
ef = model.DefaultEmbeddingFunction()  # 确保已安装 pymilvus[model]
client = MilvusClient(
    uri="http://localhost:19530",
    token="root:Milvus"
) 



In [10]:
client.list_databases()

['default', 'dqe_kb_db']

In [11]:
# client.create_database(
#     db_name="dqe_kb_db"
# )

In [12]:
_dbName="dqe_kb_db"
_collectionName='qualityQA'
_collectionDesc="quailty_qa_collection"
_csv_src = "../../../source_data/DQE_Issues/DQE_Issue_total_corrected_v1.csv"

In [28]:
import chardet

with open(_csv_src, 'rb') as f:
    result = chardet.detect(f.read(10000))  # 讀取前 10000 bytes
print(result['encoding'])

ISO-8859-1


In [43]:
# def create_dqe_db(dbName:str="dqe_milvus_data.db", collectionName:str="quality_issues", fields:list=None):
def create_dqe_collection(dbName:str=None, collectionName:str=None, collectionDesc:str=None, fieldList:list=None):
    # 定义字段结构（关键修正点）
    client.use_database(dbName)
    fields = [
        FieldSchema(name="pk", dtype=DataType.INT64, is_primary=True, auto_id=True),
        FieldSchema(name="problemtype", dtype=DataType.VARCHAR, max_length=50),
        FieldSchema(name="module", dtype=DataType.VARCHAR, max_length=50),
        FieldSchema(name="severity", dtype=DataType.VARCHAR, max_length=1),
        FieldSchema(name="description", dtype=DataType.VARCHAR, max_length=2000),
        FieldSchema(name="causeAnalysis", dtype=DataType.VARCHAR, max_length=2000),
        FieldSchema(name="improve", dtype=DataType.VARCHAR, max_length=2000),
        FieldSchema(name="experience", dtype=DataType.VARCHAR, max_length=2000),
        # FieldSchema(name="judge", dtype=DataType.VARCHAR, max_length=2000),
        # FieldSchema(name="score", dtype=DataType.INT16),
        FieldSchema(name="vector", dtype=DataType.FLOAT_VECTOR, dim=ef.dim)
    ]
    
    # 创建集合（新API规范）
    # collection_name = "quality_issues"
    if client.has_collection(collectionName):
        client.drop_collection(collectionName)
    
    client.create_collection(
        collection_name=collectionName,
        schema=CollectionSchema(fields, description=collectionDesc),  # 单一路径传递schema
        # 不再需要单独传递fields参数
    )
    print(f"collection created:\n DB:{dbName}\ncollection name:{collectionName}");


In [46]:
# create_dqe_collection(dbName=_dbName, collectionName=_collectionName, collectionDesc=_collectionDesc)
# client.use_database(db_name=_dbName)

#### delete collection

In [47]:
# def drop_collection(dbName:str=None, collectionName:str=None):
#     # client.use_database(db_name=dbName)
#     print(collectionName)
#     client.list_collections()
#     client.drop_collection(
#         collection_name=collectionName
#     )

In [10]:
# drop_collection(dbName=_dbName, collectionName=_collectionName)

In [16]:
# def create_index(dbName:str=None, collectionName:str=None, paramDict:dict=None):
#     # 索引創建參數
#     # client.use_database(db_name=dbName)
#     # client.load_collection(collection_name=collectionName)
#     index_params = client.prepare_index_params()
#     index_params.add_index(
#         field_name="vector", # use paramDict["field_name"]
#         index_type="IVF_FLAT", # use paramDict["index_type"]
#         metric_type="COSINE", # use paramDict["metric_type"]
#         params={"nlist": 256}  # 典型值 128-4096 #use paramDict["params"]
#     )
#     # 搜索參數對應關係
#     search_params = {
#         "nprobe": 32  # 值範圍 [1, nlist] #use paramDict["search_params"]
#     }

In [12]:
# create_index(dbName=_dbName, collectionName=_collectionName)

In [17]:
# client.list_collections()

#### 建立資料

In [48]:
def insert_items(csv_file:str=None, dbName:str=None, collectionName:str=None):
    # 数据插入示例
    client.use_database(db_name=dbName)
    df = pd.read_csv(csv_file,encoding='utf-8')
    # print(df.head())
    fill_df = df.fillna(0)
    data = [{
        "problemtype":row["问题来源"],
        "module": row["模块"],
        "severity": str(row["严重度"]),
        "description": row["问题现象描述"],
        "causeAnalysis": str(row["原因分析"]),
        "improve": row["改善对策"],
        "experience": str(row["经验萃取"]),
        # "judge": row["评审后优化"],
        # "score": int(row["评分"]),
        "vector": ef.encode_documents([row["问题来源"],row["模块"],row["问题现象描述"]])[0]
    } for _, row in fill_df.iterrows()]
    # print(data)
    client.insert(collectionName, data)
    
    print(f"成功插入 {len(df)} 条数据，向量维度={ef.dim}")

In [50]:
# insert_items(csv_file=_csv_src, dbName=_dbName, collectionName=_collectionName)
##成功插入 108 条数据，向量维度=768

#### 建立index

In [51]:
def create_index(dbName:str=None, collectionName:str=None):
    print(f"db name:{dbName}\ncollection name:{collectionName}")
    client.use_database(db_name=dbName)
    index_params = client.prepare_index_params()
    index_params.add_index(
        field_name="vector",
        index_type="IVF_FLAT",  # 改用支持的类型
        metric_type="COSINE",
        params={"nlist": 128}  # 典型参数配置
    )
    
    # 创建索引
    client.create_index(
        collection_name=collectionName,
        index_params=index_params
    )

In [52]:
create_index(dbName=_dbName, collectionName=_collectionName)

db name:dqe_kb_db
collection name:qualityQA


#### Test Codes

#### loading db and collection

In [19]:
df = pd.read_csv(_csv_src)

In [21]:
def test_basic_operations():
    # 连接验证
    client.use_database(db_name=_dbName)
    client.load_collection(_collectionName)
    assert client.has_collection(_collectionName), "集合创建失败"
    print(client.has_collection(_collectionName))
    
    # 数据量验证
    count = client.query(_collectionName, filter="", output_fields=["count(*)"])[0]["count(*)"]
    assert count == len(df), f"数据量不符 ({count} vs {len(df)})"
    
    # 向量维度验证
    collection_info = client.describe_collection(_collectionName)
    
    # assert collection_info["vector_field"]["dim"] == ef.dim, "向量维度错误"
    vector_fields = [f for f in collection_info['fields'] if f['type'] == DataType.FLOAT_VECTOR]
    assert len(vector_fields) > 0, "未找到向量字段"
    assert vector_fields[0]['params']['dim'] == ef.dim, "向量維度錯誤"
    
    # 随机抽样验证
    sample = client.query(_collectionName, filter="pk < 5", output_fields=["*"])
    for item in sample:
        assert len(item["vector"]) == ef.dim, "向量长度异常"
        assert item["description"], "描述字段为空"

test_basic_operations()


True


#### Test different searches

In [25]:
def test_hybrid_search_1():
    client.use_database(db_name=_dbName)
    client.load_collection(_collectionName)
    index_info = client.describe_index(_collectionName, "vector")
    print(index_info)
    assert index_info["index_type"] == "IVF_FLAT"
    # assert index_info["nlist"] == 256
    assert int(index_info["nlist"]) == 128
    
    # 執行混合查詢
    query = "电池鼓包问题"
    query_vec = ef.encode_documents([query])[0]
    results = client.search(
        collection_name=_collectionName,
        data=[query_vec],
        filter="severity == 'B'",
        # limit=3,
        search_params={"nprobe":32}
    )
    # 精度驗證
    print(f"results:searched rows:{len(results[0])}\n{results}")
    distances = [hit['distance'] for hit in results[0]]
    # assert max(distances) > 0.7  # 根據實際數據調整閾值

test_hybrid_search_1()

{'nlist': '128', 'index_type': 'IVF_FLAT', 'metric_type': 'COSINE', 'field_name': 'vector', 'index_name': 'vector', 'total_rows': 0, 'indexed_rows': 0, 'pending_index_rows': 0, 'state': 'Finished'}
results:searched rows:3
data: ["[{'pk': 457271718883363561, 'distance': 0.6660386323928833, 'entity': {}}, {'pk': 457271718883363547, 'distance': 0.35495442152023315, 'entity': {}}, {'pk': 457271718883363540, 'distance': 0.34422993659973145, 'entity': {}}]"]


In [30]:
import time

def test_performance():
    # 查询延迟测试
    start = time.time()
    client.query(collection_name, filter="pk < 100", limit=100)
    print(f"标量查询延迟: {time.time()-start:.4f}s")

    # 向量搜索压力测试
    test_vectors = [ef.random_vector(ef.dim) for _ in range(100)]
    start = time.time()
    client.search(collection_name, data=test_vectors, limit=3)
    print(f"批量向量搜索延迟: {time.time()-start:.4f}s")

test_performance()

标量查询延迟: 0.0019s


AttributeError: 'OnnxEmbeddingFunction' object has no attribute 'random_vector'