# Image Classification CNN

In [115]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torch.utils.data as data

import torchvision.transforms as transforms
import torchvision.datasets as datasets

from sklearn import decomposition
from sklearn import manifold
from sklearn.metrics import confusion_matrix
from sklearn.metrics import ConfusionMatrixDisplay
from tqdm.notebook import tqdm, trange
import matplotlib.pyplot as plt
import numpy as np

import copy
import random
import time

In [116]:
SEED = 1234

random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)
torch.cuda.manual_seed(SEED)
torch.backends.cudnn.deterministic = True

In [117]:
ROOT = '../../data/'

train_data = datasets.CIFAR10(root=ROOT,
                            train=True,
                            download=True)

train_data.data = torch.tensor(train_data.data)

channels = train_data.data.split(1, dim=-1)
channel_tensors = [channel.squeeze(-1) for channel in channels]

means = [z.float().mean() / 255 for z in channel_tensors]
stds = [z.float().std() / 255 for z in channel_tensors]


print(f'Calculated mean: {means}')
print(f'Calculated std: {stds}')

Files already downloaded and verified
Calculated mean: [tensor(0.4914), tensor(0.4822), tensor(0.4465)]
Calculated std: [tensor(0.2470), tensor(0.2435), tensor(0.2616)]


In [118]:
train_transforms = transforms.Compose([
                            # transforms.RandomRotation(5, fill=(0,)),
                            # transforms.RandomCrop(28, padding=2),
                            transforms.ToTensor(),
                            transforms.Normalize(mean=means, std=stds)
                                      ])

test_transforms = transforms.Compose([
                           transforms.ToTensor(),
                           transforms.Normalize(mean=means, std=stds)
                                     ])

In [119]:
train_data = datasets.CIFAR10(root=ROOT,
                            train=True,
                            download=True,
                            transform=train_transforms)

test_data = datasets.CIFAR10(root=ROOT,
                           train=False,
                           download=True,
                           transform=test_transforms)

Files already downloaded and verified
Files already downloaded and verified


In [120]:
VALID_RATIO = 0.9

n_train_examples = int(len(train_data) * VALID_RATIO)
n_valid_examples = len(train_data) - n_train_examples

train_data, valid_data = data.random_split(train_data,
                                           [n_train_examples, n_valid_examples])


In [121]:
valid_data = copy.deepcopy(valid_data)
valid_data.dataset.transform = test_transforms

print(f'Number of training examples: {len(train_data)}')
print(f'Number of validation examples: {len(valid_data)}')
print(f'Number of testing examples: {len(test_data)}')

BATCH_SIZE = 64

train_iterator = data.DataLoader(train_data, batch_size=BATCH_SIZE,shuffle = True)
valid_iterator = data.DataLoader(valid_data, batch_size=BATCH_SIZE, shuffle = True)
test_iterator = data.DataLoader(test_data, batch_size=BATCH_SIZE, shuffle=True)



Number of training examples: 45000
Number of validation examples: 5000
Number of testing examples: 10000


In [122]:
class InceptionBlock(nn.Module):
    def __init__(self, c1 : float, c2, c3, c4 : float, **kwargs):
        super(InceptionBlock, self).__init__(**kwargs)
        self.b1 = nn.LazyConv2d(c1, kernel_size=1)
        
        self.b2a = nn.LazyConv2d(c2[0], kernel_size=1)
        self.b2b = nn.LazyConv2d(c2[1], kernel_size=3, padding=1)
        
        self.b3a = nn.LazyConv2d(c3[0], kernel_size=1)
        self.b3b = nn.LazyConv2d(c3[1], kernel_size=5, padding=2)
        
        self.b4a = nn.MaxPool2d(kernel_size=3, stride=1, padding=1)
        self.b4b = nn.LazyConv2d(c4, kernel_size=1)
        
        
    def init(self):
        nn.init.xavier_uniform_(self.b1.weight)
        nn.init.xavier_uniform_(self.b2a.weight)
        nn.init.xavier_uniform_(self.b2b.weight)
        nn.init.xavier_uniform_(self.b3a.weight)
        nn.init.xavier_uniform_(self.b3b.weight)
        nn.init.xavier_uniform_(self.b4b.weight)

        
    def forward(self, X):
        b1 = F.relu(self.b1(X))
        b2 = F.relu(self.b2b(F.relu(self.b2a(X))))
        b3 = F.relu(self.b3b(F.relu(self.b3a(X))))
        b4 = F.relu(self.b4b(F.relu(self.b4a(X))))
        return torch.cat((b1,b2,b3,b4),dim=1)

