In [1]:
import numpy as np

In [61]:
def unbroadcast(grad, original_shape):
    """Fold a broadcast gradient back down to the shape of its operand.

    NumPy silently broadcasts operands (for example a bias row against a whole
    batch of outputs), so the gradient that comes back can be larger than the
    operand it belongs to.  The operand's gradient is the sum of the incoming
    gradient over every axis that broadcasting added or stretched.

    Args:
        grad: gradient array in the broadcast (output) shape.
        original_shape: shape tuple of the operand before broadcasting.

    Returns:
        ``grad`` summed over the broadcast axes; ``grad`` itself when nothing
        needs to be reduced.
    """
    # Broadcasting prepends axes, so peel leading axes off until ranks agree.
    surplus_axes = max(grad.ndim - len(original_shape), 0)
    for _ in range(surplus_axes):
        grad = grad.sum(axis=0)

    # Ranks now agree; any axis of length 1 in the operand that is longer in
    # the gradient was stretched and has to be summed (keeping the axis).
    stretched_axes = [
        axis
        for axis, (grad_len, orig_len) in enumerate(zip(grad.shape, original_shape))
        if orig_len == 1 and grad_len > 1
    ]
    for axis in stretched_axes:
        grad = grad.sum(axis=axis, keepdims=True)
    return grad

# ------------------------------------------------
class Config:
    """Global autograd settings, read and written as class attributes."""

    # Master switch: when False, newly applied operations do not get a
    # backward closure attached.
    enable_grad: bool = True

class no_grad:
    """Context manager that switches gradient tracking off inside a ``with`` block.

    The value of ``Config.enable_grad`` seen on entry is remembered on the
    instance and written back on exit.
    """

    def __enter__(self):
        # Runs at "with no_grad():" -- stash the current setting, then disable.
        self.prev, Config.enable_grad = Config.enable_grad, False

    def __exit__(self, *args):
        # Runs when the with-block is left; the exception details in ``args``
        # are ignored and nothing is suppressed.
        Config.enable_grad = self.prev
# ------------------------------------------------
class Context:
    """Per-operation scratch space linking a forward pass to its backward pass.

    ``Function.forward`` stores whatever it will need later through
    ``save_for_backward``; ``Function.backward`` reads it from ``saved_tensors``.
    """

    def __init__(self):
        # Empty until forward saves something.
        self.saved_tensors = tuple()

    def save_for_backward(self, *args):
        """Remember ``args`` (as a tuple), replacing anything saved before."""
        self.saved_tensors = args

class Function:
    """Base class for differentiable operations.

    Subclasses implement ``forward`` (raw arrays in, raw array out) and
    ``backward`` (gradient of the output in, gradient(s) of the inputs out).
    Callers use ``apply``, which runs ``forward`` and wires up the closure
    that will later run ``backward``.
    """

    @staticmethod
    def forward(context, *args):
        """Compute the result from raw inputs; may save values on ``context``."""
        raise NotImplementedError

    @staticmethod
    def backward(context, *grad_outputs):
        """Return the gradient(s) of the inputs given the gradient of the output."""
        raise NotImplementedError

    @classmethod
    def apply(self_class, *args):
        """Run the operation on ``args`` and return the result as a Tensor.

        Tensor arguments are unwrapped to their ``data`` before ``forward`` is
        called; every other argument is passed through untouched.  When
        gradient tracking is active and at least one Tensor argument requires
        gradients, the result remembers its parents and gets a ``_backward``
        closure that adds each parent's share of the gradient to
        ``parent.gradient``.

        Gradients returned by ``backward`` are matched positionally with the
        Tensor arguments.  A gradient of ``None`` means "no gradient for this
        input" and is skipped.
        """
        context = Context()  # private storage shared by this call's forward and backward
        raw_args = [t.data if isinstance(t, Tensor) else t for t in args]
        output_data = self_class.forward(context, *raw_args)

        parents = [t for t in args if isinstance(t, Tensor)]
        # Track only when tracking is globally enabled AND some input wants a
        # gradient.  Results produced under no_grad therefore report
        # req_grad=False and do not keep their inputs alive through _parents.
        track = Config.enable_grad and any(p.req_grad for p in parents)
        out = Tensor(output_data, _parents=tuple(parents) if track else (), req_grad=track)

        if track:
            def _backward():
                # out.gradient was filled in by the nodes that consumed `out`.
                grads = self_class.backward(context, out.gradient)
                if not isinstance(grads, tuple):
                    grads = (grads,)
                for parent, grad in zip(parents, grads):
                    if grad is None:
                        # The operation defines no gradient for this input
                        # (e.g. the exponent of Pow); adding None would raise.
                        continue
                    if parent.req_grad:
                        parent.gradient += grad  # accumulate: a tensor may feed several ops
            # Only stored here; Tensor.backward() decides when to call it.
            out._backward = _backward
        return out
