In [300]:
# Below are the libraries that will be used to perform tensor operations for normalisation, matrix-multiplication

# The pytorch library will be used to load the datasets, e.g. MNIST and CIFAR-10

import torch
from sympy.codegen.ast import int8
from torch.utils.data import DataLoader
from torchvision import datasets, transforms
from torchvision.transforms import ToTensor


if torch.cuda.is_available():
    DEVICE = torch.device("cuda")

elif hasattr(torch.backends, "mps") and torch.backends.mps.is_available():
    DEVICE = torch.device("mps")

else:
    DEVICE = torch.device("cpu")

print("Using device:", DEVICE)


Using device: mps


In [301]:
# class Model:
#     def __init__(self, sequential, optimiser, lr=0.01 ,device=DEVICE):
#         self.model_structure = sequential
#         self.optimiser = optimiser
#         self.device = device
#         self.lr = lr
#         self.cross_entropy = CrossEntropyLoss()
#         self.y_true = None
#
#
#     def train(self, y_true, input):
#         self.y_true = y_true
#         pred = self.model_structure.forward(input)
#         curr_loss = self.cross_entropy.forward(pred, self.y_true)
#         print(f"Current loss: {curr_loss}")
#         grad = self.cross_entropy_loss.backward()
#
#         for layer in range (len(self.model_structure.layers)-1,0,-1):
#             grad = layer.backward(grad)
#
#             if not isinstance(layer, Softmax):
#                 layer.weights = self.optimiser.step(grads=grad, input=layer.weights)
#                 layer.bias = self.optimiser.step(grads=grad, input=layer.bias)
#
#         return curr_loss


In [302]:
# Like the previous numpy version,
# for the shape of my input each row = new batch, and each column = neuron (feature)
# Sequential class manages the structure of the neural network.
# Instead of hard-coding layers, they are defined like this:
# model_features = [
#     ("linear", 784, 128),
#     ("activation","ReLU"),
#     ("linear", 128, 10),
# ]
class Sequential:
    def __init__(self, model_features: list, device=DEVICE):
        # store the model configuration list for the neural network architecture
        self.model_features = model_features
        # store the device so that all tensors can be created on the same device
        # devices are (mps, cpu, cuda)
        self.device = device
        # holds the actual layer objects
        self.layers = []
        # this dictionary is used to map a.f. to their class implementations
        self.activations = {
            "ReLU": ReLU,
            "Sigmoid": Sigmoid,
            "Tanh": Tanh,
            "Softmax": Softmax,
        }
    # Loop through the model_features to instantiate layers
    def create_instances(self):
        for item in self.model_features:
            layer_type = item[0]
            # create a linear layer
            if layer_type == "linear":
                _, in_features, out_features = item
                new_layer = Linear(out_features, in_features, device=self.device)
                self.layers.append(new_layer)
            # set the activation function, technically it's not a layer, but it will still be stored inside layers
            elif layer_type == "activation":
                activation_name = item[1]
                self.layers.append(self.activations[activation_name]())
                print(f"Created Activation function: {activation_name}")

    def to(self, device):
        self.device = torch.device(device)

        for layer in self.layers:
            if hasattr(layer, "to"):
                layer.to(self.device)
        return self

    def forward(self, x):
        for layer in self.layers:

            x = layer.forward(x)

        return x

    # def backward(self, y):
    #     for layer in self.layers:
    #         y = layer.backward(y)
    #
    #     return y


In [303]:
class Linear:
    def __init__(self, outputs, inputs, device=DEVICE):
        self.inputs = inputs
        self.outputs = outputs
        self.device = torch.device(device)
        self.weights = torch.randn(outputs, inputs, device=self.device) * 0.01
        self.bias = torch.zeros(outputs, device=device)
        self.grad_input = None
        self.grad_weights = None
        self.grad_biases = None

    def to(self, device):
        self.device = torch.device(device)
        self.weights = self.weights.to(device)
        self.bias = self.bias.to(device)

        return self

    def forward(self, input):
        self.inputs = input
        return input @ self.weights.T + self.bias

    def backward(self, grad_output):
        # Connect the grad_output with the inputs
        self.grad_input = grad_output
        self.grad_weights = self.grad_input.T @ self.inputs
        self.grad_biases = self.grad_input.sum(dim=0) # summation will be across each column
        self.grad_output = self.grad_input @ self.weights

        return self.grad_output



In [304]:
class MaxPool:
    def __init__(self):
        pass

In [305]:
class Convolution:
    def __init__(self):
        pass

