# Learning PyTorch with a CNN on the CIFAR-10 dataset

In [2]:
import numpy as np
import matplotlib.pyplot as plt
from tqdm.notebook import tqdm

import torch
from torch import nn
import torch.nn.functional as F
import torchvision
from torchvision import transforms

## Initialize dataset

In [3]:
class Cifar10(torchvision.datasets.CIFAR10):
    """CIFAR-10 dataset that serves its stored arrays directly as float32.

    ``self.data`` is converted to ``float32`` once at construction time, and
    ``__getitem__`` is overridden to return the stored array and label as-is.
    """

    def __init__(self, train: bool, normalize=False):
        """Load one split of the dataset, downloading it to ``/data`` if needed.

        Args:
            train: Forwarded to the torchvision dataset to select the split.
            normalize: When true, rescale the data with ``x / 255 - 0.5``.
        """
        super().__init__(root='/data', train=train, download=True)
        pixels = self.data.astype(np.float32)
        self.data = pixels / 255 - 0.5 if normalize else pixels

    def __getitem__(self, idx):
        """Return the ``(image, label)`` pair stored at position ``idx``."""
        image = self.data[idx]
        label = self.targets[idx]
        return image, label


# Set to True to have Cifar10 rescale the image data (x / 255 - 0.5).
normalize = False

# Split the training set into 45000 training and 5000 validation samples.
data_train, data_valid = torch.utils.data.random_split(
    Cifar10(train=True, normalize=normalize),
    (45000, 5000),
)
data_test = Cifar10(train=False, normalize=normalize)

Files already downloaded and verified
Files already downloaded and verified


## Load training, test and validation data

In [4]:
# Settings shared by all three loaders; only the training loader shuffles.
kwargs = dict(batch_size=100, num_workers=2)
loader_train = torch.utils.data.DataLoader(data_train, shuffle=True, **kwargs)
loader_valid = torch.utils.data.DataLoader(data_valid, **kwargs)
loader_test = torch.utils.data.DataLoader(data_test, **kwargs)

## Create LeNet CNN structure

