❗已训练的 GoEmotions 模型不能直接用于继续训练（参数结构不同），但可以只“迁移编码器部分”，实现“迁移式微调”（transfer learning）。

In [1]:
from transformers import BertForSequenceClassification, BertTokenizer, Trainer, TrainingArguments
from datasets import load_dataset
import torch
from torch import nn
from sklearn.metrics import accuracy_score, f1_score
import numpy as np
import matplotlib.pyplot as plt
from transformers import EarlyStoppingCallback



In [2]:
# 1. Label mapping
# Emotion names grouped by the five-class sentiment id they collapse into.
_emotions_by_class = {
    0: ('anger', 'disgust', 'fear', 'sadness'),
    1: ('annoyance', 'disappointment'),
    2: ('neutral',),
    3: ('approval', 'joy', 'love', 'optimism'),
    4: ('admiration', 'excitement', 'gratitude', 'pride', 'relief'),
}

# Flat lookup: emotion name -> five-class sentiment id.
five_class_mapping = {
    emotion: class_id
    for class_id, emotions in _emotions_by_class.items()
    for emotion in emotions
}

# Five-class sentiment id -> human-readable name.
id2label = {
    0: "very negative",
    1: "negative",
    2: "neutral",
    3: "positive",
    4: "very positive",
}

In [3]:


# Load the GoEmotions dataset.
dataset = load_dataset("go_emotions")

# Load the tokenizer and weights saved by the earlier GoEmotions training run.
model_path = "./go_emotions_model_1"  # path of the already-trained GoEmotions model
tokenizer = BertTokenizer.from_pretrained(model_path)
# The saved classification head may have a different number of outputs than the
# five classes used here. `ignore_mismatched_sizes=True` keeps the encoder
# weights and re-initialises only the mismatching head instead of raising a
# size-mismatch error; it is a no-op when the checkpoint already has 5 labels.
model = BertForSequenceClassification.from_pretrained(
    model_path,
    num_labels=5,
    ignore_mismatched_sizes=True,
)


In [4]:
# Main task: five-class sentiment label; auxiliary task: emotion intensity
# (derived from the number of labels attached to the example).

# Resolve the label-id -> emotion-name table once instead of on every example.
_emotion_names = dataset['train'].features['labels'].feature.names


def map_label(example):
    """Derive the five-class label and an intensity score for one example.

    Args:
        example: a row with a ``labels`` entry holding a list of emotion ids.

    Returns:
        dict with ``label`` (the class of the first emotion found in
        ``five_class_mapping``) and ``intensity`` (number of labels / 5).
        When no emotion is mapped, falls back to ``label=2`` (neutral) and
        ``intensity=0.2``.
    """
    label_ids = example['labels']
    for label_id in label_ids:
        emotion = _emotion_names[label_id]
        if emotion in five_class_mapping:
            # No per-example debug print here: this runs once per dataset row.
            return {
                "label": five_class_mapping[emotion],
                "intensity": len(label_ids) / 5,  # simplified intensity proxy
            }
    return {"label": 2, "intensity": 0.2}  # default: neutral


dataset = dataset.map(map_label)

In [None]:
# print("训练集样本数:", dataset['train']['label'])



In [6]:
from transformers import BertTokenizer

# Reload the tokenizer saved alongside the checkpoint so that token ids match
# the vocabulary of the model loaded from `model_path` (a tokenizer from a
# different pretrained vocabulary would produce ids the embeddings were not
# trained on).
tokenizer = BertTokenizer.from_pretrained(model_path)

# Sentiment-word replacement
import re

replacement_dict = {
    "bad": "terrible", "good": "excellent", "love": "adore", "hate": "despise",
    "happy": "joyful", "sad": "depressed", "angry": "furious"
}

# Splits one whitespace-delimited token into
# (leading punctuation, core word, trailing punctuation).
_TOKEN_PARTS = re.compile(r"^(\W*)(.*?)(\W*)$", re.DOTALL)


def augment_text(example):
    """Replace sentiment words in ``example['text']`` with stronger synonyms.

    The text is split on whitespace and re-joined with single spaces. A token
    is replaced when its core word (ignoring punctuation attached at either
    end, e.g. ``"good!"``) is a key of ``replacement_dict``, compared
    case-insensitively. Title-case and upper-case words keep their casing, and
    surrounding punctuation is preserved.

    Args:
        example: a mapping with a ``text`` string.

    Returns:
        The same ``example`` object, with ``text`` updated in place.
    """
    new_words = []
    for w in example['text'].split():
        prefix, core, suffix = _TOKEN_PARTS.match(w).groups()
        replacement = replacement_dict.get(core.lower())
        if replacement is None:
            new_words.append(w)
            continue
        if core.istitle():
            replacement = replacement.capitalize()
        elif core.isupper():
            replacement = replacement.upper()
        new_words.append(prefix + replacement + suffix)
    example['text'] = " ".join(new_words)
    return example

