### CNN in PyTorch

References:
- https://pytorch.org/tutorials/beginner/basics/data_tutorial.html
- https://pytorch.org/tutorials/beginner/basics/optimization_tutorial.html

In [None]:
import torch 
import torchvision
import numpy as np
import matplotlib.pyplot as plt

from collections import defaultdict

from torch import nn
from torchvision.datasets import MNIST
from torch.utils.data import Dataset, DataLoader

### Settings

In [None]:
# Device that the models below are moved to via `.to(device)`
device = 'cpu'

### Dataset

In [None]:
class MNISTDataset(Dataset):
    """Dataset that pairs every sample in `X` with the label in `y` at the same index.

    Both containers are stored exactly as they are passed in.
    """

    def __init__(self, X, y):
        self.X, self.y = X, y

    def __len__(self):
        # The number of samples is taken from `X` alone
        return len(self.X)

    def __getitem__(self, idx):
        # Sample and label share the same index
        return self.X[idx], self.y[idx]

In [None]:
path = './data'

# Load MNIST from `path` (download=True is passed so missing files can be fetched)
mnist_dataset = MNIST(root=path, download=True)

# Labels are used exactly as the dataset provides them
y = mnist_dataset.targets

# Images: divide by the largest pixel value so the maximum becomes 1, then
# insert a channel axis: (n_samples, h, w) -> (n_samples, n_channel, h, w)
X = mnist_dataset.data
X = (X / X.max()).unsqueeze(dim=1)

# Dimensionality
n_samples, n_channels, n_h, n_w = X.shape

Inspect images

In [None]:
def imshow(img):
    """Display an image tensor with matplotlib.

    Args:
        img: 3-D tensor whose first axis is moved to the last position before
            plotting, e.g. (channels, h, w) -> (h, w, channels).
    """
    # detach()/cpu() keep the conversion working for tensors that track
    # gradients or are stored on another device; a bare .numpy() raises there
    npimg = img.detach().cpu().numpy()
    plt.imshow(np.transpose(npimg, (1, 2, 0)))


# Show the first 8 images side by side as a single grid
imshow(torchvision.utils.make_grid(X[:8]))

### Model

In [None]:
class CNN(nn.Module):
    """Small convolutional classifier: two 3x3 convolutions and two linear layers.

    The default arguments reproduce the original fixed architecture
    (1x28x28 inputs, 1728 flattened features, 10 output logits).

    Args:
        in_channels: number of channels of the input images.
        image_size: `(height, width)` of the input images.
        n_classes: number of logits produced per sample.

    Raises:
        ValueError: if `image_size` is too small for the two unpadded
            convolutions to produce any output.
    """

    def __init__(self, in_channels=1, image_size=(28, 28), n_classes=10):
        super().__init__()

        conv_channels = (2, 3)
        kernel_size = 3

        # Every unpadded, stride-1 convolution shrinks height and width
        # by (kernel_size - 1)
        shrink = len(conv_channels) * (kernel_size - 1)
        height, width = image_size
        out_h, out_w = height - shrink, width - shrink
        if out_h <= 0 or out_w <= 0:
            raise ValueError(
                f"image_size {tuple(image_size)} is too small: each side must exceed {shrink}"
            )

        # Size of the flattened feature map fed to the first linear layer
        n_features = conv_channels[-1] * out_h * out_w

        self.layers = nn.Sequential(
            nn.Conv2d(in_channels=in_channels, out_channels=conv_channels[0],
                      kernel_size=kernel_size, stride=1),
            nn.ReLU(),
            nn.Conv2d(in_channels=conv_channels[0], out_channels=conv_channels[1],
                      kernel_size=kernel_size, stride=1),
            nn.ReLU(),
            nn.Flatten(),
            nn.Linear(n_features, 32),
            nn.ReLU(),
            nn.Linear(32, n_classes),
        )

    def forward(self, x):
        """Return the raw class scores (logits) for the batch `x`."""
        logits = self.layers(x)
        return logits