# ------------------------------------------------
class Add(Function):
    """Element-wise addition ``x + y`` (NumPy broadcasting applies)."""

    @staticmethod
    def forward(context, self_tensor_data, other_tensor_data):
        # Only the operand shapes are needed to route the gradient back.
        lhs_shape = self_tensor_data.shape
        rhs_shape = other_tensor_data.shape
        context.save_for_backward(lhs_shape, rhs_shape)
        return self_tensor_data + other_tensor_data

    @staticmethod
    def backward(context, grad_output):
        # d(x+y)/dx = d(x+y)/dy = 1, so both operands receive grad_output,
        # each reduced to its own pre-broadcast shape.
        lhs_shape, rhs_shape = context.saved_tensors
        return (
            unbroadcast(grad_output, lhs_shape),
            unbroadcast(grad_output, rhs_shape),
        )


class MatMul(Function):
    """Matrix product ``a @ b``."""

    @staticmethod
    def forward(context, self_tensor_data, other_tensor_data):
        # Both operands are kept: each one's gradient is built from the other.
        lhs, rhs = self_tensor_data, other_tensor_data
        context.save_for_backward(lhs, rhs)
        return lhs @ rhs

    @staticmethod
    def backward(context, grad_output):
        lhs, rhs = context.saved_tensors
        # For C = A @ B:  dL/dA = dL/dC @ B^T   and   dL/dB = A^T @ dL/dC.
        # NOTE(review): `.T` reverses all axes, so this presumably assumes 2-D
        # operands -- confirm before using with batched or 1-D inputs.
        return grad_output @ rhs.T, lhs.T @ grad_output
class Substract(Function):
    """Element-wise subtraction ``x - y`` (NumPy broadcasting applies)."""

    @staticmethod
    def forward(context, self_tensor_data, other_tensor_data):
        # Shapes are all backward needs.
        operand_shapes = (self_tensor_data.shape, other_tensor_data.shape)
        context.save_for_backward(*operand_shapes)
        return self_tensor_data - other_tensor_data

    @staticmethod
    def backward(context, grad_output):
        minuend_shape, subtrahend_shape = context.saved_tensors
        # d(x-y)/dx = +1 and d(x-y)/dy = -1: the second operand gets the
        # negated gradient; both are reduced to their pre-broadcast shapes.
        return (
            unbroadcast(grad_output, minuend_shape),
            unbroadcast(-grad_output, subtrahend_shape),
        )
class Multiply(Function):
    """Element-wise product ``a * b`` (NumPy broadcasting applies)."""

    @staticmethod
    def forward(context, self_tensor_data, other_tensor_data):
        # Product rule needs both operands in backward.
        context.save_for_backward(self_tensor_data, other_tensor_data)
        return self_tensor_data * other_tensor_data

    @staticmethod
    def backward(context, grad_output):
        lhs, rhs = context.saved_tensors
        # d(a*b)/da = b and d(a*b)/db = a.
        grad_lhs, grad_rhs = grad_output * rhs, grad_output * lhs
        if lhs.shape == rhs.shape:
            return grad_lhs, grad_rhs
        # Shapes differ, so NumPy broadcast at least one operand in forward;
        # reduce each gradient to the shape of the operand it belongs to.
        return unbroadcast(grad_lhs, lhs.shape), unbroadcast(grad_rhs, rhs.shape)
