In [None]:
# 核心依赖库
import oss2
import dashscope
import json
import numpy as np
import logging
from typing import Dict, List, Any, Optional

# 配置日志（便于查看运行信息）
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

In [None]:
def read_health_file(file_path):
    """
    读取健康文件（支持.txt和.json格式），提取纯文本内容
    :param file_path: 本地文件路径（如"test_blood_pressure.txt"）
    :return: 提取的纯文本内容
    """
    # 先判断文件后缀，区分格式
    if file_path.endswith(".txt"):
        # 读取txt文件：直接读取全部文本
        try:
            with open(file_path, "r", encoding="utf-8") as f:
                text_content = f.read().strip()  # 去除首尾空白字符
            return text_content
        except Exception as e:
            raise Exception(f"读取txt文件失败：{str(e)}")
    elif file_path.endswith(".json"):
        # 读取json文件：提取content字段的文本（和测试json模板对应）
        try:
            with open(file_path, "r", encoding="utf-8") as f:
                json_data = json.load(f)
            # 从json中获取content字段，若不存在返回空字符串
            text_content = json_data.get("content", "").strip()
            return text_content
        except Exception as e:
            raise Exception(f"读取json文件失败：{str(e)}")
    else:
        # 不支持的文件格式
        raise ValueError("仅支持.txt和.json格式的文件")

# 测试函数（可选运行，验证文件读取是否正常）
# test_text = read_health_file("test_blood_pressure.txt")
# print(f"读取的文本内容：\n{test_text}")

In [None]:
# ====================== 替换为你的DashScope API Key ======================
DASHSCOPE_API_KEY = "sk-7465f14e980d4eb9b1578901db5c5311"  # 改成你自己申请的API Key
# =========================================================================
dashscope.api_key = DASHSCOPE_API_KEY

def text_to_vector(text):
    """
    将文本转为768维向量（使用阿里云DashScope text-embedding-v1模型）
    :param text: 待向量化的纯文本
    :return: 一维向量列表（长度768）
    """
    if not text:
        raise ValueError("待向量化的文本不能为空")
    
    try:
        # 调用DashScope Embedding API
        response = dashscope.TextEmbedding.call(
            model=dashscope.TextEmbedding.Models.text_embedding_v1,
            input=text
        )
        # 判断请求是否成功
        if response.status_code != 200:
            raise Exception(f"向量化API调用失败：{response.message}")
        # 提取向量数据
        embedding_vector = response.output["embeddings"][0]["embedding"]
        return embedding_vector
    except Exception as e:
        raise Exception(f"文本向量化失败：{str(e)}")

# 测试函数（可选运行，验证向量化是否正常）
# test_text = "2023-10-27 血压：145/95mmHg，略高于正常范围"
# test_vector = text_to_vector(test_text)
# print(f"向量维度：{len(test_vector)}")
# print(f"向量前10位：{test_vector[:10]}")

In [None]:
# ====================== 替换为你的阿里云普通Bucket配置 ======================
ALI_ACCESS_KEY_ID = "LTAI5t7V2vGe6mMP8p7NM9H6"        # 改成你的AccessKey ID
ALI_ACCESS_KEY_SECRET = "v7uQu0krg47pBvE28uhfh6ExyklTLB"  # 改成你的AccessKey Secret
BUCKET_NAME = "xkz1"                            # 你的Bucket名称（无需修改，和页面一致）
ENDPOINT = "oss-cn-beijing.aliyuncs.com"        # 关键修正：对应你页面的外网Endpoint，已加oss-前缀
# ===========================================================================

# 核心依赖库（若单元格1已导入，此处可忽略；若单独运行，需保留）
import oss2
import json
import logging
from typing import Dict, List, Any, Optional

# 配置日志（便于查看运行信息）
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# 初始化普通Bucket客户端（使用正确的Endpoint，已修正地址拼接问题）
auth = oss2.Auth(ALI_ACCESS_KEY_ID, ALI_ACCESS_KEY_SECRET)
bucket = oss2.Bucket(auth, ENDPOINT, BUCKET_NAME)

def upload_to_common_bucket(obj_key: str, vector: List[float], original_text: str, metadata: Dict[str, Any]) -> bool:
    """
    向普通OSS Bucket上传（向量+原始文本+元数据）
    :param obj_key: 自定义对象名称（如"health_record_bp_001.json"）
    :param vector: 文本向量
    :param original_text: 原始文本
    :param metadata: 元数据字典
    :return: 上传成功返回True，失败返回False
    """
    try:
        # 组装完整数据
        full_data = {
            "vector": vector,
            "original_text": original_text,
            "metadata": metadata
        }
        # 序列化为JSON字符串
        json_data = json.dumps(full_data, ensure_ascii=False)
        # 上传到普通Bucket
        bucket.put_object(obj_key, json_data.encode("utf-8"))
        logger.info(f"成功上传对象：{obj_key}")
        return True
    except Exception as e:
        logger.error(f"上传失败：{str(e)}")
        return False

def pull_all_from_common_bucket(prefix: Optional[str] = None) -> List[Dict[str, Any]]:
    """
    从普通Bucket拉取指定前缀的所有数据
    :param prefix: 对象前缀（如"health_record_"）
    :return: 反序列化后的数据集
    """
    all_data = []
    try:
        # 遍历Bucket中的对象
        for obj in oss2.ObjectIterator(bucket, prefix=prefix):
            # 下载并解码内容
            obj_content = bucket.get_object(obj.key).read().decode("utf-8")
            # 反序列化
            full_data = json.loads(obj_content)
            all_data.append(full_data)
        logger.info(f"拉取到{len(all_data)}条数据")
        return all_data
    except Exception as e:
        logger.error(f"拉取失败：{str(e)}")
        return []

