# Midterm Coding - Jan25

**Question 1)** ResNet on CIFAR-10

**Question 2)** ResNet on small dataset

**Question 3)** Pretrained ResNet on small dataset

**Question 4)** Discussion

Please complete the code wherever you see "# Your code here".

Good luck!

## Question 1: ResNet on CIFAR-10

#### 1. Prepare datasets 

In this part you need to:
- transform the images to 3x224x224
- complete the other necessary steps (train/validation split, test set, and DataLoaders)

In [44]:
import os
import time
from copy import copy, deepcopy

import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torchvision
import torchvision.utils as vutils
from torch.utils.data import TensorDataset
from torchvision import datasets, models, transforms
from sklearn.metrics import confusion_matrix
from sklearn.model_selection import StratifiedKFold

In [45]:
# Set device to GPU or CPU

device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print(device)

cuda:0


#### Load image

In [46]:
full_train_dataset = torchvision.datasets.CIFAR10(root='./data', train=True,
                                                  download=True)

#### Transform image (augmentation)

In [47]:
# Your code here





#### Other necessary steps

In [10]:
# Your code here
# Train/validation split + transforms (recommended: e.g., train 40000, val 10000)





# DataLoaders for the three datasets: train, val, and test (recommended batch size: 128)







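One way to organize this cell, sketched under assumptions: split `full_train_dataset` by index, so the training indices can later be paired with an augmenting transform and the validation indices with a deterministic one. Here a small random `TensorDataset` stands in for CIFAR-10 so the snippet runs on its own; the helper `split_indices` and the 80/20 sizes (40000/10000 on the real data) are illustrative.

```python
import numpy as np
import torch
from torch.utils.data import DataLoader, Subset, TensorDataset

def split_indices(n_total, n_train, n_val, seed=0):
    """Shuffle indices 0..n_total-1 once and cut them into disjoint train/val lists."""
    perm = np.random.default_rng(seed).permutation(n_total)
    return perm[:n_train].tolist(), perm[n_train:n_train + n_val].tolist()

# Stand-in for CIFAR-10: 100 random 3x8x8 "images" with labels 0..9
images = torch.randn(100, 3, 8, 8)
labels = torch.randint(0, 10, (100,))
stand_in = TensorDataset(images, labels)

train_idx, val_idx = split_indices(len(stand_in), n_train=80, n_val=20)
# On the real data these could index two CIFAR10(train=True) objects built with
# different transforms (one augmenting, one deterministic)
train_set = Subset(stand_in, train_idx)
val_set = Subset(stand_in, val_idx)

# Same dict layout that train_model expects
dataloaders = {
    'train': DataLoader(train_set, batch_size=128, shuffle=True),
    'val': DataLoader(val_set, batch_size=128, shuffle=False),
}
xb, yb = next(iter(dataloaders['train']))
print(xb.shape, len(val_set))  # torch.Size([80, 3, 8, 8]) 20
```

Shuffling only the training loader is the usual choice: the validation order does not affect the metrics, and a fixed order makes predictions easier to compare across runs.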
#### 2. Prepare model 

In this part you need to:
- complete the BottleneckBlock of ResNet
- create ResNet50 and ResNet101

## ResNet

On page 5 of [the ResNet paper](https://arxiv.org/pdf/1512.03385.pdf), the simplest ResNet
described is now known as ResNet18. It is a 1.8 GFLOP CNN with 8 residual blocks (two convolutional
layers in each residual block), which along with the initial convolution (7x7 in the paper, 3x3 here)
and the final linear / softmax layer gives us 18 layers:

<img src="img/ResNet18.JPG" title="ResNet18" style="width: 200px;" />

### Residual blocks

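The residual block is the reusable building unit of ResNet.
ResNet18 and ResNet34 use basic residual blocks with two 3x3 convolutions, while
ResNet50, ResNet101, and ResNet152 use bottleneck blocks with three convolutions:
a 1x1 convolution that reduces the number of channels, a 3x3 convolution that operates
on this narrower "bottleneck", and a 1x1 convolution that expands the channels back
(by a factor of 4). This design keeps the parameter count and computation per block low,
which is what makes these much deeper networks affordable to train.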
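The parameter argument can be checked by hand. Counting only convolution weights (bias-free, BatchNorm ignored), the sketch below compares a basic block with 64 channels, a bottleneck block that maps 256 channels down to 64 and back up (as in the first stage of ResNet50), and a plain basic block working directly at 256 channels. The helper names are illustrative.

```python
def conv_params(c_in, c_out, k):
    """Weights of a bias-free k x k convolution."""
    return c_in * c_out * k * k

def basic_block_params(c):
    # two 3x3 convolutions: c -> c -> c
    return conv_params(c, c, 3) + conv_params(c, c, 3)

def bottleneck_params(c_outer, c_inner):
    # 1x1 reduce, 3x3 at the narrow width, 1x1 expand
    return (conv_params(c_outer, c_inner, 1)
            + conv_params(c_inner, c_inner, 3)
            + conv_params(c_inner, c_outer, 1))

print(basic_block_params(64))      # 73728
print(bottleneck_params(256, 64))  # 69632
print(basic_block_params(256))     # 1179648
```

The bottleneck block handles 256-channel inputs and outputs for about the cost of a 64-channel basic block, roughly 17x fewer weights than a basic block at 256 channels.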

We need two types of residual block, one that preserves feature map size and one
that allows changes to the feature map size:

<img src="img/residualblock.png" title="Residual block" style="width: 640px;" />

Note that only the shape-preserving residual block has a real identity mapping.
The 1x1 strided convolution is the simplest way to allow changes in the input
feature map size, but since the parameters are learned, after training, the result
may be quite different from an identity mapping.
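Why a stride-matched 1x1 convolution is enough for the shortcut: with the output-size formula floor((H + 2p - k) / s) + 1 (the one given for `nn.Conv2d` with dilation 1), a 3x3 convolution with stride 2 and padding 1 and a 1x1 convolution with stride 2 and no padding produce the same spatial size for every input size, so the two branches can be added element-wise. The helper `conv_out` is illustrative.

```python
def conv_out(h, k, s, p):
    """Spatial output size of a convolution (dilation 1)."""
    return (h + 2 * p - k) // s + 1

# Main path: 3x3, stride 2, padding 1.  Shortcut: 1x1, stride 2, padding 0.
for h in [7, 8, 32, 56, 224]:
    main = conv_out(h, k=3, s=2, p=1)
    shortcut = conv_out(h, k=1, s=2, p=0)
    print(h, main, shortcut)  # the last two numbers always agree
```

Algebraically, both cases reduce to floor((H - 1) / 2) + 1, which is why the match holds for odd and even sizes alike.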

In [48]:
class BasicBlock(nn.Module):
    '''
    BasicBlock: Simple residual block with two conv layers
    '''
    EXPANSION = 1
    def __init__(self, in_planes, out_planes, stride=1):
        super().__init__()
        self.conv1 = nn.Conv2d(in_planes, out_planes, kernel_size=3, stride=stride, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(out_planes)
        self.conv2 = nn.Conv2d(out_planes, out_planes, kernel_size=3, stride=1, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(out_planes)
        self.shortcut = nn.Sequential()
        # If output size is not equal to input size, reshape it with 1x1 convolution
        if stride != 1 or in_planes != out_planes:
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_planes, out_planes, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(out_planes)
            )

    def forward(self, x):
        out = F.relu(self.bn1(self.conv1(x)))
        out = self.bn2(self.conv2(out))
        out += self.shortcut(x)
        out = F.relu(out)
        return out

Here's the bottleneck version, with three convolutional layers per residual block:

In [49]:
class BottleneckBlock(nn.Module):
    '''
    BottleneckBlock: More powerful residual block with three convs, used for ResNet50 and up
    '''
    # Your code here









    

    def forward(self, x):
        # Your code here








        
        return out
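A minimal sketch of one possible bottleneck block, written to fit the `ResNet` class below: `_make_layer` calls `block(in_planes, planes, stride)` and then sets `in_planes = planes * block.EXPANSION`, so the block must output `out_planes * EXPANSION` channels. `EXPANSION = 4` follows the paper's bottleneck design. Putting the stride on the 3x3 convolution (instead of the first 1x1, as in the original paper) follows torchvision's implementation; both are reasonable. The class name `BottleneckBlockSketch` is hypothetical, chosen so it does not shadow the exercise cell.

```python
import torch
import torch.nn as nn
import torch.nn.functional as F

class BottleneckBlockSketch(nn.Module):
    '''1x1 reduce -> 3x3 -> 1x1 expand, plus a shortcut (projection when shapes change).'''
    EXPANSION = 4

    def __init__(self, in_planes, out_planes, stride=1):
        super().__init__()
        width_out = out_planes * self.EXPANSION
        self.conv1 = nn.Conv2d(in_planes, out_planes, kernel_size=1, bias=False)
        self.bn1 = nn.BatchNorm2d(out_planes)
        # stride applied here, so the 3x3 conv does the downsampling
        self.conv2 = nn.Conv2d(out_planes, out_planes, kernel_size=3, stride=stride, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(out_planes)
        self.conv3 = nn.Conv2d(out_planes, width_out, kernel_size=1, bias=False)
        self.bn3 = nn.BatchNorm2d(width_out)
        self.shortcut = nn.Sequential()  # identity when shapes already match
        if stride != 1 or in_planes != width_out:
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_planes, width_out, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(width_out),
            )

    def forward(self, x):
        out = F.relu(self.bn1(self.conv1(x)))
        out = F.relu(self.bn2(self.conv2(out)))
        out = self.bn3(self.conv3(out))  # no ReLU before the addition
        out = out + self.shortcut(x)
        return F.relu(out)

block = BottleneckBlockSketch(64, 64, stride=2)
print(block(torch.randn(2, 64, 32, 32)).shape)  # torch.Size([2, 256, 16, 16])
```

With the `ResNet` class below, `ResNet(BottleneckBlockSketch, [3, 4, 6, 3], num_classes=10)` would give a ResNet50-style network.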

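### ResNet

Here is the whole shebang for ResNet. Compared with the paper, the stem is tailored to small images: a single 3x3, stride-1 convolution and no max-pooling (the paper uses a 7x7, stride-2 convolution followed by 3x3 max-pooling). Thanks to `nn.AdaptiveAvgPool2d((1, 1))`, the network accepts any input size, including the 3x224x224 images prepared in part 1: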

In [50]:
class ResNet(nn.Module):
    def __init__(self, block, num_blocks, num_classes=10):
        super().__init__()
        self.in_planes = 64
        # Initial convolution
        self.conv1 = nn.Conv2d(3, 64, kernel_size=3, stride=1, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(64)
        # Residual blocks
        self.layer1 = self._make_layer(block, 64, num_blocks[0], stride=1)
        self.layer2 = self._make_layer(block, 128, num_blocks[1], stride=2)
        self.layer3 = self._make_layer(block, 256, num_blocks[2], stride=2)
        self.layer4 = self._make_layer(block, 512, num_blocks[3], stride=2)
        # FC layer = 1 layer
        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        self.linear = nn.Linear(512 * block.EXPANSION, num_classes)

    def _make_layer(self, block, planes, num_blocks, stride):
        strides = [stride] + [1] * (num_blocks-1)
        layers = []
        for stride in strides:
            layers.append(block(self.in_planes, planes, stride))
            self.in_planes = planes * block.EXPANSION
        return nn.Sequential(*layers)

    def forward(self, x):
        out = F.relu(self.bn1(self.conv1(x)))
        out = self.layer1(out)
        out = self.layer2(out)
        out = self.layer3(out)
        out = self.layer4(out)
        out = self.avgpool(out)
        out = out.view(out.size(0), -1)
        out = self.linear(out)
        return out
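Because this stem keeps full resolution, feature maps stay large. A small sketch (same output-size formula as `nn.Conv2d`, padding 1 on every 3x3 convolution, stride only in the first block of a stage) traces the spatial size after `conv1` and after each of `layer1` to `layer4`. The helper names are illustrative.

```python
def conv_out(h, k, s, p):
    return (h + 2 * p - k) // s + 1

def stage_sizes(h):
    """Spatial size after conv1 and after layer1..layer4 of the ResNet class above."""
    sizes = [conv_out(h, k=3, s=1, p=1)]   # conv1: 3x3, stride 1, padding 1
    for stride in [1, 2, 2, 2]:            # first-block stride of layer1..layer4
        sizes.append(conv_out(sizes[-1], k=3, s=stride, p=1))
    return sizes

print(stage_sizes(224))  # [224, 224, 112, 56, 28]
print(stage_sizes(32))   # [32, 32, 16, 8, 4]
```

With 224x224 inputs, `layer1` works on feature maps 49 times larger in area than with native 32x32 CIFAR images, which helps explain why training on upsampled CIFAR-10 is slow.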

Create ResNet50 and ResNet101

In [51]:
def ResNet18(num_classes=10):
    '''
    First conv layer: 1
    4 stages (layer1-layer4), each with 2 BasicBlocks of 2 convolutions: 2*2 + 2*2 + 2*2 + 2*2 = 16 conv layers
    Last FC layer: 1
    Total layers: 1 + 16 + 1 = 18
    '''
    return ResNet(BasicBlock, [2, 2, 2, 2], num_classes)
    
def ResNet50(num_classes = 10):
    # Your code here



    
    return ResNet(BottleneckBlock, [3, 4, 6, 3], num_classes)
    
def ResNet101(num_classes = 10):
    # Your code here






    
    return ResNet(BottleneckBlock, [3, 4, 23, 3], num_classes)
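The counting used in the `ResNet18` docstring also explains the names of the deeper models, since each bottleneck block contributes three convolutions instead of two. A small illustrative helper (shortcut projections are not counted, following the usual convention):

```python
def resnet_depth(blocks_per_stage, convs_per_block):
    """Initial conv + convolutions inside residual blocks + final FC layer."""
    return 1 + convs_per_block * sum(blocks_per_stage) + 1

print(resnet_depth([2, 2, 2, 2], 2))   # 18  (ResNet18, BasicBlock)
print(resnet_depth([3, 4, 6, 3], 2))   # 34  (ResNet34, BasicBlock)
print(resnet_depth([3, 4, 6, 3], 3))   # 50  (ResNet50, BottleneckBlock)
print(resnet_depth([3, 4, 23, 3], 3))  # 101 (ResNet101, BottleneckBlock)
```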

#### 3. Training

In this part, you need to:
- define necessary things for training
- train the model
- plot training loss and validation accuracy 

In [18]:
def train_model(model, dataloaders, criterion, optimizer, num_epochs=25, weights_name='weight_save', is_inception=False):
    '''
    train_model: train a model and keep the weights with the best validation accuracy
    
            Parameters:
                    model: PyTorch model
                    dataloaders: dict with 'train' and 'val' DataLoaders
                    criterion: loss function
                    optimizer: optimizer that updates the weights
                    num_epochs: number of epochs
                    weights_name: file name (without extension) for saving the best weights
                    is_inception: whether the model is Inception v3, which returns an auxiliary output in training mode

            Returns:
                    model: model loaded with the best weights found on the validation set
                    val_acc_history: validation accuracy per epoch
                    loss_acc_history: training loss per epoch
    '''
    since = time.time()

    val_acc_history = []
    loss_acc_history = []

    best_model_wts = deepcopy(model.state_dict())
    best_acc = 0.0

    for epoch in range(num_epochs):
        epoch_start = time.time()

        print('Epoch {}/{}'.format(epoch, num_epochs - 1))
        print('-' * 10)

        # Each epoch has a training and validation phase
        for phase in ['train', 'val']:
            if phase == 'train':
                model.train()  # Set model to training mode
            else:
                model.eval()   # Set model to evaluate mode

            running_loss = 0.0
            running_corrects = 0

            # Iterate over data.
            for inputs, labels in dataloaders[phase]:
                # Move the batch to the same device as the model (GPU if available)
                inputs = inputs.to(device)
                labels = labels.to(device)

                # Clear the gradients accumulated from the previous batch
                optimizer.zero_grad()

                # forward
                # track history if only in train
                with torch.set_grad_enabled(phase == 'train'):
                    # Get model outputs and calculate loss
                    # Special case for inception because in training it has an auxiliary output. In train
                    #   mode we calculate the loss by summing the final output and the auxiliary output
                    #   but in testing we only consider the final output.
                    if is_inception and phase == 'train':
                        # From https://discuss.pytorch.org/t/how-to-optimize-inception-model-with-auxiliary-classifiers/7958
                        outputs, aux_outputs = model(inputs)
                        # print('outputs', outputs)
                        loss1 = criterion(outputs, labels)
                        loss2 = criterion(aux_outputs, labels)
                        loss = loss1 + 0.4*loss2
                    else:
                        outputs = model(inputs)
                        loss = criterion(outputs, labels)

                    _, preds = torch.max(outputs, 1)

                    # backward + optimize only if in training phase
                    if phase == 'train':
                        loss.backward()
                        optimizer.step()

                # statistics
                running_loss += loss.item() * inputs.size(0)
                running_corrects += torch.sum(preds == labels.data)

            epoch_loss = running_loss / len(dataloaders[phase].dataset)
            # .item() returns a Python float, so the history can be plotted directly
            epoch_acc = running_corrects.double().item() / len(dataloaders[phase].dataset)
            epoch_end = time.time()
            
            elapsed_epoch = epoch_end - epoch_start

            print('{} Loss: {:.4f} Acc: {:.4f}'.format(phase, epoch_loss, epoch_acc))
            print("Epoch time taken: ", elapsed_epoch)

            # deep copy the model
            if phase == 'val' and epoch_acc > best_acc:
                best_acc = epoch_acc
                best_model_wts = deepcopy(model.state_dict())
                torch.save(model.state_dict(), weights_name + ".pth")
            if phase == 'val':
                val_acc_history.append(epoch_acc)
            if phase == 'train':
                loss_acc_history.append(epoch_loss)

        print()

    time_elapsed = time.time() - since
    print('Training complete in {:.0f}m {:.0f}s'.format(time_elapsed // 60, time_elapsed % 60))
    print('Best val Acc: {:.4f}'.format(best_acc))

    # load best model weights
    model.load_state_dict(best_model_wts)
    return model, val_acc_history, loss_acc_history

Here we define the model, the loss function, and the optimizer, and then train the model.

Note: training took around 6-7 minutes for me (on "1252 GB CUDA").

In [1]:
# Your code here
# Model


# Loss function
# Your code here



# Optimizer
# Your code here



# Train model
# Your code here


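A sketch of common choices for this cell, not the required answer: `nn.CrossEntropyLoss` for 10-class classification, and SGD with momentum and weight decay, optionally with a cosine learning-rate schedule. The hyperparameters (lr=0.1, momentum=0.9, weight_decay=5e-4) are typical CIFAR values, stated here as assumptions. A tiny linear model stands in for `ResNet50(num_classes=10).to(device)` so the snippet runs quickly, and one manual step mirrors what `train_model` does per batch.

```python
import torch
import torch.nn as nn
import torch.optim as optim

torch.manual_seed(0)

# Stand-in for: model = ResNet50(num_classes=10).to(device)
model = nn.Sequential(nn.Flatten(), nn.Linear(3 * 8 * 8, 10))

criterion = nn.CrossEntropyLoss()  # expects raw logits and integer class labels
optimizer = optim.SGD(model.parameters(), lr=0.1, momentum=0.9, weight_decay=5e-4)
num_epochs = 10
# Optional: decay the learning rate smoothly over num_epochs (one scheduler.step() per epoch)
scheduler = optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=num_epochs)

# One optimization step on a random batch, mirroring the inner loop of train_model
inputs = torch.randn(16, 3, 8, 8)
labels = torch.randint(0, 10, (16,))
before = model[1].weight.detach().clone()
optimizer.zero_grad()
loss = criterion(model(inputs), labels)
loss.backward()
optimizer.step()
scheduler.step()  # normally called once at the end of each epoch
print(loss.item(), optimizer.param_groups[0]['lr'])
```

`train_model` as written does not step a scheduler, so using one would mean adding `scheduler.step()` after each epoch; otherwise it can simply be left out. With real data the call would be `model, val_acc_history, loss_acc_history = train_model(model, dataloaders, criterion, optimizer, num_epochs=num_epochs)`.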
Please plot training loss and validation accuracy 

In [20]:
import matplotlib.pyplot as plt

def plot_data(val_acc_history, loss_acc_history):
    # Your code here
    pass









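One possible shape for `plot_data`, sketched with matplotlib: training loss and validation accuracy side by side, one point per epoch. The function name `plot_data_sketch` is hypothetical; converting with `float()` also handles 0-dim tensors (including GPU tensors), in case the histories contain tensors rather than plain numbers.

```python
import matplotlib.pyplot as plt

def plot_data_sketch(val_acc_history, loss_acc_history):
    """Side-by-side curves of training loss and validation accuracy per epoch."""
    val_acc = [float(a) for a in val_acc_history]
    train_loss = [float(x) for x in loss_acc_history]

    fig, (ax_loss, ax_acc) = plt.subplots(1, 2, figsize=(12, 4))
    ax_loss.plot(range(1, len(train_loss) + 1), train_loss, marker="o")
    ax_loss.set_xlabel("Epoch")
    ax_loss.set_ylabel("Training loss")
    ax_loss.set_title("Training loss")

    ax_acc.plot(range(1, len(val_acc) + 1), val_acc, marker="o", color="tab:orange")
    ax_acc.set_xlabel("Epoch")
    ax_acc.set_ylabel("Validation accuracy")
    ax_acc.set_title("Validation accuracy")

    fig.tight_layout()
    return fig

# Dummy values just to exercise the function (not real results)
fig = plot_data_sketch([0.1, 0.2, 0.3], [2.0, 1.5, 1.2])
```

Plotting the two quantities on separate axes avoids mixing their scales; a validation curve that flattens while the training loss keeps falling is the usual sign of overfitting.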
#### 4. Evaluation

In this part, you need to:
- define what is needed to evaluate the trained model, including a function that collects the test loss and accuracy
- evaluate the model
- plot confusion matrix
- discuss the result

In [22]:
# Your code here
def evaluate(model, iterator, criterion):
    










    
        
    return epoch_loss / len(iterator), epoch_acc / len(iterator), predicteds, trues
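A sketch of an evaluation loop that also collects predictions and true labels for the confusion matrix. It adds a `device` parameter (an assumption, so it runs on CPU or GPU) and averages loss and accuracy per sample rather than dividing per-batch sums by `len(iterator)`, which is slightly more accurate when the last batch is smaller. The name `evaluate_sketch` is hypothetical.

```python
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, TensorDataset

def evaluate_sketch(model, iterator, criterion, device="cpu"):
    """Return (mean loss, accuracy, predictions, true labels) over the whole loader."""
    model.eval()  # BatchNorm uses running statistics, Dropout is disabled
    total_loss, total_correct, n = 0.0, 0, 0
    predicteds, trues = [], []
    with torch.no_grad():  # no gradients needed for evaluation
        for inputs, labels in iterator:
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            preds = outputs.argmax(dim=1)
            # weight by batch size so a smaller last batch doesn't skew the averages
            total_loss += loss.item() * inputs.size(0)
            total_correct += (preds == labels).sum().item()
            n += inputs.size(0)
            predicteds.extend(preds.cpu().tolist())
            trues.extend(labels.cpu().tolist())
    return total_loss / n, total_correct / n, predicteds, trues

# Toy check: a linear layer whose logits are always [1, 0], so it always predicts class 0
model = nn.Linear(4, 2)
with torch.no_grad():
    model.weight.zero_()
    model.bias.copy_(torch.tensor([1.0, 0.0]))
loader = DataLoader(TensorDataset(torch.randn(10, 4), torch.zeros(10, dtype=torch.long)), batch_size=4)
loss, acc, preds, trues = evaluate_sketch(model, loader, nn.CrossEntropyLoss())
print(round(loss, 4), acc, preds[:3])  # 0.3133 1.0 [0, 0, 0]
```

The returned `predicteds` and `trues` lists can be passed straight to `plot_confusion_matrix`.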

Now, let's take a closer look at the results. Please plot confusion matrix

#### Confusion Matrix

In [2]:
def plot_confusion_matrix(test_true_label_, test_pred_label_):
    cm = confusion_matrix(test_true_label_, test_pred_label_)
    
    
    # Plot heatmap
    plt.figure(figsize=(8,6))
    sns.heatmap(cm, annot=True, fmt="d", cmap="Blues", xticklabels=range(10), yticklabels=range(10))
    plt.xlabel("Predicted Label")
    plt.ylabel("True Label")
    plt.title("Confusion Matrix")
    plt.show()

# Your code here






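For the discussion, per-class accuracy (the diagonal of the confusion matrix divided by each row sum) often says more than the overall accuracy. A small numpy sketch with made-up labels; `confusion_counts` builds the same rows = true / columns = predicted layout as sklearn's `confusion_matrix`, and both helper names are illustrative.

```python
import numpy as np

def confusion_counts(y_true, y_pred, num_classes):
    """Rows = true class, columns = predicted class."""
    cm = np.zeros((num_classes, num_classes), dtype=int)
    np.add.at(cm, (np.asarray(y_true), np.asarray(y_pred)), 1)  # count each (true, pred) pair
    return cm

def per_class_accuracy(cm):
    """Fraction of each true class predicted correctly (a class with no samples gives nan)."""
    return cm.diagonal() / cm.sum(axis=1)

cm = confusion_counts([0, 0, 1, 1, 2], [0, 1, 1, 1, 0], num_classes=3)
print(cm)
print(per_class_accuracy(cm))
```

For CIFAR-10, `full_train_dataset.classes` holds the class names, which can replace `range(10)` as tick labels so that frequently confused pairs are easy to read off the heatmap.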
Please discuss the result

In [None]:
# Your discussion here







## Question 2: ResNet on small dataset

In this question, we will train ResNet18 on the Komondor vs. Mop dataset. 

#### Load data

In [52]:
dataset = datasets.ImageFolder(root='data-mop-dog',
                           transform=transforms.Compose([
                               transforms.Resize(256),
                               transforms.CenterCrop(224),
                               transforms.ToTensor(),
                               transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5)),
                           ]))
print(dataset.class_to_idx)

{'class_1': 0, 'class_2': 1}


### Training

In [53]:
def train_model(model, dataloaders, criterion, optimizer, num_epochs=25, weights_name='weight_save', is_inception=False):
    # Same loop as in Question 1, but progress is appended to f"{weights_name}.txt"
    # and only the histories are returned (the best weights are saved to f"{weights_name}.pth").
    def log(msg):
        # Open, append one line, and close the log file each time
        with open(f"{weights_name}.txt", "a") as f:
            print(msg, file=f)

    log('====================== New Run =======================')

    since = time.time()

    val_acc_history = []
    loss_acc_history = []

    best_model_wts = deepcopy(model.state_dict())
    best_acc = 0.0

    for epoch in range(num_epochs):
        epoch_start = time.time()
        log('Epoch {}/{}'.format(epoch, num_epochs - 1))
        log('-' * 10)

        for phase in ['train', 'val']:
            if phase == 'train':
                model.train()
            else:
                model.eval()

            running_loss = 0.0
            running_corrects = 0

            for inputs, labels in dataloaders[phase]:

                inputs = inputs.to(device)
                labels = labels.to(device)

                optimizer.zero_grad()

                with torch.set_grad_enabled(phase == 'train'):

                    if is_inception and phase == 'train':
                        outputs, aux_outputs = model(inputs)

                        loss1 = criterion(outputs, labels)
                        loss2 = criterion(aux_outputs, labels)
                        loss = loss1 + 0.4 * loss2
                    else:
                        outputs = model(inputs)
                        loss = criterion(outputs, labels)

                    _, preds = torch.max(outputs, 1)

                    if phase == 'train':
                        loss.backward()
                        optimizer.step()

                running_loss += loss.item() * inputs.size(0)
                running_corrects += torch.sum(preds == labels)

            epoch_loss = running_loss / len(dataloaders[phase].dataset)
            # .item() returns a Python float, so the histories can be plotted directly
            epoch_acc = running_corrects.double().item() / len(dataloaders[phase].dataset)
            elapsed_epoch = time.time() - epoch_start

            log('{} Loss: {:.4f} Acc: {:.4f}'.format(phase, epoch_loss, epoch_acc))
            log(f"Epoch time taken: {elapsed_epoch}")

            # Save the weights whenever validation accuracy improves
            if phase == 'val' and epoch_acc > best_acc:
                best_acc = epoch_acc
                best_model_wts = deepcopy(model.state_dict())
                torch.save(model.state_dict(), weights_name + ".pth")
            if phase == 'val':
                val_acc_history.append(epoch_acc)
            if phase == 'train':
                loss_acc_history.append(epoch_loss)

    time_elapsed = time.time() - since
    log('Training complete in {:.0f}m {:.0f}s'.format(time_elapsed // 60, time_elapsed % 60))
    log('Best val Acc: {:.4f}'.format(best_acc))

    return val_acc_history, loss_acc_history

#### 1. Perform training

In this part, you need to:
- perform K-fold cross-validation to get a reliable estimate of validation accuracy, since there are only a few examples in each category (I used 8 folds)
- plot validation loss and accuracy

In [5]:
import matplotlib.pyplot as plt

# Your code here
# Your Kfold











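A sketch of stratified K-fold cross-validation with `StratifiedKFold` (already imported at the top). To stay self-contained, the training itself is delegated to a caller-supplied callback `train_fold(fold, train_idx, val_idx)`, a hypothetical name, which should build a fresh model, train it, and return its validation accuracy. Stratification keeps the Komondor/Mop ratio the same in every fold, which matters with so few images.

```python
import numpy as np
from sklearn.model_selection import StratifiedKFold

def kfold_cv(targets, n_splits, train_fold, seed=0):
    """Run stratified K-fold CV; returns (mean accuracy, std, per-fold accuracies)."""
    targets = np.asarray(targets)
    skf = StratifiedKFold(n_splits=n_splits, shuffle=True, random_state=seed)
    fold_accs = []
    # StratifiedKFold only needs the labels; X is a placeholder of the right length
    for fold, (train_idx, val_idx) in enumerate(skf.split(np.zeros(len(targets)), targets)):
        fold_accs.append(train_fold(fold, train_idx.tolist(), val_idx.tolist()))
    return float(np.mean(fold_accs)), float(np.std(fold_accs)), fold_accs

# Dummy callback that only records the validation folds, to show the split is stratified
seen = []
def record_fold(fold, train_idx, val_idx):
    seen.append(val_idx)
    return 0.0

labels = [0] * 16 + [1] * 16
mean_acc, std_acc, accs = kfold_cv(labels, n_splits=8, train_fold=record_fold)
print([sum(labels[i] for i in v) for v in seen])  # class-1 count in each validation fold
```

In the notebook, `targets` would be `dataset.targets` (an `ImageFolder` attribute), and `train_fold` would wrap `Subset(dataset, train_idx)` and `Subset(dataset, val_idx)` in DataLoaders, create a new `ResNet18(num_classes=2).to(device)` and optimizer, call `train_model` with a per-fold `weights_name`, and return e.g. `max(val_acc_history)`. Creating a fresh model in every fold matters: reusing one model would carry over what it learned from earlier validation folds.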
In [4]:
# Your code here
# Your Plot











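A sketch for plotting per-fold curves with their mean and a one-standard-deviation band, assuming each fold's history is collected into a list and all folds run the same number of epochs. Note that the Question 2 `train_model` returns validation accuracy and *training* loss; plotting validation loss would require it to also record `epoch_loss` for the `'val'` phase. Function names are illustrative.

```python
import numpy as np
import matplotlib.pyplot as plt

def summarize_folds(histories):
    """Per-epoch mean and standard deviation across folds (one list per fold)."""
    arr = np.array([[float(v) for v in h] for h in histories])  # shape (folds, epochs)
    return arr.mean(axis=0), arr.std(axis=0)

def plot_folds(histories, ylabel):
    mean, std = summarize_folds(histories)
    epochs = np.arange(1, len(mean) + 1)
    fig, ax = plt.subplots(figsize=(6, 4))
    for i, h in enumerate(histories):
        ax.plot(epochs, [float(v) for v in h], alpha=0.3, label=f"fold {i}")
    ax.plot(epochs, mean, color="black", linewidth=2, label="mean")
    ax.fill_between(epochs, mean - std, mean + std, color="gray", alpha=0.2)
    ax.set_xlabel("Epoch")
    ax.set_ylabel(ylabel)
    ax.legend(fontsize="small")
    return fig

# Dummy values just to exercise the functions (not real results)
fig = plot_folds([[0.50, 0.62, 0.70], [0.45, 0.60, 0.75]], "Validation accuracy")
```

A wide band signals that the accuracy estimate depends heavily on which images end up in the validation fold, which is expected with a small dataset.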
#### 2. Evaluation
- Evaluate the model on the test samples (in the data-mop-dog-test folder).
- This includes a confusion matrix and displaying images along with their predicted and ground-truth labels
- Interpret the result

In [77]:
# Your code here
# transforms_function




dataset_test = datasets.ImageFolder(root='data-mop-dog-test',
                                    transform=transforms_function)

#Your code here
# data loader






In [78]:
def evaluate(model, iterator, criterion, class_names):
    # Your code here










    
    return epoch_loss / len(iterator), epoch_acc / len(iterator), predicteds, trues


In [79]:
def show_images(images, predicted_labels, true_labels, class_names,
                mean=(0.5, 0.5, 0.5), std=(0.5, 0.5, 0.5)):
    """
    Displays images with predicted and true labels.
    mean/std must match the transforms.Normalize used by the dataset
    (the datasets in this notebook use (0.5, 0.5, 0.5)).
    """
    images = torchvision.utils.make_grid(images.cpu(), nrow=8)  # Arrange images in a grid (on the CPU)
    images = images.numpy().transpose((1, 2, 0))  # Convert to (H, W, C) format

    # Undo the normalization: x * std + mean
    images = np.array(std) * images + np.array(mean)
    images = np.clip(images, 0, 1)  # Clip values

    plt.figure(figsize=(12, 6))
    plt.imshow(images)
    plt.axis("off")
    plt.title("Predictions: " + " | ".join([f"{class_names[p]} (True: {class_names[t]})"
                                            for p, t in zip(predicted_labels, true_labels)]))
    plt.show()

def plot_confusion_matrix(y_true, y_pred, class_names):
    """
    Plots the confusion matrix.
    """
    cm = confusion_matrix(y_true, y_pred)
    plt.figure(figsize=(8, 6))
    sns.heatmap(cm, annot=True, fmt="d", cmap="Blues",
                xticklabels=class_names, yticklabels=class_names)
    plt.xlabel("Predicted Label")
    plt.ylabel("True Label")
    plt.title("Confusion Matrix")
    plt.show()
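`transforms.Normalize(mean, std)` computes `(x - mean) / std` per channel, so displaying images requires the inverse `x * std + mean` with the *same* statistics. The datasets here are normalized with (0.5, 0.5, 0.5); a quick numpy check on a random stand-in image shows that undoing them with ImageNet statistics instead does not recover the original pixels.

```python
import numpy as np

def normalize(x, mean, std):
    return (x - mean) / std

def unnormalize(x, mean, std):
    return x * std + mean

rng = np.random.default_rng(0)
pixels = rng.random((4, 4, 3))  # an H x W x C stand-in image with values in [0, 1]

half = np.array([0.5, 0.5, 0.5])
imagenet_mean = np.array([0.485, 0.456, 0.406])
imagenet_std = np.array([0.229, 0.224, 0.225])

x = normalize(pixels, half, half)
print(np.allclose(unnormalize(x, half, half), pixels))                   # True
print(np.allclose(unnormalize(x, imagenet_mean, imagenet_std), pixels))  # False
```

With mismatched statistics the displayed images look washed out (pixel values squeezed toward the middle of the range), even though the model's predictions are unaffected.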

In [7]:
# Your code here
# Evaluate & show image & confusion matrix







## Question 3: Pretrained ResNet18 on small dataset

In this question, we will fine-tune a pretrained ResNet18 on the Komondor vs. Mop dataset.

In [84]:
import torch
import torchvision.models as models

In [8]:
# Your code here
# Your Kfold code









#### 2. Evaluation
- Evaluate the model on the test samples (in the data-mop-dog-test folder).
- This includes a confusion matrix and displaying images along with their predicted and ground-truth labels
- Interpret the result

In [87]:
dataset_test = datasets.ImageFolder(root='data-mop-dog-test',
                           transform=transforms.Compose([
                               transforms.Resize(256),
                               transforms.CenterCrop(224),
                               transforms.ToTensor(),
                               transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5)),
                           ]))
test_dataloader = torch.utils.data.DataLoader(dataset_test, batch_size=1, shuffle=True)

In [91]:
def evaluate(model, iterator, criterion, class_names):
    # Your code here











    
    return epoch_loss / len(iterator), epoch_acc / len(iterator), predicteds, trues


In [9]:
# Your code here
# Evaluate & show image & confusion matrix









## Question 4: Discuss the results
You may include:
- describe the results
- why the results look like this, and what the possible reasons are
- how would y