# Self Organizing Map
Implementation of a Self Organizing Map (SOM) in python. This program and documentation was written with the help of the AI tool [Github Copilot](https://github.com/features/copilot).

The SOM is a type of artificial neural network that is trained using unsupervised learning to produce a low-dimensional representation of the input space of the training samples. The model typically consists of a two-dimensional grid of nodes, where each node is associated with a weight vector of the same dimension as the input vectors. The nodes are arranged in a hexagonal or rectangular grid, and are usually initialized randomly. The training consists of two main steps: competition and cooperation. In the competition step, the node with the weight vector that is most similar to the input vector is selected as the winner. In the cooperation step, the weights of the winning node and its neighbors are updated to move closer to the input vector. The training is repeated for a number of epochs, and the weights of the nodes converge to represent the input space.

### Define model parameters
The program is setup up to allow easy configuration. The following options can be adjusted:

In [None]:
# Hyperparameters
RATIO = 0.7 # ratio with which the data is split into training and testing data
MAP_WIDTH = 10 # width of the SOM map
MAP_HEIGHT = 10 # height of the SOM map
ITERATIONS = 10000 # number of iterations

NEIGHBORHOOD_FUNCTION = 'bubble' # neighborhood function, can be 'gaussian' or 'bubble'

### Load and pre-process the dataset
Dataset is retrieved from a file and pre-processed.  
To allow for better visualization of the results, a two-dimensional dataset is used. It is an artificial dataset with two features and three classes, all distinctly gropued into three clusters, with minor outliers in each class. The dataset is generated using a [open-source cluster painting tool](https://www.joonas.io/cluster-paint/).
1. Load the dataset from a file, separate the features and labels.

In [None]:
dataset_path = 'datasets/colors.csv' # path to the dataset file

def load_dataset(dataset_path):
    with open(dataset_path) as f:
        inputs = []
        labels = []
        
        # read the input file line by line
        for line in f:
            split_line = line.split(',')
            
            n_features = len(split_line) - 1
            
            # save the first n-1 values to an entry
            input_entry = [float(split_line[i]) for i in range(0, n_features)]
            inputs.append(input_entry)
            
            # save last value to labels
            labels.append(float(split_line[-1]))
            
    return inputs, labels, n_features, len(set(labels))

inputs, labels, n_features, n_classes = load_dataset(dataset_path)

2. Apply normalization to the features to scale them to the range [0, 1].

In [None]:
import numpy as np

from sklearn.preprocessing import MinMaxScaler

def normalize_inputs(inputs):
    # Create an instance of MinMaxScaler
    scaler = MinMaxScaler()

    # Fit the scaler to the features and transform the features
    return scaler.fit_transform(inputs)

inputs = np.array(normalize_inputs(inputs))

As the dataset only has two freature, we can review it by plotting the different classes in separate graphs.

In [None]:
import matplotlib.pyplot as plt

plt.title('Data distribution')
plt.xlim([0, 1])
plt.ylim([0, 1])
plt.scatter(inputs[:,0], inputs[:,1], c=labels, cmap='viridis')
plt.show()

fig, (ax1, ax2, ax3) = plt.subplots(1, 3, figsize=(15,5))

class_1 = [inputs[i] for i in range(len(inputs)) if labels[i] == 0]
ax1.set_title('Class 1')
ax1.set_xlim([0, 1])
ax1.set_ylim([0, 1])
ax1.scatter([x[0] for x in class_1], [x[1] for x in class_1], color='red')

class_2 = [inputs[i] for i in range(len(inputs)) if labels[i] == 1]
ax2.set_title('Class 2')
ax2.set_xlim([0, 1])
ax2.set_ylim([0, 1])
ax2.scatter([x[0] for x in class_2], [x[1] for x in class_2], color='g')

class_3 = [inputs[i] for i in range(len(inputs)) if labels[i] == 2]
ax3.set_title('Class 3')
ax3.set_xlim([0, 1])
ax3.set_ylim([0, 1])
ax3.scatter([x[0] for x in class_3], [x[1] for x in class_3], color='blue')

fig.tight_layout()
plt.show()

3. Shuffle the dataset and split it into training and testing sets.

In [None]:
from random import shuffle
    
def shuffle_data(inputs, labels):
    zipped = list(zip(inputs, labels))
    shuffle(zipped)
    inputs, labels = zip(*zipped)
    
    return np.array(inputs), np.array(labels)
    
def split_data(inputs, labels, ratio):
    # split lists into training and validation data
    # convert lists to numpy arrays
    ratio_index = int(len(inputs)*ratio)
    train_inputs = np.array(inputs[:ratio_index])
    train_labels = np.array(labels[:ratio_index])
    val_inputs = np.array(inputs[ratio_index:])
    val_labels = np.array(labels[ratio_index:])
    
    return train_inputs, train_labels, val_inputs, val_labels
    
inputs, labels = shuffle_data(inputs, labels)

train_inputs, train_labels, val_inputs, val_labels = split_data(inputs, labels, RATIO)

In [None]:
print(f'Training data length: {len(train_inputs)}')
print(f'Validation data length: {len(val_inputs)}')
print(f'Data sample: \n\t{train_inputs[0]} => {train_labels[0]}')

fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(10,5))

ax1.set_title('Training data')
ax1.set_xlim([0, 1])
ax1.set_ylim([0, 1])
ax1.scatter(train_inputs[:, 0], train_inputs[:, 1], c=train_labels)

ax2.set_title('Validation data')
ax2.set_xlim([0, 1])
ax2.set_ylim([0, 1])
ax2.scatter(val_inputs[:, 0], val_inputs[:, 1], c=val_labels)

fig.tight_layout()
plt.show()

#### Helper functions
Several helper functions are defined for the more complex arithmetic operations.

In [None]:
def euclidean_distance(a, b):
    """
    Calculates the Euclidean distance between two vectors.

    Parameters:
    a (numpy.ndarray): The first vector.
    b (numpy.ndarray): The second vector.

    Returns:
    float: The Euclidean distance between the two vectors.
    """
    return np.linalg.norm(a - b)

In [None]:
def manhattan_distance(a, b):
    """
    Calculates the Manhattan distance between two points in a multi-dimensional space.

    Parameters:    Calculates the Manhattan distance between two points.

    Parameters:
    a (tuple): The coordinates of the first point in the form (x, y).
    b (tuple): The coordinates of the second point in the form (x, y).

    Returns:
    int: The Manhattan distance between the two points.
    """
    return max(abs(a[0] - b[0]), abs(a[1] - b[1]))

In [None]:
def learning_rate(iteration, max_iterations):
    """
    Calculates the learning rate for the current iteration.

    Parameters:
    iteration (int): The current iteration.
    max_iterations (int): The total number of iterations.

    Returns:
    float: The learning rate for the current iteration.
    """
    return (1 - iteration / max_iterations)

In [None]:
def gaussian_neighborhood(learning_rate, radius, distance):
    """
    Calculates the Gaussian neighborhood function.

    Parameters:
    learning_rate (float): The current learning rate.
    radius (float): The current radius of the neighborhood.
    distance (float): The distance between the neuron and the BMU.

    Returns:
    float: The Gaussian neighborhood value.
    """
    if radius == 0:
        return 1.0
    return learning_rate * np.exp(-distance / (2 * (radius ** 2)))

In [None]:
from math import ceil

def bubble_neighborhood(learning_rate, radius, max_radius):
    """
    Calculates the Bubble neighborhood function.

    Parameters:
    learning_rate (float): The current learning rate.
    max_radius (float): The maximum radius of the neighborhood.

    Returns:
    float: The Bubble neighborhood value.
    """
    return learning_rate if radius <= ceil(learning_rate * max_radius) else 0

### Implement the SOM class
The SOM class contains the main methods for training the model and predicting the class labels of the samples. The class is initialized with the model parameters and the dataset, and contains the following methods:

1. `train()`: Trains the model using the training set.

2. `predict()`: Predicts the class labels of the samples in the testing set.

In [None]:
from collections import Counter

class SOM:
    # Initialize the SOM with random neruon weights
    def __init__(self, map_width, map_height, input_dim):
        self._map_width = map_width
        self._map_height = map_height
        self._neurons = np.random.rand(map_width, map_height, input_dim)
        
    def _find_winning_neuron(self, input):
        winning_neuron = None
        best_distance = float('inf')
        
        for i in range(self._map_width):
            for j in range(self._map_height):
                distance = euclidean_distance(input, self._neurons[i][j])
                if distance < best_distance:
                    best_distance = distance
                    winning_neuron = (i, j)
        
        return winning_neuron
    
    def _find_second_best_neuron(self, input):
        best_distance = float('inf')
        second_best_distance = float('inf')
        winning_neuron = None
        second_best_neuron = None
        
        for i in range(self._map_width):
            for j in range(self._map_height):
                distance = euclidean_distance(input, self._neurons[i][j])
                if distance < best_distance:
                    second_best_distance = best_distance
                    best_distance = distance
                    second_best_neuron = winning_neuron
                    winning_neuron = (i, j)
                elif distance < second_best_distance:
                    second_best_distance = distance
                    second_best_neuron = (i, j)
        
        return second_best_neuron
    
    def _update_som(self, input, winner, curr_iteration, max_iterations):
        for i in range(self._map_width):
            for j in range(self._map_height):                
                neighborhood = 0
                if NEIGHBORHOOD_FUNCTION == 'gaussian':
                    neighborhood = gaussian_neighborhood(
                        learning_rate(curr_iteration, max_iterations),
                        manhattan_distance(winner, (i, j)), 
                        euclidean_distance(input, self._neurons[i][j]))
                elif NEIGHBORHOOD_FUNCTION == 'bubble':
                    neighborhood = bubble_neighborhood(
                        learning_rate(curr_iteration, max_iterations),
                        manhattan_distance(winner, (i, j)), 
                        max(self._map_width, self._map_height))
                else:
                    raise ValueError('Invalid neighborhood function')
                
                delta = neighborhood * (input - self._neurons[i][j])
                self._neurons[i][j] += delta
              
    def _run_iteration(self, inputs, curr_iteration, max_iterations):
        for input in inputs:
            winner = self._find_winning_neuron(input)
            self._update_som(input, winner, curr_iteration, max_iterations) 
                         
    def train(self, inputs, labels, max_iterations):
        """
        Trains the self-organizing map (SOM) with the given inputs for the specified number of iterations.

        Parameters:
        - inputs (numpy.ndarray): The input data to train the SOM.
        - max_iterations (int): The number of iterations to train the SOM.
        """       
        quantization_errors = []
        topographic_errors = []
        
        for i in range(max_iterations):
            q_error, t_error = self._calculate_errors(inputs)
            quantization_errors.append(q_error)
            topographic_errors.append(t_error)
            
            if (i + 1) % (max_iterations / 10) == 0:
                print(f'Iteration {i + 1}/{max_iterations}')
                print(f'Quantization error: {q_error}')
                print(f'Topographic error: {t_error}')
            
            self._run_iteration(inputs, i, max_iterations)
            
        # Plot the quantization and topographic errors
        fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(10,5))
        
        ax1.set_title('Quantization Error')
        ax1.plot(quantization_errors)
        
        ax2.set_title('Topographic Error')
        ax2.plot(topographic_errors)
        
        fig.tight_layout()
        plt.show()
        
        # Plot the data distribution and the SOM neuron weights
        fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(10,5))
        
        ax1.set_title('Data distribution')
        ax1.set_xlim([0, 1])
        ax1.set_ylim([0, 1])
        ax1.scatter(inputs[:,0], inputs[:,1], c=labels, cmap='viridis')
        
        weights = []
        for i in range(self._map_width):
            for j in range(self._map_height):
                neuron = self._neurons[i][j]
                weights.append(list(neuron))
        
        weights = np.array(weights)
        
        ax2.set_title('SOM Neuron Weights')
        ax2.set_xlim([0, 1])
        ax2.set_ylim([0, 1])
        ax2.scatter(weights[:, 0], weights[:, 1])
        
        fig.tight_layout()
        plt.show()
        
    def _calculate_errors(self, inputs):
        q_error = 0
        t_error = 0
        
        for input in inputs:
            winner = self._find_winning_neuron(input)
            second_best = self._find_second_best_neuron(input)
            
            q_error += euclidean_distance(input, self._neurons[winner[0]][winner[1]])
            
            if manhattan_distance(winner, second_best) > 1:
                t_error += 1
            
        return q_error / len(inputs), t_error / len(inputs)
        
    def predict(self, inputs, labels):
        prediction_table = np.zeros((self._map_width, self._map_height, n_classes))
        
        # For each input, find the winning neuron and increment the counter for the corresponding label
        for i, input in enumerate(inputs):
            winner = self._find_winning_neuron(input)
            prediction_table[winner[0]][winner[1]][int(labels[i])] += 1
        
        fig, axs = plt.subplots(1, n_classes, figsize=(5 * n_classes,5))
        
        for i in range(n_classes):
            values = prediction_table[:, :, i]
            values = values / np.max(values)
            values.reshape((self._map_width, self._map_height))
            
            axs[i].set_title(f'Class {i}')
            axs[i].imshow(values, cmap='Purples', interpolation='nearest')
            
        fig.tight_layout()
        plt.show()
        
        self._plot_u_matrix(prediction_table)
    
    def _plot_u_matrix(self, prediction_table):
        u_matrix = np.zeros((self._map_width, self._map_height))
        
        # List of all possible directions to neighboring neurons
        neighbor_directions = [(0, 1), (1, 0), (0, -1), (-1, 0), (1, 1), (1, -1), (-1, 1), (-1, -1)]
        
        for i in range(self._map_width):
            for j in range(self._map_height):
                for direction in neighbor_directions:
                    if 0 <= i + direction[0] < self._map_width and 0 <= j + direction[1] < self._map_height:
                        u_matrix[i][j] += euclidean_distance(self._neurons[i][j], self._neurons[i + direction[0]][j + direction[1]])
                u_matrix[i][j] /= len(neighbor_directions)
        
        plt.title('U-Matrix')
        plt.imshow(u_matrix, cmap='Greys', interpolation='nearest')
        
        for i in range(self._map_width):
            for j in range(self._map_height):
                plt.text(j, i, prediction_table[i][j].argmax(),
                    ha="center", va="center", color="w")
        
        plt.show()

