In [188]:
import w0d3Remnant as fromYesterday
import torch as t
from torch import nn as nn
import utils
import typing

# Assembling ResNet

This is a combination of my day four *and* five work.

Day Four - Spent two hours, eight minutes on this so far. Going to revisit it later, cannot focus on it
Day Five - Spent three hours and thirty three minutes. Finished it. LETS GOOOOOO

### Batch Normalization in CNNs

Batch Norm is a method to address overfitting and slow training issues in Deep Neural Networks.

Normalization itself is just a pre-processing technique used to standardize data. This is often done to make features balanced - if they have differences in ranges, different features could recieve inflated importances. Normalizing your features helps models learn better.

You can instead normalize batches of data inside the network itself, done between layers. Each neuron will first apply its weights (not biases!): $$z = g(w,x);$$

But before applying the activation function, a batch norm is applied to each neurons output across the layer.

$$z^N = (\frac{z - m_z}{s_z}) \cdot \gamma + \beta$$

where $z^N$ is the output of the Batch Norm, $m_z$ is the mean of the neuron's output, $s_z$ is the standard deviation of the output of the neurons, and $\gamma$ and $\beta$ are learnable parameters which eventually represent the standard deviation and the mean of the outputs, respectively.

The output of batch norm is then fed into the activation function

$$a = f(z^N)$$

Why does this work? Firstly, think of it intuitively as the same thing you are doing to the features. But secondly, it reduces the "internal covariate shift of the network" - it reduces the shift due to a change in data distribution. Here, it the data recieved from the previous layer, which is constantly changing.

Batch norm also has a regularization effect, speeds up computation!

When applying this to convolutional layers, each feature map ends up having a single mean and standard deviation.



In [189]:
class Sequential(nn.Module):
    def __init__(self, *modules: nn.Module):
        super().__init__()
        for i, mod in enumerate(modules):
            self.add_module(str(i), mod)

    def forward(self, x: t.Tensor) -> t.Tensor:
        '''Chain each module together, with the output from one feeding into the next one.'''
        for mod in self._modules.values():
            x = mod(x)
        return x

model = Sequential(fromYesterday.Conv2d(3,10,3,1,0),
fromYesterday.ReLU(),
nn.Linear(10,5))

print(model)

Sequential(
  (0): Conv2d(weights shape = torch.Size([10, 3, 3, 3]) stride = (1, 1) padding = (0, 0) )
  (1): ReLU()
  (2): Linear(in_features=10, out_features=5, bias=True)
)


In [190]:
from math import gamma
from operator import truediv
import einops
from tkinter.tix import MAIN


class BatchNorm2d(nn.Module):
    running_mean: t.Tensor         # shape: (num_features,)
    running_var: t.Tensor          # shape: (num_features,)
    num_batches_tracked: t.Tensor  # shape: ()

    def __init__(self, num_features: int, eps=1e-05, momentum=0.1):
        '''Like nn.BatchNorm2d with track_running_stats=True and affine=True.

        Name the learnable affine parameters `weight` and `bias` in that order.
        '''
        super().__init__()
        #print("num features is " + str(num_features))
        running_mean = t.zeros(num_features)
        running_var = t.ones(num_features)
        num_batches_tracked = t.tensor(0)
        
        self.weight = nn.Parameter(t.ones(num_features)) # tracks gamma
        self.bias = nn.Parameter(t.zeros(num_features)) # tracks beta
        self.register_buffer('running_mean', running_mean) # holds mean across channels
        self.register_buffer('running_var', running_var) # holds variance across channels
        self.register_buffer('num_batches_tracked', num_batches_tracked) # stores num batches tracked
        self.num_features = num_features
        self.momentum = momentum
        self.eps = eps


    def forward(self, x: t.Tensor) -> t.Tensor:
        '''Normalize each channel.

        Compute the variance using `torch.var(x, unbiased=False)`
        Hint: you may also find it helpful to use the argument `keepdim`.

        x: shape (batch, channels, height, width)
        Return: shape (batch, channels, height, width)
        '''
        if self.training:
            # update running mean
            mean = t.mean(x, (0,2,3), keepdim = False)
            self.running_mean = (1 - self.momentum) * mean + (self.momentum) * self.running_mean
            # update running variance
            var = t.var(x, (0,2,3), keepdim = False, unbiased=False)
            self.running_var = (1 - self.momentum) * var + (self.momentum) * self.running_var
            self.num_batches_tracked += 1
        else:
            mean = self.running_mean
            var = self.running_var

        mean = einops.rearrange(mean, 'c -> 1 c 1 1')
        var = einops.rearrange(var, 'c -> 1 c 1 1')
        gamma = einops.rearrange(self.weight, 'c -> 1 c 1 1')
        beta = einops.rearrange(self.bias, 'c -> 1 c 1 1')

        returnX = ((x - mean) / (t.sqrt(var + self.eps))) * gamma + beta
        return returnX

       
    # credit to callum for this
    def extra_repr(self) -> str:
        return ", ".join([f"{key}={getattr(self, key)}" for key in ["num_features", "eps", "momentum"]])



