<a href="https://colab.research.google.com/github/tsai-praveen/tsai-assignments/blob/main/S6/S6_Assignment.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Sample code here; the actual code is further below

## Actual Code (from class)

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F


In [None]:
# Depthwise Convolution class

class DepthwiseSeparable(nn.Module):
    """Depthwise-separable convolution: a per-channel 3x3 conv followed by a 1x1 conv.

    Args:
        in_ch: Number of input channels (also the group count of the 3x3 conv).
        out_ch: Number of channels produced by the 1x1 conv.
        stride: Stride of the 3x3 conv; the 1x1 conv always uses stride 1.
    """

    def __init__(self, in_ch, out_ch, stride=1):
        super().__init__()
        self.in_ch, self.out_ch, self.stride = in_ch, out_ch, stride

        # groups == in_ch gives every input channel its own 3x3 filter.
        per_channel_conv = nn.Conv2d(
            in_ch,
            in_ch,
            kernel_size=(3, 3),
            stride=stride,
            padding=1,
            groups=in_ch,
            bias=False,
        )
        # The 1x1 conv mixes the filtered channels into out_ch outputs.
        channel_mixer = nn.Conv2d(in_ch, out_ch, kernel_size=(1, 1), bias=False)

        self.depthwise = nn.Sequential(per_channel_conv, channel_mixer)

    def forward(self, x):
        """Apply the 3x3 per-channel conv and then the 1x1 conv to ``x``."""
        out = self.depthwise(x)
        return out

In [None]:
# The network
class Net(nn.Module):
    """Convolutional classifier with four conv stages, three transitions and a GAP head.

    The shape notes in the comments assume a 3x32x32 input. The forward pass
    returns log-probabilities reshaped to rows of 10 values.

    Args:
        drop: Dropout probability shared by every dropout layer.
    """

    def __init__(self, drop=0.025):
        super().__init__()

        def conv3x3(c_in, c_out, **extra):
            # Bias-free 3x3 convolution with padding 1; `extra` carries stride/dilation.
            return nn.Conv2d(in_channels=c_in, out_channels=c_out, kernel_size=(3, 3),
                             padding=1, bias=False, **extra)

        def norm_act_drop(width):
            # BatchNorm -> ReLU -> Dropout tail that follows most convolutions.
            return nn.BatchNorm2d(num_features=width), nn.ReLU(), nn.Dropout(p=drop)

        # C1: 32x32x3 -> 32x32x32 (RF 3) -> 32x32x32 (RF 5)
        self.conblock1 = nn.Sequential(
            conv3x3(3, 32),
            *norm_act_drop(32),
            conv3x3(32, 32),
            *norm_act_drop(32),
        )

        # T1: strided conv instead of pooling, 32x32x32 -> 16x16x32 (RF 7)
        self.transblock1 = nn.Sequential(
            conv3x3(32, 32, stride=2),
            *norm_act_drop(32),
        )

        # C2: 16x16x32 -> 16x16x64 (RF 11)
        self.conblock2 = nn.Sequential(
            conv3x3(32, 64),
            *norm_act_drop(64),
        )

        # T2: strided depthwise-separable conv, 16x16x64 -> 8x8x64 (RF 15)
        self.transblock2 = nn.Sequential(
            DepthwiseSeparable(in_ch=64, out_ch=64, stride=2),
            *norm_act_drop(64),
        )

        # C3: 8x8x64 -> 8x8x128 (RF 23), then a 1x1 squeeze to 8x8x32 (RF 23)
        self.conblock3 = nn.Sequential(
            DepthwiseSeparable(in_ch=64, out_ch=128),
            *norm_act_drop(128),
            nn.Conv2d(in_channels=128, out_channels=32, kernel_size=(1, 1), bias=False),
            *norm_act_drop(32),
        )

        # T3: two dilated convs; with padding 1 and dilation 2 each one trims
        # the map by 2 pixels: 8x8 -> 6x6 (RF 39) -> 4x4 (RF 55)
        self.transblock3 = nn.Sequential(
            conv3x3(32, 32, dilation=2),
            conv3x3(32, 32, dilation=2),
            *norm_act_drop(32),
        )

        # C4 / output: 4x4x32 -> 4x4x32 (RF 63) -> 4x4x10 (RF 71)
        self.conblock4 = nn.Sequential(
            conv3x3(32, 32),
            nn.ReLU(),
            conv3x3(32, 10),
        )

        # Average over the remaining 4x4 map: one value per class channel.
        self.gap = nn.AvgPool2d(kernel_size=4)

    def forward(self, x):
        """Run every stage in order and return log-softmax scores over 10 outputs."""
        stages = (
            self.conblock1, self.transblock1,
            self.conblock2, self.transblock2,
            self.conblock3, self.transblock3,
            self.conblock4,
        )
        for stage in stages:
            x = stage(x)

        pooled = self.gap(x)
        logits = pooled.view(-1, 10)
        return F.log_softmax(input=logits, dim=-1)


