In [None]:
import os, time, copy, math
from pathlib import Path
from collections import Counter, defaultdict
import numpy as np
import matplotlib.pyplot as plt
from tqdm import tqdm
import cv2
from PIL import Image

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader, Subset
import torchvision
from torchvision import models, datasets
import torch.nn.functional as F

from sklearn.metrics import (
    accuracy_score, precision_recall_fscore_support, 
    confusion_matrix, classification_report, f1_score
)
from sklearn.model_selection import StratifiedKFold
import seaborn as sns
import warnings
warnings.filterwarnings('ignore')

In [2]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Device: {device}")
torch.backends.cudnn.benchmark = True

Device: cuda


In [None]:
work_dir = os.path.abspath("..")  
data_dir = os.path.join(work_dir, "dataprocess", "augmented")   # 你的增强数据
save_dir = os.path.join(work_dir, "Mobilenetv3-Large","checkpoints") # 模型保存路径
result_dir = os.path.join(work_dir,"Mobilenetv3-Large","results")  # 结果保存路径
os.makedirs(save_dir, exist_ok=True)

print("当前工作目录:", work_dir)
print("数据增强目录:", data_dir)
print("模型保存目录:", save_dir)
print("模型结果目录:", result_dir)

num_classes = len([d for d in os.listdir(data_dir) if os.path.isdir(os.path.join(data_dir,d))])
print("类别数:", num_classes)

当前工作目录: e:\1.code\Jupyter-notebook\MUST-DataScience\1-groupwork
数据增强目录: e:\1.code\Jupyter-notebook\MUST-DataScience\1-groupwork\dataprocess\augmented
模型保存目录: e:\1.code\Jupyter-notebook\MUST-DataScience\1-groupwork\cnn_models\checkpoints
模型结果目录: e:\1.code\Jupyter-notebook\MUST-DataScience\1-groupwork\cnn_models\results
类别数: 8


In [None]:
CFG = {
    "model_name": "mobilenet_v3_large",
    "img_size": 224,
    "batch_size": 32,
    "stage1_epochs": 10,
    "stage2_epochs": 15,
    "lr_stage1": 1e-3,
    "lr_stage2": 1e-5,
    "weight_decay": 1e-4,
    "num_workers": 4,
    "save_path": save_dir,
    "accumulation_steps": 1,
    "early_stopping_patience": 8,
    "use_mixup": True,
    "mixup_alpha": 0.2,
    "n_splits": 5,  # K折数量：推荐5-10折
}

In [None]:
# data trasnsforms
from torchvision import datasets, transforms
data_transforms = {
    "train": transforms.Compose([
        transforms.Resize((224, 224)),
        transforms.RandomRotation(15),
        transforms.RandomHorizontalFlip(p=0.5),#水平翻转
        transforms.ColorJitter(brightness=0.3, contrast=0.3, saturation=0.3),#增强颜色变化
        transforms.RandomAffine(degrees=0, translate=(0.1, 0.1)),# 平移
        transforms.RandomErasing(p=0.5, scale=(0.02, 0.1), ratio=(0.3, 3.3), value=0),#随即遮挡图片区域，用黑色填充
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406],
                             std=[0.229, 0.224, 0.225])
    ]),
    "val": transforms.Compose([
        transforms.Resize((224, 224)),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406],
                             std=[0.229, 0.224, 0.225])
    ])
}



In [6]:
def mixup_data(x, y, alpha=0.2, device=None):
    if alpha <= 0:
        return x, y, None, 1.0
    lam = np.random.beta(alpha, alpha)
    batch_size = x.size(0)
    index = torch.randperm(batch_size).to(x.device)
    mixed_x = lam * x + (1 - lam) * x[index, :]
    y_a, y_b = y, y[index]
    return mixed_x, y_a, y_b, lam
def mixup_criterion(criterion, pred, y_a, y_b, lam):
    """Mixup损失函数"""
    return lam * criterion(pred, y_a) + (1 - lam) * criterion(pred, y_b)

In [None]:
def train_epoch(model, train_loader, criterion, optimizer, device, 
                use_mixup=True, mixup_alpha=0.2):
    """训练一个epoch"""
    model.train()
    running_loss = 0.0
    running_corrects = 0
    total_samples = 0
    
    pbar = tqdm(train_loader, desc="Training", leave=False)
    for images, labels in pbar:
        images = images.to(device)
        labels = labels.to(device)
        
        optimizer.zero_grad()
        
        if use_mixup:
            images, labels_a, labels_b, lam = mixup_data(
                images, labels, alpha=mixup_alpha
            )
        
        outputs = model(images)
        
        if use_mixup:
            loss = mixup_criterion(criterion, outputs, labels_a, labels_b, lam)
            _, preds = torch.max(outputs, 1)
            if lam >= 0.5:
                running_corrects += torch.sum(preds == labels_a.data)
            else:
                running_corrects += torch.sum(preds == labels_b.data)
        else:
            loss = criterion(outputs, labels)
            _, preds = torch.max(outputs, 1)
            running_corrects += torch.sum(preds == labels.data)
        
        loss.backward()
        optimizer.step()
        
        running_loss += loss.item() * images.size(0)
        total_samples += labels.size(0)
        
        pbar.set_postfix({'loss': running_loss / total_samples})
    
    epoch_loss = running_loss / total_samples
    epoch_acc = running_corrects.double() / total_samples
    
    return epoch_loss, epoch_acc.item()