# Apply the sentiment-word replacement to the training split.
# NOTE(review): this overwrites the original training texts rather than adding
# augmented copies alongside them — confirm that is intended.
dataset['train'] = dataset['train'].map(augment_text)

# Encoding helper
def encode_examples(examples):
    """Tokenize ``examples["text"]`` with padding, truncation to 512 tokens and ``"pt"`` tensors."""
    texts = examples["text"]
    return tokenizer(texts, max_length=512, truncation=True, padding=True, return_tensors="pt")

# # 标签处理函数
# def ensure_correct_label_format(example):
#     if isinstance(example['label'], list):
#         # 如果标签是列表，则取第一个元素
#         example['label'] = example['label'][0]
#     return example

# # 确保标签格式正确
# dataset['train'] = dataset['train'].map(ensure_correct_label_format)

def check_labels(dataset):
    """Print the first ten entries of the train split's ``label`` column."""
    train_labels = dataset['train']['label']
    print(train_labels[:10])

# def check_data_structure(examples):
#     print(f"Input: {examples['text'][:5]}")  # 仅打印前 5 个文本
#     print(f"Label: {examples['label']}")  # 打印标签，直接作为一个整数
#     return examples

# # 调用修改后的函数
# dataset['train'] = dataset['train'].map(check_data_structure)


# Tokenize
def tokenize_function(examples):
    """Tokenize the ``text`` field with padding enabled, truncated to at most 128 tokens."""
    texts = examples['text']
    return tokenizer(texts, max_length=128, truncation=True, padding=True)

# Batched tokenization
dataset = dataset.map(tokenize_function, batched=True)

# 设置数据格式
# dataset.set_format(type="torch", columns=["input_ids", "attention_mask", "label"])

In [7]:
from sklearn.metrics import mean_squared_error

# Multi-task model (emotion classification + emotion-intensity regression)
class MultiTaskModel(nn.Module):
    """Shared encoder with a classification head and a scalar intensity head.

    Args:
        base_model: object exposing ``bert`` (the encoder), ``classifier``
            (the classification head) and ``config.hidden_size``; both
            sub-modules are reused, not copied.
    """

    def __init__(self, base_model):
        super().__init__()
        self.bert = base_model.bert  # reuse the already-trained encoder
        self.classifier = base_model.classifier  # emotion classification head
        # Regression head predicting a single intensity value per example.
        self.intensity_head = nn.Linear(base_model.config.hidden_size, 1)
        # NOTE(review): no dropout is applied before the heads here — confirm
        # whether the base model's dropout should be reused.

    def forward(self, input_ids, attention_mask, labels=None, intensity_labels=None):
        """Run the encoder and both heads.

        Args:
            input_ids: token ids passed to the encoder.
            attention_mask: attention mask passed to the encoder.
            labels: optional class indices for the classification loss.
            intensity_labels: optional regression targets, one per example.

        Returns:
            dict with ``loss`` (sum of the losses whose targets were given, or
            ``None`` when neither was), ``logits`` and ``intensity_logits``.
        """
        outputs = self.bert(input_ids, attention_mask=attention_mask)
        pooled = outputs[1]  # second encoder output (the pooled representation for BERT)
        logits = self.classifier(pooled)
        intensity_logits = self.intensity_head(pooled)

        loss = None
        if labels is not None:
            loss = nn.functional.cross_entropy(logits, labels)
        if intensity_labels is not None:
            # squeeze(-1) drops only the feature axis; a bare squeeze() would
            # also drop the batch axis when the batch size is 1.
            predicted = intensity_logits.squeeze(-1)
            # Align dtype and shape so float64 or (batch, 1) targets work too.
            target = intensity_labels.to(predicted.dtype).reshape(predicted.shape)
            intensity_loss = nn.functional.mse_loss(predicted, target)
            loss = intensity_loss if loss is None else loss + intensity_loss

        # Loss plus the raw outputs of both heads.
        return {"loss": loss, "logits": logits, "intensity_logits": intensity_logits}

# Wrap the loaded checkpoint in the multi-task model.
multi_task_model = MultiTaskModel(model)

# ========== 3. Tokenize ==========

def tokenize_function(examples):
    """Tokenize the ``text`` field, padding and truncating every sequence to 512 tokens."""
    texts = examples['text']
    return tokenizer(texts, max_length=512, truncation=True, padding="max_length")

dataset = dataset.map(tokenize_function, batched=True)

# Expose the model inputs and both targets as PyTorch tensors for the Trainer.
dataset.set_format(
    type="torch",
    columns=["input_ids", "attention_mask", "label", "intensity"],
)



In [None]:
# from typing import Dict
# import evaluate
# from transformers.trainer_utils import EvalPrediction