In [None]:
import matplotlib.pyplot as plt
import numpy as np

import albumentations as A
from albumentations.pytorch import ToTensorV2

import torch
import torch.optim as optim
from torchsummary import summary
from torch.utils.data import Dataset, DataLoader

import torchvision
from torchvision import datasets, transforms
from tqdm import tqdm
 

In [None]:
# Load the CIFAR-10 training split without transforms so the raw pixel array can be inspected.
exp = datasets.CIFAR10('./data', train=True, download=True)
exp_data = exp.data

# Per-channel statistics (reduced over every axis except the last) used to
# derive the normalization constants. Values are rescaled from the 0-255
# pixel range to [0, 1] -- assumes exp_data holds 0-255 pixel values.
print('[Train]')
print(' - Numpy Shape : ', exp_data.shape)
print(' - min:', np.min(exp_data, axis=(0, 1, 2)) / 255.)
print(' - max:', np.max(exp_data, axis=(0, 1, 2)) / 255.)
print(' - mean:', np.mean(exp_data, axis=(0, 1, 2)) / 255.)
print(' - std:', np.std(exp_data, axis=(0, 1, 2)) / 255.)
# Variance scales with the square of the unit, so divide by 255**2 (not 255)
# to express it on the same [0, 1] scale as the statistics above.
print(' - var:', np.var(exp_data, axis=(0, 1, 2)) / (255. ** 2))

In [None]:
# Notebook display of the dataset's class names; the plotting helpers index
# this sequence with the integer label to build subplot titles.
exp.classes

In [None]:
def viz_data(cols=8, rows=5):
    """Show the first ``cols * rows`` samples of ``exp`` in a titled image grid.

    Args:
        cols: Number of grid columns.
        rows: Number of grid rows.

    Reads the module-level dataset ``exp``; each subplot is titled with the
    sample's class name.
    """
    figure = plt.figure(figsize=(14, 10))
    for cell in range(cols * rows):
        # Dataset indices are 0-based while subplot positions are 1-based, so
        # start at sample 0 (previously the very first sample was skipped).
        img, label = exp[cell]

        figure.add_subplot(rows, cols, cell + 1)
        plt.title(exp.classes[label])
        plt.axis('off')
        # No cmap: matplotlib ignores it for RGB(A) input, so it had no effect.
        plt.imshow(img)

    plt.tight_layout()
    plt.show()


In [None]:
# Render the sample grid with the default 8 columns x 5 rows.
viz_data()

