<a href="https://colab.research.google.com/github/wmrohan/bisevo-ai-ml/blob/main/Assignment_1.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import torch
import torch.nn as nn
import torchvision
from torchvision import datasets, transforms
from torch.utils.data import DataLoader
import torch.optim as optim
import matplotlib.pyplot as plt
from timeit import default_timer as timer
from tqdm.auto import tqdm
import pandas as pd
import random


# Prefer CUDA when the runtime exposes it; otherwise everything stays on the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")

# Sample a random [1, 3, 64, 64] tensor, then move it onto the selected device.
random_tensor = torch.randn(1, 3, 64, 64)
random_tensor = random_tensor.to(device)
print(random_tensor)

def apply_conv2d(input_tensor, kernel_size, padding, stride):
    """Run a freshly initialised ``nn.Conv2d`` over ``input_tensor`` and print the output shape.

    The layer is created on the same device as ``input_tensor`` (a layer left
    on the CPU cannot consume a CUDA tensor), and its ``in_channels`` is read
    from the input's channel dimension instead of being fixed at 3.

    Args:
        input_tensor (torch.Tensor): Input laid out as [N, C, H, W] or [C, H, W].
        kernel_size (int or tuple): Forwarded to ``nn.Conv2d``.
        padding (int or tuple): Forwarded to ``nn.Conv2d``.
        stride (int or tuple): Forwarded to ``nn.Conv2d``.
    Returns:
        torch.Tensor: The convolution output (64 output channels), on the
        same device as ``input_tensor``.
    """
    conv_layer = nn.Conv2d(
        in_channels=input_tensor.shape[-3],  # channel axis for both batched and unbatched input
        out_channels=64,
        kernel_size=kernel_size,
        padding=padding,
        stride=stride,
    ).to(input_tensor.device)
    # The layer is a throwaway shape probe, so no autograd graph is needed.
    with torch.no_grad():
        output_tensor = conv_layer(input_tensor)
    print(f"Output shape for kernel_size={kernel_size}, padding={padding}, stride={stride}: {output_tensor.shape}")
    return output_tensor

# Probe how kernel size, padding and stride change the convolution's output shape.
for _conv_kwargs in (
    {"kernel_size": 3, "padding": 1, "stride": 1},
    {"kernel_size": 5, "padding": 2, "stride": 2},
    {"kernel_size": 3, "padding": 0, "stride": 2},
):
    apply_conv2d(random_tensor, **_conv_kwargs)

def accuracy_fn(y_true, y_pred):
    """Return the percentage of predictions that match the truth labels.

    Args:
        y_true (torch.Tensor): Ground-truth labels.
        y_pred (torch.Tensor): Predicted labels to compare against ``y_true``.
    Returns:
        float: Accuracy as a percentage in [0, 100], e.g. 78.45.
    """
    # Element-wise equality, then count the matching positions as a Python number.
    num_matches = y_true.eq(y_pred).sum().item()
    return (num_matches / len(y_pred)) * 100

from timeit import default_timer as timer

def print_train_time(start: float, end: float, device: torch.device = None):
    """Print and return the elapsed time between two timestamps.

    Args:
        start (float): Timestamp taken before the computation (e.g. from ``timeit.default_timer``).
        end (float): Timestamp taken after the computation.
        device (torch.device, optional): Device the computation ran on; only
            used in the printed message. Defaults to None.
    Returns:
        float: ``end - start``, in the same units as the inputs (seconds for ``default_timer``).
    """
    elapsed = end - start
    report = f"Train time on {device}: {elapsed:.3f} seconds"
    print(report)
    return elapsed



