# Milvus 混合检索与分区路由调试 Notebook

本 Notebook 用于交互式验证 Milvus 2.6 的混合搜索（Hybrid Search）功能，并测试基于意图的分区路由（Partition Routing）逻辑。

### 1. 导入库与环境变量设置
安装并导入 `pymilvus`、`scipy`、`numpy` 等必要库，配置 Milvus 连接以及 HuggingFace 环境。

In [1]:
import os
import json
import time
from pathlib import Path

# 环境设置
os.environ['NO_PROXY'] = 'localhost,127.0.0.1,host.docker.internal'
os.environ['HF_ENDPOINT'] = 'https://hf-mirror.com'
os.environ['HF_HUB_DISABLE_SYMLINKS_WARNING'] = 'True'
import numpy as np
from scipy.sparse import csr_matrix
from pymilvus import (
    connections,
    Collection,
    AnnSearchRequest,
    RRFRanker,
    WeightedRanker
)
from pymilvus.model.hybrid import BGEM3EmbeddingFunction


# 连接参数
URI = "http://localhost:19530"
TOKEN = "root:Milvus"
COLLECTION_NAME = "hybrid_rag_collection_v1"

  from .autonotebook import tqdm as notebook_tqdm


### 2. 定义配置加载与工具类

实现 `load_local_config` 函数读取本地配置。定义 `CSRWithLen` 类以解决 Scipy 稀疏矩阵兼容性问题。

In [2]:
def load_local_config():
    # 假设 notebook 在根目录，config 在 src/milvus_test/
    config_path = Path("src/milvus_test/local_config.json")
    if not config_path.exists():
        return {}
    try:
        return json.loads(config_path.read_text(encoding="utf-8"))
    except Exception:
        return {}

local_cfg = load_local_config()
hf_token = os.getenv("HF_TOKEN") or local_cfg.get("HF_TOKEN")
if hf_token:
    os.environ["HF_TOKEN"] = hf_token

class CSRWithLen(csr_matrix):
    """解决 Scipy 稀疏矩阵长度歧义问题"""
    def __len__(self):
        return self.shape[0]

### 3. 设置路由策略与分区映射

将查询意图映射到 Milvus 中的物理分区名称。

In [3]:
STRATEGY_MAP = {
    "ai": ["partition_ai"],
    "cpp": ["partition_cpp"],
    "python": ["partition_py"],
    "ml": ["partition_ml"],
    "global": [] 
}

def get_strategy_partitions(intent: str):
    """根据意图获取目标分区"""
    return STRATEGY_MAP.get(intent, [])

### 4. 实现核心检索器 RouterRetriever 类

封装向量化、分区锁定以及 RRF 混合检索逻辑。

In [4]:
class RouterRetriever:
    def __init__(self, uri, token, collection_name):
        print(f"正在连接 Milvus: {uri}...")
        connections.connect(uri=uri, token=token)
        self.col = Collection(collection_name)
        self.col.load()
        print("正在加载 BGE-M3 模型...")
        self.ef = BGEM3EmbeddingFunction(use_fp16=False, device="cpu")
        print("检索器初始化完成。")

    def search(self, query_text: str, intent: str = "global", top_k: int = 5):
        target_partitions = get_strategy_partitions(intent)
        embeddings = self.ef([query_text])
        query_dense = embeddings["dense"]
        query_sparse = CSRWithLen(embeddings["sparse"]).tocsr()

        dense_req = AnnSearchRequest(
            data=query_dense,
            anns_field="dense_vector",
            param={"metric_type": "IP", "params": {"ef": 100}},
            limit=top_k * 2
        )
        sparse_req = AnnSearchRequest(
            data=[query_sparse],
            anns_field="sparse_vector",
            param={"metric_type": "IP", "params": {"drop_ratio_search": 0.1}}, 
            limit=top_k * 2
        )

        rerank = RRFRanker(k=60) 
        res = self.col.hybrid_search(
            reqs=[dense_req, sparse_req],
            rerank=rerank,
            limit=top_k,
            partition_names=target_partitions if target_partitions else None,
            output_fields=["content", "metadata"]
        )
        return res[0]

