In [None]:
from torchvision import datasets, transforms                  # datasets用于加载数据集，transforms是各种变换
from torch.utils.data import DataLoader, random_split         # DataLoader用于分批加载数据，random_split函数用于随机分割数据集
import matplotlib.pyplot as plt     # 用于画图
import torch.nn as nn               # 用于构建网络模型
import torch.optim as optim         # 用于选择优化器
import torch
import numpy as np
import os                           # 用于可视化错误样本
from PIL import Image               # 用于可视化错误样本

## 0. 选择是否使用GPU

要求：
1. 选择使用gpu还是cpu
2. 把三个地方的代码放在gpu上运算：模型、损失函数、数据

In [None]:
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")   
print("当前使用的设备是：{}".format(device))

## 1. 数据集处理

### 1.1 下载并加载数据集

要求：

知道使用datasets下载并加载常用公开数据集，以及加载自己的数据集

In [None]:
# 下载并加载数据集（官网上默认只有训练集和测试集，验证集需要自己划分）
# 创建训练集对象
train_dataset = datasets.MNIST( root="./",                          # 数据集的存放地址
                                download=True,                      # 如果本地没有，那么从官网下载数据集
                                train=True,                         # 官网数据集分为6000张训练集，10000张测试集，这里加载训练集部分
                                transform=transforms.ToTensor())    # 把从官网加载的数据转换成tensor的形式

test_dataset = datasets.MNIST(  root="./",                          # 数据集的存放地址
                                download=True,                      # 如果本地没有，那么从官网下载数据集
                                train=False,                        # 官网数据集分为6000张训练集，10000张测试集，这里加载测试集部分
                                transform=transforms.ToTensor())    # 把从官网加载的数据转换成tensor的形式

### 1.2 从训练集中分割出验证集

要求：
1. 知道为什么要分割数据集， 训练集-测试集 的训练过程和 训练集-验证集-测试集 的区别
2. 知道可以使用random_split和设置随机种子分割数据集

In [None]:
# 从训练集中分割验证集

# 定义训练集和验证集大小
train_size = 55000  # 训练集大小
val_size = 5000     # 验证集大小 (60000 - 55000 = 5000)

# 设置随机种子
def set_seed(seed=42):
    torch.manual_seed(seed) # 设置PyTorch的随机种子，影响CPU上的随机数生成。
    np.random.seed(seed)    # 设置NumPy的随机种子，因为random_split可能使用NumPy的随机数生成器。
    if torch.cuda.is_available():   # 如果GPU可用，设置CUDA的随机种子。
        torch.cuda.manual_seed(seed)    # 设置当前GPU的随机种子。
        torch.cuda.manual_seed_all(seed)    # 如果使用多GPU，设置所有GPU的随机种子。

set_seed(42)

# 使用random_split划分训练集和验证集，random_split依赖上面随机种子的设置
train_dataset, val_dataset = random_split(
    train_dataset,
    [train_size, val_size]
)

### 1.3 可视化检验三个数据集

要求：
1. print格式化输出数据
2. 了解使用datasets加载的数据集的格式 
3. 使用matplotlib.pyplot可视化图片

In [None]:
# 检验三个数据集
print("【查看数据集整体大小】")
print("训练集尺寸：{}".format(len(train_dataset)))
print("验证集尺寸：{}".format(len(val_dataset)))
print("测试集尺寸：{}".format(len(test_dataset)))

# 抽取一个样本查看详细内容
print("【训练集查看一个样本点】")
img, label = train_dataset[0]
print("训练集第一张图片的size:{}".format(img.size()))
plt.imshow(img.reshape(28,28), cmap="binary")   # 显示图片，颜色映射为二值化图像
plt.show()
print("训练集第一张图片的label:{}".format(label))

print("【验证集查看一个样本点】")
img, label = val_dataset[0]
print("验证集第一张图片的size:{}".format(img.size()))
plt.imshow(img.reshape(28,28), cmap="binary")   # 显示图片，颜色映射为二值化图像
plt.show()
print("验证集第一张图片的label:{}".format(label))

print("【测试集查看一个样本点】")
img, label = test_dataset[0]
print("测试集第一张图片的size:{}".format(img.size()))
plt.imshow(img.reshape(28,28), cmap="binary")   # 显示图片，颜色映射为二值化图像
plt.show()
print("测试集第一张图片的label:{}".format(label))



