# In [None]:
import json
import numpy as np
from sentence_transformers import SentenceTransformer, InputExample, losses, evaluation
from torch.utils.data import DataLoader
from sklearn.metrics.pairwise import cosine_similarity
from tqdm import tqdm
import matplotlib.pyplot as plt
import seaborn as sns

# 1. 数据准备
def load_data(file_path):
    """Load a JSON Lines file into a list of parsed records.

    Args:
        file_path: Path to a UTF-8 encoded text file holding one JSON
            document per line.

    Returns:
        A list with one parsed object per non-blank line, in file order.

    Raises:
        OSError: If the file cannot be opened.
        json.JSONDecodeError: If a non-blank line is not valid JSON.
    """
    with open(file_path, 'r', encoding='utf-8') as f:
        stripped_lines = (line.strip() for line in f)
        # Blank lines (typically a trailing empty line at the end of the
        # file) are skipped; json.loads("") would raise on them.
        return [json.loads(line) for line in stripped_lines if line]

# 2. 微调模型
def fine_tune_model(base_model, train_data, output_path, num_epochs=3, batch_size=16,
                    warmup_steps=1000):
    """Fine-tune an embedding model on labelled query/text pairs.

    Each training item contributes one example per positive text (target
    similarity 1.0) and one per negative text (target similarity 0.0). The
    examples are shuffled, batched, and used with ``CosineSimilarityLoss``.

    Args:
        base_model: Model object that is handed to the loss and whose ``fit``
            method is called; presumably a ``SentenceTransformer``.
        train_data: Iterable of dicts with the keys ``"query"``, ``"pos"``
            and ``"neg"``; ``"pos"`` and ``"neg"`` are iterated, so they are
            expected to be collections of texts.
        output_path: Passed to ``fit`` as ``output_path``.
        num_epochs: Number of training epochs.
        batch_size: Batch size of the training ``DataLoader``.
        warmup_steps: Passed to ``fit`` as ``warmup_steps``. Defaults to the
            previously hard-coded value of 1000.

    Returns:
        The same ``base_model`` object that was passed in, after ``fit``.
    """
    train_examples = []
    for item in train_data:
        query = item["query"]
        # Labels are floats on purpose: the loss compares the predicted
        # cosine similarity with the label, and integer labels would be
        # collated into an integer tensor instead of a float one.
        train_examples.extend(
            InputExample(texts=[query, pos], label=1.0) for pos in item["pos"]
        )
        train_examples.extend(
            InputExample(texts=[query, neg], label=0.0) for neg in item["neg"]
        )

    train_dataloader = DataLoader(train_examples, shuffle=True, batch_size=batch_size)
    train_loss = losses.CosineSimilarityLoss(model=base_model)

    base_model.fit(
        train_objectives=[(train_dataloader, train_loss)],
        epochs=num_epochs,
        warmup_steps=warmup_steps,
        output_path=output_path,
    )
    return base_model

# 3. 评估函数
def _candidate_similarities(model, query_vector, texts):
    """Return the cosine similarity between one query vector and each text."""
    if len(texts) == 0:
        # Nothing to encode. An empty score vector keeps the downstream
        # concatenation and ranking valid for items without candidates.
        return np.zeros(0, dtype=query_vector.dtype)
    embeddings = model.encode(texts, convert_to_tensor=True)
    return cosine_similarity(query_vector, embeddings.cpu().numpy()).flatten()


def _average_precision(ranked_labels):
    """Return average precision for 0/1 labels given in ranked order."""
    hit_positions = np.flatnonzero(ranked_labels == 1)
    if hit_positions.size == 0:
        return 0.0
    # Precision at each relevant rank = (relevant seen so far) / (1-based rank).
    precisions = np.arange(1, hit_positions.size + 1) / (hit_positions + 1)
    return precisions.mean()