def train_step(model: torch.nn.Module,
               data_loader: torch.utils.data.DataLoader,
               loss_fn: torch.nn.Module,
               optimizer: torch.optim.Optimizer,
               accuracy_fn,
               device: torch.device = device):
    """Train ``model`` for one pass over ``data_loader`` and print the mean loss/accuracy.

    Args:
        model: Network to optimise; expected to already live on ``device``.
        data_loader: Yields ``(X, y)`` batches of inputs and integer class labels.
        loss_fn: Callable mapping ``(logits, y)`` to a scalar loss tensor.
        optimizer: Optimizer bound to ``model``'s parameters.
        accuracy_fn: Callable taking ``y_true`` / ``y_pred`` keyword arguments
            and returning a numeric accuracy for one batch.
        device: Device each batch is moved to before the forward pass.
    """
    # test_step() leaves the model in eval mode, so switch back explicitly;
    # otherwise every epoch after the first would train in eval mode.
    model.train()
    train_loss, train_acc = 0.0, 0.0
    for X, y in data_loader:
        # Send data to the target device
        X, y = X.to(device), y.to(device)
        # 1. Forward pass
        y_pred = model(X)
        # 2. Calculate loss; accumulate a plain float so the running total does
        #    not keep each batch's autograd graph alive.
        loss = loss_fn(y_pred, y)
        train_loss += loss.item()
        train_acc += accuracy_fn(y_true=y, y_pred=y_pred.argmax(dim=1))  # logits -> pred labels
        # 3. Backward and optimize
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
    # Average the per-batch metrics over the epoch and report them
    train_loss /= len(data_loader)
    train_acc /= len(data_loader)
    print(f"Model : {model} Train loss: {train_loss:.5f} | Train accuracy: {train_acc:.2f}%")

def test_step(data_loader: torch.utils.data.DataLoader,
              model: torch.nn.Module,
              loss_fn: torch.nn.Module,
              accuracy_fn,
              device: torch.device = device):
    """Evaluate ``model`` over ``data_loader`` and print the mean loss/accuracy per batch.

    Args:
        data_loader: Yields ``(X, y)`` batches to evaluate on.
        model: Network to evaluate; it is switched to eval mode here.
        loss_fn: Callable mapping ``(logits, y)`` to a loss tensor.
        accuracy_fn: Callable taking ``y_true`` / ``y_pred`` keyword arguments
            and returning a numeric accuracy for one batch.
        device: Device each batch is moved to before the forward pass.
    """
    running_loss = 0
    running_acc = 0
    model.eval()
    # Gradients are not needed for evaluation.
    with torch.no_grad():
        for features, labels in data_loader:
            features = features.to(device)
            labels = labels.to(device)
            logits = model(features)
            running_loss += loss_fn(logits, labels)
            # argmax over the class dimension turns logits into predicted labels
            running_acc += accuracy_fn(y_true=labels, y_pred=logits.argmax(dim=1))

    # Per-batch averages
    num_batches = len(data_loader)
    mean_loss = running_loss / num_batches
    mean_acc = running_acc / num_batches
    print(f"Model : {model} Test loss: {mean_loss:.5f} | Test accuracy: {mean_acc:.2f}%\n")

def eval_model(model: torch.nn.Module,
               data_loader: torch.utils.data.DataLoader,
               loss_fn: torch.nn.Module,
               accuracy_fn,
               device: torch.device = device):
    """Compute the mean per-batch loss and accuracy of ``model`` on ``data_loader``.

    Args:
        model: Network to evaluate; it is switched to eval mode here.
        data_loader: Yields ``(X, y)`` batches to evaluate on.
        loss_fn: Callable mapping ``(logits, y)`` to a scalar loss tensor.
        accuracy_fn: Callable taking ``y_true`` / ``y_pred`` keyword arguments
            and returning a numeric accuracy for one batch.
        device: Device each batch is moved to before the forward pass.
    Returns:
        dict: ``model_name`` (the model's class name), ``model_loss`` (float)
        and ``model_acc`` (mean of the per-batch accuracies).
    """
    total_loss = 0
    total_acc = 0
    model.eval()
    with torch.no_grad():
        for inputs, targets in data_loader:
            inputs, targets = inputs.to(device), targets.to(device)
            logits = model(inputs)
            total_loss += loss_fn(logits, targets)
            # Accuracy is computed on predicted labels, i.e. the argmax over classes
            total_acc += accuracy_fn(y_true=targets, y_pred=logits.argmax(dim=1))

    # Average the accumulated totals over the number of batches
    batch_count = len(data_loader)
    mean_loss = total_loss / batch_count
    mean_acc = total_acc / batch_count
    results = {
        "model_name": type(model).__name__,
        "model_loss": mean_loss.item(),
        "model_acc": mean_acc,
    }
    return results

