In [None]:
import time
from torchvision import datasets, transforms
from torchvision.datasets import MNIST
from tqdm import tqdm as tqdm
import numpy as np
import matplotlib.pyplot as plt

<h1>Weight Initialization Functions</h1>

In [None]:

def zeros(shape):
    """Return a float array of the given shape with every entry set to 0."""
    return np.zeros(shape, dtype=float)
def xavier_normal(shape):
    """Sample weights from a zero-mean normal with std sqrt(2 / (fan_in + fan_out)).

    `shape` is unpacked as (fan_out, fan_in), so it must have exactly two
    entries. Samples are drawn from NumPy's global random state.
    """
    n_out, n_in = shape
    scale = np.sqrt(2.0 / (n_in + n_out))
    return np.random.normal(loc=0.0, scale=scale, size=shape)
def he(shape):
    """Sample weights from a zero-mean normal with std sqrt(2 / fan_in).

    `shape` is unpacked as (fan_out, fan_in); only fan_in affects the spread.
    Samples are drawn from NumPy's global random state.
    """
    _, n_in = shape
    scale = np.sqrt(2 / n_in)
    return np.random.normal(loc=0.0, scale=scale, size=shape)

<h1>Layer Classes</h1>

In [None]:
class Layers():
    """Abstract base class for network layers.

    Subclasses override `forward` and `backward`; the versions here only
    raise NotImplementedError.
    """

    def forward(self, X, training = True):
        """Compute the layer output for input `X`. Must be overridden."""
        raise NotImplementedError("forward() method is not implemented, but still called !")

    def backward(self, dout = None):
        """Propagate the upstream gradient `dout`. Must be overridden.

        `dout` is accepted (and optional) so that a caller invoking
        `layer.backward(dout)` on a layer without its own backward gets
        NotImplementedError rather than a TypeError about argument count.
        """
        raise NotImplementedError("backward() method is not implemented, but still called !")
        
        
class Linear(Layers):
    """Fully connected layer computing `X @ W.T + b`.

    Parameters
    ----------
    in_dim, out_dim : int
        Input and output feature counts; W is created with shape
        (out_dim, in_dim) and b with shape (out_dim,).
    weight_decay : float
        L2 coefficient. When > 0, `backward` adds the gradient of the penalty
        0.5 * weight_decay * sum(W**2) to dW.
    weight_init, bias_init : callable
        Called with a shape tuple and expected to return the initial array.
        The defaults (`he`, `zeros`) must already be defined when this class
        statement executes.
    """

    def __init__(self, in_dim, out_dim, weight_decay = 0.0, weight_init = he, bias_init = zeros):
        self.W = weight_init((out_dim, in_dim))
        self.b = bias_init((out_dim,))
        self.weight_decay = weight_decay

    def forward(self, X, training = True):
        """Return `X @ W.T + b`; the input is cached for `backward` only when training."""
        if training:
            self.x = X
        return X @ self.W.T + self.b

    def backward(self, dout):
        """Store dW and db on the layer and return the gradient w.r.t. the input.

        Uses the input cached by the most recent `forward(..., training=True)`.
        """
        self.dW = dout.T @ self.x
        if self.weight_decay > 0:
            # d/dW [0.5 * wd * sum(W**2)] = wd * W  (no extra factor of 0.5),
            # matching the penalty reported by compute_l2_loss.
            self.dW += self.weight_decay * self.W
        self.db = np.sum(dout, axis = 0)

        return dout @ self.W
    
class ReLU(Layers):
    """Element-wise rectifier: keeps entries where X > 0 and zeroes the rest."""

    def forward(self, X, training = True):
        """Return X with non-positive entries zeroed; cache the mask when training."""
        positive = X > 0
        if training:
            # Remembered so backward can block gradients at inactive entries.
            self.mask = positive
        return X * positive

    def backward(self, dout):
        """Pass `dout` through only where the cached forward input was positive."""
        grad_in = dout * self.mask
        return grad_in

class Softmax(Layers):
    """Row-wise softmax over axis 1."""

    def forward(self, x, training = True):
        """Return softmax probabilities for each row of `x`.

        `training` is accepted so the layer can be called the same way as the
        other layers (e.g. `layer.forward(x, training)`); it does not change
        the result, and the probabilities are cached in either mode.
        """
        # Subtracting the row max leaves the softmax unchanged but keeps exp() from overflowing.
        x_shifted = x - np.max(x, axis = 1, keepdims = True)
        exp_x = np.exp(x_shifted)
        # Normalise by the sum of the exponentials (not of the shifted logits).
        self.probs = exp_x / np.sum(exp_x, axis = 1, keepdims = True)
        return self.probs

    def backward(self, dout):
        """Return the gradient w.r.t. the input given upstream gradient `dout`.

        Applies the softmax Jacobian row by row:
        dx = p * (dout - sum(dout * p)), using the probabilities cached by
        the most recent `forward`.
        """
        weighted = np.sum(dout * self.probs, axis = 1, keepdims = True)
        return self.probs * (dout - weighted)
    