In [5]:
class LeNet(nn.Module):
    """LeNet-style CNN: two conv/batch-norm/ReLU/max-pool stages plus two linear layers.

    The fully connected head is sized for 32x32 inputs: each 5x5 convolution
    (no padding) shrinks the spatial size by 4 and each 2x2 pooling halves it
    (32 -> 28 -> 14 -> 10 -> 5), leaving 128 * 5 * 5 features after flattening.

    ``forward`` returns per-class *log*-probabilities so that the output can be
    fed straight into ``nn.NLLLoss``. Use ``output.exp()`` to obtain
    probabilities; ``argmax`` gives the same prediction either way.

    Args:
        num_channels: Number of channels in the input images.
        classes: Number of output classes.
    """

    def __init__(self, num_channels: int, classes: int) -> None:
        super().__init__()

        # Hyperparameters
        # NOTE: stored for reference only; no dropout layer is applied by this class.
        self.dropout_rate = 0.5

        # Batch normalization layers
        self.bn1 = nn.BatchNorm2d(32)
        self.bn2 = nn.BatchNorm2d(128)
        self.bn3 = nn.BatchNorm1d(256)

        # Activation functions
        self.relu1 = nn.ReLU()
        self.relu2 = nn.ReLU()
        self.relu3 = nn.ReLU()

        # Output layer. NLLLoss expects log-probabilities, so this has to be a
        # log-softmax rather than a plain softmax. The attribute keeps its
        # original name so existing references to ``model.softmax`` still work.
        self.softmax = nn.LogSoftmax(dim=1)

        # Max pooling layers
        self.max_pool1 = nn.MaxPool2d(kernel_size=(2, 2), stride=(2, 2))
        self.max_pool2 = nn.MaxPool2d(kernel_size=(2, 2), stride=(2, 2))

        # Convolutional layers
        self.conv1 = nn.Conv2d(in_channels=num_channels, out_channels=32, kernel_size=(5, 5))
        self.conv2 = nn.Conv2d(in_channels=32, out_channels=128, kernel_size=(5, 5))

        # Fully connected layers (128 channels of 5x5 feature maps after conv2/pool2)
        self.fc1 = nn.Linear(in_features=128 * 5 * 5, out_features=256)
        self.fc2 = nn.Linear(in_features=256, out_features=classes)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Compute class log-probabilities for a batch of channels-last images.

        Args:
            x: Batch laid out as ``[batch_size, height, width, channels]``.

        Returns:
            Tensor of shape ``[batch_size, classes]`` holding log-probabilities.
        """
        # Permute the input to the layout PyTorch convolutions expect
        # From: [batch_size, height, width, channels]
        # To:   [batch_size, channels, height, width]
        x = x.permute(0, 3, 1, 2)

        # First convolutional stage
        out = self.conv1(x)
        out = self.bn1(out)
        out = self.relu1(out)
        out = self.max_pool1(out)

        # Second convolutional stage
        out = self.conv2(out)
        out = self.bn2(out)
        out = self.relu2(out)
        out = self.max_pool2(out)

        # Flatten everything except the batch dimension
        out = torch.flatten(out, 1)

        # Fully connected head
        out = self.fc1(out)
        out = self.bn3(out)
        out = self.relu3(out)

        out = self.fc2(out)
        out = self.softmax(out)
        return out

In [8]:
# Use the GPU when one is available; otherwise fall back to the CPU so the
# notebook still runs on machines without CUDA.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

def train(model, loader, optimizer, loss_fn):
    """Run one training pass over ``loader``, updating ``model`` after every batch.

    Args:
        model: Network to train; it is switched to training mode.
        loader: Iterable yielding ``(inputs, targets)`` batches.
        optimizer: Optimizer used to apply the gradient updates.
        loss_fn: Callable taking ``(predictions, targets)`` and returning the loss.

    Returns:
        A tuple ``(batch_losses, accuracy)``: the loss of every batch and the
        fraction of samples whose arg-max prediction matched the target.
    """
    model.train()

    batch_losses = []
    n_correct = 0
    n_seen = 0
    for inputs, targets in loader:
        inputs = inputs.to(device)
        targets = targets.to(device)

        # Forward pass
        outputs = model(inputs)
        batch_loss = loss_fn(outputs, targets)

        # Backward pass and parameter update
        optimizer.zero_grad()
        batch_loss.backward()
        optimizer.step()

        # Bookkeeping for the epoch statistics
        batch_losses.append(batch_loss.item())
        n_seen += len(inputs)
        hits = torch.argmax(outputs, dim=1) == targets
        n_correct += hits.sum().item()

    return batch_losses, n_correct / n_seen
    
def evaluate(model, loader, loss_fn):
    """Measure loss and accuracy of ``model`` on ``loader`` without updating it.

    Args:
        model: Network to evaluate; it is switched to evaluation mode.
        loader: Iterable yielding ``(inputs, targets)`` batches.
        loss_fn: Callable taking ``(predictions, targets)`` and returning the loss.

    Returns:
        A tuple ``(batch_losses, accuracy)``: the loss of every batch and the
        fraction of samples whose arg-max prediction matched the target.
    """
    model.eval()

    batch_losses = []
    n_correct = 0
    n_seen = 0
    # Gradients are not needed here, so skip tracking them
    with torch.no_grad():
        for inputs, targets in loader:
            inputs = inputs.to(device)
            targets = targets.to(device)

            outputs = model(inputs)
            batch_loss = loss_fn(outputs, targets)

            batch_losses.append(batch_loss.item())
            n_seen += len(inputs)
            hits = torch.argmax(outputs, dim=1) == targets
            n_correct += hits.sum().item()

    return batch_losses, n_correct / n_seen



def optimize_model(model, optimizer, loss_fn, sched, num_epochs=40):
    """Train ``model`` for ``num_epochs`` epochs, validating after each one.

    Args:
        model: Network to train.
        optimizer: Optimizer updating the parameters of ``model``.
        loss_fn: Loss callable passed on to ``train`` and ``evaluate``.
        sched: Learning-rate scheduler stepped once per epoch, or ``None``.
        num_epochs: Number of epochs to run (defaults to 40).

    Returns:
        A tuple ``(train_losses, valid_losses, train_accuracies,
        valid_accuracies)`` with one entry per epoch; losses are the mean of
        the per-batch losses of that epoch.
    """
    train_losses, train_accuracies = [], []
    valid_losses, valid_accuracies = [], []

    progress = tqdm(range(num_epochs))
    for _ in progress:
        losses, acc = train(model, loader_train, optimizer, loss_fn)
        train_losses.append(np.mean(losses))
        train_accuracies.append(acc)

        losses, acc = evaluate(model, loader_valid, loss_fn)
        valid_losses.append(np.mean(losses))
        valid_accuracies.append(acc)

        if sched is not None:
            # Only ReduceLROnPlateau takes the monitored metric. For other
            # schedulers (e.g. StepLR) a positional argument is interpreted as
            # the deprecated ``epoch`` value, so they are stepped without one.
            if isinstance(sched, torch.optim.lr_scheduler.ReduceLROnPlateau):
                sched.step(valid_losses[-1])
            else:
                sched.step()

        progress.set_description(
            f'loss, val: {valid_losses[-1]:.2f}, acc, val: {valid_accuracies[-1]:.2f}'
        )

    return train_losses, valid_losses, train_accuracies, valid_accuracies

## Visualize results


In [7]:
def visualize_results(train_losses, valid_losses, train_accuracies, valid_accuracies):
    """Plot loss and accuracy curves for training and validation side by side.

    Args:
        train_losses: Per-epoch training losses.
        valid_losses: Per-epoch validation losses.
        train_accuracies: Per-epoch training accuracies.
        valid_accuracies: Per-epoch validation accuracies.
    """
    # One entry per subplot: (y-axis label, training curve, validation curve)
    panels = (
        ('loss', train_losses, valid_losses),
        ('accuracy', train_accuracies, valid_accuracies),
    )

    plt.figure(figsize=(10, 4))
    for position, (quantity, train_curve, valid_curve) in enumerate(panels, start=1):
        plt.subplot(1, 2, position)
        plt.xlabel('epoch')
        plt.ylabel(quantity)
        plt.plot(train_curve, label='train')
        plt.plot(valid_curve, label='valid')
        plt.legend()
        plt.grid()

    plt.tight_layout()
    plt.show()

## Run the model

In [9]:
# Build the network and move it to the selected device
model = LeNet(num_channels=3, classes=10)
model = model.to(device)

# NOTE(review): nn.NLLLoss expects log-probabilities as its input; confirm the
# model's final layer provides them.
loss_fn = nn.NLLLoss()

# Optimizer plus a scheduler that scales the learning rate by 0.1 every 30 steps
optimizer = torch.optim.Adam(model.parameters(), lr=1e-6)
sched = torch.optim.lr_scheduler.StepLR(optimizer, step_size=30, gamma=0.1)

# Train, then plot the loss and accuracy curves
tl, vl, ta, va = optimize_model(model, optimizer, loss_fn, sched)
visualize_results(tl, vl, ta, va)

  0%|          | 0/40 [00:00<?, ?it/s]