In [3]:
import os
import numpy as np
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms, models
from sklearn.model_selection import StratifiedKFold
from sklearn.metrics import accuracy_score, f1_score
from PIL import Image
import pandas as pd

class VehicleDataset(Dataset):
    """Map-style dataset that loads images from disk on demand.

    Args:
        image_paths: Indexable collection of image file paths, one per sample.
        labels: Indexable collection of labels aligned index-for-index with
            ``image_paths``.
        transform: Optional callable applied to the RGB image before it is
            returned (for example a torchvision ``Compose`` pipeline).
    """

    def __init__(self, image_paths, labels, transform=None):
        self.image_paths = image_paths
        self.labels = labels
        self.transform = transform

    def __len__(self):
        """Return the number of samples (one per image path)."""
        return len(self.image_paths)

    def __getitem__(self, idx):
        """Load sample ``idx`` and return it as an ``(image, label)`` pair.

        The image is opened, converted to RGB and, when a ``transform`` was
        supplied, passed through it. The label is returned exactly as stored.
        """
        # Open inside a context manager so the underlying file handle is
        # released deterministically for every image format; ``convert``
        # returns a new image, so nothing from the closed file is needed later.
        with Image.open(self.image_paths[idx]) as source:
            image = source.convert('RGB')
        if self.transform:
            image = self.transform(image)
        label = self.labels[idx]
        return image, label

def train_one_epoch(model, train_loader, criterion, optimizer, device):
    """Run one optimisation pass over ``train_loader`` and report the mean loss.

    Args:
        model: Module to optimise; it is switched to training mode.
        train_loader: Iterable of ``(images, labels)`` batches that also
            supports ``len()``.
        criterion: Callable computing the loss from ``(outputs, labels)``.
        optimizer: Optimizer stepped once per batch.
        device: Device every batch is moved to before the forward pass.

    Returns:
        The sum of the per-batch loss values divided by ``len(train_loader)``.
    """
    model.train()
    batch_losses = []

    for inputs, targets in train_loader:
        inputs = inputs.to(device)
        targets = targets.to(device)

        # Standard step: clear old gradients, forward, backward, update.
        optimizer.zero_grad()
        batch_loss = criterion(model(inputs), targets)
        batch_loss.backward()
        optimizer.step()

        batch_losses.append(batch_loss.item())

    return sum(batch_losses) / len(train_loader)

def evaluate(model, val_loader, device, verbose=False):
    """Score ``model`` on ``val_loader`` and summarise batch-level spread.

    Args:
        model: Module to evaluate; it is switched to evaluation mode.
        val_loader: Iterable of ``(images, labels)`` batches.
        device: Device every batch is moved to before the forward pass.
        verbose: When true (and at least one batch contributed batch-level
            statistics) print the per-batch metrics with their mean/variance.

    Returns:
        Tuple ``(accuracy, macro_f1, accuracy_variance, f1_variance)``. The
        first two are computed over all samples; the variances are taken over
        the per-batch metrics and are ``0.0`` when fewer than two batches
        contributed. Batches whose labels contain a single class are left out
        of the per-batch metrics but still count towards the overall ones.
    """
    model.eval()

    predictions, targets = [], []
    per_batch_acc, per_batch_f1 = [], []

    with torch.no_grad():
        for inputs, batch_targets in val_loader:
            inputs = inputs.to(device)
            batch_targets = batch_targets.to(device)

            # Predicted class = index of the largest score along dim 1.
            predicted = torch.max(model(inputs), 1)[1]

            predicted_np = predicted.cpu().numpy()
            targets_np = batch_targets.cpu().numpy()

            # Per-batch accuracy / macro-F1 only for batches with >= 2 classes.
            if np.unique(targets_np).size > 1:
                per_batch_acc.append(accuracy_score(targets_np, predicted_np))
                per_batch_f1.append(
                    f1_score(targets_np, predicted_np, average='macro', zero_division=1)
                )

            predictions.extend(predicted_np)
            targets.extend(targets_np)

    # Metrics over the whole validation set.
    accuracy = accuracy_score(targets, predictions)
    macro_f1 = f1_score(targets, predictions, average='macro')

    # Variance across batches needs at least two contributing batches.
    if len(per_batch_acc) > 1:
        accuracy_variance = np.var(per_batch_acc)
    else:
        accuracy_variance = 0.0
    if len(per_batch_f1) > 1:
        f1_variance = np.var(per_batch_f1)
    else:
        f1_variance = 0.0

    if verbose and per_batch_acc:
        formatted_acc = [f'{acc:.4f}' for acc in per_batch_acc]
        formatted_f1 = [f'{f1:.4f}' for f1 in per_batch_f1]
        print(f"\n批次级别统计:")
        print(f"批次数量: {len(per_batch_acc)}")
        print(f"批次准确率: {formatted_acc}")
        print(f"批次宏F1分数: {formatted_f1}")
        print(f"批次准确率均值: {np.mean(per_batch_acc):.4f}, 方差: {accuracy_variance:.4f}")
        print(f"批次宏F1均值: {np.mean(per_batch_f1):.4f}, 方差: {f1_variance:.4f}")

    return accuracy, macro_f1, accuracy_variance, f1_variance