class TinyVGG(nn.Module):
    """Small VGG-style CNN: two conv blocks followed by a linear classifier.

    Each block is conv -> ReLU -> conv -> ReLU -> 2x2 max-pool. The classifier
    flattens the feature map and expects ``hidden_units * 7 * 7`` features,
    which is what the two blocks produce for 28x28 inputs
    (28 -> 26 -> 26 -> 13 -> 14 -> 14 -> 7).

    Args:
        input_shape (int): Number of input channels.
        hidden_units (int): Number of channels used by every conv layer.
        output_shape (int): Number of output classes (logits).
    """

    def __init__(self, input_shape: int, hidden_units: int, output_shape: int):
        super().__init__()
        # Block 1: a 5x5 conv (padding 1 shrinks H/W by 2), a shape-preserving 3x3 conv, then pool.
        self.block_1 = nn.Sequential(
            nn.Conv2d(input_shape, hidden_units, kernel_size=5, stride=1, padding=1),
            nn.ReLU(),
            nn.Conv2d(hidden_units, hidden_units, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),
        )
        # Block 2: a 2x2 conv (padding 1 grows H/W by 1), a shape-preserving 3x3 conv, then pool.
        self.block_2 = nn.Sequential(
            nn.Conv2d(in_channels=hidden_units, out_channels=hidden_units, kernel_size=2, padding=1),
            nn.ReLU(),
            nn.Conv2d(in_channels=hidden_units, out_channels=hidden_units, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2),
        )
        # The 7*7 factor is the spatial size left after both blocks for a 28x28 input.
        self.classifier = nn.Sequential(
            nn.Flatten(),
            nn.Linear(hidden_units * 7 * 7, output_shape),
        )

    def forward(self, x: torch.Tensor):
        """Return class logits for a batch of images."""
        features = self.block_2(self.block_1(x))
        return self.classifier(features)


# Seed the global RNG before the model is built so its randomly initialised
# weights are the same on every run.
torch.manual_seed(99)

# Load MNIST; ToTensor followed by Normalize((0.5,), (0.5,)) applies (x - 0.5) / 0.5 per pixel.
transform = transforms.Compose([transforms.ToTensor(), transforms.Normalize((0.5,), (0.5,))])
train_dataset = datasets.MNIST(root='./data', train=True, transform=transform, download=True)
test_dataset = datasets.MNIST(root='./data', train=False, transform=transform, download=True)

# Create the data loaders with a batch size of 8; only the training data is shuffled.
train_loader = DataLoader(train_dataset, batch_size=8, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=8, shuffle=False)

# Instantiate the TinyVGG model (1 input channel, one logit per class) and move it to the selected device.
model_0 = TinyVGG(input_shape=1, hidden_units=10, output_shape=len(train_dataset.classes)).to(device)

# Set up the loss function and optimizer.
loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(params=model_0.parameters(), lr=0.01)

# Seed the global RNG, then time the whole train/test run.
torch.manual_seed(99)
train_time_start_model_0 = timer()

epochs = 5
for epoch in tqdm(range(epochs)):
    print(f"Epoch: {epoch}\n---------")
    # One optimisation pass over the training data...
    train_step(
        model=model_0,
        data_loader=train_loader,
        loss_fn=loss_fn,
        optimizer=optimizer,
        accuracy_fn=accuracy_fn,
        device=device,
    )
    # ...followed by an evaluation pass over the test data.
    test_step(
        model=model_0,
        data_loader=test_loader,
        loss_fn=loss_fn,
        accuracy_fn=accuracy_fn,
        device=device,
    )

# Report how long all epochs took.
train_time_end_model_0 = timer()
total_train_time_model_0 = print_train_time(
    start=train_time_start_model_0,
    end=train_time_end_model_0,
    device=device,
)

# Final loss/accuracy of the trained model on the test split.
model_0_results = eval_model(model_0, test_loader, loss_fn, accuracy_fn, device)
# Bare expression: echoes the dict only when it is the last line of a notebook cell.
model_0_results

print(model_0_results)

