# Multi-Layer Perceptron for a toy two-class problem in MLX

Markus Enzweiler, markus.enzweiler@hs-esslingen.de

This is a demo used in a Computer Vision & Machine Learning lecture. Feel free to use and contribute.

We build and train a multi-layer perceptron (MLP) for a two-class classification problem with a *single* neuron in its output layer. The MLP will output values from 0-1 and we can use a threshold of 0.5 to determine the class label.

We will also train a single perceptron for comparison. 




**Note: This requires a machine with an Apple SoC, e.g. M1/M2/M3 etc.**

See: https://github.com/ml-explore/mlx

## Setup

Adapt `package_path` to point to the directory containing this notebook.

In [None]:
# Imports
import sys
import os

In [None]:
# Additional imports

# Repository Root
# Two directory levels above the current working directory, made absolute.
repo_root = os.path.abspath(os.path.join("..", ".."))
# Add the repository root to the system path
# (presumably this is where the `nbutils` package imported below lives — verify)
sys.path.append(repo_root)

# Package Imports
from nbutils import requirements as nb_reqs
from nbutils import colab as nb_clab
from nbutils import git as nb_git
from nbutils import exec as nb_exec

In [None]:
# Package Path
# Directory of this notebook; requirements.txt is looked up relative to it.
package_path = "./" # local
print(f"Package path: {package_path}")

In [None]:
# Additional requirements for this notebook
# The requirements file is expected directly inside package_path.
req_file = os.path.join(package_path, "requirements.txt")
# NOTE(review): presumably pip-installs every entry of req_file — confirm in nbutils.requirements
nb_reqs.pip_install_reqs(req_file)    

In [None]:
# Now we should be able to import the additional packages
import numpy as np
import matplotlib.pyplot as plt
import mlx
import mlx.core as mx
import mlx.nn as nn
import mlx.optimizers as optim

# Set the random seed for reproducibility
# Both global generators are seeded because both are used in this notebook:
# mx.random samples the data points, np.random shuffles the dataset.
np.random.seed(42)
mx.random.seed(42)


## Create the training and validation data

In [None]:
# Number of samples per class
n_samples = 1000

