In [None]:
import os
import random
import time
from pathlib import Path
from typing import Dict, List

import numpy as np
import pandas as pd
import torch
from torch import nn, optim
from torch.utils.data import DataLoader
from torchvision import datasets, transforms, models
from torch.cuda import amp

In [None]:
# Basic configuration: every tunable of the notebook lives here.
seed = 42  # passed to set_seed() to seed the Python, NumPy and PyTorch RNGs
batch_size = 128  # batch size shared by the train and test DataLoaders
num_workers = 4 if torch.cuda.is_available() else 2  # DataLoader workers: 4 with CUDA, otherwise 2
epochs = 3  # raise to 10+ for better accuracy; also used as T_max of the cosine LR schedule
learning_rate = 1e-3  # AdamW learning rate
weight_decay = 5e-4  # AdamW weight decay
use_amp = True  # request mixed precision; only takes effect when the device is CUDA

def set_seed(seed: int) -> None:
    """Seed every random number generator the notebook relies on.

    Args:
        seed: Value handed unchanged to Python's ``random``, NumPy's legacy
            global RNG, PyTorch's CPU RNG and all CUDA device RNGs.
    """
    seeders = (
        random.seed,
        np.random.seed,
        torch.manual_seed,
        torch.cuda.manual_seed_all,
    )
    for apply_seed in seeders:
        apply_seed(seed)

set_seed(seed)

# Run on the GPU when CUDA is available, otherwise on the CPU.
device_name = "cuda" if torch.cuda.is_available() else "cpu"
device = torch.device(device_name)
print(f"Using device: {device}")

# Data location: prefer a directory that already holds an extracted CIFAR-10 copy.
data_root_candidates = [Path('../cnn_cifar/data'), Path('./data'), Path('../data')]
data_root = next(
    (root for root in data_root_candidates if (root / 'cifar-10-batches-py').exists()),
    None,
)
# No cached copy anywhere: download into the first candidate directory.
download_flag = data_root is None
if download_flag:
    data_root = data_root_candidates[0]

print(f"Data root: {data_root.resolve()} | download: {download_flag}")

# Directory searched for locally stored pretrained weight files.
weights_dir = Path('./datasets-readonly')

In [None]:
# Datasets and data augmentation
# Per-channel normalisation statistics (the commonly quoted CIFAR-10 values).
mean = (0.4914, 0.4822, 0.4465)
std = (0.2470, 0.2435, 0.2616)

# CIFAR-10 images are 32x32, but torchvision's AlexNet needs inputs of at least
# 63x63: at 32x32 its last max-pool receives a 1x1 feature map and raises
# "Output size is too small". Upsample so that all three architectures run.
# 64 is the cheapest size that works; raise it (e.g. to 224, the ImageNet
# pretraining resolution) if GPU memory and time allow.
image_size = 64

train_tf = transforms.Compose([
    transforms.RandomCrop(32, padding=4),  # augment at native resolution, then upsample
    transforms.RandomHorizontalFlip(),
    transforms.Resize(image_size),
    transforms.ToTensor(),
    transforms.Normalize(mean, std),
])
test_tf = transforms.Compose([
    transforms.Resize(image_size),
    transforms.ToTensor(),
    transforms.Normalize(mean, std),
])

train_set = datasets.CIFAR10(root=data_root, train=True, download=download_flag, transform=train_tf)
test_set = datasets.CIFAR10(root=data_root, train=False, download=download_flag, transform=test_tf)

# Settings shared by both loaders. Pinned memory only helps host-to-GPU copies
# and triggers a warning on CPU-only machines, so enable it only with CUDA.
loader_kwargs = dict(
    batch_size=batch_size,
    num_workers=num_workers,
    pin_memory=torch.cuda.is_available(),
    persistent_workers=num_workers > 0,  # only valid when worker processes exist
)
train_loader = DataLoader(train_set, shuffle=True, **loader_kwargs)
test_loader = DataLoader(test_set, shuffle=False, **loader_kwargs)

