# Imports

In [None]:
import numpy as np
import matplotlib.pyplot as plt
from tensorflow import keras
import matplotlib.pyplot as plt
import gc

# Dataset
Generating 500 samples of each digit

In [None]:
# getting mnist dataset from keras
dataset_size = 5000

(x_train, y_train), _ = keras.datasets.mnist.load_data()
print(f"train shape: {x_train.shape}, {y_train.shape}")

# get 5000 of data by random
# each number should have equal distribution

# find each image group (0 to 9)
x_train_gps = [[] for x in range(10)]
for i in range(len(x_train)):
    x_train_gps[int(y_train[i])].append(i)

# get equal amount of each number
count = dataset_size // 10
x = []
for i in range(10):
    x.append(np.random.choice(x_train_gps[i], count))

# turn to numpy and shuffle
x = np.array(x)
x = x.flatten()
np.random.shuffle(x)
# normalize
kohonen_input = np.array([
  x_train[i].flatten() for i in x
]) / 255
print("dataset shape: ", kohonen_input.shape)

# divide to minibatches
batch_size = 25
batch_count = kohonen_input.shape[0] // batch_size
kohonen_minibatches = np.reshape(kohonen_input, (batch_count, -1, kohonen_input.shape[1]))

print("dataset minibatch shape: ", kohonen_minibatches.shape)

# remove extra vars
del x
del x_train
del y_train
del x_train_gps
gc.collect()

# Kohonen Implementation

In [None]:
class Kohonen:
    def __init__(self, x, y, feature_size, radius, lr, time_constant_r=1, time_constant_lr=1):
        self.x = x
        self.y = y
        self.feature_size = feature_size
        self.w = None
        self.radius = radius
        self.lr = lr
        self.nodes_n = x * y
        self.iterations_n = 0
        self.time_constant_r = time_constant_r
        self.time_constant_lr = time_constant_lr
        
        
        # step one: initialize
        self.initialize_weights()
        print("initialization complete")
        
        self.neighborhoods = self.find_all_neighborhoods()
        
    def find_all_neighborhoods(self):
        nodes_x, nodes_y = np.unravel_index(list(range(self.nodes_n)), (self.x, self.y))
        dists = []
        for i in range(self.nodes_n):
            x = nodes_x[i]
            y = nodes_y[i]
            d = np.sqrt((nodes_x - x)**2 + (nodes_y - y)**2)
            d = np.expand_dims(d, axis=1)
            gaus_func = np.exp(- np.square(d) / (2 * np.square(self.radius)))
            h = gaus_func
            dists.append(h)
        return np.array(dists).reshape((self.nodes_n, self.nodes_n))
    
    
    def visualize_map(self, x, return_fast=True):
        wmap = {}
        im = 0
        winners_idx = []
        neurons_assigned = np.zeros((self.nodes_n,2), dtype="uint32")
        for i in range(x.shape[0]):
            winn = (self.compete(x[i], return_idx_only=True))
            winners_idx.append(winn)
            for j in range(len(winn)):
                neurons_assigned[winn[j]] = [i+1, j+1]
        matched_n = (np.sum(neurons_assigned, axis=1) > 0).sum()
        print("number of matched neurons: ", matched_n)
        if (return_fast):
            return matched_n
        # drawing a subplot for each image
        length = len(x)
        fig, axes = plt.subplots(nrows=20, ncols=20, figsize=(10,10))
        for i in range(20):
            for j in range(20):
                axes[i,j].axis("off")
                img_idx = neurons_assigned[i * 20 + j]
                if (img_idx[0] == 0 and img_idx[1] == 0):
                    continue
                img_data = np.reshape(x[img_idx[0]-1][img_idx[1]-1], (28,28))
                axes[i, j].imshow(img_data, cmap="gray", aspect="auto")
                
        plt.axis([0, 20, 0,  20])
        plt.subplots_adjust(wspace=.05, hspace=.05)
        plt.show()

    def initialize_weights(self):
        """Randomize weights - 1s step"""
        self.w = np.random.random((self.nodes_n, self.feature_size))

    def euc_distance(self, inputs):
        """Calculate euclidean distance between the inputs and rows of weight"""
        input_n = len(inputs)
        repeated_inp = np.repeat(inputs, self.nodes_n, axis=0)
        inp_br = np.reshape(repeated_inp, (input_n, self.nodes_n, self.feature_size)) # input_n, nodes_n, feature_size
        # res[i]: distance of input i from each w rows
        res = np.sqrt(np.sum((self.w - inp_br) ** 2, axis=2))
        return res

    def compete(self, inputs, return_idx_only=False):
        """Find the closest to the inputs for each input - 2nd step"""
        winners_idx = np.argmin(self.euc_distance(inputs), axis=1)
        winners = np.take(self.w, winners_idx, axis=0)
        if (return_idx_only):
            return winners_idx
        return winners, winners_idx

    def cooperate(self, winners_idx):
        """Find neighbors of winners with gaussian function - 3rd step"""
        return np.expand_dims(np.take(self.neighborhoods, winners_idx, axis=0), axis=-1)

    def adapt(self, h, inputs):
        """Find dw to modify the weight - 4th step"""
        # batch
        input_len = len(inputs)
        inp_tiled = np.tile(inputs, [1,self.nodes_n]).reshape((input_len, self.nodes_n, -1))
        w_tiled = np.tile(self.w, [input_len, 1]).reshape((input_len, self.nodes_n, -1))
        dw = (self.lr * h * (inp_tiled - w_tiled))
        dw = np.sum(dw, axis=0)
        return dw

    def iteration(self, inputs):
        # find closest neurons - 2nd step
        winners, winners_idx = self.compete(inputs)
        input_n = len(inputs)
        dw = 0
        # find neighbors - 3rd step
        hs = self.cooperate(winners_idx)
        # update weights - 4rd step
        dw = self.adapt(hs, inputs)
        self.w += dw / input_n
        self.w = np.clip(self.w, 0, 1)

    def decay_r(self):
        """Decay neighborhood radius based on time_constant"""
        self.radius = self.radius * np.exp(-self.iterations_n / self.time_constant_r)

    def decay_lr(self):
        self.lr = self.lr * np.exp(-self.iterations_n / self.time_constant_lr)


    def train(self, minibatches, epochs, display, decay_radius=False, decay_lr=False):
        early_stop_patience = 0
        prev_res = 0
        for e in range(epochs):
            self.iterations_n += 1
            if (display):
                print("epoch ", e)
            # stop the training if there's no change for 10 epochs
            if (early_stop_patience >= 10):
                print("no changes for 10 epochs. stopped training.")
            gc.collect()
            # iterate through minibatched
            for batch in minibatches:
                self.iteration(batch)
            if ((decay_lr or decay_radius)):
                print("epoch ", self.iterations_n, "  radius: ", self.radius, ", lr: ", self.lr)
            # decay radius
            if (decay_radius):
                self.decay_r()
            # decay learning rate
            if (decay_lr):
                self.decay_lr()

            
            if (display):
                res = self.visualize_map(minibatches)
                if (res == prev_res):
                    early_stop_patience += 1

