In [None]:
import numpy as np

import torch
import torch.nn as nn
from torchvision import datasets
import torchvision.transforms as transforms
from torchvision.models import resnet34
from torch.utils.data import DataLoader

from sklearn.metrics import confusion_matrix, f1_score
from tqdm import tqdm

from numpy.ma.core import ceil
from scipy.spatial import distance #distance calculation
from sklearn.preprocessing import MinMaxScaler #normalisation
from sklearn.metrics import accuracy_score #scoring
import matplotlib.pyplot as plt
from matplotlib import colors

In [None]:
# Preprocessing applied to every image: resize, convert to a tensor, then
# normalise each channel with the given per-channel mean and std.
# NOTE(review): the mean/std values look like the usual ImageNet statistics
# expected by torchvision's pretrained models - confirm.
transform = transforms.Compose([
    transforms.Resize((128, 128)), # Resize to 128x128 (height x width)
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406],
                          std=[0.229, 0.224, 0.225])
])

In [None]:
# Number of images per mini-batch, shared by the train and test loaders.
# drop_last is left at its default, so a trailing partial batch is kept.
batch_size = 100

# Training split: reshuffled every epoch so the SOM sees samples in a new order.
train_data = datasets.CIFAR10('data', train=True,
                              download=True, transform=transform)
train_dataloader = DataLoader(train_data, batch_size=batch_size, shuffle=True)

# Test split: evaluation does not depend on sample order, so it is NOT
# shuffled; this keeps the per-batch accuracy printouts reproducible.
test_data = datasets.CIFAR10('data', train=False,
                             download=True, transform=transform)
test_dataloader = DataLoader(test_data, batch_size=batch_size, shuffle=False)

In [None]:
# Pick the GPU when one is visible to PyTorch, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
print(device)

## Feature Extractor

In [None]:
# Pretrained ResNet-34 used purely as a fixed feature extractor.
# NOTE(review): newer torchvision releases deprecate `pretrained=True` in
# favour of the `weights=` argument - confirm against the installed version.
feature_extractor = resnet34(pretrained=True)

# Width of the vector that fed the original classification head.
num_features = feature_extractor.fc.in_features

# The backbone is never trained in this notebook, so freeze every parameter.
for param in feature_extractor.parameters():
    param.requires_grad = False

# Drop the classification head: the forward pass now returns the features
# that used to be the input of `fc`.
feature_extractor.fc = nn.Identity()
feature_extractor.to(device)

# Inference mode for the layers: without this, BatchNorm normalises with the
# statistics of the current batch and keeps updating its running statistics,
# so the same image would yield different features from batch to batch.
feature_extractor.eval()

## Helper Functions

In [None]:

# Manhattan distance
def manhattan_distance(x, y):
    """Return the city-block (L1) distance between two 1-D coordinate vectors.

    Thin wrapper around ``scipy.spatial.distance.cityblock``; in this notebook
    it measures how far apart two ``[row, col]`` grid positions are.
    """
    l1_distance = distance.cityblock(x, y)
    return l1_distance

# Euclidean distance
def euclidean_distance(x, y):
    """Return the Euclidean (L2) distance between ``x`` and ``y``.

    Both arguments are tensors that broadcast against each other; the squared
    differences are summed over the last dimension, so that dimension is
    removed from the result.
    """
    difference = x - y
    squared_sum = difference.pow(2).sum(dim=-1)
    return squared_sum.sqrt()


# Best Matching Unit search
def bmu_search(data, som, num_rows, num_cols):
    """Return the grid position ``[row, col]`` of the neuron closest to ``data``.

    Closeness is the Euclidean distance between ``data`` and each neuron's
    weight vector (the last dimension of ``som``). All distances are computed
    in one tensor operation instead of a Python loop over neurons.

    Args:
        data: tensor holding one feature vector.
        som: tensor indexed as ``som[row][col]`` -> weight vector.
        num_rows: number of grid rows to search.
        num_cols: number of grid columns to search.

    Returns:
        ``[row, col]`` as Python ints. Ties go to the first neuron in
        row-major order. ``[0, 0]`` is returned when the searched grid is
        empty or no neuron has a finite distance.
    """
    grid = som[:num_rows, :num_cols]
    if grid.shape[0] == 0 or grid.shape[1] == 0:
        return [0, 0]

    # Computed inline (not via a sibling helper) so this function is self-contained.
    dists = torch.sqrt(torch.sum((grid - data) ** 2, dim=-1))

    # Neurons whose weights contain NaN (how masked-out grid cells end up being
    # stored) yield a NaN distance; push them to +inf so they can never win.
    dists = dists.masked_fill(torch.isnan(dists), float("inf"))

    # argmin over the flattened grid returns the first minimum, which matches
    # a row-major scan that only replaces the winner on a strictly smaller value.
    flat_index = int(torch.argmin(dists))
    row, col = divmod(flat_index, dists.shape[1])
    return [row, col]

