In [1]:
import sys
import os
from dotenv import load_dotenv

# Make the project's `backend` package importable and load its `.env` file.
def _find_backend_dir(start_dir):
    """Return the nearest `backend` directory at or above `start_dir`, or None.

    Walks from `start_dir` towards the filesystem root and returns the first
    `<ancestor>/backend` path that exists as a directory.
    """
    probe = os.path.abspath(start_dir)
    while True:
        candidate = os.path.join(probe, 'backend')
        if os.path.isdir(candidate):
            return candidate
        parent = os.path.dirname(probe)
        if parent == probe:  # dirname() is a fixed point at the filesystem root
            return None
        probe = parent


current_dir = os.getcwd()

# First guess: the kernel was started either in `<project>/notebooks` or in
# the project root itself.
if 'notebooks' in current_dir:
    backend_path = os.path.abspath(os.path.join(current_dir, '..', 'backend'))
else:
    backend_path = os.path.abspath(os.path.join(current_dir, 'backend'))

# The substring test above is only a heuristic (it misfires for nested
# notebook folders or unrelated path components containing "notebooks").
# When the guessed directory does not exist, search the ancestors instead.
if not os.path.isdir(backend_path):
    discovered_backend = _find_backend_dir(current_dir)
    if discovered_backend is not None:
        backend_path = discovered_backend

env_path = os.path.join(backend_path, '.env')

if backend_path not in sys.path:
    sys.path.append(backend_path)

load_dotenv(env_path)
print(f"Backend path added: {backend_path}")

Backend path added: d:\My Data\Rag\rag-project01-framework\backend


In [2]:
import json
from datetime import datetime
from typing import Any, Dict, List, Optional

import chromadb
import pandas as pd

# 导入后端服务
from services.embedding_service import EmbeddingService, EmbeddingConfig, EmbeddingProvider
from utils.config import CHROMA_CONFIG

# Configuration: the embedding model name used with the Ollama provider, and
# the name of the Chroma collection that stores the term vectors.
OLLAMA_MODEL = "nomic-embed-text"
COLLECTION_NAME = "financial_terms_normalization"

# Instantiate the EmbeddingService once; later cells reuse this instance for
# both batch and single-text embedding calls.
embedding_service = EmbeddingService()
print("EmbeddingService initialized.")

EmbeddingService initialized.


In [3]:
# Financial glossary used as the normalization knowledge base.
# Each row is (canonical term, full name, aliases, definition); the field
# order below also fixes the column order of the DataFrame.
_TERM_FIELDS = ("canonical", "full_name", "aliases", "definition")

_term_rows = [
    (
        "ROI",
        "Return on Investment",
        ["投资回报率", "投入产出比", "回本率", "R.O.I."],
        "衡量投资效率或比较不同投资效率的绩效指标。",
    ),
    (
        "EBITDA",
        "Earnings Before Interest, Taxes, Depreciation, and Amortization",
        ["税息折旧及摊销前利润", "未计利息、税项、折旧及摊销前的利润", "Ebitda"],
        "衡量公司经营业绩的指标，剔除了融资和会计决策的影响。",
    ),
    (
        "IPO",
        "Initial Public Offering",
        ["首次公开募股", "上市", "股票上市", "首次公开发行"],
        "私人公司首次向公众发行股票的过程。",
    ),
    (
        "Bull Market",
        "Bull Market",
        ["牛市", "多头市场", "涨市"],
        "金融市场价格上涨或预期会上涨的市场状况。",
    ),
    (
        "Bear Market",
        "Bear Market",
        ["熊市", "空头市场", "跌市"],
        "市场价格长期下跌，通常伴随着广泛的悲观情绪。",
    ),
    (
        "P/E Ratio",
        "Price-to-Earnings Ratio",
        ["市盈率", "本益比", "PE值", "股价收益比"],
        "公司当前股价相对于每股收益的比率。",
    ),
    (
        "Liquidity",
        "Market Liquidity",
        ["流动性", "变现能力", "市场流动性"],
        "资产或证券在不影响其市场价格的情况下在市场上买卖的难易程度。",
    ),
    (
        "Hedging",
        "Hedging Strategy",
        ["对冲", "套期保值", "避险"],
        "旨在抵消潜在损失或收益的投资策略。",
    ),
]

# Expand every row into a dict keyed by the field names.
financial_data = [dict(zip(_TERM_FIELDS, row)) for row in _term_rows]

df_terms = pd.DataFrame(financial_data)
df_terms.head()

Unnamed: 0,canonical,full_name,aliases,definition
0,ROI,Return on Investment,"[投资回报率, 投入产出比, 回本率, R.O.I.]",衡量投资效率或比较不同投资效率的绩效指标。
1,EBITDA,"Earnings Before Interest, Taxes, Depreciation,...","[税息折旧及摊销前利润, 未计利息、税项、折旧及摊销前的利润, Ebitda]",衡量公司经营业绩的指标，剔除了融资和会计决策的影响。
2,IPO,Initial Public Offering,"[首次公开募股, 上市, 股票上市, 首次公开发行]",私人公司首次向公众发行股票的过程。
3,Bull Market,Bull Market,"[牛市, 多头市场, 涨市]",金融市场价格上涨或预期会上涨的市场状况。
4,Bear Market,Bear Market,"[熊市, 空头市场, 跌市]",市场价格长期下跌，通常伴随着广泛的悲观情绪。


