In [91]:
import random 
import numpy as np
from tqdm.notebook import tqdm

import torch
import torchvision
import torch.nn as nn
import torch.optim as optim
from torchvision import datasets, transforms
from torch.utils.data import Dataset, DataLoader

from sklearn.metrics import classification_report, confusion_matrix, accuracy_score

In [80]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
device

device(type='cuda')

In [81]:
# normalize images
transform=transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize((0.1307,), (0.3081,))
])

# mnist dataset
img_train_dataset = datasets.MNIST('../data', train=True, download=True, transform=transform)
img_test_dataset = datasets.MNIST('../data', train=False, transform=transform)

# mnist dataloader
img_train_loader = DataLoader(img_train_dataset, shuffle=True, batch_size=256)
img_test_loader = DataLoader(img_test_dataset, shuffle=False, batch_size=256)

In [82]:
# random number dataset
class NumDataset(Dataset):
    def __init__(self, num_digits):
        # init random int tensor
        self.num = torch.tensor([random.randint(0, 9) for x in range(num_digits)], dtype=torch.long).unsqueeze(1)

    def __getitem__(self, idx):
        return self.num[idx]
    
    def __len__(self):
        return len(self.num)


num_train_dataset = NumDataset(num_digits=len(img_train_dataset))
num_test_dataset = NumDataset(num_digits=len(img_test_dataset))

num_train_loader = DataLoader(num_train_dataset, shuffle=False, batch_size=64)
num_test_loader = DataLoader(num_train_dataset, shuffle=False, batch_size=64)

In [83]:
class ModelClassifier(nn.Module):
    """
    Model definisition.
    Input: 2 objects. A number and image.
    Output: Sum of number and image representations.
    """
    def __init__(self):
        super(ModelClassifier, self).__init__()
        
        self.conv1 = self.conv_block(c_in=1, c_out=64, kernel_size=3, stride=1, padding=1)
        self.conv2 = self.conv_block(c_in=64, c_out=128, kernel_size=3, stride=1)
        self.conv3 = self.conv_block(c_in=128, c_out=64, kernel_size=3, stride=1)

        self.maxpool = nn.MaxPool2d(kernel_size=2, stride=2)
        self.fc1 = self.lin_block(c_in=10 * 10 * 64, c_out=20)

        self.fc2 = self.lin_block(c_in=1, c_out=10)
        self.fc3 = self.lin_block(c_in=10, c_out=20)


    def forward(self, x_img, x_num):
        # x_img = 28 x 28 x 3 
        x_img = self.conv1(x_img) # 28 x 28 x 32
        x_img = self.maxpool(x_img) # 14 x 14 x 32
        x_img = self.conv2(x_img) # 12 x 12 x 64
        x_img = self.conv3(x_img) # 10 x 10 x 16
        x_img = x_img.flatten(1) # 1600
        x_img = self.fc1(x_img) # 10

        x_num = self.fc2(x_num.unsqueeze(1)) # 10
        x_num = self.fc3(x_num) # 20

        return x_img + x_num


    def conv_block(self, c_in, c_out, **kwargs):
        seq_block = nn.Sequential(
            nn.Conv2d(in_channels=c_in, out_channels=c_out, **kwargs),
            nn.BatchNorm2d(num_features=c_out),
            nn.ReLU()
        )
        
        return seq_block



    def lin_block(self, c_in, c_out):
        seq_block = nn.Sequential(
            nn.Linear(in_features=c_in, out_features=c_out),
            nn.BatchNorm1d(num_features=c_out),
            nn.ReLU()
        )
        
        return seq_block


# initialise model
model = ModelClassifier()

model.to(device)

# initialise loss function
criterion = nn.CrossEntropyLoss()

# initialise optimizer
optimizer = optim.SGD(
    model.parameters(), lr=0.001, momentum=0.99, weight_decay=0.0005
)

In [84]:
# accuracy function
def multi_acc(y_pred, y_test):
    y_pred_softmax = torch.log_softmax(y_pred, dim = 1)
    _, y_pred_tags = torch.max(y_pred_softmax, dim = 1)    
    
    correct_pred = (y_pred_tags == y_test).float()
    acc = correct_pred.sum() / len(correct_pred)
    
    acc = torch.round(acc * 100)
    
    return acc