# Learning rate and neighbourhood range calculation
def optimizer(step, max_steps, max_learning_rate, max_m_distance):
    """Return ``(learning_rate, neighbourhood_range)`` for the given step.

    The learning rate decays linearly from ``max_learning_rate`` at step 0
    down to 0 at ``step == max_steps``. The neighbourhood range is not
    decayed: ``max_m_distance`` is handed back unchanged.
    """
    progress = np.float64(step) / max_steps
    decay = 1.0 - progress
    return decay * max_learning_rate, max_m_distance

# Gaussian neighbourhood weighting
def distance_func(x):
    """Return the Gaussian weight ``exp(-x**2 / (2 * sigma**2))`` with sigma = 2.

    ``x`` is a distance on the grid; the weight is 1 at distance 0 and
    shrinks as the distance grows.
    """
    sigma = 2
    numerator = -np.power(x, 2.)
    denominator = 2 * np.power(sigma, 2.)
    return np.exp(numerator / denominator)
  

## Hyperparameters

In [None]:
# Grid layout of the map: num_rows x num_cols neurons.
num_rows, num_cols = 1, 10

# When True, the initialisation cell masks out grid cells (3, 0) and (3, 2),
# which requires a grid with at least 4 rows and 3 columns.
is_2d_10_neuron = False

# Training schedule.
max_learning_rate = 0.3      # learning rate at the first step; decays linearly from here
max_neighborhood_range = 1   # neurons within this Manhattan distance of the winner are updated
max_steps = 20               # not referenced by the visible training loop, which uses `epochs`

## Initializing SOM

In [None]:
num_features = 512 # number of dimensions in the input data; must match the feature extractor's output width

# Seed NumPy's global RNG so the initial weights are identical on every run.
np.random.seed(40)
som = np.random.random_sample(size=(num_rows, num_cols, num_features)) # map construction

if is_2d_10_neuron:
    # Mask out two grid cells by filling their weights with NaN. A NaN weight
    # vector gives a NaN distance, so such a cell can never be selected as the
    # best matching unit.
    # NOTE(review): presumably meant for a 4x3 grid leaving 10 active neurons - confirm.
    masked_cells = [(3, 0), (3, 2)]
    for masked_row, masked_col in masked_cells:
        if masked_row >= num_rows or masked_col >= num_cols:
            raise ValueError(
                f"is_2d_10_neuron masks cell ({masked_row}, {masked_col}), "
                f"which does not exist in a {num_rows}x{num_cols} grid"
            )
        som[masked_row, masked_col] = np.nan

# Move the map to the same device as the extracted features.
som = torch.from_numpy(som).to(device)


## Main Function

In [None]:
epochs = 20

# Feature/label batches seen during the LAST epoch only; they are concatenated
# once after training and reused afterwards to label the map.
last_epoch_features = []
last_epoch_labels = []

for epoch in range(epochs):
    # The schedule depends only on the epoch, so compute it once per epoch
    # rather than once per sample.
    learning_rate, neighbourhood_range = optimizer(epoch, epochs, max_learning_rate, max_neighborhood_range)
    is_last_epoch = epoch == epochs - 1

    for x_train, y_train in tqdm(train_dataloader, desc=f"Epoch {epoch+1}", colour="blue"):
        x_train, y_train = x_train.to(device), y_train.to(device)

        # The backbone is frozen; no autograd bookkeeping is needed.
        with torch.no_grad():
            features = feature_extractor(x_train)

        if is_last_epoch:
            last_epoch_features.append(features)
            last_epoch_labels.append(y_train.view(-1))

        # start training iterations: one SOM update per sample
        for i in range(features.shape[0]):
            sample = features[i]
            bmu = bmu_search(sample, som, num_rows, num_cols)
            for row in range(num_rows):
                for col in range(num_cols):
                    dist = manhattan_distance([row, col], bmu)
                    if dist <= neighbourhood_range:
                        # Pull the neuron towards the sample, scaled by the
                        # learning rate and the Gaussian neighbourhood weight.
                        # A NaN (masked) neuron stays NaN under this update.
                        step_size = float(learning_rate * distance_func(dist))
                        som[row, col] += step_size * (sample - som[row, col])

# Concatenate once instead of growing a tensor batch by batch.
if last_epoch_features:
    final_features = torch.cat(last_epoch_features)
    final_y = torch.cat(last_epoch_labels)
else:
    # No epoch ran or the loader was empty: keep empty placeholders.
    final_features = torch.zeros(0, dtype=torch.long, device=device)
    final_y = torch.zeros(0, dtype=torch.long, device=device)


## Collecting Labels

In [None]:
# One bucket per neuron; each bucket collects the labels of the samples for
# which that neuron is the best matching unit.
# (The name `map` shadows the builtin; it is kept because the next cell reads it.)
map = np.empty(shape=(num_rows, num_cols), dtype=object)
for row, col in np.ndindex(num_rows, num_cols):
    if som[row][col] != None:
        map[row][col] = [] # empty list to store the label