class Sum(Function):
    """Sum over all elements, or over ``axis`` (same meaning as ``np.sum``)."""

    @staticmethod
    def forward(context, input_data, axis=None, keepdims=False):
        # Backward only needs to know how to blow the gradient back up.
        context.save_for_backward(input_data.shape, axis, keepdims)
        return np.sum(input_data, axis=axis, keepdims=keepdims)

    @staticmethod
    def backward(context, grad_output):
        original_shape, axis, keepdims = context.saved_tensors
        # When specific axes were summed away (and not kept as length-1 axes),
        # put them back so the gradient lines up with the input for broadcasting.
        axes_were_dropped = axis is not None and not keepdims
        if axes_were_dropped:
            grad_output = np.expand_dims(grad_output, axis)
        # Every summed element contributes with weight 1, so the gradient is
        # simply copied to each position of the input.
        return np.ones(original_shape) * grad_output
class Pow(Function):
    """Element-wise power ``a ** b``.

    Only the base receives a gradient; ``backward`` returns ``None`` for the
    exponent.
    """

    @staticmethod
    def forward(context, self_tensor_data, other_tensor_data):
        # Both base and exponent appear in the derivative.
        context.save_for_backward(self_tensor_data, other_tensor_data)
        return self_tensor_data ** other_tensor_data

    @staticmethod
    def backward(context, grad_output):
        base, exponent = context.saved_tensors
        # Power rule: d(a**b)/da = b * a**(b-1).
        grad_base = grad_output * exponent * (base ** (exponent - 1))
        # If the exponent had a larger shape, NumPy broadcast the base in
        # forward; reduce the gradient to the base's own shape so it can be
        # accumulated into it (same treatment as Add/Substract/Multiply).
        grad_base = unbroadcast(grad_base, base.shape)
        return grad_base, None
class ReLU(Function):
    """Rectified linear unit: ``max(0, x)`` element-wise."""

    @staticmethod
    def forward(context, input_data):
        # The sign of the input decides where the gradient may pass.
        context.save_for_backward(input_data)
        return np.maximum(0, input_data)

    @staticmethod
    def backward(context, grad_output):
        (pre_activation,) = context.saved_tensors
        # Derivative is 1 where the input was strictly positive, 0 elsewhere.
        pass_through = (pre_activation > 0).astype(pre_activation.dtype)
        return grad_output * pass_through
class T(Function):
    """Transpose, using NumPy's ``.T``."""

    @staticmethod
    def forward(context, input_data):
        context.save_for_backward(input_data)
        transposed = input_data.T
        return transposed

    @staticmethod
    def backward(context, grad_output):
        # Transposing is its own inverse, so the gradient is just transposed
        # back; the saved input is not needed for that.
        return grad_output.T
