In [49]:
import numpy as np
import matplotlib.pyplot as plt
from tqdm import tqdm
from torchvision import datasets, transforms
from sklearn.metrics import f1_score, confusion_matrix
import pickle
import os



In [50]:
class DenseLayer:
    def __init__(self, input_size, output_size):
        # Initialize weights and biases
        self.weights = np.random.randn(input_size, output_size) * 0.01 # Taking random values and scaling them down to a smaller value
        self.biases = np.zeros((1, output_size))

    def forward(self, inputs):
        # Save the inputs for use in backpropagation
        self.inputs = inputs
        # Compute the linear output Z = XW + b
        self.linear_output = np.dot(inputs, self.weights) + self.biases
        return self.linear_output
    
    def backward(self, d_output, learning_rate=0.01):
        # Calculate gradients for weights, biases, and inputs
        self.d_weights = np.dot(self.inputs.T, d_output)
        self.d_biases = np.sum(d_output, axis=0, keepdims=True)
        
        # Gradient for the inputs to pass to the previous layer
        d_inputs = np.dot(d_output, self.weights.T)
        
        # Update weights and biases using the specified learning rate
        self.weights -= learning_rate * self.d_weights
        self.biases -= learning_rate * self.d_biases
        
        return d_inputs

In [51]:
class BatchNormalization:
    def __init__(self, output_size, momentum=0.9, epsilon=1e-8):
        # Parameters for scaling and shifting
        self.gamma = np.ones((1, output_size))  # Initialize scaling to 1
        self.beta = np.zeros((1, output_size))  # Initialize shifting to 0
        self.momentum = momentum
        self.epsilon = epsilon

        # Running mean and variance for inference (test time)
        self.running_mean = np.zeros((1, output_size))
        self.running_var = np.ones((1, output_size))

    def forward(self, inputs, training=True):
        if training:
            # Compute mean and variance for the current batch
            self.batch_mean = np.mean(inputs, axis=0, keepdims=True)
            self.batch_var = np.var(inputs, axis=0, keepdims=True)

            # Normalize the inputs
            self.x_normalized = (inputs - self.batch_mean) / np.sqrt(self.batch_var + self.epsilon)

            # Scale and shift
            self.out = self.gamma * self.x_normalized + self.beta

            # Update running statistics for inference
            self.running_mean = self.momentum * self.running_mean + (1 - self.momentum) * self.batch_mean
            self.running_var = self.momentum * self.running_var + (1 - self.momentum) * self.batch_var
        else:
            # Use running mean and variance at test time
            self.x_normalized = (inputs - self.running_mean) / np.sqrt(self.running_var + self.epsilon)
            self.out = self.gamma * self.x_normalized + self.beta
        return self.out

    def backward(self, d_out, learning_rate=0.01):
        # Backpropagation through batch normalization
        batch_size = d_out.shape[0]

        # Gradients for gamma and beta
        self.d_gamma = np.sum(d_out * self.x_normalized, axis=0, keepdims=True)
        self.d_beta = np.sum(d_out, axis=0, keepdims=True)

        # Gradients for the normalized input
        dx_normalized = d_out * self.gamma

        # Gradients for variance
        d_variance = np.sum(dx_normalized * (self.x_normalized * -0.5) * (self.batch_var + self.epsilon)**-1.5, axis=0, keepdims=True)

        # Gradients for mean
        d_mean = np.sum(dx_normalized * -1 / np.sqrt(self.batch_var + self.epsilon), axis=0, keepdims=True) + d_variance * np.mean(-2 * (self.out - self.batch_mean), axis=0)

        # Gradients for inputs
        d_inputs = dx_normalized / np.sqrt(self.batch_var + self.epsilon) + (d_variance * 2 * (self.out - self.batch_mean) / batch_size) + (d_mean / batch_size)

        # Update gamma and beta
        self.gamma -= learning_rate * self.d_gamma
        self.beta -= learning_rate * self.d_beta

        return d_inputs

In [52]:
class ReLU:
    def forward(self, x):
        # Save input for backward pass
        self.x = x
        # Apply ReLU
        return np.maximum(0, x)

    def backward(self, d_output):
        # Gradient of ReLU is 1 for positive x, 0 for negative x
        d_input = d_output * (self.x > 0)
        return d_input