# Models

## 1:
* Size: 20X20
* Radius: 5
* Learning rate: 0.01
* No LR decay, No radius decay
* 50 epochs

In [None]:
k1 = Kohonen(20, 20, kohonen_input.shape[1], 5, 0.01)
k1.train(kohonen_minibatches, 10, display=False)
print("map after 10 epochs")
k1.visualize_map(kohonen_minibatches, return_fast=False)

k1.train(kohonen_minibatches, 10, display=False)
print("map after 20 epochs")
k1.visualize_map(kohonen_minibatches, return_fast=False)

k1.train(kohonen_minibatches, 10, display=False)
print("map after 30 epochs")
k1.visualize_map(kohonen_minibatches, return_fast=False)

k1.train(kohonen_minibatches, 10, display=False)
print("map after 40 epochs")
k1.visualize_map(kohonen_minibatches, return_fast=False)

k1.train(kohonen_minibatches, 10, display=False)
print("map after 50 epochs")
k1.visualize_map(kohonen_minibatches, return_fast=False)

## 2:
* Size: 20X20
* Radius: 5
* Learning rate: 0.1
* No radius decay
* LR decay with time constant of 80
* 50 epochs

In [None]:
k2 = Kohonen(20, 20, kohonen_input.shape[1], 3, 0.5, time_constant_lr=80)
k2.train(kohonen_minibatches, 10, display=False, decay_lr=True)
print("map after 10 epochs")
k2.visualize_map(kohonen_minibatches, return_fast=False)
k2.train(kohonen_minibatches, 10, display=False, decay_lr=True)
print("map after 20 epochs")
k2.visualize_map(kohonen_minibatches, return_fast=False)
k2.train(kohonen_minibatches, 10, display=False, decay_lr=True)
print("map after 30 epochs")
k2.visualize_map(kohonen_minibatches, return_fast=False)
k2.train(kohonen_minibatches, 10, display=False, decay_lr=True)
print("map after 40 epochs")
k2.visualize_map(kohonen_minibatches, return_fast=False)
k2.train(kohonen_minibatches, 10, display=False, decay_lr=True)
print("map after 50 epochs")
k2.visualize_map(kohonen_minibatches, return_fast=False)

## 3:
* Size: 20X20
* Radius: 5
* Learning rate: 0.1
* Radius decay with time constant of 120
* LR decay with time constant of 80
* 50 epochs

In [None]:
k3 = Kohonen(20, 20, kohonen_input.shape[1], 3, 10, time_constant_lr=80, time_constant_r=120)
k3.train(kohonen_minibatches, 10, display=False, decay_lr=True, decay_radius=True)
print("map after 10 epochs")
k3.visualize_map(kohonen_minibatches, return_fast=False)
k3.train(kohonen_minibatches, 10, display=False, decay_lr=True, decay_radius=True)
print("map after 20 epochs")
k3.visualize_map(kohonen_minibatches, return_fast=False)
k3.train(kohonen_minibatches, 10, display=False, decay_lr=True, decay_radius=True)
print("map after 30 epochs")
k3.visualize_map(kohonen_minibatches, return_fast=False)
k3.train(kohonen_minibatches, 10, display=False, decay_lr=True, decay_radius=True)
print("map after 40 epochs")
k3.visualize_map(kohonen_minibatches, return_fast=False)
k3.train(kohonen_minibatches, 10, display=False, decay_lr=True, decay_radius=True)
print("map after 50 epochs")
k3.visualize_map(kohonen_minibatches, return_fast=False)