In [306]:
class ReLU:
    def __init__(self):
        self.input = None
        self.output = None
        self.grad_input = None
        self.grad_output = None

    def forward(self, input):
        zeros = torch.zeros(input.shape, device=DEVICE)
        self.input = input
        self.output = torch.maximum(input=self.input, other=zeros)
        return self.output

    def backward(self, grad_output):
        self.grad_input = grad_output
        self.grad_output = self.grad_input * (self.output > 0)
        return self.grad_output


In [307]:
class Sigmoid:
    def __init__(self):
        self.input = None
        self.output = None
        self.grad_output = None
        self.grad_input = None
        self.zeros = torch.zeros(input.shape, device=DEVICE)

    def forward(self, input):
        self.input = input
        denominator = 1 + torch.exp(-self.input)
        self.output = 1 / denominator
        return self.output

    def backward(self, grad_output):
        self.grad_input = grad_output
        self.grad_output = self.grad_input * (self.output * (1 - self.output))
        return self.grad_output


In [308]:
class Tanh:
    def __init__(self):
        self.input = None
        self.output = None
        self.grad_input = None
        self.grad_output = None
        self.zeros = torch.zeros(input.shape, device=DEVICE)

    def forward(self, input):
        self.input = input
        numerator = (torch.exp(self.input) - torch.exp(-self.input))
        denominator = (torch.exp(self.input) + torch.exp(-self.input))
        self.output = numerator / denominator
        return self.output

    def backward(self, grad_output):
        self.grad_input = grad_output
        self.grad_output = self.grad_input * (1 - torch.pow(self.output, 2))
        return self.grad_output




In [309]:
# # Softmax Derivative
# # Learning about Jacobian matrices
#
# batch_size = 2
# num_neurons = 3
#
# # 1. Creating dummy raw scores (logits)
# # These will be the activated values from the linear layer
#
# logits = torch.randn(batch_size, num_neurons)
#
# # 2. Apply the Softmax to get the probabilities
# # dim=1 means that the softmax is computed across the neurons per batch, so going across to calculate the softmax
#
# probabilities = torch.softmax(logits, dim=1)
#
# print(f"logits: {logits}")
# print(f"probabilities: {probabilities}")
# print("Shape: ", probabilities.shape)
#
# # 3. Create the Jacobian for the whole batch
# # Step A: Diagonal part (i=j)
# diag_part = torch.diag_embed(probabilities)
#
# # Step B: Outer product part (i != j)
# # p_reshaped becomes (3,4,1)
# p_reshaped = probabilities.unsqueeze(2)
#
# print(f"p_reshaped shape: {p_reshaped.shape}")
# print(f"p_reshaped: \n{p_reshaped}")
#
# # bmm performs (3,4,1) x (3,1,4) -> (3,4,4)
#
# p_reshaped_transposed = p_reshaped.transpose(1, 2)  # swap dimensions 1 and 2
#
# print(f"p_reshaped_transposed shape: {p_reshaped_transposed.shape}")
# print(f"p_reshaped_transposed:\n {p_reshaped_transposed}")
#
# outer_part = torch.bmm(p_reshaped, p_reshaped_transposed)
# print(f"outer_part shape: {outer_part.shape}")
# print(f"outer_part: \n {outer_part}")
#
# # Step C: Combine
# softmax_derivative = diag_part - outer_part
# print(f"softmax_derivative: \n{softmax_derivative}")

In [310]:
class Softmax:
    def __init__(self):
        self.input = None
        self.pred = None
        self.grad_input = None
        self.grad_output = None

    def forward(self, input):
        self.input = input

        max_val, _ = torch.max(input, dim=1, keepdim=True)
        shifted_input = input - max_val
        numerator = torch.exp(shifted_input)
        denominator = torch.sum(numerator, dim=1, keepdim=True)
        self.pred = numerator / denominator

        return self.pred

    def backward(self, grad_output):
        self.grad_input = grad_output

        # Creating the Jacobian for the whole batch
        # Step A: Diagonal part (i = j)
        diag_part = torch.diag_embed(self.pred)

        # Step B: Outer product part (i != j)
        # p_reshaped becomes (3,4,1)
        p_reshaped = self.pred.unsqueeze(2)

        # bmm performs (3,4,1) x (3,1,4) -> (3,4,4)
        p_reshaped_transposed = p_reshaped.transpose(1, 2)  # swap dimensions 1 and 2
        outer_part = torch.bmm(p_reshaped, p_reshaped_transposed)

        softmax_derivative = diag_part - outer_part

        # self.grad_output = softmax_derivative * self.grad_input
        self.grad_output = torch.bmm(softmax_derivative, self.grad_input.unsqueeze(2)).squeeze(2)

        return self.grad_output


