In [None]:
import pandas as pd
from common import (k_means_clustering, hierarchical_clustering, visualize_2d, visualize_3d)
import torch
from sklearn.metrics import silhouette_score
import matplotlib.pyplot as plt
import cv2
import numpy as np
import os
import concurrent.futures

In [None]:
# Setting LOKY_MAX_CPU_COUNT avoids a warning from sklearn. Keep the previous
# cap of 8 workers, but never ask for more CPUs than this machine reports, and
# use setdefault so a value the user exported before starting the kernel wins.
os.environ.setdefault("LOKY_MAX_CPU_COUNT", str(min(8, os.cpu_count() or 1)))

## Read data

In [None]:
# CSV with an 'Image Name' column; every other column is treated as a feature
# by the cells below.
file_path = 'features.csv'
df = pd.read_csv(filepath_or_buffer=file_path)

In [None]:
# add features as column
# Every column except the image name is a feature. A 'tensor' column left over
# from a previous run of this cell is excluded too, so the cell can be re-run
# without feeding tensor objects back into torch.tensor.
feature_columns = [col for col in df.columns if col not in ('Image Name', 'tensor')]
if not feature_columns:
    raise ValueError("df has no feature columns left; re-run the 'Read data' cell first")
# Convert the whole feature matrix once, then hand each row to its DataFrame row.
feature_matrix = torch.tensor(df[feature_columns].to_numpy(), dtype=torch.float32)
df['tensor'] = list(feature_matrix)

In [None]:
# Keep only the image name and its feature tensor; the raw feature columns are
# no longer needed once they are packed into 'tensor'.
df = df.loc[:, ['Image Name', 'tensor']]

In [None]:
# Preview the first rows of the reduced frame ('Image Name' + 'tensor').
df.head()

## Clustering

In [None]:
# Gather the per-image feature tensors and stack them into a single tensor
# whose rows follow the row order of df.
tensor_list = list(df['tensor'])
features = torch.stack(tensor_list, dim=0)

In [None]:
# normalize features
# Standardize every feature column: subtract its mean, divide by its std.
feature_mean = features.mean(dim=0)
feature_std = features.std(dim=0)
# A constant column has std 0 (and a single-row input yields NaN). Dividing by
# that would fill the column with NaN/inf and corrupt every later distance and
# centroid, so such columns are only centered (divisor 1).
feature_std[~(feature_std > 0)] = 1.0
features = (features - feature_mean) / feature_std

In [None]:
N_CLUSTERS = 7
CLUSTERING_METHOD = 'kmeans'
labels = None
# Dispatch table: method name -> clustering helper imported from `common`.
clustering_functions = {
    'kmeans': k_means_clustering,
    'hierarchical': hierarchical_clustering,
}
if CLUSTERING_METHOD not in clustering_functions:
    raise ValueError(f'Invalid clustering method: {CLUSTERING_METHOD}')
labels = clustering_functions[CLUSTERING_METHOD](features, N_CLUSTERS)

In [None]:
# find centroids
# One centroid per cluster id: the column-wise mean of the feature rows whose
# label equals that id.
centroids = [
    features[labels == cluster_id].mean(dim=0)
    for cluster_id in range(N_CLUSTERS)
]

In [None]:
def compute_silhouette_score(n_clusters):
    """Return the better silhouette score of the two clustering methods.

    Clusters the module-level ``features`` into ``n_clusters`` groups, first
    with ``k_means_clustering`` and then with ``hierarchical_clustering``,
    scores each labelling with ``silhouette_score`` and returns the larger of
    the two scores.

    Args:
        n_clusters: Number of clusters passed to both clustering helpers.

    Returns:
        The maximum of the k-means and hierarchical silhouette scores.
    """
    scores = [
        silhouette_score(features, cluster_fn(features, n_clusters))
        for cluster_fn in (k_means_clustering, hierarchical_clustering)
    ]
    return max(scores)

In [None]:
# Candidate cluster counts to evaluate (2 through 9).
candidate_counts = range(2, 10)

# Score every candidate in worker threads; executor.map yields the results in
# the same order as candidate_counts.
silhouette_scores = []
with concurrent.futures.ThreadPoolExecutor() as executor:
    silhouette_scores.extend(executor.map(compute_silhouette_score, candidate_counts))

plt.plot(candidate_counts, silhouette_scores)
plt.title('Silhouette score vs number of clusters')
plt.xlabel('Number of clusters')
plt.ylabel('Silhouette score')
plt.show()

## Visualization

