In [187]:
import numpy as np
from utils import MSE

# Seed NumPy's global RNG so the random weight initialisation and the randomly
# sampled data further down are reproducible from a fresh kernel.
np.random.seed(42)

In [188]:
class Dense:
    """
    Fully connected layer computing ``X @ w.T + b``.

    Parameters:
    - units (int): Number of neurons (output features) of the layer.
    - activation (str): Name of the activation. It is stored on the instance
      but `forward` does not apply it, so the layer is always affine.

    Attributes:
    - w: Weight matrix of shape (units, input_dim); None until initialised.
    - b: Bias vector of shape (units,); None until initialised.
    - initialized (bool): Whether `initialize_params` has been called.
    """
    def __init__(self, units: int, activation="linear"):
        self.units: int = units
        # Parameters are allocated lazily, once the input width is known.
        self.w, self.b = None, None
        self.initialized = False
        self.activation = activation

    def initialize_params(self, input_dim):
        """
        Draws weights and biases uniformly from [0, 1) using NumPy's global RNG.

        Parameters:
        - input_dim (int): Number of features of the data fed to this layer.
        """
        weight_shape = (self.units, input_dim)
        # Weights are drawn before biases; the order matters for reproducibility.
        self.w = np.random.rand(*weight_shape)
        self.b = np.random.rand(self.units)
        self.initialized = True

    def forward(self, X):
        """
        Runs the affine transformation on a batch of inputs.

        Parameters:
        - X (array-like): Input data whose last axis has `input_dim` entries.

        Returns:
        - (ndarray): ``X . w^T + b``.

        Raises:
        - ValueError: If the parameters have not been initialised yet.
        """
        if self.initialized:
            inputs = np.asarray(X)
            return inputs.dot(self.w.T) + self.b

        raise ValueError("Initiate params first")

    

class Input:
    """
    Placeholder for the model's input layer.

    It has no weights or biases; it only records how many features each
    sample has.

    Parameters:
    - units (int): Number of features in the input data.
    """
    def __init__(self, units):
        """Stores the input feature count on the instance."""
        self.units = units

In [189]:
# Test the dense layer: push one random batch through a single Dense layer and
# print the parameter / input / output shapes.
n_samples = 100
input_dim = 2

# Uniform random inputs in [0, 1) with shape (n_samples, input_dim).
X = np.random.random((n_samples, input_dim))

dense_1 = Dense(4)  # Output dim is 4
dense_1.initialize_params(input_dim=input_dim)
y = dense_1.forward(X)

print(f"Input dim: {input_dim} | Output dim: {dense_1.units}")
print(f"W shape: {dense_1.w.shape}")
print(f"b shape: {dense_1.b.shape}")

print(f"Input shape: {X.shape}")
print(f"Output shape: {y.shape}")

# Drop the temporaries so they do not leak into later cells.
# Note that `n_samples` and `y` are NOT deleted and stay defined.
del input_dim, dense_1, X

Input dim: 2 | Output dim: 4
W shape: (4, 2)
b shape: (4,)
Input shape: (100, 2)
Output shape: (100, 4)


In [190]:
class Adam:
    """
    Adam optimizer bound to a single parameter.

    The parameter may be a scalar or a NumPy array; every operation is
    element-wise. When `param` is an ndarray the update is applied with `-=`,
    i.e. in place on the array that was passed in.

    Parameters:
    - param: Initial value of the parameter to optimise.
    - learning_rate (float): Base step size.
    - beta_1 (float): Decay rate of the first-moment (mean) estimate.
    - beta_2 (float): Decay rate of the second-moment (uncentred variance) estimate.
    - epsilon (float): Small constant added to the denominator for numerical
      stability.
    """
    def __init__(self, param, learning_rate=0.01, beta_1=0.9, beta_2=0.99, epsilon=1e-8):

        self.param = param

        self.lr = learning_rate
        self.beta_1 = beta_1
        self.beta_2 = beta_2
        self.epsilon = epsilon

        # Running first / second moment estimates.
        self.m = 0
        self.v = 0

        # Moment estimates from before the most recent `step` call.
        self.prev_m = 0
        self.prev_v = 0

    def step(self, gradients, epoch):
        """
        Applies one Adam update to the bound parameter.

        Parameters:
        - gradients (scalar or array-like): Gradient of the loss with respect to
          the parameter; must broadcast against the parameter.
        - epoch (int): Zero-based step index used for bias correction
          (the correction uses ``epoch + 1`` as the time step).
        """
        # Accept plain lists / tuples as well as scalars and ndarrays.
        gradients = np.asarray(gradients)

        # Store previous momentum and velocity
        self.prev_m = self.m
        self.prev_v = self.v

        # Exponential moving averages of the gradient and the squared gradient
        self.m = self.beta_1 * self.prev_m + (1 - self.beta_1) * gradients
        self.v = self.beta_2 * self.prev_v + (1 - self.beta_2) * (gradients**2)

        # Bias correction: both averages start at 0 and are biased towards it
        time_step = epoch + 1
        m_hat = self.m / (1 - self.beta_1**time_step)
        v_hat = self.v / (1 - self.beta_2**time_step)

        # Update parameter with corrected values
        learning_rate = self.lr / (np.sqrt(v_hat) + self.epsilon)
        self.param -= learning_rate * m_hat

    def get_param(self):
        """Returns the current value of the optimised parameter."""
        return self.param

