# 优化版中文诈骗场景 NER 微调训练脚本
本 Notebook 加载 `data/ner_training_data.csv`，并基于 `bert-base-chinese` 对 BERT 进行命名实体识别（NER）微调。

In [ ]:
import pandas as pd
import numpy as np
import os
import torch
from transformers import BertTokenizerFast, BertForTokenClassification, Trainer, TrainingArguments
from datasets import Dataset
from sklearn.model_selection import train_test_split
import logging
from collections import Counter

# Configure logging: INFO level on the root logger, plus a module-level logger
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Pick the compute device: CUDA when available, CPU otherwise
if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')
logger.info(f'使用设备: {device}')

In [ ]:
# 1. Load the data
try:
    # Read the training CSV; the code below relies on its `labels` column.
    df = pd.read_csv('data/ner_training_data.csv', encoding='utf-8')

    # Missing label cells become '' so that they split into an empty list below.
    missing_mask = df['labels'].isnull()
    if missing_mask.any():
        logger.warning(f'发现 {missing_mask.sum()} 个缺失值，将被替换为空字符串')
        df['labels'] = df['labels'].fillna('')

    # Whitespace-separated tag string -> list of tags (non-string cells -> []).
    df['labels'] = df['labels'].apply(lambda x: x.split() if isinstance(x, str) else [])

    # Drop rows that ended up with no labels at all.
    df = df[df['labels'].apply(len) > 0]

    # Fail with a clear message instead of an obscure max()-on-empty error below.
    if df.empty:
        raise ValueError('没有有效的训练样本')

    logger.info(f'成功加载数据，共 {len(df)} 条有效样本')

    # Show a few rows
    print('\n数据样例：')
    print(df.head())

    # Show how often each tag occurs across all samples
    all_labels = [label for labels in df['labels'] for label in labels]
    label_counts = pd.Series(all_labels).value_counts()
    print('\n标签分布：')
    print(label_counts)

    # Inverse-frequency weights, normalised so the rarest tag has weight 1.0.
    # NOTE(review): `label_weights` is only printed in the visible notebook;
    # it is not passed to the loss or the Trainer.
    label_weights = {label: 1.0 / count for label, count in label_counts.items()}
    max_weight = max(label_weights.values())
    label_weights = {label: weight / max_weight for label, weight in label_weights.items()}

    print('\n类别权重：')
    for label, weight in label_weights.items():
        print(f'{label}: {weight:.2f}')

except Exception as e:
    logger.error(f'加载数据失败: {str(e)}')
    # Re-raise: every later cell needs `df`, so swallowing the error here would
    # only postpone the failure to a confusing NameError further down.
    raise

In [ ]:
# 2. Tag inventory (BIO scheme) and the tag <-> id lookup tables
label_list = ["O", "B-MONEY", "I-MONEY", "B-ACCOUNT", "I-ACCOUNT", "B-LINK", "I-LINK",
              "B-TIME", "I-TIME", "B-LOC", "I-LOC", "B-PHONE", "I-PHONE", "B-NAME", "I-NAME"]
label2id = {l: i for i, l in enumerate(label_list)}
id2label = {i: l for i, l in enumerate(label_list)}

# 3. Convert tags to ids.
# Validate first so an unexpected tag produces a readable error instead of a
# bare KeyError raised from inside `apply`.
unknown_labels = sorted({l for labels in df['labels'] for l in labels} - set(label_list))
if unknown_labels:
    raise ValueError(f'数据中存在未定义的标签: {unknown_labels}')
df['label_ids'] = df['labels'].apply(lambda x: [label2id[l] for l in x])

# 4. Train/eval split, stratified on the first tag of each sample.
first_labels = df['labels'].apply(lambda x: x[0])
try:
    train_df, eval_df = train_test_split(
        df, test_size=0.2, random_state=42, stratify=first_labels
    )
except ValueError as e:
    # Stratification is rejected when a class has a single sample or a split is
    # too small to hold every class; fall back to a plain random split.
    logger.warning(f'无法进行分层划分，改用随机划分: {e}')
    train_df, eval_df = train_test_split(df, test_size=0.2, random_state=42)