In [311]:
class CrossEntropyLoss:
    def __init__(self):
        self.y_true = None
        self.pred = None
        self.avg_loss = None
        self.epsilon = 1e-9
        self.batch_size = None
        self.grad_output = None

    # since our model will be trained in batches, the loss will the total loss over each sample averaged over the number of samples there are in a batch
    def forward(self, predictions, y_true):
        self.pred = predictions
        self.y_true = y_true
        self.batch_size = self.pred.shape[0] # (batch_size, neurons)
        batch_loss = -torch.sum(self.y_true * torch.log(predictions + self.epsilon))
        self.avg_loss = batch_loss / self.batch_size

        return self.avg_loss

    def backward(self):
        cross_entropy_derivative = -(self.y_true / (self.pred + self.epsilon))

        # We will divide by batch_size to average out across the batch, this way we can keep th gradient scale consistent.
        self.grad_output = (cross_entropy_derivative / self.batch_size)

        return self.grad_output




In [312]:
class Flatten:
    def __init__(self):
        pass

In [313]:
class SGD:
    def __init__(self, lr=0.01):
        self.input = None
        self.output = None
        self.grads = None
        self.lr = lr

    def step(self, grads, input):
        self.input = input
        self.output = self.input - (self.lr * grads)
        return self.output



In [314]:
class Adam:
    def __init__(self):
        pass

In [315]:
class Model:
    def __init__(self, sequential, optimiser, lr=0.01 ,device=DEVICE):
        self.model_structure = sequential
        self.optimiser = optimiser
        self.device = device
        self.lr = lr
        self.cross_entropy = CrossEntropyLoss()
        self.y_true = None


    def train(self, y_true, input):
        self.y_true = y_true
        pred = self.model_structure.forward(input)
        # print(f"pred: {pred}")
        curr_loss = self.cross_entropy.forward(pred, self.y_true)
        # print(f"Current loss: {curr_loss}")
        grad = self.cross_entropy.backward()

        for layer in reversed(self.model_structure.layers):
            grad = layer.backward(grad)

            if hasattr(layer, "weights"):
                layer.weights = self.optimiser.step(grads=layer.grad_weights, input=layer.weights)
                layer.bias = self.optimiser.step(grads=layer.grad_biases, input=layer.bias)

        return curr_loss

In [316]:
# Modules from torch that are needed
from torch.utils.data import DataLoader
from torchvision import datasets, transforms
# We need to flatten the input data before we can pass it into the dense layers
# For now keep it flattened but, when CNN implemented, change later
transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Lambda(lambda x: torch.flatten(x))
])

# Load MNIST dataset

train_dataset = datasets.MNIST(
    root= "./data",
    train=True,
    download=True,
    transform=transform
)

test_dataset = datasets.MNIST(
    root=  "./data",
    train=False,
    download=True,
    transform=transform
)

# Create batches of data

batch_size = 64

train_loader = DataLoader(
    train_dataset,
    batch_size=batch_size,
    shuffle=True,
    drop_last=True
)

test_loader = DataLoader(
    test_dataset,
    batch_size=batch_size,
    shuffle=False
)



In [317]:
print("Train dataset size:", len(train_dataset))
print("Batch size:", train_loader.batch_size)
print("Number of train batches (len(train_loader)):", len(train_loader))

# Check first 3 batches
for i, (inputs, labels) in enumerate(train_loader):
    print(f"\nTrain batch {i}:")
    print("inputs shape:", inputs.shape)
    print("labels shape:", labels.shape)
    print("inputs dtype:", inputs.dtype)
    print("labels dtype:", labels.dtype)
    print("min/max pixel:", inputs.min().item(), inputs.max().item())
    print("unique labels sample:\n", labels[:32].tolist(), "\n", labels[32:].tolist())

    if i == 0:
        break

print("Test dataset size:", len(test_dataset))
print("Batch size:", test_loader.batch_size)
print("Number of test batches (len(test_loader)):", len(test_loader))

# Check last test batch size
last_inputs, last_labels = None, None
for inputs, labels in test_loader:
    last_inputs, last_labels = inputs, labels

print("\nLast test batch:")
print("inputs shape:", last_inputs.shape)
print("labels shape:", last_labels.shape)

Train dataset size: 60000
Batch size: 64
Number of train batches (len(train_loader)): 937

Train batch 0:
inputs shape: torch.Size([64, 784])
labels shape: torch.Size([64])
inputs dtype: torch.float32
labels dtype: torch.int64
min/max pixel: 0.0 1.0
unique labels sample:
 [7, 2, 0, 2, 0, 3, 3, 6, 7, 9, 8, 8, 8, 6, 5, 0, 4, 8, 0, 9, 1, 7, 2, 9, 5, 2, 1, 1, 6, 4, 9, 4] 
 [2, 9, 6, 6, 5, 4, 4, 8, 3, 2, 9, 7, 6, 0, 5, 7, 0, 5, 6, 1, 9, 6, 8, 7, 5, 8, 7, 0, 8, 2, 1, 6]