In [8]:
def validate_epoch(model, val_loader, criterion, device):
    """验证一个epoch"""
    model.eval()
    running_loss = 0.0
    running_corrects = 0
    total_samples = 0
    
    with torch.no_grad():
        pbar = tqdm(val_loader, desc="Validating", leave=False)
        for images, labels in pbar:
            images = images.to(device)
            labels = labels.to(device)
            
            outputs = model(images)
            loss = criterion(outputs, labels)
            
            running_loss += loss.item() * images.size(0)
            _, preds = torch.max(outputs, 1)
            running_corrects += torch.sum(preds == labels.data)
            total_samples += labels.size(0)
            
            pbar.set_postfix({'loss': running_loss / total_samples})
    
    epoch_loss = running_loss / total_samples
    epoch_acc = running_corrects.double() / total_samples
    
    return epoch_loss, epoch_acc.item()

In [17]:
class EarlyStopping:
    def __init__(self, patience=10, delta=0.0005, save_path="best_model.pth", best_model_path=None):
        """
        patience：多少 epoch 内没有提升就停止
        delta：最小提升幅度
        save_path：最优模型保存路径
        best_model_path：最优模型保存路径（兼容参数）
        """
        self.patience = patience
        self.counter = 0
        self.best_loss = np.inf
        self.delta = delta
        self.save_path = best_model_path if best_model_path else save_path
        self.best_model_path = self.save_path
        self.early_stop = False
        self.best_epoch=0

    def __call__(self, val_loss, model,epoch):
        # 第一次直接保存
        if val_loss < self.best_loss - self.delta:
            self.best_loss = val_loss
            self.counter = 0
            self.save_checkpoint(model, epoch)
        elif val_loss > self.best_loss - self.delta:
            self.counter += 1
        else:
            self.best_loss = val_loss
            self.counter =0
            self.save_checkpoint(model, epoch)

        return self.counter >= self.patience
    
    def save_checkpoint(self, model, epoch):
        torch.save(model.state_dict(), self.best_model_path)
        self.best_epoch = epoch

In [18]:
def create_model(num_classes,device):
    from torchvision.models import mobilenet_v3_large
    from torchvision.models import MobileNet_V3_Large_Weights

    # 加载预训练模型
    weight = MobileNet_V3_Large_Weights.DEFAULT
    model = mobilenet_v3_large(weights=weight)

    # 冻结backbone权重
    for param in model.features.parameters():
        param.requires_grad = False #冻结梯度 

    # 修改分类头 别的不做修改
    in_features = model.classifier[3].in_features
    model.classifier[3] = nn.Linear(in_features, num_classes)
    print(model)

    in_features = model.classifier[0].in_features
    model.classifier = nn.Sequential(
        nn.Linear(in_features, 512),
        nn.BatchNorm1d(512),
        nn.ReLU(inplace=True),
        nn.Dropout(0.5),
        nn.Linear(512, 256),
        nn.BatchNorm1d(256),
        nn.ReLU(inplace=True),
        nn.Dropout(0.3),
        nn.Linear(256, num_classes)
    )
    
    return model.to(device)