# Labels recorded during the last training epoch, as a NumPy array on the host.
label_data = final_y.cpu().numpy()

num_samples = final_features.shape[0]
for t in range(num_samples):
    sample = final_features[t].to(device)
    bmu = bmu_search(sample, som.to(device), num_rows, num_cols)
    win_row, win_col = bmu
    map[win_row][win_col].append(label_data[t]) # label of winning neuron

## Construct Label Map

In [None]:
# label_map[row][col]        : majority label among the samples won by that neuron
#                              (-3 marks a neuron that won no sample).
# label_dispersion[row][col] : fraction of that neuron's samples carrying the
#                              majority label (1.0 = every sample agrees; 0.0 = no samples).
label_map = np.zeros(shape=(num_rows, num_cols), dtype=np.int64)
label_dispersion = np.zeros(shape=(num_rows, num_cols), dtype=np.float64)

for row in range(num_rows):
    for col in range(num_cols):
        # `map` is the per-neuron label bucket array built by the previous cell.
        label_list = map[row][col]
        if not label_list:
            label = -3
            dispersion = 0.0
        else:
            # Single pass count; the dict keeps first-seen order, so max()
            # breaks ties in favour of the label that appeared first, exactly
            # like max(label_list, key=label_list.count) but in linear time.
            counts = {}
            for lbl in label_list:
                counts[lbl] = counts.get(lbl, 0) + 1
            label = max(counts, key=counts.get)
            dispersion = counts[label] / len(label_list)

        label_map[row][col] = label
        label_dispersion[row][col] = dispersion

# Show the finished map (printing before the loop would only show zeros).
print(label_map)


### Feature Map

In [None]:
# Heat map of the label assigned to each neuron, with the value written in each cell.
fig, ax = plt.subplots(figsize=(10, 6))
title = 'Feature Map'
plt.imshow(label_map, cmap='Blues')

# Ticks sit on the 0-based cell indices but are labelled 1-based.
col_positions = np.arange(num_cols)
row_positions = np.arange(num_rows)
ax.set_xticks(col_positions)
ax.set_yticks(row_positions)
ax.set_xticklabels(col_positions + 1)
ax.set_yticklabels(row_positions + 1)
plt.setp(ax.get_xticklabels(), rotation=45, ha="right", rotation_mode="anchor")

# Annotate every cell; x is the column index and y is the row index.
for i, j in np.ndindex(num_rows, num_cols):
    text = ax.text(j, i, '{:.2f}'.format(label_map[i][j]),
                   ha="center", va="center", color="black")

plt.colorbar()
plt.title(title)
plt.show()

### Dispersion Map

In [None]:
# Heat map of label_dispersion per neuron, with the value written in each cell.
fig, ax = plt.subplots(figsize=(10, 6))
im = ax.imshow(label_dispersion, cmap='Blues')

# Ticks sit on the 0-based cell indices but are labelled 1-based.
col_positions = np.arange(num_cols)
row_positions = np.arange(num_rows)
ax.set_xticks(col_positions)
ax.set_yticks(row_positions)
ax.set_xticklabels(col_positions + 1)
ax.set_yticklabels(row_positions + 1)
plt.setp(ax.get_xticklabels(), rotation=45, ha="right", rotation_mode="anchor")

# Annotate every cell; x is the column index and y is the row index.
for i, j in np.ndindex(num_rows, num_cols):
    text = ax.text(j, i, '{:.2f}'.format(label_dispersion[i][j]),
                   ha="center", va="center", color="black")

plt.title('Label Dispersion Map')
plt.colorbar(im)
plt.show()

## Test Data

In [None]:
# Evaluate on the test split: each sample is assigned the label of its best
# matching neuron. Correct predictions are counted over ALL samples so the
# final figure stays right even when the last batch is smaller than the rest.
num_correct = 0
num_seen = 0
for x_test, y_test in test_dataloader:
    x_test, y_test = x_test.to(device), y_test.to(device)

    # Inference only; no autograd bookkeeping is needed.
    with torch.no_grad():
        features = feature_extractor(x_test)

    winner_labels = []
    for t in range(features.shape[0]):
        row, col = bmu_search(features[t], som, num_rows, num_cols)
        # A neuron that won no training sample carries the placeholder label
        # -3, which never matches a real class and counts as a miss.
        winner_labels.append(label_map[row][col])

    y_true = y_test.cpu().numpy()
    acc = accuracy_score(y_true, winner_labels)
    num_correct += int(np.sum(y_true == np.asarray(winner_labels)))
    num_seen += len(winner_labels)
    print("Accuracy: ",acc)

# Guard against an empty loader instead of dividing by zero.
if num_seen:
    print("Total Accuracy: ", num_correct / num_seen)
else:
    print("Total Accuracy: ", float("nan"))