In [5]:
import os
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader, random_split
from transformers import BertTokenizer, BertModel
from sklearn.metrics import precision_score, recall_score, f1_score, accuracy_score
import numpy as np
from tqdm import tqdm

# 1. Path to the local pretrained BERT directory (used for both the tokenizer and the model)
BERT_PATH = r"D:\Downloads\bert-base-chinese\bert-base-chinese"

# 2. Data loading
class TextClassificationDataset(Dataset):
    """Text-classification corpus read from a directory tree.

    Every sub-directory of ``root_dir`` is treated as one class, and every
    ``*.txt`` file inside it as one UTF-8 encoded sample. The label of a sample
    is the index of its class name in ``self.classes``.
    """

    def __init__(self, root_dir):
        """Read all samples below ``root_dir`` into memory.

        Args:
            root_dir: Directory whose sub-directories are the class folders.
        """
        self.classes = []
        self.texts = []
        self.labels = []

        for class_dir in sorted(os.listdir(root_dir)):
            class_path = os.path.join(root_dir, class_dir)
            if not os.path.isdir(class_path):
                # Stray files must not consume a label index; otherwise labels
                # would stop matching positions in ``self.classes`` and could
                # exceed ``get_num_classes() - 1``.
                continue

            class_idx = len(self.classes)
            self.classes.append(class_dir)

            # Sorted so the sample order does not depend on the file system.
            for file_name in sorted(os.listdir(class_path)):
                if not file_name.endswith('.txt'):
                    continue
                file_path = os.path.join(class_path, file_name)
                with open(file_path, 'r', encoding='utf-8') as f:
                    self.texts.append(f.read().strip())
                self.labels.append(class_idx)

    def __len__(self):
        """Return the number of loaded samples."""
        return len(self.texts)

    def __getitem__(self, idx):
        """Return the ``(text, label)`` pair stored at position ``idx``."""
        return self.texts[idx], self.labels[idx]

    def get_num_classes(self):
        """Return the number of class directories that were found."""
        return len(self.classes)

# 3. Model definition (BERT is loaded from the local path)
class MultiHeadAttentionClassifier(nn.Module):
    """BERT encoder followed by one multi-head self-attention layer and a linear head.

    The BERT token embeddings are passed through an additional self-attention
    layer with a residual connection, dropout and layer normalisation; the
    vector at sequence position 0 is then fed to the linear classifier.
    """

    def __init__(self, bert_path=BERT_PATH, num_classes=6, num_heads=8, dropout=0.1):
        """Build the network.

        Args:
            bert_path: Name or directory handed to ``BertModel.from_pretrained``.
            num_classes: Size of the output layer.
            num_heads: Number of heads of the additional attention layer.
            dropout: Dropout rate used inside the attention layer and on its output.
        """
        super().__init__()
        self.bert = BertModel.from_pretrained(bert_path)
        self.bert_dim = self.bert.config.hidden_size
        self.attention = nn.MultiheadAttention(self.bert_dim, num_heads, dropout)
        self.dropout = nn.Dropout(dropout)
        self.layer_norm = nn.LayerNorm(self.bert_dim)
        self.classifier = nn.Linear(self.bert_dim, num_classes)

    def forward(self, input_ids, attention_mask):
        """Return the class logits for a batch.

        Args:
            input_ids: Token ids as produced by the tokenizer.
            attention_mask: Tokenizer attention mask (zero marks padding), or
                ``None`` to treat every position as a real token.

        Returns:
            Output of the linear classifier applied to the position-0 vector
            of every sequence.
        """
        embeddings = self.bert(input_ids, attention_mask=attention_mask).last_hidden_state
        embeddings = embeddings.permute(1, 0, 2)  # [seq_len, batch, hidden]

        # Padding positions must not act as attention keys; otherwise the
        # logits of a sample change with the amount of padding in its batch.
        # Position 0 is never padding, so no query ends up with all keys masked.
        key_padding_mask = None if attention_mask is None else attention_mask == 0

        attn_output, _ = self.attention(
            embeddings,
            embeddings,
            embeddings,
            key_padding_mask=key_padding_mask,
            need_weights=False,
        )
        attn_output = self.layer_norm(embeddings + self.dropout(attn_output))
        cls_token = attn_output[0, :, :]  # [batch, hidden]
        return self.classifier(cls_token)

# 4. Preprocessing / batching function
def collate_fn(batch, tokenizer, max_length=512):
    """Convert a list of ``(text, label)`` samples into one tokenized batch.

    Args:
        batch: Sequence of ``(text, label)`` pairs.
        tokenizer: Callable that accepts a list of texts together with the
            ``padding``, ``truncation``, ``max_length`` and ``return_tensors``
            keyword arguments and returns a mapping containing ``input_ids``
            and ``attention_mask``.
        max_length: Maximum sequence length forwarded to the tokenizer.

    Returns:
        Dict with the keys ``input_ids``, ``attention_mask`` and ``labels``
        (labels as a ``torch.long`` tensor).
    """
    texts, labels = map(list, zip(*batch))

    tokenizer_options = {
        'padding': True,
        'truncation': True,
        'max_length': max_length,
        'return_tensors': 'pt',
    }
    encoded = tokenizer(texts, **tokenizer_options)
    label_tensor = torch.tensor(labels, dtype=torch.long)

    return {
        'input_ids': encoded['input_ids'],
        'attention_mask': encoded['attention_mask'],
        'labels': label_tensor,
    }