In [None]:
def main():
    """Train and score a ResNet-18 image classifier with 5-fold stratified CV.

    Every ``.png`` file in ``Data/Data`` is used; its class is the integer
    before the first ``_`` in the file name, shifted from 1-based to 0-based.
    For each fold a pretrained ResNet-18 with a new 3-way head is trained with
    Adam for up to 15 epochs, stopping early after 3 consecutive epochs without
    an improvement of ``0.7 * accuracy + 0.3 * macro_f1`` on the validation
    split. The best weights of each fold are saved to
    ``best_model_fold{fold}.pth``, reloaded and evaluated once more, and the
    mean and standard deviation of accuracy and macro-F1 over the folds are
    printed.

    Raises:
        ValueError: If the data directory contains no ``.png`` files.
    """
    # Fixed seeds for torch and numpy.
    torch.manual_seed(42)
    np.random.seed(42)

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print(f"使用设备: {device}")

    num_classes = 3
    max_epochs = 15  # training usually converges within the first ten epochs
    patience = 3

    normalize = transforms.Normalize(mean=[0.485, 0.456, 0.406],
                                     std=[0.229, 0.224, 0.225])
    # Random augmentation is applied to training images only.
    train_transform = transforms.Compose([
        transforms.Resize((224, 224)),
        transforms.RandomHorizontalFlip(),
        transforms.RandomRotation(10),
        transforms.ToTensor(),
        normalize,
    ])
    # Validation must be deterministic: otherwise the score that selects the
    # checkpoint and the score reported after reloading it are computed on
    # differently perturbed images.
    eval_transform = transforms.Compose([
        transforms.Resize((224, 224)),
        transforms.ToTensor(),
        normalize,
    ])

    data_dir = "Data/Data"
    image_paths = []
    labels = []

    # os.listdir returns entries in arbitrary order; sort so the seeded fold
    # assignment is the same on every run and machine.
    for filename in sorted(os.listdir(data_dir)):
        if filename.endswith('.png'):
            # File names look like "1_xxx.png"; map class ids 1,2,3 -> 0,1,2.
            class_id = int(filename.split('_')[0]) - 1
            image_paths.append(os.path.join(data_dir, filename))
            labels.append(class_id)

    if not image_paths:
        raise ValueError(f"No .png files found in {data_dir!r}")

    image_paths = np.array(image_paths)
    labels = np.array(labels)

    print(f"总样本数: {len(labels)}")
    print("类别分布:")
    for i in range(num_classes):
        print(f"类别 {i+1}: {sum(labels == i)} 个样本")

    # 5-fold stratified cross-validation.
    kfold = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)

    accuracies = []
    f1_scores = []

    for fold, (train_idx, val_idx) in enumerate(kfold.split(image_paths, labels)):
        print(f"\nFold {fold + 1}")

        train_dataset = VehicleDataset(
            [image_paths[i] for i in train_idx],
            [labels[i] for i in train_idx],
            transform=train_transform
        )
        val_dataset = VehicleDataset(
            [image_paths[i] for i in val_idx],
            [labels[i] for i in val_idx],
            transform=eval_transform
        )

        train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
        val_loader = DataLoader(val_dataset, batch_size=32)

        # Fresh pretrained backbone per fold with a new classification head.
        # NOTE(review): torchvision >= 0.13 deprecates ``pretrained=`` in favour
        # of ``weights=`` — confirm the installed version before switching.
        model = models.resnet18(pretrained=True)
        model.fc = nn.Linear(model.fc.in_features, num_classes)
        model = model.to(device)

        criterion = nn.CrossEntropyLoss()
        optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

        checkpoint_path = f'best_model_fold{fold}.pth'
        # Start below any reachable score so the first epoch always writes a
        # checkpoint; starting at 0 could leave a stale file from an earlier
        # run (or no file at all) to be loaded after the loop.
        best_score = float("-inf")
        no_improve = 0

        for epoch in range(max_epochs):
            train_loss = train_one_epoch(model, train_loader, criterion, optimizer, device)
            accuracy, macro_f1, _, _ = evaluate(model, val_loader, device, verbose=False)

            current_score = 0.7 * accuracy + 0.3 * macro_f1

            print(f"轮次 {epoch+1}, 损失: {train_loss:.4f}, 准确率: {accuracy:.4f}, 宏F1: {macro_f1:.4f}, 得分: {current_score:.4f}")

            if current_score > best_score:
                best_score = current_score
                no_improve = 0
                torch.save(model.state_dict(), checkpoint_path)
            else:
                no_improve += 1
                if no_improve >= patience:
                    print(f"在轮次 {epoch+1} 触发早停")
                    break

        # Restore this fold's best weights before the final evaluation;
        # map_location keeps the load valid whichever device is in use.
        model.load_state_dict(torch.load(checkpoint_path, map_location=device))

        # verbose=True also prints the batch-level statistics.
        final_accuracy, final_macro_f1, accuracy_variance, f1_variance = evaluate(model, val_loader, device, verbose=True)
        accuracies.append(final_accuracy)
        f1_scores.append(final_macro_f1)

        print(f"\n第 {fold + 1} 折最终结果:")
        print(f"Accuracy: {final_accuracy:.4f}")
        print(f"Macro-F1: {final_macro_f1:.4f}")
        print(f"Accuracy Variance: {accuracy_variance:.4f}")
        print(f"Macro-F1 Variance: {f1_variance:.4f}")
        fold_score = 0.7 * final_accuracy + 0.3 * final_macro_f1
        print(f"本折得分: {fold_score:.4f}")

    # Aggregate across folds; the spread is reported as a standard deviation.
    mean_accuracy = np.mean(accuracies)
    std_accuracy = np.std(accuracies)
    mean_f1 = np.mean(f1_scores)
    std_f1 = np.std(f1_scores)

    print("\n最终五折交叉验证结果与指标:")
    print(f"Accuracy: {mean_accuracy:.4f} ± {std_accuracy:.4f}")
    print(f"Macro-F1: {mean_f1:.4f} ± {std_f1:.4f}")
    

# Entry point: start the cross-validation run when this code executes as the
# main module (running this cell in a notebook kernel satisfies the guard too).
if __name__ == "__main__":
    main()

使用设备: cuda
总样本数: 1572
类别分布:
类别 1: 720 个样本
类别 2: 608 个样本
类别 3: 244 个样本

Fold 1




KeyboardInterrupt: 