## 1. 环境准备

In [None]:
# 导入必要的库
import os
import sys
import yaml
import json
import pandas as pd
from pathlib import Path
from dotenv import load_dotenv
from loguru import logger

# 添加项目路径
project_root = Path.cwd()
if str(project_root) not in sys.path:
    sys.path.insert(0, str(project_root))

# 加载环境变量
load_dotenv()

print(f"项目根目录: {project_root}")
print(f"Python路径: {sys.executable}")

In [None]:
# 加载配置
with open('config.yaml', 'r', encoding='utf-8') as f:
    config = yaml.safe_load(f)

print("配置加载成功")
print(f"\n向量模型: {config['embedding']['model_name']}")
print(f"LLM提供商: {config['llm']['provider']}")
print(f"LLM模型: {config['llm']['model_name']}")

In [None]:
from src.data_loader import CMedQA2Loader

# 初始化数据加载器
loader = CMedQA2Loader(config)
print("数据加载器初始化完成")

In [None]:
# 加载问题数据
questions_df = loader.load_questions()
print(f"问题数据: {len(questions_df)} 条")
print("\n前5条问题:")
questions_df.head(10)

In [None]:
# 加载答案数据
answers_df = loader.load_answers()
print(f"答案数据: {len(answers_df)} 条")
print("\n前5条答案:")
answers_df.head()

In [None]:
# 加载训练集候选对
train_candidates = loader.load_candidates('train')
print(f"训练集候选对: {len(train_candidates)} 条")

# 统计正负样本
positive = sum(1 for _, _, label in train_candidates if label == 1)
negative = len(train_candidates) - positive
print(f"正样本: {positive}, 负样本: {negative}")
print(f"\n前5条候选对:")
for i, (q_id, a_id, label) in enumerate(train_candidates[:5]):
    print(f"{i+1}. Question ID: {q_id}, Answer ID: {a_id}, Label: {label}")

In [None]:
# 构建QA对
qa_pairs, seen_pairs = loader.build_qa_pairs('train')
print(f"训练集QA对: {len(qa_pairs)} 条")

# 查看示例
print("\n示例QA对:")
for i, qa in enumerate(qa_pairs[:3]):
    print(f"\n{i+1}. 问题: {qa['question']}")
    print(f"   答案: {qa['answer']}")

## 3. 文本分块测试

In [None]:
from src.text_splitter import ChineseTextSplitter

# 初始化分块器
splitter = ChineseTextSplitter(config)
print("文本分块器初始化完成")
print(f"分块大小: {config['chunking']['chunk_size']}")
print(f"重叠大小: {config['chunking']['chunk_overlap']}")

In [None]:
# 测试分块
# 取少量QA对进行测试
test_qa_pairs = qa_pairs[:100]

# 构建测试知识库
test_kb = []
for qa in test_qa_pairs:
    test_kb.append({
        'id': f"{qa['question_id']}_{qa['answer_id']}",
        'content': qa['combined_text'],
        'type': 'qa_pair',
        'question': qa['question'],
        'answer': qa['answer']
    })

# 分块
chunks = splitter.split_documents(test_kb)
print(f"\n生成文本块数: {len(chunks)} 个")

# 查看示例
print("\n示例文本块:")
for i, chunk in enumerate(chunks[:3]):
    print(f"\n块 {i+1}:")
    print(f"ID: {chunk['chunk_id']}")
    print(f"内容长度: {len(chunk['content'])} 字符")
    print(f"内容预览: {chunk['content'][:150]}...")

## 4. 向量检索测试

In [None]:
from src.vector_store import VectorStore, Retriever

# 初始化向量数据库
print("正在初始化向量数据库...")
vector_store = VectorStore(config)
print("向量数据库初始化完成")

In [None]:
# 构建向量索引（使用测试数据）
print("正在构建向量索引...")
print("注意: 这一步可能需要几分钟时间")

vector_store.build_index(chunks)
print("向量索引构建完成")
print(f"索引维度: {vector_store.dimension}")
print(f"文档数量: {len(vector_store.documents)}")

In [None]:
# 测试检索
retriever = Retriever(vector_store, config)

# 测试查询
test_queries = [
    "感冒了怎么办？",
    "发烧需要吃什么药？",
    "咳嗽有痰怎么治疗？"
]

for query in test_queries:
    print(f"\n{'='*60}")
    print(f"查询: {query}")
    print('='*60)
    
    results, contexts = retriever.retrieve(query, top_k=3)
    
    print(f"\n检索到 {len(results)} 条结果:\n")
    for i, result in enumerate(results, 1):
        print(f"结果 {i} (相似度: {result['score']:.3f}):")
        print(f"{result['content'][:200]}...")
        print()

## 5. LLM生成测试

In [None]:
from src.llm import LLMFactory

# 创建LLM实例
print("正在初始化LLM...")
print(f"LLM提供商: {config['llm']['provider']}")

try:
    llm = LLMFactory.create_llm(config)
    print("LLM初始化成功")