<h1>Loss Function and Accuracy</h1>

In [None]:
class Loss:
    """Abstract base class for losses; subclasses implement forward and backward."""

    def forward(self):
        raise NotImplementedError("Loss function not implemented..!")

    def backward(self):
        raise NotImplementedError("Loss function not implemented..!")


class SoftmaxCrossEntropy(Loss):
    """Softmax followed by mean categorical cross-entropy, computed in one step.

    `forward` caches the softmax probabilities and the labels; `backward`
    uses them to return the gradient of the mean loss w.r.t. the logits.
    """

    def forward(self, X, y):
        """Return the mean cross-entropy of logits `X` against integer labels `y`.

        `X` is indexed as (row, class) and `y` supplies one class index per row.
        """
        self.y = y

        # Log-softmax via log-sum-exp. Working in log space keeps the loss
        # finite when a probability underflows to 0, where -log(prob) would
        # be inf. After shifting, the row max is 0, so the sum of
        # exponentials is at least 1 and its log is always finite.
        shifted = X - np.max(X, axis = 1, keepdims = True)
        log_norm = np.log(np.sum(np.exp(shifted), axis = 1, keepdims = True))
        log_probs = shifted - log_norm
        self.probs = np.exp(log_probs)

        # Pick the log-probability of the true class in every row.
        rows = np.arange(log_probs.shape[0])
        return -np.mean(log_probs[rows, y])

    def backward(self):
        """Return d(mean loss)/d(logits): (probs - one_hot(y)) / batch size."""
        n_rows = self.probs.shape[0]
        dx = self.probs.copy()
        dx[np.arange(n_rows), self.y] -= 1
        dx /= n_rows
        return dx
        
def compute_l2_loss(model):
    """
    Sum the L2 penalty 0.5 * weight_decay * sum(W**2) over the model's layers.

    Only layers that expose both `W` and `weight_decay`, with a strictly
    positive `weight_decay`, contribute.

    Returns:
        float-like scalar penalty (0.0 when no layer contributes)
    """
    penalty = 0.0
    for layer in model.layers:
        if not (hasattr(layer, 'W') and hasattr(layer, 'weight_decay')):
            continue
        decay = layer.weight_decay
        if decay > 0:
            penalty += 0.5 * decay * np.sum(layer.W ** 2)
    return penalty

def accuracy_score(y_true, preds):
    """Return the fraction of rows whose arg-max column in `preds` equals `y_true`."""
    predicted = np.argmax(preds, axis = 1)
    matches = (y_true == predicted)
    return np.mean(matches)


<h1>Optimizer and Early Stopping</h1>

In [None]:
class Optimizers():
    """Base class for optimizers; subclasses implement `step`."""

    def step(self):
        raise NotImplementedError


class Adam(Optimizers):
    """Adam optimizer over the `W`, `b`, `gamma` and `beta` attributes of the given layers.

    For each tracked parameter `name`, `step` reads the gradient from the
    layer attribute `'d' + name` and rebinds the parameter attribute to the
    updated array.
    """

    def __init__(self, layers, lr = 0.001, beta1 =0.95, beta2 = 0.95, eps = 1e-08):
        self.lr, self.beta1, self.beta2, self.eps = lr, beta1, beta2, eps
        self.velocity = {}   # first-moment estimate per (layer, name)
        self.cache = {}      # second-moment estimate per (layer, name)
        self.t = 0           # number of steps taken, used for bias correction

        # Track every (layer, attribute-name) pair that actually exists.
        self.params = [
            (layer, name)
            for layer in layers
            for name in ('W', 'b', 'gamma', 'beta')
            if hasattr(layer, name)
        ]
        for key in self.params:
            value = getattr(*key)
            self.cache[key] = np.zeros_like(value)
            self.velocity[key] = np.zeros_like(value)

    def step(self):
        """Apply one Adam update to every tracked parameter."""
        self.t += 1
        # The bias-correction denominators depend only on the step count.
        correction1 = 1 - self.beta1 ** self.t
        correction2 = 1 - self.beta2 ** self.t

        for key in self.params:
            layer, name = key
            grad = getattr(layer, 'd' + name)
            first = self.velocity[key]
            second = self.cache[key]

            # Update the running moments in place so the dict entries stay current.
            first[:] = self.beta1 * first + (1 - self.beta1) * grad
            second[:] = self.beta2 * second + (1 - self.beta2) * (grad ** 2)

            first_hat = first / correction1
            second_hat = second / correction2

            current = getattr(layer, name)
            setattr(layer, name, current - self.lr * first_hat / (np.sqrt(second_hat) + self.eps))