# Collect the predicted class label for every test sample, batch by batch.
y_preds = []
model_0.eval()
with torch.no_grad():  # inference only, so no gradients are needed
    for X, y in tqdm(test_loader, desc="Making predictions"):
        # Only the inputs are needed on the target device; the labels are not used here.
        X = X.to(device)
        # Raw model outputs (logits), one row per sample and one column per class
        y_logit = model_0(X)
        # logits -> probabilities -> labels. Softmax must run over the class
        # dimension (dim=1); normalising over the batch dimension (dim=0)
        # mixes samples together and changes which class wins the argmax.
        # No squeeze(): it would drop the batch dimension of a size-1 batch.
        y_pred = torch.softmax(y_logit, dim=1).argmax(dim=1)
        # Keep predictions on the CPU for the later evaluation/plotting steps
        y_preds.append(y_pred.cpu())
# Concatenate the per-batch predictions into a single 1-D tensor
y_pred_tensor = torch.cat(y_preds)
# Make sure torchmetrics and a recent enough mlxtend (>= 0.19) are importable,
# installing/upgrading them with pip when they are missing or too old.
import subprocess
import sys

try:
    import torchmetrics
    import mlxtend
    # Compare (major, minor) so that releases such as 1.0 are not rejected.
    _mlxtend_version = tuple(int(part) for part in mlxtend.__version__.split(".")[:2])
    _needs_install = _mlxtend_version < (0, 19)
except (ImportError, ValueError):
    # Package missing, or its version string could not be parsed.
    _needs_install = True

if _needs_install:
    # Run pip through the current interpreter so the packages land in the
    # environment this code is running in.
    # Note: an upgrade often requires restarting the runtime/kernel to take effect.
    subprocess.check_call(
        [sys.executable, "-m", "pip", "install", "-q", "-U", "torchmetrics", "mlxtend"]
    )
    import torchmetrics
    import mlxtend

print(f"mlxtend version: {mlxtend.__version__}")
#compare_results = pd.DataFrame([model_0_results, model_1_results, model_2_results])
#compare_results
from torchmetrics import ConfusionMatrix
from mlxtend.plotting import plot_confusion_matrix

# Build the confusion matrix of the test-set predictions against the test labels.
confmat = ConfusionMatrix(num_classes=len(train_dataset.classes), task='multiclass')
confmat_tensor = confmat(target=test_dataset.targets, preds=y_pred_tensor)

# Plot it, labelling rows and columns with the class names; the tensor is
# converted to a NumPy array for the plotting helper.
fig, ax = plot_confusion_matrix(
    figsize=(10, 7),
    class_names=train_dataset.classes,
    conf_mat=confmat_tensor.numpy(),
)


tensor([[[[-0.6832,  0.5286,  0.0657,  ...,  0.1903,  0.8164, -1.0014],
          [ 0.4422, -1.7012,  0.4279,  ..., -0.1093, -0.7446, -0.6310],
          [-0.0829,  0.6157, -0.8538,  ..., -0.8542, -0.0052,  0.2447],
          ...,
          [-0.3133, -0.8166, -0.9174,  ..., -1.1026,  0.3989,  0.2705],
          [ 0.1917, -1.2777, -1.0548,  ..., -0.3577, -0.0618,  0.7929],
          [-0.1710,  0.4883, -2.7755,  ...,  1.1779, -1.3473, -1.2584]],

         [[-0.5938,  0.7104, -0.2424,  ..., -0.6059,  1.8179, -0.5420],
          [-0.6159, -0.4900,  0.8943,  ...,  1.0469,  0.6119, -1.2052],
          [ 0.4501,  1.9181,  1.1132,  ...,  0.3698, -0.0804, -0.7101],
          ...,
          [ 1.4761,  0.3965,  0.1743,  ...,  0.4694,  2.4600,  0.3486],
          [-0.9655,  0.0539, -0.6757,  ...,  1.1373, -0.3866, -1.0749],
          [-2.1097,  0.6134, -2.4563,  ..., -1.1416, -0.9337,  1.2521]],

         [[-1.1603, -0.1439, -1.1294,  ...,  0.5810, -0.7463,  0.3763],
          [ 0.1282,  0.8620, -

RuntimeError: ignored