### Load data from dataset

In [None]:
import pickle
import numpy as np
from sklearn.model_selection import train_test_split


# function used to extract 
def unpickle(path):
    with open(path, 'rb') as fo:
        data = pickle.load(fo, encoding='bytes')
    return data


# image data is stored as N * W * H * C
# 50000 training data, 10000 testing data
train_x, train_y = np.zeros((50000, 32, 32, 3), dtype=np.uint8), []
test_y = []

# extract training data
for i in range(1, 6):
    data_path = f"../data/cifar-10-batches-py/data_batch_{i}"
    data = unpickle(path=data_path)
    train_x[(i - 1) * 10000: i * 10000] = data[b'data'].reshape(10000, 3, 32, 32).transpose((0, 2, 3, 1))
    train_y.extend(data[b'labels'])

# extract testing data
data_path = f"../data/cifar-10-batches-py/test_batch"
data = unpickle(path=data_path)
test_x = data[b'data'].reshape(10000, 3, 32, 32).transpose((0, 2, 3, 1))
test_y = data[b'labels']

# convert to numpy arrays
train_y = np.array(train_y)
test_y = np.array(test_y)

# split the validation set
train_x, val_x, train_y, val_y = train_test_split(train_x, train_y, test_size=0.15)


### Model hyperparameters

In [None]:
import torch
from model import ResNet110


device = "cuda" if torch.cuda.is_available() else "cpu"
lr = 1e-2
batch_size = 256
epochs = 50

model = ResNet110(device=device)
loss_fn = torch.nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(
    model.parameters(),
    lr=lr,
    momentum=0.9,
    weight_decay=1e-4
)
lr_scheduler = torch.optim.lr_scheduler.MultiStepLR(
    optimizer,
    milestones=[100, 150],
)


### Obtain the datasets

In [None]:
from torch.utils.data import Dataset, DataLoader
import torchvision.transforms as T


class CIFARDataset(Dataset):
    def __init__(self, images, labels, transform):
        self.images = images
        self.labels = labels
        self.transform = transform

    def __len__(self):
        return len(self.labels)
    
    def __getitem__(self, idx):
        image = self.images[idx]
        label = self.labels[idx]
        if self.transform:
            image = self.transform(image)
        return image, label


transform_train = T.Compose([
    T.ToTensor(),
    T.RandomCrop(32, padding=4),
    T.RandomHorizontalFlip(),
    T.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010)),
])

transform_test = T.Compose([
        T.ToTensor(),
        T.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010)),
])

# dataset
train_set = CIFARDataset(train_x, train_y, transform_train)
val_set = CIFARDataset(val_x, val_y, transform_test)
test_set = CIFARDataset(test_x, test_y, transform_test)

# dataloader
trainloader = DataLoader(
    dataset=train_set,
    batch_size=batch_size,
    shuffle=True,
    pin_memory=True,
)
valloader = DataLoader(
    dataset=val_set,
    batch_size=batch_size,
    shuffle=False,
    pin_memory=True,
)
testloader = DataLoader(
    dataset=test_set,
    batch_size=batch_size,
    shuffle=False,
    pin_memory=True,
)


### Define the train and test function

In [None]:
def train_model(model, trainloader, device, optimizer, loss_fn):
    # train one epoch
    model.train()
    train_loss = 0.0
    correct = 0
    total = 0

    for inputs, targets in trainloader:
        inputs, targets = inputs.to(device), targets.to(device)
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = loss_fn(outputs, targets)
        loss.backward()
        optimizer.step()

        train_loss += loss.item()
        _, predicted = outputs.max(1)
        total += targets.size(0)
        correct += predicted.eq(targets).sum().item()

    return train_loss / len(trainloader), correct / total


def test_model(model, testloader, device, loss_fn):
    # evaluate
    model.eval()

    train_loss = 0.0
    correct = 0
    total = 0

    for inputs, targets in testloader:
        inputs, targets = inputs.to(device), targets.to(device)
        outputs = model(inputs)
        loss = loss_fn(outputs, targets)
        train_loss += loss.item()

        _, predicted = outputs.max(1)
        total += targets.size(0)
        correct += predicted.eq(targets).sum().item()
    
    return train_loss / len(testloader), correct / total


### Train and validate the model

In [None]:
import copy


train_loss_lst, train_acc_lst = [], []
val_loss_lst, val_acc_lst = [], []
best_acc = 0.0
best_model = None

for epoch in range(epochs):
    # training part
    print(f"\nEpoch {epoch + 1}:")
    train_loss, train_acc = train_model(
        model=model,
        trainloader=trainloader,
        device=device,
        optimizer=optimizer,
        loss_fn=loss_fn,
    )
    lr_scheduler.step()
    
    # validation part
    val_loss, val_acc = test_model(
        model=model,
        testloader=valloader,
        device=device,
        loss_fn=loss_fn,
    )

    # store and print the training & validation loss
    train_loss_lst.append(train_loss)
    train_acc_lst.append(train_acc_lst)
    val_loss_lst.append(val_loss)
    val_acc_lst.append(val_acc)
    print(f"Training loss for current epoch: {train_loss}")
    print(f"Training accuracy for current epoch: {train_acc}")
    print(f"Validation loss for current epoch: {val_loss}")
    print(f"Validation accuracy for current epoch: {val_acc}")

    # store the best model
    if val_acc > best_acc:
        best_acc = val_acc
        best_model = copy.deepcopy(model.state_dict())


# load the best model
model.load_state_dict(best_model)

# eval part
test_loss, test_acc = test_model(model=model, testloader=testloader, device=device, loss_fn=loss_fn)
print(f"Test loss: {test_loss}")
print(f"Test accuracy: {test_acc}")

# save model parameters
torch.save(best_model, f"../model/resnet110-{epochs}.pt")
