# Classifying Street View House Numbers

## Solution using offline dataloader

In [1]:
import torch
from scipy.io import loadmat
import numpy as np
from PIL import ImageEnhance
from torchvision.transforms import ToPILImage, ToTensor
import torchvision.transforms as T
from torchvision.io import read_image
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import h5py

In [2]:
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
print(device)

cuda:0


### Dataloader
I follow the Pytorch convention where a Dataset object handles all data transformations (shuffling, splitting, augmentation), and a Dataloader object is responsible for generating batches for training.

In [3]:
class Dataset:
    """ Dataset containing (X, y) pairs.
    Args:
        data_src (str | Tuple): The data source, which can either be a file path
        or a (X, y) pair.
        
    Attributes:
        augment: Boolean that indicates whether dataset should return augmented image.
    """
    def __init__(self, data_src):
        if isinstance(data_src, str):
            data = loadmat(data_src)
            self.X = torch.from_numpy(data['X']).permute(3, 2, 0, 1) # [height, width, channels, batch_size]-> [batch_size, channels, height, width]
            self.X = self.X / 255
            self.y = torch.from_numpy(data['y'].astype(np.int64)).squeeze()
            self.y[self.y == 10] = 0
        else:
            self.X, self.y = data_src
        self.augment = False
        
    def shuffle(self):
        """ Randomly shuffle dataset. """
        shuffled_idxs = torch.randperm(len(self))
        self.X = self.X[shuffled_idxs]
        self.y = self.y[shuffled_idxs]
        
    def split(self, ratio):
        """ Randomly split dataset into two parts. """
        self.shuffle()
        split_point = int(ratio * len(self))
        train_split = (self.X[:split_point], self.y[:split_point])
        val_split = (self.X[split_point:], self.y[split_point:])
        return Dataset(train_split), Dataset(val_split)

    def augment_data(self):
        """ Apply data augmentation. """
        self.augment = True
        
    def __getitem__(self, i):
        """ Allow indexing and slicing, e.g. data[3], data[5:9]. Either return
        original data or transformed (augmented) data. """
        if not self.augment:
            return self.X[i], self.y[i]
        
        N, _, _, _ = self.X[i].shape
        if N > 1:
            return self._transform_batch(self.X[i]), self.y[i]
        else:
            return self._transform(self.X[i]), self.y[i]

    def _transform_batch(self, batch):
        """ Transform batch with probability p=0.5. """
        p = 0.5
        if np.random.rand() < p:
            return batch
        else:
            return torch.stack([self._transform(x) for x in batch])
    
    def _transform(self, x):
        """ Transform image by applying rotation, color augmentation, and gaussian blur. """
        # Convert tensor to PIL image
        img = T.ToPILImage()(x)

        # Rotate between -60 and 60 degrees
        k = 30
        angle = np.random.randint(k)
        img = T.functional.rotate(img, angle)

        # Color transform
        jitter = T.ColorJitter(brightness=.5, hue=.3)
        img = jitter(img)

        # Blur
        kernel_size = 2*(np.random.randint(11) // 2) + 1 # Odd kernel between 1 and 11
        img = T.functional.gaussian_blur(img, kernel_size)
        return T.ToTensor()(img)
    
    def __len__(self):
        return len(self.y)
    
    def __repr__(self):
        N, C, W, H = tuple(self.X.shape)
        return f'Dataset(N={N}, C={C}, W={W}, H={H})\nLabels: {self.y.unique()}'


In [4]:
class Dataloader:
    """ A data loader responsible for generating batches of data for training.
    
    Args:
        dataset (Dataset): A Dataset object.
        batch_size (int): The number of data points in a batch.
        shuffle (boolean): Whether to shuffle the data at start of epoch.
    """
    def __init__(self, dataset, batch_size=32, shuffle=True):
        self.dataset = dataset
        self.batch_size = batch_size
        self.shuffle = shuffle
            
    def __iter__(self):
        """  Generate batches. Optionally shuffle at start of each epoch. """
        if self.shuffle:
            self.dataset.shuffle()
        for i in range(len(self.dataset) // self.batch_size + 1):
            start = i * self.batch_size
            end = (i + 1) * self.batch_size
            X, y = self.dataset[start:end]
            yield X, y
    
    def __len__(self):
        return len(self.dataset)

### Model training
Below follows a simple ResNet implementation consisiting of some number of same-size ResBlocks, followed by two fully connected layers.

I have also implemented a Learner class to handle training-related tasks (data preparation, the training loop, validation-training-evaluation).

In [5]:
class ResNet(nn.Module):
    def __init__(self, num_blocks=5):
        super().__init__()
        in_channels = 3
        conv_dim = 64
        pool_size = 2
        image_size = 32
        conv_out = image_size // pool_size
        fc_in = int(conv_out * conv_out * conv_dim)
        fc_dim = 512
        n_classes = 10
        
        self.conv = nn.Conv2d(in_channels, conv_dim, kernel_size=3, stride=1, padding=1)
        self.res_blocks = nn.ModuleList([ResBlock(conv_dim, conv_dim) for _ in range(num_blocks)])
        self.max_pool = nn.MaxPool2d(pool_size)
        self.dropout = nn.Dropout(0.5)
        self.fc1 = nn.Linear(fc_in, fc_dim)
        self.fc2 = nn.Linear(fc_dim, n_classes)
        
    def forward(self, x):
        x = self.conv(x)
        for res_block in self.res_blocks:
            x = res_block(x)
        x = self.max_pool(x)
        x = self.dropout(x)
        x = torch.flatten(x, 1)
        x = self.fc1(x)
        x = F.relu(x)
        x = self.fc2(x)
        return x
    
class ResBlock(nn.Module):
    def __init__(self, in_channels, out_channels):
        super().__init__()
        self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=3, stride=1, padding=1)
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.relu = nn.ReLU()
        self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=3, stride=1, padding=1)
        self.bn2 = nn.BatchNorm2d(out_channels)
        
    def forward(self, x):
        identity = x
        
        out = self.conv1(x)
        out = self.bn1(out)
        out = self.relu(out)
        out = self.conv2(out)
        out = self.bn2(out)
        out += identity # Res connection
        out = self.relu(out)
        return out