In [None]:
# Hand the normalized features, the per-cluster centroids and the cluster
# labels to the 2-D plotting helper from `common`.
# NOTE(review): how the helper reduces the features to two dimensions is not
# visible in this notebook.
visualize_2d(features, centroids, labels)

In [None]:
# Same inputs as the 2-D plot, passed to the 3-D plotting helper from `common`.
# NOTE(review): the projection used by the helper is not visible in this notebook.
visualize_3d(features, centroids, labels)

## Get samples from each cluster

In [None]:
# Directory the sample images are read from. The path is relative, so it is
# resolved against the kernel's current working directory.
images_folder = './data/images/'

In [None]:
def plot_images(images, labels, images_folder, ncols=5):
    """Show images in a grid, each titled with its cluster label.

    Args:
        images: Sequence of image file names, relative to ``images_folder``.
        labels: Sequence of cluster labels, parallel to ``images``.
        images_folder: Directory that contains the image files.
        ncols: Number of grid columns. Defaults to 5, the layout that was
            previously hard-coded.

    Returns:
        None. The figure is rendered with ``plt.show()``.

    Raises:
        ValueError: If ``ncols`` is smaller than 1.

    Notes:
        An empty ``images`` sequence is a no-op. A file that OpenCV cannot read
        is shown as an empty cell whose title names the file, instead of
        aborting the whole figure.
    """
    if ncols < 1:
        raise ValueError(f'ncols must be at least 1, got {ncols}')

    n = len(images)
    if n == 0:
        # plt.subplots rejects a grid with zero rows, and there is nothing to draw.
        return

    nrows = int(np.ceil(n / ncols))
    # squeeze=False always returns a 2-D array of axes, so ravel() works for
    # every grid shape (including a single row or a single column).
    fig, axs = plt.subplots(nrows, ncols, figsize=(3 * ncols, 3 * nrows), squeeze=False)
    axs = axs.ravel()  # flatten the array to easily iterate over it

    for ax, img_name, label in zip(axs, images, labels):
        ax.axis('off')
        img = cv2.imread(os.path.join(images_folder, img_name))
        if img is None:
            # cv2.imread signals a missing or undecodable file by returning
            # None rather than raising; cvtColor would then fail obscurely.
            ax.set_title(f'Cluster {label}\n(unreadable: {img_name})')
            continue
        ax.imshow(cv2.cvtColor(img, cv2.COLOR_BGR2RGB))
        ax.set_title(f'Cluster {label}')

    # remove the axes of the extra subplots in the last row
    for ax in axs[n:]:
        fig.delaxes(ax)

    plt.tight_layout()
    plt.show()

In [None]:
# add labels to df
# Attach the cluster label of every image as a new 'labels' column.
df = df.assign(labels=labels)

In [None]:
# Preview the frame again, now that it carries the 'labels' column.
df.head()

In [None]:
# get samples from each cluster
# For every cluster keep the `sample_count` images closest to the cluster
# centroid, considering only images that exist in images_folder.
sample_count = 30
samples = []
# Rows of `features` were stacked from df['tensor'] in row order, so a
# positional mask over df selects the matching rows of `features`.
assert len(df) == features.shape[0], 'df and features are out of sync'
for i in range(N_CLUSTERS):
    in_cluster = (df['labels'] == i).to_numpy(dtype=bool)
    from_cluster = df[in_cluster].copy()  # create a copy to avoid warnings
    # The centroids were computed from the normalized `features`, so the
    # distance must use the normalized rows as well; df['tensor'] still holds
    # the raw, un-normalized values and would give meaningless distances.
    cluster_features = features[torch.as_tensor(in_cluster)]
    from_cluster['distance'] = [torch.dist(row, centroids[i]).item() for row in cluster_features]
    # Explicit boolean array: stays a valid row mask even for an empty cluster.
    image_exists = np.array(
        [os.path.exists(os.path.join(images_folder, name)) for name in from_cluster['Image Name']],
        dtype=bool,
    )
    from_cluster = from_cluster[image_exists]
    from_cluster = from_cluster.sort_values(by='distance')
    samples.append(from_cluster.head(sample_count))

samples_df = pd.concat(samples)

In [None]:
# Display the selected samples: at most `sample_count` rows per cluster,
# sorted by distance within each cluster.
samples_df

In [None]:
# plot samples for each cluster
# One figure per cluster id, drawn from the rows of samples_df with that label.
for i in range(N_CLUSTERS):
    in_cluster = samples_df['labels'] == i
    image_names = samples_df.loc[in_cluster, 'Image Name'].values
    image_labels = samples_df.loc[in_cluster, 'labels'].values
    plot_images(image_names, image_labels, images_folder)