if MAIN:
    utils.test_batchnorm2d_module(BatchNorm2d)
    utils.test_batchnorm2d_forward(BatchNorm2d)
    utils.test_batchnorm2d_running_mean(BatchNorm2d)

All tests in `test_batchnorm2d_module` passed!
All tests in `test_batchnorm2d_forward` passed!
All tests in `test_batchnorm2d_running_mean` passed!


## Watching a paper review of ResNet

A key thing to realize is that when they were contemplating why accuracy wasn't increasing with model depth, the issue was not overfitting!

Each chunk of the model only has to learn the difference in how they wish for the data to change, rather than also having to learn an identiy function, too. Regluarization pushes against identity since it pushes everything down to zero.

In [191]:
class AveragePool(nn.Module):
    def forward(self, x: t.Tensor) -> t.Tensor:
        '''
        x: shape (batch, channels, height, width)
        Return: shape (batch, channels)
        '''

        return t.mean(x,(2,3))


## Residual Block

In [192]:
class ResidualBlock(nn.Module):
    def __init__(self, in_feats: int, out_feats: int, first_stride=1):
        '''A single residual block with optional downsampling.

        For compatibility with the pretrained model, declare the left side branch first using a `Sequential`.

        If first_stride is > 1, this means the optional (conv + bn) should be present on the right branch. Declare it second using another `Sequential`.
        '''

        super().__init__()
        
        left_branch = nn.Sequential(
            fromYesterday.Conv2d(in_feats, out_feats, 3, first_stride, 1), # TODO: why is padding 1? not zero?
            BatchNorm2d(out_feats),
            fromYesterday.ReLU(),
            fromYesterday.Conv2d(out_feats, out_feats, 3, 1, 1),
            BatchNorm2d(out_feats)
        )

        if first_stride > 1:
            right_branch = nn.Sequential(
                fromYesterday.Conv2d(in_feats, out_feats, 1, first_stride, 0),
                BatchNorm2d(out_feats)
            )
        else:
            right_branch = nn.Sequential(
                nn.Identity()
            )

        self.left = left_branch
        self.right = right_branch
        self.relu = fromYesterday.ReLU()
        

    def forward(self, x: t.Tensor) -> t.Tensor:
        '''Compute the forward pass.

        x: shape (batch, in_feats, height, width)

        Return: shape (batch, out_feats, height / first_stride, width / first_stride)
        '''
        leftOutput = self.left.forward(x)
        rightOutput = self.right.forward(x)
        return self.relu.forward(leftOutput + rightOutput)

In [193]:
class BlockGroup(nn.Module):
    def __init__(self, n_blocks: int, in_feats: int, out_feats: int, first_stride=1):
        '''An n_blocks-long sequence of ResidualBlock where only the first block uses the provided stride.'''
        super().__init__()
        self.firstRes = ResidualBlock(in_feats,out_feats,first_stride)
        self.otherRes = nn.ModuleList([ResidualBlock(out_feats, out_feats,1) for i in range(n_blocks - 1)])
        # TODO: convert back to normal list if this doesn't work!
        

    def forward(self, x: t.Tensor) -> t.Tensor:
        '''Compute the forward pass.
        x: shape (batch, in_feats, height, width)

        Return: shape (batch, out_feats, height / first_stride, width / first_stride)
        '''
        result = self.firstRes.forward(x)
        for l in self.otherRes:
            result = l(result)
        
        return result

In [194]:
import pwd
class ResNet34(nn.Module):
    def __init__(
        self,
        n_blocks_per_group=[3, 4, 6, 3],
        out_features_per_group=[64, 128, 256, 512],
        first_strides_per_group=[1, 2, 2, 2],
        n_classes=1000,
    ):
        super().__init__()
        BlockGroups = []
        for i, j in enumerate(n_blocks_per_group):
            n_blocks = j
            if i > 1:
                in_feats = out_features_per_group[i-1]
            else:
                in_feats = 64
            out_feats = out_features_per_group[i]
            first_stride = first_strides_per_group[i]
            BlockGroups.append(BlockGroup(n_blocks, in_feats, out_feats, first_stride))


        self.preBlockGroups = nn.Sequential(
            fromYesterday.Conv2d(3,64,7,2,3),
            BatchNorm2d(64),
            fromYesterday.ReLU(),
            fromYesterday.MaxPool2d(3,2),
        )
        self.layers = nn.Sequential(
            *BlockGroups
        )

        self.postBlockGroups = nn.Sequential(
            AveragePool(),
            fromYesterday.Flatten(),
            fromYesterday.Linear(out_features_per_group[-1], 1000)
        )

    def forward(self, x: t.Tensor) -> t.Tensor:
        '''
        x: shape (batch, channels, height, width)

        Return: shape (batch, n_classes)
        '''
        print("HERE")
        y = self.preBlockGroups(x)
        print("went through pre block groups")
        y = self.layers(y)
        print("went through post block groups")
        y = self.postBlockGroups(y)
        return y