In [6]:
class Learner:
    def __init__(self, model, loss_function, optimizer, data, testdata):
        self.device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
        self.model = model
        self.model.to(self.device)
        self.loss_function = loss_function
        self.optimizer = optimizer
        
        self.data = data
        self.testdata = testdata
        
    def validate(self, num_epochs, augment=False, batch_size=32, shuffle=True, split_ratio=0.8, save_best=False):
        """ Split data into training and validation sets. Train on training set, validate on validation set."""
        
        # Set up train and validation data
        train_set, val_set = self.data.split(split_ratio)
        if augment:
            train_set.augment_data()
        train_loader = Dataloader(train_set, batch_size=batch_size, shuffle=shuffle)
        val_loader = Dataloader(val_set, batch_size=batch_size, shuffle=False)
        
        # Training loop
        self._run(num_epochs, (train_loader, val_loader), save_best=True, validate=True)
                    
    def train(self, num_epochs, augment=False, batch_size=32, shuffle=True, save_best=True):
        """ Train on full dataset, i.e. on both training and validation data. """
        if augment:
            self.data.augment()
        full_dataloader = Dataloader(self.data, batch_size=batch_size, shuffle=shuffle)
        
        # Training loop
        self._run(num_epochs, (full_dataloader), save_best=True, validate=False)
    
    def evaluate(self):
        """ Evaluate on test set. """
        testloader = Dataloader(self.testdata, shuffle=False)
        test_loss, test_acc = self._run_one_epoch(testloader, should_update=False)
        print(f'Test loss: {test_loss:.3f}\nTest accuracy: {test_acc:.3f}')
    
    def _run(self, num_epochs, data, save_best, validate=True):
        """ Prepare data and run the training loop. """
        
        if validate:
            train_loader, val_loader = data
            heading = f'Epoch\tTrain loss\tVal loss\tTrain acc\tVal acc'
        else:
            train_loader = data
            heading = f'Epoch\tTrain loss\tTrain acc'
            
        best_acc = 0.0
        print(heading)
        for epoch in range(num_epochs):
            self.model.train(True)
            train_loss, train_acc = self._run_one_epoch(train_loader)
            
            if validate:
                self.model.train(False)
                val_loss, val_acc = self._run_one_epoch(val_loader, should_update=False)
                result_str = f'{epoch + 1}\t{train_loss:.3f}\t\t{val_loss:.3f}\t\t{train_acc:.3f}\t\t{val_acc:.3f}'
            else:
                result_str = f'{epoch + 1}\t{train_loss:.3f}\t\t{train_acc:.3f}'
            print(result_str)

            if save_best:
                acc = val_acc if validate else train_acc
                if acc > best_acc:
                    best_acc = acc
                    torch.save(self.model.state_dict(), 'model.pt')
                    
    def _run_one_epoch(self, dataloader, should_update=True):
        """ Training loop for single epoch. """
        loss_tot, correct_tot = 0, 0
        for batch in dataloader:
            X, y = batch[0].to(self.device), batch[1].to(self.device)
            out = self.model(X)
            loss = self.loss_function(out, y)
            if should_update:
                self.optimizer.zero_grad()
                loss.backward()
                optimizer.step()
            
            loss_tot += loss.item()
            _, y_hat = torch.max(out.data, 1)
            correct_tot += (y_hat == y).sum().item()
        
        return loss_tot / len(dataloader), correct_tot / len(dataloader)