In [191]:
class OptimizerWrapper:
    """
    Attaches one optimizer instance to every individual weight and bias of a
    list of layers and applies gradient updates through them.

    Parameters:
    - layers (list): Layer objects. Each must expose `units`; trainable layers
      additionally expose initialised array attributes `w` and `b`.
    - optimizer (callable): Factory called as ``optimizer(param_value)`` once
      per parameter element. The returned object must provide
      ``step(gradient, epoch)`` and ``get_param()``.

    Attributes:
    - layer_map (dict): Maps layer index -> dict with keys "units", "w", "b",
      "w_opt", "b_opt". "w"/"b" reference the layer's own arrays (they are
      updated in place); "w_opt"/"b_opt" are object arrays of optimizers with
      the same shapes. All four are None for layers without parameters.
    """
    def __init__(self, layers, optimizer):
        self.optimizer = optimizer
        self.layer_map = self._initialize_layer_map(layers)

    def _initialize_layer_map(self, layers):
        """
        Builds the per-layer bookkeeping dictionary.

        Parameters:
        - layers (list): Layers to wrap.

        Returns:
        - (dict): See the `layer_map` attribute description on the class.
        """
        # An explicit `otypes` stops np.vectorize from making an extra probing
        # call on the first element (which would construct a throwaway
        # optimizer) and lets it handle empty parameter arrays.
        make_optimizers = np.vectorize(self.optimizer, otypes=[object])

        layer_map = {}
        for idx, layer in enumerate(layers):
            w = getattr(layer, "w", None)
            b = getattr(layer, "b", None)

            # Skip the Input layer and any layer whose weights or biases are
            # missing or not initialised yet.
            if w is None or b is None:
                layer_map[idx] = {
                    "units": layer.units,
                    "w": None,
                    "b": None,
                    "w_opt": None,
                    "b_opt": None,
                }
                continue

            # One optimizer per scalar parameter, laid out like the parameter array.
            layer_map[idx] = {
                "units": layer.units,
                "w": w,
                "b": b,
                "w_opt": make_optimizers(w),
                "b_opt": make_optimizers(b),
            }
        return layer_map

    def step(self, layer_num, w_grads, b_grads, epoch):
        """
        Applies one optimizer step to every weight and bias of a layer and
        writes the updated values back into the layer's arrays in place.

        Parameters:
        - layer_num (int): Index of the layer in the original `layers` list.
        - w_grads (array-like): Gradients with respect to the weights.
        - b_grads (array-like): Gradients with respect to the biases.
        - epoch (int): Forwarded unchanged to each optimizer's `step`.

        Raises:
        - ValueError: If the layer has no trainable parameters.
        """
        entry = self.layer_map[layer_num]
        if entry["w_opt"] is None or entry["b_opt"] is None:
            raise ValueError(
                f"Layer {layer_num} has no trainable parameters to update."
            )

        for name, grads in (("w", w_grads), ("b", b_grads)):
            grads = np.asarray(grads)
            params = entry[name]
            optimizers = entry[f"{name}_opt"]

            # Element-wise update; np.ndindex handles any dimensionality.
            for index in np.ndindex(*grads.shape):
                element_opt = optimizers[index]
                element_opt.step(grads[index], epoch)
                params[index] = element_opt.get_param()

    def get_w(self, layer_num: int):
        """Returns the weight array tracked for `layer_num` (None if it has none)."""
        return self.layer_map[layer_num]["w"]

    def get_b(self, layer_num: int):
        """Returns the bias array tracked for `layer_num` (None if it has none)."""
        return self.layer_map[layer_num]["b"]