# 5. Training function
def train_model(model, train_loader, val_loader, num_epochs=10, learning_rate=2e-5):
    """Train ``model`` and return it with the best-validation-F1 weights loaded.

    Args:
        model: Module called as ``model(input_ids, attention_mask)`` that returns logits.
        train_loader: Iterable of batches with ``input_ids``, ``attention_mask``
            and ``labels`` entries, used for optimisation.
        val_loader: Loader passed to ``evaluate_model`` after every epoch.
        num_epochs: Number of passes over ``train_loader``.
        learning_rate: Learning rate of the AdamW optimizer.

    Returns:
        The same model object (moved to the selected device). If some epoch
        reached a validation F1 above 0, the weights of the best such epoch are
        restored; otherwise the weights are left as they are after the last epoch.
    """
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    model = model.to(device)

    # Optimizer and loss function
    optimizer = torch.optim.AdamW(model.parameters(), lr=learning_rate)
    criterion = nn.CrossEntropyLoss()

    best_f1 = 0.0
    best_state = None

    for epoch in range(num_epochs):
        model.train()
        total_loss = 0
        progress_bar = tqdm(train_loader, desc=f'Epoch {epoch+1}/{num_epochs}')

        for batch in progress_bar:
            input_ids = batch['input_ids'].to(device)
            attention_mask = batch['attention_mask'].to(device)
            labels = batch['labels'].to(device)

            optimizer.zero_grad()
            outputs = model(input_ids, attention_mask)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            total_loss += loss.item()
            progress_bar.set_postfix({'loss': loss.item()})

        avg_train_loss = total_loss / len(train_loader)

        # Validation phase
        val_metrics = evaluate_model(model, val_loader, device)
        val_f1 = val_metrics['f1']

        print(f"\nEpoch {epoch+1}/{num_epochs}")
        print(f"Train Loss: {avg_train_loss:.4f}")
        print(f"Validation Precision: {val_metrics['precision']:.4f}")
        print(f"Validation Recall: {val_metrics['recall']:.4f}")
        print(f"Validation F1: {val_f1:.4f}")
        print(f"Validation Accuracy: {val_metrics['accuracy']:.4f}")

        # Keep a snapshot of the best weights. ``state_dict()`` returns
        # references to the live parameters, so the tensors must be cloned or
        # later optimizer steps would overwrite the snapshot in place.
        if val_f1 > best_f1:
            best_f1 = val_f1
            best_state = {
                name: tensor.detach().clone()
                for name, tensor in model.state_dict().items()
            }

    # Restore the best weights; nothing to restore if no epoch improved on 0.
    if best_state is not None:
        model.load_state_dict(best_state)
    return model

# 6. Evaluation function
def evaluate_model(model, data_loader, device=None):
    """Compute weighted precision/recall/F1 and accuracy of ``model`` on a loader.

    Args:
        model: Module called as ``model(input_ids, attention_mask)`` that returns logits.
        data_loader: Iterable of batches with ``input_ids``, ``attention_mask``
            and ``labels`` entries.
        device: Device to run on; when ``None``, CUDA is used if available, else CPU.

    Returns:
        Dict with the keys ``precision``, ``recall``, ``f1`` and ``accuracy``.
    """
    if device is None:
        device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    model.eval()
    model = model.to(device)

    predicted = []
    expected = []

    with torch.no_grad():
        for batch in tqdm(data_loader, desc='Evaluating'):
            logits = model(
                batch['input_ids'].to(device),
                batch['attention_mask'].to(device),
            )
            # The predicted class is the index of the largest logit.
            predicted.extend(logits.argmax(dim=1).cpu().numpy())
            expected.extend(batch['labels'].cpu().numpy())

    # Class-frequency weighted averages for the per-class metrics
    weighted = {'average': 'weighted'}
    return {
        'precision': precision_score(expected, predicted, **weighted),
        'recall': recall_score(expected, predicted, **weighted),
        'f1': f1_score(expected, predicted, **weighted),
        'accuracy': accuracy_score(expected, predicted),
    }

# 7. Main program
if __name__ == '__main__':
    # Load the tokenizer from the local model directory
    tokenizer = BertTokenizer.from_pretrained(BERT_PATH)

    # Load the corpus
    dataset = TextClassificationDataset(r"C:\Users\86178\Desktop\词汇适配度优化\语料")
    if len(dataset) == 0:
        # Fail early with a clear message instead of a ZeroDivisionError
        # inside the training loop.
        raise SystemExit('No .txt samples were found in the corpus directory.')
    num_classes = dataset.get_num_classes()

    # 70/30 train/validation split; seeded so that runs are comparable
    train_size = int(0.7 * len(dataset))
    val_size = len(dataset) - train_size
    split_generator = torch.Generator().manual_seed(42)
    train_dataset, val_dataset = random_split(
        dataset, [train_size, val_size], generator=split_generator
    )

    # Both loaders tokenize their batches with the same tokenizer
    collate = lambda samples: collate_fn(samples, tokenizer)

    train_loader = DataLoader(
        train_dataset,
        batch_size=16,
        shuffle=True,
        collate_fn=collate
    )

    val_loader = DataLoader(
        val_dataset,
        batch_size=16,
        shuffle=False,
        collate_fn=collate
    )

    # Build the model
    model = MultiHeadAttentionClassifier(num_classes=num_classes)

    # Train the model
    trained_model = train_model(model, train_loader, val_loader, num_epochs=10)

    # Final evaluation. No separate test split exists, so these numbers come
    # from the validation set that was also used for model selection and are
    # labelled accordingly.
    final_metrics = evaluate_model(trained_model, val_loader)
    print('\nFinal Validation Results:')
    print(f'Precision: {final_metrics["precision"]:.4f}')
    print(f'Recall: {final_metrics["recall"]:.4f}')
    print(f'F1 Score: {final_metrics["f1"]:.4f}')
    print(f'Accuracy: {final_metrics["accuracy"]:.4f}')

Epoch 1/10:   0%|                                                                               | 0/59 [00:17<?, ?it/s]


KeyboardInterrupt: 