class EarlyStopping:
    """Track validation loss and signal when it has stopped improving.

    A call to `step` counts as an improvement when the loss is below the best
    seen so far by more than `min_delta`. After `patience` consecutive calls
    without improvement, `stop` becomes True. When `restore_best_params` is
    set, a copy of the model parameters is kept at every improvement and can
    be written back with `restore_best`.
    """

    def __init__(self, patience=5, min_delta=0.0, restore_best_params=True):
        self.patience = patience
        self.min_delta = min_delta
        self.restore_best_params = restore_best_params

        self.best_loss = float('inf')
        self.best_epoch = None      # 1-based index of the step() call with the best loss
        self.best_params = None     # snapshot taken at the best step, if enabled
        self.counter = 0            # consecutive non-improving calls
        self.epoch_counter = 0      # total step() calls so far
        self.stop = False

    def step(self, val_loss, model):
        """Record one validation loss and update the stopping state."""
        self.epoch_counter += 1

        improved = val_loss < self.best_loss - self.min_delta
        if not improved:
            self.counter += 1
            if self.counter >= self.patience:
                self.stop = True
            return

        self.best_loss = val_loss
        self.best_epoch = self.epoch_counter
        self.counter = 0
        if self.restore_best_params:
            self.best_params = self._copy_params(model)

    def restore_best(self, model):
        """Write the stored snapshot back into `model`; no-op if none was taken."""
        if self.best_params is None:
            return
        self._set_params(model, self.best_params)

    def _copy_params(self, model):
        """Return one dict per layer holding copies of its W/b/gamma/beta attributes."""
        return [
            {
                name: getattr(layer, name).copy()
                for name in ('W', 'b', 'gamma', 'beta')
                if hasattr(layer, name)
            }
            for layer in model.layers
        ]

    def _set_params(self, model, params):
        """Assign each stored value back onto the layer at the same position."""
        for layer, stored in zip(model.layers, params):
            for name in stored:
                setattr(layer, name, stored[name])


<h1> Model Architecture (Sequential Container)</h1>

In [None]:
class Sequential:
    """Container that runs a list of layers in order."""

    def __init__(self, layers):
        self.layers = layers

    def forward(self, x, training = True):
        """Feed `x` through every layer in order and return the final output."""
        out = x
        for layer in self.layers:
            out = layer.forward(out, training)
        return out

    def backward(self, dout):
        """Feed `dout` through every layer's backward in reverse order."""
        grad = dout
        for layer in reversed(self.layers):
            grad = layer.backward(grad)
        return grad

    def predict(self, x):
        """Return the arg-max column of the non-training forward output for each row."""
        scores = self.forward(x, training = False)
        return np.argmax(scores, axis = 1)

<h1>Data Loading, Shuffling, and Train-Test Split</h1>

In [None]:
# Preprocessing applied to every sample: ToTensor(), then flatten with view(-1).
transform = transforms.Compose([transforms.ToTensor(),transforms.Lambda(lambda x: x.view(-1))])

In [None]:
# MNIST training split (train=True), stored under ./data, with `transform` applied to each image.
train_dataset = MNIST(root="./data", train = True, download = True, transform = transform)

In [None]:
# Show the dataset object as the cell output.
train_dataset

In [None]:
# Take the (image, label) pair at index 2 and show it as the cell output.
img, label = train_dataset[2]
img,label

In [None]:
# Check what type the transform pipeline produced for the image.
type(img)

In [None]:
# Convert the sample image with .numpy().
# NOTE(review): rebinding `img` makes this cell non-idempotent — a second run
# would call .numpy() on the already-converted object.
img = img.numpy()

In [None]:
# Type, shape and dtype of the converted sample image.
print(type(img))
print(img.shape)
print(img.dtype)


In [None]:
# Type of the sample label.
print(type(label))

In [None]:
# Walk the whole training dataset once, collecting each image (via .numpy())
# and its label into parallel Python lists.
X_list = []
y_list = []

# `img` and `label` are rebound here and keep the last sample's values afterwards.
for img, label in train_dataset:
    X_list.append(img.numpy())
    y_list.append(label)