### 1.4 数据集分批

要求：
1. 使用DataLoader对数据分批
2. 知道为什么训练集分批时要打乱顺序

In [None]:
# 批次大小
batch_size = 64

# 装载训练集
train_loader = DataLoader(dataset=train_dataset,
                         batch_size=batch_size,
                         shuffle=True)              # 训练时需要打乱顺序

# 装载验证集
val_loader = DataLoader(dataset=val_dataset,      
                         batch_size=batch_size,
                         shuffle=False)

# 装载测试集
test_loader = DataLoader(dataset=test_dataset,
                         batch_size=batch_size,
                         shuffle=False)

### 1.4 可视化检验每个dataloader

要求：
1. 了解dataloader对象的结构
2. 能使用print检查dataloader内容
3. 了解enumerate可以给可迭代对象添加索引

In [None]:
# 检验每个dataloader的内容

# train_loader某一个批次
for i,data in enumerate(train_loader):  #遍历dataloader里面每一个批次，enumerate用于给一个可迭代对象添加索引
    inputs,labels = data
    if(i == 8): #这里改看第几个批次的数据
        print("train_loader长度:{}个batch".format(len(train_loader)))
        print("train_loader第{}个批次:".format(i))
        print("图片size:{}".format(inputs.shape))
        print("标签size:{}".format(labels.shape))
        break  

# val_dataset某一个批次
for i,data in enumerate(val_loader):  #遍历dataloader里面每一个批次，enumerate用于给一个可迭代对象添加索引
    inputs,labels = data
    if(i == 8):
        print("train_loader长度:{}个batch".format(len(val_loader)))
        print("val_dataset第{}个批次:".format(i))
        print("图片size:{}".format(inputs.shape))
        print("标签size:{}".format(labels.shape))
        break 

# test_dataset某一个批次
for i,data in enumerate(test_loader):  #遍历dataloader里面每一个批次，enumerate用于给一个可迭代对象添加索引
    inputs,labels = data
    if(i == 8):
        print("train_loader长度:{}个batch".format(len(test_loader)))
        print("test_dataset第{}个批次:".format(i))
        print("图片size:{}".format(inputs.shape))
        print("标签size:{}".format(labels.shape))
        break 

## 2. 定义模型结构、损失函数、优化器

In [None]:
# 定义网络结构
class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.conv1 = nn.Sequential(nn.Conv2d(1, 32, 5, 1, 2), nn.ReLU(), nn.MaxPool2d(2, 2))
        self.conv2 = nn.Sequential(nn.Conv2d(32, 64, 5, 1, 2), nn.ReLU(), nn.MaxPool2d(2, 2))
        self.fc1 = nn.Sequential(nn.Linear(64 * 7 * 7, 1000), nn.Dropout(p=0.4), nn.ReLU())
        self.fc2 = nn.Sequential(nn.Linear(1000, 10), nn.Softmax(dim=1))
        
    def forward(self, x):
        # ([64, 1, 28, 28])
        x = self.conv1(x)
        x = self.conv2(x)
        x = x.view(x.size()[0], -1)
        x = self.fc1(x)
        x = self.fc2(x)
        return x

In [None]:
# 学习率（超参数）
LR = 0.0003
# 定义模型
model = Net()
model = model.to(device)    # 把模型放到gpu上
# 定义代价函数
entropy_loss = nn.CrossEntropyLoss()    # 内置了log_softmax方法，避免loss出现nan；内部使用独热编码计算等价形式，不用在外部显示把标签转换成独热编码
entropy_loss = entropy_loss.to(device)  # 把损失函数放到gpu上
# 定义优化器
optimizer = optim.Adam(model.parameters(), LR)

## 3. 定义训练、验证、测试函数 和 错误样本可视化函数

### 3.1 定义错误样本可视化函数

In [None]:
# 错误样本可视化函数
error_dir = "./test_error"  # 创建保存错误样本的文件夹
os.makedirs(error_dir, exist_ok=True)