Check model

Building the model and running a single batch through it is a quick way to debug it, for example to catch shape mismatches before training.

In [None]:
# Build DataLoader
train_dataset = MNISTDataset(X, y)
train_dl = DataLoader(train_dataset, batch_size=8, shuffle=True)

# Pull a single batch and look at the shapes of the images and the labels
batch = next(iter(train_dl))
print(batch[0].shape, batch[1].shape)

model = CNN().to(device)

# The model lives on `device`, so the input batch has to be moved there as
# well; otherwise the forward pass fails as soon as `device` is not the CPU
y_hat = model(batch[0].to(device))

# One row of logits per sample in the batch
print(y_hat.shape)

#### Training

In [None]:
def train_loop(epoch, dataloader, model, loss_fn, optimizer, history=None):
    """Train `model` for one pass over `dataloader`.

    Args:
        epoch: epoch index, only used in the progress messages.
        dataloader: iterable of `(X, y)` batches that exposes a sized `.dataset`.
        model: module to optimize; it is switched to train mode.
        loss_fn: callable `loss_fn(pred, y)` returning a scalar loss tensor.
        optimizer: optimizer that updates the parameters of `model`.
        history: optional dict (e.g. `defaultdict(list)`); the mean loss over
            all batches of the epoch is appended to `history['loss']`.
            Anything that is not a dict is ignored.
    """
    # Set train mode
    model.train()

    # Batches are moved to the device holding the model parameters, so the
    # loop keeps working when the model is not on the CPU
    first_param = next(model.parameters(), None)
    device = None if first_param is None else first_param.device

    batch_losses = []
    size = len(dataloader.dataset)

    for batch, (X, y) in enumerate(dataloader):
        if device is not None:
            X, y = X.to(device), y.to(device)

        # Compute prediction and loss
        pred = model(X)
        loss = loss_fn(pred, y)

        # Backpropagation
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # Record every batch so the epoch average is not based on a sample
        batch_loss = loss.item()
        batch_losses.append(batch_loss)

        # Progress report every 100 batches
        if batch % 100 == 0:
            current = (batch + 1) * len(X)
            print(f"Epoch:{epoch} loss: {batch_loss:>7f}  [{current:>5d}/{size:>5d}]", end='\r')

    if not batch_losses:
        # No batch was processed: there is no loss to report or to record
        print(f"Epoch:{epoch} no batches to train on")
        return

    # End of epoch: report the loss of the last batch
    print(f"Epoch:{epoch} loss: {batch_losses[-1]:>7f}  [{size:>5d}/{size:>5d}]")

    # Save loss
    if isinstance(history, dict):
        train_loss = sum(batch_losses) / len(batch_losses)
        history.setdefault('loss', []).append(train_loss)
            
def test_loop(epoch, dataloader, model, loss_fn, history=None):
    """Evaluate `model` on `dataloader` and report average loss and accuracy.

    Args:
        epoch: epoch index, only used in the printed message.
        dataloader: sized iterable of `(X, y)` batches that exposes a sized `.dataset`.
        model: module to evaluate; it is switched to evaluation mode.
        loss_fn: callable `loss_fn(pred, y)` returning a scalar loss tensor.
        history: optional dict (e.g. `defaultdict(list)`); the average batch
            loss and the accuracy are appended to `history['val_loss']` and
            `history['val_accuracy']`. Anything that is not a dict is ignored.
    """
    size = len(dataloader.dataset)
    num_batches = len(dataloader)

    # Set evaluation mode
    model.eval()

    if size == 0 or num_batches == 0:
        # Both averages below would divide by zero
        print(f"Epoch:{epoch} no samples to evaluate on")
        return

    # Batches are moved to the device holding the model parameters, so the
    # loop keeps working when the model is not on the CPU
    first_param = next(model.parameters(), None)
    device = None if first_param is None else first_param.device

    val_loss, correct = 0, 0

    # No gradients are needed for evaluation
    with torch.no_grad():
        for X, y in dataloader:
            if device is not None:
                X, y = X.to(device), y.to(device)

            pred = model(X)

            val_loss += loss_fn(pred, y).item()
            # A prediction is correct when the largest logit sits at the label index
            correct += (pred.argmax(1) == y).type(torch.float).sum().item()

    # Loss is averaged per batch, accuracy per sample
    val_loss /= num_batches
    val_accuracy = correct / size
    print(f"Epoch:{epoch} Val accuracy: {(100*val_accuracy):>0.1f}%, Avg loss: {val_loss:>8f} \n")

    if isinstance(history, dict):
        history.setdefault('val_loss', []).append(val_loss)
        history.setdefault('val_accuracy', []).append(val_accuracy)

