In [2]:
import numpy as np
import matplotlib.pyplot as plt
import torch
from torchsummary import summary
import torch.nn.functional as F
from torchvision import datasets, transforms
import torch.optim as optim
from torch.utils.data import DataLoader
import torch.nn as nn

# Prefer the GPU when CUDA reports one; otherwise run on the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Training pipeline: random horizontal flip, resize to 224x224 (the spatial
# size the VGG classifier head below is dimensioned for), tensor conversion,
# then single-channel normalization with mean 0.286 / std 0.353.
# NOTE(review): the original comment assumes pixel values are already in
# [0, 1] when Normalize runs.
transform_train = transforms.Compose(
    [
        transforms.RandomHorizontalFlip(),
        transforms.Resize(size=(224, 224)),
        transforms.ToTensor(),
        transforms.Normalize(mean=(0.286,), std=(0.353,)),
    ]
)

# Evaluation pipeline: identical to training minus the random flip.
transform_test = transforms.Compose(
    [
        transforms.Resize(size=(224, 224)),
        transforms.ToTensor(),
        transforms.Normalize(mean=(0.286,), std=(0.353,)),
    ]
)

# Datasets (requested with download=True under ./data).
train_set = datasets.FashionMNIST(
    root="./data",
    train=True,
    transform=transform_train,
    download=True,
)
test_set = datasets.FashionMNIST(
    root="./data",
    train=False,
    transform=transform_test,
    download=True,
)

# Data loaders: batches of 20; only the training split is shuffled.
train_dataloader = DataLoader(dataset=train_set, batch_size=20, shuffle=True)
test_dataloader = DataLoader(dataset=test_set, batch_size=20, shuffle=False)

class VGG(nn.Module):
    """VGG-16 style convolutional classifier.

    The feature extractor is thirteen 3x3 convolutions (each followed by an
    in-place ReLU) arranged in five blocks, every block ending in a 2x2
    max-pool. The classifier head is three fully connected layers with
    dropout between them.

    The first fully connected layer expects ``512 * 7 * 7`` inputs, so the
    network must be fed 224x224 images (five poolings halve 224 down to 7).

    Args:
        num_classes: Number of output logits. Defaults to 10.
        in_channels: Number of channels in the input images. Defaults to 1
            (grayscale).
    """

    # Output channels of each 3x3 convolution, in order; "M" marks a 2x2
    # max-pool with stride 2.
    _CFG = (
        64, 64, "M",
        128, 128, "M",
        256, 256, 256, "M",
        512, 512, 512, "M",
        512, 512, 512, "M",
    )

    def __init__(self, num_classes=10, in_channels=1):
        super(VGG, self).__init__()
        self.features = self._make_features(in_channels)

        self.classifier = nn.Sequential(
            nn.Linear(512 * 7 * 7, 4096),
            nn.ReLU(inplace=True),
            nn.Dropout(),
            nn.Linear(4096, 4096),
            nn.ReLU(inplace=True),
            nn.Dropout(),
            nn.Linear(4096, num_classes),
        )

        for m in self.modules():
            self._init_weights(m)

    @classmethod
    def _make_features(cls, in_channels):
        """Build the flat conv/ReLU/max-pool stack described by ``_CFG``."""
        layers = []
        channels = in_channels
        for spec in cls._CFG:
            if spec == "M":
                layers.append(nn.MaxPool2d(kernel_size=2, stride=2))
            else:
                layers.append(nn.Conv2d(channels, spec, kernel_size=3, padding=1))
                layers.append(nn.ReLU(inplace=True))
                channels = spec
        return nn.Sequential(*layers)

    @staticmethod
    def _init_weights(module):
        """Initialize one sub-module in place; other module types are ignored.

        Conv2d weights get Kaiming-normal initialization for ReLU and zero
        biases. Linear layers keep PyTorch's default weight initialization
        and get zero biases, consistent with the convolution biases.
        """
        if isinstance(module, nn.Conv2d):
            nn.init.kaiming_normal_(module.weight, nonlinearity='relu')
            if module.bias is not None:
                nn.init.constant_(module.bias, 0)
        elif isinstance(module, nn.Linear):
            if module.bias is not None:
                nn.init.constant_(module.bias, 0)

    def forward(self, x):
        """Return class logits for a batch of images ``x``."""
        x = self.features(x)
        # Flatten everything except the batch dimension for the linear head.
        x = torch.flatten(x, 1)
        output = self.classifier(x)
        return output

# Helper that computes the macro-averaged F1 score.
def calculate_macro_f1_score(predictions, true_labels, num_classes=10):
    """Return the unweighted mean of the per-class F1 scores.

    Args:
        predictions: Array of predicted class indices; must support
            element-wise ``==`` / ``!=`` and ``&`` on the results.
        true_labels: Array of ground-truth class indices, same length.
        num_classes: Classes ``0 .. num_classes - 1`` are scored.

    Returns:
        The mean F1 over all scored classes. A class whose precision and
        recall are both zero (or undefined) contributes 0.
    """

    def f1_for(cls):
        # One-vs-rest confusion counts for this class.
        hit = ((predictions == cls) & (true_labels == cls)).sum().item()
        false_alarm = ((predictions == cls) & (true_labels != cls)).sum().item()
        miss = ((predictions != cls) & (true_labels == cls)).sum().item()

        if (hit + false_alarm) > 0:
            precision = hit / (hit + false_alarm)
        else:
            precision = 0
        if (hit + miss) > 0:
            recall = hit / (hit + miss)
        else:
            recall = 0

        if (precision + recall) > 0:
            return 2 * precision * recall / (precision + recall)
        return 0

    per_class = [f1_for(cls) for cls in range(num_classes)]
    return np.mean(per_class)