train_dataset = Dataset.from_pandas(train_df)
eval_dataset = Dataset.from_pandas(eval_df)

logger.info(f'训练集大小: {len(train_dataset)}, 验证集大小: {len(eval_dataset)}')

In [ ]:
# 5. Tokenizer
# The alignment function below calls `word_ids()` on the tokenizer output.
tokenizer = BertTokenizerFast.from_pretrained('bert-base-chinese')

def tokenize_and_align_labels(examples):
    """Tokenize a batch of texts and attach one label id per token.

    Args:
        examples: batch dict with a 'text' list and a parallel 'label_ids'
            list (one list of label ids per sample).

    Returns:
        The tokenizer output with an added "labels" entry. Tokens without a
        word index, and tokens whose word index has no label, get -100.
    """
    tokenized_inputs = tokenizer(
        examples['text'],
        truncation=True,
        padding='max_length',
        max_length=128,
        is_split_into_words=False
    )

    # NOTE(review): labels are indexed by the tokenizer's word index. If the CSV
    # stores one tag per character, runs of digits/Latin letters (one word, many
    # characters) would shift the alignment. Confirm against the data format.
    labels = []
    unlabeled_tokens = 0
    for i, label in enumerate(examples['label_ids']):
        word_ids = tokenized_inputs.word_ids(batch_index=i)
        label_ids = []
        for word_idx in word_ids:
            if word_idx is not None and word_idx < len(label):
                label_ids.append(label[word_idx])
            else:
                # -100 marks positions excluded from evaluation
                # (see compute_metrics, which skips them).
                label_ids.append(-100)
                if word_idx is not None:
                    # Fewer labels than words: mask the token instead of
                    # failing with an IndexError.
                    unlabeled_tokens += 1
        labels.append(label_ids)

    if unlabeled_tokens:
        logger.warning(f'有 {unlabeled_tokens} 个token没有对应的标签，已标记为-100')

    tokenized_inputs["labels"] = labels
    return tokenized_inputs

# Tokenize both splits in batches and attach the aligned labels
train_dataset, eval_dataset = (
    split.map(tokenize_and_align_labels, batched=True)
    for split in (train_dataset, eval_dataset)
)

logger.info('数据集处理完成')

In [ ]:
# 6. Build the token-classification model and move it to the selected device
model = BertForTokenClassification.from_pretrained(
    'bert-base-chinese',
    id2label=id2label,
    label2id=label2id,
    num_labels=len(label_list),
)
model.to(device)

# 7. Training configuration
training_args = TrainingArguments(
    # Output and logging locations
    output_dir='./bert_ner_antifraud',
    logging_dir='./logs',
    logging_steps=10,
    # Schedule and batch sizes
    num_train_epochs=15,
    per_device_train_batch_size=8,
    per_device_eval_batch_size=8,
    # Optimisation
    learning_rate=2e-5,
    weight_decay=0.01,
    warmup_steps=500,
    label_smoothing_factor=0.1,
    # Evaluate and checkpoint once per epoch; reload the best checkpoint by F1
    evaluation_strategy='epoch',
    save_strategy='epoch',
    load_best_model_at_end=True,
    metric_for_best_model='eval_f1',
    greater_is_better=True,
)

