In [None]:
import torchvision.transforms as transforms
from torchvision.datasets import ImageFolder
from torch.utils.data import DataLoader, random_split
import torch
import time
import numpy as np
from torch import nn

import matplotlib.pyplot as plt 
from IPython.display import clear_output,display
#https://data.vision.ee.ethz.ch/cvl/datasets_extra/food-101/
# Pick the best available accelerator: Apple-silicon MPS first (the original
# target of this notebook), then CUDA, then plain CPU, so the notebook also
# runs on machines without MPS.
_mps_backend = getattr(torch.backends, 'mps', None)
if _mps_backend is not None and _mps_backend.is_available():
    device = 'mps'
elif torch.cuda.is_available():
    device = 'cuda'
else:
    device = 'cpu'

# Resize to 64x64, apply light augmentation (rotation up to 15 degrees and a
# random horizontal flip), then convert to a tensor.
transform = transforms.Compose([
    transforms.Resize((64, 64)),
    transforms.RandomRotation(degrees=15),
    transforms.RandomHorizontalFlip(),
    transforms.ToTensor()
])

all_data = ImageFolder(root='food-101/images/', transform=transform)

# 80 / 20 train / test split.
train_size = int(0.8 * len(all_data))
test_size = len(all_data) - train_size

# Seed the split so the held-out set is the same after every kernel restart.
SPLIT_SEED = 42
train_data, test_data = random_split(
    all_data,
    [train_size, test_size],
    generator=torch.Generator().manual_seed(SPLIT_SEED),
)

# Keep the Subset objects returned by random_split. Replacing them with
# `.dataset` (the full ImageFolder) would make both loaders iterate over every
# image, so the "test" metrics would be computed on training data.
# Subset has no `classes` attribute, so expose the class list on both subsets
# for code that reads `train_data.classes`.
train_data.classes = all_data.classes
test_data.classes = all_data.classes

# NOTE(review): both subsets share `transform`, so the random rotation/flip is
# also applied to test images. Use a second ImageFolder with a deterministic
# transform if augmentation-free evaluation is wanted.
train_loader = DataLoader(dataset=train_data, batch_size=128, shuffle=True)
test_loader = DataLoader(dataset=test_data, batch_size=128, shuffle=False)

In [None]:

