In [5]:
# functional.py
""" Should we keep all functional stuff here includding act/loss ...???"""
import numpy as np
from math import ceil, floor
as_strided = np.lib.stride_tricks.as_strided
import torch
from torch import nn
import torch.nn.functional as F
# from torch.autograd import grad

In [2]:
def affTrans(Z, W, B=0): return Z.dot(W.T) + B # W: (outF,inF)
def affTransP(TopGrad, Z, W):
    BGrad = TopGrad.sum(axis=0)
    WGrad = TopGrad.T.dot(Z)
    Zgrad = TopGrad.dot(W)
    return Zgrad, WGrad, BGrad

class Layer:
    """ All layers Only acccept batched input: NCHW"""
    def __call__(self, x): return self.forward(x)
    def __repr__(self): return f"{self.layers_name}(Z)"
    def forward(self, input): raise NotImplementedError
    def backward(self, TopGrad): raise NotImplementedError

class Linear(Layer):
    def __init__(self, inF, outF, bias=True):
        self.layers_name = self.__class__.__name__
        self.trainable = True
        lim = 1 / np.sqrt(inF) # Only inF used to calculate the limit, avoid saturation..
        self.w  = np.random.uniform(-lim, lim, (outF, inF)) # torch style (outF, inF)
        self.b = np.random.randn(outF) * 0.1 if bias else None
        self.params = (self.w, self.b)
        self.inShape, self.outShape = (inF,), (outF,)

    def forward(self, z):
        self.z = z
        return affTrans(self.z, self.w, self.b) # [MBS,inF][outF,inF].T -> [MBS,outF]

    def backward(self, TopGrad):
        self.zGrad, self.wGrad, self.bGrad = affTransP(TopGrad, self.z, self.w)
        return self.zGrad


In [29]:
# Test function
def test_linear_layer(inF, outF, batch_size):
    # Create random input and top gradient
    Z = np.random.randn(batch_size, inF)
    TopGrad = np.random.randn(batch_size, outF)

    # Custom Linear Layer
    custom_layer = Linear(inF, outF, bias=True)
    custom_output = custom_layer.forward(Z)
    custom_layer.backward(TopGrad)

    # PyTorch Linear Layer
    torch_layer = torch.nn.Linear(inF, outF, bias=True)
    torch_layer.weight.data = torch.tensor(custom_layer.w, dtype=torch.float32)
    torch_layer.bias.data = torch.tensor(custom_layer.b, dtype=torch.float32)

    Z_torch = torch.tensor(Z, dtype=torch.float32)
    output_torch = torch_layer(Z_torch)

    # Create a tensor for TopGrad and perform backpropagation
    TopGrad_torch = torch.tensor(TopGrad, dtype=torch.float32)
    output_torch.backward(TopGrad_torch)

    # Get gradients
    torch_w_grad = torch_layer.weight.grad.numpy()
    torch_b_grad = torch_layer.bias.grad.numpy()

    # Compare gradients
    print("Custom Layer Weights Gradient:")
    print(custom_layer.wGrad)
    print("PyTorch Weights Gradient:")
    print(torch_w_grad)

    print("\nCustom Layer Bias Gradient:")
    print(custom_layer.bGrad)
    print("PyTorch Bias Gradient:")
    print(torch_b_grad)

    # Check if gradients are close
    assert np.allclose(custom_layer.wGrad, torch_w_grad, atol=1e-5), "Weight gradients do not match!"
    assert np.allclose(custom_layer.bGrad, torch_b_grad, atol=1e-5), "Bias gradients do not match!"
    print("Gradients match!")

# Run the test
test_linear_layer(inF=5, outF=3, batch_size=10)

Custom Layer Weights Gradient:
[[ 1.83288932  1.95969429  2.24857789 -1.30090079  5.02126125]
 [ 4.35974098 -4.12743523 -4.71533437  3.9867369  -2.46517288]
 [ 3.88826194 -3.4610227  -2.09401212  1.25815586  2.61342896]]
PyTorch Weights Gradient:
[[ 1.8328893  1.9596944  2.248578  -1.3009008  5.021261 ]
 [ 4.359741  -4.1274357 -4.715334   3.986737  -2.4651728]
 [ 3.8882618 -3.4610229 -2.094012   1.258156   2.6134288]]