In [ ]:
def compute_metrics(eval_pred):
    """Compute token-level metrics, ignoring positions labelled -100.

    Args:
        eval_pred: a (logits, labels) pair; the predicted class is the argmax
            over the last axis of the logits.

    Returns:
        Dict with 'accuracy', macro-averaged 'f1', 'precision' and 'recall',
        plus one 'f1_<label>' entry for every label in `label_list`.
    """
    from sklearn.metrics import f1_score, accuracy_score, precision_score, recall_score
    logits, labels = eval_pred
    predictions = np.argmax(logits, axis=-1)
    labels = np.asarray(labels)

    # Keep only positions that carry a real label; a boolean mask replaces the
    # per-token Python loop.
    valid = labels != -100
    true_labels = labels[valid]
    true_preds = predictions[valid]

    # zero_division=0 yields the same scores as the default but without a
    # warning for every class that has no samples or no predictions.
    metrics = {
        'accuracy': accuracy_score(true_labels, true_preds),
        'f1': f1_score(true_labels, true_preds, average='macro', zero_division=0),
        'precision': precision_score(true_labels, true_preds, average='macro', zero_division=0),
        'recall': recall_score(true_labels, true_preds, average='macro', zero_division=0)
    }

    # Per-class F1 in a single call: average=None returns one one-vs-rest score
    # per entry of `labels`, in that order.
    per_class_f1 = f1_score(
        true_labels,
        true_preds,
        labels=[label2id[label] for label in label_list],
        average=None,
        zero_division=0
    )
    for label, score in zip(label_list, per_class_f1):
        metrics[f'f1_{label}'] = float(score)

    return metrics

In [ ]:
# 8. Train the model
trainer = Trainer(
    args=training_args,
    model=model,
    compute_metrics=compute_metrics,
    train_dataset=train_dataset,
    eval_dataset=eval_dataset,
)

logger.info('开始训练...')
trainer.train()

# Persist the model and the tokenizer side by side in the output directory
output_dir = './bert_ner_antifraud'
output_dir_exists = os.path.exists(output_dir)
if not output_dir_exists:
    os.makedirs(output_dir)

model.save_pretrained(output_dir)
tokenizer.save_pretrained(output_dir)
logger.info(f'模型已保存到 {output_dir}')

In [ ]:
# 9. Try the model on raw text
def predict_entities(text):
    """Run the model on one text and decode the predicted BIO tags into entities.

    Args:
        text: the raw input string.

    Returns:
        A list of dicts with keys "text" (the matching substring of `text`),
        "type" (the tag without its B-/I- prefix) and "start" (index of the
        entity's first token in the encoded sequence, special tokens included).
    """
    encoding = tokenizer(
        text,
        return_tensors="pt",
        truncation=True,
        padding=True,
        return_offsets_mapping=True,
        return_special_tokens_mask=True,
    )
    # These two entries are decoding aids only; the model does not accept them.
    offsets = encoding.pop("offset_mapping")[0].tolist()
    special_mask = encoding.pop("special_tokens_mask")[0].tolist()
    inputs = {k: v.to(device) for k, v in encoding.items()}

    # Make sure dropout is disabled during inference.
    model.eval()
    with torch.no_grad():
        outputs = model(**inputs)
        predictions = torch.argmax(outputs.logits, dim=-1)
    pred_ids = predictions[0].tolist()

    # Collect spans as [type, first_token_index, char_start, char_end].
    spans = []
    open_span = None
    for i, (label_id, (char_start, char_end), is_special) in enumerate(
        zip(pred_ids, offsets, special_mask)
    ):
        if is_special:
            continue

        label = id2label[label_id]
        if label.startswith("B-"):
            open_span = [label[2:], i, char_start, char_end]
            spans.append(open_span)
        elif label.startswith("I-"):
            # Extend the open entity; an I- tag with no open entity is ignored.
            if open_span is not None:
                open_span[3] = char_end
        else:
            open_span = None

    # Slice the original text so entity strings contain no "##" or "[UNK]"
    # tokenizer artefacts.
    return [
        {"text": text[char_start:char_end], "type": entity_type, "start": token_index}
        for entity_type, token_index, char_start, char_end in spans
    ]

# Sample fraud-style messages used as a quick smoke test
test_texts = [
    "恭喜您中奖了50000元，请点击http://prize.com领取奖金",
    "中奖信息已发放，请联系李经理，电话：13912345678",
    "您是本月幸运用户，奖金将汇入账号6222000000000000",
]

# Print every entity the model finds in each sample
for sample in test_texts:
    found = predict_entities(sample)
    print(f'\n测试文本: {sample}')
    print('识别出的实体:')
    for item in found:
        print(f"类型: {item['type']}, 文本: {item['text']}")