class Model(nn.Module):
    def __init__(self, output_classes=10):
        super().__init__()
        self.conv1 = nn.LazyConv2d(out_channels=128,kernel_size=11,padding=5)
        self.conv2 = nn.LazyConv2d(out_channels = 256, kernel_size=7, padding=3)
        self.conv3 = nn.LazyConv2d(out_channels = 512, kernel_size=3, padding = 1)
        
        self.inc1 = InceptionBlock(64, (64, 128), (128, 256), 128)
        self.inc2 = InceptionBlock(96, (64, 216), (128, 354), 216)
        self.inc3 = InceptionBlock(128, (64, 256), (128, 512), 256)
        
        self.MLP = nn.LazyLinear(out_features=1024)
        self.end = nn.LazyLinear(out_features=output_classes)
        
        self.dropout1 = nn.Dropout(p=0.1)
        self.dropout2 = nn.Dropout(p=0.5)
        


    def init_weights(self):
        nn.init.xavier_uniform_(self.conv1.weight)
        nn.init.xavier_uniform_(self.conv2.weight)
        nn.init.xavier_uniform_(self.conv3.weight)
        nn.init.xavier_uniform_(self.MLP.weight)
        nn.init.xavier_uniform_(self.end.weight)
        self.inc1.init()
        self.inc2.init()
        self.inc3.init()

    def forward(self, X):
        X = self.dropout1(nn.ReLU()(self.conv1(X)))
        X = nn.MaxPool2d(kernel_size=3,padding=1)(X)
        X = self.dropout1(nn.ReLU()(self.conv2(X)))
        X = nn.MaxPool2d(kernel_size=3, padding=1)(X)
        X = self.dropout1(nn.ReLU()(self.conv3(X)))
        X = nn.MaxPool2d(kernel_size=3, padding=1)(X)
        
        X = self.inc1(X)
        X = self.inc2(X)
        X = self.inc3(X)
    
        X = nn.Flatten()(X)
        X = nn.ReLU()(self.dropout2(self.MLP(X)))
        X = self.dropout2(self.end(X))
        return X


model = Model()
model(torch.Tensor(1,3,32,32))
model.init_weights()
def init_cnn(module: nn.Module):
    if type(module) == nn.Linear or type(module) == nn.Conv2d:
        nn.init.xavier_uniform_(module.weight)
        

def count_parameters(model):
    return sum(p.numel() for p in model.parameters() if p.requires_grad)


print(f'The model has {count_parameters(model):,} trainable parameters')


The model has 12,496,636 trainable parameters


In [123]:
optimizer = optim.Adam(model.parameters(),lr=0.01)
loss = nn.CrossEntropyLoss()
device = torch.device('cpu' if torch.backends.mps.is_available() else 'cpu')
print(device)
model = model.to(device)
loss = loss.to(device)

cpu


In [124]:
def acc(y_hat: torch.Tensor, y: torch.Tensor):
    preds = y_hat.argmax(1, keepdim=True)
    hits = preds.eq(y.view_as(preds)).sum()
    return hits.float() / y.shape[0]

In [125]:
def train(model, it, optim, loss, device):
    epoch_loss = 0
    epoch_acc = 0
    model.train() # set training mode

    for (data, labels) in tqdm(it, desc="Training", leave=False):
        data = data.to(device)
        labels = labels.to(device)
        
        optim.zero_grad()
        y_hat = model(data)
        batch_loss = loss(y_hat, labels)
        batch_acc = acc(y_hat, labels)

        batch_loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), 0.01)
        optim.step()

        # print(epoch_loss, batch_loss,y_hat)
        
        epoch_loss += batch_loss.item()
        epoch_acc += batch_acc.item()

    return epoch_loss / len(it), epoch_acc / len(it) # avg loss/acc

def eval(model, it, loss, device):
    epoch_loss = 0
    epoch_acc = 0
    model.eval() # set training mode

    with torch.no_grad():
        for (data, labels) in tqdm(it, desc="Evaluating", leave=False):
            data = data.to(device)
            labels = labels.to(device)

            y_hat = model(data)
            batch_loss = loss(y_hat, labels)
            batch_acc = acc(y_hat, labels)

            epoch_loss += batch_loss.item()
            epoch_acc += batch_acc.item()

    return epoch_loss / len(it), epoch_acc / len(it) # avg loss/acc

def epoch_time(start_time, end_time):
    elapsed_time = end_time - start_time
    elapsed_mins = int(elapsed_time / 60)
    elapsed_secs = int(elapsed_time - (elapsed_mins * 60))
    return elapsed_mins, elapsed_secs

In [126]:
EPOCHS = 69

for epoch in trange(EPOCHS, desc="Epochs"):
     start_time = time.monotonic()

     train_loss, train_acc = train(model, train_iterator, optimizer, loss, device)
     valid_loss, valid_acc = eval(model, valid_iterator, loss, device)
     end_time = time.monotonic()

     epoch_mins, epoch_secs = epoch_time(start_time, end_time)

     print(f'Epoch: {epoch+1:02} | Epoch Time: {epoch_mins}m {epoch_secs}s')
     print(f'\tTrain Loss: {train_loss:.3f} | Train Acc: {train_acc*100:.2f}%')
     print(f'\t Val. Loss: {valid_loss:.3f} |  Val. Acc: {valid_acc*100:.2f}%')

Epochs:   0%|          | 0/69 [00:00<?, ?it/s]

Training:   0%|          | 0/704 [00:00<?, ?it/s]