# ------------------------------------------------
class Tensor:
    """NumPy array plus the bookkeeping needed for reverse-mode autodiff.

    Attributes:
        data: the value, always held as an ``np.ndarray``.
        gradient: accumulated gradient; starts as float zeros shaped like ``data``.
        req_grad: whether gradients should be accumulated into this tensor.
        _parents: tensors this one was computed from (empty for leaves).
        _backward: callable that pushes ``gradient`` into the parents; a no-op
            unless an operation installed one.
    """

    def __init__(self, data, _parents=(),req_grad=False):
        self.data = data if isinstance(data, np.ndarray) else np.array(data)
        self.gradient = np.zeros(self.data.shape)
        self.req_grad = req_grad
        self._parents = _parents
        self._backward = lambda: None

    def __repr__(self):
        return f"d = {self.data}, g = {self.gradient}"

    @staticmethod
    def _as_tensor(value):
        """Return ``value`` unchanged if it is a Tensor, else wrap it in one."""
        return value if isinstance(value, Tensor) else Tensor(value)

    def backward(self):
        """Backpropagate from this tensor through everything it was computed from.

        Builds a topological order of the graph reachable through ``_parents``
        (iteratively, so deep graphs cannot hit the recursion limit), seeds this
        tensor's gradient with ones, then calls each node's ``_backward`` from
        this tensor back towards the leaves.
        """
        topo = []
        visited = set()

        # Each stack entry is (node, expanded): `expanded` is True once the
        # node's parents have already been pushed.
        stack = [(self, False)]

        while stack:
            node, expanded = stack.pop()

            if node in visited:
                continue

            if expanded:
                # Second visit: everything this node depends on is already in
                # `topo`, so the node itself can be emitted.
                visited.add(node)
                topo.append(node)
            else:
                # First visit: revisit the node after its parents are handled.
                stack.append((node, True))
                for parent in node._parents:
                    if parent not in visited:
                        stack.append((parent, False))

        # d(self)/d(self) = 1; then apply the chain rule output -> inputs.
        self.gradient = np.ones_like(self.data, dtype=float)
        for node in reversed(topo):
            node._backward()

    # --- arithmetic operators; non-Tensor operands are wrapped first ---------
    def __add__(self, other):
        return Add.apply(self, Tensor._as_tensor(other))

    def __radd__(self, other):
        return Add.apply(Tensor._as_tensor(other), self)

    def __matmul__(self, other):
        return MatMul.apply(self, Tensor._as_tensor(other))

    def __rmatmul__(self, other):
        # Reflected matrix product (``other @ self``).  This used to be
        # declared as a first ``__rmul__`` and was silently overwritten by the
        # element-wise ``__rmul__`` below, so reflected ``@`` did not exist.
        return MatMul.apply(Tensor._as_tensor(other), self)

    def __sub__(self, other):
        return Substract.apply(self, Tensor._as_tensor(other))

    def __rsub__(self, other):
        return Substract.apply(Tensor._as_tensor(other), self)

    def __mul__(self, other):
        return Multiply.apply(self, Tensor._as_tensor(other))

    def __rmul__(self, other):
        return Multiply.apply(Tensor._as_tensor(other), self)

    def __pow__(self, other):
        return Pow.apply(self, Tensor._as_tensor(other))

    def relu(self):
        """Element-wise ReLU of this tensor."""
        return ReLU.apply(self)

    def sum(self, axis=None, keepdims=False):
        """Sum over ``axis`` (all elements when ``axis`` is None)."""
        return Sum.apply(self, axis, keepdims)

    @property
    def T(self):
        """Transposed view of this tensor, tracked by autograd."""
        # Inside this method the name T resolves to the module-level T
        # operation class, not to this property.
        return T.apply(self)
    
        

# ------------------------------------------------
class Parameter(Tensor):
    """A Tensor that is always trainable; used for weights and biases."""

    def __init__(self, *args, **kwargs):
        super(Parameter, self).__init__(*args, **kwargs)
        # Whatever req_grad the caller passed is overridden: a parameter
        # always collects gradients.
        self.req_grad = True

class Module:
    """Base class for objects that own trainable parameters.

    Assigning a ``Parameter`` or another ``Module`` to an attribute registers
    it automatically, so ``parameters()`` can collect every trainable tensor
    of a nested model.
    """

    def __init__(self):
        self._parameters = {}  # attribute name -> Parameter set directly on this module
        self._modules = {}     # attribute name -> child Module

    def __setattr__(self, name, value):
        """Set the attribute and keep the parameter/sub-module registries in sync.

        Runs on every ``self.name = value``.  Parameters are recorded in
        ``_parameters``, Modules in ``_modules``.  If ``name`` previously held
        a Parameter or Module and is now given a value of a different kind,
        the old registration is removed so ``parameters()`` never returns a
        tensor the module no longer holds.
        """
        is_param = isinstance(value, Parameter)
        is_module = isinstance(value, Module) and not is_param

        # Look the registries up through __dict__: they do not exist yet while
        # __init__ itself is creating them.
        registered_params = self.__dict__.get("_parameters")
        registered_modules = self.__dict__.get("_modules")
        if not is_param and registered_params is not None:
            registered_params.pop(name, None)
        if not is_module and registered_modules is not None:
            registered_modules.pop(name, None)

        if is_param:
            self._parameters[name] = value
        elif is_module:
            self._modules[name] = value

        # Finally do the ordinary attribute assignment.
        object.__setattr__(self, name, value)

    def parameters(self):
        """Return a list of this module's parameters followed by those of all sub-modules (recursively)."""
        params = list(self._parameters.values())
        for module in self._modules.values():
            params.extend(module.parameters())
        return params