In [None]:
# Training configuration
num_epochs = 10
batch_size = 128
learning_rate = 0.001

# Container for the per-epoch metrics; missing keys start as empty lists
history = defaultdict(list)

# Data: shuffled batches of `batch_size` samples
train_dataset = MNISTDataset(X, y)
train_dl = DataLoader(train_dataset, shuffle=True, batch_size=batch_size)

# Model, placed on `device`
model = CNN().to(device)

# Loss function
loss_fn = nn.CrossEntropyLoss()

# Optimizer acting on the model parameters
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

In [None]:
# Run `num_epochs` rounds of training, each followed by an evaluation.
# NOTE: for simplicity the evaluation reuses the training data; model
# performance should always be measured on a separate holdout set.
for epoch in range(num_epochs):
    train_loop(epoch=epoch, dataloader=train_dl, model=model,
               loss_fn=loss_fn, optimizer=optimizer, history=history)
    test_loop(epoch=epoch, dataloader=train_dl, model=model,
              loss_fn=loss_fn, history=history)

### Computing ConvTranspose1d dimensionality

Understanding how convolutions work, both in CNNs and in any other layer you plan to use, is key to developing a model. The PyTorch documentation includes a good summary of the inner workings of these layers.

- https://pytorch.org/docs/stable/generated/torch.nn.Conv2d.html
- https://pytorch.org/docs/stable/generated/torch.nn.Conv1d.html

To make a convolutional layer work, we need to choose values for parameters such as kernel_size, stride and padding so that the layer produces the desired output length. In this example, we look at ConvTranspose1d and build a few helper functions for the calculation:
- https://pytorch.org/docs/stable/generated/torch.nn.ConvTranspose1d.html


Some parameters are left as default to work out the math of the formulas below:
```
groups=1
dilation=1
output_padding=0
```

Note that some combinations of parameters result in non-integer values for the stride or the padding; such combinations are infeasible.

In [None]:
# Data dim
l_in = 28
n_samples = 10

# Conv1d parameter
stride = 2
padding = 2
kernel_size = 3
in_channels = 3
out_channels = 3

In [None]:
def compute_l_out(l_in, stride, padding, kernel_size, dilation=1, output_padding=0):
    """Return the output length of a ConvTranspose1d layer.

    Uses the output-length formula given in the PyTorch documentation:

        l_out = (l_in - 1) * stride - 2 * padding
                + dilation * (kernel_size - 1) + output_padding + 1

    Args:
        l_in: length of the input sequence.
        stride: stride of the layer.
        padding: padding of the layer.
        kernel_size: kernel size of the layer.
        dilation: dilation of the layer (default 1).
        output_padding: output padding of the layer (default 0).

    Returns:
        The length of the output sequence.
    """
    return (
        (l_in - 1) * stride
        - 2 * padding
        + dilation * (kernel_size - 1)
        + output_padding
        + 1
    )

# Output length for the configuration defined above
l_out = compute_l_out(l_in=l_in, stride=stride, padding=padding, kernel_size=kernel_size)
l_out