In [None]:
def show_images(aug_dict, ncol=6):
    """Plot one grid column per augmentation: a text header followed by sample images.

    Args:
        aug_dict: Mapping of display name -> albumentations transform, or
            ``None`` to show the images unmodified.
        ncol: Number of grid ROWS in each column (the name is kept for
            backward compatibility). Row 0 holds the header text, so
            ``ncol - 1`` samples (``exp[0]`` .. ``exp[ncol - 2]``) are shown.

    Reads the module-level dataset ``exp``.
    """
    n_variants = len(aug_dict)
    grid_rows = ncol

    fig, axes = plt.subplots(grid_rows, n_variants, figsize=(3*n_variants, 15), squeeze=False)

    for col, (key, aug) in enumerate(aug_dict.items()):
        # Build the pipeline once per column instead of once per image.
        transform = A.Compose([aug]) if aug is not None else None

        for row in range(grid_rows):
            ax = axes[row, col]
            if row == 0:
                # Header cell: only the augmentation name, no axes.
                ax.text(0.5, 0.5, key, horizontalalignment='center', verticalalignment='center', fontsize=15)
                ax.get_xaxis().set_visible(False)
                ax.get_yaxis().set_visible(False)
                ax.axis('off')
                continue

            image, label = exp[row - 1]
            if transform is not None:
                # albumentations works on numpy arrays, not PIL images.
                image = transform(image=np.array(image))['image']

            ax.imshow(image)
            ax.set_title(f'{exp.classes[label]}')
            ax.axis('off')

    plt.tight_layout()
    plt.show()

In [None]:
# Preview each augmentation next to the unmodified images; every transform is
# forced on (always_apply=True) so its effect is visible in every cell.
# NOTE(review): show_images feeds np.array(image) without normalization, so the
# fractional fill_value below presumably renders as (near) black here rather
# than a mean colour -- confirm the intended fill for un-normalized images.
show_images({
    'Original Image' : None, 
    'Horizontal Flip': A.HorizontalFlip(always_apply=True),
    'Vertical Flip': A.VerticalFlip(always_apply=True),
    'Cut Out': A.CoarseDropout(max_holes=1, max_height=16, max_width=16, min_holes=1, min_height=16,
                               min_width=16, fill_value=0.473363, mask_fill_value=None, always_apply=True),
    'Gray Scale': A.ToGray(always_apply=True)    
})

In [None]:
class AlbumentationImageDataset(Dataset):
    """Dataset wrapper that applies albumentations pipelines to (image, label) pairs.

    Args:
        image_list: Indexable collection yielding ``(image, label)`` pairs with
            a ``len()``; each image must be convertible with ``np.array``.
        train: When true, the augmentation pipeline is applied; otherwise only
            normalization is applied.

    Items are returned as ``(float32 tensor in channel-first layout, label)``.
    """

    def __init__(self, image_list, train=True):
        self.image_list = image_list

        # The transforms must be passed as a list: a set literal has no defined
        # iteration order, so the pipeline order would be arbitrary.
        self.aug = A.Compose([
            A.Normalize(mean=(0.49139968, 0.48215841, 0.44653091), std=(0.24703223, 0.24348513, 0.26158784)),
            A.HorizontalFlip(),
            A.ShiftScaleRotate(),
            # NOTE(review): fill_value is applied after Normalize in this
            # order -- confirm 0.473363 is the intended fill in that space.
            A.CoarseDropout(max_holes=1, max_height=16, max_width=16, min_holes=1, min_height=16,
                            min_width=16, fill_value=0.473363, mask_fill_value=None),
            A.ToGray(),
        ])

        self.norm = A.Compose([
            A.Normalize(mean=(0.49139968, 0.48215841, 0.44653091), std=(0.24703223, 0.24348513, 0.26158784)),
        ])

        self.train = train

    def __len__(self):
        """Return the number of wrapped samples."""
        return len(self.image_list)

    def __getitem__(self, i):
        """Return sample ``i`` as ``(channel-first float tensor, label)``."""
        image, label = self.image_list[i]

        # Training uses the augmentation pipeline, evaluation only normalizes.
        pipeline = self.aug if self.train else self.norm
        image = pipeline(image=np.array(image))['image']

        # Move the last (channel) axis to the front, as the conv layers expect.
        image = np.transpose(image, (2, 0, 1)).astype(np.float32)
        return torch.tensor(image, dtype=torch.float), label

