In [1]:
import os
# 设置环境变量，只让程序看到 GPU 2
os.environ['CUDA_VISIBLE_DEVICES'] = '1'


import torch
import torch.nn as nn
import wandb
import random
import argparse
import numpy as np
from tqdm import tqdm
from transformers import BertModel, AutoModel
from transformers import AdamW

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

  from .autonotebook import tqdm as notebook_tqdm


# 数据处理函数  测试集

In [2]:
from torch.utils.data import Dataset, DataLoader
import json
import torch
from transformers import AutoTokenizer

class TestBAE2025Dataset(Dataset):
    def __init__(
            self,
            data_path,
            label_type="Actionability"  # 预测的标签类型
    ):
        self.data_path = data_path
        self.label_type = label_type
        self._get_data()
        
    def _get_data(self):
        with open(self.data_path, 'r', encoding='utf-8') as f:
            data = json.load(f)
            
        self.data = []
        self.sample_info = []  # 存储样本ID和tutor信息，用于结果映射
        
        for item in data:
            conversation_id = item['conversation_id']
            conversation_history = item['conversation_history']
            
            # 处理每个tutor的回复
            for tutor_key, tutor_data in item['tutor_responses'].items():
                tutor_response = tutor_data['response']
                # 存储句子对
                self.data.append((conversation_history, tutor_response))
                # 存储样本信息，用于后续预测结果的映射
                self.sample_info.append({
                    'conversation_id': conversation_id,
                    'tutor_key': tutor_key
                })
    
    def __len__(self):
        return len(self.data)
    
    def __getitem__(self, idx):
        return self.data[idx], self.sample_info[idx]

# 数据加载函数  测试集

In [3]:
class TestBAE2025DataLoader:
    def __init__(
        self,
        dataset,
        batch_size=16,
        max_length=512,
        shuffle=False,  # 测试集不需要打乱顺序
        drop_last=False,  # 测试集不应丢弃最后一个批次
        device=None,
        tokenizer_name='microsoft/deberta-v3-base'
    ):
        self.tokenizer = AutoTokenizer.from_pretrained(tokenizer_name)
        self.tokenizer.truncation_side = 'left'  # 设置截断方向为左侧，保留句子结尾的部分
        print("当前使用的 tokenizer 类型：", type(self.tokenizer))
        
        self.dataset = dataset
        self.batch_size = batch_size
        self.max_length = max_length
        self.shuffle = shuffle
        self.drop_last = drop_last

        if device is None:
            self.device = torch.device(
                'cuda' if torch.cuda.is_available() else 'cpu'
            )
        else:
            self.device = device

        self.loader = DataLoader(
            dataset=self.dataset,
            batch_size=self.batch_size,
            collate_fn=self.collate_fn,
            shuffle=self.shuffle,
            drop_last=self.drop_last
        )

    def collate_fn(self, data):
        sents = [i[0] for i in data]
        sample_info = [i[1] for i in data]

        # 处理两个句子的情况
        encoded = self.tokenizer.batch_encode_plus(
            batch_text_or_text_pairs=[(sent[0], sent[1]) for sent in sents],
            truncation=True,
            padding='max_length',
            max_length=self.max_length,
            return_tensors='pt',
            return_length=True
        )
        
        input_ids = encoded['input_ids'].to(self.device)
        attention_mask = encoded['attention_mask'].to(self.device)
        
        # 返回编码后的输入、样本信息（没有标签）
        return input_ids, attention_mask, sample_info

    def __iter__(self):
        for data in self.loader:
            yield data

    def __len__(self):
        return len(self.loader)

# 模型代码

In [4]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from transformers import AutoModel


class ExpertLSTM(nn.Module):
    def __init__(self, input_size, hidden_size, num_layers=1, dropout=0.1):
        super().__init__()
        self.lstm = nn.LSTM(
            input_size=input_size,
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=True,
            dropout=dropout if num_layers > 1 else 0,
        )
        
    def forward(self, x):
        # x: [batch_size, seq_len, input_size]
        output, (hidden, _) = self.lstm(x)
        # 返回最后一个时间步的隐藏状态
        return hidden[-1]  # [batch_size, hidden_size]