print(f"Train batches: {len(train_loader)}, Test batches: {len(test_loader)}")

In [None]:
# Model construction and training utilities
# Maps each supported architecture name to its local pretrained checkpoint
# inside `weights_dir`.
PRETRAINED_FILES = dict(
    alexnet=weights_dir / 'alexnet_pretrained_pytorch.pth',
    vgg16=weights_dir / 'vgg16_pretrained_pytorch.pth',
    resnet18=weights_dir / 'resnet18_pretrained_pytorch.pth',
)

def build_model(name: str, num_classes: int = 10) -> nn.Module:
    """Build a pretrained backbone with a fresh classification head.

    Weights come from the local file registered in ``PRETRAINED_FILES`` when
    that file exists; otherwise torchvision's ImageNet weights are used (which
    may need network access). The final linear layer is then replaced by a
    newly initialised ``nn.Linear`` with ``num_classes`` outputs.

    Args:
        name: Architecture name, case-insensitive: ``'alexnet'``, ``'vgg16'``
            or ``'resnet18'``.
        num_classes: Output size of the new classification layer.

    Returns:
        The model, on the CPU, with its head replaced.

    Raises:
        ValueError: If ``name`` is not a supported architecture.
    """
    name = name.lower()
    # One table instead of two parallel if/elif chains: every supported
    # architecture is guaranteed to have both a constructor and fallback weights.
    registry = {
        'alexnet': (models.alexnet, models.AlexNet_Weights.IMAGENET1K_V1),
        'vgg16': (models.vgg16, models.VGG16_Weights.IMAGENET1K_V1),
        'resnet18': (models.resnet18, models.ResNet18_Weights.IMAGENET1K_V1),
    }
    if name not in PRETRAINED_FILES or name not in registry:
        raise ValueError(f'Unsupported model: {name}')
    weights_path = PRETRAINED_FILES[name]
    constructor, fallback_weights = registry[name]

    if weights_path.exists():
        model = constructor(weights=None)
        # NOTE(security): torch.load unpickles the file, so only load
        # checkpoints from a trusted source (consider weights_only=True on
        # PyTorch versions that support it).
        state = torch.load(weights_path, map_location='cpu')
        # strict=False tolerates key-name differences; report them so a
        # partially loaded model does not go unnoticed.
        load_result = model.load_state_dict(state, strict=False)
        if load_result.missing_keys:
            print(f'[warn] Missing keys while loading {name}: {load_result.missing_keys}')
        if load_result.unexpected_keys:
            print(f'[warn] Unexpected keys while loading {name}: {load_result.unexpected_keys}')
    else:
        print(f'[info] Local weights not found for {name}, falling back to torchvision pretrained.')
        # Build the network once, directly with the pretrained weights, rather
        # than discarding a randomly initialised copy first.
        model = constructor(weights=fallback_weights)

    # Replace the classification head.
    if name == 'resnet18':
        model.fc = nn.Linear(model.fc.in_features, num_classes)
    else:  # alexnet / vgg16: the last layer of the `classifier` Sequential
        in_features = model.classifier[6].in_features
        model.classifier[6] = nn.Linear(in_features, num_classes)
    return model

def set_trainable_layers(model: nn.Module, finetune_backbone: bool) -> None:
    """Choose which parameters of ``model`` receive gradients.

    Args:
        model: Network whose parameters are updated in place.
        finetune_backbone: When true, every parameter is trainable. Otherwise
            only parameters whose name starts with ``classifier`` or ``fc``
            (the head) stay trainable and the rest are frozen.
    """
    head_prefixes = ('classifier', 'fc')
    for param_name, tensor in model.named_parameters():
        belongs_to_head = param_name.startswith(head_prefixes)
        tensor.requires_grad = finetune_backbone or belongs_to_head

