In [6]:
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
from mpl_toolkits.mplot3d import Axes3D
import math
from matplotlib.colors import ListedColormap
from matplotlib.cm import get_cmap
from collections import defaultdict
from sklearn.metrics import completeness_score, homogeneity_score, v_measure_score, adjusted_rand_score, adjusted_mutual_info_score

In [11]:
import numpy as np
from collections import defaultdict
import matplotlib.pyplot as plt
from sklearn.datasets import fetch_openml

class KohonenNetwork:
    def __init__(self, M, N, input_dim, initial_sigma=1.0, sigma_decay=1.0, init_lr=0.1, decay_rate=1, neighborhood_function='gaussian'):
        self.M, self.N, self.input_dim = M, N, input_dim
        self.initial_sigma, self.sigma_decay = initial_sigma, sigma_decay
        self.init_lr, self.decay_rate = init_lr, decay_rate
        self.neighborhood_function = neighborhood_function
        self.weights = np.random.rand(M, N, input_dim)
        self.bmu_cluster_map = np.zeros((M, N), dtype=int)

    def load_mnist_data(self):
        X, y = fetch_openml('mnist_784', version=1, return_X_y=True)
        X = X / 255.0
        return X, y.astype(int)

    def train(self, input_data, labels, num_epochs):
        for epoch in range(num_epochs):
            sigma = self.initial_sigma * np.exp(-epoch / self.decay_rate) * self.sigma_decay
            learning_rate = self.init_lr * np.exp(-epoch / self.decay_rate)
            idx = np.random.randint(len(input_data))
            data_sample = input_data[idx]
            label_sample = labels[idx]
            bmu = self._find_bmu_and_update_weights(data_sample, sigma, learning_rate)
            self.bmu_cluster_map[bmu] = label_sample

    def create_labels_map(self, input_data, labels):
        self.labels_map = np.zeros((self.M, self.N, 10))
        self.bmu_cluster_map = np.zeros((self.M, self.N), dtype=int)

        for i, data in enumerate(input_data):
            bmu = self._find_bmu(data)
            self.bmu_cluster_map[bmu] = labels[i]
            self.labels_map[bmu][labels[i]] += 1

        return self.labels_map

    def plot_labels_map(self, labels_map, title):
        plt.figure(figsize=(10, 10))
        for i in range(self.M):
            for j in range(self.N):
                color = labels_map[i, j] / 10.0
                plt.plot([j+0.5], [i+0.5], 's', color=plt.cm.hsv(color), markersize=12)
        plt.xlim([0, self.M])
        plt.ylim([0, self.N])
        plt.title(title)
        plt.show()

    def accuracy(self, data, labels):
        correct = 0
        total = len(data)
        
        if total == 0:
            return 0

        for sample, label in zip(data, labels):
            bmu = self._find_bmu_and_update_weights(sample, None, None)
            if self.bmu_cluster_map[bmu] == label:
                correct += 1

        return 100 * correct / total



    def _neighborhood_function(self, distance, sigma):
        # Funkcja sąsiedztwa, która oblicza wpływ odległości na aktualizację wag.
        if self.neighborhood_function == 'gaussian':
            sigma = max(sigma, 1e-10)  # add a small constant to prevent division by zero
            distance = np.clip(distance, -709, 709)  # limit the range of the input to prevent overflow
            return np.exp(-distance**2 / (2 * sigma**2))
        elif self.neighborhood_function == 'mexican_hat':
            return (1 - (distance**2 / sigma**2)) * np.exp(-distance**2 / (2 * sigma**2))
        else:
            raise ValueError(f"Nieznana funkcja sąsiedztwa: {self.neighborhood_function}")



    def _find_bmu_and_update_weights(self, sample, sigma, lr):
        # Znalezienie BMU, aktualizacja wag i zwrócenie BMU.
        distances = np.sqrt((self.weights[..., 0] - sample[0])**2 + 3/4 * (self.weights[..., 1] - sample[1])**2)  # hexagonal distance
        bmu = np.unravel_index(np.argmin(distances), distances.shape)
        
        if sigma and lr:
            x, y = np.ogrid[0:self.M, 0:self.N]
            distance = np.sqrt((x - bmu[0])**2 + 3/4 * (y - bmu[1])**2)  # hexagonal distance
            influence = self._neighborhood_function(distance, sigma)
            self.weights += lr * influence[..., np.newaxis] * (sample - self.weights)
        
        return bmu

    
    def assign_clusters(self, input_data):
        # Mapowanie klastrów na podstawie danych wejściowych.
        cluster_stats = defaultdict(list)

        for i, data_sample in enumerate(input_data):
            # Wyszukujemy BMU, przy czym nie aktualizujemy wag
            best_matching_unit = self._find_bmu_and_update_weights(data_sample, None, None)
            self.bmu_cluster_map[best_matching_unit] = i



    def display_clustered_data(self, input_data, neuron_positions=True, view_data=True):
        # Wizualizacja sklastrowanych danych wejściowych.
        assigned_clusters = np.array([self.bmu_cluster_map[self._find_bmu_and_update_weights(sample, None, None)] for sample in input_data])
        distinct_clusters = np.unique(assigned_clusters)
        color_map = plt.cm.get_cmap('viridis', len(distinct_clusters))
        fig = plt.figure()
        ax = fig.add_subplot(111, projection='3d' if input_data.shape[1] == 3 else None)
        if view_data:
            # Wyświetlanie danych wejściowych sklastrowanych
            for i, cluster in enumerate(distinct_clusters):
                cluster_mask = assigned_clusters == cluster
                if input_data.shape[1] == 3:
                    ax.scatter(input_data[cluster_mask, 0], input_data[cluster_mask, 1], input_data[cluster_mask, 2], color=color_map(i), edgecolors='k', s=50, label=f'Klaster {i}')  # Use i for cluster number
                else:
                    ax.scatter(input_data[cluster_mask, 0], input_data[cluster_mask, 1], color=color_map(i), edgecolors='k', s=50, label=f'Klaster {i}')  # Use i for cluster number

        if neuron_positions:
            # Dodanie pozycji neuronów
            neuron_positions = np.array([self.weights[i, j, :] for i in range(self.M) for j in range(self.N)])
            if input_data.shape[1] == 3:
                ax.scatter(neuron_positions[:, 0], neuron_positions[:, 1], neuron_positions[:, 2], color='red', edgecolors='k', s=100, marker='o', label='Neurony')
            else:
                ax.scatter(neuron_positions[:, 0], neuron_positions[:, 1], color='red', edgecolors='k', s=100, marker='o', label='Neurony')

        ax.set_title('3D Clustered Data' if input_data.shape[1] == 3 else '2D Clustered Data')
        ax.set_xlabel('Dim 1')
        ax.set_ylabel('Dim 2')
        if input_data.shape[1] == 3:
            ax.set_zlabel('Dim 3')
        ax.legend(bbox_to_anchor=(1.05, 1), loc='upper left')
        plt.show()



