## 准备数据

In [1]:
import torch
import torch.nn as nn
import torch.utils.data as data
import torch.optim as optim
import torch.nn.functional as F
import numpy as np
import pickle
from sklearn.model_selection import train_test_split

def unpickle(file):
    """Load and return the object stored in a pickle file.

    The file is opened in binary mode and unpickled with ``encoding='latin1'``.

    Args:
        file: Path of the pickle file to read.

    Returns:
        The unpickled object.
    """
    # NOTE: unpickling can execute arbitrary code; only use this on trusted files.
    with open(file, 'rb') as handle:
        return pickle.load(handle, encoding='latin1')

# Directory holding the extracted CIFAR-10 python batches (absolute, machine-specific path).
cifar10_dir = '/home/lc/d2l-zh/exercise/cifar-10-python/cifar-10-batches-py/'

# Read the five training batch files, collecting pixel rows and labels in parallel containers.
all_train_data, all_train_labels = [], []
for i in range(1, 6):
    file_path = f'{cifar10_dir}/data_batch_{i}'
    data_dict = unpickle(file_path)
    all_train_data.append(data_dict['data'])
    all_train_labels.extend(data_dict['labels'])

# Merge the per-batch pieces into a single label array and a single image array.
train_labels_np = np.array(all_train_labels)
train_data_np = np.concatenate(all_train_data)

# Hold out 10% of the training data for validation; stratifying on the labels
# keeps the class proportions the same in both parts, and the fixed seed makes
# the split repeatable.
split_options = dict(test_size=0.1, random_state=42, stratify=train_labels_np)
X_train, X_val, y_train, y_val = train_test_split(
    train_data_np, train_labels_np, **split_options
)

# The official test batch is loaded as-is and never used for model selection.
test_file_path = f'{cifar10_dir}/test_batch'
test_dict = unpickle(test_file_path)
X_test, y_test = test_dict['data'], np.array(test_dict['labels'])

# Report the shapes of every array produced above.
print(f"测试数据 (X_test) 形状: {X_test.shape}")
print(f"测试标签 (y_test) 形状: {y_test.shape}")

print(f"原始训练数据形状: {train_data_np.shape}")
print("-" * 30)
print(f"划分后的训练数据 (X_train) 形状: {X_train.shape}")
print(f"划分后的训练标签 (y_train) 形状: {y_train.shape}")
print(f"划分后的验证数据 (X_val) 形状: {X_val.shape}")
print(f"划分后的验证标签 (y_val) 形状: {y_val.shape}")

测试数据 (X_test) 形状: (10000, 3072)
测试标签 (y_test) 形状: (10000,)
原始训练数据形状: (50000, 3072)
------------------------------
划分后的训练数据 (X_train) 形状: (45000, 3072)
划分后的训练标签 (y_train) 形状: (45000,)
划分后的验证数据 (X_val) 形状: (5000, 3072)
划分后的验证标签 (y_val) 形状: (5000,)


In [2]:
class SimpleCIFAR10Dataset(data.Dataset):
    """Minimal in-memory dataset built from NumPy arrays.

    Both arrays are converted to PyTorch tensors once, at construction time,
    so fetching an item is a plain tensor index.
    """

    def __init__(self, images, labels):
        """Convert the NumPy inputs to tensors and keep them on the instance.

        Args:
            images (numpy.ndarray): Image data, one flattened image per row
                (shape ``(num_samples, 3072)`` in this notebook). Every value
                is divided by 255.0 and stored as float32.
            labels (numpy.ndarray): Label data of shape ``(num_samples,)``,
                stored as int64, the integer type the loss function expects.
        """
        scaled = images / 255.0
        self.images = torch.from_numpy(scaled).to(dtype=torch.float32)
        self.labels = torch.from_numpy(labels).to(dtype=torch.int64)

    def __len__(self):
        """Return the number of samples, taken from the label tensor."""
        return len(self.labels)

    def __getitem__(self, idx):
        """Return the ``(image, label)`` pair stored at ``idx``."""
        image = self.images[idx]
        label = self.labels[idx]
        return image, label

# 1. Wrap each split (train / validation / test) in a dataset object
train_dataset, val_dataset, test_dataset = (
    SimpleCIFAR10Dataset(images, labels)
    for images, labels in ((X_train, y_train), (X_val, y_val), (X_test, y_test))
)

# 2. Build the DataLoaders
batch_size = 2000  # number of samples per batch

# Settings shared by all three loaders: same batch size, 16 worker processes,
# and pinned host memory.
loader_options = dict(batch_size=batch_size, num_workers=16, pin_memory=True)