In [None]:
# 1. 读取健康文件（复用上面的函数）
def load_health_data(file_path: str) -> str:
    """读取健康文件文本"""
    try:
        text = read_health_file(file_path)
        logger.info(f"成功读取文件：{file_path}，文本长度：{len(text)}")
        return text
    except Exception as e:
        logger.error(f"读取文件失败：{str(e)}")
        return ""

# 2. 向量化并上传到普通Bucket
def process_and_upload(file_path: str, obj_key: str, metadata: Dict[str, Any]) -> bool:
    """
    完整流程：读取文件→向量化→上传普通Bucket
    """
    # 1. 读取文件
    original_text = load_health_data(file_path)
    if not original_text:
        return False
    # 2. 文本向量化
    try:
        vector = text_to_vector(original_text)
        logger.info(f"成功生成向量，向量维度：{len(vector)}")
    except Exception as e:
        logger.error(f"向量化失败：{str(e)}")
        return False
    # 3. 上传普通Bucket
    return upload_to_common_bucket(obj_key, vector, original_text, metadata)

# 3. 从普通Bucket拉取并语义检索
def search_from_common_bucket(
    query_text: str,
    filter_metadata: Optional[Dict[str, Any]] = None,
    return_top_k: int = 1,
    obj_prefix: Optional[str] = "health_record_"
) -> List[Dict[str, Any]]:
    """
    从普通Bucket检索：拉取数据→过滤元数据→计算相似度→返回结果
    """
    # 1. 校验入参
    if not query_text:
        logger.error("查询文本不能为空")
        return []
    if return_top_k < 1:
        return_top_k = 1
    # 2. 从普通Bucket拉取所有数据
    all_data = pull_all_from_common_bucket(prefix=obj_prefix)
    if not all_data:
        logger.info("普通Bucket中无匹配数据")
        return []
    # 3. 查询语句向量化
    try:
        query_vector = np.array(text_to_vector(query_text))
        query_norm = np.linalg.norm(query_vector)
        if query_norm == 0:
            raise ValueError("查询向量模长为0，无法计算相似度")
    except Exception as e:
        logger.error(f"查询语句向量化失败：{str(e)}")
        return []
    # 4. 元数据过滤
    filtered_data = all_data
    if filter_metadata and isinstance(filter_metadata, dict):
        filtered_data = [
            item for item in all_data
            if all(item["metadata"].get(k) == v for k, v in filter_metadata.items())
        ]
    if not filtered_data:
        logger.info("无符合元数据过滤条件的数据")
        return []
    # 5. 批量计算余弦相似度
    item_vectors = np.array([item["vector"] for item in filtered_data])
    item_norms = np.linalg.norm(item_vectors, axis=1)
    valid_indices = item_norms != 0
    if not np.any(valid_indices):
        logger.error("所有过滤后向量模长均为0")
        return []
    # 计算相似度
    dot_products = item_vectors[valid_indices].dot(query_vector)
    similarities = dot_products / (item_norms[valid_indices] * query_norm)
    # 6. 排序取Top K
    index_sim_pairs = list(zip(np.where(valid_indices)[0], similarities))
    index_sim_pairs.sort(key=lambda x: x[1], reverse=True)
    top_k_pairs = index_sim_pairs[:return_top_k]
    # 7. 构造结果
    results = []
    for idx, sim in top_k_pairs:
        item = filtered_data[idx]
        results.append({
            "original_text": item["original_text"],
            "similarity": round(float(sim), 4),
            "metadata": item["metadata"].copy(),
            "obj_key": item.get("obj_key", "")
        })
    logger.info(f"检索完成，返回{len(results)}条结果")
    return results

In [None]:
# ====================== 配置你的测试参数（无需修改，对应之前的测试文件） ======================
TEST_FILE_TXT = "test_blood_pressure.txt"  # 你的txt测试文件（和Notebook同目录）
TEST_FILE_JSON = "test_nursing.json"       # 你的json测试文件（和Notebook同目录）
OBJ_KEY1 = "health_record_bp_001.json"     # 血压记录的Bucket对象名称
OBJ_KEY2 = "health_record_nursing_001.json" # 护理记录的Bucket对象名称
METADATA1 = {"category": "blood_pressure", "date": "2023-10-27", "user_id": "test001"}
METADATA2 = {"category": "nursing", "date": "2023-11-01", "user_id": "test001"}
# ===========================================================================================

# 1. 测试：上传两个文件到普通Bucket
logger.info("===== 开始上传文件到普通Bucket =====")
process_and_upload(TEST_FILE_TXT, OBJ_KEY1, METADATA1)
process_and_upload(TEST_FILE_JSON, OBJ_KEY2, METADATA2)

# 2. 测试1：检索血压记录（带元数据过滤）
logger.info("\n===== 测试1：带元数据过滤的血压记录检索 =====")
query1 = "查找2023年10月的血压异常记录"
filter1 = {"category": "blood_pressure"}
results1 = search_from_common_bucket(query1, filter1, return_top_k=1)
if results1:
    for res in results1:
        print(f"相似度：{res['similarity']}")
        print(f"元数据：{res['metadata']}")
        print(f"原始文本：{res['original_text']}")
else:
    print("未找到血压相关记录")

# 3. 测试2：检索护理记录（带元数据过滤）
logger.info("\n===== 测试2：带元数据过滤的护理记录检索 =====")
query2 = "查找2023年11月的护理记录"
filter2 = {"category": "nursing"}
results2 = search_from_common_bucket(query2, filter2, return_top_k=1)
if results2:
    for res in results2:
        print(f"相似度：{res['similarity']}")
        print(f"元数据：{res['metadata']}")
        print(f"原始文本：{res['original_text']}")
else:
    print("未找到护理相关记录")