# 保存错误分类的样本图像
# 参数:
# error_samples: 列表，每个元素是一个元组 (图像, 真实标签, 预测标签, 索引)
# error_dir: 保存错误图像的目录路径
def save_error_images(error_samples, error_dir):
    print(f"保存 {len(error_samples)} 个错误分类样本到 {error_dir}")
    
    for img, true_label, pred_label, idx in error_samples:
        # 创建图像
        fig, ax = plt.subplots(figsize=(3, 3))
        
        # 显示图像
        if len(img.shape) == 3 and img.shape[0] == 1:  # 如果是(1, 28, 28)的形状
            img_display = img.squeeze(0)  # 去掉通道维度，变为(28, 28)
        else:
            img_display = img
            
        ax.imshow(img_display.cpu().numpy(), cmap='gray')
        
        # 设置标题
        title = f"True: {true_label}, Pred: {pred_label}"
        ax.set_title(title, fontsize=10)
        
        # 不显示坐标轴
        ax.set_xticks([])
        ax.set_yticks([])
        
        # 保存图像
        save_path = os.path.join(error_dir, f"error_{idx:05d}_true{true_label}_pred{pred_label}.png")
        plt.savefig(save_path, bbox_inches='tight', pad_inches=0.1, dpi=100)
        plt.close(fig)  # 关闭图形以释放内存

### 3.2 定义训练、验证、测试函数

要求：
1. 了解训练时 前向传播--计算loss--反向传播--迭代参数 的训练流程
2. 了解交叉熵损失函数的原理
3. 验证时的流程： 前向传播--统计正确个数并计算准确率
4. 使用张量的方法统计正确个数：直接比较两个张量--得到布尔张量--使用sum方法求和--使用item方法转换为整型

In [None]:
# 训练-验证流程（训练阶段）：
# 每一轮先在训练集上训练，训练完成后在验证集上验证
# 输出在验证集上的accuracy，以及在训练集上的平均loss和accuracy

# 训练函数
def train():
    model.train()       # 设置模型处于训练模式
    total_loss = 0      # 统计训练总loss
    for i, data in enumerate(train_loader):
        # （1）加载数据
        inputs, labels = data
        inputs, labels = inputs.to(device), labels.to(device)    # 把训练数据放到gpu上
        # （2）前向传播，获得模型预测结果，（64，10）
        out = model(inputs)
        # （3）计算loss
        # 交叉熵代价函数out(batch,C),labels(batch)
        loss = entropy_loss(out, labels)        # 这里不需要把标签转换成独热编码，CrossEntropyLoss会自动使用等价的方式计算loss
        total_loss += loss
        # （4）反向传播并迭代一次
        # 梯度清0
        optimizer.zero_grad()
        # 计算梯度
        loss.backward()
        # 修改权值
        optimizer.step()
    return total_loss / len(train_dataset)   # 返回平均loss（每张图片的loss）

# 验证函数（在训练集和验证集上分别验证）
def val():
    model.eval()        # 设置模型处于验证模式
    accuracy_val = 0    # 在验证集上的准确率
    accuracy_train = 0  # 在训练集上的准确率
    
    # 在验证集上验证
    correct_val = 0         # 正确的个数
    for i, data in enumerate(val_loader):
        # （1）加载数据
        inputs, labels = data
        inputs, labels = inputs.to(device), labels.to(device)    # 把测试数据放到gpu上
        # （2）前向传播，获得模型预测结果，（64，10）
        out = model(inputs)
        # 使用torch.max()把概率张量 转换成 所预测数字的张量predicted，尺寸是（64,1）
        # torch.max() 第一个参数是输入张量；第二个参数表示：沿哪个维度求最大值（0表示按列，1表示按行）。
        # torch.max() 返回一个元组，包含两个张量：第一个值：最大值本身;第二个值：最大值对应的索引位置
        _, predicted = torch.max(out, 1)        
        # （3）统计正确个数
        # 统计预测正确的数量，两种方法实现
        # 方法一：迭代比较
        for i, predict in enumerate(predicted):
            if(labels[i] == predict):
                correct_val += 1
        # 方法二：直接使用张量计算
        # 一次性比较predicted和labels两个张量，得到一个布尔张量，.sum方法求和得到一个只有一个元素的张量。
        # 严谨来说应该在后面继续使用.item()方法把张量类型转换成整型，但是这里自动转化了
        # correct_val += (predicted == labels).sum().item()     
    # 统计准确率
    accuracy_val = correct_val / len(val_dataset)
    
   # 在训练集上验证
    correct_train = 0         # 正确的个数
    for i, data in enumerate(train_loader):
        # （1）加载数据
        inputs, labels = data
        inputs, labels = inputs.to(device), labels.to(device)    # 把测试数据放到gpu上
        # （2）前向传播，获得模型预测结果，（64，10）
        out = model(inputs)
        # 使用torch.max()把概率张量 转换成 所预测数字的张量predicted，尺寸是（64,1）
        # torch.max() 第一个参数是输入张量；第二个参数表示：沿哪个维度求最大值（0表示按列，1表示按行）。
        # torch.max() 返回一个元组，包含两个张量：第一个值：最大值本身;第二个值：最大值对应的索引位置
        _, predicted = torch.max(out, 1)        
        # （3）统计正确个数
        # 统计预测正确的数量，方法二实现
        # 方法二：直接使用张量计算
        # 一次性比较predicted和labels两个张量，得到一个布尔张量，.sum方法求和得到一个只有一个元素的张量，.item()得到整型
        correct_train += (predicted == labels).sum().item()     
    # 统计准确率
    accuracy_train = correct_train / len(train_dataset)

    # 返回
    return accuracy_val, accuracy_train