# # 加载评估指标
# metric = evaluate.load("accuracy")

# def compute_metrics(pred:EvalPrediction) -> Dict[str, float]:
#     # 预测输出
#     # logits_cls, preds_intensity = pred.predictions  # 预测结果是一个 tuple
#     # labels_cls = pred.label_ids[:, 0]  # 第0列是分类标签
#     # labels_intensity = pred.label_ids[:, 1]  # 第1列是强度标签

#     # preds_cls = np.argmax(logits_cls, axis=1)

#     # # acc = accuracy_score(labels_cls, preds_cls)
#     # # f1 = f1_score(labels_cls, preds_cls, average="weighted")
#     # # mse = mean_squared_error(labels_intensity, preds_intensity)
#     # print(pred)
#     # return {
#     #     "eval_accuracy": metric.compute(predictions=preds_cls, references=labels_cls)["accuracy"],
#     #     "eval_f1": f1_score(labels_cls, preds_cls, average="weighted"),
#     #     "eval_mse": mean_squared_error(labels_intensity, preds_intensity),
#     #     "eval_loss": pred.losses.mean(),
#     #     # "eval_accuracy": acc,
#     #     # "accuracy": acc,
#     #     # "f1": f1,
#     #     # "mse": mse,
#     # }
#     raise "omg"

In [9]:
# from transformers.trainer_utils import PredictionOutput
# import numpy as np
# from sklearn.metrics import accuracy_score, f1_score, mean_squared_error

# def compute_metrics(pred):
#     # 预测输出是一个包含两个部分的数组：分类和强度预测
#     logits_cls = pred.predictions[:, 0]  # 分类预测结果
#     preds_intensity = pred.predictions[:, 1]  # 强度预测结果
    
#     # 获取分类标签
#     labels_cls = pred.label_ids[:, 0]  # 分类标签
#     labels_intensity = pred.label_ids[:, 1]  # 强度标签

#     # 如果 logits_cls 是一维数组，直接计算 argmax
#     if logits_cls.ndim == 1:
#         pred_cls = np.argmax(logits_cls)  # 不需要指定 axis
#     else:
#         pred_cls = np.argmax(logits_cls, axis=1)  # 分类预测

#     # 计算分类准确率
#     acc = accuracy_score(labels_cls, pred_cls)
#     f1 = f1_score(labels_cls, pred_cls, average="weighted")

#     # 计算强度预测的均方误差
#     mse = mean_squared_error(labels_intensity, preds_intensity)

#     return {
#         "accuracy": acc,
#         "f1": f1,
#         "mse": mse,  # 添加情绪强度的均方误差
#     }




In [10]:
# from transformers.trainer_utils import PredictionOutput
# # 测试 compute_metrics 是否正常
# print(compute_metrics(PredictionOutput(
#     predictions=np.array([[0.1, 0.9], [0.2, 0.8]]),  # 假设有两个预测值，一个是分类结果，一个是强度
#     label_ids=np.array([[1, 5], [0, 3]]),  # 假设有两个标签，一个是分类标签，一个是强度标签
#     metrics={}  # 空的 metrics 参数
# )))


In [11]:
# def compute_metrics(pred):
#     labels = pred.label_ids
#     preds = np.argmax(pred.predictions, axis=1)
#     acc = accuracy_score(labels, preds)
#     f1 = f1_score(labels, preds, average="weighted")
#     return {"accuracy": acc, "f1": f1}


In [None]:
# dataset['validation'] = dataset['validation'].map(
#     lambda x: tokenizer(x['text'], padding=True, truncation=True, max_length=512), batched=True
# )
# dataset['validation'].dic
def tokenize_function(examples):
    """Tokenize the ``text`` field, padding and truncating every sequence to 512 tokens."""
    return tokenizer(examples['text'], padding="max_length", truncation=True, max_length=512)

dataset = dataset.map(tokenize_function, batched=True)

# The model's forward() takes the regression target as `intensity_labels`, so
# the column is renamed to match that parameter name. The guard keeps the cell
# safe to re-run after the rename has already happened.
# NOTE(review): this relies on Trainer forwarding only columns whose names
# match forward() parameters (remove_unused_columns) — confirm against the
# installed transformers version.
if "intensity" in dataset["train"].column_names:
    dataset = dataset.rename_column("intensity", "intensity_labels")

# Expose the mapped five-class `label` and the intensity target to the Trainer.
# The raw `labels` column holds the original variable-length list of emotion
# ids and must not be used as the classification target.
dataset.set_format(
    type="torch",
    columns=["input_ids", "attention_mask", "label", "intensity_labels"],
)

# isinstance(dataset['validation'].map(
#     lambda x: tokenizer(x['text'], padding=True, truncation=True, max_length=512), batched=True
# ), dict)

In [None]:

from typing import Dict
import evaluate
from transformers.trainer_utils import EvalPrediction

