# CIFAR-10 图像分类实验

本notebook实现了在CIFAR-10数据集上的深度学习图像分类实验，包含多种CNN架构和训练策略的比较。

## 实验内容
- 基础CNN模型
- ResNet残差连接
- 不同层数和隐藏层维度
- 不同池化方式（最大池化/平均池化）
- SGD和Adam优化器
- L2正则化
- 数据增强（注：当前可见的预处理仅包含 ToTensor 与 Normalize，实验配置列表中也尚未包含数据增强实验）


In [None]:
# 导入必要的库
import torch
import torchvision
import os
import numpy as np
import pandas as pd
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import matplotlib.pyplot as plt
from torchvision import datasets, transforms
from torch.utils.data import DataLoader
from sklearn.metrics import classification_report, confusion_matrix, ConfusionMatrixDisplay
import time
import argparse
from model_utils import *


## 数据加载和预处理


In [None]:
# Seed the torch and NumPy random generators.
torch.manual_seed(42)
np.random.seed(42)

# Preprocessing: convert images to tensors, then normalize every channel
# with mean 0.5 and std 0.5.
transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
])

print("加载CIFAR-10数据集...")

# Dataset location. The default is the original machine-specific path; set the
# CIFAR10_ROOT environment variable to point somewhere else without editing
# this cell. Both splits share the same root, so it is defined only once.
data_root = os.environ.get(
    'CIFAR10_ROOT',
    '/Users/daixunlian/workspace/class_project/deep_learning/hw_1/dataset/cifar',
)

# Load (and download if missing) the train and test splits.
train_dataset = datasets.CIFAR10(root=data_root,
                                 train=True, download=True, transform=transform)
test_dataset = datasets.CIFAR10(root=data_root,
                                train=False, download=True, transform=transform)

# Batch the data; only the training split is shuffled.
train_loader = DataLoader(dataset=train_dataset, batch_size=64, shuffle=True, num_workers=2)
test_loader = DataLoader(dataset=test_dataset, batch_size=64, shuffle=False, num_workers=2)

# CIFAR-10 class names, indexed by label id.
classes = ('plane', 'car', 'bird', 'cat', 'deer',
           'dog', 'frog', 'horse', 'ship', 'truck')

print(f"训练集大小: {len(train_dataset)}")
print(f"测试集大小: {len(test_dataset)}")
print(f"类别数量: {len(classes)}")


In [None]:
# 展示样本图像
def show_sample(train_loader):
    """Plot up to ten images from the first batch of ``train_loader``.

    The figure is saved to ``./outputs/cifar/cifar10_samples.png`` (the
    directory is created if it does not exist yet) and then displayed.

    Args:
        train_loader: Iterable yielding ``(images, labels)`` batches. Images
            are indexed per sample and permuted from CHW to HWC, and labels
            are integer class ids indexing the ten CIFAR-10 class names.
            The de-normalization below assumes the images were normalized
            with mean 0.5 / std 0.5, as the transform in this file does.
    """
    class_names = ['airplane', 'automobile', 'bird', 'cat', 'deer',
                   'dog', 'frog', 'horse', 'ship', 'truck']
    out_path = './outputs/cifar/cifar10_samples.png'

    images, labels = next(iter(train_loader))

    # The 2x5 grid holds ten images; a smaller batch simply shows fewer
    # instead of raising IndexError.
    num_shown = min(10, len(images))

    plt.figure(figsize=(12, 6))
    for i in range(num_shown):
        img = (images[i] * 0.5 + 0.5)  # undo Normalize(mean=0.5, std=0.5)
        img = img.permute(1, 2, 0).numpy()  # CHW -> HWC

        plt.subplot(2, 5, i+1)
        plt.imshow(img)
        plt.title(class_names[labels[i].item()])
        plt.axis('off')

    plt.suptitle("CIFAR-10 Sample Images", fontsize=16)
    plt.tight_layout()
    # savefig does not create missing directories, and this function runs
    # before the checkpoint directory is created later in the notebook.
    os.makedirs(os.path.dirname(out_path), exist_ok=True)
    plt.savefig(out_path, dpi=150, bbox_inches='tight')
    plt.show()
    print("样本图像已保存到 cifar10_samples.png")

# Show a grid of training samples.
show_sample(train_loader)


## 训练和评估函数


In [None]:
# 训练函数
def _save_checkpoint(path, epoch, model, optimizer, accuracy):
    """Save model/optimizer state, the zero-based epoch index and accuracy to ``path``."""
    torch.save({
        'epoch': epoch,
        'model_state_dict': model.state_dict(),
        'optimizer_state_dict': optimizer.state_dict(),
        'accuracy': accuracy,
    }, path)