# ------------------------------------------------
class Optimizer():
    """Base class for optimizers: stores the parameters and shared hyperparameters.

    Args:
        parameters: iterable of objects exposing ``data`` and ``gradient``
            arrays; it is materialised into a list.
        lr: learning rate.
        weight_decay: L2 coefficient (how it is used is up to the subclass).
        clip_norm: gradient-norm threshold, or None (how it is used is up to
            the subclass).
    """

    def __init__(self, parameters, lr=0.01, weight_decay=0.0, clip_norm=1.0):
        self.parameters = list(parameters)
        self.lr = lr
        self.weight_decay = weight_decay
        self.clip_norm = clip_norm

    def step(self):
        """Apply one update to the parameters; implemented by subclasses."""
        raise NotImplementedError

    def zero_grad(self):
        """Reset every parameter's gradient to zeros."""
        for p in self.parameters:
            # Allocate float zeros exactly like Tensor.__init__ does.
            # zeros_like(p.data) would copy the data's dtype, so a parameter
            # holding integer data would get an integer gradient that float
            # gradients can no longer be accumulated into.
            p.gradient = np.zeros(p.data.shape)
            
class SGD(Optimizer):
    """Plain stochastic gradient descent with optional global-norm clipping and L2 weight decay."""

    def __init__(self, parameters, lr=0.01, weight_decay=0.0, clip_norm=1.0):
        super().__init__(parameters, lr=lr, weight_decay=weight_decay, clip_norm=clip_norm)

    def step(self):
        """Clip the gradients (if enabled), then update every trainable parameter in place."""
        trainable = [p for p in self.parameters if p.req_grad]

        if self.clip_norm is not None:
            # Global L2 norm over all trainable gradients taken together.
            squared_sum = 0
            for p in trainable:
                squared_sum += np.sum(p.gradient ** 2)
            global_norm = np.sqrt(squared_sum)

            # The small constant avoids division by zero for all-zero gradients.
            scale = self.clip_norm / (global_norm + 1e-6)
            if scale < 1:
                # Norm exceeds the threshold: shrink every gradient in place.
                for p in trainable:
                    p.gradient *= scale

        for p in trainable:
            update = p.gradient
            if self.weight_decay > 0:
                # L2 regularisation: pull the weights towards zero.
                update = update + self.weight_decay * p.data
            p.data -= self.lr * update
# ------------------------------------------------
class Criterion():
    """Base class for loss functions; subclasses override ``__call__``."""

    def __init__(self):
        """Nothing to set up in the base class."""

    def __call__(self, predictions, targets):
        """Compute the loss of ``predictions`` against ``targets`` (subclass responsibility)."""
        raise NotImplementedError
    
class MSELoss(Criterion):
    """Squared-error loss: sum of squared differences divided by the first-dimension size.

    Note that the total is divided by ``predictions.data.shape[0]`` only, not
    by the total number of elements.
    """

    def __init__(self):
        super().__init__()

    def __call__(self, predictions, targets):
        residual = predictions - targets
        total_squared_error = (residual ** 2).sum()
        # Multiply by the reciprocal of the number of rows.
        per_row_weight = 1.0 / predictions.data.shape[0]
        return total_squared_error * per_row_weight