In [195]:
import torchvision
from torchvision.models import resnet34



In [196]:
def copy_weights(myresnet: ResNet34, pretrained_resnet: torchvision.models.resnet.ResNet) -> ResNet34:
    '''Copy over the weights of `pretrained_resnet` to your resnet.'''

    mydict = myresnet.state_dict()
    pretraineddict = pretrained_resnet.state_dict()

    # Check the number of params/buffers is correct
    assert len(mydict) == len(pretraineddict), "Number of layers is wrong. Have you done the prev step correctly?"

    # Initialise an empty dictionary to store the correct key-value pairs
    state_dict_to_load = {}

    for (mykey, myvalue), (pretrainedkey, pretrainedvalue) in zip(mydict.items(), pretraineddict.items()):
        state_dict_to_load[mykey] = pretrainedvalue

    myresnet.load_state_dict(state_dict_to_load)

    return myresnet

#utils.compare_my_resnet_to_pytorch(ResNet34())
officialresnet = resnet34()
myresnet = copy_weights(ResNet34(), officialresnet)

In [197]:
from torchvision import transforms
from pathlib import Path
from PIL import Image 
import torch as t

IMAGE_FILENAMES = [
    "chimpanzee.jpg",
    "golden_retriever.jpg",
    "platypus.jpg",
    "frogs.jpg",
    "fireworks.jpg",
    "astronaut.jpg",
    "iguana.jpg",
    "volcano.jpg",
    "goofy.jpg",
    "dragonfly.jpg",
]

IMAGE_FOLDER = Path("./resnet_inputs")

images = [Image.open(IMAGE_FOLDER / filename) for filename in IMAGE_FILENAMES]
# ImageNet transforms copied from solution:
transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Resize((224, 224)),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

def prepare_data(images: list[Image.Image]) -> t.Tensor:
    '''
    Return: shape (batch=len(images), num_channels=3, height=224, width=224)
    '''
    return t.stack([transform(i) for i in images], dim = 0)

prepared_images = prepare_data(images)

def predict(model, images):
    logits = model(images)
    return logits.argmax(dim=1)

In [198]:
prepared_images.shape

torch.Size([10, 3, 224, 224])

In [206]:
predict(myresnet, prepared_images)

HERE
<built-in method size of Tensor object at 0x35fefec70>
(10, 3, 112, 112, 7, 7)
went through pre block groups
<built-in method size of Tensor object at 0x2d5eda6d0>
(10, 64, 56, 56, 3, 3)
<built-in method size of Tensor object at 0x35feb2cc0>
(10, 64, 56, 56, 3, 3)
<built-in method size of Tensor object at 0x35feb2cc0>
(10, 64, 56, 56, 3, 3)
<built-in method size of Tensor object at 0x2d5eda180>
(10, 64, 56, 56, 3, 3)
<built-in method size of Tensor object at 0x2d612d180>
(10, 64, 56, 56, 3, 3)
<built-in method size of Tensor object at 0x35fefe8b0>
(10, 64, 56, 56, 3, 3)
<built-in method size of Tensor object at 0x35fefe1d0>
(10, 64, 28, 28, 3, 3)
<built-in method size of Tensor object at 0x35fefe680>
(10, 128, 28, 28, 3, 3)
<built-in method size of Tensor object at 0x35fefe1d0>
(10, 64, 28, 28, 1, 1)
<built-in method size of Tensor object at 0x35fefeea0>
(10, 128, 28, 28, 3, 3)
<built-in method size of Tensor object at 0x2d612dd10>
(10, 128, 28, 28, 3, 3)
<built-in method size of 

tensor([714, 714, 714, 714, 714, 714, 352, 714, 352, 714])

In [200]:
import json
with open("imagenet_labels.json") as f:
    imagenet_labels = list(json.load(f).values())

In [205]:
predict(officialresnet, prepared_images)

tensor([714, 714, 714, 714, 714, 714, 352, 714, 352, 714])

In [202]:
len(prepared_images)

10