In [None]:
# Build the list of text chunks to embed: one per canonical term, one per
# full name (when it differs from the canonical term) and one per alias.
def _build_chunk(text, kind, term, chunk_id):
    """Return one chunk dict for `text`, tagged with its parent term's fields.

    `kind` is stored under the "type" metadata key ("canonical", "full_name"
    or "alias"). The content carries the "search_document: " prefix.
    """
    return {
        "content": f"search_document: {text}",
        "metadata": {
            # Generic chunk fields (presumably the ones EmbeddingService
            # expects on every chunk; confirm against the service).
            "chunk_id": chunk_id,
            "page_number": 0,
            "page_range": "0",
            "word_count": len(text.split()),
            # Custom business metadata
            "canonical": term["canonical"],
            "type": kind,
            "full_name": term["full_name"],
            "definition": term["definition"]
        }
    }


chunks_to_embed = []

for term in financial_data:
    # Surface forms of this term, in insertion order.
    surface_forms = [(term["canonical"], "canonical")]
    if term["full_name"] != term["canonical"]:
        surface_forms.append((term["full_name"], "full_name"))
    surface_forms.extend((alias, "alias") for alias in term["aliases"])

    for text, kind in surface_forms:
        # chunk_id is the chunk's position in the list at insertion time.
        chunks_to_embed.append(_build_chunk(text, kind, term, len(chunks_to_embed)))

print(f"准备生成 {len(chunks_to_embed)} 个向量...")

# Embedding configuration: Ollama provider with the model chosen above.
config = EmbeddingConfig(
    provider=EmbeddingProvider.OLLAMA,
    model_name=OLLAMA_MODEL
)

# Input payload passed to EmbeddingService.create_embeddings.
input_data = {
    "chunks": chunks_to_embed,
    "metadata": {"filename": "financial_terms_kb.json"}
}

# Generate the vectors. On failure, print the troubleshooting hints and then
# re-raise: the following cells need `embedding_results`, and continuing would
# either fail later with a NameError or silently reuse stale results left in
# the kernel by an earlier run.
try:
    embedding_results, _ = embedding_service.create_embeddings(input_data, config)
except Exception as e:
    print(f"向量生成失败: {e}")
    print("请确保 Ollama 服务已启动且安装了 nomic-embed-text 模型")
    raise

print("向量生成成功！")

# Cheap sanity check: compare the sum of the first 10 dimensions of every
# vector with that of the first one. If none differs by more than 1e-6, the
# vectors all look the same and the model is probably not working properly.
if embedding_results and len(embedding_results) > 1:
    first_sum = sum(embedding_results[0]['embedding'][:10])
    has_variation = any(
        abs(sum(res['embedding'][:10]) - first_sum) > 1e-6
        for res in embedding_results[1:]
    )

    if has_variation:
        print("向量检查通过：生成的向量具有多样性。")
    else:
        print("所有生成的向量似乎都是相同的！请检查 Ollama 模型状态。")