class CustomModelWithConv(nn.Module):
    """Small CNN classifier: two conv-conv-pool stages followed by an MLP head.

    The first linear layer is sized for ``10 * 16 * 16`` features, i.e. a
    3-channel 64x64 input after the two 2x2 max-pools.

    Args:
        weight_decay: Coefficient used by :meth:`l2_regularization`.
        dropout_prob: Drop probability of the dropout layer in the classifier.
        num_classes: Number of output logits. When ``None`` (default) it is
            taken from ``len(train_data.classes)``, where ``train_data`` is the
            notebook-level dataset.
    """

    def __init__(self, weight_decay=1e-4, dropout_prob=0.5, num_classes=None):
        super(CustomModelWithConv, self).__init__()

        if num_classes is None:
            # Backward-compatible default: size the head from the notebook dataset.
            num_classes = len(train_data.classes)

        self.features = nn.Sequential(
            nn.Conv2d(in_channels=3, out_channels=10, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.Conv2d(10, 10, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),
            nn.Conv2d(10, 10, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.Conv2d(10, 10, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2)
        )

        self.flatten = nn.Flatten()

        # The head ends with a plain Linear layer: CrossEntropyLoss expects raw
        # (unbounded) logits. A trailing ReLU would clamp every negative logit
        # to 0 and block its gradient. ReLU has no parameters, so state-dict
        # keys are unaffected by its removal.
        self.classifier = nn.Sequential(
            nn.Linear(10 * 16 * 16, 32),
            nn.BatchNorm1d(32),
            nn.ReLU(),
            nn.Dropout(p=dropout_prob),
            nn.Linear(32, num_classes)
        )

        # Coefficient for the optional L2 regularization term.
        self.weight_decay = weight_decay

    def forward(self, x):
        """Return the raw class logits for a batch of images ``x``."""
        x = self.features(x)
        x = self.flatten(x)
        x = self.classifier(x)
        return x

    def l2_regularization(self):
        """Return ``weight_decay`` times the summed Frobenius norms of the weights.

        Only parameters with more than one dimension (weight matrices and conv
        kernels) contribute; 1-D parameters such as biases and batch-norm
        affine terms are skipped. Note that the norms are not squared.
        """
        # Accumulate on the device the parameters live on, not on a notebook
        # global, so the method works wherever the model has been moved.
        param_device = next(self.parameters()).device
        l2_reg = torch.zeros((), device=param_device)
        for param in self.parameters():
            if param.dim() > 1:  # weight tensors only
                l2_reg = l2_reg + torch.norm(param, p='fro')  # Frobenius norm
        return self.weight_decay * l2_reg
    
# Build the network and place it on the training device right away.
# l2_regularization() adds parameter norms to a tensor on `device`, and the
# optimizer should be constructed from parameters that are already on their
# final device.
module = CustomModelWithConv().to(device)

def weights_init(m):
    """Xavier-uniform initialise conv/linear weights and zero their biases.

    Meant to be passed to ``nn.Module.apply``; modules that are neither
    ``nn.Conv2d`` nor ``nn.Linear`` are left untouched.
    """
    if not isinstance(m, (nn.Conv2d, nn.Linear)):
        return
    nn.init.xavier_uniform_(m.weight)
    if m.bias is None:
        return
    nn.init.constant_(m.bias, 0)

# Re-initialise every conv / linear layer of the model in place.
module.apply(weights_init)

from torch.optim.lr_scheduler import StepLR, ReduceLROnPlateau
from util import accuracy_fn, train_step, test_step

# Multi-class classification objective.
loss_fn = nn.CrossEntropyLoss()

# Plain SGD. Alternatives kept for reference:
#   torch.optim.Adam(params=module.parameters(), lr=0.01, weight_decay=1e-4)
#   torch.optim.Adadelta(module.parameters(), lr=0.05, rho=0.9, eps=1e-6, weight_decay=1e-4)
optimizer = torch.optim.SGD(module.parameters(), lr=0.01)

# Multiply the learning rate by 0.1 once the monitored value has stopped
# decreasing for more than 5 epochs.
# Alternative kept for reference: StepLR(optimizer, step_size=20, gamma=0.2)
scheduler = ReduceLROnPlateau(
    optimizer,
    mode='min',
    factor=0.1,
    patience=5,
    verbose=True,
)



# ---- Early-stopping state and run length ----
best_loss = float('inf')   # lowest test loss seen so far
patience = 10              # epochs without improvement tolerated before stopping
current_patience = 0       # consecutive epochs without improvement
epochs = 100

# ---- Per-epoch history ----
train_result = []   # (epoch, loss tensor, accuracy)
test_result = []    # (epoch, loss tensor, accuracy)

test_losses = []
test_accs = []
train_losses = []
train_accs = []

# 3x2 dashboard: row 1 = per-batch losses of the current epoch,
# row 2 = loss per epoch, row 3 = accuracy per epoch (train left, test right).
fig, ((ax1, ax2), (ax3, ax4), (ax5, ax6)) = plt.subplots(3, 2, figsize=(16, 8), gridspec_kw={'height_ratios': [1, 1, 1]})
#plt.subplots_adjust(wspace=0.5, hspace=0.5)

for epoch in range(epochs):
    start_time = time.time()
    # `trian_epoch_accs` (sic) keeps its original spelling because it is a
    # notebook-level name that later cells may reference.
    train_loss, train_acc, train_epoch_losses, trian_epoch_accs = train_step(module=module, data_loader=train_loader, loss_fn=loss_fn, optimizer=optimizer, device=device, accuracy_fn=accuracy_fn)
    test_loss, test_acc, test_epoch_losses, test_epoch_accs = test_step(module=module, data_loader=test_loader, loss_fn=loss_fn, device=device, accuracy_fn=accuracy_fn)
    scheduler.step(test_loss)

    # Early-stopping bookkeeping on the test loss.
    if test_loss < best_loss:
        # Keep a detached copy so no autograd graph is retained between epochs.
        best_loss = test_loss.detach()
        current_patience = 0
    else:
        current_patience += 1

    # Store detached tensors only: holding the live loss tensors would keep
    # any attached autograd graph alive for the whole run.
    train_result.append((epoch, train_loss.detach(), train_acc))
    train_losses.append(train_loss.detach().cpu().numpy())
    train_accs.append(train_acc)
    test_result.append((epoch, test_loss.detach(), test_acc))
    test_losses.append(test_loss.detach().cpu().numpy())
    test_accs.append(test_acc)

    clear_output(wait=True)
    # Wipe the previous epoch's artists first; otherwise every epoch stacks
    # another copy of each curve on top of the old ones.
    for ax in (ax1, ax2, ax3, ax4, ax5, ax6):
        ax.clear()
    ax1.plot([item.detach().cpu().numpy() for item in train_epoch_losses])
    ax2.plot([item.detach().cpu().numpy() for item in test_epoch_losses])
    ax3.plot(train_losses)
    ax4.plot(test_losses)
    ax5.plot(train_accs)
    ax6.plot(test_accs)
    ax1.set_title(f'current epoch:{epoch},Train Loss')
    ax2.set_title(f'current epoch:{epoch},Test Loss')
    ax3.set_title(f'Train Loss={[ np.round(item,4) for item in train_losses[-5:]]}')
    ax4.set_title(f'Test Loss={[ np.round(item,4) for item in test_losses[-5:]]}')
    ax5.set_title(f'Train Accuracy={[ round(item,4) for item in train_accs[-5:]]}')
    ax6.set_title(f'Test Accuracy={[ round(item,4) for item in test_accs[-5:]]}')
    display(fig)
    end_time = time.time()
    current_lr = optimizer.param_groups[0]['lr']
    print(f'-------epoch------:{epoch} time:{(end_time - start_time):.2f}s current_lr:{current_lr},current_patience:{current_patience}')
    # Gradient norms left over from the last training step, for spotting
    # vanishing / exploding gradients.
    for name, param in module.named_parameters():
        if param.grad is not None:
            print(f'Gradient for {name}: {param.grad.norm()}')
    # `>=` instead of `==` so the stop still triggers if the counter ever
    # ends up past the threshold.
    if current_patience >= patience:
        print(f'Early stopping! No improvement for {patience} consecutive epochs.')
        print(f"train loss:{[ np.round(item,4) for item in train_losses]}")
        print(f"test loss:{[ np.round(item,4) for item in test_losses]}")
        break

    if epoch % 10 == 0:
        print(f"last 10 loss:{[ np.round(item,4) for item in test_losses[-10:]]}")
plt.ioff()
