# CNN
Using pytorch nn, the code will build cnn from scratch
## Dataset
data set will be black and white picture (inchannel = 1), where capture basic places with labels. The size of the data is 64 x 64.
##  nn liabraries that are going to be used in the project
### nn.Module
This is the basic class for all neaural network modules. all neaural network built by  nn should be subcalss of this class
### nn.Conv2
2d convolutional process where the params are (in_channel, out_channel, kernel_size, ...). Since our image is 2d we will do 2d convolution.
### nn.MaxPool2d
2d Maxpooling. from 2d kernel, pick max value on each kernel.
### nn.ReLU
Applies recifiedlinear unit function. If the input is negative, it is zero, otherwise the output will be input.
### nn.Linear
Applies linear transformation (y = xA**T + b) on input size and output size.

In [1]:
# import necessary liabraries
from torch import nn

## Architectural Design
1. (2d convolutional -> max pooling with 3x3 kenel -> relu) 
2. (2d convolution -> max pooling with 3x3 kernel -> relu)
3. two fully connected layers

### Loss Criterion
To calculate the difference, it used 'mean' reduction in the loss criterion by using method of cross entropy

In [2]:
class CNN1(nn.Module):
    def __init__(self):
        """
        Init function to define the layers and loss function

        Note:Read Pytorch documention to understand what it means

        """
        super().__init__()

        self.conv_layers = nn.Sequential()
        # input: 1x(64x64)
        self.conv_layers.add_module("conv_1", nn.Conv2d(1, 10, kernel_size=5))
        # output/input: 10 x (60x60)
        self.conv_layers.add_module("maxpool1", nn.MaxPool2d(kernel_size=3))
        self.conv_layers.add_module("relu_1", nn.ReLU())
        # output: 10 x (20 x 20)

        # input: 10 x (20 x 20)
        self.conv_layers.add_module("conv_2", nn.Conv2d(10, 20, kernel_size=5))
        # output/input: 20 x (16x16)
        self.conv_layers.add_module("maxpool2", nn.MaxPool2d(kernel_size=3))
        self.conv_layers.add_module("relu_2",  nn.ReLU())
        # output: 20 x (5 x 5)

        self.conv_layers.add_module("flatten",  nn.Flatten())
        self.conv_layers.add_module("relu_3",  nn.ReLU())
        # output: 20 x 5 x 5 = 500

        # this is fully connected layers
        self.fc_layers = nn.Sequential()
        self.fc_layers.add_module("linear_1", nn.Linear(in_features=500, out_features=100))
        self.fc_layers.add_module("linear_2", nn.Linear(in_features=100, out_features=15))

        self.loss_criterion = nn.CrossEntropyLoss()

    def forward(self, x):
        """
        Args:
        -   x: the input image [Dim: (N,C,H,W)] : data type = Tensor
        Returns:
        -   y: the output (raw scores) of the net [Dim: (N,15)] : data type = Tensor
        """
        return self.fc_layers(self.conv_layers(x))

## BatchNorm2d
Applied 4d Batch normalization. The mean and standard-deviation are calculated per-dimension over the mini-batches of 2D inputs with additional channel dimension and γ\gammaγ and β\betaβ are learnable parameter vectors of size of the input.
funtion is described on the website: https://pytorch.org/docs/stable/generated/torch.nn.BatchNorm2d.html

In [6]:
class CNN2(nn.Module):
    def __init__(self):
        super(CNN2, self).__init__()

        self.fc_layers = nn.Sequential()

        self.conv_layers = nn.Sequential()
        self.conv_layers.add_module("conv_1", nn.Conv2d(1, 10, kernel_size=5))
        self.conv_layers.add_module("BN1", nn.BatchNorm2d(num_features=10, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True))
        self.conv_layers.add_module("maxpool1", nn.MaxPool2d(kernel_size=3, padding=1, stride = 2))
        self.conv_layers.add_module("relu_1", nn.ReLU())
        
        self.conv_layers.add_module("conv_2", nn.Conv2d(10, 20, kernel_size=5))
        self.conv_layers.add_module("BN2", nn.BatchNorm2d(num_features=20, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True))
        self.conv_layers.add_module("maxpool2", nn.MaxPool2d(kernel_size=3, padding=1, stride= 2))
        self.conv_layers.add_module("relu_2", nn.ReLU())
        
        self.conv_layers.add_module("dropout", nn.Dropout())
        self.conv_layers.add_module("conv_3", nn.Conv2d(20, 50, kernel_size=5))
        self.conv_layers.add_module("maxpool3", nn.MaxPool2d(kernel_size=3, padding=1, stride= 2))
        self.conv_layers.add_module("relu_3", nn.ReLU())
        self.conv_layers.add_module("flatten", nn.Flatten())


        self.fc_layers.add_module("linear_1", nn.Linear(in_features=1250, out_features=500))
        self.fc_layers.add_module("linear_2", nn.Linear(in_features=500, out_features=15))

        self.loss_criterion = nn.CrossEntropyLoss()

    def forward(self, x):
        """
        Perform the forward pass with the net

        Args:
        -   x: the input image [Dim: (N,C,H,W)]
        Returns:
        -   y: the output (raw scores) of the net [Dim: (N,15)]
        """
        
        conv_output = self.conv_layers(x)
        model_output = self.fc_layers(conv_output)

        return model_output