# Load the accuracy metric.
metric = evaluate.load("accuracy")

def compute_metrics(pred:EvalPrediction) -> Dict[str, float]:
    """Compute classification accuracy / weighted F1 and intensity MSE.

    Args:
        pred: evaluation output whose ``predictions`` is a pair
            ``(class_logits, intensity_predictions)`` and whose ``label_ids``
            is either a pair ``(class_labels, intensity_labels)`` or a 2-D
            array with the class label in column 0 and the intensity in
            column 1.

    Returns:
        dict with ``eval_accuracy``, ``f1`` and ``mse``.
    """
    logits_cls, preds_intensity = pred.predictions  # predictions come as a tuple

    label_ids = pred.label_ids
    if isinstance(label_ids, (tuple, list)):
        # One array per label name.
        labels_cls, labels_intensity = label_ids
    else:
        labels_cls = label_ids[:, 0]  # column 0: class label
        labels_intensity = label_ids[:, 1]  # column 1: intensity label

    labels_cls = np.asarray(labels_cls).reshape(-1).astype(int)
    labels_intensity = np.asarray(labels_intensity).reshape(-1)
    preds_cls = np.argmax(logits_cls, axis=1)
    # The intensity head emits one value per example; flatten (N, 1) -> (N,).
    preds_intensity = np.asarray(preds_intensity).reshape(-1)

    acc = accuracy_score(labels_cls, preds_cls)
    f1 = f1_score(labels_cls, preds_cls, average="weighted")
    mse = mean_squared_error(labels_intensity, preds_intensity)
    return {
        "eval_accuracy": acc,
        "f1": f1,
        "mse": mse,  # mean squared error of the intensity prediction
    }
    

# ========== 5. Training arguments ==========

# Total optimizer steps for this short run.
MAX_STEPS = 50
# Evaluate and checkpoint on the same cadence so that every evaluated step has
# a checkpoint that `load_best_model_at_end` can restore.
EVAL_STEPS = 10

training_args = TrainingArguments(
    output_dir='./results',          # output directory
    num_train_epochs=3,              # number of epochs (max_steps below is also set)
    per_device_train_batch_size=1,   # per-device training batch size
    per_device_eval_batch_size=1,    # per-device evaluation batch size
    logging_dir='./logs',            # log directory
    logging_steps=10,
    save_strategy="steps",
    save_steps=EVAL_STEPS,           # was 100, i.e. beyond the 50-step run
    save_total_limit=2,              # cap disk usage from frequent checkpoints
    load_best_model_at_end=True,     # reload the best checkpoint after training
    eval_strategy="steps",
    eval_on_start=True,              # evaluate once before training starts
    eval_delay=10,
    eval_steps=EVAL_STEPS,
    metric_for_best_model="eval_accuracy",
    greater_is_better=True,          # higher accuracy is better
    weight_decay=0.01,
    # Warmup must finish inside the run; 500 warmup steps with 50 total steps
    # would keep the learning rate in the warmup ramp for the whole run.
    warmup_steps=min(500, MAX_STEPS // 10),
    # Mixed precision needs a CUDA device; fall back to fp32 otherwise.
    fp16=torch.cuda.is_available(),
    max_steps=MAX_STEPS,
)

# Train with the Hugging Face Trainer.
trainer = Trainer(
    model=multi_task_model,                      # multi-task model wrapping the loaded checkpoint
    args=training_args,                         # training arguments
    train_dataset=dataset['train'],             # training split
    eval_dataset=dataset['validation'].select(range(100)),        # first 100 validation examples
    tokenizer=tokenizer,                        # tokenizer
    compute_metrics=compute_metrics,            # custom evaluation metrics
    callbacks=[EarlyStoppingCallback(early_stopping_patience=2)],  # early-stopping callback
)

# Run training.
trainer.train()

# ========== 6. Visualise training results ==========

# Plot the training and evaluation loss curves. Training and evaluation losses
# are logged at different moments and may differ in count, so each curve is
# plotted against the step recorded in its own log entries rather than a
# shared range.
history = trainer.state.log_history
train_log = [entry for entry in history if 'loss' in entry]
eval_log = [entry for entry in history if 'eval_loss' in entry]

train_losses = [entry['loss'] for entry in train_log]
eval_losses = [entry['eval_loss'] for entry in eval_log]
train_log_steps = [entry['step'] for entry in train_log]
eval_log_steps = [entry['step'] for entry in eval_log]

plt.figure(figsize=(10, 5))
plt.plot(train_log_steps, train_losses, label="Train Loss", marker='o')
plt.plot(eval_log_steps, eval_losses, label="Eval Loss", marker='x')
plt.xlabel("Step")
plt.ylabel("Loss")
plt.title("Training and Evaluation Loss")
plt.legend()
plt.show()