# 实例化
retriever = RouterRetriever(URI, TOKEN, COLLECTION_NAME)

正在连接 Milvus: http://localhost:19530...
正在加载 BGE-M3 模型...


Fetching 30 files: 100%|██████████| 30/30 [00:00<?, ?it/s]


检索器初始化完成。


### 5. 场景模拟 1：Python 分区定向搜索

验证是否能正确命中 `partition_py` 分区。

In [None]:
query = " 装饰器是什么？"
hits = retriever.search(query, intent="python", top_k=3)

for i, hit in enumerate(hits):
    meta = hit.entity.get("metadata")
    print(f"{i+1}. 分数: {hit.distance:.4f} | 分区: {meta.get('partition')} | 内容: {hit.entity.get('content')[:100]}...")

1. 分数: 0.0328 | 分区: partition_py | 内容: 装饰器（decorator）是 Python 中一种高阶函数用法，常用于在不修改原函数代码的前提下增强函数行为，如 @lru_cache。...
2. 分数: 0.0320 | 分区: partition_py | 内容: Python 的虚拟环境（如 venv 或 conda）有助于隔离不同项目的依赖，避免包版本冲突。...
3. 分数: 0.0310 | 分区: partition_py | 内容: Python 的 f-string（格式化字符串字面值）从 3.6 版本开始引入，允许直接在字符串中嵌入表达式，例如 f"Hello {name}!"。...


### 6. 场景模拟 2：全局混合搜索

验证在不指定分区的情况下，跨分区的召回表现。

In [9]:
query = "智能与机器学习的基础概念"
hits = retriever.search(query, intent="global", top_k=3)

for i, hit in enumerate(hits):
    meta = hit.entity.get("metadata")
    print(f"{i+1}. 分数: {hit.distance:.4f} | 分区: {meta.get('partition')} | 内容: {hit.entity.get('content')[:100]}...")

1. 分数: 0.0323 | 分区: partition_ai | 内容: 在监督学习中，模型通过最小化预测输出与真实标签之间的损失函数（如交叉熵）来优化参数。...
2. 分数: 0.0318 | 分区: partition_ai | 内容: 大语言模型（LLM）通常基于海量文本预训练，再通过指令微调或人类反馈强化学习（RLHF）对齐用户意图。...
3. 分数: 0.0164 | 分区: partition_cpp | 内容: 智能指针 std::unique_ptr 和 std::shared_ptr 是 C++11 引入的内存管理工具，可有效避免裸指针导致的内存泄漏。...


### 7. 场景模拟 3：C++ 分区定向搜索

针对 C++ 标准特性提问。

In [10]:
query = "C++20 的主要特性有哪些？"
hits = retriever.search(query, intent="cpp", top_k=3)

for i, hit in enumerate(hits):
    meta = hit.entity.get("metadata")
    print(f"{i+1}. 分数: {hit.distance:.4f} | 分区: {meta.get('partition')} | 内容: {hit.entity.get('content')[:100]}...")

1. 分数: 0.0323 | 分区: partition_cpp | 内容: C++ 中的头文件（.h 或 .hpp）通常声明接口，而实现放在源文件（.cpp）中，需通过 include 引入依赖。...
2. 分数: 0.0323 | 分区: partition_cpp | 内容: C++ 模板支持泛型编程，允许编写与类型无关的代码，标准库中的 std::vector 和 std::sort 都是基于模板实现的。...
3. 分数: 0.0312 | 分区: partition_cpp | 内容: C++ 中的 RAII（Resource Acquisition Is Initialization）机制通过对象生命周期自动管理资源，如内存、文件句柄等，确保异常安全。...