Custom Layer Bias Gradient:
[ 0.32418518 -6.70843461  3.43833086]
PyTorch Bias Gradient:
[ 0.32418537 -6.708435    3.4383307 ]
Gradients match!


In [32]:
def batch_norm_forward(x, gamma, beta, eps=1e-5):
    """ Forward pass for batch normalization. """
    # Compute mean and variance for the batch
    batch_mean = x.mean(axis=0)
    batch_var = x.var(axis=0)

    # Normalize the batch
    x_normalized = (x - batch_mean) / np.sqrt(batch_var + eps)

    # Scale and shift
    out = gamma * x_normalized + beta

    # Store for backward pass
    cache = (x, x_normalized, batch_mean, batch_var, gamma, beta, eps)
    return out, cache

def batch_norm_backward(d_out, cache):
    """ Backward pass for batch normalization. """
    x, x_normalized, batch_mean, batch_var, gamma, beta, eps = cache
    N, D = x.shape

    # Gradient of beta and gamma
    dbeta = d_out.sum(axis=0)
    dgamma = (d_out * x_normalized).sum(axis=0)

    # Gradient of normalized input
    d_x_normalized = d_out * gamma

    # Gradient of variance and mean
    d_var = (d_x_normalized * (x - batch_mean) * -0.5 * (batch_var + eps)**(-1.5)).sum(axis=0)
    d_mean = d_x_normalized.sum(axis=0) * -1 / np.sqrt(batch_var + eps) + d_var * -2 * (x - batch_mean).mean(axis=0)

    # Gradient of input
    dx = d_x_normalized / np.sqrt(batch_var + eps) + d_var * 2 * (x - batch_mean) / N + d_mean / N

    return dx, dgamma, dbeta

class BatchNorm(Layer):
    def __init__(self, num_features, momentum=0.9, eps=1e-5):
        self.layers_name = self.__class__.__name__
        self.trainable = True

        # Initialize parameters
        self.gamma = np.ones(num_features)  # Scale
        self.beta = np.zeros(num_features)   # Shift

        # Running averages for inference
        self.running_mean = np.zeros(num_features)
        self.running_var = np.ones(num_features)
        self.momentum = momentum
        self.eps = eps
        self.training = True  # Flag to switch between training and inference mode

    def forward(self, x):
        if self.training:
            out, self.cache = batch_norm_forward(x, self.gamma, self.beta, self.eps)
            # Update running mean and variance
            self.running_mean = self.momentum * self.running_mean + (1 - self.momentum) * x.mean(axis=0)
            self.running_var = self.momentum * self.running_var + (1 - self.momentum) * x.var(axis=0)
        else:
            # During inference, use running mean and variance
            x_normalized = (x - self.running_mean) / np.sqrt(self.running_var + self.eps)
            out = self.gamma * x_normalized + self.beta
        return out

    def backward(self, TopGrad):
        return batch_norm_backward(TopGrad, self.cache)

In [None]:
### Written for the test

# The BatchNorm class implementation as provided
def batch_norm_forward(x, gamma, beta, eps=1e-5):
    """ Forward pass for batch normalization. """
    # Compute mean and variance for the batch
    batch_mean = x.mean(axis=0)
    batch_var = x.var(axis=0)

    # Normalize the batch
    x_normalized = (x - batch_mean) / np.sqrt(batch_var + eps)

    # Scale and shift
    out = gamma * x_normalized + beta

    # Store for backward pass
    cache = (x, x_normalized, batch_mean, batch_var, gamma, beta, eps)
    return out, cache

def batch_norm_backward(d_out, cache):
    """ Backward pass for batch normalization. """
    x, x_normalized, batch_mean, batch_var, gamma, beta, eps = cache
    N, D = x.shape

    # Gradient of beta and gamma
    dbeta = d_out.sum(axis=0)
    dgamma = (d_out * x_normalized).sum(axis=0)

    # Gradient of normalized input
    d_x_normalized = d_out * gamma

    # Gradient of variance and mean
    d_var = (d_x_normalized * (x - batch_mean) * -0.5 * (batch_var + eps)**(-1.5)).sum(axis=0)
    d_mean = d_x_normalized.sum(axis=0) * -1 / np.sqrt(batch_var + eps) + d_var * -2 * (x - batch_mean).mean(axis=0)

    # Gradient of input
    dx = d_x_normalized / np.sqrt(batch_var + eps) + d_var * 2 * (x - batch_mean) / N + d_mean / N

    return dx, dgamma, dbeta