except Exception as e:
    print(f"LLM初始化失败: {e}")
    print("\n提示: 请检查:")
    print("1. 如果使用通义千问，请确保设置了DASHSCOPE_API_KEY环境变量")
    print("2. 如果使用本地模型，请确保model_path配置正确")
    print("3. 如果使用vLLM，请确保vLLM服务器已启动")
    llm = None

In [None]:
# 测试LLM生成
if llm:
    test_prompt = "什么是感冒？请简要说明。"
    
    print(f"测试提示: {test_prompt}")
    print("\n生成中...\n")
    
    response = llm.generate(test_prompt)
    print(f"回答: {response}")
else:
    print("跳过LLM测试（LLM未初始化）")

## 6. 完整RAG流程测试

In [None]:
from src.rag_system import MedicalRAGSystem

# 初始化RAG系统（使用已构建的向量数据库和LLM）
print("正在初始化RAG系统...")

try:
    rag_system = MedicalRAGSystem(config, vector_store=vector_store, llm=llm)
    print("RAG系统初始化成功")
except Exception as e:
    print(f"RAG系统初始化失败: {e}")
    rag_system = None

In [None]:
# 测试单次查询
if rag_system:
    test_question = "感冒了怎么办？"
    
    print(f"问题: {test_question}")
    print("\n正在查询...\n")
    
    result = rag_system.query(test_question, use_history=False)
    
    print(f"\n回答:\n{result['answer']}")
    print(f"\n引用来源数: {result['num_sources']}")
    
    if result['sources']:
        print("\n相关来源:")
        for i, source in enumerate(result['sources'][:3], 1):
            print(f"\n[{i}] (相似度: {source['score']:.3f})")
            print(source['content'][:200] + "...")
else:
    print("跳过RAG测试（RAG系统未初始化）")

In [None]:
# 测试多轮对话
if rag_system:
    print("测试多轮对话\n")
    print("="*60)
    
    # 清空历史
    rag_system.clear_history()
    
    # 第一轮
    q1 = "什么是感冒？"
    print(f"\n用户: {q1}")
    r1 = rag_system.query(q1, use_history=True)
    print(f"助手: {r1['answer'][:200]}...")
    
    # 第二轮
    q2 = "应该怎么治疗？"  # 基于上下文的问题
    print(f"\n用户: {q2}")
    r2 = rag_system.query(q2, use_history=True)
    print(f"助手: {r2['answer'][:200]}...")
    
    # 第三轮
    q3 = "需要注意什么？"  # 继续基于上下文
    print(f"\n用户: {q3}")
    r3 = rag_system.query(q3, use_history=True)
    print(f"助手: {r3['answer'][:200]}...")
    
    print("\n" + "="*60)
    print(f"对话历史长度: {len(rag_system.get_history())} 轮")
else:
    print("跳过多轮对话测试（RAG系统未初始化）")

## 7. 性能分析

In [None]:
# 检索性能分析
import time
import numpy as np

if vector_store and vector_store.index:
    print("检索性能测试\n")
    
    test_queries = [
        "感冒怎么办",
        "发烧吃什么药",
        "咳嗽治疗方法",
        "头痛是什么原因",
        "高血压注意事项"
    ]
    
    retrieval_times = []
    
    for query in test_queries:
        start_time = time.time()
        results = vector_store.search(query, top_k=5)
        end_time = time.time()
        
        retrieval_time = (end_time - start_time) * 1000  # 转换为毫秒
        retrieval_times.append(retrieval_time)
        print(f"查询: {query:20s} | 时间: {retrieval_time:.2f}ms | 结果数: {len(results)}")
    
    print(f"\n平均检索时间: {np.mean(retrieval_times):.2f}ms")
    print(f"最快检索时间: {np.min(retrieval_times):.2f}ms")
    print(f"最慢检索时间: {np.max(retrieval_times):.2f}ms")
else:
    print("跳过性能测试（向量数据库未初始化）")

## 8. 保存测试结果

In [None]:
# 保存小规模向量数据库（可选）
if vector_store and vector_store.index:
    save_dir = "data/vector_store_test"
    vector_store.save(save_dir)
    print(f"测试向量数据库已保存到: {save_dir}")
else:
    print("没有向量数据库可保存")

## 9. 调试工具函数

In [None]:
def debug_query(question: str, rag_system, show_details=True):
    """
    调试单个查询，显示详细信息
    
    Args:
        question: 问题
        rag_system: RAG系统实例
        show_details: 是否显示详细信息
    """
    print(f"\n{'='*80}")
    print(f"问题: {question}")
    print('='*80)
    
    result = rag_system.query(question, use_history=False)
    
    if show_details:
        print(f"\n检索到的文档数: {result['num_sources']}")
        print("\n检索结果:")
        for i, source in enumerate(result['sources'], 1):
            print(f"\n[{i}] 相似度: {source['score']:.4f}")
            print(f"内容: {source['content'][:300]}...")
    
    print(f"\n最终回答:\n{result['answer']}")
    print('='*80)
    
    return result

# 使用示例
if rag_system:
    result = debug_query("感冒发烧怎么办？", rag_system)
else:
    print("RAG系统未初始化，无法运行调试函数")