class ExpertBiLSTM(nn.Module):
    def __init__(self, input_size, hidden_size, num_layers=1, dropout=0.1):
        super().__init__()
        self.bilstm = nn.LSTM(
            input_size=input_size,
            hidden_size=hidden_size // 2,  # 因为是双向的，所以每个方向的隐藏层大小减半
            num_layers=num_layers,
            batch_first=True,
            bidirectional=True,
            dropout=dropout if num_layers > 1 else 0,
        )
        
    def forward(self, x):
        # x: [batch_size, seq_len, input_size]
        output, (hidden, _) = self.bilstm(x)
        # 拼接最后一层的正向和反向隐藏状态
        # hidden shape: [num_layers * num_directions, batch_size, hidden_size//2]
        hidden_forward = hidden[-2]  # 正向的最后一层 [batch_size, hidden_size//2]
        hidden_backward = hidden[-1]  # 反向的最后一层 [batch_size, hidden_size//2]
        hidden_concat = torch.cat([hidden_forward, hidden_backward], dim=1)  # [batch_size, hidden_size]
        return hidden_concat


class ExpertRNN(nn.Module):
    def __init__(self, input_size, hidden_size, num_layers=1, dropout=0.1):
        super().__init__()
        self.rnn = nn.RNN(
            input_size=input_size,
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=True,
            dropout=dropout if num_layers > 1 else 0,
        )
        
    def forward(self, x):
        # x: [batch_size, seq_len, input_size]
        _, hidden = self.rnn(x)
        # 返回最后一个时间步的隐藏状态
        return hidden[-1]  # [batch_size, hidden_size]


class ExpertGRU(nn.Module):
    def __init__(self, input_size, hidden_size, num_layers=1, dropout=0.1):
        super().__init__()
        self.gru = nn.GRU(
            input_size=input_size,
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=True,
            dropout=dropout if num_layers > 1 else 0,
        )
        
    def forward(self, x):
        # x: [batch_size, seq_len, input_size]
        _, hidden = self.gru(x)
        # 返回最后一个时间步的隐藏状态
        return hidden[-1]  # [batch_size, hidden_size]


class ExpertLinear(nn.Module):
    def __init__(self, input_size, hidden_size):
        super().__init__()
        self.linear = nn.Sequential(
            nn.Linear(input_size, hidden_size * 2),
            nn.LayerNorm(hidden_size * 2),
            nn.GELU(),
            nn.Linear(hidden_size * 2, hidden_size)
        )
        
    def forward(self, x):
        # x: [batch_size, seq_len, input_size]
        # 我们需要把序列信息压缩为一个向量，可以使用平均池化
        pooled = torch.mean(x, dim=1)  # [batch_size, input_size]
        return self.linear(pooled)  # [batch_size, hidden_size]


class BertClassificationHead(nn.Module):
    def __init__(self, hidden_size=1024, num_classes=3, dropout_prob=0.3):
        super().__init__()
        self.dense = nn.Linear(hidden_size, hidden_size)
        self.dropout = nn.Dropout(dropout_prob)
        self.out_proj = nn.Linear(hidden_size, num_classes)
    
    def forward(self, features):
        # 提取 [CLS] 标记的表示
        x = features[:, 0, :]  # 使用第一个标记([CLS])
        x = self.dropout(x)
        x = self.dense(x)
        x = torch.tanh(x)
        x = self.dropout(x)
        x = self.out_proj(x)
        return x


class MoERouter(nn.Module):
    """专家路由器，学习为每个样本分配专家权重"""
    def __init__(self, input_size, num_experts):
        super().__init__()
        self.router = nn.Linear(input_size, num_experts)
        
    def forward(self, x):
        # x: [batch_size, input_size]
        # 计算每个专家的权重 (使用softmax确保权重和为1)
        router_logits = self.router(x)
        router_probs = F.softmax(router_logits, dim=-1)
        return router_probs  # [batch_size, num_experts]