# ------------------------------------------------
class Neuron(Module):
    """Single unit computing ``w.T @ x + b`` with an optional ReLU (not used by MLP)."""

    def __init__(self, input_size, nonlinearity=True):
        super().__init__()
        rng = np.random.default_rng()
        # Standard-normal weights scaled by 1/sqrt(fan_in); bias starts at zero.
        init_scale = 1.0 / np.sqrt(input_size)
        self.w = Parameter(rng.standard_normal((input_size, 1)) * init_scale)
        self.b = Parameter(np.zeros((1, 1)))
        self.nonlinearity = nonlinearity

    def forward(self, x):
        pre_activation = self.w.T @ x + self.b
        if self.nonlinearity:
            return pre_activation.relu()
        return pre_activation

    def __call__(self, x):
        # Lets the neuron be called like a function.
        return self.forward(x)
# ------------------------------------------------
class Linear(Module):
    """Fully connected layer computing ``x @ w.T + b.T``.

    Args:
        input_size: number of input features.
        output_size: number of output features.
        init_method: ``"glorot"``, ``"he"`` or ``"zero"``; any other value
            falls back to standard-normal weights scaled by 1/sqrt(input_size).
    """

    def __init__(self, input_size, output_size, init_method="glorot"):
        super().__init__()
        rng = np.random.default_rng()
        # Weights are stored as (output_size, input_size) because forward
        # multiplies by the transpose.
        weight_shape = (output_size, input_size)

        if init_method == "zero":
            weights = np.zeros(weight_shape)
        else:
            if init_method == "glorot":
                scale = np.sqrt(2.0 / (input_size + output_size))
            elif init_method == "he":
                scale = np.sqrt(2.0 / input_size)
            else:
                scale = 1.0 / np.sqrt(input_size)
            weights = rng.standard_normal(weight_shape) * scale

        self.w = Parameter(weights)
        self.b = Parameter(np.zeros((output_size, 1)))

    def forward(self, x):
        # The bias column is transposed to a row so it broadcasts over the
        # rows of x @ w.T.
        projected = x @ self.w.T
        return projected + self.b.T

    def __call__(self, x):
        # PyTorch-style: calling the layer runs forward.
        return self.forward(x)
    
class MLP(Module):
    """Stack of ``Linear`` layers with an activation between them.

    Args:
        n_in: number of input features (e.g. 3).
        n_outs: output size of each layer in order (e.g. ``[4, 4, 1]``);
            a list or a tuple.
        init_method: weight initialisation passed to every ``Linear`` layer.
    """

    def __init__(self, n_in, n_outs, init_method="glorot"):
        super().__init__()
        # list(...) so a tuple of sizes works too; e.g. 3, [4, 4, 1] -> [3, 4, 4, 1]
        sz = [n_in] + list(n_outs)
        self.layers_list = []
        for i in range(len(n_outs)):
            layer = Linear(sz[i], sz[i+1], init_method=init_method)
            # Setting it as an attribute registers the layer in _modules (via
            # Module.__setattr__), which is what makes parameters() find it.
            setattr(self, f"layer_{i}", layer)
            # Ordered list used by forward.
            self.layers_list.append(layer)

    def forward(self, x, activation="relu"):
        """Run ``x`` through every layer, applying ``activation`` after all but the last.

        Only ``"relu"`` is implemented.  ``None`` means no activation.  Any
        other value still runs without an activation (as before), but now
        emits a RuntimeWarning instead of silently producing a purely linear
        network.
        """
        if activation not in ("relu", None):
            import warnings
            warnings.warn(
                f"activation {activation!r} is not implemented; "
                "hidden layers will be applied without a nonlinearity",
                RuntimeWarning,
                stacklevel=2,
            )

        last_index = len(self.layers_list) - 1
        for i, layer in enumerate(self.layers_list):
            x = layer(x)  # Linear.forward
            if i != last_index and activation == "relu":
                x = x.relu()
        return x

    def __call__(self, x, activation="relu"):
        return self.forward(x, activation=activation)
# ------------------------------------------------

# FFNN wrapper class, as required by the assignment instructions, with the following configurable hyperparameters: num_epochs, num_hidden_layers, num_hidden_units, learning_rate, optimizer, batch_size, l2_coeff, weights_init, activation, criterion (loss), etc.