In [19]:
def train_two_stage(model, train_loader, val_loader, device, CFG, fold_num=0):
    """两步走训练策略"""
    
    criterion = nn.CrossEntropyLoss()
    history = {
        'train_loss': [], 'train_acc': [],
        'val_loss': [], 'val_acc': []
    }
    
    total_epochs = CFG["stage1_epochs"] + CFG["stage2_epochs"]
    
    # ========== 阶段1 ==========
    print(f"\n[Fold {fold_num}] 阶段1: 训练分类头")
    
    optimizer1 = optim.AdamW(
        filter(lambda p: p.requires_grad, model.parameters()),
        lr=CFG["lr_stage1"],
        weight_decay=CFG["weight_decay"]
    )
    
    scheduler1 = optim.lr_scheduler.CosineAnnealingLR(
        optimizer1, T_max=CFG["stage1_epochs"]
    )
    
    early_stopping1 = EarlyStopping(
        patience=CFG["early_stopping_patience"],
        best_model_path=os.path.join(CFG["save_path"], f"fold{fold_num}_stage1.pth")
    )
    
    for epoch in range(CFG["stage1_epochs"]):
        train_loss, train_acc = train_epoch(
            model, train_loader, criterion, optimizer1, device,
            use_mixup=CFG["use_mixup"], mixup_alpha=CFG["mixup_alpha"]
        )
        val_loss, val_acc = validate_epoch(model, val_loader, criterion, device)
        
        history['train_loss'].append(train_loss)
        history['train_acc'].append(train_acc)
        history['val_loss'].append(val_loss)
        history['val_acc'].append(val_acc)
        
        scheduler1.step()
        current_lr = optimizer1.param_groups[0]['lr']
        
        print(f"  Epoch [{epoch+1}/{CFG['stage1_epochs']}] "
              f"TL: {train_loss:.4f} TA: {train_acc:.4f} "
              f"VL: {val_loss:.4f} VA: {val_acc:.4f} "
              f"LR: {current_lr:.6f}")
        
        if early_stopping1(val_loss, model, epoch):
            print(f"   早停 (epoch {epoch+1})")
            break
    
    # ========== 阶段2 ==========
    print(f"[Fold {fold_num}] 阶段2: 微调所有层")
    
    for param in model.parameters():
        param.requires_grad = True
    
    optimizer2 = optim.AdamW(
        model.parameters(),
        lr=CFG["lr_stage2"],
        weight_decay=CFG["weight_decay"]
    )
    
    scheduler2 = optim.lr_scheduler.ReduceLROnPlateau(
        optimizer2, mode='min', factor=0.5, patience=3, verbose=False, min_lr=1e-6
    )
    
    early_stopping2 = EarlyStopping(
        patience=CFG["early_stopping_patience"],
        best_model_path=os.path.join(CFG["save_path"], f"fold{fold_num}_best.pth")
    )
    
    for epoch in range(CFG["stage2_epochs"]):
        train_loss, train_acc = train_epoch(
            model, train_loader, criterion, optimizer2, device,
            use_mixup=CFG["use_mixup"], mixup_alpha=CFG["mixup_alpha"]
        )
        val_loss, val_acc = validate_epoch(model, val_loader, criterion, device)
        
        history['train_loss'].append(train_loss)
        history['train_acc'].append(train_acc)
        history['val_loss'].append(val_loss)
        history['val_acc'].append(val_acc)
        
        scheduler2.step(val_loss)
        current_lr = optimizer2.param_groups[0]['lr']
        
        print(f"  Epoch [{CFG['stage1_epochs']+epoch+1}/{total_epochs}] "
              f"TL: {train_loss:.4f} TA: {train_acc:.4f} "
              f"VL: {val_loss:.4f} VA: {val_acc:.4f} "
              f"LR: {current_lr:.6f}")
        
        if early_stopping2(val_loss, model, epoch):
            print(f"   早停 (epoch {CFG['stage1_epochs'] + epoch+1})")
            break
    
    return history

In [20]:
def run_stratified_kfold_validation(data_dir, num_classes, device, CFG):
    """
    运行分层K折交叉验证
    关键特性：
    1. 保证每个fold的类别分布一致
    2. 完整的两阶段训练
    3. 详细的性能评估
    4. 模型融合预测
    """
    
    # 加载完整数据集
    full_dataset = datasets.ImageFolder(
        root=data_dir,
        transform=data_transforms["train"]
    )
    
    class_names = full_dataset.classes
    print(f" 加载数据集: {len(full_dataset)} 样本, {num_classes} 类别")
    
    # 获取所有标签
    all_labels = [full_dataset[i][1] for i in range(len(full_dataset))]
    all_indices = np.arange(len(full_dataset))
    
    # 初始化分层K折
    skf = StratifiedKFold(
        n_splits=CFG["n_splits"],
        shuffle=True,
        random_state=42
    )
    
    fold_results = []
    all_fold_preds = defaultdict(list)
    all_fold_labels = defaultdict(list)
    all_fold_probs = defaultdict(list)
    
    # ========= K折循环 =========
    for fold, (train_idx, val_idx) in enumerate(skf.split(all_indices, all_labels)):
        print(f"\n{'='*70}")
        print(f"Fold {fold+1}/{CFG['n_splits']}")
        print(f"{'='*70}")
        
        # 创建子集
        train_subset = Subset(full_dataset, train_idx)
        val_subset = Subset(full_dataset, val_idx)
        
        # 为验证集设置不同的transform
        val_subset_with_transform = copy.deepcopy(val_subset)
        val_subset_with_transform.dataset.transform = data_transforms["val"]
        
        # 创建数据加载器
        train_loader = DataLoader(
            train_subset, batch_size=CFG["batch_size"],
            shuffle=True, num_workers=CFG["num_workers"], pin_memory=True
        )
        
        val_loader = DataLoader(
            val_subset_with_transform, batch_size=CFG["batch_size"],
            shuffle=False, num_workers=CFG["num_workers"], pin_memory=True
        )
        
        # 显示类别分布
        fold_train_labels = [all_labels[i] for i in train_idx]
        fold_val_labels = [all_labels[i] for i in val_idx]
        
        print(f"训练: {len(train_idx)}, 验证: {len(val_idx)}")
        print(f"训练集类别: {Counter(fold_train_labels)}")
        print(f"验证集类别: {Counter(fold_val_labels)}")
        
        # 创建新模型
        model = create_model(num_classes, device)
        
        # 训练
        history = train_two_stage(model, train_loader, val_loader, device, CFG, fold_num=fold+1)
        
        # 加载最优模型
        best_model_path = os.path.join(CFG["save_path"], f"fold{fold+1}_best.pth")
        model.load_state_dict(torch.load(best_model_path))
        
        # 在验证集上评估
        model.eval()
        fold_preds = []
        fold_labels = []
        fold_probs = []
        fold_acc = 0
        
        with torch.no_grad():
            for images, labels in tqdm(val_loader, desc="Evaluating", leave=False):
                images = images.to(device)
                outputs = model(images)
                probs = F.softmax(outputs, dim=1)
                _, preds = torch.max(outputs, 1)
                
                fold_preds.extend(preds.cpu().numpy())
                fold_labels.extend(labels.numpy())
                fold_probs.extend(probs.cpu().numpy())
                fold_acc += torch.sum(preds == labels.to(device)).item()
        
        fold_acc = fold_acc / len(val_idx)
        fold_f1 = f1_score(fold_labels, fold_preds, average='weighted')
        
        print(f"\n Fold {fold+1} 结果:")
        print(f"   Accuracy: {fold_acc*100:.2f}%")
        print(f"   Weighted F1: {fold_f1:.4f}")
        
        fold_results.append({
            'fold': fold + 1,
            'accuracy': fold_acc,
            'f1_score': fold_f1,
            'history': history,
            'preds': np.array(fold_preds),
            'labels': np.array(fold_labels),
            'probs': np.array(fold_probs),
            'train_idx': train_idx,
            'val_idx': val_idx,
            'model_path': best_model_path
        })
        
        all_fold_preds[fold] = np.array(fold_preds)
        all_fold_labels[fold] = np.array(fold_labels)
        all_fold_probs[fold] = np.array(fold_probs)
    
    # ========= 整合结果 =========
    print(f"\n{'='*70}")
    print(f"K折交叉验证总结 (n_splits={CFG['n_splits']})")
    print(f"{'='*70}\n")
    
    accuracies = [r['accuracy'] for r in fold_results]
    f1_scores = [r['f1_score'] for r in fold_results]
    
    mean_acc = np.mean(accuracies)
    std_acc = np.std(accuracies)
    mean_f1 = np.mean(f1_scores)
    std_f1 = np.std(f1_scores)
    
    print(f"Accuracy: {mean_acc*100:.2f}% ± {std_acc*100:.2f}%")
    print(f"各折: {[f'{acc*100:.2f}%' for acc in accuracies]}")
    print(f"\nWeighted F1: {mean_f1:.4f} ± {std_f1:.4f}")
    print(f"各折: {[f'{f1:.4f}' for f1 in f1_scores]}")
    
    if std_acc < 0.02:
        print(f" 模型稳定性: 优秀 (std < 2%)")
    elif std_acc < 0.05:
        print(f" 模型稳定性: 良好 (std < 5%)")
    else:
        print(f" 模型稳定性: 一般")
    
    return fold_results, class_names, all_fold_preds, all_fold_labels, all_fold_probs

