# Part 2: Analyzing Deep Neural Networks
In this notebook, we will recreate the functions that PyTorch provides for the
individual layers in our network using primtive Python code. The goal of this
notebook is to understand what happens in a network underneath the hood.

In [8]:
# Imports
from loaders import *
import torch
import torchvision
import numpy as np
from tqdm import tqdm
import matplotlib.pyplot as plt
import torch.nn as nn
import torch.nn.functional as F
import inspect
from math import prod

%matplotlib inline
NUM_EPOCHS = 10

## Part 1: Implementing PyTorch functions
First, we will implement the functions that PyTorch provides for the individual
layers in our network.

In [20]:
np.random.seed(0)

def conv(x, W, b):
    # DO NOT USE ANY LIBRARY CONVOLUTION FUNCTIONS. WRITE YOUR OWN LOOP NEST.
    if np.ndim(x) == 2:
        x = np.expand_dims(x, axis = 0)
    assert np.ndim(x) == 3
    C_in, M, N = x.shape
    C_out, C_t, Kx, Ky = W.shape
    assert C_in == C_t, "activation and weight dimension mismatch!"
    # Assume this padding
    x_padded = np.pad(x,((0,0),(Kx//2,Kx//2),(Ky//2,Ky//2)))

    # Your code here
    out = np.zeros((C_out, M, N))
    for c in range(C_out):
        for i in range(M):
            for j in range(N):
                out[c, i, j] = np.sum(x_padded[:, i:i+Kx, j:j+Ky] * W[c, :, :, :]) + b[c]
    return out
    
conv_test_x = np.random.rand(3, 5, 5)
conv_test_W = np.random.rand(2, 3, 3, 3)
conv_test_b = np.random.rand(2)
conv_test_y = conv(conv_test_x, conv_test_W, conv_test_b)
assert np.allclose(conv_test_y, conv_test_y), "Convolution function is incorrect"
    

def fc(x, W, b):
    # DO NOT USE ANY LIBRARY MATRIX MULTIPLICATION FUNCTIONS. WRITE YOUR OWN LOOP NEST.
    # Your code here
    assert x.shape[0] == W.shape[1], "Input and weight dimension mismatch!"
    out = np.zeros(W.shape[0])
    for i in range(W.shape[0]):
        sum_val = 0
        for j in range(W.shape[1]):
            sum_val += x[j] * W[i, j]
        out[i] = sum_val + b[i]
    return out

fc_test_x = np.random.rand(5)
fc_test_W = np.random.rand(3, 5)
fc_test_b = np.random.rand(3)
fc_test_y = fc(fc_test_x, fc_test_W, fc_test_b)
assert np.allclose(fc_test_y, fc_test_y), "Fully connected function is incorrect"

def relu(x):
    # Your code here
    return np.maximum(0, x)
    
relu_test_x = np.random.rand(5, 5)
relu_test_y = relu(relu_test_x)
assert np.allclose(relu_test_y, relu_test_y), "ReLU function is incorrect"


def pool2(x, dh, dw):
    # DO NOT USE ANY LIBRARY POOLING FUNCTIONS. WRITE YOUR OWN LOOP NEST.
    # Your code here
    C, H, W = x.shape
    H_out = H // dh
    W_out = W // dw
    out = np.zeros((C, H_out, W_out))
    for c in range(C):
        for i in range(H_out):
            for j in range(W_out):
                out[c, i, j] = np.max(x[c, i*dh:(i+1)*dh, j*dw:(j+1)*dw])
    return out
    
pool2_test_x = np.random.rand(3, 4, 4)
pool2_test_y = pool2(pool2_test_x, 2, 2)
assert np.allclose(pool2_test_y, pool2_test_y), "Pooling function is incorrect"

def flatten(x):
    # Your code here
    return x.flatten()
    
flatten_test_x = np.random.rand(3, 4, 4)
flatten_test_y = flatten(flatten_test_x)
assert np.allclose(flatten_test_y, flatten_test_y), "Flatten function is incorrect"

answer(
    question="2.1",
    subquestion="What is your implementation of the conv function?",
    answer= str(inspect.getsource(conv)),
    required_type=str
)
answer(
    question="2.1",
    subquestion="What is your implementation of the fc function?",
    answer= str(inspect.getsource(fc)),
    required_type=str
)
answer(
    question="2.1",
    subquestion="What is your implementation of the relu function?",
    answer= str(inspect.getsource(relu)),
    required_type=str
)
answer(
    question="2.1",
    subquestion="What is your implementation of the pool2 function?",
    answer= str(inspect.getsource(pool2)),
    required_type=str
)
answer(
    question="2.1",
    subquestion="What is your implementation of the flatten function?",
    answer= str(inspect.getsource(flatten)),
    required_type=str
)

2.1: What is your implementation of the conv function?
	def conv(x, W, b):
    # DO NOT USE ANY LIBRARY CONVOLUTION FUNCTIONS. WRITE YOUR OWN LOOP NEST.
    if np.ndim(x) == 2:
        x = np.expand_dims(x, axis = 0)
    assert np.ndim(x) == 3
    C_in, M, N = x.shape
    C_out, C_t, Kx, Ky = W.shape
    assert C_in == C_t, "activation and weight dimension mismatch!"
    # Assume this padding
    x_padded = np.pad(x,((0,0),(Kx//2,Kx//2),(Ky//2,Ky//2)))

    # Your code here
    out = np.zeros((C_out, M, N))
    for c in range(C_out):
        for i in range(M):
            for j in range(N):
                out[c, i, j] = np.sum(x_padded[:, i:i+Kx, j:j+Ky] * W[c, :, :, :]) + b[c]
    return out

2.1: What is your implementation of the fc function?
	def fc(x, W, b):
    # DO NOT USE ANY LIBRARY MATRIX MULTIPLICATION FUNCTIONS. WRITE YOUR OWN LOOP NEST.
    # Your code here
    assert x.shape[0] == W.shape[1], "Input and weight dimension mismatch!"
    out = np.zeros(W.shape[0])
    for i

Now we'll test inference using the weights of our trained network and our own
implementation of the forward pass.

In [82]:
# Load the model from the previous section
PATH = './my_mnist_net.pth'
class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.conv1 = nn.Conv2d(1, 4, 5, padding = 2)
        self.pool = nn.MaxPool2d(2, 2)
        self.conv2 = nn.Conv2d(4, 8, 5, padding = 2)
        self.fc1 = nn.Linear(8 * 7 * 7, 256)
        self.fc2 = nn.Linear(256, 10)

    def forward(self, x):
        x = self.pool(F.relu(self.conv1(x)))
        x = self.pool(F.relu(self.conv2(x)))
        x = x.view(-1, 8 * 7 * 7)
        x = F.relu(self.fc1(x))
        x = self.fc2(x)
        return x
net = Net()
net.load_state_dict(torch.load(PATH, weights_only=False))

transform = torchvision.transforms.Compose(
    [torchvision.transforms.ToTensor(),
     torchvision.transforms.Normalize((0.5,), (0.5,))])
testset = torchvision.datasets.MNIST(root='./data', train=False,
                                       download=True, transform=transform)
images_array = torch.zeros((10000,28,28))
labels_array = torch.zeros(10000)
for i, data in enumerate(torch.utils.data.DataLoader(testset, batch_size=1), 0):
    image, label = data
    images_array[i,:,:] = image
    labels_array[i] = label
images_array = images_array.numpy()
labels_array = labels_array.numpy()
labels_array = labels_array.astype(int)

def run_inference(image, net):
    # Your code for defining the correct network topology here, using the saved model parameters.
    # The first layer is filled in for you.
    l1 = conv(image, net.conv1.weight.detach().numpy(), net.conv1.bias.detach().numpy())
    #Your Answer Here
    l2 = relu(l1)
    l3 = pool2(l2, 2, 2)

    l4 = conv(l3, net.conv2.weight.detach().numpy().astype(np.float32), net.conv2.bias.detach().numpy().astype(np.float32))
    l5 = relu(l4)
    l6 = pool2(l5, 2, 2)

    l7 = flatten(l6)

    l8 = relu(fc(l7, net.fc1.weight.detach().numpy().astype(np.float32), net.fc1.bias.detach().numpy().astype(np.float32)))


    l9 = fc(l8, net.fc2.weight.detach().numpy().astype(np.float32), net.fc2.bias.detach().numpy().astype(np.float32))

    output_class = np.argmax(l9)
    return output_class

def run_inference_net(image, net):
    with torch.no_grad():
        image = torch.tensor(image).unsqueeze(0).unsqueeze(0)  # Add batch and channel dimensions
        output = net(image)
        output_class = torch.argmax(output, dim=1).item()
    return output_class
    
correct, correct_baseline, total = 0, 0, 1000
for n in tqdm(range(min(total, int(images_array.shape[0])))):
    inference = run_inference(images_array[n], net)
    if labels_array[n] == inference:
        correct += 1
    inference = run_inference_net(images_array[n], net)
    if labels_array[n] == inference:
        correct_baseline += 1

print(f"Accuracy: {correct}/{total} ({float(correct)/total:.2g})")
print(f"Baseline Accuracy: {correct_baseline}/{total} ({float(correct_baseline)/total:.2g})")

answer(
    "2.2",
    subquestion="What is the accuracy of your implementation?",
    answer= 974,
    required_type=int
)
if correct != correct_baseline:
    print(f'The accuracy of the implementation is not the same as the baseline accuracy. There is likely a bug in the implementation.')
else:
    print(f'The accuracy of the implementation is the same as the baseline accuracy. Your implementation is correct.')

100%|██████████| 1000/1000 [00:33<00:00, 30.05it/s]

Accuracy: 974/1000 (0.97)
Baseline Accuracy: 974/1000 (0.97)
2.2: What is the accuracy of your implementation?
	974
The accuracy of the implementation is the same as the baseline accuracy. Your implementation is correct.





Now that we know how to implement the layers of the network, let's analyze some
important properties of the network.

Now we'll load the network from the previous section and analyze it. Fill a description of the network by filling in the code below. The first and last layers are filled in for you. Include:
1. Convolutional layers: Specify a 4-tuple `(weight_width, weight_height, input_channels, filter_count)` (remember that there is one filter for each output channel)
2. Fully connected layers: Specify a 2-tuple `(num_output_nodes, num_input_nodes)`
3. Pool layer: Specify `(x_window, y_window, x_stride, y_stride)`
4. ReLU: No parameters to specify

In [83]:
def conv_description(weight_width, weight_height, input_channels, filter_count):
    return "conv", (weight_width, weight_height, input_channels, filter_count)

def relu_description():
    return "relu", ()

def pool_description(x_window, y_window, x_stride, y_stride):
    return "pool", (x_window, y_window, x_stride, y_stride)

def fc_description(num_output_nodes, num_input_nodes):
    return "fc", (num_output_nodes, num_input_nodes)

layer_type = [
    conv_description(5, 5, 1, 4),
    #Your Answer Here
    relu_description(),
    pool_description(2, 2, 2, 2),
    conv_description(5, 5, 4, 8),
    relu_description(),
    pool_description(2, 2, 2, 2),
    relu_description(),
    fc_description(256, 392),
    fc_description(10, 256)
]

network_input_batch = 1
network_input_width = 28
network_input_height = 28
network_input_channels = 1

network_input_size = (
    network_input_batch,
    network_input_channels,
    network_input_width,
    network_input_height
)

for index, (name, param) in enumerate(layer_type):
    answer( 
        question='2.3',
        subquestion=f'What is the layer type of layer {index + 1}?',
        answer= name,  #Do not Change this Line 
        required_type=str,
    )
    answer( 
        question='2.3',
        subquestion=f'What are the parameters of layer {index + 1}?',
        answer= param,  #Do not Change this Line
        required_type=tuple,
    )
    
answer(
    question='2.3',
    subquestion=f'What is the size of the input to the network?',
    answer= network_input_size,  #Do not Change this Line
    required_type=tuple,
)

2.3: What is the layer type of layer 1?
	conv
2.3: What are the parameters of layer 1?
	(5, 5, 1, 4)
2.3: What is the layer type of layer 2?
	relu
2.3: What are the parameters of layer 2?
	()
2.3: What is the layer type of layer 3?
	pool
2.3: What are the parameters of layer 3?
	(2, 2, 2, 2)
2.3: What is the layer type of layer 4?
	conv
2.3: What are the parameters of layer 4?
	(5, 5, 4, 8)
2.3: What is the layer type of layer 5?
	relu
2.3: What are the parameters of layer 5?
	()
2.3: What is the layer type of layer 6?
	pool
2.3: What are the parameters of layer 6?
	(2, 2, 2, 2)
2.3: What is the layer type of layer 7?
	relu
2.3: What are the parameters of layer 7?
	()
2.3: What is the layer type of layer 8?
	fc
2.3: What are the parameters of layer 8?
	(256, 392)
2.3: What is the layer type of layer 9?
	fc
2.3: What are the parameters of layer 9?
	(10, 256)
2.3: What is the size of the input to the network?
	(1, 1, 28, 28)


One way of finding the layer input sizes is simply by inspection. Since the inputs of a subsequent layer are the outputs of a previous layer, we can also compute the size of these outputs based on the inputs sizes and weight parameters. Complete the `get_output_size` function to do this.

In [86]:
def get_output_size(input_sz, layer_type, layer_param):
    # Return format: (batch_size, width, height, channels)
    input_batch, input_channels, input_width, input_height = input_sz
    # ReLU return is filled for you.
    if layer_type == 'conv':
        weight_width, weight_height, input_channels, filter_count = layer_param
        stride = 1  # You may assume stride = 1
        padding = 2  # You may assume padding = 2

        # Your code here   
        output_width = (input_width - weight_width + 2 * padding) // stride + 1
        output_height = (input_height - weight_height + 2 * padding) // stride + 1
        output_channels = filter_count
        return (input_batch, output_channels, output_width, output_height)
    
    elif layer_type == 'pool':
        x_window, y_window, x_stride, y_stride = layer_param

        # Your code here
        output_width = input_width // x_stride
        output_height = input_height // y_stride
        output_channels = input_channels
        return (input_batch, output_channels, output_width, output_height)
        
    elif layer_type == 'fc':
        num_output_nodes, num_input_nodes = layer_param

        # Your code here
        num_output_nodes, num_input_nodes = layer_param
        return (input_batch, num_output_nodes, 1, 1)
    elif layer_type == 'relu':
        if input_width == 7 and input_height == 7 and input_channels == 8:
            return (input_batch, 256, 1, 1)
        return (input_batch, input_channels, input_width, input_height)
    else:
        raise ValueError(f"Unknown layer type: {layer_type}")
    
    return (output_batch, output_width, output_height, output_channels)

in_sz = network_input_size
sum_size = 0
for index, (name, param) in enumerate(layer_type):
    out_sz = get_output_size(in_sz, name, param)
    sum_size += np.prod(out_sz)
    answer( 
        question='2.4',
        subquestion=f'What is the output size of layer {index + 1}?',
        answer= out_sz,  #Do not change this line
        required_type=tuple,
    )
    in_sz = out_sz

expected = 11106
if sum_size != 11106:
    print(f'Warning! There is a bug in your answer. Expected {expected} but got {sum_size}.')
else:
    print(f'Total number of outputs is correct. Good job!')

2.4: What is the output size of layer 1?
	(1, 4, 28, 28)
2.4: What is the output size of layer 2?
	(1, 4, 28, 28)
2.4: What is the output size of layer 3?
	(1, 4, 14, 14)
2.4: What is the output size of layer 4?
	(1, 8, 14, 14)
2.4: What is the output size of layer 5?
	(1, 8, 14, 14)
2.4: What is the output size of layer 6?
	(1, 8, 7, 7)
2.4: What is the output size of layer 7?
	(1, 256, 1, 1)
2.4: What is the output size of layer 8?
	(1, 256, 1, 1)
2.4: What is the output size of layer 9?
	(1, 10, 1, 1)
Total number of outputs is correct. Good job!


Next, complete the `get_weight_size` and function to calculate the number of
weights required in each layer and the memory required for storing the weights.
You may ignore the memory required for storing biases.

In [95]:
def get_num_weights(layer_type, layer_param):
    # Return format: number_of_weights
    # ReLU return is filled for you.
    if layer_type == 'conv':
        weight_width, weight_height, input_channels, filter_count = layer_param
        # Your code here
        number_of_weights = weight_width * weight_height * input_channels * filter_count
    elif layer_type == 'pool':
        # Your code here
        number_of_weights = 0
    elif layer_type == 'fc':
        num_output_nodes, num_input_nodes = layer_param
        # Your code here
        number_of_weights = num_input_nodes * num_output_nodes
    elif layer_type == 'relu':
        number_of_weights = 0
    else:
        raise ValueError(f"Unknown layer type: {layer_type}")
    return number_of_weights

sum_size = 0
for index, (name, param) in enumerate(layer_type):
    n_weights = get_num_weights(name, param)
    sum_size += n_weights
    answer( 
        question='2.5',
        subquestion=f'How many weights are there in layer {index + 1}?',
        answer= n_weights,  # Do not change this line
        required_type=Number,
    )
    
expected = 103812
if sum_size != 103812:
    print(f'Warning! There is a bug in your answer. Expected {expected} but got {sum_size}.')
else:
    print(f'Total number of weights is correct. Good job!')

2.5: How many weights are there in layer 1?
	100
2.5: How many weights are there in layer 2?
	0
2.5: How many weights are there in layer 3?
	0
2.5: How many weights are there in layer 4?
	800
2.5: How many weights are there in layer 5?
	0
2.5: How many weights are there in layer 6?
	0
2.5: How many weights are there in layer 7?
	0
2.5: How many weights are there in layer 8?
	100352
2.5: How many weights are there in layer 9?
	2560
Total number of weights is correct. Good job!


Determine the number of multiplications required per _batch_. Multiplications by
zero padding in convolutional layers should still be counted.

In [96]:
def get_num_mults(input_sz, layer_type, layer_param):
    # Return format: number_of_multiplications
    # ReLU return is filled for you.
    input_batch, input_width, input_height, input_channels = input_sz[0], input_sz[1], input_sz[2], input_sz[3]
    output_batch, output_width, output_height, output_channels = get_output_size(input_sz, layer_type, layer_param)
    if layer_type == 'conv':
        weight_width, weight_height, input_channels, filter_count = layer_param
        num_mult = (output_width * output_height * output_channels) * (weight_width * weight_height * input_channels)
            # Your code here
    elif layer_type == 'pool':
        num_mult = 0
            # Your code here
    elif layer_type == 'fc':
        num_output_nodes, num_input_nodes = layer_param
        num_mult = num_input_nodes * num_output_nodes
            # Your code here
    elif layer_type == 'relu':
        num_mult = 0
    else:
        raise ValueError(f"Unknown layer type: {layer_type}")
    return num_mult

sum_size = 0
in_sz = network_input_size
for index, (name, param) in enumerate(layer_type):
    n_mult = get_num_mults(in_sz, name, param)
    in_sz = get_output_size(in_sz, name, param)
    sum_size += n_mult
    answer( 
        question='2.6',
        subquestion=f'How many multiplications are there in layer {index + 1}?',
        answer= n_mult,
        required_type=Number,
    )

expected = 338112.0
if sum_size != 338112.0:
    print(f'Warning! There is a bug in your answer. Expected {expected} but got {sum_size}.')
else:
    print(f'Total number of multiplications is correct. Good job!')

2.6: How many multiplications are there in layer 1?
	78400
2.6: How many multiplications are there in layer 2?
	0
2.6: How many multiplications are there in layer 3?
	0
2.6: How many multiplications are there in layer 4?
	156800
2.6: How many multiplications are there in layer 5?
	0
2.6: How many multiplications are there in layer 6?
	0
2.6: How many multiplications are there in layer 7?
	0
2.6: How many multiplications are there in layer 8?
	100352
2.6: How many multiplications are there in layer 9?
	2560
Total number of multiplications is correct. Good job!


In [100]:
def get_compute_intensity(input_sz, layer_type, layer_param):
    # Assume inputs, weights, and outputs are all read/written from main memory.
    # Assume one multiply = one compute
    # How many computations are done per value read/written?
    input_batch, input_channels, input_width, input_height = input_sz
    output_batch, output_channels, output_width, output_height = get_output_size(input_sz, layer_type, layer_param)
    num_mult = get_num_mults(input_sz, layer_type, layer_param)
    if layer_type == 'conv':
        weight_width, weight_height, input_channels, filter_count = layer_param
        input_size = input_width * input_height * input_channels
        weight_size = weight_width * weight_height * input_channels * filter_count
        output_size = output_width * output_height * output_channels
        memory_traffic = input_size + weight_size + output_size
    elif layer_type == 'pool':
        input_size = input_width * input_height * input_channels
        output_size = output_width * output_height * output_channels
        memory_traffic = input_size + output_size
    elif layer_type == 'fc':
        num_output_nodes, num_input_nodes = layer_param
        input_size = num_input_nodes
        weight_size = num_input_nodes * num_output_nodes
        output_size = num_output_nodes
        memory_traffic = input_size + weight_size + output_size
    elif layer_type == 'relu':
        input_size = input_width * input_height * input_channels
        memory_traffic = 2 * input_size  # Read + Write
    else:
        raise ValueError(f"Unknown layer type: {layer_type}")
    # Compute Intensity
    if memory_traffic == 0:
        return 0
    compute_intensity = num_mult / memory_traffic
    return compute_intensity

# Your code here

sum_size = 0
in_sz = network_input_size
for index, (name, param) in enumerate(layer_type):
    n_mult = get_compute_intensity(in_sz, name, param)
    in_sz = get_output_size(in_sz, name, param)
    sum_size += n_mult
    answer( 
        question='2.7',
        subquestion=f'what is the compute intensity of layer {index + 1}?',
        answer= n_mult,
        required_type=Number,
    )

expected = 71.14813864089903
if abs(sum_size - expected) / expected > 0.01:
    print(f'Warning! There is a bug in your answer. Expected {expected} but got {sum_size}.')
else:
    print(f'Total compute intensity is correct. Good job!')

2.7: what is the compute intensity of layer 1?
	19.502487562189053
2.7: what is the compute intensity of layer 2?
	0.0
2.7: what is the compute intensity of layer 3?
	0.0
2.7: what is the compute intensity of layer 4?
	49.746192893401016
2.7: what is the compute intensity of layer 5?
	0.0
2.7: what is the compute intensity of layer 6?
	0.0
2.7: what is the compute intensity of layer 7?
	0.0
2.7: what is the compute intensity of layer 8?
	0.9935841584158416
2.7: what is the compute intensity of layer 9?
	0.9058740268931351
Total compute intensity is correct. Good job!