class FFNN:
    """Feed-forward network wrapper bundling model, optimizer, loss and training loop.

    Args:
        num_epochs: number of passes over the training data.
        num_hidden_layers: number of hidden layers; only used when
            ``num_hidden_units`` is a single number.
        num_hidden_units: list/tuple with the size of each hidden layer, or a
            single number used for every hidden layer.
        learning_rate: optimizer step size.
        optimizer: key into the supported optimizers (currently ``"sgd"``).
        batch_size: mini-batch size.
        l2_coeff: passed to the optimizer as ``weight_decay``.
        weights_init: passed to the model as ``init_method``.
        activation: activation used between layers.
        criterion: key into the supported losses (currently ``"mse"``).
        clip_norm: passed to the optimizer.

    Raises:
        KeyError: if ``optimizer`` or ``criterion`` is not a supported key.
    """

    def __init__(self,num_epochs=15, num_hidden_layers=3, num_hidden_units=[64, 64, 32], learning_rate=0.01, optimizer="sgd", batch_size=16, l2_coeff=0.01, weights_init="he", activation="relu", criterion="mse", clip_norm=1.0):
        self.num_epochs = num_epochs
        self.num_hidden_layers = num_hidden_layers
        if isinstance(num_hidden_units, (list, tuple)):
            # Copy: never alias the shared default list or the caller's list.
            self.num_hidden_units = list(num_hidden_units)
        else:
            self.num_hidden_units = [num_hidden_units] * num_hidden_layers
        self.learning_rate = learning_rate
        self.batch_size = batch_size
        self.l2_coeff = l2_coeff
        self.weights_init = weights_init
        self.activation = activation
        self.clip_norm = clip_norm
        criterion_classes = {"mse": MSELoss,
                             # "cross_entropy": CrossEntropyLoss, # if you implement more losses, add them here
                            }
        optimizer_classes = {"sgd": SGD,
                             # "adam": Adam, # if you implement more optimizers, add them here
                             # "adamW": AdamW
                            }
        self.optimizer_class = optimizer_classes[optimizer]
        self.criterion = criterion_classes[criterion]()
        self.model = None  # built by create_model()

    def create_model(self, input_size, output_size):
        """Build the MLP: the hidden layers followed by an ``output_size`` layer."""
        n_outs = self.num_hidden_units + [output_size]
        self.model = MLP(input_size, n_outs, init_method=self.weights_init)

    def train(self, x_train, y_train):
        """Train with mini-batches for ``num_epochs`` epochs, printing the mean batch loss per epoch.

        ``create_model`` must have been called first.  Batches are taken in
        order (no shuffling); the last batch may be smaller than ``batch_size``.
        """
        num_samples = x_train.shape[0]

        optimizer = self.optimizer_class(self.model.parameters(), lr=self.learning_rate, weight_decay=self.l2_coeff, clip_norm=self.clip_norm)

        for epoch in range(self.num_epochs):
            total_loss = 0
            num_batches = 0
            for start in range(0, num_samples, self.batch_size):
                end = start + self.batch_size
                x_batch = Tensor(x_train[start:end])
                y_batch = Tensor(y_train[start:end])

                preds = self.model(x_batch, activation=self.activation)
                loss = self.criterion(preds, y_batch)
                loss.backward()
                optimizer.step()
                optimizer.zero_grad()

                total_loss += loss.data.sum()
                num_batches += 1
            # Average over the batches actually processed.  The previous
            # divisor, num_samples // batch_size, ignored a partial final
            # batch and was 0 whenever num_samples < batch_size.
            print(f"Epoch {epoch+1}, Loss: {total_loss / max(num_batches, 1)}")

    def evaluate(self, x_test, y_test):
        """Print loss and arg-max accuracy on the given data and return the loss value.

        Runs under ``no_grad``.  Accuracy compares ``argmax`` along axis 1 of
        predictions and targets, so targets are expected to be 2-D
        (e.g. one-hot).
        """
        print("Evaluating on test data...")

        with no_grad():
            inputs = Tensor(x_test)
            targets = Tensor(y_test)
            # Use the same activation as in train(); relying on the model's
            # default would evaluate a different network whenever
            # self.activation is not that default.
            preds = self.model(inputs, activation=self.activation)
            loss = self.criterion(preds, targets)
            print(f"Test Loss: {loss.data}")
            accuracy = np.mean(np.argmax(preds.data, axis=1) == np.argmax(targets.data, axis=1))
            print(f"Test Accuracy: {accuracy * 100:.2f}%")
            return loss.data