# Only the training loader shuffles; validation and test keep a fixed order.
train_loader = data.DataLoader(dataset=train_dataset, shuffle=True, **loader_options)
val_loader = data.DataLoader(dataset=val_dataset, shuffle=False, **loader_options)
test_loader = data.DataLoader(dataset=test_dataset, shuffle=False, **loader_options)

# 3. Sanity-check that the DataLoader works
print("成功创建最简化的 DataLoader！")
print("-" * 30)

# Pull a single batch from train_loader and inspect its shapes and dtypes
data_batch, labels_batch = next(iter(train_loader))
print(f"一个训练批次的数据 (images) 形状: {data_batch.shape}")
print(f"一个训练批次的标签 (labels) 形状: {labels_batch.shape}")
print(f"批次数据的类型: {data_batch.dtype}")
print(f"批次标签的类型: {labels_batch.dtype}")

# Device used by the training and evaluation code: the GPU when CUDA is available, otherwise the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

成功创建最简化的 DataLoader！
------------------------------
一个训练批次的数据 (images) 形状: torch.Size([2000, 3072])
一个训练批次的标签 (labels) 形状: torch.Size([2000])
批次数据的类型: torch.float32
批次标签的类型: torch.int64
Using device: cuda


## 建立模型

In [6]:
class myModel(nn.Module):
    """Fully connected classifier: 3072 -> 1024 -> 512 -> 10.

    Each of the two hidden layers is followed by ReLU and dropout (p=0.1);
    the last layer returns raw logits with no activation.
    """

    def __init__(self):
        """Create the three linear layers and the shared dropout module."""
        super().__init__()
        self.linear1 = nn.Linear(in_features=3072, out_features=1024)
        self.linear2 = nn.Linear(in_features=1024, out_features=512)
        self.linear3 = nn.Linear(in_features=512, out_features=10)
        self.dropout = nn.Dropout(p=0.1)

    def forward(self, inp):
        """Map a batch of flattened images to a batch of 10 class logits."""
        hidden = inp
        # Both hidden layers share the same linear -> ReLU -> dropout pattern.
        for layer in (self.linear1, self.linear2):
            hidden = self.dropout(torch.relu(layer(hidden)))
        return self.linear3(hidden)
        
# Instantiate the network.
model = myModel()

# Adam over all model parameters, with weight decay enabled; the variant
# without weight decay is kept below (commented out) for comparison.
optimizer = optim.Adam(
    model.parameters(),
    lr=1e-3,
    weight_decay=1e-4,
)
# optimizer = optim.Adam(model.parameters(),lr = 0.001)

def eval_model(model, test_loader, criterion):
    """Evaluate ``model`` on every batch produced by ``test_loader``.

    The model is switched to eval mode and gradient tracking is disabled.
    Each batch is moved to the device that holds the model's parameters, so
    the function does not depend on any module-level ``device`` variable.

    Args:
        model: ``nn.Module`` mapping a batch of inputs to per-class logits.
        test_loader: Iterable yielding ``(images, labels)`` tensor pairs.
        criterion: Loss callable returning the *mean* loss of a batch as a
            scalar tensor (the default reduction of ``nn.CrossEntropyLoss``).

    Returns:
        tuple: ``(accuracy, avg_loss)`` where ``accuracy`` is a percentage in
        [0, 100] and ``avg_loss`` is the mean loss per sample.

    Raises:
        ValueError: If ``test_loader`` yields no samples.
    """
    model.eval()  # switch to evaluation mode (disables dropout)

    # Follow the model's own device; a parameter-less model leaves batches where they are.
    first_param = next(model.parameters(), None)
    target_device = first_param.device if first_param is not None else None

    loss_sum = 0.0
    correct = 0
    total = 0
    with torch.no_grad():  # nothing computed here is recorded for backpropagation
        for images, labels in test_loader:
            if target_device is not None:
                images = images.to(target_device)
                labels = labels.to(target_device)
            outputs = model(images)
            n_samples = labels.size(0)

            # criterion yields a per-batch mean; weight it by the batch size so
            # a smaller final batch does not skew the overall average.
            loss_sum += criterion(outputs, labels).item() * n_samples

            _, predicted = torch.max(outputs, 1)
            total += n_samples
            correct += (predicted == labels).sum().item()

    if total == 0:
        raise ValueError("eval_model received a loader that yields no samples")

    # Per-sample averages over everything the loader produced.
    avg_loss = loss_sum / total
    accuracy = 100 * correct / total

    return accuracy, avg_loss

## 计算 loss