Test dataset size: 10000
Batch size: 64
Number of test batches (len(test_loader)): 157

Last test batch:
inputs shape: torch.Size([16, 784])
labels shape: torch.Size([16])


In [318]:
# Test
#
# model_features = [
#     ("linear", 784, 128),
#     ("activation", "ReLU"),
#     ("linear", 128, 10),
#     ("activation", "Softmax")
# ]

# Dummy y_true values
# y_true = torch.tensor([
#     [1, 0, 0, 0],  # Sample 0 is Class 0
#     [0, 0, 0, 1],  # Sample 1 is Class 3
#     [0, 0, 1, 0],  # Sample 2 is Class 2
#     [0, 0, 0, 1]   # Sample 3 is Class 1
# ], dtype=torch.float32).to(DEVICE)

# new_model = Sequential(model_features, device=DEVICE)
# new_model.create_instances()
# inputs = torch.randn(4, 4).to(DEVICE)
# print(f"inputs: {inputs}")
# # out = new_model.forward(inputs)
# # print(f"outputs: {out}")
#
# out = new_model.forward(inputs)
# cross_entropy_loss = CrossEntropyLoss()
# loss = cross_entropy_loss.forward(predictions=out, y_true=y_true)
# print(out)
# print(loss)
#
# optimiser = SGD(lr=0.01)
# initial_grad_output = cross_entropy_loss.backward()
#
# grad_descent = new_model.backward(y=grad_output)
# print(grad_descent)

# Dummy Test

# new_arcitecture = Sequential(model_features, device=DEVICE)
# new_arcitecture.create_instances()
# optimiser = SGD(lr=0.01)
# new_model = Model(sequential=new_arcitecture,optimiser=optimiser,lr=0.1, device=DEVICE)
#
# epochs = 1000
#
# for epoch in range(epochs):
#     loss = new_model.train(y_true=y_true, input=inputs)
#     print(f"epoch: {epoch}, loss: {loss}")



In [319]:

model_features = [
    ("linear", 784, 128),
    ("activation", "ReLU"),
    ("linear", 128, 10),
    ("activation", "Softmax")
]

network = Sequential(model_features, device=DEVICE)
network.create_instances()

optimiser = SGD(lr=0.1)
model = Model(sequential=network, optimiser=optimiser, device=DEVICE, lr=0.1)

def one_hot(labels, num_classes=10, device=DEVICE):
    y = torch.zeros(labels.size(0),
                    num_classes,
                    device=device,
                    dtype=torch.int64)
    y.scatter_(1,labels.unsqueeze(1),1)
    return y

# Training

epochs = 5

for epoch in range(epochs):
    total_loss = 0
    correct = 0
    total = 0

    for inputs, labels in train_loader:
        inputs = inputs.to(DEVICE)
        labels = labels.to(DEVICE)
        y_true = one_hot(labels, 10, DEVICE)

        loss = model.train(y_true, inputs)
        total_loss += loss.item()


        pred = model.model_structure.forward(inputs)
        pred_labels = pred.argmax(dim=1)
        correct += (pred_labels == labels).sum().item()
        total += labels.size(0)

    avg_loss = total_loss / len(train_loader)
    acc = (correct / total) * 100

    print(f"Epoch {epoch+1}/{epochs}, Loss: {avg_loss}, Accuracy: {acc}")

# Testing

model.model_structure.to(device=DEVICE)

test_total_loss = 0
test_correct = 0
test_total = 0

for inputs, labels in test_loader:
    inputs = inputs.to(DEVICE)
    labels = labels.to(DEVICE)

    pred = model.model_structure.forward(inputs)

    pred_labels = pred.argmax(dim=1)
    test_correct += (pred_labels == labels).sum().item()
    test_total += labels.size(0)

    y_true = one_hot(labels, 10, DEVICE)
    batch_loss = model.cross_entropy.forward(pred, y_true)
    test_total_loss += batch_loss.item()

test_avg_loss = test_total_loss / len(test_loader)
test_acc = (test_correct / test_total) * 100
print(f"Test Loss: {test_avg_loss}, Test Accuracy: {test_acc}")



Created Activation function: ReLU
Created Activation function: Softmax
Epoch 1/5, Loss: 0.543689077737937, Accuracy: 87.20984525080043
Epoch 2/5, Loss: 0.24391738001730298, Accuracy: 94.99399679829243
Epoch 3/5, Loss: 0.18254830757814003, Accuracy: 96.4447705442903
Epoch 4/5, Loss: 0.14635360802115283, Accuracy: 97.32857524012807
Epoch 5/5, Loss: 0.1216753886052454, Accuracy: 97.97058431163286
Test Loss: 0.11341957578429608, Test Accuracy: 96.55