In [192]:
class Sequential:
    """
    A linear stack of layers trained with full-batch gradient descent.

    Parameters:
    - layers (list): Layer objects; the first must be an `Input` instance and
      every following layer must provide `units`, `w`, `b`,
      `initialize_params(input_dim)` and `forward(X)`.
    - verbose (bool): If True, `compile` prints a model summary.

    Notes:
    - The backward pass in `fit` assumes every layer is affine (no activation)
      and that the loss is mean squared error.
    """
    def __init__(self, layers: list, verbose=False):
        self.layers = layers

        self.verbose = verbose
        self.cost_function = None
        self.optimizer = None
        self.lr = 0.01

        self._validate_input()

        self.is_compiled = False

    def __str__(self):
        """Returns a one-line-per-layer summary of the model."""
        rows = [
            f"[{idx}] {layer.__class__.__name__} ({layer.units})\n"
            for idx, layer in enumerate(self.layers)
        ]
        return "Sequential Model:\n\n" + "".join(rows)

    def _validate_input(self):
        """
        Checks that the layer list is non-empty and starts with an `Input`.

        Raises:
        - ValueError: If no layers were given.
        - TypeError: If the first layer is not an `Input` instance.
        """
        if not self.layers:
            raise ValueError("A Sequential model needs at least one layer.")

        if not isinstance(self.layers[0], Input):
            raise TypeError(
                f"The first layer must be an object of class `Input`, got object of class `{self.layers[0].__class__.__name__}`"
            )

    def initialize_layers(self):
        """
        Initialises the parameters of every layer after the Input layer, using
        the previous layer's `units` as the input dimension, and prints the
        resulting parameter shapes.
        """
        for previous, layer in zip(self.layers, self.layers[1:]):
            layer.initialize_params(previous.units)
            print(
                f"{layer.__class__.__name__} ({layer.units}) | w: {layer.w.shape} | b: {layer.b.shape}"
            )

    def compile(self, cost_function, optimizer):
        """
        Prepares the model for training.

        Parameters:
        - cost_function: Loss object. It is stored and shown in the verbose
          summary; NOTE `fit` currently computes MSE directly and does not
          call it.
        - optimizer (callable): Optimizer factory handed to `OptimizerWrapper`.
        """
        self.cost_function = cost_function
        if self.verbose:
            # `optimizer` is typically a class, so prefer its own name and fall
            # back to the type name for instances.
            optimizer_name = getattr(
                optimizer, "__name__", optimizer.__class__.__name__
            )
            print(self)
            print(f"Loss: {self.cost_function.__class__.__name__}")
            print(f"Optimizer: {optimizer_name}\n")

        self.initialize_layers()
        self.optimizer = OptimizerWrapper(self.layers, optimizer)

        self.is_compiled = True

    def fit(self, X, y, epochs=1):
        """
        Trains the model on the whole data set for `epochs` iterations and
        prints the epoch index and the MSE loss (measured before that epoch's
        update) each time.

        Parameters:
        - X (array-like): Inputs, one sample per row.
        - y (array-like): Targets with the same shape as the model output.
        - epochs (int): Number of full-batch update steps.

        Raises:
        - ValueError: If `compile()` has not been called.
        """
        if not self.is_compiled:
            raise ValueError("Call `compile()` method first.")

        X = np.asarray(X)
        y = np.asarray(y)
        n_samples = len(X)
        last_layer = len(self.layers) - 1

        for epoch in range(epochs):
            print(f"\nEpoch: {epoch}")

            # Forward pass (full batch). layer_outputs[i] is the output of
            # layer i, with X standing in for the Input layer at index 0.
            layer_outputs = [X]
            for layer in self.layers[1:]:  # exclude Input layer
                layer_outputs.append(layer.forward(layer_outputs[-1]))
            y_pred = layer_outputs[-1]

            errors = y_pred - y
            # NOTE: the loss averages over every element, while the gradients
            # below are scaled by 2 / n_samples only.
            loss = np.mean(errors ** 2)

            # Backward pass. `delta` is dL/d(output of layer i). For an affine
            # layer a_i = a_{i-1} @ w_i.T + b_i:
            #   dL/dw_i     = delta.T @ a_{i-1}
            #   dL/db_i     = column sums of delta
            #   dL/da_{i-1} = delta @ w_i   (the delta of the layer below)
            # All gradients are computed before any weight is updated.
            delta = (2 / n_samples) * errors
            gradient_map = {}
            for i in range(last_layer, 0, -1):
                gradient_map[i] = {
                    "w": delta.T @ layer_outputs[i - 1],
                    "b": np.sum(delta, axis=0),
                }
                if i > 1:
                    delta = delta @ self.layers[i].w

            for layer_ID in range(1, len(self.layers)):
                self.optimizer.step(
                    layer_num=layer_ID,
                    w_grads=gradient_map[layer_ID]["w"],
                    b_grads=gradient_map[layer_ID]["b"],
                    epoch=epoch,
                )
                self.layers[layer_ID].w = self.optimizer.get_w(layer_num=layer_ID)
                self.layers[layer_ID].b = self.optimizer.get_b(layer_num=layer_ID)

            print(loss)

    def predict(self, X):
        """
        Runs a forward pass through every layer after the Input layer.

        Parameters:
        - X (array-like): Inputs, one sample per row.

        Returns:
        - The output of the last layer (X itself if there are no further layers).
        """
        y_pred = X
        for layer in self.layers[1:]:  # exclude Input layer
            y_pred = layer.forward(y_pred)
        return y_pred