def accuracy_from_logits(logits: torch.Tensor, labels: torch.Tensor) -> float:
    """Return the fraction of rows whose arg-max along dim 1 equals the label.

    Args:
        logits: Scores; the prediction is the arg-max over dimension 1.
        labels: Target class indices, compared element-wise with the predictions.

    Returns:
        Number of matches divided by ``labels.size(0)``.
    """
    predicted = logits.argmax(dim=1)
    hits = predicted.eq(labels).sum().item()
    batch_len = labels.shape[0]
    return hits / batch_len

def train_one_epoch(model: nn.Module, loader: DataLoader, criterion, optimizer, scaler: amp.GradScaler) -> Dict[str, float]:
    """Train ``model`` for one pass over ``loader``.

    Batches are moved to the module-level ``device``. The forward pass runs
    under autocast when the module-level ``use_amp`` is set and the device is
    CUDA; the backward pass and optimizer step go through ``scaler``.

    Args:
        model: Network to optimise; put into training mode.
        loader: Yields ``(images, labels)`` batches.
        criterion: Loss callable taking ``(outputs, labels)``.
        optimizer: Optimizer stepped once per batch.
        scaler: Gradient scaler used for the backward pass and the step.

    Returns:
        ``{'loss': ..., 'acc': ...}``: sample-weighted mean loss and accuracy
        over all batches seen.
    """
    autocast_enabled = use_amp and device.type == 'cuda'
    loss_sum, hits, seen = 0.0, 0, 0

    model.train()
    for images, labels in loader:
        images = images.to(device, non_blocking=True)
        labels = labels.to(device, non_blocking=True)
        batch_len = labels.size(0)

        optimizer.zero_grad()
        with amp.autocast(enabled=autocast_enabled):
            outputs = model(images)
            loss = criterion(outputs, labels)
        scaler.scale(loss).backward()
        scaler.step(optimizer)
        scaler.update()

        # Weight the batch loss by its size so the epoch value is a per-sample mean.
        loss_sum += loss.item() * batch_len
        hits += (outputs.argmax(dim=1) == labels).sum().item()
        seen += batch_len

    return dict(loss=loss_sum / seen, acc=hits / seen)

@torch.no_grad()
def evaluate(model: nn.Module, loader: DataLoader, criterion) -> Dict[str, float]:
    """Measure mean loss and accuracy of ``model`` over ``loader`` without gradients.

    Batches are moved to the module-level ``device``; the forward pass runs
    under autocast when the module-level ``use_amp`` is set and the device is
    CUDA.

    Args:
        model: Network to evaluate; put into eval mode.
        loader: Yields ``(images, labels)`` batches.
        criterion: Loss callable taking ``(outputs, labels)``.

    Returns:
        ``{'loss': ..., 'acc': ...}``: sample-weighted mean loss and accuracy
        over all batches seen.
    """
    autocast_enabled = use_amp and device.type == 'cuda'
    loss_sum, hits, seen = 0.0, 0, 0

    model.eval()
    for images, labels in loader:
        images = images.to(device, non_blocking=True)
        labels = labels.to(device, non_blocking=True)
        batch_len = labels.size(0)

        with amp.autocast(enabled=autocast_enabled):
            outputs = model(images)
            loss = criterion(outputs, labels)

        # Weight the batch loss by its size so the result is a per-sample mean.
        loss_sum += loss.item() * batch_len
        hits += (outputs.argmax(dim=1) == labels).sum().item()
        seen += batch_len

    return dict(loss=loss_sum / seen, acc=hits / seen)