In [None]:
def compute_padding(l_in, l_out, stride, kernel_size, dilation=1, output_padding=0):
    """Return the ConvTranspose1d padding that maps `l_in` to `l_out` for a known stride.

    Solves the ConvTranspose1d output-length formula for the padding:

        padding = ((l_in - 1) * stride + dilation * (kernel_size - 1)
                   + output_padding + 1 - l_out) / 2

    Args:
        l_in: length of the input sequence.
        l_out: desired length of the output sequence.
        stride: stride of the layer.
        kernel_size: kernel size of the layer.
        dilation: dilation of the layer (default 1).
        output_padding: output padding of the layer (default 0).

    Returns:
        The padding as a float. A non-integer or negative value means that no
        valid padding exists for this combination of parameters.
    """
    unpadded_l_out = (l_in - 1) * stride + dilation * (kernel_size - 1) + output_padding + 1

    # Padding is applied on both sides, hence the division by 2
    return (unpadded_l_out - l_out) / 2

# Padding recovered from the input/output lengths and the known stride
padding_val = compute_padding(l_in=l_in, l_out=l_out, stride=stride, kernel_size=kernel_size)
padding_val

In [None]:
def compute_stride(l_in, l_out, padding, kernel_size, dilation=1, output_padding=0):
    """Return the ConvTranspose1d stride that maps `l_in` to `l_out` for a known padding.

    Solves the ConvTranspose1d output-length formula for the stride:

        stride = (l_out + 2 * padding - dilation * (kernel_size - 1)
                  - output_padding - 1) / (l_in - 1)

    Args:
        l_in: length of the input sequence.
        l_out: desired length of the output sequence.
        padding: padding of the layer.
        kernel_size: kernel size of the layer.
        dilation: dilation of the layer (default 1).
        output_padding: output padding of the layer (default 0).

    Returns:
        The stride as a float. A non-integer value means that no valid stride
        exists for this combination of parameters.

    Raises:
        ZeroDivisionError: if `l_in` is 1, because the stride then has no
            influence on the output length.
    """
    numerator = l_out + 2 * padding - dilation * (kernel_size - 1) - output_padding - 1

    return numerator / (l_in - 1)

# Stride recovered from the input/output lengths and the known padding
stride_val = compute_stride(l_in=l_in, l_out=l_out, padding=padding, kernel_size=kernel_size)
stride_val

In [None]:
def compute_dim(l_in, l_out, kernel_size, stride=None, padding=None):
    """Complete a ConvTranspose1d configuration when either stride or padding is known.

    Exactly one of `stride` and `padding` must be given; the missing one is
    derived from the other parameters.

    Args:
        l_in: length of the input sequence.
        l_out: desired length of the output sequence.
        kernel_size: kernel size of the layer.
        stride: stride of the layer, or None to have it computed.
        padding: padding of the layer, or None to have it computed.

    Returns:
        A dict with the keys 'l_in', 'l_out', 'stride', 'padding' and
        'kernel_size'.

    Raises:
        ValueError: if both or neither of `stride` and `padding` are None.
    """
    # Explicit check instead of `assert`, which is removed when Python runs with -O
    if (stride is None) == (padding is None):
        raise ValueError("Exactly one of `stride` and `padding` must be None")

    if padding is None:
        padding = compute_padding(l_in=l_in, l_out=l_out, stride=stride, kernel_size=kernel_size)
    else:
        stride = compute_stride(l_in=l_in, l_out=l_out, padding=padding, kernel_size=kernel_size)

    return {
        'l_in': l_in,
        'l_out': l_out,
        'stride': stride,
        'padding': padding,
        'kernel_size': kernel_size,
    }

# Padding is known -> the stride is derived
dims = compute_dim(l_in=l_in, l_out=l_out, kernel_size=kernel_size, padding=padding)
print(dims)

# Stride is known -> the padding is derived
dims = compute_dim(l_in=l_in, l_out=53, kernel_size=kernel_size, stride=2)
print(dims)