# Generate random data for class 0: three blobs of normally distributed
# points shifted to (2, 0), (0, 2) and (2, 5). The blobs hold 1/4, 1/2 and 1/4
# of the samples (integer division), so this class is not linearly separable
# from class 1.
class_0 = mx.concatenate([
    mx.random.normal((n_samples//4, 2)) + mx.array([2, 0]),
    mx.random.normal((n_samples//2, 2)) + mx.array([0, 2]),
    mx.random.normal((n_samples//4, 2)) + mx.array([2, 5])
    ])


# Generate random data for class 1: a single blob shifted to (4, 3)
class_1 = mx.random.normal((n_samples, 2)) + mx.array([4, 3])

# Labels for the classes (one column, one row per sample).
# The row count is taken from the generated data instead of n_samples: because
# of the integer divisions above, class_0 has fewer than n_samples rows
# whenever n_samples is not divisible by 4, and data and labels must line up.
labels_0 = mx.zeros((class_0.shape[0], 1))
labels_1 = mx.ones ((class_1.shape[0], 1))

# Combine the data and labels (class 0 rows first, then class 1 rows)
data   = mx.concatenate([class_0, class_1],   axis=0)
labels = mx.concatenate([labels_0, labels_1], axis=0)

# Plotting the data
plt.scatter(class_0[:, 0], class_0[:, 1], color='red', label='Class 0')
plt.scatter(class_1[:, 0], class_1[:, 1], color='blue',  label='Class 1')
plt.xlabel('Feature 1')
plt.ylabel('Feature 2')
plt.title('Two-dimensional Dataset with Two Classes')
plt.legend()
plt.show()

In [None]:
# Split the data into training and validation data sets
# We use NumPy and convert later into MLX arrays. Must be a better way in MLX, maybe with
# the upcoming mlx-data package.

# Combine data and labels into a dataset
# (one row per sample: the feature columns followed by the label as last column,
# so that shuffling keeps every sample together with its label)
dataset = np.column_stack((data, labels))

# Shuffle the dataset
# (in place, along the rows; uses NumPy's global random state)
np.random.shuffle(dataset)

# Split the data and labels back from the combined dataset
# NOTE: this rebinds `data` and `labels` to shuffled NumPy arrays; `labels` is
# now one-dimensional (the last column) instead of a single-column matrix.
data, labels = dataset[:, :-1], dataset[:, -1]

# Define the size of the training and validation sets
train_size = int(0.8 * len(dataset))    # 80% for training
val_size   = len(dataset) - train_size  # 20% for validation

# Split the dataset into training and validation sets
# (first train_size shuffled rows for training, the remaining rows for validation)
train_data   = mx.array(data[:train_size])
train_labels = mx.array(labels[:train_size])

val_data   = mx.array(data[train_size:])
val_labels = mx.array(labels[train_size:])


# Define the Multi-Layer Perceptron (MLP) and a single Perceptron

## Single perceptron class

In [None]:
class Perceptron(nn.Module):
    """A single neuron: one linear layer with one output, followed by a sigmoid."""

    def __init__(self, num_inputs):
        # Initialize the nn.Module base class first
        super().__init__()

        # The weighted sum of the num_inputs inputs (one output value) ...
        self.linear = nn.Linear(input_dims=num_inputs, output_dims=1)
        # ... and the activation function applied to it
        self.sigmoid = mx.sigmoid

    def forward(self, x):
        # Forward pass: linear layer, then sigmoid
        weighted_sum = self.linear(x)
        return self.sigmoid(weighted_sum)

    def __call__(self, x):
        # Calling the model runs the forward pass
        return self.forward(x)

## MLP class

In [None]:
class MultiLayerPerceptron(nn.Module):
    """MLP with one hidden layer and a single output neuron, sigmoid after each layer."""

    def __init__(self, num_inputs, num_hidden_layer_neurons=2):
        # Initialize the nn.Module base class first
        super().__init__()

        # Input -> hidden layer
        self.layer1 = nn.Linear(input_dims=num_inputs, output_dims=num_hidden_layer_neurons)
        # Hidden layer -> single output
        self.layer2 = nn.Linear(input_dims=num_hidden_layer_neurons, output_dims=1)
        # Activation function shared by both layers
        self.sigmoid = mx.sigmoid

    def forward(self, x):
        # Forward pass: x -> layer1 -> sigmoid -> layer2 -> sigmoid
        hidden = self.sigmoid(self.layer1(x))
        return self.sigmoid(self.layer2(hidden))

    def __call__(self, x):
        # Calling the model runs the forward pass
        return self.forward(x)

    

# Training with gradient descent

## Training and testing functions

In [None]:
# Training function 
def train(model, X, Y, optimizer, loss_and_grad_fn, num_epochs):
    """Train `model` with one parameter update per training sample.

    Args:
        model: The model to train; it is updated in place by the optimizer.
        X: Training samples, indexable by sample number and supporting len().
        Y: Training labels, one per sample; each label is reshaped to (1,)
           before it is passed to the loss.
        optimizer: Optimizer providing `update(model, gradients)` and `state`.
        loss_and_grad_fn: Called as `loss_and_grad_fn(model, x, y)`; must
            return the tuple `(loss, gradients)`.
        num_epochs: Number of passes over the training data.

    Prints the mean per-sample loss of an epoch about ten times per run and
    always for the last epoch.
    """
    num_samples = len(X)

    # Report roughly every tenth of the run. The interval is clamped to 1:
    # with fewer than ten epochs `num_epochs // 10` is 0 and taking the
    # modulo by it would raise ZeroDivisionError.
    report_interval = max(1, num_epochs // 10)

    #  Loop over epochs
    for epoch in range(num_epochs):

        # Reset accumulated loss per epoch
        acc_loss = mx.array(0.0)

        # Loop over all training data
        for i in range(num_samples):

            # forward and backward pass
            loss, gradients = loss_and_grad_fn(model, X[i], Y[i].reshape(1,))
            acc_loss = acc_loss + loss

            # Update the model with the gradients. So far no computation has happened.
            optimizer.update(model, gradients)

            # Compute the new parameters and also the new optimizer state.
            # The running loss is evaluated as well so that it does not pile
            # up as one long chain of pending additions.
            mx.eval(model.parameters(), optimizer.state, acc_loss)

        # Print the average loss per sample of this epoch once in a while.
        # The accumulated sum is divided by the sample count; taking the
        # mean of the already summed scalar would just return the sum.
        if (epoch % report_interval) == 0 or epoch == num_epochs - 1:
            mean_loss = (acc_loss / max(1, num_samples)).item()
            print(f"Epoch {epoch:5d}: loss = {mean_loss:.5f}")

In [None]:
# Testing function

# The model will output values from 0-1.
# We can use a threshold of 0.5 to determine the class label.

def test(model, X, Y):
    """Classify every sample in X and print the per-sample results and the accuracy.

    The model output is turned into a class label with a threshold of 0.5:
    outputs below 0.5 give label 0, all others label 1.

    Args:
        model: Callable model; called with one sample at a time.
        X: Samples, indexable by sample number and supporting len().
        Y: Ground-truth labels, one per sample.
    """
    # test the model on all data points
    print("Testing ...")

    # The accuracy refers to the samples passed in, not to a global data set size
    num_samples = len(X)
    num_correct = 0

    for i in range(num_samples):
        x = X[i]
        y = Y[i].reshape(1,)

        prediction = model(x)

        class_label = mx.where(prediction < 0.5, mx.array(0), mx.array(1))
        print(f"{x} -> {class_label} ({prediction.item():.3f}) (label: {y})")

        if class_label == y:
            num_correct += 1

    # Print accuracy (without dividing by zero for an empty data set)
    if num_samples == 0:
        print("Accuracy: 0/0 (no samples)")
        return
    print(f"Accuracy: {num_correct}/{num_samples} = {100 * num_correct/num_samples:.2f}%")
    

In [None]:
# Visualization

import matplotlib.cm as cm
import matplotlib.gridspec as gridspec


def show_decision_boundary(model, data, labels, subplot_spec=None):
    """Plot the model output over the range of the data, with the samples on top.

    Args:
        model: Callable model; evaluated on all points of a dense grid at once.
        data: Samples with the two features in columns 0 and 1.
        labels: Class label of every sample.
        subplot_spec: Optional subplot spec to draw into; if None, a new
            1x2 grid (plot and colorbar) is created.
    """
    points = np.array(data)
    targets = np.array(labels)

    # Two cells side by side: the plot (15 parts wide) and the colorbar (1 part)
    width_ratios = (15, 1)
    grid = (
        gridspec.GridSpec(1, 2, width_ratios=width_ratios)
        if subplot_spec is None
        else gridspec.GridSpecFromSubplotSpec(1, 2, subplot_spec=subplot_spec, width_ratios=width_ratios)
    )

    ax = plt.subplot(grid[0])
    ax.set_title('Dataset and Decision Function')

    # Plot range: extent of the samples with a margin of 1 on every side
    feature_1, feature_2 = points[:, 0], points[:, 1]
    x_min, x_max = feature_1.min() - 1, feature_1.max() + 1
    y_min, y_max = feature_2.min() - 1, feature_2.max() + 1

    # Dense grid over the plot range; a small step gives a high resolution
    step = 0.01
    xx, yy = np.meshgrid(np.arange(x_min, x_max, step), np.arange(y_min, y_max, step))

    # Evaluate the model on every grid point and bring the result back into grid shape
    grid_points = np.c_[xx.ravel(), yy.ravel()]
    decision = model(mx.array(grid_points, dtype=mx.float32)).reshape(xx.shape)

    # Many contour levels for smooth color transitions
    levels = np.linspace(0, 1, 100)
    contours = ax.contourf(xx, yy, np.array(decision), levels, cmap=cm.gray, vmin=0, vmax=1)

    # Draw the samples class by class, one color per class
    colors = ['red', 'blue']
    flat_targets = targets.flatten()
    for i, yi in enumerate(np.unique(targets)):
        members = np.where(flat_targets == yi)
        ax.scatter(points[members, 0], points[members, 1],
                   color=colors[i], linewidth=0, label='Class %d (y=%d)' % (yi, yi))
    ax.legend()
    ax.set_xlim((x_min, x_max))
    ax.set_ylim((y_min, y_max))

    # Colorbar in the second grid cell, ticks from 0 to 1 in steps of 0.1
    colorbar_axes = plt.subplot(grid[1])
    cbar = plt.colorbar(contours, cax=colorbar_axes)
    cbar.set_ticks(np.arange(0, 1.1, 0.1))
    cbar.set_label('Decision value')

## Train and test the single perceptron

In [None]:
# Train the perceptron model

# The model to train
# (two inputs: one per feature of the data set)
model = Perceptron(num_inputs=2)
# Evaluate because mlx uses lazy evaluation
mx.eval(model.parameters())

# Hyperparameters
num_epochs = 50  # number of passes over the training data
eta = 0.01       # learning rate of the optimizer

# Loss function
def loss_fn(model, X, y):  
    """Return the MSE loss between the model output for X and the target y."""
    return nn.losses.mse_loss((model(X)), y)

# Create the gradient function
# It is called as loss_and_grad_fn(model, X, y) and returns (loss, gradients).
loss_and_grad_fn = nn.value_and_grad(model, loss_fn)

# Stochastic gradient descent (SGD) optimizer
optimizer = optim.SGD(learning_rate=eta)

# Train the model
train(model, train_data, train_labels, optimizer, loss_and_grad_fn, num_epochs)

In [None]:
# Test the model
# (on the validation split; prints one line per sample, then the accuracy)
test(model, val_data, val_labels)

In [None]:
# Visualize the decision boundary
# With the cells run in order, `data` and `labels` are the complete shuffled
# data set (training and validation samples together).
show_decision_boundary(model, data, labels)

## Train and test the MLP

In [None]:
# Train the MLP model

# The model to train
# (two inputs: one per feature; the hidden layer keeps its default size of 2 neurons)
model = MultiLayerPerceptron(num_inputs=2)
# Evaluate because mlx uses lazy evaluation
mx.eval(model.parameters())

# Hyperparameters
num_epochs = 50  # number of passes over the training data
eta = 0.01       # learning rate of the optimizer

# Loss function
def loss_fn(model, X, y):  
    """Return the MSE loss between the model output for X and the target y."""
    return nn.losses.mse_loss((model(X)), y)

# Create the gradient function
# It is called as loss_and_grad_fn(model, X, y) and returns (loss, gradients).
loss_and_grad_fn = nn.value_and_grad(model, loss_fn)

# Stochastic gradient descent (SGD) optimizer
optimizer = optim.SGD(learning_rate=eta)

# Train the model
train(model, train_data, train_labels, optimizer, loss_and_grad_fn, num_epochs)

In [None]:
# Test the model
# (on the validation split; prints one line per sample, then the accuracy)
test(model, val_data, val_labels)

In [None]:
# Visualize the decision boundary
# With the cells run in order, `data` and `labels` are the complete shuffled
# data set (training and validation samples together).
show_decision_boundary(model, data, labels)