def run_experiment(model_name: str, finetune_backbone: bool) -> Dict[str, float]:
    """Train and evaluate one (architecture, fine-tuning strategy) combination.

    Builds the model, freezes or unfreezes the backbone, then trains for the
    module-level ``epochs`` with AdamW and a cosine-annealed learning rate.
    After every epoch the model is evaluated on the module-level
    ``test_loader``.

    Args:
        model_name: Architecture name understood by ``build_model``.
        finetune_backbone: True to train every parameter, False to train only
            the classification head.

    Returns:
        A dict with keys ``model``, ``finetune_backbone``, ``best_acc`` (highest
        per-epoch evaluation accuracy), ``last_acc`` (final-epoch evaluation
        accuracy, 0.0 when no epoch ran), ``elapsed_sec`` and ``history`` (one
        metrics dict per epoch).
    """
    model = build_model(model_name).to(device)
    set_trainable_layers(model, finetune_backbone)

    # Only hand trainable parameters to the optimizer.
    params = [p for p in model.parameters() if p.requires_grad]
    optimizer = optim.AdamW(params, lr=learning_rate, weight_decay=weight_decay)
    scheduler = optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=epochs)
    criterion = nn.CrossEntropyLoss()
    scaler = amp.GradScaler(enabled=use_amp and device.type == 'cuda')

    history = []
    best_acc = 0.0
    # perf_counter is monotonic, so the measured duration cannot be distorted
    # by system clock adjustments during a long run.
    start_time = time.perf_counter()
    for epoch in range(1, epochs + 1):
        train_metrics = train_one_epoch(model, train_loader, criterion, optimizer, scaler)
        val_metrics = evaluate(model, test_loader, criterion)
        scheduler.step()
        best_acc = max(best_acc, val_metrics['acc'])
        history.append({**train_metrics, 'val_loss': val_metrics['loss'], 'val_acc': val_metrics['acc']})
        print(f"[ {model_name} | finetune_backbone={finetune_backbone} ] Epoch {epoch}/{epochs} | "
              f"train_loss={train_metrics['loss']:.4f}, train_acc={train_metrics['acc']:.4f}, "
              f"val_loss={val_metrics['loss']:.4f}, val_acc={val_metrics['acc']:.4f}")

    elapsed = time.perf_counter() - start_time
    # With epochs == 0 there is no history entry; report 0.0 (consistent with
    # best_acc) instead of raising IndexError.
    last_acc = history[-1]['val_acc'] if history else 0.0
    return {
        'model': model_name,
        'finetune_backbone': finetune_backbone,
        'best_acc': best_acc,
        'last_acc': last_acc,
        'elapsed_sec': elapsed,
        'history': history,
    }

In [None]:
# Run the experiments: three architectures x two fine-tuning strategies
model_list = ['alexnet', 'vgg16', 'resnet18']
results: List[Dict] = []

# Flatten the grid; the order matches a nested loop (architecture outer, strategy inner).
experiment_grid = [(arch, flag) for arch in model_list for flag in (True, False)]
for model_name, finetune_backbone in experiment_grid:
    print(f"===== Running {model_name} | finetune_backbone={finetune_backbone} =====")
    res = run_experiment(model_name, finetune_backbone)
    # Keep only the scalar fields; the per-epoch history does not fit a summary table.
    summary_row = dict(res)
    summary_row.pop('history', None)
    results.append(summary_row)

# Accuracies as percentages with two decimals, elapsed time with one decimal.
summary_df = pd.DataFrame(results).assign(
    best_acc=lambda frame: (frame['best_acc'] * 100).round(2),
    last_acc=lambda frame: (frame['last_acc'] * 100).round(2),
    elapsed_sec=lambda frame: frame['elapsed_sec'].round(1),
)
summary_df

## 提示与可选改进
- 如需更高精度，可将`epochs`提高到10–30，并加入学习率预热（warmup）或使用更强的数据增强（MixUp/CutMix/随机擦除等）。
- 如果希望仅替换最后一层而冻结中间全连接层，可在`set_trainable_layers`中改为只开放`classifier.6`或`fc`层。
- 若本地`./datasets-readonly`缺失预训练权重，代码会自动回退到torchvision预训练（需要网络可用）。
- 运行结束后可根据`summary_df`中的精度比较两种策略表现。