In [93]:
from copy import deepcopy
import numpy as np 
from sklearn.datasets import make_classification

from templates import AutoDiffFunction, Layer, Loss, Optimizer

# Defining the activation functions

In [94]:
class Sigmoid(AutoDiffFunction):
    def __init__(self) -> None:
        super().__init__()

    def forward(self, x):
        self.saved_for_backward = 1/(1 + np.exp(-x))
        return self.saved_for_backward

    def compute_grad(self, x):
        y = self.saved_for_backward

        return {"x": y*(1-y)}

    def backward(self, dy):
        return dy * self.grad["x"]      


class RelU(AutoDiffFunction):
    def __init__(self) -> None:
        super().__init__()

    def forward(self, x):
        self.saved_for_backward = np.where(x>0.0, 1.0, 0.0)

        return x * self.saved_for_backward

    def compute_grad(self, x):
        return {"x": self.saved_for_backward}

    def backward(self, dy):
        return dy * self.grad["x"]
     
class Softmax(AutoDiffFunction):
    def __init__(self) -> None:
        super().__init__()

    def forward(self, x):
        v = np.exp(x)
        self.saved_for_backward = v

        return v / np.sum(v, axis=1, keepdims=True)

    def compute_grad(self, x):
        pass

    def backward(self, dy):
        return dy * self.grad["x"]


# Defining the layers

In [95]:
class FC(Layer):
    def __init__(self, in_dim, out_dim, weight_decay=None, init_method="random") -> None:
        super().__init__()
        self.weight_decay = weight_decay
        self.init_method = init_method
        self.initialize_weights(in_dim, out_dim)

    def initialize_weights(self, in_dim, out_dim):
        
        if self.init_method == "random":
            scaling_factor = 1/np.sqrt(in_dim)
            self.weights["w"] = np.random.randn(in_dim, out_dim) * scaling_factor
            self.weights["b"] = np.random.randn(1, out_dim) * scaling_factor
        elif self.init_method == "xavier":
            lim = np.sqrt(6 / (in_dim + out_dim))
            self.weights["w"] = np.random.uniform(low=-lim, high=lim, size=(in_dim, out_dim))
            self.weights["b"] = np.random.uniform(low=-lim, high=lim, size=(1, out_dim))

    def compute_grad(self, x):
        
        gradients = {}

        # y = x * w + b        
        # we compute gradients wrt w and x 
        # gradient wrt b is not required explicitly since we know that it's value is 1
        gradients["w"] = self.saved_for_backward["x"].T
        gradients["x"] = self.weights["w"].T

        return gradients


    def forward(self, x):
        output = x @ self.weights["w"] + self.weights["b"]
        self.saved_for_backward["x"] = x
        
        return output

    def backward(self, dy):
        
        # calculating gradients wrt input to pass on to previous layer for backprop
        dx = dy @ self.grad["x"]
        
        # calculating gradients wrt weights
        dw = self.grad["w"] @ dy
        db = np.sum(dy, axis=0, keepdims=True)

        # accomodating for weight_decay / regularization
        if self.weight_decay:
            dw = dw + 2 * self.weight_decay * self.weights["w"]
            db = db + 2 * self.weight_decay * self.weights["b"]

        self.absolute_gradients = {"w": dw, "b": db}

        return dx

    def update_weights(self):
        self.optimizer.step(self)

# Defining the loss function

### For this particular problem, we require CrossEntropy Loss for classification

In [96]:
class CrossEntropyLossFromLogits(Loss):
    def __init__(self) -> None:
        super().__init__()
        self.n_classes = None

    @staticmethod
    def softmax(x):
        v = np.exp(x)
        return v / np.sum(v, axis=1, keepdims=True)

    def encode(self, y): 
        encoded_y = np.zeros(shape=(len(y), self.n_classes))

        for i in range(len(y)):
            encoded_y[i,y[i]] = 1

        return encoded_y

    def forward(self, y_pred, y_true):
         
        probabilities = self.softmax(y_pred)
        y_true_encoded = self.encode(y_true)

        loss_value = np.mean(np.sum(- y_true_encoded * np.log(probabilities), axis=1))

        self.saved_for_backward["probabilities"] = probabilities
        self.saved_for_backward["y_true"] = y_true_encoded

        return loss_value

    def compute_grad(self, y_pred, y_true):

        return {"x": self.saved_for_backward["probabilities"] - self.saved_for_backward["y_true"]}        