In [None]:
# import kagglehub
# # Download latest version
# path = kagglehub.dataset_download("zalando-research/fashionmnist")

Downloading from https://www.kaggle.com/api/v1/datasets/download/zalando-research/fashionmnist?dataset_version_number=4...


100%|██████████| 68.8M/68.8M [00:06<00:00, 11.6MB/s]

Extracting files...





Path to dataset files: C:\Users\onder\.cache\kagglehub\datasets\zalando-research\fashionmnist\versions\4


In [18]:
# Fashion-MNIST training split, read as a dense float array.
train = np.loadtxt(
    "datasets/fashion-mnist_train.csv",
    skiprows=1,  # skip the first line (presumably the column header)
    delimiter=",",
)

In [20]:
# Fashion-MNIST test split, read the same way as the training split.
test = np.loadtxt(
    "datasets/fashion-mnist_test.csv",
    skiprows=1,  # skip the first line (presumably the column header)
    delimiter=",",
)

In [21]:
# Column 0 holds the label; the remaining columns are the features.
y_train, x_train = train[:, 0], train[:, 1:]
y_test, x_test = test[:, 0], test[:, 1:]
num_samples = x_train.shape[0]
print(x_train.shape, y_train.shape)

(60000, 784) (60000,)


In [62]:
# Divide the pixel values by 255 and one-hot encode the labels (10 classes).
x_train_normalized = x_train / 255.0
x_test_normalized = x_test / 255.0
y_train_onehot = np.eye(10)[y_train.astype(int)]
y_test_onehot = np.eye(10)[y_test.astype(int)]

# Train and evaluate through the FFNN wrapper class.
ffnn = FFNN(
    num_epochs=30,
    num_hidden_layers=2,
    num_hidden_units=[256, 128],
    learning_rate=0.1,
    optimizer="sgd",
    batch_size=64,
    l2_coeff=0.00001,
    weights_init="he",
    activation="relu",
    criterion="mse",
    clip_norm=1.0,
)
ffnn.create_model(input_size=784, output_size=10)
ffnn.train(x_train_normalized, y_train_onehot)
ffnn.evaluate(x_test_normalized, y_test_onehot)

Epoch 1, Loss: 0.3778916413918811
Epoch 2, Loss: 0.2535041091381651
Epoch 3, Loss: 0.22527715843316912
Epoch 4, Loss: 0.20835588933130214
Epoch 5, Loss: 0.1961701463113599
Epoch 6, Loss: 0.18653033508613923
Epoch 7, Loss: 0.1786826589480956
Epoch 8, Loss: 0.17199178076467486
Epoch 9, Loss: 0.16627177285889277
Epoch 10, Loss: 0.16121488694576402
Epoch 11, Loss: 0.1563315042780901
Epoch 12, Loss: 0.15193136836438445
Epoch 13, Loss: 0.14799294422805515
Epoch 14, Loss: 0.14432830646405742
Epoch 15, Loss: 0.141041985588635
Epoch 16, Loss: 0.13778136671241326
Epoch 17, Loss: 0.13492728429335335
Epoch 18, Loss: 0.13193884986794016
Epoch 19, Loss: 0.12928411289616276
Epoch 20, Loss: 0.12673803685025006
Epoch 21, Loss: 0.12438418855342268
Epoch 22, Loss: 0.12195137758224715
Epoch 23, Loss: 0.11950938772288562
Epoch 24, Loss: 0.11724404659950333
Epoch 25, Loss: 0.11480150027095191
Epoch 26, Loss: 0.1128677761791242
Epoch 27, Loss: 0.1109359122346305
Epoch 28, Loss: 0.10930572326186613
Epoch 29, 

array(0.18159099)