In [None]:
from torch import nn
from torch.nn import functional as F

class VGG16(nn.Module):
    def __init__(self):
        super(VGG16, self).__init__()
        self.block1 = nn.Sequential(
            # 这里使用灰度图测试，通道为1
            nn.Conv2d(in_channels=1, out_channels=64, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.Conv2d(in_channels=64, out_channels=64, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2)
        )
        self.block2 = nn.Sequential(
            nn.Conv2d(in_channels=64, out_channels=128, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.Conv2d(in_channels=128, out_channels=128, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2)
        )
        self.block3 = nn.Sequential(
            nn.Conv2d(in_channels=128, out_channels=256, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.Conv2d(in_channels=256, out_channels=256, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.Conv2d(in_channels=256, out_channels=256, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2)
        )
        self.block4 = nn.Sequential(
            nn.Conv2d(in_channels=256, out_channels=512, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.Conv2d(in_channels=512, out_channels=512, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.Conv2d(in_channels=512, out_channels=512, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2)
        )
        self.block5 = nn.Sequential(
            nn.Conv2d(in_channels=512, out_channels=512, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.Conv2d(in_channels=512, out_channels=512, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.Conv2d(in_channels=512, out_channels=512, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2)
        )
        self.block6 = nn.Sequential(
            nn.Flatten(),
            nn.Linear(in_features=512*7*7, out_features=256),
            nn.ReLU(),
            nn.Dropout(p=0.5),
            nn.Linear(in_features=256, out_features=128),
            nn.ReLU(),
            nn.Dropout(p=0.5),
            nn.Linear(in_features=128, out_features=10)
        )

        # 初始化权重
        for m in self.modules():
            # 卷积层初始化
            if isinstance(m, nn.Conv2d):
                # 使用kaiming初始化
                nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu')
                if m.bias is not None:
                    nn.init.constant_(m.bias, 0)
            # 全连接层初始化
            elif isinstance(m, nn.Linear):
                nn.init.normal_(m.weight, 0, 0.01)
                if m.bias is not None:
                    nn.init.constant_(m.bias, 0)

    def forward(self, x):
        x = self.block1(x)
        x = self.block2(x)
        x = self.block3(x)
        x = self.block4(x)
        x = self.block5(x)
        x = self.block6(x)
        return x
        

In [2]:
# 画图
import matplotlib.pyplot as plt

def matplot_train_process_data(train_process_data):
    plt.figure(figsize=(12, 4))
    plt.subplot(1, 2, 1)
    plt.plot(train_process_data['epoch'], train_process_data['train_loss'], 'ro-',label='Train Loss')
    plt.plot(train_process_data['epoch'], train_process_data['val_loss'], 'bs-', label='Val Loss')
    plt.legend()
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.title('Train and Val Loss')

    plt.subplot(1, 2, 2)
    plt.plot(train_process_data['epoch'], train_process_data['train_acc'], 'ro-', label='Train Acc')
    plt.plot(train_process_data['epoch'], train_process_data['val_acc'], 'bs-', label='Val Acc')
    plt.legend()
    plt.xlabel('Epoch')
    plt.ylabel('Accuracy')
    plt.title('Train and Val Accuracy')
    plt.show()

In [None]:
# 训练
import copy
import time
import torchvision.transforms as Transforms
import torchvision.datasets as Datasets
import torch.utils.data as Data
import pandas as pd


def train_val_data_process():
    print(f"使用 num_workers={num_workers} 加载数据")
    
    # 加载数据集
    train_data = Datasets.FashionMNIST(root='./data', train=True, 
        transform=Transforms.Compose([
            Transforms.Resize((224, 224)),
            Transforms.ToTensor()
        ]), 
        download=True)
    
    # 划分训练集和验证集
    train_data, val_data = Data.random_split(train_data, 
    [round(len(train_data) * 0.8), round(len(train_data) * 0.2)])
    
    train_loader = Data.DataLoader(dataset=train_data, 
        batch_size=64,
        shuffle=True,
        num_workers=8)  # GPU 时启用 pin_memory 加速

    val_loader = Data.DataLoader(dataset=val_data, 
        batch_size=64,
        shuffle=True,
        num_workers=8)
    
    return train_loader, val_loader


def train_model_process(model, train_loader, val_loader, num_epochs=5):
    # 加载模型
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print("当前设备：", device)
    model = model.to(device)

    # 定义损失函数和优化器
    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
    
    best_model_wts = copy.deepcopy(model.state_dict())
    best_acc = 0.0
    # 训练损失
    train_loss_all = []
    # 训练准确率
    train_acc_all = []
    # 验证损失
    val_loss_all = []
    # 验证准确率
    val_acc_all = []

    since = time.time()
    # 训练
    for epoch in range(num_epochs):
        
        print(f"Epoch {epoch+1}/{num_epochs} - LR: {optimizer.param_groups[0]['lr']}")
        print('-' * 10)
        
        train_loss = 0.0
        train_corrects = 0.0

        val_loss = 0.0
        val_corrects = 0.0

        train_num = 0
        val_num = 0

        # 训练
        for step, (b_x, b_y) in enumerate(train_loader):
            # 数据移动到设备
            b_x = b_x.to(device)
            b_y = b_y.to(device)

            train_num += b_x.size(0)

            # 模型训练
            model.train()
            outputs = model(b_x)
            # 计算损失
            loss = criterion(outputs, b_y)
            
            # 梯度清零，防止梯度叠加
            optimizer.zero_grad()
            # 反向传播
            loss.backward()
            # 更新参数
            optimizer.step()

            # 计算准确率
            pre_lab = torch.argmax(outputs, dim=1)
            train_corrects += torch.sum(pre_lab == b_y)
            # 计算损失
            train_loss += loss.item() * b_x.size(0)
            
            # 计算验证损失和准确率
        for step, (b_x, b_y) in enumerate(val_loader):
            b_x = b_x.to(device)
            b_y = b_y.to(device)

            val_num += b_x.size(0)

            # 模型验证
            model.eval()
            outputs = model(b_x)
            loss = criterion(outputs, b_y)

            # 计算验证损失
            val_loss += loss.item() * b_x.size(0)
            pre_lab = torch.argmax(outputs, dim=1)
            val_corrects += torch.sum(pre_lab == b_y)

        # 计算训练损失和准确率
        train_loss_all.append(train_loss / train_num)
        train_acc_all.append(train_corrects.double().item() / train_num)
        val_loss_all.append(val_loss / val_num)
        val_acc_all.append(val_corrects.double().item() / val_num)

        print('Train loss: {:.4f} Train acc: {:.4f} | Val loss: {:.4f} Val acc: {:.4f}'
        .format(train_loss_all[-1], train_acc_all[-1], val_loss_all[-1], val_acc_all[-1]))

        # 保存最佳模型参数
    if val_acc_all[-1] > best_acc:
        best_acc = val_acc_all[-1]
        best_model_wts = copy.deepcopy(model.state_dict())
    

    time_elapsed = time.time() - since
    print(f"训练完成，用时 {time_elapsed // 60:.0f}m {time_elapsed % 60:.0f}s")
    
    # 保存最佳模型
    torch.save(best_model_wts, 'model/vgg16_best_model.pth')

    train_process_data = pd.DataFrame({
        'epoch': range(num_epochs),
        'train_loss': train_loss_all,
        'train_acc': train_acc_all,
        'val_loss': val_loss_all,
        'val_acc': val_acc_all
    })
    return train_process_data
    

model = VGG16()
train_loader, val_loader = train_val_data_process()
train_process_data = train_model_process(model, train_loader, val_loader, num_epochs=20)

matplot_train_process_data(train_process_data)
    

In [None]:
# 测试
import torch
import torchvision.datasets as datasets
import torchvision.transforms as Transforms
import torch.utils.data as Data


def get_test_loader():
    # 数据预处理
    transforms = Transforms.Compose([
        Transforms.Resize(size=227),
        Transforms.ToTensor()
    ])

    # 加载测试数据
    test_dataset = datasets.FashionMNIST(root='./data', 
        train=False,
        transform=transforms, 
        download=True)
    
    test_loader = Data.DataLoader(dataset=test_dataset, 
        batch_size=1, 
        shuffle=True,
        num_workers=4)
    
    return test_loader

def test_model_process(model, test_loader):
    # 模型放入设备
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.to(device)

    # 初始化参数
    test_corrects = 0.0
    test_num = 0

    # 只进行前向传播
    with torch.no_grad():
        for test_x, test_y in test_loader:
            test_x = test_x.to(device)
            test_y = test_y.to(device)

            test_num += test_x.size(0)

            outputs = model(test_x)
            model.eval()

            outputs = model(test_x)
            
            pre_lab = torch.argmax(outputs, dim=1)
            test_corrects += torch.sum(pre_lab == test_y)
    
    test_acc = test_corrects.double().item() / test_num
    print(f"测试准确率: {test_acc:.4f}")

model = VGG16()
model.load_state_dict(torch.load('./model/vgg16_best_model.pth'))
test_loader = get_test_loader()
test_model_process(model=model, test_loader=test_loader)