class MSELossFromLogits(Loss):
    def __init__(self) -> None:
        super().__init__()
        self.n_classes = None

    @staticmethod
    def softmax(x):
        v = np.exp(x)

        return v / np.sum(v, axis=1, keepdims=True)

    def encode(self, y): 
        encoded_y = np.zeros(shape=(len(y), self.n_classes))

        for i in range(len(y)):
            encoded_y[i,y[i]] = 1

        return encoded_y
    
    @staticmethod
    def indicator(i, j):
        ind = {True: 1, False: 0}
        return ind[i==j]

    def forward(self, y_pred, y_true):
         
        probabilities = self.softmax(y_pred)
        y_true_encoded = self.encode(y_true)

        loss_value = np.mean(np.sum((probabilities - y_true_encoded)**2, axis=1))

        self.saved_for_backward["probabilities"] = probabilities
        self.saved_for_backward["y_true"] = y_true_encoded

        return loss_value

    def compute_grad(self, y_pred, y_true):

        probs = self.saved_for_backward["probabilities"]
        labels = self.saved_for_backward["y_true"]
        grad = np.zeros(shape=(len(y_true), self.n_classes))
        
        for point_counter in range(len(y_true)):
            res = 0
            for i in range(self.n_classes):
                for j in range(self.n_classes):
                    
                    res = probs[point_counter, j] * (probs[point_counter, j] - labels[point_counter, j]) * (self.indicator(i,j) - probs[point_counter, i])
                
                grad[point_counter, i] = res
        
        return {"x": grad}

# Creating an optimizer for the loss

In [97]:
class SGD(Optimizer):
    def __init__(self, lr=1e-3):
        super().__init__()
        self.lr = lr

    def step(self, layer):

        for weight_name, _ in layer.weights.items():
            layer.weights[weight_name] = layer.weights[weight_name] - self.lr * layer.absolute_gradients[weight_name]

class Nadam(Optimizer):
    def __init__(self, lr=0.002, beta_1=0.9, beta_2=0.999, epsilon=1e-7):
        super().__init__()
        self.lr = lr
        self.beta_1 = beta_1
        self.beta_2 = beta_2
        self.epsilon = epsilon
        self.t = 1

    def step(self, layer):
        
        # we have 2 parameters to remember m(t) and v(t) for all weights in the layer
        if self.remember == {}:
            for weight_name, weight in layer.weights.items():
                self.remember[weight_name] = {}
                self.remember[weight_name]["v"] = np.zeros_like(weight)
                self.remember[weight_name]["m"] = np.zeros_like(weight)

        for weight_name, weight in layer.weights.items():
            
            self.remember[weight_name]["m"] = self.beta_1 * self.remember[weight_name]["m"] + \
                                                (1 -self.beta_1) * layer.absolute_gradients[weight_name]

            self.remember[weight_name]["v"] = self.beta_2 * self.remember[weight_name]["v"] + \
                                                (1 - self.beta_2) * layer.absolute_gradients[weight_name]**2

            # bias correction step 
            m_hat = self.remember[weight_name]["m"]/(1 - self.beta_1 ** self.t)
            v_hat = self.remember[weight_name]["v"]/(1 - self.beta_2 ** self.t)

            d = self.lr / (np.sqrt(v_hat) + self.epsilon) * (self.beta_1*m_hat + (1-self.beta_1)/
                                                (1-self.beta_1 ** self.t) * layer.absolute_gradients[weight_name]) 

            layer.weights[weight_name] = layer.weights[weight_name] - d

        self.t += 1

Creating the structure for an actual neural network