In [None]:
# Stack the collected lists into NumPy arrays and report their sizes.
X = np.array(X_list) #input data
y = np.array(y_list) #label/targets
print(X.shape)
print(y.shape)
print(len(X))

In [None]:
def train_test_split(X, y, train_size = None, test_size = 0.2, random_state = None):
    """Shuffle X and y with one shared permutation and split them into two parts.

    Parameters
    ----------
    X, y : array-like
        Must have the same length and support NumPy integer-array indexing.
    train_size, test_size : float or None
        Fractions in [0, 1]. If exactly one is None it is set to 1 minus the
        other. If both are given and sum to less than 1, the leftover samples
        are returned in neither part.
    random_state : int or None
        When given, passed to np.random.seed, i.e. it reseeds NumPy's global
        random state (which also affects later np.random calls).

    Returns
    -------
    X_train, y_train, X_test, y_test
        Note the order: both training arrays first, then both test arrays.

    Raises
    ------
    ValueError
        If both sizes are None, a size lies outside [0, 1], the sizes sum to
        more than 1, or X and y differ in length.
    """
    if random_state is not None:
        np.random.seed(random_state)

    # Resolve the two fractions.
    if train_size is None and test_size is None:
        raise ValueError("At least one of train_size or test_size must be specified")

    if test_size is None:
        test_size = 1.0 - train_size

    if train_size is None:
        train_size = 1.0 - test_size

    for label, fraction in (("train_size", train_size), ("test_size", test_size)):
        if not 0.0 <= fraction <= 1.0:
            raise ValueError(f"{label} must be between 0 and 1, got {fraction}")

    tolerance = 1e-9  # absorbs float round-off in sums such as 0.7 + 0.3
    if train_size + test_size > 1.0 + tolerance:
        raise ValueError("train_size + test_size must be <= 1")

    N = len(X)
    if len(y) != N:
        raise ValueError(f"X and y must have the same length, got {N} and {len(y)}")

    indices = np.random.permutation(N)

    n_train = int(train_size * N)
    if train_size + test_size >= 1.0 - tolerance:
        # The fractions cover everything: the test part is exactly the remainder,
        # so no sample is lost to rounding.
        n_test = N - n_train
    else:
        # Honour test_size instead of handing every leftover sample to the test part.
        n_test = int(test_size * N)

    train_idx = indices[:n_train]
    test_idx = indices[n_train:n_train + n_test]

    return X[train_idx], y[train_idx], X[test_idx], y[test_idx]

In [None]:
# 80/20 split of the training data with a fixed seed. The held-out part
# (X_test, y_test) is what the training cell evaluates on after each epoch.
X_train,y_train,X_test,y_test = train_test_split(X, y, train_size = 0.8, test_size = 0.2, random_state = 2)
print("len(X_train) = ",len(X_train))
print("len(y_train) = ",len(y_train))
print("len(X_test) = ", len(X_test))
print("len(y_test) = ", len(y_test))

<h1>Epoch Implementation (Training & Validation)</h1>

In [None]:
# Hyperparameters and model definition.
epochs = 50
batch_size = 64
model = Sequential([Linear(784, 256, weight_decay = 3e-04), 
                    ReLU(), 
                    Linear(256, 128, weight_decay = 3e-04), 
                    ReLU(), 
                    Linear(128, 10, weight_decay = 3e-04, weight_init = xavier_normal)])

criterion = SoftmaxCrossEntropy()
optimizer = Adam(model.layers, lr = 0.001, beta1 = 0.9, beta2 = 0.999)
early_stopper = EarlyStopping(patience = 5)

# Per-epoch history; one entry is appended to each list per completed epoch.
epoch_loss_train = []
epoch_loss_test = []
epoch_acc_train = []
epoch_acc_test = []