def evaluate_model(model, test_data, k=5):
    """Rank each query's positives and negatives and compute retrieval metrics.

    For every item the query, its positive texts and its negative texts are
    encoded with ``model.encode``; candidates are ranked by cosine similarity
    to the query, and Recall@k and average precision are computed from the
    ranking. Both metrics are then averaged over all items.

    Args:
        model: Object exposing ``encode(..., convert_to_tensor=True)``;
            presumably a ``SentenceTransformer``.
        test_data: Sequence of dicts with the keys ``"query"``, ``"pos"`` and
            ``"neg"``. ``"pos"`` and ``"neg"`` are assumed to be lists of
            texts and may be empty.
        k: Cut-off rank used for Recall@k.

    Returns:
        A dict with:
            ``"recall@k"``: mean over items of (positives in the top ``k``)
                divided by (number of positives); an item without positives
                contributes 0.
            ``"MAP:mean_avg_precision"``: mean over items of average
                precision; an item without positives contributes 0.
            ``"similarity_scores"``: dict with ``"pos"`` and ``"neg"`` lists
                holding every query/positive and query/negative similarity.
    """
    all_recalls = []
    all_avg_precisions = []
    similarity_scores = {
        "pos": [],
        "neg": []
    }
    for item in tqdm(test_data, desc="Evaluating"):
        # Encode the query once and reuse it for both candidate groups.
        query_vector = (
            model.encode(item["query"], convert_to_tensor=True)
            .cpu()
            .numpy()
            .reshape(1, -1)
        )
        pos_similarity = _candidate_similarities(model, query_vector, item["pos"])
        neg_similarity = _candidate_similarities(model, query_vector, item["neg"])

        similarity_scores["pos"].extend(pos_similarity)
        similarity_scores["neg"].extend(neg_similarity)

        # Build one candidate pool with 1 = positive, 0 = negative.
        all_scores = np.concatenate([pos_similarity, neg_similarity])
        all_labels = np.concatenate([
            np.ones(len(pos_similarity)),
            np.zeros(len(neg_similarity))
        ])
        # Sort by descending similarity.
        sorted_labels = all_labels[np.argsort(-all_scores)]

        num_positives = len(pos_similarity)
        if num_positives:
            recall_at_k = np.sum(sorted_labels[:k]) / num_positives
        else:
            # No positives: avoid 0/0 and score the item as 0, matching the
            # convention used for average precision.
            recall_at_k = 0.0
        all_recalls.append(recall_at_k)
        all_avg_precisions.append(_average_precision(sorted_labels))

    mean_recall = np.mean(all_recalls)
    mean_avg_precision = np.mean(all_avg_precisions)
    return {
        "recall@k": mean_recall,
        "MAP:mean_avg_precision": mean_avg_precision,
        "similarity_scores": similarity_scores
    }

# 4. 可视化函数
def plot_similarity_distribution(baseline_scores, finetuned_scores, model_names):
    """Plot positive/negative similarity densities, one panel per model.

    Args:
        baseline_scores: Dict with ``"pos"`` and ``"neg"`` sequences of
            cosine similarities for the baseline model.
        finetuned_scores: Dict with ``"pos"`` and ``"neg"`` sequences of
            cosine similarities for the fine-tuned model.
        model_names: Two display names, ordered as (baseline, fine-tuned);
            each is used in its panel's title and legend labels.

    Returns:
        None. The figure is displayed with ``plt.show()``.
    """
    plt.figure(figsize=(12, 6))
    # Pair each name with its own scores so the two panels differ; the
    # baseline keeps the dashed line style and the fine-tuned model the solid
    # one.
    panels = zip(model_names, (baseline_scores, finetuned_scores), ("--", "-"))
    for position, (model_name, scores, linestyle) in enumerate(panels, 1):
        plt.subplot(1, 2, position)
        sns.kdeplot(scores["pos"], label=f"{model_name} Pos", color="blue", linestyle=linestyle)
        sns.kdeplot(scores["neg"], label=f"{model_name} Neg", color="red", linestyle=linestyle)
        plt.title(f"Similarity Distribution for {model_name}")
        plt.xlabel("Cosine Similarity")
        plt.ylabel("Density")
        plt.legend()
    plt.tight_layout()
    plt.show()

# 5. 主流程
def main():
    """Run the baseline-vs-fine-tuned comparison end to end.

    Loads ``data_train.jsonl`` and ``data_test.jsonl``, evaluates the
    pretrained model on the test set, fine-tunes it on the training set,
    evaluates it again, prints both metric sets side by side, and plots the
    similarity distributions.
    """
    data_train = load_data("data_train.jsonl")
    data_test = load_data("data_test.jsonl")

    baseline_model = SentenceTransformer("sentence-transformers/all-MiniLM-L6-v2")

    # The baseline must be evaluated before fine-tuning: fine_tune_model
    # returns the very object it is given, so afterwards there is no separate
    # untrained model left to measure.
    print("Evaluating baseline model...")
    baseline_results = evaluate_model(baseline_model, data_test)

    print("Fine-tuning model...")
    finetuned_model = fine_tune_model(
        baseline_model,
        data_train,
        output_path="./model/qwen_finetuned_model"
    )

    print("Evaluating fine-tuned model...")
    finetuned_results = evaluate_model(finetuned_model, data_test)

    print("\n=== Evaluation Results ===")
    # Single quotes inside the f-string keep this line valid on Python
    # versions older than 3.12; the first column fits the longest metric key.
    print(f"{'metric':<25} {'baseline':<15} {'finetuned':<15} {'improvement':<15}")
    print("-" * 70)
    # These names must match the keys returned by evaluate_model.
    for metric in ["recall@k", "MAP:mean_avg_precision"]:
        base_val = baseline_results[metric]
        fine_val = finetuned_results[metric]
        if base_val:
            improvement = f"{(fine_val - base_val) / base_val * 100:.2f}%"
        else:
            # Relative change is undefined when the baseline metric is zero.
            improvement = "n/a"
        print(f"{metric:<25} {base_val:<15.4f} {fine_val:<15.4f} {improvement:<15}")

    plot_similarity_distribution(
        baseline_results["similarity_scores"],
        finetuned_results["similarity_scores"],
        model_names=["Baseline", "Fine-tuned"],
    )

            