本实验使用Qwen2.5-1.5B模型 测试
仅用于学习不用于生成最终模型 - 因此数据量也较低

### 1. 数据处理 - Tokenizer
#### 1.1 加载数据 - Dataset

In [None]:
import pandas as pd
from datasets import Dataset
dev = pd.read_csv("data/dev.csv")[:50] # 仅前50条数据用于测试
dev_ds = Dataset.from_pandas(dev)

{'text': ['投活络效灵丹加味：当归、丹参各１５ｇ，生乳香、生没药各６ｇ，柴胡１２ｇ，白芍、黄芩、大黄各１０ｇ，蒲公英３０ｇ，甘草５ｇ',
  '目的补气健脾升清法治疗糖尿病性功能性消化不良的临床效果',
  '结论温脾散穴位敷贴联合理中复元方可改善脾虚痰瘀型慢性萎缩性胃炎患者临床症状（尤其是胃窦大弯侧、胃体小弯侧萎缩），值得推广应用',
  '完带汤治疗肠道易激综合征６０例',
  '结论：辨证针刺治疗ＦＤ能明显改善患者的生活质量，为治疗ＦＤ的有效方法'],
 'label': ["{'活络效灵丹': '方剂', '当归': '中药', '丹参': '中药', '生乳香': '中药', '生没药': '中药', '柴胡': '中药', '黄芩': '中药', '大黄': '中药', '蒲公英': '中药', '甘草': '中药'}",
  "{'糖尿病性功能性消化不良': '西医诊断'}",
  "{'温脾散穴位敷贴': '中医治疗', '脾虚痰瘀': '中医证候', '慢性萎缩性胃炎': '西医诊断'}",
  "{'完带汤': '方剂', '肠道易激综合征': '西医诊断'}",
  "{'辨证针刺': '中医治疗'}"]}

#### 1.2 Tokenization
+ 加载tokenizer 
+ 定义process function：prompt方程
+ 处理dataset为 [input_id, attention_mask,labels]

In [2]:
from modelscope import AutoTokenizer
model_dir ="/Users/luyi/PythonProjects/Atomy/03LLM微调/命名实体微调实战/qwen/Qwen2-1___5B-Instruct" # 定义本地路径
tokenizer = AutoTokenizer.from_pretrained(model_dir, use_fast=False, trust_remote_code=True,padding_side='left') # 加载tokenizer

In [3]:
# 定义process function:
def process_func(example):
    MAX_LENGTH = 200
    
    instruction = """你是一个文本实体识别领域的医学专家，你需要从给定的句子中提取中医诊断,中药,中医治疗, 方剂, 西医治疗, 西医诊断 '其他治疗'等. 以 json 格式输出, 如 {'口苦': '临床表现','肺结核': '西医诊断'} 注意: 1. 输出的每一行都必须是正确的json字符串. 2.找不到任何实体时, 输出"没有找到任何实体"."""
    instructions_messages= [
        {"role": "system", "content": f"{instruction}"},
        {"role": "user", "content": f"{example['text']}"}
    ]

    instructions_chatTamplate = tokenizer.apply_chat_template(instructions_messages, tokenize=True, add_generation_prompt=False,return_dict=True)

    input_ids = instructions_chatTamplate['input_ids']
    attention_mask = instructions_chatTamplate['attention_mask']
    
    # 限制最大长度做截断处理
    if len(input_ids) > MAX_LENGTH:
        input_ids = input_ids[:MAX_LENGTH]
        attention_mask = attention_mask[:MAX_LENGTH]
    else:
        pad_len = MAX_LENGTH - len(input_ids)
        input_ids = [tokenizer.pad_token_id] * pad_len + input_ids
        attention_mask = [0] * pad_len + attention_mask
    
    return {"input_ids": input_ids, "attention_mask": attention_mask}  

In [4]:
dev_dataset = dev_ds.map(process_func, remove_columns=dev_ds.column_names,num_proc=4) 
# 删除column_names 保证简洁性
# 多线程处理数据 加速数据处理速度

Map (num_proc=4):   0%|          | 0/50 [00:00<?, ? examples/s]