class DeBERTaMoEClassifier(nn.Module):
    def __init__(
        self, 
        pretrained_model_name, 
        num_classes=3, 
        freeze_pooler=0,
        expert_hidden_size=256,
        dropout=0.3,
        num_rnn_layers=1
    ):
        super().__init__()
        
        # 使用 AutoModel 加载 DeBERTa 模型
        self.bert = AutoModel.from_pretrained(pretrained_model_name)
        
        # 获取 bert 隐藏层大小
        self.bert_hidden_size = self.bert.config.hidden_size
        
        # 保留原有的分类头
        self.original_classifier = BertClassificationHead(
            hidden_size=self.bert_hidden_size,
            num_classes=num_classes,
            dropout_prob=dropout
        )
        
        # 创建多个专家模型
        self.experts = nn.ModuleDict({
            'lstm': ExpertLSTM(
                input_size=self.bert_hidden_size, 
                hidden_size=expert_hidden_size,
                num_layers=num_rnn_layers,
                dropout=dropout
            ),
            'bilstm': ExpertBiLSTM(
                input_size=self.bert_hidden_size, 
                hidden_size=expert_hidden_size,
                num_layers=num_rnn_layers,
                dropout=dropout
            ),
            'rnn': ExpertRNN(
                input_size=self.bert_hidden_size, 
                hidden_size=expert_hidden_size,
                num_layers=num_rnn_layers,
                dropout=dropout
            ),
            'gru': ExpertGRU(
                input_size=self.bert_hidden_size, 
                hidden_size=expert_hidden_size,
                num_layers=num_rnn_layers,
                dropout=dropout
            ),
            'linear': ExpertLinear(
                input_size=self.bert_hidden_size, 
                hidden_size=expert_hidden_size
            ),
        })
        
        # 创建路由器 (使用[CLS]标记表示作为路由的输入)
        self.router = MoERouter(self.bert_hidden_size, len(self.experts))
        
        # 各专家模型的输出映射层，将各自的hidden_size映射到统一的输出空间
        self.expert_outputs = nn.ModuleDict({
            expert_name: nn.Linear(expert_hidden_size, num_classes)
            for expert_name in self.experts.keys()
        })
        
        # 最终的融合层，将所有结果拼接后映射到输出类别
        # (1个原始分类头 + 5个专家) * 每个输出num_classes = 6 * num_classes
        combined_dim = num_classes * (1 + len(self.experts))
        self.final_classifier = nn.Sequential(
            nn.Linear(combined_dim, combined_dim // 2),
            nn.LayerNorm(combined_dim // 2),
            nn.Dropout(dropout),
            nn.ReLU(),
            nn.Linear(combined_dim // 2, num_classes)
        )
        
    def forward(self, input_ids, attention_mask):
        # DeBERTa 编码
        outputs = self.bert(
            input_ids=input_ids,
            attention_mask=attention_mask
        )
        
        # 获取序列隐藏状态
        hidden_states = outputs.last_hidden_state  # [batch_size, seq_len, hidden_size]
        
        # 获取原始分类头结果
        original_logits = self.original_classifier(hidden_states)  # [batch_size, num_classes]
        
        # 获取路由权重
        cls_embedding = hidden_states[:, 0]  # [batch_size, hidden_size]
        routing_weights = self.router(cls_embedding)  # [batch_size, num_experts]
        
        # 获取各专家结果
        expert_outputs = {}
        for expert_name, expert in self.experts.items():
            # 获取专家输出
            expert_hidden = expert(hidden_states)  # [batch_size, expert_hidden_size]
            # 映射到类别空间
            expert_logits = self.expert_outputs[expert_name](expert_hidden)  # [batch_size, num_classes]
            # 存储结果
            expert_outputs[expert_name] = expert_logits
        
        # 根据路由权重加权专家结果
        # 首先，将所有专家的结果拼接到一起
        expert_logits_list = [original_logits]  # 包含原始分类头
        expert_names = list(self.experts.keys())
        
        for expert_name in expert_names:
            expert_logits_list.append(expert_outputs[expert_name])
        
        # 拼接所有结果 [batch_size, (1+num_experts)*num_classes]
        combined_logits = torch.cat(expert_logits_list, dim=1)
        
        # 通过最终分类器输出最终结果
        final_logits = self.final_classifier(combined_logits)
        
        # return {
        #     'logits': final_logits,  # 最终预测
        #     'original_logits': original_logits,  # 原始分类头预测
        #     'expert_logits': expert_outputs,  # 各专家预测
        #     'routing_weights': routing_weights,  # 路由权重
        #     'combined_logits': combined_logits  # 拼接的中间结果
        # }
        
        return final_logits


# 测试代码

In [5]:
import os
import wandb
import random
import argparse
from tqdm import tqdm

import torch
import torch.nn as nn
import numpy as np
from transformers import AdamW
from sklearn.metrics import f1_score, precision_score, recall_score, accuracy_score

# 如果在Jupyter Notebook中运行，可以使用这个自定义参数函数替代argparser
def get_default_configs():
    """在Jupyter环境中使用的默认配置，避免argparse解析错误"""
    class Args:
        def __init__(self):
            # self.model_name = '/mnt/cfs/huangzhiwei/pykt-moekt/SBM/bge-large-en-v1.5'
            # self.model_name = "/mnt/cfs/huangzhiwei/BAE2025/models/ModernBERT-large"
            # self.model_name = '/mnt/cfs/huangzhiwei/pykt-moekt/SBM/xlm-roberta-large'
            # self.model_name = '/mnt/cfs/huangzhiwei/BAE2025/models/bge-base-en-v1.5'
            # self.model_name = '/mnt/cfs/huangzhiwei/BAE2025/models/bert-base-uncased'
            self.model_name = '/mnt/cfs/huangzhiwei/BAE2025/models/deberta-v3-base'
            # self.model_name = '/mnt/cfs/huangzhiwei/BAE2025/models/roberta-base'
            self.num_classes = 3
            self.dropout = 0.25
            self.freeze_pooler = 8
            self.batch_size = 16
            self.max_length = 512
            self.lr = 1e-5
            self.epochs = 50
            self.device = device
            self.name = None
            self.seed = 42
            self.data_path = '../data_new/track4_train.json'
            self.val_data_path = '../data_new/track4_val.json'
            self.checkpoint_dir = "/mnt/cfs/huangzhiwei/BAE2025/projects/predict"
            self.patience = 6
            self.expert_hidden_size = 512
            self.num_rnn_layers = 1
            self.warmup_ratio = 0.1
            self.test_data_path = "/mnt/cfs/huangzhiwei/BAE2025/data_new/mrbench_v3_testset.json"
            self.output_path = "/mnt/cfs/huangzhiwei/BAE2025/projects/predict/predictions_final.json"
            self.exp_name = 'BAE2025_track4_bert'
    return Args()


In [6]:
import os
import json
import torch
from tqdm import tqdm
from transformers import AutoTokenizer

def predict(configs, test_data_path, output_path):
    """
    使用训练好的模型对测试集进行预测，并将预测结果保存为JSON格式
    
    Args:
        configs: 配置参数
        model_class: 模型类
        test_data_path: 测试数据路径
        output_path: 输出结果保存路径
    """
    print(f"开始对 {test_data_path} 进行预测...")
    
    # 设置设备
    if configs.device is None:
        device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    else:
        device = configs.device
    
    # 加载测试数据集
    test_dataset = TestBAE2025Dataset(
        data_path=test_data_path,
        label_type=configs.label_type if hasattr(configs, 'label_type') else "Actionability"
    )
    
    # 创建测试数据加载器
    test_dataloader = TestBAE2025DataLoader(
        dataset=test_dataset,
        batch_size=configs.batch_size,
        max_length=configs.max_length,
        shuffle=False,
        drop_last=False,
        device=device,
        tokenizer_name=configs.model_name
    )
    
    # 创建模型
    model = DeBERTaMoEClassifier(
        pretrained_model_name=configs.model_name,
        num_classes=configs.num_classes,
        freeze_pooler=configs.freeze_pooler,
        num_rnn_layers=configs.num_rnn_layers,
        expert_hidden_size=configs.expert_hidden_size,
        dropout=configs.dropout
    ).to(device)
    
    # 获取检查点目录
    # checkpoint_dir = os.path.join(configs.checkpoint_dir, configs.exp_name)
    checkpoint_dir = configs.checkpoint_dir
    
    # 加载最佳模型
    model_path = os.path.join(checkpoint_dir, 'best_model_f1.pt')
    print(f"加载模型: {model_path}")
    
    model.load_state_dict(torch.load('/mnt/cfs/huangzhiwei/BAE2025/projects/predict/deberta_lr0.1/best_model_f1_21.pt', map_location=device))
    model.eval()
    
    # 定义标签映射（从数字到文本标签）
    label_mapping = {0: "Yes", 1: "To some extent", 2: "No"}
    
    # 保存所有预测结果
    predictions = {}
    
    # 开始预测
    with torch.no_grad():
        for input_ids, attention_mask, sample_info_batch in tqdm(test_dataloader, desc="预测中"):
            # 前向传播
            logits = model(input_ids, attention_mask)
            preds = logits.argmax(dim=1).cpu().numpy()
            
            # 处理预测结果
            for i, pred in enumerate(preds):
                sample_info = sample_info_batch[i]
                conv_id = sample_info['conversation_id']
                tutor_key = sample_info['tutor_key']
                
                # 获取文本标签
                predicted_label = label_mapping[int(pred)]
                
                # 初始化字典（如果不存在）
                if conv_id not in predictions:
                    predictions[conv_id] = {}
                    
                if 'tutor_responses' not in predictions[conv_id]:
                    predictions[conv_id]['tutor_responses'] = {}
                
                if tutor_key not in predictions[conv_id]['tutor_responses']:
                    predictions[conv_id]['tutor_responses'][tutor_key] = {}
                
                # 保存预测结果
                predictions[conv_id]['tutor_responses'][tutor_key]['annotation'] = {
                    'Actionability': predicted_label
                }
    
    # 读取原始测试数据，以保持完整结构
    with open(test_data_path, 'r', encoding='utf-8') as f:
        original_data = json.load(f)
    
    # 组合预测结果与原始数据
    final_results = []
    for item in original_data:
        conv_id = item['conversation_id']
        if conv_id in predictions:
            # 深拷贝原始数据
            new_item = item.copy()
            
            # 为每个tutor_response添加annotation字段
            for tutor_key in new_item['tutor_responses'].keys():
                if (tutor_key in predictions[conv_id]['tutor_responses'] and 
                    'annotation' in predictions[conv_id]['tutor_responses'][tutor_key]):
                    # 添加预测的annotation
                    new_item['tutor_responses'][tutor_key]['annotation'] = predictions[conv_id]['tutor_responses'][tutor_key]['annotation']
            
            final_results.append(new_item)
        else:
            # 如果没有预测结果，保留原始数据
            final_results.append(item)
    
    # 保存最终结果
    with open(output_path, 'w', encoding='utf-8') as f:
        json.dump(final_results, f, ensure_ascii=False, indent=2)
    
    print(f"预测完成，结果已保存到: {output_path}")
    
    # 打印预测标签分布
    label_counts = {"Yes": 0, "To some extent": 0, "No": 0}
    total_predictions = 0
    
    for item in final_results:
        for tutor_data in item['tutor_responses'].values():
            if 'annotation' in tutor_data and 'Actionability' in tutor_data['annotation']:
                label = tutor_data['annotation']['Actionability']
                label_counts[label] += 1
                total_predictions += 1
    
    print("\n预测标签分布:")
    for label, count in label_counts.items():
        percentage = (count / total_predictions) * 100 if total_predictions > 0 else 0
        print(f"  {label}: {count} ({percentage:.2f}%)")

        

if __name__ == '__main__':
    # 判断是否在Jupyter环境中运行
    try:
        # 检查是否在Jupyter中运行
        get_ipython = globals().get('get_ipython', None)
        if get_ipython and 'IPKernelApp' in get_ipython().config:
            # 在Jupyter环境中运行，使用默认配置
            print("Running in Jupyter environment, using default configs")
            configs = get_default_configs()
        else:
            # 在命令行环境中运行，使用argparse
            configs = argparser()
    except:
        # 任何异常都使用argparse处理
        configs = argparser()
    
    # 设置实验名称
    if configs.name is None:
        configs.exp_name = \
            f'{os.path.basename(configs.model_name)}' + \
            f'{"_fp" if configs.freeze_pooler else ""}' + \
            f'_b{configs.batch_size}_e{configs.epochs}' + \
            f'_len{configs.max_length}_lr{configs.lr}'
    else:
        configs.exp_name = configs.name
    
    # 设置设备
    if configs.device is None:
        configs.device = torch.device(
            'cuda' if torch.cuda.is_available() else 'cpu'
        )
    
    print(f"使用设备: {configs.device}")
    
    # 判断是否进行训练或预测
    # if hasattr(configs, 'predict') and configs.predict:
    # 导入模型类
    # from model import DeBERTaMoEClassifier

    # 进行预测
    predict(
        configs=configs,
        test_data_path=configs.test_data_path,
        output_path=configs.output_path
    )
    # else:
    #     # 进行训练
    #     train(configs)

Running in Jupyter environment, using default configs
使用设备: cuda
开始对 /mnt/cfs/huangzhiwei/BAE2025/data_new/mrbench_v3_testset.json 进行预测...




当前使用的 tokenizer 类型： <class 'transformers.models.deberta_v2.tokenization_deberta_v2_fast.DebertaV2TokenizerFast'>
加载模型: /mnt/cfs/huangzhiwei/BAE2025/projects/predict/best_model_f1.pt


  model.load_state_dict(torch.load('/mnt/cfs/huangzhiwei/BAE2025/projects/predict/deberta_lr0.1/best_model_f1_21.pt', map_location=device))
预测中: 100%|██████████| 97/97 [00:14<00:00,  6.53it/s]

预测完成，结果已保存到: /mnt/cfs/huangzhiwei/BAE2025/projects/predict/predictions_final.json

预测标签分布:
  Yes: 826 (53.39%)
  To some extent: 232 (15.00%)
  No: 489 (31.61%)