# Module-level training history. train() appends one entry per epoch to
# loss_list and accuracy_list; macro_f1_score_list is not written by any
# code visible in this file.
loss_list, accuracy_list, macro_f1_score_list = [], [], []

def train(num_epochs):
    """Train the global ``model`` for ``num_epochs`` epochs and plot the history.

    Relies on module-level ``model``, ``criterion``, ``optimizer``,
    ``train_dataloader`` and ``device``. After every epoch the mean batch
    loss and the training accuracy are appended to the module-level
    ``loss_list`` and ``accuracy_list``.

    Those lists persist across calls, so the curves are plotted over the
    whole accumulated history. On a first call this equals ``num_epochs``
    points; on later calls it avoids the x/y length mismatch that plotting
    against ``range(1, num_epochs + 1)`` would cause.

    Args:
        num_epochs: Number of passes over ``train_dataloader``.
    """
    for epoch in range(num_epochs):
        model.train()
        epoch_loss = 0
        correct_preds = 0
        total_samples = 0
        for i, (images, labels) in enumerate(train_dataloader):
            images, labels = images.to(device), labels.to(device)
            outputs = model(images)
            loss = criterion(outputs, labels)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            # Convert to a Python float once so no graph reference is kept.
            batch_loss = loss.item()
            epoch_loss += batch_loss
            # Predicted class = index of the largest logit.
            _, predicted = torch.max(outputs.detach(), 1)
            correct_preds += (predicted == labels).sum().item()
            total_samples += labels.size(0)
            if (i + 1) % 200 == 0:
                print(f"第{i+1}批, Loss: {batch_loss:.4f}")

        epoch_loss /= len(train_dataloader)
        epoch_accuracy = correct_preds / total_samples
        loss_list.append(epoch_loss)
        accuracy_list.append(epoch_accuracy)
        print(f'Epoch [{epoch + 1}/{num_epochs}], Loss: {epoch_loss:.4f}, Train Accuracy: {epoch_accuracy:.4f}')

    # Plot the training curves. The x axes are sized from the history
    # lists themselves so they always match the y data.
    loss_epochs = range(1, len(loss_list) + 1)
    accuracy_epochs = range(1, len(accuracy_list) + 1)

    plt.figure(figsize=(10, 8))
    plt.subplot(2, 1, 1)  # 2 rows x 1 column grid, first panel
    plt.plot(loss_epochs, loss_list, marker='o', linewidth=2, label='Training Loss')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.xticks(loss_epochs)
    plt.legend()

    plt.subplot(2, 1, 2)  # second panel
    plt.plot(accuracy_epochs, accuracy_list, marker='o', linewidth=2, label='Training Accuracy')
    plt.xlabel('Epoch')
    plt.ylabel('Accuracy')
    plt.xticks(accuracy_epochs)
    plt.legend()

    plt.tight_layout()
    plt.show()
    
def evaluate(model, test_dataloader):
    """Print accuracy and macro F1 of ``model`` over ``test_dataloader``.

    The model is switched to eval mode and run without gradient tracking.
    Batches are moved to the module-level ``device`` before the forward pass.

    Args:
        model: Network that maps an image batch to per-class scores.
        test_dataloader: Iterable yielding ``(images, labels)`` batches.
    """
    model.eval()
    hits = 0
    seen = 0
    pred_chunks = []
    label_chunks = []

    with torch.no_grad():
        for batch_images, batch_labels in test_dataloader:
            batch_images = batch_images.to(device)
            batch_labels = batch_labels.to(device)
            logits = model(batch_images)
            # Index of the largest score along the class dimension.
            batch_preds = torch.max(logits.data, 1)[1]

            pred_chunks.append(batch_preds.view(-1).cpu().numpy())
            label_chunks.append(batch_labels.view(-1).cpu().numpy())

            hits += (batch_preds == batch_labels).sum().item()
            seen += batch_labels.size(0)

    accuracy = hits / seen
    macro_f1_score = calculate_macro_f1_score(
        np.concatenate(pred_chunks),
        np.concatenate(label_chunks),
    )

    print(f'Test Accuracy: {accuracy:.4f}')
    print(f'Test Macro F1 Score: {macro_f1_score:.4f}')

if __name__ == "__main__":
    # Cross-entropy loss, placed on the selected device.
    criterion = nn.CrossEntropyLoss()
    criterion = criterion.to(device)

    # Network and its Adam optimizer (learning rate 0.001).
    model = VGG()
    model = model.to(device)
    optimizer = optim.Adam(params=model.parameters(), lr=0.001)

    # Disabled steps; uncomment to print a summary, train for 5 epochs,
    # or evaluate on the test split.
    # print(summary(model, (1, 224, 224)))
    # train(5)
    # evaluate(model, test_dataloader)

VGG(
  (features): Sequential(
    (0): Conv2d(1, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (1): ReLU(inplace=True)
    (2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (3): ReLU(inplace=True)
    (4): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (5): Conv2d(64, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (6): ReLU(inplace=True)
    (7): Conv2d(128, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (8): ReLU(inplace=True)
    (9): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (10): Conv2d(128, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (11): ReLU(inplace=True)
    (12): Conv2d(256, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (13): ReLU(inplace=True)
    (14): Conv2d(256, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (15): ReLU(inplace=True)
    (16): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1