In [53]:
class Dropout:
    def __init__(self, dropout_rate=0.5):
        self.dropout_rate = dropout_rate

    def forward(self, inputs, training=True):
        if training:
            # Create a mask with the same shape as inputs, with `1` for neurons kept, `0` for dropped
            self.mask = (np.random.rand(*inputs.shape) > self.dropout_rate).astype(float)
            # Scale the mask to keep expected value consistent
            return inputs * self.mask / (1 - self.dropout_rate)
        else:
            # During inference, scale down the output by (1 - dropout_rate)
            return inputs

    def backward(self, d_output):
        # Apply the dropout mask to the gradient, ensuring only active neurons propagate gradients
        return d_output * self.mask / (1 - self.dropout_rate)

In [54]:
class AdamOptimizer:
    def __init__(self, learning_rate=0.001, beta1=0.9, beta2=0.999, epsilon=1e-8):
        self.learning_rate = learning_rate
        self.beta1 = beta1
        self.beta2 = beta2
        self.epsilon = epsilon
        self.m = None  # First moment vector
        self.v = None  # Second moment vector
        self.t = 0     # Time step for bias correction

    def update(self, params, grads):
        # Initialize m and v as zeros with the same shape as parameters
        if self.m is None:
            self.m = [np.zeros_like(param) for param in params]
            self.v = [np.zeros_like(param) for param in params]

        # Increment time step
        self.t += 1

        # Update parameters
        for i in range(len(params)):
            # Update biased first moment estimate
            self.m[i] = self.beta1 * self.m[i] + (1 - self.beta1) * grads[i]
            # Update biased second moment estimate
            self.v[i] = self.beta2 * self.v[i] + (1 - self.beta2) * (grads[i] ** 2)

            # Correct bias in first moment
            m_hat = self.m[i] / (1 - self.beta1 ** self.t)
            # Correct bias in second moment
            v_hat = self.v[i] / (1 - self.beta2 ** self.t)

            # Update parameters
            params[i] -= self.learning_rate * m_hat / (np.sqrt(v_hat) + self.epsilon)

In [55]:
def softmax(logits):
    exp_logits = np.exp(logits - np.max(logits, axis=1, keepdims=True))  # stability trick
    return exp_logits / np.sum(exp_logits, axis=1, keepdims=True)

def cross_entropy_loss(predictions, labels):
    one_hot_labels = np.zeros_like(predictions)
    one_hot_labels[np.arange(len(labels)), labels] = 1
    probs = softmax(predictions)
    loss = -np.mean(np.sum(one_hot_labels * np.log(probs + 1e-8), axis=1))  # Add epsilon to avoid log(0)
    return loss

def cross_entropy_gradient(predictions, labels):
    one_hot_labels = np.zeros_like(predictions)
    one_hot_labels[np.arange(len(labels)), labels] = 1
    probs = softmax(predictions)
    return (probs - one_hot_labels) / len(labels)

In [56]:
class NeuralNetwork:
    def __init__(self):
        self.layers = [
            DenseLayer(28*28, 128),
            BatchNormalization(128),
            ReLU(),
            Dropout(),
            DenseLayer(128, 32),
            BatchNormalization(32),
            ReLU(),
            Dropout(),
            DenseLayer(32, 10)
        ]

    def forward(self, X):
        for layer in self.layers:
            X = layer.forward(X)
        return X
    
    def predict(self, X):
        logits = self.forward(X)
        probabilities = softmax(logits)
        return np.argmax(probabilities, axis=1)


In [57]:
def train(network, train_loader, epochs=10, learning_rate=0.001):
    for epoch in range(epochs):
        losses = []
        for images, labels in tqdm(train_loader):
            # Flatten images and prepare labels
            images = images.view(-1, 28*28).numpy()
            labels = labels.numpy()

            # Forward pass
            predictions = network.forward(images)
            # loss = cross_entropy_loss(predictions, labels)
            # losses.append(loss)

            # Backward pass
            grad_output = cross_entropy_gradient(predictions, labels)
            for layer in reversed(network.layers):
                if isinstance(layer, DenseLayer):
                    grad_output = layer.backward(grad_output, learning_rate)
                else:
                    grad_output = layer.backward(grad_output)

        print(f"Epoch {epoch+1}, Loss: {np.mean(losses)}")

In [None]:
from sklearn.metrics import f1_score

# Instantiate softmax, cross-entropy loss, and metrics tracking
softmax = Softmax()
cross_entropy_loss = CrossEntropyLoss()

# Placeholder lists to store metrics over epochs
train_losses, val_losses = [], []
train_accuracies, val_accuracies = [], []
val_macro_f1_scores = []