# 测试函数（在测试集上验证,包含保存错误样本）
def test():
    model.eval()        # 设置模型处于验证模式

    # 在验证集上验证
    correct_test = 0         # 正确的个数
    error_samples = []       # 保存错误样本
    for i, data in enumerate(test_loader):
        # （1）加载数据
        inputs, labels = data
        inputs, labels = inputs.to(device), labels.to(device)    # 把测试数据放到gpu上
        # （2）前向传播，获得模型预测结果，（64，10）
        out = model(inputs)
        # 使用torch.max()把概率张量 转换成 所预测数字的张量predicted，尺寸是（64,1）
        # torch.max() 第一个参数是输入张量；第二个参数表示：沿哪个维度求最大值（0表示按列，1表示按行）。
        # torch.max() 返回一个元组，包含两个张量：第一个值：最大值本身;第二个值：最大值对应的索引位置
        _, predicted = torch.max(out, 1)        
        # （3）统计正确个数，并记录错误样本
        for j in range(len(labels)):    #遍历这个批次的每一个样本
            if labels[j] == predicted[j]:
                correct_test += 1
            else:
                # 收集错误样本：图像、真实标签、预测标签、全局索引
                img_idx = i * batch_size + j    # 图像的绝对标签，整个数据集中的第几张
                error_samples.append((inputs[j], labels[j].item(), predicted[j].item(), img_idx))
    
    # 统计准确率
    print(f"在测试集上的精度是:{correct_test / len(test_dataset)}")

    # 保存错误样本
    save_error_images(error_samples, error_dir)
    
    # 返回准确率
    return correct_test / len(test_dataset)

## 4. 主函数：训练-验证-测试全流程

In [None]:

best_model_path = "./models/best.pth"        # 定义模型保存路径
best_val_acc = 0                    # 验证集上的最高精度

# 训练-验证流程
for epoch in range(0, 20):   # 此处调整训练轮数
    print('epoch:',epoch)
    avg_loss = train()
    accuracy_val, accuracy_train = val()
    print(f"训练集上的loss是:{avg_loss}, accuracy是:{accuracy_train}")
    print(f"验证集上的accuracy是:{accuracy_val}")

    # 保存验证集上精度最高的模型
    if accuracy_val > best_val_acc:
        best_val_acc = accuracy_val
        # 保存模型参数
        torch.save({
            'epoch': epoch,
            'model_state_dict': model.state_dict(),
            'optimizer_state_dict': optimizer.state_dict(),
            'val_acc': accuracy_val,
            'train_acc': accuracy_train
        }, best_model_path)
        print(f'  ✓ 保存最佳模型，验证集准确率: {accuracy_val:.2f}%')

# 测试流程
# 加载最佳模型，在测试集上测试精度
print('\n加载最佳模型并在测试集上测试...')
checkpoint = torch.load(best_model_path)                 # checkpoint包含很多参数
model.load_state_dict(checkpoint['model_state_dict'])    # 取其中的模型参数加载
# 开始测试
test_acc = test()