![model](./images/model.png)

### Imports

In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
import torchvision.datasets as datasets
import torchvision.transforms as transforms
import time
import numpy as np
import matplotlib.pyplot as plt
import random


In [2]:
input_channels = 3 # no of input channels (RGB)
num_classes = 10 # classes for CIFAR 10 dataset

device = 'cuda' # if running on desktop
lr = 1e-2 #learning rates idk if this is constant
epochs = 80
batch_size = 16


### CIFAR 10 Dataset

The CIFAR 10 dataset can be downloaded using pytorch. we may also utilize dataloader function of pytorch to process the data easily

In [3]:
# create a list of commands for transforming the image
img_transform = transforms.Compose([
    transforms.ToTensor(), # convert image to tensor
    transforms.CenterCrop(28) # crop from the center
])

# create a function that normalizes each image
def norm_image(data_sample):
    img_tensor = data_sample[0] # get image values/tensors
    label = data_sample[1]

    img_means = img_tensor.mean(axis=[1,2]) # get the mean of the image per channel
    img_sds = img_tensor.std(axis=[1,2]) # get over SD of the image per channel

    mean_sub = img_tensor - img_means.unsqueeze(1).unsqueeze(2) # subtract the mean
    img_norm = mean_sub.true_divide(img_sds.unsqueeze(1).unsqueeze(2)) # divide by SD

    return (img_norm, label)



### Splitting the dataset

In [4]:
#training set
all_train = list(datasets.CIFAR10(root = 'data/', transform = img_transform, train=True, download=True)) #unsure about download value

random.shuffle(all_train)

train_data = all_train[:40000]
train_transformed = list(map(norm_image, train_data))
train_loader = DataLoader(dataset=train_transformed, batch_size=batch_size, shuffle=True)

# get a validation of 10k images
val_data = all_train[40000:]
val_tranformed = list(map(norm_image, val_data))
val_loader = DataLoader(dataset=val_tranformed, batch_size=batch_size, shuffle=True)

#test data
#perform on the test data as well
test_data = datasets.CIFAR10(root = 'data/', transform = img_transform, train=False, download=False ) #unsure about download value
test_transformed = list(map(norm_image, test_data))
test_loader = DataLoader(dataset=test_transformed, batch_size=batch_size)


Files already downloaded and verified


### Create a classes for modules

In [5]:
#class for the convolution module
class conv_block(nn.Module):
    def __init__(self, in_channels, out_channels, **kwargs):
        super(conv_block, self).__init__()
        self.relu = nn.ReLU() #relu
        self.conv = nn.Conv2d(in_channels, out_channels, **kwargs) # convolution
        self.batchnorm = nn.BatchNorm2d(out_channels) # batch normalization

    def forward(self, x):
        x = self.conv(x)
        x = self.batchnorm(x)
        x = self.relu(x)
        return x
    
#class for the inception block
class inception_block(nn.Module):
    def __init__(self, in_channels, out_ch1, out_ch3):
        super(inception_block, self).__init__()
        #first conv module; padding should be 'same'
        self.ch1 = conv_block(in_channels=in_channels, out_channels=out_ch1, kernel_size=(3,3), stride=(1,1), padding='same')
        #second conv module; padding should be 'same' to be able to concatenate them together
        self.ch3 = conv_block(in_channels=in_channels, out_channels=out_ch3, kernel_size=(3,3), stride=(1,1), padding='same') 

    def forward(self, x):
        #concat the outputs of the convolutions / aka Merge
        return torch.cat([self.ch1(x), self.ch3(x)], 1)
    
# createclass for the downsample module
class downsample_block(nn.Module):
    def __init__(self, in_channels, conv_out):
        super(downsample_block, self).__init__()
        #conv module
        self.convblock = conv_block(in_channels, conv_out, kernel_size=(3,3), stride=(2,2))
        # max pooling
        self.maxpool = nn.MaxPool2d(kernel_size=(3,3), stride=(2,2))

    def forward(self, x):
        return torch.cat([self.convblock(x), self.maxpool(x)], 1)
    

### putting the model together 

In [13]:
class Inception(nn.Module):
    def __init__(self, in_channels=3, num_classes=10, dropout_prob=0):
        super(Inception, self).__init__()
        self.conv1 = conv_block(in_channels=3, out_channels=96, kernel_size=(3,3), stride=(1,1))

        self.inception1 = inception_block(96, 32, 32) # first part comes from the output of the previous layer
        self.inception2 = inception_block(64, 32, 48) # example 32 + 32 = 64
        self.downsample1 = downsample_block(80, 80) # 32 + 48 = 80

        self.inception3 = inception_block(160, 112, 48)
        self.inception4 = inception_block(160, 96, 64)
        self.inception5 = inception_block(160, 80, 80)
        self.inception6 = inception_block(160, 48, 96)
        self.downsample2 = downsample_block(144, 96)

        self.inception7 = inception_block(240, 176, 160)
        self.inception8 = inception_block(336, 176, 160)
        self.avgpool = nn.AvgPool2d(kernel_size=(7, 7), padding=(1,1))
        self.dropout = nn.Dropout(p = dropout_prob)
        self.fc = nn.Linear(336, 10)

    def forward(self, x):
        x = self.conv1(x)
        x = self.inception1(x)
        x = self.inception2(x)
        x = self.downsample1(x)

        x = self.inception3(x)
        x = self.inception4(x)
        x = self.inception5(x)
        x = self.inception6(x)
        x = self.downsample2(x)

        x = self.inception7(x)
        x = self.inception8(x)
        x = self.avgpool(x)

        x = x.reshape(x.shape[0], -1)
        x = self.dropout(x)
        x = self.fc(x)
        return x