def train(model, train_loader, test_loader, criterion, optimizer,
          epochs, device, save_dir, is_l2_loss=False, l2_lambda=0.001,
          config_name="base"):
    """Train ``model`` and evaluate it on ``test_loader`` after every epoch.

    Args:
        model: Network to optimize; called as ``model(inputs)``.
        train_loader: Iterable of ``(inputs, labels)`` training batches.
        test_loader: Iterable passed to ``evaluate`` after each epoch.
        criterion: Loss callable invoked as ``criterion(outputs, labels)``.
        optimizer: Optimizer whose ``zero_grad``/``step`` drive the updates.
        epochs: Number of passes over ``train_loader``.
        device: Device the batches are moved to.
        save_dir: Directory for checkpoints; created if missing.
        is_l2_loss: When true, add an explicit L2 penalty to the loss.
        l2_lambda: Weight of the L2 penalty.
        config_name: Label used in log output and checkpoint file names.

    Returns:
        ``(test_accuracies, best_acc)``: the per-epoch values returned by
        ``evaluate`` and the highest of them (``0.0`` if none exceeded zero).

    Side effects:
        Writes ``best_{config_name}.pth`` whenever the test accuracy improves
        and ``{config_name}_epoch{N}.pth`` every tenth epoch. Prints progress
        every 100 batches and after each epoch.
    """
    # Checkpoints are written below; make sure the target directory exists.
    os.makedirs(save_dir, exist_ok=True)

    test_accuracies = []
    best_acc = 0.0

    print(f"\n{'='*60}")
    print(f"训练配置: {config_name}")
    print(f"L2正则化: {'是 (λ=' + str(l2_lambda) + ')' if is_l2_loss else '否'}")
    print(f"{'='*60}\n")

    for epoch in range(epochs):
        # evaluate() switches the model to eval mode, so re-enable training
        # mode at the start of every epoch.
        model.train()
        running_loss = 0.0
        correct = 0
        total = 0

        for batch_idx, (inputs, labels) in enumerate(train_loader):
            inputs, labels = inputs.to(device), labels.to(device)

            optimizer.zero_grad()
            outputs = model(inputs)

            loss = criterion(outputs, labels)

            if is_l2_loss:
                # L2 regularization penalizes the sum of *squared* weights.
                # The previous code summed unsquared norms, which is a
                # different penalty with a much weaker gradient.
                l2_reg = sum(param.pow(2).sum() for param in model.parameters())
                loss = loss + l2_lambda * l2_reg

            loss.backward()
            optimizer.step()

            running_loss += loss.item()

            # Predictions are only used for bookkeeping, so detach them.
            _, predicted = torch.max(outputs.detach(), 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
            if batch_idx % 100 == 99:
                # Loss is averaged over the last 100 batches; accuracy is
                # cumulative since the start of the epoch.
                avg_loss = running_loss / 100
                train_acc = 100 * correct / total
                print(f'Epoch: {epoch+1}/{epochs} | '
                      f'Batch: {batch_idx+1}/{len(train_loader)} | '
                      f'Loss: {avg_loss:.4f} | '
                      f'Train Acc: {train_acc:.2f}%')
                running_loss = 0.0

        test_acc = evaluate(model, test_loader, device)
        test_accuracies.append(test_acc)

        print(f'\nEpoch {epoch+1} 完成 - 测试准确率: {test_acc:.2f}%\n')

        if test_acc > best_acc:
            best_acc = test_acc
            save_path = os.path.join(save_dir, f'best_{config_name}.pth')
            _save_checkpoint(save_path, epoch, model, optimizer, test_acc)
            print(f'最佳模型已保存，准确率: {test_acc:.2f}%')

        # Periodic snapshot every tenth epoch, independent of accuracy.
        if (epoch + 1) % 10 == 0:
            save_path = os.path.join(save_dir, f'{config_name}_epoch{epoch+1}.pth')
            _save_checkpoint(save_path, epoch, model, optimizer, test_acc)

    return test_accuracies, best_acc


In [None]:
# 评估函数
def evaluate(model, test_loader, device):
    """Return the top-1 accuracy of ``model`` on ``test_loader`` in percent.

    The model is switched to eval mode and is left in that mode on return.

    Args:
        model: Network called as ``model(inputs)``; the index of the largest
            value along dimension 1 of its output is taken as the prediction.
        test_loader: Iterable of ``(inputs, labels)`` batches.
        device: Device the batches are moved to before the forward pass.

    Returns:
        Accuracy as a float percentage, or ``0.0`` when the loader yields no
        samples (instead of raising ``ZeroDivisionError``).
    """
    model.eval()
    correct = 0
    total = 0

    with torch.no_grad():
        for inputs, labels in test_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)
            # No ``.data`` needed: gradients are already disabled here.
            _, predicted = torch.max(outputs, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

    if total == 0:
        return 0.0
    return 100 * correct / total

# 详细测试函数
def detailed_test(model, test_loader, device, classes):
    """Evaluate ``model`` and print per-class accuracy plus a classification report.

    Args:
        model: Network called as ``model(images)``; the index of the largest
            value along dimension 1 of its output is taken as the prediction.
        test_loader: Iterable of ``(images, labels)`` batches.
        device: Device the batches are moved to before the forward pass.
        classes: Sequence of class names indexed by label id. Its length
            determines the number of classes reported.

    Returns:
        ``(y_trues, y_preds, overall_acc)``: the collected true labels, the
        collected predictions, and the overall accuracy in percent
        (``0.0`` when the loader yields no samples).
    """
    model.eval()
    num_classes = len(classes)
    class_correct = [0] * num_classes
    class_total = [0] * num_classes
    y_trues, y_preds = [], []

    with torch.no_grad():
        for images, labels in test_loader:
            images, labels = images.to(device), labels.to(device)
            outputs = model(images)
            _, predicted = torch.max(outputs, 1)

            y_preds.extend(predicted.cpu().numpy())
            y_trues.extend(labels.cpu().numpy())

    # Tally from the host-side copies collected above rather than calling
    # ``.item()`` on device tensors once per sample.
    for true_label, pred_label in zip(y_trues, y_preds):
        class_total[true_label] += 1
        class_correct[true_label] += int(true_label == pred_label)

    print("\n" + "="*50)
    print("各类别准确率:")
    print("="*50)
    for i in range(num_classes):
        if class_total[i] > 0:
            acc = 100 * class_correct[i] / class_total[i]
            print(f'{classes[i]:12s}: {acc:5.2f}% ({class_correct[i]}/{class_total[i]})')

    total_seen = sum(class_total)
    overall_acc = 100 * sum(class_correct) / total_seen if total_seen else 0.0
    print(f'\n总体准确率: {overall_acc:.2f}%')

    if not total_seen:
        # Nothing was evaluated, so there is nothing to report on.
        return y_trues, y_preds, overall_acc

    print("\n" + "="*50)
    print("分类报告:")
    print("="*50)
    # Passing explicit labels keeps target_names aligned even when some class
    # never appears among the true labels or the predictions.
    print(classification_report(y_trues, y_preds,
                                labels=list(range(num_classes)),
                                target_names=classes))

    return y_trues, y_preds, overall_acc


In [None]:
# 可视化函数
def plot_confusion_matrix(y_trues, y_preds, classes, save_path):
    """Draw the confusion matrix for the given labels, save it, and show it.

    Args:
        y_trues: True labels.
        y_preds: Predicted labels.
        classes: Display labels for the matrix axes.
        save_path: File path the figure is written to.
    """
    matrix = confusion_matrix(y_trues, y_preds)
    display = ConfusionMatrixDisplay(
        confusion_matrix=matrix,
        display_labels=classes,
    )

    _, axis = plt.subplots(figsize=(10, 10))
    display.plot(cmap=plt.cm.Blues, ax=axis)

    # Tilt the tick labels so the class names do not overlap.
    plt.xticks(rotation=45, ha='right')
    plt.title("混淆矩阵", fontsize=14)
    plt.tight_layout()

    plt.savefig(save_path, dpi=150, bbox_inches='tight')
    plt.show()
    print(f"混淆矩阵已保存到 {save_path}")

def plot_training_history(history_dict, save_path):
    """Plot one test-accuracy curve per configuration, save the chart, and show it.

    Args:
        history_dict: Mapping from configuration name to its sequence of
            accuracy values; the x axis numbers them starting at 1.
        save_path: File path the figure is written to.
    """
    plt.figure(figsize=(12, 6))

    for label, series in history_dict.items():
        epoch_axis = range(1, len(series) + 1)
        plt.plot(epoch_axis, series, marker='o', label=label, linewidth=2)

    # Axis labels, title, legend and a light grid.
    plt.xlabel('Epoch', fontsize=12)
    plt.ylabel('Test Accuracy (%)', fontsize=12)
    plt.title('不同配置的测试准确率比较', fontsize=14)
    plt.legend(fontsize=10)
    plt.grid(True, alpha=0.3)
    plt.tight_layout()

    plt.savefig(save_path, dpi=150, bbox_inches='tight')
    plt.show()
    print(f"训练历史图已保存到 {save_path}")


## 实验配置和运行


In [None]:
# Use the GPU when CUDA is available, otherwise fall back to the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"使用设备: {device}")

# Checkpoint directory; created up front so later saves cannot fail on it.
save_dir = './outputs/cifar/checkpoints'
os.makedirs(save_dir, exist_ok=True)

# Hyper-parameters shared by every experiment.
num_epochs = 20
learning_rate = 0.001

# One entry per experiment: a display name plus the option overrides that
# distinguish it from the baseline (an empty dict means "all defaults").
experiments = [
    dict(name=exp_name, args=exp_overrides)
    for exp_name, exp_overrides in (
        ("base_experiment", {}),
        ("add_hidden_dim", {"is_large_hidden": True}),
        ("add_layer_num", {"is_large_layer": True}),
        ("use_mean_pooling", {"mean_pooling": True}),
        ("use_resnet", {"is_resnet": True}),
        ("use_l2_regular", {"is_l2_loss": True}),
        ("use_adam", {"use_adam": True}),
    )
]

print(f"将运行 {len(experiments)} 个实验")


In [None]:
# Run every configured experiment in order and collect its metrics.
results = {}

for i, exp in enumerate(experiments):
    exp_args = exp['args']
    use_l2 = exp_args.get('is_l2_loss', False)
    banner = '#' * 60

    print(f"\n{banner}")
    print(f"# 实验 {i+1}/{len(experiments)}: {exp['name']}")
    print(f"{banner}")

    # Build the model; every option defaults to False unless overridden.
    model = CNNBase(
        is_large_layer=exp_args.get('is_large_layer', False),
        is_large_hidden=exp_args.get('is_large_hidden', False),
        pooling_type=exp_args.get('mean_pooling', False),
        is_resnet=exp_args.get('is_resnet', False),
        label_num=10
    ).to(device)

    # Loss and optimizer: Adam on request, otherwise SGD with momentum.
    criterion = nn.CrossEntropyLoss()
    optimizer = (
        optim.Adam(model.parameters(), lr=learning_rate)
        if exp_args.get('use_adam', False)
        else optim.SGD(model.parameters(), lr=learning_rate, momentum=0.9)
    )

    # The L2 run gets a suffix in its checkpoint and figure file names.
    config_name = exp['name'] + ("_l2reg" if use_l2 else "")

    # Train and time the run.
    start_time = time.time()
    accuracies, best_acc = train(
        model, train_loader, test_loader, criterion, optimizer,
        epochs=num_epochs, device=device, save_dir=save_dir,
        is_l2_loss=use_l2,
        l2_lambda=0.001,
        config_name=config_name
    )
    training_time = time.time() - start_time

    # Per-class evaluation of the model as it stands after the last epoch.
    y_trues, y_preds, final_acc = detailed_test(model, test_loader, device, classes)

    # Save the confusion matrix figure for this configuration.
    cm_path = f'./outputs/cifar/confusion_matrix_{config_name}.png'
    plot_confusion_matrix(y_trues, y_preds, classes, cm_path)

    # Record the metrics under the experiment's base name.
    results[exp['name']] = dict(
        accuracies=accuracies,
        best_acc=best_acc,
        final_acc=final_acc,
        training_time=training_time,
    )

    print('*'*20)
    print(f"实验 {exp['name']} 最终准确率: {final_acc:.2f}%")

    # Append a one-line summary to the shared results file.
    summary_line = f"Experiment: {exp['name']}, Final Accuracy: {final_acc:.2f}%, Training Time: {training_time:.2f} seconds\n"
    with open('./outputs/cifar/experiment_results.txt', 'a') as f:
        f.write(summary_line)


## 结果分析和可视化


In [None]:
# Plot every experiment's per-epoch test accuracy on a single chart.
history_dict = dict(
    (exp_name, exp_data['accuracies']) for exp_name, exp_data in results.items()
)
plot_training_history(history_dict, './outputs/cifar/training_history_comparison.png')

# Print a summary table of all experiments.
heavy_rule = "=" * 80
print("\n" + heavy_rule)
print("实验结果汇总")
print(heavy_rule)
print(f"{'实验名称':<20} {'最终准确率':<12} {'最佳准确率':<12} {'训练时间(秒)':<15}")
print("-" * 80)

for exp_name, exp_data in results.items():
    row = (
        f"{exp_name:<20} {exp_data['final_acc']:<12.2f} "
        f"{exp_data['best_acc']:<12.2f} {exp_data['training_time']:<15.2f}"
    )
    print(row)

# Pick the configuration with the highest final accuracy.
best_exp = max(results.items(), key=lambda item: item[1]['final_acc'])
best_name, best_data = best_exp
print(f"\n最佳配置: {best_name} (准确率: {best_data['final_acc']:.2f}%)")