In [None]:
# Run-wide constants.
SEED = 1
BATCH_SIZE = 64

# Is a CUDA device available?
cuda = torch.cuda.is_available()
print(f"CUDA Available : {cuda}")

# For reproducibility: seed the default generator, plus the CUDA one when present.
torch.manual_seed(SEED)
if cuda:
    torch.cuda.manual_seed(SEED)




In [None]:
# Fetch the CIFAR-10 train and test splits (train first) without transforms,
# then display both sizes as the cell output.
trainset, testset = (
    torchvision.datasets.CIFAR10(root='./data', train=is_train, download=True)
    for is_train in (True, False)
)
len(trainset), len(testset)

In [None]:
# Reload both splits so this cell can run on its own.
trainset = torchvision.datasets.CIFAR10(root='./data', train=True, download=True)
testset = torchvision.datasets.CIFAR10(root='./data', train=False, download=True)

# Training batches are shuffled and augmented.
train_loader = DataLoader(
    dataset=AlbumentationImageDataset(image_list=trainset, train=True),
    batch_size=BATCH_SIZE,
    shuffle=True,
    num_workers=2,
)

# Evaluation batches keep dataset order and are only normalized.
test_loader = DataLoader(
    dataset=AlbumentationImageDataset(image_list=testset, train=False),
    batch_size=BATCH_SIZE,
    shuffle=False,
    num_workers=1,
)

In [None]:
# Pick the compute device: GPU when available, otherwise CPU.
use_cuda = torch.cuda.is_available()
device = torch.device('cuda') if use_cuda else torch.device('cpu')
print(device)

# Build the model, move it to the chosen device and print a per-layer summary
# for a 3x32x32 input.
net = Net()
net = net.to(device)
summary(model=net, input_size=(3, 32, 32))

In [None]:
# Scratch cell: only builds (and displays) an enumerate object over the
# loader; nothing is iterated. The commented lines are a disabled one-batch
# smoke test of the loader.
enumerate(train_loader)
# pbar = tqdm(train_loader)
# for batch_idx, (data, target) in enumerate(pbar):
#     print(batch_idx)
#     break

In [None]:
def train(model, device, train_loader, optimizer, l1, scheduler, lambda_l1=0.01):
    """Train ``model`` for one pass over ``train_loader``.

    Args:
        model: Network returning log-probabilities (the loss is ``F.nll_loss``).
        device: Device the batches are moved to before the forward pass.
        train_loader: Iterable of ``(data, target)`` batches.
        optimizer: Optimizer stepped once per batch.
        l1: When truthy, an L1 penalty over all model parameters is added to
            the loss.
        scheduler: Learning-rate scheduler stepped once per batch.
        lambda_l1: Weight of the L1 penalty (only used when ``l1`` is truthy).

    Returns:
        Tuple ``(accuracy in percent, mean loss per batch)`` for the epoch.
    """
    model.train()

    pbar = tqdm(train_loader)
    correct = 0
    processed = 0
    num_loops = 0
    train_loss = 0

    for batch_idx, (data, target) in enumerate(pbar):
        # Get samples
        data, target = data.to(device), target.to(device)

        # PyTorch accumulates gradients across backward passes, so reset them
        # before each batch to get a correct parameter update.
        optimizer.zero_grad()

        # Predict
        y_pred = model(data)

        # Calculate loss
        loss = F.nll_loss(y_pred, target)
        if l1:
            # The flag used to be overwritten with 0 right before this check
            # (and the loop called a non-existent `model.parameter()`), so the
            # penalty was never applied.
            l1_penalty = sum(p.abs().sum() for p in model.parameters())
            loss = loss + lambda_l1 * l1_penalty

        # Backpropagation
        loss.backward()
        optimizer.step()

        train_loss += loss.item()

        # Update LR (stepped per batch, not per epoch)
        scheduler.step()

        # Update pbar-tqdm
        pred = y_pred.argmax(dim=1, keepdim=True)  # index of the max log-probability
        correct += pred.eq(target.view_as(pred)).sum().item()
        processed += len(data)

        num_loops += 1
        pbar.set_description(desc=f"Batch_id={batch_idx} Loss={train_loss/num_loops:.5f} Accuracy={100 * correct / processed:0.2f}")

    return 100*correct/processed, train_loss/num_loops