### 2. 验证模型
#### 2.1 加载原始模型

In [5]:
from transformers import AutoModelForCausalLM
model = AutoModelForCausalLM.from_pretrained(model_dir)

#### 2.2 加载Lora参数

In [6]:
from peft import PeftModel
# 加载LoRA权重（轻量化推理）
peft_model = PeftModel.from_pretrained(model, "./lora_weights_qwen2.5", inference_mode=True)
peft_model.print_trainable_parameters()

trainable params: 0 || all params: 1,552,946,688 || trainable%: 0.0000


In [7]:
import torch
def predict(dataset, model, tokenizer):
    """
    修正后的预测函数：支持批量/单条样本，处理维度问题，优化切片逻辑
    :param dataset: 处理后的Dataset（如dev_ds_processed[:5]）
    :param model: 加载的LoRA/Qwen2模型
    :param tokenizer: 初始化后的tokenizer
    :return: 模型生成的实体识别结果列表
    """
    # 步骤1：提取数据并转换为二维张量（适配批量输入）
    input_ids = torch.tensor(dataset['input_ids']).to(model.device)
    attention_mask = torch.tensor(dataset['attention_mask']).to(model.device)
    
    # 步骤2：模型生成（禁用梯度计算，节省显存）
    with torch.no_grad():
        generated_ids = model.generate(
            input_ids=input_ids,
            attention_mask=attention_mask,
            max_new_tokens=500,
            temperature=0.7,  # 降低随机性，让JSON输出更稳定
            top_p=0.9,
            pad_token_id=tokenizer.pad_token_id,  # 指定pad_token_id
            eos_token_id=tokenizer.eos_token_id   # 指定结束token
        )
    
    # 步骤3：修正切片逻辑——去掉输入部分，仅保留模型生成的内容
    # input_ids.shape[1]是单条样本的序列长度（MAX_LENGTH）
    input_seq_len = input_ids.shape[1]
    generated_ids = generated_ids[:, input_seq_len:]  # 批量切片
    
    # 步骤4：解码结果（跳过特殊token，得到纯净文本）
    response = tokenizer.batch_decode(generated_ids, skip_special_tokens=True)    
    # # 打印结果
    # for idx, res in enumerate(response):
    #     print(f"样本{idx+1}生成结果：\n{res}\n" + "-"*80)
    
    return response

In [9]:
response_base = predict(dev_dataset, model, tokenizer)
response_lora = predict(dev_dataset, peft_model, tokenizer)

In [14]:
import re,ast
def f1_helper(response):
    dataset = {}
    for idx,res in enumerate(response):
        if idx == 16:
            pass
        pattern = r"\{[\s\S]*?\}"  # 匹配JSON大括号片段
        res = re.search(pattern, res)
        res = res.group() if res else "{}"
        label = dev['label'][idx]
        # print(res,label)
        res = ast.literal_eval(res)
        label = ast.literal_eval(label) # k 是 '腹痛'症状 v 是 '临床表现'
        # 筛选key
        samekeys=set()
        diffkeys1=set()
        diffkeys2=set()
        for k in label:
            if k in res:
                samekeys.add(k)
            else:
                diffkeys1.add(k)
        for k in res:
            if k not in samekeys:
                diffkeys2.add(k) 
        
        # 处理相同的
        for k in samekeys:
            v1 = label[k]
            v2 = res[k]
            if v1 not in dataset:
                dataset[v1] = {"FN":0,"FP":0,"TP":0} 
            if v1 == v2:
                dataset[v1]["TP"]+=1
            elif v1!=v2:
                dataset[v1]["FP"]+=1
                if v2 in dataset:
                    dataset[v2]["FN"]+=1
        for k in diffkeys1:
            v1 = label[k]
            if v1 not in dataset:
                dataset[v1] = {"FN":0,"FP":0,"TP":0} 
            dataset[v1]["FP"]+=1
        for k in diffkeys2:
            v2 = res[k]
            if v2 in dataset:
                dataset[v2]["FN"]+=1
    return dataset