class Layer:
    def __call__(self, x):
        return self.forward(x)

    def __repr__(self):
        return f"{self.layers_name}(Z)"

    def forward(self, input):
        raise NotImplementedError

    def backward(self, TopGrad):
        raise NotImplementedError

class BatchNorm(Layer):
    def __init__(self, num_features, momentum=0.9, eps=1e-5):
        self.layers_name = self.__class__.__name__
        self.trainable = True

        # Initialize parameters
        self.gamma = np.ones(num_features)  # Scale
        self.beta = np.zeros(num_features)   # Shift

        # Running averages for inference
        self.running_mean = np.zeros(num_features)
        self.running_var = np.ones(num_features)
        self.momentum = momentum
        self.eps = eps
        self.training = True  # Flag to switch between training and inference mode

    def forward(self, x):
        if self.training:
            out, self.cache = batch_norm_forward(x, self.gamma, self.beta, self.eps)
            # Update running mean and variance
            self.running_mean = self.momentum * self.running_mean + (1 - self.momentum) * x.mean(axis=0)
            self.running_var = self.momentum * self.running_var + (1 - self.momentum) * x.var(axis=0)
        else:
            # During inference, use running mean and variance
            x_normalized = (x - self.running_mean) / np.sqrt(self.running_var + self.eps)
            out = self.gamma * x_normalized + self.beta
        return out

    def backward(self, TopGrad):
        return batch_norm_backward(TopGrad, self.cache)


In [55]:
BN = torch.nn.BatchNorm1d(num_features)
BN.gamma.data.shape, BN.beta.data.shape

BatchNorm1d(5, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)

In [54]:
# Run the test
num_features, batch_size = 5, 10

# Create random input and top gradient
x = np.random.randn(batch_size, num_features)
TopGrad = np.random.randn(batch_size, num_features)

# Custom BatchNorm Layer
custom_bn = BatchNorm(num_features)
custom_bn.training = True  # Set to training mode
custom_output = custom_bn.forward(x)
custom_grad = custom_bn.backward(TopGrad)

# PyTorch BatchNorm Layer
torch_bn = torch.nn.BatchNorm1d(num_features)
torch_bn.weight.data = torch.tensor(custom_bn.gamma, dtype=torch.float32)
torch_bn.bias.data = torch.tensor(custom_bn.beta, dtype=torch.float32)
torch_bn.running_mean = torch.tensor(custom_bn.running_mean, dtype=torch.float32)
torch_bn.running_var = torch.tensor(custom_bn.running_var, dtype=torch.float32)

x_torch = torch.tensor(x, dtype=torch.float32)
TopGrad_torch = torch.tensor(TopGrad, dtype=torch.float32)

# Forward and backward pass in PyTorch
torch_output = torch_bn(x_torch)
torch_output.backward(TopGrad_torch)

# Get gradients
torch_gamma_grad = torch_bn.weight.grad.numpy()
torch_beta_grad = torch_bn.bias.grad.numpy()

In [None]:
# Compare outputs and gradients
print("Custom BatchNorm Output:")
print(custom_output)  # output from custom implementation

# print("\nCustom BatchNorm Gradients:")
# print(custom_grad)
# print("PyTorch BatchNorm Gradients (input):")
# print(torch_bn.weight.grad.numpy())  # gradients from PyTorch for weights

# # Check if outputs are close
# assert np.allclose(custom_output[0], torch_output.detach().numpy(), atol=1e-5), "Outputs do not match!"
# # Check if gradients are close
# assert np.allclose(custom_grad, torch_bn.weight.grad.numpy(), atol=1e-5), "Input gradients do not match!"
# print("Outputs and gradients match!")


### Sepearator