In [98]:
class NeuralNet():
    def __init__(self, layers) -> None:
        self.layers = layers
        self.history = []

    def __call__(self, *args, **kwds):
        return self.forward(*args, **kwds)

    def compile(self, loss, optimizer):
        self.loss = loss

        for layer in self.layers:
            if isinstance(layer, Layer):
                layer.optimizer = deepcopy(optimizer)

    def calculate_loss(self, y_pred, y_true):
        return self.loss(y_pred, y_true)

    def forward(self, x):
        for layer in self.layers:
            x = layer(x)

        return x

    def backward(self):

        gradient = self.loss.backward()
        for layer in reversed(self.layers):
            gradient = layer.backward(gradient)

        return gradient

    def update_weights(self):

        for layer in reversed(self.layers):
            if isinstance(layer, Layer):
                layer.update_weights()

    @staticmethod
    def accuracy_score(y_pred, y_true):

        pred_labels = np.argmax(y_pred, axis=1)
        return np.sum(pred_labels == y_true) / len(y_true)

    @staticmethod
    def create_batches(X, y, batch_size=32):
        batches = []

        for i in range(len(y) // batch_size):
            start_idx = batch_size * i
            end_idx = batch_size * (i + 1)

            batches.append([X[start_idx: end_idx], y[start_idx: end_idx]])

        # take care of the last batch which might have batch_size less than the specified one
        if len(y) % batch_size != 0:
            batches.append([X[end_idx:], y[end_idx:]])

        return batches


    def fit(self, X, y, batch_size=32, epochs=10):

        # calculate number of classes to pass to the loss function
        self.loss.n_classes = len(np.unique(y))

        batches = self.create_batches(X, y, batch_size=batch_size)
        num_batches = len(batches)

        for epoch in range(1, epochs+1):

            total_loss = 0
            total_accuracy = 0

            for X, y in batches:

                preds = self(X)
                total_loss += self.loss(preds, y)
                total_accuracy += self.accuracy_score(preds, y)

                _ = self.backward()
                self.update_weights()

            loss_per_epoch = total_loss / num_batches
            accuracy = total_accuracy / num_batches

            print(f"Epoch: {epoch} Train Loss: {loss_per_epoch} Train Accuracy: {accuracy}")

            self.history.append({"Epoch" : epoch, 
                                    "Train Loss": loss_per_epoch,
                                    "Train Accuracy": accuracy})

        print("\nModel trained successfully!")

Create a custom classification dataset to test out the function <br> 

In [101]:
## creating a dummy dataset to test out stuff ##

X, y = make_classification(n_samples=32*6, n_features=20, n_informative=15, n_classes=3)

from sklearn.preprocessing import StandardScaler

scaler = StandardScaler()
X = scaler.fit_transform(X)

# Initializing the model and setting up loss and optimizer
model = NeuralNet([FC(20, 32), RelU(), FC(32, 3)])
optimizer = Nadam()
loss = MSELossFromLogits()

model.compile(loss, optimizer)
model.fit(X, y, batch_size=32, epochs=50)


Epoch: 1 Train Loss: 0.844315784639989 Train Accuracy: 0.2552083333333333
Epoch: 2 Train Loss: 0.822198569469644 Train Accuracy: 0.3020833333333333
Epoch: 3 Train Loss: 0.8077986334843196 Train Accuracy: 0.3489583333333333
Epoch: 4 Train Loss: 0.7947670769701066 Train Accuracy: 0.375
Epoch: 5 Train Loss: 0.782619337862834 Train Accuracy: 0.390625
Epoch: 6 Train Loss: 0.7715931250139082 Train Accuracy: 0.40625
Epoch: 7 Train Loss: 0.761790289533458 Train Accuracy: 0.421875
Epoch: 8 Train Loss: 0.7530796535002624 Train Accuracy: 0.4375
Epoch: 9 Train Loss: 0.745543100535019 Train Accuracy: 0.453125
Epoch: 10 Train Loss: 0.7391375678212309 Train Accuracy: 0.4635416666666667
Epoch: 11 Train Loss: 0.7335748308563086 Train Accuracy: 0.46875
Epoch: 12 Train Loss: 0.728398187260407 Train Accuracy: 0.484375
Epoch: 13 Train Loss: 0.7236356215186907 Train Accuracy: 0.4895833333333333
Epoch: 14 Train Loss: 0.7193538486350352 Train Accuracy: 0.4947916666666667
Epoch: 15 Train Loss: 0.71550193884462

In [8]:
## Utility functions ##
def probs_to_labels(y): 
    return np.argmax(y, axis=1)


def encoded_to_labels(y):
    return np.where(y==1)[1]

def accuracy_score(y_pred, y_true):

    pred_labels = probs_to_labels(y_pred)

    return np.sum(pred_labels == y_true) / len(y_true)

def create_batches(X, y, batch_size=32):
    batches = []

    for i in range(len(y) // batch_size):
        start_idx = batch_size * i
        end_idx = batch_size * (i + 1)

        batches.append([X[start_idx: end_idx], y[start_idx: end_idx]])

    # take care of the last batch which might have batch_size less than the specified one
    if len(y) % batch_size != 0:
        batches.append([X[end_idx:], y[end_idx:]])

    return batches

batches = create_batches(X, y, batch_size=32)
len(batches)

6

In [9]:
def fit_model(model, batches, loss, optimizer, epochs=10):

    training_stats = []
    num_batches = len(batches)
    
    model.compile(loss=loss, optimizer=optimizer)

    for epoch in range(1, epochs+1):

        total_loss = 0
        total_accuracy = 0

        for X, y in batches:

            preds = model(X)
            total_loss += model.loss(preds, y)
            total_accuracy += accuracy_score(preds, y)

            _ = model.backward()
            model.update_weights()

        loss_per_epoch = total_loss / num_batches
        accuracy = total_accuracy / num_batches

        print(f"Epoch: {epoch} Train Loss: {loss_per_epoch} Train Accuracy: {accuracy}")

        training_stats.append({"Epoch" : epoch, 
                                "Train Loss": loss_per_epoch,
                                "Train Accuracy": accuracy})

    
    return training_stats

In [10]:
# Initializing the model and setting up loss and optimizer
model = NeuralNet([FC(20, 32, 1e-3), RelU(), FC(32, 3, 1e-3)])
optimizer = SGD(lr = 0.001)
loss = CrossEntropyLossFromLogits()

training_stats = fit_model(model, batches, loss, optimizer, epochs=50)

TypeError: 'NoneType' object cannot be interpreted as an integer

# Load MNIST Dataset