In [20]:
cnn1 = CNN1()
cnn2 = CNN2()
cnn1.train()
cnn2.train()

CNN2(
  (fc_layers): Sequential(
    (linear_1): Linear(in_features=1250, out_features=500, bias=True)
    (linear_2): Linear(in_features=500, out_features=15, bias=True)
  )
  (conv_layers): Sequential(
    (conv_1): Conv2d(1, 10, kernel_size=(5, 5), stride=(1, 1))
    (BN1): BatchNorm2d(10, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (maxpool1): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
    (relu_1): ReLU()
    (conv_2): Conv2d(10, 20, kernel_size=(5, 5), stride=(1, 1))
    (BN2): BatchNorm2d(20, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (maxpool2): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
    (relu_2): ReLU()
    (dropout): Dropout(p=0.5, inplace=False)
    (conv_3): Conv2d(20, 50, kernel_size=(5, 5), stride=(1, 1))
    (maxpool3): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
    (relu_3): ReLU()
    (flatten): Flatten(start_dim=1, end_di

In [11]:
""" Optional Cuda package (personally do not have cuda but you can use cuda to speed up the model)
from torch import cuda
if cuda:
    cnn1.cuda()
    cnn2.cuda()"""

' Optional Cuda package (personally do not have cuda but you can use cuda to speed up the model)\nfrom torch import cuda\nif cuda:\n    cnn1.cuda()\n    cnn2.cuda()'

In [14]:
import os
from typing import Tuple, Union

import matplotlib.pyplot as plt
import torch
import torchvision.transforms as transforms
from vision.dl_utils import compute_accuracy, compute_loss
from vision.image_loader import ImageLoader
from vision.my_resnet import MyResNet18
from vision.simple_net import SimpleNet
from torch.optim import Optimizer
from torch.utils.data import DataLoader


class AverageMeter(object):
    """Computes and stores the average and current value"""

    def __init__(self, name: str, fmt: str = ":f") -> None:
        self.name = name
        self.fmt = fmt
        self.reset()

    def reset(self) -> None:
        self.val = 0
        self.avg = 0
        self.sum = 0
        self.count = 0

    def update(self, val: float, n: int = 1):
        self.val = val
        self.sum += val * n
        self.count += n
        self.avg = self.sum / self.count

    def __str__(self):
        fmtstr = "{name} {val" + self.fmt + "} ({avg" + self.fmt + "})"
        return fmtstr.format(**self.__dict__)


class Trainer:
    """Class that stores model training metadata."""

    def __init__(
        self,
        data_dir: str,
        model: Union[SimpleNet, MyResNet18],
        optimizer: Optimizer,
        model_dir: str,
        train_data_transforms: transforms.Compose,
        val_data_transforms: transforms.Compose,
        batch_size: int = 100,
        load_from_disk: bool = True
    ) -> None:
        self.model_dir = model_dir

        self.model = model

        self.train_dataset = ImageLoader(
            data_dir, split="train", transform=train_data_transforms
        )
        self.train_loader = DataLoader(self.train_dataset, batch_size=batch_size, shuffle=True)

        self.val_dataset = ImageLoader(
            data_dir, split="test", transform=val_data_transforms
        )
        self.val_loader = DataLoader(self.val_dataset, batch_size=batch_size, shuffle=True)

        self.optimizer = optimizer

        self.train_loss_history = []
        self.validation_loss_history = []
        self.train_accuracy_history = []
        self.validation_accuracy_history = []

        # load the model from the disk if it exists
        if os.path.exists(model_dir) and load_from_disk:
            checkpoint = torch.load(os.path.join(self.model_dir, "checkpoint.pt"))
            self.model.load_state_dict(checkpoint["model_state_dict"])
            self.optimizer.load_state_dict(checkpoint["optimizer_state_dict"])

        self.model.train()

    def save_model(self) -> None:
        """
        Saves the model state and optimizer state on the dict
        """
        torch.save(
            {
                "model_state_dict": self.model.state_dict(),
                "optimizer_state_dict": self.optimizer.state_dict(),
            },
            os.path.join(self.model_dir, "checkpoint.pt"),
        )

    def run_training_loop(self, num_epochs: int) -> None:
        """Train for num_epochs, and validate after every epoch."""
        for epoch_idx in range(num_epochs):

            train_loss, train_acc = self.train_epoch()

            self.train_loss_history.append(train_loss)
            self.train_accuracy_history.append(train_acc)

            val_loss, val_acc = self.validate()
            self.validation_loss_history.append(val_loss)
            self.validation_accuracy_history.append(val_acc)

            print(
                f"Epoch:{epoch_idx + 1}"
                + f" Train Loss:{train_loss:.4f}"
                + f" Val Loss: {val_loss:.4f}"
                + f" Train Accuracy: {train_acc:.4f}"
                + f" Validation Accuracy: {val_acc:.4f}"
            )

    def train_epoch(self) -> Tuple[float, float]:
        """Implements the main training loop."""
        self.model.train()

        train_loss_meter = AverageMeter("train loss")
        train_acc_meter = AverageMeter("train accuracy")
        # loop over each minibatch
        for (x, y) in self.train_loader:

            n = x.shape[0]
            logits = self.model(x)
            batch_acc = compute_accuracy(logits, y)
            train_acc_meter.update(val=batch_acc, n=n)

            batch_loss = compute_loss(self.model, logits, y, is_normalize=True)
            train_loss_meter.update(val=float(batch_loss.cpu().item()), n=n)

            self.optimizer.zero_grad()
            batch_loss.backward() #do loss backward
            self.optimizer.step() #do optimizer step

        return train_loss_meter.avg, train_acc_meter.avg

    def validate(self) -> Tuple[float, float]:
        """Evaluate on held-out split (either val or test)"""
        self.model.eval()

        val_loss_meter = AverageMeter("val loss")
        val_acc_meter = AverageMeter("val accuracy")

        # loop over whole val set
        for (x, y) in self.val_loader:
            n = x.shape[0]
            logits = self.model(x)

            batch_acc = compute_accuracy(logits, y)
            val_acc_meter.update(val=batch_acc, n=n)

            batch_loss = compute_loss(self.model, logits, y, is_normalize=True)
            val_loss_meter.update(val=float(batch_loss.cpu().item()), n=n)

        return val_loss_meter.avg, val_acc_meter.avg

    def plot_loss_history(self) -> None:
        """Plots the loss history"""
        plt.figure()
        epoch_idxs = range(len(self.train_loss_history))

        plt.plot(epoch_idxs, self.train_loss_history, "-b", label="training")
        plt.plot(epoch_idxs, self.validation_loss_history, "-r", label="validation")
        plt.title("Loss history")
        plt.legend()
        plt.ylabel("Loss")
        plt.xlabel("Epochs")
        plt.show()

    def plot_accuracy(self) -> None:
        """Plots the accuracy history"""
        plt.figure()
        epoch_idxs = range(len(self.train_accuracy_history))
        plt.plot(epoch_idxs, self.train_accuracy_history, "-b", label="training")
        plt.plot(epoch_idxs, self.validation_accuracy_history, "-r", label="validation")
        plt.title("Accuracy history")
        plt.legend()
        plt.ylabel("Accuracy")
        plt.xlabel("Epochs")
        plt.show()


FileNotFoundError: [Errno 2] No such file or directory: './model_checkpoints/simple_net/checkpoint.pt'

## Insight on CNNs

In [None]:
def flatten_layers(layers):
    """
    Keep on flattening nn.Sequential objects
    """

    flattened_layers = list()

    recurse = False
    if isinstance(layers, nn.Linear):
        return layers
    for elem in layers:
        if type(elem) == nn.Sequential:
            recurse = True
            flattened_layers += list(elem.children())
        else:
            flattened_layers.append(elem)

    if recurse:
        return flatten_layers(flattened_layers)

    return flattened_layers


def extract_model_layers(model: Union[SimpleNet, SimpleNetFinal, MyResNet18]):
    # get the CNN sequential
    layers = flatten_layers(
        list(model.conv_layers.children())
        + (
            [model.fc_layers]
            if isinstance(model.fc_layers, nn.Linear)
            else list(model.fc_layers.children())
        )
    )

    # generate counts of different types of layers present in the model
    layers_type = [x.__class__.__name__ for x in layers]
    layers_count = Counter(layers_type)

    # get the total number of parameters which require grad and which do not require grad
    num_params_grad = 0
    num_params_nograd = 0
    for param in model.parameters():
        if param.requires_grad:
            num_params_grad += param.numel()
        else:
            num_params_nograd += param.numel()
    return (
        layers,
        layers[-1].out_features,
        layers_count,
        num_params_grad,
        num_params_nograd,
    )


if __name__ == "__main__":
    model1 = SimpleNet()
    print(extract_model_layers(model1))

    model2 = SimpleNetFinal()
    print(extract_model_layers(model2))

    model3 = MyResNet18()
    print(extract_model_layers(model3))