In [21]:
fold_results, class_names, all_fold_preds, all_fold_labels, all_fold_probs = \
    run_stratified_kfold_validation(data_dir, num_classes, device, CFG)

 加载数据集: 2860 样本, 8 类别

Fold 1/5
训练: 2288, 验证: 572
训练集类别: Counter({2: 343, 3: 291, 5: 282, 0: 281, 1: 273, 6: 273, 7: 273, 4: 272})
验证集类别: Counter({2: 86, 3: 72, 0: 71, 5: 70, 4: 69, 1: 68, 6: 68, 7: 68})
MobileNetV3(
  (features): Sequential(
    (0): Conv2dNormActivation(
      (0): Conv2d(3, 16, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), bias=False)
      (1): BatchNorm2d(16, eps=0.001, momentum=0.01, affine=True, track_running_stats=True)
      (2): Hardswish()
    )
    (1): InvertedResidual(
      (block): Sequential(
        (0): Conv2dNormActivation(
          (0): Conv2d(16, 16, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), groups=16, bias=False)
          (1): BatchNorm2d(16, eps=0.001, momentum=0.01, affine=True, track_running_stats=True)
          (2): ReLU(inplace=True)
        )
        (1): Conv2dNormActivation(
          (0): Conv2d(16, 16, kernel_size=(1, 1), stride=(1, 1), bias=False)
          (1): BatchNorm2d(16, eps=0.001, momentum=0.01, affine=True, tr

                                                

TypeError: mixup_data() got an unexpected keyword argument 'device'

In [None]:
def plot_training_curves(fold_results, result_dir):
    """绘制所有fold的训练曲线和平均曲线"""
    
    num_folds = len(fold_results)
    
    fig, axes = plt.subplots(2, 2, figsize=(14, 10))
    
    # 单个fold的曲线 (第一个fold作为代表)
    history = fold_results[0]['history']
    epochs = np.arange(1, len(history['train_loss']) + 1)
    
    # Loss - 单fold
    axes[0, 0].plot(epochs, history['train_loss'], 'b-', label='Train', linewidth=2)
    axes[0, 0].plot(epochs, history['val_loss'], 'r-', label='Val', linewidth=2)
    axes[0, 0].set_xlabel('Epoch', fontsize=11)
    axes[0, 0].set_ylabel('Loss', fontsize=11)
    axes[0, 0].set_title(f'Loss Curve (Fold 1)', fontsize=12, fontweight='bold')
    axes[0, 0].legend(fontsize=10)
    axes[0, 0].grid(True, alpha=0.3)
    
    # Accuracy - 单fold
    axes[0, 1].plot(epochs, history['train_acc'], 'b-', label='Train', linewidth=2)
    axes[0, 1].plot(epochs, history['val_acc'], 'r-', label='Val', linewidth=2)
    axes[0, 1].set_xlabel('Epoch', fontsize=11)
    axes[0, 1].set_ylabel('Accuracy', fontsize=11)
    axes[0, 1].set_title(f'Accuracy Curve (Fold 1)', fontsize=12, fontweight='bold')
    axes[0, 1].legend(fontsize=10)
    axes[0, 1].grid(True, alpha=0.3)
    
    # 平均Loss曲线
    min_len = min([len(r['history']['train_loss']) for r in fold_results])
    avg_train_loss = np.mean(
        [r['history']['train_loss'][:min_len] for r in fold_results], axis=0
    )
    avg_val_loss = np.mean(
        [r['history']['val_loss'][:min_len] for r in fold_results], axis=0
    )
    avg_epochs = np.arange(1, min_len + 1)
    
    axes[1, 0].plot(avg_epochs, avg_train_loss, 'b-', label='Train (Avg)', linewidth=2)
    axes[1, 0].plot(avg_epochs, avg_val_loss, 'r-', label='Val (Avg)', linewidth=2)
    axes[1, 0].fill_between(avg_epochs, avg_train_loss, alpha=0.2, color='blue')
    axes[1, 0].fill_between(avg_epochs, avg_val_loss, alpha=0.2, color='red')
    axes[1, 0].set_xlabel('Epoch', fontsize=11)
    axes[1, 0].set_ylabel('Loss', fontsize=11)
    axes[1, 0].set_title(f'Average Loss ({CFG["n_splits"]} Folds)', fontsize=12, fontweight='bold')
    axes[1, 0].legend(fontsize=10)
    axes[1, 0].grid(True, alpha=0.3)
    
    # 平均Accuracy曲线
    avg_train_acc = np.mean(
        [r['history']['train_acc'][:min_len] for r in fold_results], axis=0
    )
    avg_val_acc = np.mean(
        [r['history']['val_acc'][:min_len] for r in fold_results], axis=0
    )
    
    axes[1, 1].plot(avg_epochs, avg_train_acc, 'b-', label='Train (Avg)', linewidth=2)
    axes[1, 1].plot(avg_epochs, avg_val_acc, 'r-', label='Val (Avg)', linewidth=2)
    axes[1, 1].fill_between(avg_epochs, avg_train_acc, alpha=0.2, color='blue')
    axes[1, 1].fill_between(avg_epochs, avg_val_acc, alpha=0.2, color='red')
    axes[1, 1].set_xlabel('Epoch', fontsize=11)
    axes[1, 1].set_ylabel('Accuracy', fontsize=11)
    axes[1, 1].set_title(f'Average Accuracy ({CFG["n_splits"]} Folds)', fontsize=12, fontweight='bold')
    axes[1, 1].legend(fontsize=10)
    axes[1, 1].grid(True, alpha=0.3)
    
    plt.tight_layout()
    plt.savefig(os.path.join(result_dir, 'training_curves.png'), dpi=300, bbox_inches='tight')
    plt.show()
    print(" 训练曲线已保存")

plot_training_curves(fold_results, result_dir)

In [None]:
def evaluate_with_f1_metrics(fold_results, class_names, result_dir):
    """使用F1分数和详细的分类指标评估"""
    
    # 合并所有fold的预测
    all_preds = np.concatenate([r['preds'] for r in fold_results])
    all_labels = np.concatenate([r['labels'] for r in fold_results])
    
    # 逐fold的评估
    print("\n各Fold的F1分数:")
    print("-" * 60)
    f1_scores = []
    
    for i, result in enumerate(fold_results):
        preds = result['preds']
        labels = result['labels']
        
        f1_weighted = f1_score(labels, preds, average='weighted')
        f1_macro = f1_score(labels, preds, average='macro')
        f1_micro = f1_score(labels, preds, average='micro')
        acc = accuracy_score(labels, preds)
        
        f1_scores.append(f1_weighted)
        
        print(f"Fold {i+1}: Acc={acc:.4f}, F1_W={f1_weighted:.4f}, F1_M={f1_macro:.4f}")
    
    mean_f1 = np.mean(f1_scores)
    std_f1 = np.std(f1_scores)
    
    print(f"\n平均 Weighted F1: {mean_f1:.4f} ± {std_f1:.4f}")
    
    # 整体分类报告
    print("\n" + "="*60)
    print("整体分类报告（所有Fold合并）")
    print("="*60)
    print(classification_report(all_labels, all_preds, target_names=class_names, digits=4))
    
    # 按类别的F1分数
    f1_per_class = f1_score(all_labels, all_preds, average=None)
    
    fig, ax = plt.subplots(figsize=(12, 5))
    x = np.arange(len(class_names))
    bars = ax.bar(x, f1_per_class, color='steelblue', alpha=0.8)
    ax.set_xlabel('Class', fontsize=12)
    ax.set_ylabel('F1 Score', fontsize=12)
    ax.set_title('F1 Score per Class', fontsize=14, fontweight='bold')
    ax.set_xticks(x)
    ax.set_xticklabels(class_names, rotation=45, ha='right')
    ax.set_ylim([0, 1])
    ax.grid(True, alpha=0.3, axis='y')
    
    # 添加数值标签
    for bar in bars:
        height = bar.get_height()
        ax.text(bar.get_x() + bar.get_width()/2., height,
               f'{height:.3f}', ha='center', va='bottom', fontsize=9)
    
    plt.tight_layout()
    plt.savefig(os.path.join(result_dir, 'f1_scores_per_class.png'), dpi=300, bbox_inches='tight')
    plt.show()
    print("\n F1分数图已保存")
    
    return all_preds, all_labels

all_preds, all_labels = evaluate_with_f1_metrics(fold_results, class_names, result_dir)


In [None]:
def plot_confusion_matrix(all_preds, all_labels, class_names, result_dir):
    """绘制混淆矩阵"""
    
    cm = confusion_matrix(all_labels, all_preds)
    
    fig, ax = plt.subplots(figsize=(12, 10))
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
                xticklabels=class_names,
                yticklabels=class_names,
                cbar_kws={'label': 'Count'},
                ax=ax, square=True)
    
    ax.set_xlabel('Predicted Label', fontsize=12)
    ax.set_ylabel('True Label', fontsize=12)
    ax.set_title(f'Confusion Matrix ({CFG["n_splits"]}-Fold CV)', fontsize=14, fontweight='bold')
    
    plt.tight_layout()
    plt.savefig(os.path.join(result_dir, 'confusion_matrix.png'), dpi=300, bbox_inches='tight')
    plt.show()
    print(" 混淆矩阵已保存")
    
    # 计算每类的准确率
    print("\n各类别准确率:")
    print("-" * 40)
    for i, class_name in enumerate(class_names):
        class_acc = cm[i, i] / cm[i].sum()
        print(f"{class_name:15s}: {class_acc*100:.2f}%")

plot_confusion_matrix(all_preds, all_labels, class_names, result_dir)

In [None]:
def visualize_random_predictions(fold_results, class_names, result_dir, num_samples=12):
    """随机展示模型预测结果"""
    
    # 合并所有fold的数据
    all_indices = []
    for result in fold_results:
        all_indices.append(result['val_idx'])
    all_indices = np.concatenate(all_indices)
    
    # 加载完整数据集
    full_dataset = datasets.ImageFolder(
        root=data_dir,
        transform=data_transforms["val"]
    )
    
    # 选择一个fold的模型进行推理
    best_fold = np.argmax([r['accuracy'] for r in fold_results])
    model_path = fold_results[best_fold]['model_path']
    
    model = create_model(num_classes, device)
    model.load_state_dict(torch.load(model_path))
    model.eval()
    
    # 从验证集中随机选择示例
    val_indices = fold_results[best_fold]['val_idx']
    random_sample_indices = np.random.choice(val_indices, size=min(num_samples, len(val_indices)), replace=False)
    
    fig, axes = plt.subplots(3, 4, figsize=(15, 10))
    axes = axes.ravel()
    
    with torch.no_grad():
        for idx, sample_idx in enumerate(random_sample_indices):
            image, label = full_dataset[sample_idx]
            
            # 推理
            image_batch = image.unsqueeze(0).to(device)
            output = model(image_batch)
            prob = F.softmax(output, dim=1)
            pred = prob.argmax(dim=1).item()
            confidence = prob[0, pred].item()
            
            # 反标准化显示
            image_np = image.cpu().numpy()
            image_np = image_np * np.array([0.229, 0.224, 0.225]).reshape(3, 1, 1) + \
                       np.array([0.485, 0.456, 0.406]).reshape(3, 1, 1)
            image_np = np.clip(image_np, 0, 1)
            image_np = np.transpose(image_np, (1, 2, 0))
            
            ax = axes[idx]
            ax.imshow(image_np)
            
            true_label = class_names[label]
            pred_label = class_names[pred]
            color = 'green' if label == pred else 'red'
            
            title = f'True: {true_label}\nPred: {pred_label}\nConf: {confidence:.2f}'
            ax.set_title(title, color=color, fontweight='bold', fontsize=10)
            ax.axis('off')
    
    for i in range(len(random_sample_indices), len(axes)):
        axes[i].axis('off')
    
    plt.tight_layout()
    plt.savefig(os.path.join(result_dir, 'random_predictions.png'), dpi=300, bbox_inches='tight')
    plt.show()
    print(f"示例预测已保存（使用Fold {best_fold+1}的最优模型）")

visualize_random_predictions(fold_results, class_names, result_dir, num_samples=12)


In [None]:
def measure_inference_time(fold_results, device, num_iterations=100):
    """测量模型推理时间"""
    
    model_path = fold_results[0]['model_path']
    model = create_model(num_classes, device)
    model.load_state_dict(torch.load(model_path))
    model.eval()
    
    # 生成虚拟输入
    dummy_input = torch.randn(1, 3, 224, 224).to(device)
    
    # 预热GPU
    with torch.no_grad():
        for _ in range(10):
            _ = model(dummy_input)
    
    # 测量推理时间
    torch.cuda.synchronize()
    start_time = time.time()
    
    with torch.no_grad():
        for _ in range(num_iterations):
            _ = model(dummy_input)
    
    torch.cuda.synchronize()
    end_time = time.time()
    
    total_time = end_time - start_time
    avg_time_per_image = (total_time / num_iterations) * 1000  # ms
    fps = 1 / (total_time / num_iterations)
    
    print(f"\n推理时间统计 (n={num_iterations} iterations):")
    print(f"  总时间: {total_time:.4f}s")
    print(f"  单张图像: {avg_time_per_image:.4f}ms")
    print(f"  FPS: {fps:.2f}")
    
    # 不同batch size的推理时间
    batch_sizes = [1, 4, 8, 16, 32]
    fps_list = []
    
    print(f"\n不同Batch Size的推理速度:")
    print("-" * 40)
    
    for batch_size in batch_sizes:
        dummy_input = torch.randn(batch_size, 3, 224, 224).to(device)
        
        torch.cuda.synchronize()
        start = time.time()
        
        with torch.no_grad():
            for _ in range(50):
                _ = model(dummy_input)
        
        torch.cuda.synchronize()
        elapsed = time.time() - start
        
        fps = (50 * batch_size) / elapsed
        fps_list.append(fps)
        print(f"  Batch {batch_size:2d}: {fps:6.2f} FPS")
    
    # 绘制速度-精度图
    accuracies = [r['accuracy'] for r in fold_results]
    mean_acc = np.mean(accuracies)
    
    fig, ax = plt.subplots(figsize=(10, 6))
    
    colors = plt.cm.viridis(np.linspace(0, 1, len(batch_sizes)))
    for i, (batch_size, fps) in enumerate(zip(batch_sizes, fps_list)):
        ax.scatter(fps, mean_acc, s=300, alpha=0.7, color=colors[i], 
                  label=f'Batch {batch_size}', edgecolors='black', linewidth=2)
    
    ax.set_xlabel('FPS (Inference Speed)', fontsize=12)
    ax.set_ylabel('Accuracy', fontsize=12)
    ax.set_title('Speed-Accuracy Trade-off (MobileNetV3-Large)', fontsize=14, fontweight='bold')
    ax.legend(fontsize=10)
    ax.grid(True, alpha=0.3)
    ax.set_ylim([mean_acc - 0.05, min(mean_acc + 0.05, 1.0)])
    
    plt.tight_layout()
    plt.savefig(os.path.join(result_dir, 'speed_accuracy_tradeoff.png'), dpi=300, bbox_inches='tight')
    plt.show()
    print("\n 速度-精度图已保存")
    
    return avg_time_per_image, fps

inference_time, fps = measure_inference_time(fold_results, device)

In [None]:
class GradCAM:
    """Grad-CAM实现"""
    
    def __init__(self, model, target_layer):
        self.model = model
        self.target_layer = target_layer
        self.activations = None
        self.gradients = None
        
        # 注册hook
        self.target_layer.register_forward_hook(self.save_activations)
        self.target_layer.register_backward_hook(self.save_gradients)
    
    def save_activations(self, module, input, output):
        self.activations = output.detach()
    
    def save_gradients(self, module, grad_input, grad_output):
        self.gradients = grad_output[0].detach()
    
    def generate_cam(self, input_tensor, class_idx):
        """生成CAM"""
        output = self.model(input_tensor)
        
        self.model.zero_grad()
        loss = output[0, class_idx]
        loss.backward()
        
        gradients = self.gradients.mean(dim=[2, 3], keepdim=True)
        cam = (gradients * self.activations).sum(dim=1, keepdim=True)
        cam = F.relu(cam)
        cam = cam.squeeze().cpu().numpy()
        
        # 归一化
        cam = (cam - cam.min()) / (cam.max() - cam.min() + 1e-8)
        
        return cam

In [None]:
def visualize_gradcam_predictions(fold_results, class_names, result_dir, num_samples=6):
    """使用Grad-CAM可视化模型关注区域"""
    
    model_path = fold_results[0]['model_path']
    model = create_model(num_classes, device)
    model.load_state_dict(torch.load(model_path))
    model.eval()
    
    # 获取features的最后一层
    target_layer = model.features[-1]
    grad_cam = GradCAM(model, target_layer)
    
    # 加载验证集
    full_dataset = datasets.ImageFolder(
        root=data_dir,
        transform=data_transforms["val"]
    )
    
    val_indices = fold_results[0]['val_idx']
    random_indices = np.random.choice(val_indices, size=min(num_samples, len(val_indices)), replace=False)
    
    fig, axes = plt.subplots(num_samples, 3, figsize=(12, 4*num_samples))
    if num_samples == 1:
        axes = axes.reshape(1, -1)
    
    with torch.no_grad():
        for row, sample_idx in enumerate(random_indices):
            image, label = full_dataset[sample_idx]
            
            # 推理
            image_batch = image.unsqueeze(0).to(device)
            output = model(image_batch)
            prob = F.softmax(output, dim=1)
            pred = prob.argmax(dim=1).item()
            
            # 生成CAM
            cam = grad_cam.generate_cam(image_batch, pred)
            
            # 反标准化原图
            image_np = image.cpu().numpy()
            image_np = image_np * np.array([0.229, 0.224, 0.225]).reshape(3, 1, 1) + \
                       np.array([0.485, 0.456, 0.406]).reshape(3, 1, 1)
            image_np = np.clip(image_np, 0, 1)
            image_np = np.transpose(image_np, (1, 2, 0))
            
            # 原图
            axes[row, 0].imshow(image_np)
            axes[row, 0].set_title(f'Original\n{class_names[label]}', fontsize=10)
            axes[row, 0].axis('off')
            
            # CAM热力图
            axes[row, 1].imshow(image_np)
            axes[row, 1].imshow(cv2.resize(cam, (224, 224)), cmap='jet', alpha=0.5)
            axes[row, 1].set_title(f'Grad-CAM\nPred: {class_names[pred]}', fontsize=10)
            axes[row, 1].axis('off')
            
            # 叠加图
            cam_resized = cv2.resize(cam, (224, 224))
            cam_colored = plt.cm.jet(cam_resized)[:, :, :3]
            blended = 0.7 * image_np + 0.3 * cam_colored
            axes[row, 2].imshow(blended)
            axes[row, 2].set_title(f'Blended', fontsize=10)
            axes[row, 2].axis('off')
    
    plt.tight_layout()
    plt.savefig(os.path.join(result_dir, 'gradcam_visualization.png'), dpi=300, bbox_inches='tight')
    plt.show()
    print(" Grad-CAM可视化已保存")

visualize_gradcam_predictions(fold_results, class_names, result_dir, num_samples=6)

MobileNetV3(
  (features): Sequential(
    (0): Conv2dNormActivation(
      (0): Conv2d(3, 16, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), bias=False)
      (1): BatchNorm2d(16, eps=0.001, momentum=0.01, affine=True, track_running_stats=True)
      (2): Hardswish()
    )
    (1): InvertedResidual(
      (block): Sequential(
        (0): Conv2dNormActivation(
          (0): Conv2d(16, 16, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), groups=16, bias=False)
          (1): BatchNorm2d(16, eps=0.001, momentum=0.01, affine=True, track_running_stats=True)
          (2): ReLU(inplace=True)
        )
        (1): Conv2dNormActivation(
          (0): Conv2d(16, 16, kernel_size=(1, 1), stride=(1, 1), bias=False)
          (1): BatchNorm2d(16, eps=0.001, momentum=0.01, affine=True, track_running_stats=True)
        )
      )
    )
    (2): InvertedResidual(
      (block): Sequential(
        (0): Conv2dNormActivation(
          (0): Conv2d(16, 64, kernel_size=(1, 1), stride=(1, 1), bi

old

In [22]:
import matplotlib.pyplot as plt

def plot_training_curves(train_losses, val_losses, train_accs, val_accs):
    epochs = len(train_losses)

    plt.figure(figsize=(14,5))

    # Loss
    plt.subplot(1,2,1)
    plt.plot(range(epochs), train_losses, label="Train Loss")
    plt.plot(range(epochs), val_losses, label="Val Loss")
    plt.title("Training & Validation Loss")
    plt.xlabel("Epoch")
    plt.ylabel("Loss")
    plt.legend()

    # Accuracy
    plt.subplot(1,2,2)
    plt.plot(range(epochs), train_accs, label="Train Acc")
    plt.plot(range(epochs), val_accs, label="Val Acc")
    plt.title("Training & Validation Accuracy")
    plt.xlabel("Epoch")
    plt.ylabel("Accuracy")
    plt.legend()

    plt.show()


In [23]:
from sklearn.metrics import confusion_matrix, classification_report
import seaborn as sns

def evaluate_model(model, test_loader, class_names, device):
    model.eval()
    preds_list = []
    labels_list = []

    with torch.no_grad():
        for imgs, labels in test_loader:
            imgs = imgs.to(device)
            outputs = model(imgs)
            preds = outputs.argmax(dim=1).cpu()

            preds_list.extend(preds.numpy())
            labels_list.extend(labels.numpy())

    # 混淆矩阵
    cm = confusion_matrix(labels_list, preds_list)
    plt.figure(figsize=(8,6))
    sns.heatmap(cm, annot=True, fmt="d", cmap="Blues", xticklabels=class_names, yticklabels=class_names)
    plt.title("Confusion Matrix")
    plt.xlabel("Predicted")
    plt.ylabel("Actual")
    plt.show()

    # 分类指标报告
    print("\n 分类指标报告（Classification Report）")
    print(classification_report(labels_list, preds_list, target_names=class_names))


In [24]:
from sklearn.metrics import roc_curve, auc
from sklearn.preprocessing import label_binarize

def plot_multiclass_roc(model, test_loader, class_names, device):
    model.eval()
    y_true = []
    y_score = []

    with torch.no_grad():
        for imgs, labels in test_loader:
            imgs = imgs.to(device)
            outputs = model(imgs)

            y_true.extend(labels.numpy())
            y_score.extend(outputs.cpu().numpy())

    # One-vs-Rest
    y_true_bin = label_binarize(y_true, classes=list(range(len(class_names))))
    y_score = np.array(y_score)

    plt.figure(figsize=(10,8))

    for i in range(len(class_names)):
        fpr, tpr, _ = roc_curve(y_true_bin[:, i], y_score[:, i])
        roc_auc = auc(fpr, tpr)
        plt.plot(fpr, tpr, label=f"{class_names[i]} (AUC={roc_auc:.2f})")

    plt.plot([0,1], [0,1], "k--")
    plt.title("ROC Curves (One-vs-Rest)")
    plt.xlabel("False Positive Rate")
    plt.ylabel("True Positive Rate")
    plt.legend()
    plt.show()