准备生成 41 个向量...


  return OllamaEmbeddings(


向量生成成功！
向量检查通过：生成的向量具有多样性。


In [5]:
# Resolve the Chroma persistence directory relative to the backend folder.
# normpath() unifies the path separators (the configured value uses "/").
persist_dir = os.path.normpath(
    os.path.join(backend_path, CHROMA_CONFIG["persist_directory"])
)
print(f"Connecting to Chroma at: {persist_dir}")

client = chromadb.PersistentClient(path=persist_dir)

# Reset the collection so that re-running this cell is idempotent.
# Check for existence explicitly instead of catching an exception: the
# exception type raised for a missing collection differs between chromadb
# releases. list_collections() yields Collection objects or plain names
# depending on the release, hence the getattr() fallback.
existing_names = {getattr(c, "name", c) for c in client.list_collections()}
if COLLECTION_NAME in existing_names:
    client.delete_collection(name=COLLECTION_NAME)
    print(f"Deleted existing collection: {COLLECTION_NAME}")

# No embedding_function is passed because pre-computed vectors are inserted.
collection = client.create_collection(
    name=COLLECTION_NAME,
    metadata={"hnsw:space": "cosine"}  # use cosine distance
)

# zip() below would silently drop trailing items on a length mismatch and
# pair vectors with the wrong chunks' metadata, so fail loudly instead.
if len(embedding_results) != len(chunks_to_embed):
    raise ValueError(
        f"Embedding count ({len(embedding_results)}) does not match "
        f"chunk count ({len(chunks_to_embed)})"
    )

# Collect the batch to insert.
ids = []
embeddings = []
metadatas = []
documents = []

# The metadata returned by EmbeddingService may not carry the custom business
# fields, so they are taken from the original chunks, paired by position.
for i, (result, original_chunk) in enumerate(zip(embedding_results, chunks_to_embed)):
    # result:         {'embedding': [...], 'metadata': {...}}
    # original_chunk: {'content': '...', 'metadata': {'canonical': '...', ...}}
    original_meta = original_chunk['metadata']

    # The positional index keeps the ID unique.
    unique_id = f"{original_meta['canonical']}_{original_meta['type']}_{i}"
    ids.append(unique_id)

    embeddings.append(result['embedding'])
    documents.append(original_chunk['content'])

    # Metadata stored in Chroma; values must be simple types
    # (str, int, float, bool).
    clean_meta = {
        "canonical": original_meta.get("canonical", ""),
        "type": original_meta.get("type", ""),
        "full_name": original_meta.get("full_name", ""),
        "definition": original_meta.get("definition", ""),
        "content": original_chunk.get("content", "")
    }
    metadatas.append(clean_meta)

# Insert everything in one call.
collection.add(
    ids=ids,
    embeddings=embeddings,
    metadatas=metadatas,
    documents=documents
)
print(f"成功插入 {len(ids)} 条数据到集合 '{COLLECTION_NAME}'")

Connecting to Chroma at: d:\My Data\Rag\rag-project01-framework\backend\03-vector-store/chroma_db
Deleted existing collection: financial_terms_normalization
成功插入 41 条数据到集合 'financial_terms_normalization'


In [6]:
def normalize_term(query_text: str, top_k: int = 1) -> Optional[Dict[str, Any]]:
    """
    Normalize a free-text query to the closest term stored in the collection.

    Args:
        query_text: Text to normalize.
        top_k: Number of neighbours requested from Chroma. Only the best
            (first) hit is used to build the result.

    Returns:
        A dict with the keys "query", "matched_text", "canonical",
        "full_name", "definition" and "distance" describing the best match,
        or None when the query is empty/blank or the search returns nothing.
    """
    # A blank query would embed only the bare prefix and match an arbitrary
    # entry, so treat it as "no match".
    if not query_text or not query_text.strip():
        return None

    # 1. Embed the query.
    # nomic-embed-text expects the 'search_query: ' prefix on queries.
    query_vector = embedding_service.create_single_embedding(
        text=f"search_query: {query_text}",
        provider=EmbeddingProvider.OLLAMA,
        model=OLLAMA_MODEL
    )

    # 2. Nearest-neighbour search over the stored vectors.
    results = collection.query(
        query_embeddings=[query_vector],
        n_results=top_k,
        include=["metadatas", "documents", "distances"]
    )

    if not results["documents"] or not results["documents"][0]:
        return None

    # 3. Take the best hit (first result for the first and only query).
    top_meta = results["metadatas"][0][0]
    # Chroma's cosine space reports distance = 1 - cosine similarity:
    # 0 means identical direction, and larger values mean less similar.
    # The raw distance is returned, so smaller is better.
    distance = results["distances"][0][0]

    return {
        "query": query_text,
        "matched_text": results["documents"][0][0],
        "canonical": top_meta["canonical"],
        "full_name": top_meta["full_name"],
        "definition": top_meta["definition"],
        "distance": distance
    }

# Sample queries: exact aliases as well as free-form sentences.
test_queries = [
    "投资回报率",
    "PE值",
    "我想知道怎么上市",
    "市场是不是要涨了",
    "如何规避风险",
]

print("--- 术语标准化测试 ---")
for q in test_queries:
    result = normalize_term(q)

    # Report a miss and move on to the next query.
    if not result:
        print(f"\n输入: '{q}' - 未找到匹配项")
        continue

    # Report the matched text, its distance and the canonical term.
    print(f"\n输入: '{q}'")
    print(f"匹配: '{result['matched_text']}' (Distance: {result['distance']:.4f})")
    print(f"标准化 -> **{result['canonical']}** ({result['full_name']})")
    print(f"定义: {result['definition']}")

--- 术语标准化测试 ---

输入: '投资回报率'
匹配: 'search_document: 多头市场' (Distance: 0.2461)
标准化 -> **Bull Market** (Bull Market)
定义: 金融市场价格上涨或预期会上涨的市场状况。

输入: 'PE值'
匹配: 'search_document: PE值' (Distance: 0.2273)
标准化 -> **P/E Ratio** (Price-to-Earnings Ratio)
定义: 公司当前股价相对于每股收益的比率。

输入: '我想知道怎么上市'
匹配: 'search_document: 股票上市' (Distance: 0.3050)
标准化 -> **IPO** (Initial Public Offering)
定义: 私人公司首次向公众发行股票的过程。

输入: '市场是不是要涨了'
匹配: 'search_document: 股价收益比' (Distance: 0.4016)
标准化 -> **P/E Ratio** (Price-to-Earnings Ratio)
定义: 公司当前股价相对于每股收益的比率。

输入: '如何规避风险'
匹配: 'search_document: 多头市场' (Distance: 0.2499)
标准化 -> **Bull Market** (Bull Market)
定义: 金融市场价格上涨或预期会上涨的市场状况。