start_time = time.time()
for epoch in range(epochs):
    
    epoch_loss_train_sum = 0.0
    epoch_correct_train = 0
    epoch_samples_train = 0

    # Reshuffle the training data at the start of every epoch.
    perm = np.random.permutation(len(X_train))
    X_train = X_train[perm]
    y_train = y_train[perm]
    
    for i in tqdm(range(0, len(X_train), batch_size), desc = f"Epoch: {epoch+1}/{epochs}",leave = True):
        X_train_batch = X_train[i : i + batch_size]
        y_train_batch = y_train[i : i + batch_size]

        # Forward pass: data loss plus the L2 penalty.
        preds = model.forward(X_train_batch,training = True)
        train_loss = criterion.forward(preds, y_train_batch)
        l2_loss = compute_l2_loss(model)
        train_loss += l2_loss

        # Backward pass and parameter update.
        dout = criterion.backward()
        model.backward(dout)
        optimizer.step()

        # Size of this particular batch (the last one may be shorter). It is
        # kept under its own name so the configured batch_size is not
        # overwritten and later epochs keep slicing with the intended size.
        n_batch = len(X_train_batch)
        epoch_loss_train_sum += train_loss * n_batch
        epoch_correct_train += np.sum(np.argmax(preds, axis = 1) == y_train_batch)
        epoch_samples_train += n_batch

    # Evaluation on the held-out split, in non-training mode.
    preds_test = model.forward(X_test, training = False)
    val_loss = criterion.forward(preds_test, y_test)
    # The same L2 penalty is added so train and validation losses are comparable.
    l2_loss = compute_l2_loss(model)
    val_loss += l2_loss
    val_accuracy = accuracy_score(y_test, preds_test)
    
    # Sample-weighted averages over the epoch.
    epoch_loss_train.append(epoch_loss_train_sum/epoch_samples_train)
    epoch_loss_test.append(val_loss)
    epoch_acc_train.append(epoch_correct_train/epoch_samples_train)
    epoch_acc_test.append(val_accuracy)
    print(f"train_loss: {epoch_loss_train[epoch]: .5f}, "
          f"val_loss: {epoch_loss_test[epoch]: .5f}, "
          f"train_accuracy: {epoch_acc_train[epoch]: .5f}, "
          f"val_accuracy: {epoch_acc_test[epoch]: .5f}")
    
    # Stop once the validation loss has not improved for `patience` epochs,
    # rolling the model back to the best parameters seen.
    early_stopper.step(val_loss, model)
    if early_stopper.stop :
        print(f"Early stopping executed at epoch: {epoch + 1}, "
              f"best epoch: {early_stopper.best_epoch}, "
              f"best_loss: {early_stopper.best_loss: .5f}")
        early_stopper.restore_best(model)
        break

total_time = time.time() - start_time    
print(f"Total time required: {total_time :.2f} seconds")

<h1>Training and Validation Metrics</h1>

In [None]:
# Loss curves: one point per completed epoch (x-axis is 1-based).
epochs = range(1, len(epoch_loss_train) + 1)

fig, ax = plt.subplots()
ax.plot(epochs, epoch_loss_train, label = "Train loss")
ax.plot(epochs, epoch_loss_test, label = "val_loss")
ax.set_xlabel("Epoch")
ax.set_ylabel("Loss")
ax.set_title("Training Loss Validation Loss")
ax.grid(True)
ax.legend()
plt.show()

In [None]:
# Accuracy curves over the same epoch axis as the loss plot.
plt.figure()
plt.plot(epochs, epoch_acc_train, label= "Train Accuracy")
plt.plot(epochs, epoch_acc_test, label="Val Accuracy")
plt.xlabel("Epoch")
# This figure shows accuracy, so the y-axis is labelled accordingly.
plt.ylabel("Accuracy")
plt.title("Training vs Validation accuracy")
plt.grid(True)
plt.legend()
plt.show()

In [None]:
# Spot-check one sample: predicted class next to the stored label.
# NOTE(review): X and y are the full pre-split arrays, so this sample may have
# been part of the training portion — not an unbiased check.
i = 34587
model.predict(X[i:i+1]), y[i:i+1]

<h1>Testing</h1>

In [None]:
# MNIST split selected by train=False, with the same `transform` as the training data.
test_dataset = MNIST(root="./data", train = False, download = True, transform = transform)

In [None]:
# Show the dataset object as the cell output.
test_dataset

In [None]:
# Collect every test image (via .numpy()) and label into parallel lists.
# X_list / y_list are reused here, so the training lists are discarded.
X_list = []
y_list = []

for img, label in test_dataset :
    X_list.append(img.numpy())
    y_list.append(label)

In [None]:
# Stack the test samples into arrays.
# NOTE(review): this rebinds X_test / y_test, which previously held the
# validation split from train_test_split. Re-running the training cell after
# this one would evaluate each epoch on the test set instead.
X_test = np.array(X_list)
y_test = np.array(y_list)
print(X_test.shape, y_test.shape)

In [None]:
# Evaluate on the test arrays in non-training mode.
# Unlike val_loss in the training cell, no L2 penalty is added to test_loss,
# so the two numbers are not directly comparable.
preds_test = model.forward(X_test, training = False)
test_loss = criterion.forward(preds_test, y_test)
test_accuracy = accuracy_score(y_test, preds_test)

In [None]:
# Report the final test metrics.
print(f"Test Loss: {test_loss: .5f}, Test accuracy: {test_accuracy}")