In [1]:
import numpy as np

In [2]:
# Refer to this link for implementation of subclassing ndarray
# https://numpy.org/doc/stable/user/basics.subclassing.html#slightly-more-realistic-example-attribute-added-to-existing-array

class adarray(np.ndarray):
    def __new__(cls, input_array, requires_grad=True):
        obj = np.asarray(input_array).view(cls)
        obj.requires_grad = requires_grad
        return obj

    def __array_finalize__(self, obj=None):
        if obj is None: return
        self.requires_grad = getattr(obj, "requires_grad", None)
        self.grad = np.zeros_like(self.__array__())
        
    def zero_grad(self):
        self.grad = np.zeros_like(self.__array__())

In [3]:
class NnOps:
    """
    Implements AD for certain neural network operations matmul, add, relu, sigmoid, mse, bce
    Each forward function has signature func(cls, w, x) where w is a static parameter like
    a parameter matrix or target values y, and x is the input from a previous operation.

    Forward operations record their inputs and function on the tape. During the backward()
    call, the backward function is computed for that given operation and its inputs.

    Backward operations update the gradient of w if w.requires_grad = True
    """
    tape = []
    
    @classmethod
    def matmul(cls, A, x):
        cls.tape.append((A, x, "matmul"))
        return A @ x
    
    @classmethod
    def add(cls, b, x):
        cls.tape.append((b, x, "add"))
        return b + x
    
    @classmethod
    def relu(cls, x):
        cls.tape.append((None, x, "relu"))
        return np.maximum(x, 0)
    
    @classmethod
    def sigmoid(cls, x):
        cls.tape.append((None, x, "sigmoid"))
        return 1 / (1 + np.exp(-x))
        
    @classmethod
    def mse(cls, y, y_hat):
        cls.tape.append((y, y_hat, "mse"))
        return 0.5 * np.sum((y - y_hat)**2)
    
    # TODO: implement Binary Cross Entropy

    @staticmethod
    def _matmul_backward(A, x, delta_out):
        grad_A = np.outer(delta_out, x)
        delta_in = A.T @ delta_out
        if A.requires_grad:
            A.grad += grad_A
        return delta_in

    @staticmethod
    def _add_backward(b, x, delta_out):
        grad_b = np.ones_like(b) * delta_out
        delta_in = np.ones_like(x) * delta_out
        if b.requires_grad:
            b.grad += grad_b
        return delta_in
    
    @staticmethod
    def _relu_backward(x, delta_out):
        grad = np.ones_like(x)
        grad[x <= 0] = 0
        delta_in = grad * delta_out
        return delta_in

    @staticmethod
    def _sigmoid_backward(x, delta_out):
        sigma = 1 / (1 + np.exp(-x))
        delta_in = sigma * (1 - sigma) * delta_out
        return delta_in
    
    @staticmethod
    def _mse_backward(y, y_hat, delta_out=1):
        return delta_out * (y_hat - y)
    
    # TODO: Impement BCE backwards

    @classmethod
    def _op_backward(cls, w, x, op, delta_out):
        if op == "matmul":
            return cls._matmul_backward(w, x, delta_out)
        elif op == "add":
            return cls._add_backward(w, x, delta_out)
        elif op == "relu":
            return cls._relu_backward(x, delta_out)
        elif op == "sigmoid":
            return cls._sigmoid_backward(x, delta_out)
        elif op == "mse":
            return cls._mse_backward(w, x)
        else:
            raise ValueError("Op must be one of ['matmul', 'add', 'relu', 'mse', 'sigmoid']")

    @classmethod
    def backward(cls):
        delta = 1
        for item in reversed(cls.tape):
            delta = cls._op_backward(*item, delta)
        # Reset the tape
        cls.tape = []

In [4]:
class Dense:
    """
    Simple Dense layer y = h(A @ x + b)
    Supported activations are 'relu' and 'sigmoid'
    """
    def __init__(self, in_features, out_features, activation='relu', bias=True):
        self.A = adarray(np.random.randn(out_features, in_features))
        if bias:
            self.b = adarray(np.random.randn(out_features))
        else:
            # Empty bias in case we don't want to use it
            self.b = adarray(np.zeros(out_features), requires_grad=False)
        if activation == 'relu':
            self.activation = NnOps.relu
        elif activation == 'sigmoid':
            self.activation = NnOps.sigmoid
        else:
            self.activation = None
    
    def forward(self, x):
        x = NnOps.matmul(self.A, x)
        x = NnOps.add(self.b, x)
        if self.activation:
            x = self.activation(x)
        return x

In [5]:
class Model:
    """Model with 3 fully connected layers"""
    def __init__(self):
        self.fc1 = Dense(2, 10)
        self.fc2 = Dense(10, 10)
        self.fc3 = Dense(10, 1, activation=None)
    
    def forward(self, x):
        x = self.fc1.forward(x)
        x = self.fc2.forward(x)
        x = self.fc3.forward(x)
        return x

In [7]:
x = np.random.randn(2)
y = np.random.randn(1)

net = Model()
NnOps.tape = []

In [8]:
y_hat = net.forward(x)
loss = NnOps.mse(y, y_hat)

In [9]:
NnOps.tape