In [85]:
def train_loop(
    model, epochs, optimizer, criterion, img_train_loader, num_train_loader, device
):

    print("Begin training.")
    img_train_loader_iter = iter(img_train_loader)
    num_train_loader_iter = iter(num_train_loader)

    # run mulitple epochs
    for e in range(epochs):
        train_epoch_loss = 0
        train_epoch_acc = 0

        # load images from train loader
        model.train()
        for x_train_img, y_train_img in img_train_loader:
            x_train_img, y_train_img = x_train_img.to(device), y_train_img.to(device)

            # generate a random integer tensor
            numbers = torch.randint(0, 9, y_train_img.shape).to(device)
            
            # generate output
            y_train_batch = numbers + y_train_img

            # zero out gradients for the batch
            optimizer.zero_grad()

            # generate predictions
            y_train_pred = model(x_train_img, numbers/10)

            # calculate loss and acc
            train_loss = criterion(y_train_pred, y_train_batch)
            train_acc = multi_acc(y_train_pred, y_train_batch)

            # gradients
            train_loss.backward()

            # backprop
            optimizer.step()

            # accumulate loss/acc per epoch
            train_epoch_loss += train_loss.item()
            train_epoch_acc += train_acc.item()

        # average loss/acc per epoch
        avg_train_epoch_loss = train_epoch_loss / len(img_train_loader)
        avg_train_epoch_acc = train_epoch_acc / len(img_train_loader)

        print(
            f"Epoch {e+0:02}/{epochs}: | Train Loss: {avg_train_epoch_loss:.5f} | Train Acc: {avg_train_epoch_acc:.3f}%"
        )
        
    return model

In [86]:
EPOCHS = 200
trained_model = train_loop(model, EPOCHS, optimizer, criterion, img_train_loader, num_train_loader, device)

Begin training.
Epoch 00/200: | Train Loss: 2.80924 | Train Acc: 12.145%
Epoch 01/200: | Train Loss: 2.52387 | Train Acc: 16.140%
Epoch 02/200: | Train Loss: 2.42741 | Train Acc: 15.728%
Epoch 03/200: | Train Loss: 2.34542 | Train Acc: 17.472%
Epoch 04/200: | Train Loss: 2.26836 | Train Acc: 18.494%
Epoch 05/200: | Train Loss: 2.21400 | Train Acc: 19.906%
Epoch 06/200: | Train Loss: 2.16902 | Train Acc: 20.774%
Epoch 07/200: | Train Loss: 2.13691 | Train Acc: 21.523%
Epoch 08/200: | Train Loss: 2.11130 | Train Acc: 21.932%
Epoch 09/200: | Train Loss: 2.09248 | Train Acc: 22.660%
Epoch 10/200: | Train Loss: 2.07777 | Train Acc: 23.187%
Epoch 11/200: | Train Loss: 2.06198 | Train Acc: 23.991%
Epoch 12/200: | Train Loss: 2.05039 | Train Acc: 24.106%
Epoch 13/200: | Train Loss: 2.03954 | Train Acc: 24.719%
Epoch 14/200: | Train Loss: 2.03000 | Train Acc: 24.591%
Epoch 15/200: | Train Loss: 2.02018 | Train Acc: 25.234%
Epoch 16/200: | Train Loss: 2.01457 | Train Acc: 24.562%
Epoch 17/200: |

In [99]:
def test_loop(test_loader):
    y_pred_list = []
    y_true_list = []

    with torch.no_grad():
        # load batches from test loader
        for x_batch_img, y_batch_img in tqdm(test_loader):
            x_batch_img, y_batch_img = x_batch_img.to(device), y_batch_img.to(device)

            # generate random integers
            numbers = torch.randint(0, 9, y_batch_img.shape).to(device)

            y_true = numbers + y_batch_img
            
            # get prediction probs
            y_pred_probs = model(x_batch_img, numbers/10)

            # get indices of max prob
            _, y_pred = torch.max(y_pred_probs, dim = 1)

            # store predictions in a list
            y_pred_list.append(y_pred.cpu().numpy())
            y_true_list.append(y_true.cpu().numpy())

    return y_pred_list, y_true_list

In [100]:
# generate predictions
y_pred_list, y_true_list = test_loop(img_test_loader)

HBox(children=(FloatProgress(value=0.0, max=40.0), HTML(value='')))