### Model Training
Create the Model then set the parameters

In [14]:
# creating the model
model = Inception(in_channels=3, num_classes=10, dropout_prob=0).to(device) #change val for dropout prob

loss_function = nn.CrossEntropyLoss() # because of multi classification
optimizer = optim.SGD(model.parameters(), lr=lr)
scheduler = optim.lr_scheduler.LinearLR(optimizer) # scheduler controls the flow of information in batches

reate 2 functions that obtain the metric on the train set and the test set

### Train

In [18]:
def train(epoch):
    model.train() # start the training process
    curr_loss_train = 0 # initialize values
    correct_train = 0
    total_train = 0

    for ind, (data_train, true_labels_train) in enumerate(train_loader):
        data_train = data_train.to(device=device)
        true_labels_train = true_labels_train.to(device=device)

        out_train = model(data_train) # apply model to train data
        loss_train = loss_function(out_train, true_labels_train) # get loss

        optimizer.zero_grad()
        loss_train.backward()
        optimizer.step()

        curr_loss_train += loss_train.item()
        ix, predicted_train = out_train.max(1)
        correct_train += predicted_train.eq(true_labels_train).sum().item()
        total_train += true_labels_train.size(0)

        train_loss = curr_loss_train/len(train_loader) # get the loss
        acc_train_val = (correct_train/total_train)*100 # get the accuracy

        train_acc.append(acc_train_val)
        train_all_loss.append(train_loss)

### Test

In [19]:
def test(epoch):
    model.eval() # start the testing process / eval mode
    curr_loss_test = 0 # initialize values
    correct_test = 0
    total_test = 0

    num_class=10
    confusion_matrix = torch.zeros(num_class, num_class) # get the confusion matrix of the test set
    with torch.no_grad():
        for data_test, true_labels_test in test_loader:

            data_test = data_test.to(device=device)
            true_labels_test = true_labels_test.to(device=device)

            out_test = model(data_test) # apply model to train data
            loss_test = loss_function(out_test, true_labels_test) # get loss

            curr_loss_test += loss_test.item()
            ix, predicted_test = out_test.max(1)
            correct_test += predicted_test.eq(true_labels_test).sum().item()
            total_test += true_labels_test.size(0)

            for tr, pr in zip(true_labels_test.view(-1), predicted_test.view(-1)):
                confusion_matrix[tr.long(), pr.long()] += 1

    test_loss = curr_loss_test/len(test_loader) # get the loss
    acc_test_val = (correct_test/total_test)*100 # get the accuracy

    test_acc.append(acc_test_val)
    test_all_loss.append(test_loss)
    con_mats.append(confusion_matrix)

### Getting the metrics
Train the model, get the metrics on the training set, then apply it on the test set

In [20]:
train_acc = []
train_all_loss = []

test_acc = []
test_all_loss = {}

con_mats = []

times = []

train_start = time.time()
for epoch in range(epochs):
    ep_start = time.time()
    print(epoch)
    train(epoch)
    scheduler.step()

    epoch_time = time.time() - ep_start
    print(epoch_time)
    times.append(epoch_time)

train_time = time.time() - train_start
print(f'run time: {train_time}')

0


67.85688996315002
1
66.90397953987122
2
67.62147188186646
3
57.48777151107788
4
50.67724561691284
5
50.53305006027222
6
50.54710078239441
7
50.53807759284973
8
50.98540139198303
9
50.43864941596985
10
57.34963035583496
11
56.60908770561218
12
56.4509642124176
13
55.37633275985718
14
54.58513021469116
15
55.07923102378845
16
54.75496315956116
17
54.69509696960449
18
50.96074032783508
19
47.00078821182251
20
50.56401062011719
21
55.27487063407898
22
50.7008752822876
23
48.70212483406067
24
47.55451488494873
25
46.94573092460632
26
57.45134139060974
27
55.84408736228943
28
56.11075448989868
29
56.02310013771057
30
55.97839093208313
31
51.235411405563354
32
47.077486515045166
33
49.437774658203125
34
54.41886234283447
35
50.527411460876465
36
50.23560094833374
37
50.760645389556885
38
50.67552590370178
39
50.99362850189209
40
49.907371044158936
41
47.43209195137024
42
47.37214660644531
43
47.837684631347656
44
47.250335454940796
45
47.403449296951294
46
47.36969566345215
47
47.272326469421