# 文本分类

## 导入包

In [2]:
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from transformers import BertTokenizer, BertModel
import json
import numpy as np
from sklearn.metrics import f1_score
import logging
from tqdm import tqdm
import gc

import os
os.environ['HF_ENDPOINT'] = 'https://hf-mirror.com'

# 设置日志

In [3]:
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# 设置随机种子以确保可重复性

In [4]:
torch.manual_seed(42)
np.random.seed(42)


# 检查设备

In [5]:
device = torch.device("cpu")
logger.info(f"使用设备: {device}")


INFO:__main__:使用设备: cpu


# 数据集类定义

In [3]:
class TextClassificationDataset(Dataset):
    """
    自定义数据集类，用于加载和预处理文本分类任务的数据
    继承自PyTorch的Dataset类
    """
    def __init__(self, file_path, tokenizer, max_length=512):
        """
        初始化数据集
        
        参数:
            file_path: 数据文件路径，文件应为jsonl格式
            tokenizer: BERT分词器，用于将文本转换为模型输入
            max_length: 文本最大长度，默认为512个token
        """
        self.texts = []  # 存储文本数据
        self.labels = []  # 存储标签数据
        self.tokenizer = tokenizer  # BERT分词器
        self.max_length = max_length  # 最大序列长度
        
        # 读取jsonl格式文件
        with open(file_path, 'r', encoding='utf-8') as f:
            for line in f:
                data = json.loads(line.strip())
                self.texts.append(data['text'])  # 提取文本
                if 'label' in data:  # 只有训练数据有标签
                    self.labels.append(data['label'])  # 提取标签
                    
        # 对于测试数据（无标签），用-1填充标签列表
        if not self.labels:  # 对于测试数据
            self.labels = [-1] * len(self.texts)
    
    def __len__(self):
        """返回数据集中样本数量"""
        return len(self.texts)
    
    def __getitem__(self, idx):
        """
        获取指定索引的样本
        
        参数:
            idx: 样本索引
            
        返回:
            包含模型输入所需的所有张量的字典
        """
        text = self.texts[idx]  # 获取文本
        label = self.labels[idx]  # 获取标签
        
        # 使用BERT分词器处理文本
        encoding = self.tokenizer(
            text,
            max_length=self.max_length,  # 截断/填充到指定长度
            padding='max_length',  # 填充到最大长度
            truncation=True,  # 截断超长文本
            return_tensors='pt'  # 返回PyTorch张量
        )
        
        # 返回模型需要的所有输入
        return {
            'input_ids': encoding['input_ids'].squeeze(),  # 移除批次维度
            'attention_mask': encoding['attention_mask'].squeeze(),  # 移除批次维度
            'label': torch.tensor(label, dtype=torch.long)  # 转换为LongTensor类型
        }

## 模型定义

In [4]:
class TextClassifier(nn.Module):
    def __init__(self, bert_model, dropout_rate=0.3):
        super(TextClassifier, self).__init__()
        self.bert = bert_model
        self.dropout = nn.Dropout(dropout_rate)
        self.classifier = nn.Linear(bert_model.config.hidden_size, 2)
        
    def forward(self, input_ids, attention_mask):
        outputs = self.bert(input_ids=input_ids, attention_mask=attention_mask)
        pooled_output = outputs.pooler_output
        pooled_output = self.dropout(pooled_output)
        logits = self.classifier(pooled_output)
        return logits


## 训练函数

In [5]:
def train_model(model, train_loader, optimizer, criterion, device):
    """训练模型的函数，执行一个完整训练周期"""
    model.train()  # 设置模型为训练模式
    total_loss = 0
    predictions = []  # 存储所有预测结果
    true_labels = []  # 存储所有真实标签
    
    # 创建进度条，方便观察训练进度
    progress_bar = tqdm(train_loader, desc="训练中", ncols=100)
    batch_count = 0
    running_loss = 0  # 用于计算移动平均损失
    
    for batch in progress_bar:
        # 将数据移至指定设备(CPU/GPU)
        input_ids = batch['input_ids'].to(device)
        attention_mask = batch['attention_mask'].to(device)
        labels = batch['label'].to(device)
        
        optimizer.zero_grad()  # 清除之前的梯度
        outputs = model(input_ids=input_ids, attention_mask=attention_mask)  # 前向传播
        loss = criterion(outputs, labels)  # 计算损失
        
        loss.backward()  # 反向传播计算梯度
        optimizer.step()  # 更新模型参数
        
        # 更新损失统计和进度条显示
        batch_count += 1
        running_loss = (running_loss * (batch_count - 1) + loss.item()) / batch_count
        progress_bar.set_postfix({
            '当前损失': f'{loss.item():.4f}',
            '平均损失': f'{running_loss:.4f}'
        })
        
        total_loss += loss.item()
        
        # 收集预测结果和真实标签用于计算F1分数
        predictions.extend(outputs.argmax(dim=1).cpu().numpy())
        true_labels.extend(labels.cpu().numpy())
        
        # 清理内存，减少内存占用
        del input_ids, attention_mask, labels, outputs, loss
        if batch_count % 10 == 0:  # 定期执行垃圾回收
            gc.collect()
    
    # 计算整个epoch的平均损失和F1分数
    epoch_loss = total_loss / len(train_loader)
    epoch_f1 = f1_score(true_labels, predictions)
    
    return epoch_loss, epoch_f1

## 评估函数