# Tiny synthetic regression problem: each target column is a linear function
# of the matching input column (y1 = 0.3 * x1 + 4, y2 = 1.9 * x2 - 8).
n_samples = 5
X_data = np.random.normal(0, 1, size=(n_samples, 2))
y_data = np.array([[x1 * 0.3 + 4, x2 * 1.9 - 8] for x1, x2 in X_data])

# Architecture: 2 input features -> Dense(5) -> Dense(2).
model = Sequential(
    [Input(2),  Dense(5), Dense(2)]
)

# The Adam class itself (not an instance) is passed: OptimizerWrapper calls it
# to create the optimizers for the layer parameters.
model.compile(cost_function=MSE(), optimizer=Adam)
# Full-batch training; prints the epoch index and loss on every iteration.
model.fit(X_data, y_data, epochs=1000)

Dense (5) | w: (5, 2) | b: (5,)
Dense (2) | w: (2, 5) | b: (2,)

Epoch: 0
56.120751375059584

Epoch: 1
55.03458048804022

Epoch: 2
53.98579342744979

Epoch: 3
52.97151035603919

Epoch: 4
51.9898399332568

Epoch: 5
51.03937641693372

Epoch: 6
50.11912808718441

Epoch: 7
49.228298561881886

Epoch: 8
48.36608705617601

Epoch: 9
47.531621417714504

Epoch: 10
46.72395811816834

Epoch: 11
45.94210933573227

Epoch: 12
45.18507584157011

Epoch: 13
44.45187269716223

Epoch: 14
43.74154355880205

Epoch: 15
43.053165225151

Epoch: 16
42.38584549990268

Epoch: 17
41.73871707016838

Epoch: 18
41.11092969486735

Epoch: 19
40.50164261830944

Epoch: 20
39.91001856845256

Epoch: 21
39.335220058501235

Epoch: 22
38.776408162176274

Epoch: 23
38.23274355692947

Epoch: 24
37.70338941190816

Epoch: 25
37.187515595453526

Epoch: 26
36.68430365628033

Epoch: 27
36.19295207339941

Epoch: 28
35.71268135739147

Epoch: 29
35.24273870174427

Epoch: 30
34.78240200555684

Epoch: 31
34.330983196842496

Epoch: 32
33.

In [193]:
# Draw a fresh, larger sample from the same linear relationship used for
# training. This overwrites `n_samples`, `X_data` and `y_data` from the cell above.
# X has 2 features -> shape [n, 2]
# Y has 2 targets  -> shape [n, 2]

n_samples = 500
X_data = np.random.normal(0, 1, size=(n_samples, 2))
y_data = np.array([[x1 * 0.3 + 4, x2 * 1.9 - 8] for x1, x2 in X_data])

print(f"X shape: {X_data.shape}\nY shape: {y_data.shape}")

X shape: (500, 2)
Y shape: (500, 2)


In [194]:
# Predictions of the trained model for the first two rows of the new sample.
model.predict(X_data[:2])

array([[ 3.86426133, -6.52360539],
       [ 4.31367135, -8.65006925]])

In [195]:
# Ground-truth targets for the same two rows, to compare with the predictions above.
y_data[:2]

array([[ 3.86426133, -6.52360539],
       [ 4.31367135, -8.65006925]])