[(adarray([[ 0.89684423,  1.61408643],
           [ 0.68717215,  0.19970963],
           [ 0.46380819,  0.17467749],
           [-0.04398915,  1.54741357],
           [ 0.58611085,  0.56754047],
           [-0.35836985,  1.14215218],
           [-1.47439481,  0.5710566 ],
           [-1.20413585,  1.47673708],
           [-0.75689184,  1.47745627],
           [-1.29848196, -0.22987494]]),
  array([-0.69257751, -0.86886552]),
  'matmul'),
 (adarray([-0.16031875, -1.21150219, -1.78456731,  0.77551992,  1.33689699,
            1.20708215,  1.28505599, -0.81260678, -0.20667334,  1.59891552]),
  adarray([-2.02355819, -0.64944079, -0.47299437, -1.3140284 , -0.89904353,
           -0.74417775,  0.5249613 , -0.44912853, -0.75950455,  1.09902981]),
  'add'),
 (None,
  adarray([-2.18387694, -1.86094298, -2.25756168, -0.53850849,  0.43785345,
            0.4629044 ,  1.81001729, -1.26173531, -0.96617789,  2.69794534]),
  'relu'),
 (adarray([[ 0.5517421 , -0.88792723,  0.07990686, -0.59441351,
   

In [10]:
NnOps.backward()

In [11]:
net.fc1.A.grad

array([[ 0.        ,  0.        ],
       [ 0.        ,  0.        ],
       [ 0.        ,  0.        ],
       [ 0.        ,  0.        ],
       [-1.84581915, -2.31565218],
       [ 3.28546842,  4.12174839],
       [-0.45822958, -0.5748669 ],
       [ 0.        ,  0.        ],
       [ 0.        ,  0.        ],
       [ 0.49429953,  0.62011806]])

# Compare with Torch

In [19]:
import torch
import numpy as np
import torch.nn.functional as F

In [20]:
x = np.random.randn(2)

A = np.random.randn(10, 2)
b = np.random.randn(10)

C = np.random.randn(10, 10)
d = np.random.randn(10)

E = np.random.randn(1, 10)
f = np.random.randn(1)

y = np.random.randn(1)

def mse_torch(y_hat, y):
    return 0.5 * torch.sum((y_hat - y) ** 2)

def mse_numpy(y_hat, y):
    return 0.5 * np.sum((y_hat - y) ** 2)

In [21]:
At = torch.from_numpy(A)
At.requires_grad = True
bt = torch.from_numpy(b)
bt.requires_grad = True
Ct = torch.from_numpy(C)
Ct.requires_grad = True
dt = torch.from_numpy(d)
dt.requires_grad = True
Et = torch.from_numpy(E)
Et.requires_grad = True
ft=torch.from_numpy(f)
ft.requires_grad = True

xt = torch.from_numpy(x)
yt = torch.from_numpy(y)

In [22]:
def model_torch(xt):
    yt = At @ xt + bt
    yt = F.relu(yt)
    yt = Ct @ yt + dt
    yt = F.relu(yt)
    yt = Et @ yt + ft
    return yt

In [23]:
loss = mse_torch(model_torch(xt), yt)

In [24]:
loss.backward()
loss

tensor(9.1852, dtype=torch.float64, grad_fn=<MulBackward0>)

In [25]:
At.grad

tensor([[-1.9969e-02,  4.1153e+00],
        [-0.0000e+00,  0.0000e+00],
        [-0.0000e+00,  0.0000e+00],
        [-9.7304e-03,  2.0053e+00],
        [-1.7851e-02,  3.6789e+00],
        [ 8.3460e-03, -1.7200e+00],
        [-0.0000e+00,  0.0000e+00],
        [ 1.2122e-02, -2.4982e+00],
        [ 2.8846e-03, -5.9448e-01],
        [-0.0000e+00,  0.0000e+00]], dtype=torch.float64)

# Our implementation

In [26]:
class Model:
    def __init__(self, A, b, C, d, E, f):
        self.fc1 = Dense(2, 10)
        self.fc1.A = adarray(A)
        self.fc1.b = adarray(b)
        
        self.fc2 = Dense(10, 10)
        self.fc2.A = adarray(C)
        self.fc2.b = adarray(d)

        self.fc3 = Dense(10, 1, activation=None)
        self.fc3.A = adarray(E)
        self.fc3.b = adarray(f)
    
    def forward(self, x):
        x = self.fc1.forward(x)
        x = self.fc2.forward(x)
        x = self.fc3.forward(x)
        return x

In [30]:
net = Model(A, b, C, d, E, f)
NnOps.tape = []

In [31]:
y_hat = net.forward(x)
loss = NnOps.mse(y, y_hat)
NnOps.backward()

In [32]:
net.fc1.A.grad, At.grad

(array([[-1.99685085e-02,  4.11526842e+00],
        [ 0.00000000e+00,  0.00000000e+00],
        [ 0.00000000e+00,  0.00000000e+00],
        [-9.73041804e-03,  2.00532164e+00],
        [-1.78512891e-02,  3.67893507e+00],
        [ 8.34603404e-03, -1.72001681e+00],
        [ 0.00000000e+00,  0.00000000e+00],
        [ 1.21220679e-02, -2.49821178e+00],
        [ 2.88458191e-03, -5.94477492e-01],
        [ 0.00000000e+00,  0.00000000e+00]]),
 tensor([[-1.9969e-02,  4.1153e+00],
         [-0.0000e+00,  0.0000e+00],
         [-0.0000e+00,  0.0000e+00],
         [-9.7304e-03,  2.0053e+00],
         [-1.7851e-02,  3.6789e+00],
         [ 8.3460e-03, -1.7200e+00],
         [-0.0000e+00,  0.0000e+00],
         [ 1.2122e-02, -2.4982e+00],
         [ 2.8846e-03, -5.9448e-01],
         [-0.0000e+00,  0.0000e+00]], dtype=torch.float64))