In [None]:
def test(model, device, test_loader):
    """Evaluate ``model`` on ``test_loader`` and print a summary line.

    Args:
        model: Network returning log-probabilities (the loss is ``F.nll_loss``).
        device: Device the batches are moved to before the forward pass.
        test_loader: Iterable of ``(data, target)`` batches exposing a
            ``dataset`` attribute with a length.

    Returns:
        Tuple ``(accuracy in percent, average loss per sample)``.
    """
    model.eval()
    test_loss = 0
    correct = 0

    # No gradients are needed for evaluation.
    with torch.no_grad():
        for data, target in test_loader:
            data, target = data.to(device), target.to(device)
            output = model(data)
            # Sum (not mean) per batch so the total can be divided by the sample count below.
            test_loss += F.nll_loss(output, target, reduction='sum').item()
            # `keepdim` is the documented keyword (and matches train()).
            pred = output.argmax(dim=1, keepdim=True)  # index of the max log-probability
            correct += pred.eq(target.view_as(pred)).sum().item()

    num_samples = len(test_loader.dataset)
    test_loss /= num_samples
    accuracy = 100. * correct / num_samples

    print("\n Test set: Average loss: {:.4f}, Accuracy: {}/{} ({:.2f}%)\n".format(
        test_loss, correct, num_samples, accuracy))

    return accuracy, test_loss
                            


In [None]:
def fit_model(net, NUM_EPOCHS=20, l1=False, l2=False):
    """Train ``net`` for ``NUM_EPOCHS`` epochs, evaluating after every epoch.

    Uses the module-level ``train_loader``, ``test_loader`` and ``device``.

    Args:
        net: Model to optimize.
        NUM_EPOCHS: Number of train/evaluate rounds.
        l1: Forwarded to ``train`` as its ``l1`` argument.
        l2: When true, SGD is created with ``weight_decay=1e-4``.

    Returns:
        ``(net, (training_acc, training_loss, testing_acc, testing_loss))``
        where each history entry is a list with one value per epoch.
    """
    # SGD with momentum; weight decay is only configured for the L2 variant.
    sgd_options = {"lr": 0.001, "momentum": 0.9}
    if l2:
        sgd_options["weight_decay"] = 1e-4
    optimizer = optim.SGD(net.parameters(), **sgd_options)

    # One-cycle schedule sized for one scheduler step per training batch.
    scheduler = optim.lr_scheduler.OneCycleLR(
        optimizer=optimizer,
        max_lr=0.017,
        epochs=NUM_EPOCHS,
        steps_per_epoch=len(train_loader),
    )

    # Order: training accuracy, training loss, testing accuracy, testing loss.
    history = ([], [], [], [])

    for epoch in range(1, NUM_EPOCHS + 1):
        print(f"EPOCH : {epoch}")
        train_acc, train_loss = train(net, device, train_loader, optimizer, l1, scheduler)
        test_acc, test_loss = test(net, device, test_loader)

        epoch_stats = (train_acc, train_loss, test_acc, test_loss)
        for series, value in zip(history, epoch_stats):
            series.append(value)

    return net, history

In [None]:
# Train for 100 epochs with the defaults (l1=False, l2=False); `history` holds
# the per-epoch (training_acc, training_loss, testing_acc, testing_loss) lists.
net, history = fit_model(net.to(device), NUM_EPOCHS=100)