In [12]:
import numpy as np
import struct
import matplotlib.pyplot as plt
from sklearn.decomposition import PCA
from sklearn.metrics import v_measure_score
from mpl_toolkits.mplot3d import Axes3D
from matplotlib import cm
# Assume KohonenNetwork is already implemented somewhere and imported correctly

def load_mnist_images(filename):
    with open(filename, 'rb') as file:
        _, num, rows, cols = struct.unpack(">IIII", file.read(16))
        images = np.fromfile(file, dtype=np.uint8).reshape(num, rows * cols)
    return images

def load_mnist_labels(filename):
    with open(filename, 'rb') as file:
        _, num = struct.unpack(">II", file.read(8))
        labels = np.fromfile(file, dtype=np.uint8)
    return labels

In [13]:
# Load MNIST data
train_images = load_mnist_images('../data/2/mnist/train-images-idx3-ubyte/train-images-idx3-ubyte')
train_labels = load_mnist_labels('../data/2/mnist/train-labels-idx1-ubyte/train-labels-idx1-ubyte')

test_images = load_mnist_images('../data/2/mnist/t10k-images-idx3-ubyte/t10k-images-idx3-ubyte')
test_labels = load_mnist_labels('../data/2/mnist/t10k-labels-idx1-ubyte/t10k-labels-idx1-ubyte')