# Training loop
for epoch in range(epochs):
    # Training phase
    model.train()
    train_loss, correct_train, total_train = 0, 0, 0
    
    for inputs, targets in train_loader:
        # Forward pass
        logits = model.forward(inputs)        # Pass inputs through the network
        predictions = softmax.forward(logits) # Apply softmax to get probabilities
        
        # Compute loss and accumulate it
        loss = cross_entropy_loss.forward(predictions, targets)
        train_loss += loss
        
        # Backward pass
        d_loss = cross_entropy_loss.backward(predictions, targets)
        model.backward(d_loss)
        
        # Calculate accuracy
        predicted_classes = np.argmax(predictions, axis=1)
        correct_train += (predicted_classes == targets).sum()
        total_train += targets.shape[0]
    
    # Calculate average training loss and accuracy for the epoch
    avg_train_loss = train_loss / len(train_loader)
    train_accuracy = correct_train / total_train
    train_losses.append(avg_train_loss)
    train_accuracies.append(train_accuracy)
    
    # Validation phase
    model.eval()
    val_loss, correct_val, total_val = 0, 0, 0
    all_pred, all_true = [], []
    
    for inputs, targets in val_loader:
        # Forward pass
        logits = model.forward(inputs)
        predictions = softmax.forward(logits)
        
        # Compute loss and accumulate it
        loss = cross_entropy_loss.forward(predictions, targets)
        val_loss += loss
        
        # Accuracy calculations
        predicted_classes = np.argmax(predictions, axis=1)
        correct_val += (predicted_classes == targets).sum()
        total_val += targets.shape[0]
        
        # Store predictions and targets for F1 score calculation
        all_pred.extend(predicted_classes)
        all_true.extend(targets)
    
    # Calculate average validation loss, accuracy, and macro F1 score
    avg_val_loss = val_loss / len(val_loader)
    val_accuracy = correct_val / total_val
    macro_f1 = f1_score(all_true, all_pred, average='macro')
    
    val_losses.append(avg_val_loss)
    val_accuracies.append(val_accuracy)
    val_macro_f1_scores.append(macro_f1)
    
    # Print metrics for the current epoch
    print(f"Epoch {epoch + 1}/{epochs}")
    print(f"Train Loss: {avg_train_loss:.4f}, Train Accuracy: {train_accuracy:.4f}")
    print(f"Val Loss: {avg_val_loss:.4f}, Val Accuracy: {val_accuracy:.4f}, Val Macro-F1: {macro_f1:.4f}")


In [58]:
# Transform and load dataset
from torch.utils.data import DataLoader


transform = transforms.Compose([transforms.ToTensor(), transforms.Normalize((0.5,), (0.5,))])
train_data = datasets.FashionMNIST(root="data", train=True, download=True, transform=transform)
train_loader = DataLoader(train_data, batch_size=64, shuffle=True)

# Initialize and train the network
network = NeuralNetwork()
train(network, train_loader, epochs=10, learning_rate=0.001)


100%|██████████| 938/938 [00:13<00:00, 67.41it/s]


Epoch 1, Loss: nan


100%|██████████| 938/938 [00:13<00:00, 70.79it/s]


Epoch 2, Loss: nan


100%|██████████| 938/938 [00:13<00:00, 70.53it/s]


Epoch 3, Loss: nan


100%|██████████| 938/938 [00:13<00:00, 68.62it/s]


Epoch 4, Loss: nan


100%|██████████| 938/938 [00:13<00:00, 67.24it/s]


Epoch 5, Loss: nan


100%|██████████| 938/938 [00:14<00:00, 65.14it/s]


Epoch 6, Loss: nan


100%|██████████| 938/938 [00:14<00:00, 65.44it/s]


Epoch 7, Loss: nan


100%|██████████| 938/938 [00:14<00:00, 65.30it/s]


Epoch 8, Loss: nan


100%|██████████| 938/938 [00:14<00:00, 65.05it/s]


Epoch 9, Loss: nan


100%|██████████| 938/938 [00:14<00:00, 62.98it/s]

Epoch 10, Loss: nan





In [59]:
def evaluate(network, data_loader):
    correct_predictions = 0
    total_predictions = 0

    for images, labels in data_loader:
        images = images.view(-1, 28*28).numpy()
        labels = labels.numpy()

        # Get predictions
        predicted_labels = network.predict(images)
        correct_predictions += np.sum(predicted_labels == labels)
        total_predictions += len(labels)

    accuracy = correct_predictions / total_predictions
    print(f"Accuracy: {accuracy:.4f}")


In [60]:
test_data = datasets.FashionMNIST(root="data", train=False, download=True, transform=transform)
test_loader = DataLoader(test_data, batch_size=64, shuffle=False)
evaluate(network, test_loader)

Accuracy: 0.1416