In [7]:
net = ResNet(num_blocks=7)
loss_function = nn.CrossEntropyLoss()
optimizer = optim.SGD(net.parameters(), lr=0.001, momentum=0.9)
dataset = Dataset('data/train_32x32.mat')
testset = Dataset('data/test_32x32.mat')
learner = Learner(net, loss_function, optimizer, dataset, testset)

num_epochs = 40
learner.validate(num_epochs, save_best=True, augment=True)
learner.evaluate()

Epoch	Train loss	Val loss	Train acc	Val acc
1	0.038		0.018		0.592		0.819
2	0.018		0.014		0.818		0.870
3	0.015		0.012		0.849		0.889
4	0.014		0.014		0.867		0.864
5	0.013		0.011		0.878		0.901
6	0.012		0.010		0.886		0.904
7	0.011		0.010		0.896		0.908
8	0.010		0.010		0.901		0.909
9	0.010		0.009		0.906		0.913
10	0.009		0.009		0.910		0.922
11	0.009		0.009		0.913		0.919
12	0.009		0.009		0.918		0.920
13	0.009		0.008		0.919		0.929
14	0.008		0.009		0.921		0.921
15	0.008		0.008		0.923		0.931
16	0.008		0.009		0.925		0.923
17	0.008		0.008		0.928		0.930
18	0.008		0.008		0.929		0.930
19	0.007		0.008		0.932		0.930
20	0.007		0.008		0.932		0.935
21	0.007		0.007		0.934		0.936
22	0.007		0.008		0.935		0.932
23	0.007		0.008		0.937		0.931
24	0.007		0.008		0.936		0.934
25	0.006		0.008		0.940		0.934
26	0.006		0.007		0.940		0.935
27	0.006		0.008		0.939		0.929
28	0.006		0.008		0.942		0.936
29	0.006		0.008		0.945		0.936
30	0.006		0.008		0.945		0.935
31	0.006		0.008		0.945		0.937
32	0.006		0.007		0.946		0.941
33	0.