In [6]:
def evaluate_model(model, eval_loader, criterion, device):
    """评估模型性能的函数，不进行梯度计算和参数更新"""
    model.eval()  # 设置模型为评估模式
    total_loss = 0
    predictions = []  # 存储预测结果
    true_labels = []  # 存储真实标签
    
    progress_bar = tqdm(eval_loader, desc="评估中", ncols=100)
    
    with torch.no_grad():  # 不计算梯度，减少内存使用
        for batch in progress_bar:
            # 将数据移至指定设备
            input_ids = batch['input_ids'].to(device)
            attention_mask = batch['attention_mask'].to(device)
            labels = batch['label'].to(device)
            
            outputs = model(input_ids=input_ids, attention_mask=attention_mask)  # 前向传播
            loss = criterion(outputs, labels)  # 计算损失
            
            total_loss += loss.item()
            
            # 收集预测结果和真实标签
            predictions.extend(outputs.argmax(dim=1).cpu().numpy())
            true_labels.extend(labels.cpu().numpy())
            
            progress_bar.set_postfix({'当前损失': f'{loss.item():.4f}'})
            
            # 清理内存
            del input_ids, attention_mask, labels, outputs, loss
            gc.collect()
    
    # 计算整体评估指标
    epoch_loss = total_loss / len(eval_loader)
    epoch_f1 = f1_score(true_labels, predictions)  # 计算F1分数
    
    return epoch_loss, epoch_f1

## 预测函数

In [7]:
def predict(model, test_loader, device):
    """预测函数，用于对测试数据进行推理并返回预测结果"""
    model.eval()  # 设置模型为评估模式
    predictions = []  # 存储所有预测结果
    
    progress_bar = tqdm(test_loader, desc="预测中", ncols=100)
    
    with torch.no_grad():  # 不计算梯度，提高推理速度和减少内存使用
        for batch in progress_bar:
            # 将输入数据移至指定设备
            input_ids = batch['input_ids'].to(device)
            attention_mask = batch['attention_mask'].to(device)
            
            outputs = model(input_ids=input_ids, attention_mask=attention_mask)  # 模型推理
            predictions.extend(outputs.argmax(dim=1).cpu().numpy())  # 获取预测类别
            
            # 清理内存，避免内存泄漏
            del input_ids, attention_mask, outputs
            gc.collect()
    
    return predictions  # 返回所有预测结果列表

## 模型训练与评估

In [10]:
# 超参数
BATCH_SIZE = 16  # CPU上可以使用更大的批次大小
EPOCHS = 3
LEARNING_RATE = 2e-5
MAX_LENGTH = 512

logger.info("开始加载预训练模型...")
# 初始化分词器和模型
tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')
bert_model = BertModel.from_pretrained('bert-base-uncased')


INFO:__main__:开始加载预训练模型...


# 准备数据集

In [17]:
logger.info("准备数据集...")
# 创建数据集
train_dataset = TextClassificationDataset('train.jsonl', tokenizer, MAX_LENGTH)
test_dataset = TextClassificationDataset('test.jsonl', tokenizer, MAX_LENGTH)

logger.info(f"训练集大小: {len(train_dataset)}, 测试集大小: {len(test_dataset)}")

# 创建数据加载器
train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE)


INFO:__main__:准备数据集...
INFO:__main__:训练集大小: 28000, 测试集大小: 2800


# 初始化模型

In [12]:
logger.info("初始化模型...")
# 初始化模型、损失函数和优化器
model = TextClassifier(bert_model).to(device)
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.AdamW(model.parameters(), lr=LEARNING_RATE)


INFO:__main__:初始化模型...


# 训练循环

In [13]:
best_f1 = 0
for epoch in range(EPOCHS):
    logger.info(f"\n第 {epoch+1}/{EPOCHS} 轮训练")
    
    train_loss, train_f1 = train_model(model, train_loader, optimizer, criterion, device)
    logger.info(f"训练损失: {train_loss:.4f}, 训练F1分数: {train_f1:.4f}")
    
    if train_f1 > best_f1:
        best_f1 = train_f1
        logger.info(f"保存最佳模型 (F1: {best_f1:.4f})")
        torch.save(model.state_dict(), 'model.pt')
    
    # 进行垃圾回收
    gc.collect()


INFO:__main__:
第 1/3 轮训练
训练中: 100%|███████████████████████| 4/4 [04:19<00:00, 64.86s/it, 当前损失=0.6018, 平均损失=0.6788], ?it/s]
INFO:__main__:训练损失: 0.6788, 训练F1分数: 0.7368
INFO:__main__:保存最佳模型 (F1: 0.7368)
INFO:__main__:
第 2/3 轮训练
训练中: 100%|███████████████████████| 4/4 [05:19<00:00, 79.84s/it, 当前损失=0.6900, 平均损失=0.6095], ?it/s]
INFO:__main__:训练损失: 0.6095, 训练F1分数: 0.8254
INFO:__main__:保存最佳模型 (F1: 0.8254)
INFO:__main__:
第 3/3 轮训练
训练中: 100%|███████████████████████| 4/4 [05:39<00:00, 84.82s/it, 当前损失=0.4180, 平均损失=0.4808], ?it/s]
INFO:__main__:训练损失: 0.4808, 训练F1分数: 0.9062
INFO:__main__:保存最佳模型 (F1: 0.9062)


# 加载最佳模型并进行预测

In [14]:
logger.info("\n开始预测...")
model.load_state_dict(torch.load('model.pt'))
predictions = predict(model, test_loader, device)


INFO:__main__:
开始预测...
预测中: 100%|█████████████████████████████████████████████████████████| 4/4 [00:32<00:00,  8.19s/it]


# 保存预测结果

In [16]:
logger.info("保存预测结果...")
with open('submit.txt', 'w') as f:
    for pred in predictions:
        f.write(f"{pred}\n")

logger.info("预测结果保存到 submit.txt")

INFO:__main__:保存预测结果...
INFO:__main__:预测结果保存到 submit.txt