In [7]:
# Cross-entropy loss for multi-class classification, applied to the raw logits returned by the model.
criterion = nn.CrossEntropyLoss()

## 实际训练

In [8]:
# Number of full passes over the training loader performed by the training loop.
num_epochs = 70
def train_one_step(model,optimizer,train_loader):
    """Run one full pass over ``train_loader``, updating ``model`` after every batch.

    Despite the name, this iterates over the whole loader (one epoch), not a
    single optimisation step. It reads the module-level ``device`` and
    ``criterion``.

    Args:
        model: Network to train; put into training mode here.
        optimizer: Optimizer that owns the model's parameters.
        train_loader: Iterable yielding ``(inputs, targets)`` tensor pairs.
    """
    model.train()
    for inputs, targets in train_loader:
        inputs, targets = inputs.to(device), targets.to(device)
        # Clear gradients left over from the previous batch before the new backward pass.
        optimizer.zero_grad()
        batch_loss = criterion(model(inputs), targets)
        batch_loss.backward()
        optimizer.step()

if __name__ == '__main__':
    # Move the parameters to the selected device before any training or evaluation.
    model.to(device)
    # Start below any reachable accuracy so the first epoch always writes a
    # checkpoint; the file loaded after the loop then exists whenever at
    # least one epoch ran.
    best_val_accuracy = float('-inf')
    best_model_path = "best_model.pth"
    for i in range(num_epochs):
        train_one_step(model,optimizer,train_loader)
        # Both metric sets are computed after the epoch's updates, in eval mode.
        train_accuracy, train_loss = eval_model(model, train_loader, criterion)
        val_accuracy, val_loss = eval_model(model, val_loader, criterion)
        print("-" * 30)
        print(f'Epoch [{i+1}/{num_epochs}] 的训练集准确率: {train_accuracy:.2f} %')
        print(f'Epoch [{i+1}/{num_epochs}] 的训练集loss: {train_loss:.2f}')
        print(f'Epoch [{i+1}/{num_epochs}] 的验证集准确率: {val_accuracy:.2f} %')
        print(f'Epoch [{i+1}/{num_epochs}] 的验证集loss: {val_loss:.2f}')
        # Keep only the weights with the best validation accuracy seen so far.
        if val_accuracy > best_val_accuracy:
            best_val_accuracy = val_accuracy
            torch.save(model.state_dict(), best_model_path)
            print(f"发现更好的模型，已保存到 {best_model_path}")
        print("-" * 30)
    print(f"加载最佳模型 ({best_model_path}) 进行最终测试...")
    # map_location places the stored tensors on the device currently in use.
    model.load_state_dict(torch.load(best_model_path, map_location=device))

    # Evaluate on the test set with the best weights loaded
    test_accuracy, test_loss = eval_model(model, test_loader, criterion)
    print(f'测试集准确率: {test_accuracy:.2f} %')
    # The loss is not a percentage, so no "%" suffix (matches the per-epoch loss prints).
    print(f'测试集loss: {test_loss:.2f}')

------------------------------
Epoch [1/200] 的训练集准确率: 29.93 %
Epoch [1/200] 的训练集loss: 1.96 %
Epoch [1/200] 的验证集准确率: 29.98 %
Epoch [1/200] 的验证集loss: 1.96 %
发现更好的模型，已保存到 best_model.pth
------------------------------
------------------------------
Epoch [2/200] 的训练集准确率: 34.75 %
Epoch [2/200] 的训练集loss: 1.84 %
Epoch [2/200] 的验证集准确率: 34.70 %
Epoch [2/200] 的验证集loss: 1.84 %
发现更好的模型，已保存到 best_model.pth
------------------------------
------------------------------
Epoch [3/200] 的训练集准确率: 37.80 %
Epoch [3/200] 的训练集loss: 1.75 %
Epoch [3/200] 的验证集准确率: 37.66 %
Epoch [3/200] 的验证集loss: 1.74 %
发现更好的模型，已保存到 best_model.pth
------------------------------
------------------------------
Epoch [4/200] 的训练集准确率: 40.05 %
Epoch [4/200] 的训练集loss: 1.69 %
Epoch [4/200] 的验证集准确率: 39.90 %
Epoch [4/200] 的验证集loss: 1.70 %
发现更好的模型，已保存到 best_model.pth
------------------------------
------------------------------
Epoch [5/200] 的训练集准确率: 41.79 %
Epoch [5/200] 的训练集loss: 1.64 %
Epoch [5/200] 的验证集准确率: 41.26 %
Epoch [5/200] 的验证集lo