In [14]:
# Initialize the Kohonen Network
kohonen_network = KohonenNetwork(20, 20, 784, neighborhood_function='gaussian')

# Normalize the images
train_images = train_images / 255.0
test_images = test_images / 255.0

# Train the network
kohonen_network.train(train_images, train_labels, 1000)

# Create a labels map
labels_map = kohonen_network.create_labels_map(train_images, train_labels)

# Plot the labels map
kohonen_network.plot_labels_map(labels_map, 'Self-Organizing Map with Gaussian labels')

AttributeError: 'KohonenNetwork' object has no attribute '_find_bmu'

In [15]:
class KohonenNetwork:
    def __init__(self, M, N, input_dim, sigma=1.0, learning_rate=0.3, neighborhood_function='gaussian'):
        self.M, self.N, self.input_dim = M, N, input_dim
        self.sigma, self.learning_rate = sigma, learning_rate
        self.neighborhood_function = neighborhood_function
        self.weights = np.random.rand(M, N, input_dim)  # Randomly initialize weights

    def train(self, input_data, num_iterations):
        for iteration in range(num_iterations):
            for idx in np.random.permutation(len(input_data)):
                self._train_single_sample(input_data[idx])

    def _train_single_sample(self, sample):
        bmu_idx = self._find_bmu(sample)
        self._update_weights(sample, bmu_idx)

    def _find_bmu(self, sample):
        # Find the best matching unit
        distances = np.linalg.norm(self.weights - sample, axis=2)
        return np.unravel_index(np.argmin(distances, axis=None), distances.shape)

    def _update_weights(self, sample, bmu_idx):
        # Compute the neighborhood function
        x, y = np.ogrid[0:self.M, 0:self.N]
        distance = np.sqrt((x - bmu_idx[0])**2 + (y - bmu_idx[1])**2)
        influence = self._neighborhood_function(distance, self.sigma)
        # Update weights
        self.weights += self.learning_rate * influence[..., np.newaxis] * (sample - self.weights)

    def _neighborhood_function(self, distance, sigma):
        if self.neighborhood_function == 'gaussian':
            return np.exp(-distance**2 / (2 * sigma**2))
        elif self.neighborhood_function == 'mexican_hat':
            return (1 - (distance**2 / sigma**2)) * np.exp(-distance**2 / (2 * sigma**2))
        else:
            raise ValueError("Unknown neighborhood function")

    def visualize_map(self, input_data):
        label_map = np.zeros((self.M, self.N))
        for i in range(self.M):
            for j in range(self.N):
                distances = np.linalg.norm(input_data - self.weights[i, j], axis=1)
                label_map[i, j] = np.argmin(distances)
        plt.figure(figsize=(10, 10))
        plt.imshow(label_map, cmap='viridis', interpolation='none')
        plt.colorbar()
        plt.show()


In [19]:
# Normalize the image data to [0, 1]
train_images = train_images / 255.0
test_images = test_images / 255.0

# Reshape data if necessary (here, flattening is already done by the load function)
# train_images = train_images.reshape((train_images.shape[0], -1))
# test_images = test_images.reshape((test_images.shape[0], -1))

# Initialize the Kohonen Network
kohonen = KohonenNetwork(20, 20, 784)  # Assuming 28x28 images, hence 784 input dimensions

# Train the network
kohonen.train(train_images,  100)

# Evaluate the network or use it for further analysis

KeyboardInterrupt: 

In [None]:
kohonen.visualize_map(train_images)