In [48]:
num_features, batch_size=5, 10
# Create random input and top gradient
x = np.random.randn(batch_size, num_features)
TopGrad = np.random.randn(batch_size, num_features)

# Custom BatchNorm Layer
custom_layer = BatchNorm(num_features)
custom_output, custom_cache = batch_norm_forward(x, custom_layer.gamma, custom_layer.beta)
custom_dx, custom_dgamma, custom_dbeta = batch_norm_backward(TopGrad, custom_cache)

# PyTorch BatchNorm Layer
torch_layer = torch.nn.BatchNorm1d(num_features, affine=True)
torch_layer.weight.data = torch.tensor(custom_layer.gamma, dtype=torch.float32)
torch_layer.bias.data = torch.tensor(custom_layer.beta, dtype=torch.float32)

x_torch = torch.tensor(x, dtype=torch.float32)
output_torch = torch_layer(x_torch)

# Create a tensor for TopGrad and perform backpropagation
TopGrad_torch = torch.tensor(TopGrad, dtype=torch.float32)
output_torch.backward(TopGrad_torch)

# # Get gradients
# torch_dx = x_torch.grad.numpy()
# torch_dgamma = torch_layer.weight.grad.numpy()
# torch_dbeta = torch_layer.bias.grad.numpy()

# # Compare outputs
# print("Custom Layer Output:")
# print(custom_output)
# print("PyTorch Output:")
# print(output_torch.detach().numpy())

# # Compare gradients
# print("\nCustom Layer Gradients:")
# print("dx:", custom_dx)
# print("dgamma:", custom_dgamma)
# print("dbeta:", custom_dbeta)

# print("\nPyTorch Gradients:")
# print("dx:", torch_dx)
# print("dgamma:", torch_dgamma)
# print("dbeta:", torch_dbeta)

# # Check if outputs and gradients are close
# assert np.allclose(custom_output, output_torch.detach().numpy(), atol=1e-5), "Outputs do not match!"
# assert np.allclose(custom_dx, torch_dx, atol=1e-5), "dx gradients do not match!"
# assert np.allclose(custom_dgamma, torch_dgamma, atol=1e-5), "dgamma gradients do not match!"
# assert np.allclose(custom_dbeta, torch_dbeta, atol=1e-5), "dbeta gradients do not match!"
# print("Outputs and gradients match!")


True

In [None]:
# Test function
def test_linear_layer(inF, outF, batch_size):
    # Create random input and top gradient
    Z = np.random.randn(batch_size, outF)
    TopGrad = np.random.randn(batch_size, outF)

    # Custom Linear Layer
    custom_layer = BatchNorm(outF)
    custom_output = custom_layer.forward(Z)
    custom_layer.backward(TopGrad)

    # PyTorch Linear Layer
    torch_layer = torch.nn.Linear(inF, outF, bias=True)
    torch_layer.weight.data = torch.tensor(custom_layer.w, dtype=torch.float32)
    torch_layer.bias.data = torch.tensor(custom_layer.b, dtype=torch.float32)

    Z_torch = torch.tensor(Z, dtype=torch.float32)
    output_torch = torch_layer(Z_torch)

    # Create a tensor for TopGrad and perform backpropagation
    TopGrad_torch = torch.tensor(TopGrad, dtype=torch.float32)
    output_torch.backward(TopGrad_torch)

    # Get gradients
    torch_w_grad = torch_layer.weight.grad.numpy()
    torch_b_grad = torch_layer.bias.grad.numpy()

    # Compare gradients
    print("Custom Layer Weights Gradient:")
    print(custom_layer.wGrad)
    print("PyTorch Weights Gradient:")
    print(torch_w_grad)

    print("\nCustom Layer Bias Gradient:")
    print(custom_layer.bGrad)
    print("PyTorch Bias Gradient:")
    print(torch_b_grad)

    # Check if gradients are close
    assert np.allclose(custom_layer.wGrad, torch_w_grad, atol=1e-5), "Weight gradients do not match!"
    assert np.allclose(custom_layer.bGrad, torch_b_grad, atol=1e-5), "Bias gradients do not match!"
    print("Gradients match!")

# Run the test
test_linear_layer(inF=5, outF=3, batch_size=10)