In [101]:
# flatten lists
y_pred_list = np.concatenate(y_pred_list).ravel().tolist()
y_true_list = np.concatenate(y_true_list).ravel().tolist()

In [102]:
# classification report
print(classification_report(y_pred_list, y_true_list))

              precision    recall  f1-score   support

           0       0.79      0.84      0.82       106
           1       0.85      0.68      0.76       275
           2       0.74      0.47      0.57       540
           3       0.00      0.00      0.00       837
           4       0.50      0.26      0.34      1007
           5       0.37      0.21      0.27      1182
           6       0.21      0.30      0.24       542
           7       0.32      0.38      0.35       789
           8       0.16      0.44      0.23       353
           9       0.15      0.38      0.22       417
          10       0.14      0.31      0.20       410
          11       0.13      0.26      0.17       360
          12       0.43      0.23      0.30      1229
          13       0.51      0.18      0.26      1472
          14       0.07      0.22      0.11       149
          15       0.07      0.17      0.10       142
          16       0.06      0.22      0.10        68
          17       0.12    

In [103]:
print(accuracy_score(y_pred_list, y_true_list))

0.2659


In [104]:
print(confusion_matrix(y_pred_list, y_true_list))

[[ 89   0   0   0   1   1   2   4   7   2   0   0   0   0   0   0   0   0]
 [ 22 187  61   0   0   0   0   4   0   1   0   0   0   0   0   0   0   0]
 [  1  31 253 209  42   0   1   1   0   1   1   0   0   0   0   0   0   0]
 [  0   0   0   0   0   0   8  99 229 268 174  57   0   0   2   0   0   0]
 [  0   0  28 238 263 292 155  29   1   0   0   1   0   0   0   0   0   0]
 [  0   0   0  24 198 251 298 197 129  80   4   1   0   0   0   0   0   0]
 [  0   2   0   4   5 107 160 164  62  22  10   5   1   0   0   0   0   0]
 [  0   0   2   0  13  14 109 298 219  70  49  10   4   1   0   0   0   0]
 [  0   0   0   0   2   4  18  80 156  73  14   1   1   2   2   0   0   0]
 [  0   0   0   0   2   7  15  23 106 157  75  22   7   2   1   0   0   0]
 [  0   0   0   1   0   0   2  12  46 155 127  50  13   1   2   1   0   0]
 [  0   0   0   0   0   0   0   0   4  61  63  93  76  19  15  18   7   4]
 [  0   0   0   0   0   0   0   6  20  81 165 252 279 204 129  60  25   8]
 [  0   0   0   0   0   0

In [None]:
# def train_loop(
#     model, epochs, optimizer, criterion, img_train_loader, num_train_loader, device
# ):

#     print("Begin training.")
#     img_train_loader_iter = iter(img_train_loader)
#     num_train_loader_iter = iter(num_train_loader)

#     for e in range(epochs):
#         train_epoch_loss = 0
#         train_epoch_acc = 0

#         model.train()
#         for b in range(len(img_train_loader)):
#             x_train_img, y_train_img = next(img_train_loader_iter)
#             x_train_num = next(num_train_loader_iter)

#             y_train_batch =  x_train_num.squeeze() + y_train_img

#             x_train_img, y_train_img = x_train_img.to(device), y_train_img.to(device)
#             x_train_num = x_train_num.to(device)

#             optimizer.zero_grad()

#             y_train_pred = model(x_train_img, x_train_num.squeeze()/10)

#             # print(y_train_pred.shape, y_train_batch.shape, max(y_train_batch), min(y_train_batch))
#             # print(y_train_pred.shape, y_train_pred)
#             # print(y_train_batch.shape, y_train_batch)

#             train_loss = criterion(y_train_pred, y_train_batch)
#             train_acc = multi_acc(y_train_pred, y_train_batch)

#             train_loss.backward()
#             optimizer.step()

#             train_epoch_loss += train_loss.item()
#             train_epoch_acc += train_acc.item()

#         avg_train_epoch_loss = train_epoch_loss / len(img_train_loader)
#         avg_train_epoch_acc = train_epoch_acc / len(img_train_loader)

#         print(
#             f"Epoch {e+0:02}/{epochs}: | Train Loss: {avg_train_epoch_loss:.5f} | Train Acc: {avg_train_epoch_acc:.3f}%"
#         )
        
#     return model