In [15]:
def get_f1_score(metric_dict: dict, key: str) -> float:
    """
    根据实体类型的TP/FP/FN字典，输入key计算对应F1分数
    :param metric_dict: 包含各实体类型TP/FP/FN的字典（如你提供的字典）
    :param key: 要查询的实体类型key（如'中药'/'西医诊断'）
    :return: 该类别的F1分数（保留4位小数）
    """
    # 检查key是否存在于字典中
    if key not in metric_dict:
        raise KeyError(f"Key '{key}' 不存在于字典中，可选key: {list(metric_dict.keys())}")
    
    # 提取该key对应的TP、FP、FN
    tp = metric_dict[key]['TP']
    fp = metric_dict[key]['FP']
    fn = metric_dict[key]['FN']
    
    # 计算精确率（处理分母为0的情况）
    precision = tp / (tp + fp) if (tp + fp) > 0 else 0.0
    # 计算召回率（处理分母为0的情况）
    recall = tp / (tp + fn) if (tp + fn) > 0 else 0.0
    # 计算F1分数（处理分母为0的情况）
    f1 = 2 * (precision * recall) / (precision + recall) if (precision + recall) > 0 else 0.0
    
    # 保留4位小数，便于阅读
    return round(f1, 4)

In [16]:
dataset_base = f1_helper(response_base)
# 示例1：查询'中药'的F1分数
f1_herb = get_f1_score(dataset_base, '中药')
print(f"中药的F1分数：{f1_herb}")

# 示例2：查询'西医诊断'的F1分数
f1_west_diag = get_f1_score(dataset_base, '西医诊断')
print(f"西医诊断的F1分数：{f1_west_diag}")

# 示例3：查询'临床表现'的F1分数
f1_symptom = get_f1_score(dataset_base, '临床表现')
print(f"临床表现的F1分数：{f1_symptom}")


dataset_lora = f1_helper(response_lora)
# 示例1：查询'中药'的F1分数
f1_herb = get_f1_score(dataset_lora, '中药')
print(f"中药的F1分数：{f1_herb}")

# 示例2：查询'西医诊断'的F1分数
f1_west_diag = get_f1_score(dataset_lora, '西医诊断')
print(f"西医诊断的F1分数：{f1_west_diag}")

# 示例3：查询'临床表现'的F1分数
f1_symptom = get_f1_score(dataset_lora, '临床表现')
print(f"临床表现的F1分数：{f1_symptom}")

中药的F1分数：0.8496
西医诊断的F1分数：0.7037
临床表现的F1分数：0.6667
中药的F1分数：0.5301
西医诊断的F1分数：0.5769
临床表现的F1分数：0.3704


In [17]:
dataset_base

{'中药': {'FN': 9, 'FP': 8, 'TP': 48},
 '方剂': {'FN': 10, 'FP': 4, 'TP': 3},
 '西医诊断': {'FN': 6, 'FP': 10, 'TP': 19},
 '中医证候': {'FN': 4, 'FP': 4, 'TP': 3},
 '中医治疗': {'FN': 2, 'FP': 3, 'TP': 3},
 '其他治疗': {'FN': 3, 'FP': 2, 'TP': 0},
 '临床表现': {'FN': 5, 'FP': 4, 'TP': 9},
 '中医治则': {'FN': 1, 'FP': 3, 'TP': 3},
 '西医治疗': {'FN': 1, 'FP': 2, 'TP': 2}}

In [18]:
dataset_lora

{'中药': {'FN': 5, 'FP': 34, 'TP': 22},
 '方剂': {'FN': 7, 'FP': 4, 'TP': 3},
 '西医诊断': {'FN': 8, 'FP': 14, 'TP': 15},
 '中医证候': {'FN': 4, 'FP': 4, 'TP': 3},
 '中医治疗': {'FN': 4, 'FP': 5, 'TP': 1},
 '其他治疗': {'FN': 0, 'FP': 1, 'TP': 1},
 '临床表现': {'FN': 9, 'FP': 8, 'TP': 5},
 '中医治则': {'FN': 5, 'FP': 3, 'TP': 3},
 '西医治疗': {'FN': 3, 'FP': 2, 'TP': 2}}