We can now create and train the SOM model. The epoch count is configurable. Quanitization error is calculated after each epoch to evaluate the model performance.

In [None]:
som = SOM(MAP_WIDTH, MAP_HEIGHT, len(train_inputs[0]))
som.train(train_inputs, train_labels, ITERATIONS)

Next, we pass in new, validation, data to the model and plot the results. For each input entry, we find the best matching unit (BMU), then increase the counter for the BMU's class. Finally, we plot the results in three separate graphs, one for each class. Additionally, we plot the U-matrix, which is a visualization of the distance between the nodes in the SOM grid. It helps to visualize the topology of the input space and the relationships between the nodes. The U-matrix is labeled with the class labels of the samples.

In [None]:
som.predict(val_inputs, val_labels)

### Test with different dataset
To test the functionality of the SOM, the same process is repeated with a different dataset.
In this example, the dataset is the [Iris dataset](https://archive.ics.uci.edu/dataset/53/iris), which is a popular dataset for testing machine learning algorithms. It contains 150 samples of iris flowers, each with 4 features: sepal length, sepal width, petal length, and petal width. The goal is to cluster the samples into 3 classes based on the features.

In [None]:
inputs, labels, n_features, n_classes = load_dataset('datasets/iris.csv')
inputs, labels = shuffle_data(np.array(normalize_inputs(inputs)), labels)
train_inputs, train_labels, val_inputs, val_labels = split_data(inputs, labels, RATIO)

som = SOM(MAP_WIDTH, MAP_HEIGHT, len(train_inputs[0]))
som.train(train_inputs, train_labels, ITERATIONS)
som.predict(val_inputs, val_labels)