In [26]:

import numpy as np
import pandas as pd
import os
from sklearn.manifold import TSNE
from sklearn.cluster import AgglomerativeClustering, DBSCAN
import torch
import clip
from PIL import Image
import plotly.express as px

In [27]:
def load_data(image_folder: str, label_file: str, H: int = 224, W: int = 224):
    """Loads images and labels from the specified folder and file."""

    labels_df = pd.read_csv(label_file, delimiter="|")
    labels_df["label"] = labels_df["label"].map({"animal": 1, "human": 0})
    labels = labels_df["label"].tolist()

    images = []
    image_names = labels_df["image_name"].tolist()
    for image_name in image_names:
        image_file = (
            Image.open(os.path.join(image_folder, image_name))
            .convert("RGB")
            .resize((H, W))
        )
        image_array = np.array(image_file).transpose((2, 0, 1))
        images.append(image_array)

    images = np.array(images)

    comments = labels_df["comment"].tolist()
    return images, labels, image_names, comments

In [28]:
class PCA:
    def __init__(self, n_components: int):
        self.n_components = n_components
        self.components = None
        self.mean = None
        self.std = None

    def fit(self, X: np.ndarray) -> None:
        # standardize data
        self.calculate_mean_std(X)
        X_norm = self.standardize(X)
        # calculate covariance matrix
        cov_matrix = self.calculate_covariance_matrix(X_norm)
        # get eigenvalues and eigenvectors
        eigenvalues, eigenvectors = np.linalg.eig(cov_matrix)
        # sort components
        idxs = np.argsort(np.abs(eigenvalues))[::-1]
        # reduce data using number of components (n_components)
        self.components = eigenvectors[idxs[: self.n_components]]

    def transform(self, X: np.ndarray) -> np.ndarray:
        # standardize data
        X_norm = self.standardize(X)
        # reduce data using number of components
        return X_norm @ self.components.T

    def standardize(self, X: np.ndarray) -> np.ndarray:
        """Normalize data by substracting mean and divide by std"""
        X_norm = (X - self.mean) / self.std
        return X_norm

    def calculate_mean_std(self, X: np.ndarray) -> None:
        """Calculate mean and std of data"""
        self.mean = np.mean(X, axis=0)
        self.std = np.std(X, axis=0)

    def calculate_covariance_matrix(self, X: np.ndarray) -> np.ndarray:
        """Calculate covariance matrix of the data"""
        return np.cov(X.T)

In [29]:
def vectorize(images: np.ndarray, comments: np.ndarray) -> np.ndarray:
    device = "cuda" if torch.cuda.is_available() else "cpu"
    model, preprocess = clip.load("ViT-B/32", device=device)
    images = torch.tensor(images, dtype=torch.float32).to(device) / 255.0

    tokenized_comments = clip.tokenize(comments).to(device)
    with torch.no_grad():
        image_features = model.encode_image(images)
        text_features = model.encode_text(tokenized_comments)

    image_features /= image_features.norm(dim=-1, keepdim=True)
    

    return image_features.cpu().numpy(), text_features.cpu().numpy()


def load_data(image_folder: str, label_file: str, H: int = 224, W: int = 224):
    """Loads images and labels from the specified folder and file."""

    labels_df = pd.read_csv(label_file, delimiter="|")
    labels_df["label"] = labels_df["label"].map({"animal": 1, "human": 0})
    labels = labels_df["label"].tolist()

    images = []
    image_names = labels_df["image_name"].tolist()
    for image_name in image_names:
        image_file = (
            Image.open(os.path.join(image_folder, image_name))
            .convert("RGB")
            .resize((H, W))
        )
        image_array = np.array(image_file).transpose((2, 0, 1))
        images.append(image_array)

    images = np.array(images)

    comments = labels_df["comment"].tolist()
    return images, labels, image_names, comments


def neighbour_search(
    text_req: np.ndarray, vImages: np.ndarray, top_k: int = 5
) -> np.ndarray:
    """search for top_k nearest neightbours in the space"""
    pass

In [30]:
input_path = ("../dataset/images", "../dataset/labels.csv")
n_components = 2
n_clusters = 2
# load image data and text labels
images, labels, image_names, comments = load_data(*input_path)

In [31]:
# vectorize images and text labels
vImages, vText = vectorize(images, comments)

In [32]:
# PCA with 2 components
dimred_2d = PCA(n_components=2)
dimred_2d.fit(vImages)
drvImages_2d = dimred_2d.transform(vImages)

# PCA with 3 components
dimred_3d = PCA(n_components=3)
dimred_3d.fit(vImages)
drvImages_3d = dimred_3d.transform(vImages)

# Transforming complex -> real (Plotly can't scatter complex value)
drvImages_2d = np.real(drvImages_2d)
drvImages_3d = np.real(drvImages_3d)

# 2D Visualization
fig_2d = px.scatter(
    x=drvImages_2d[:, 0],
    y=drvImages_2d[:, 1],
    color=labels,
    labels={'x': 'PCA Component 1', 'y': 'PCA Component 2'},
    title="PCA Visualization (2 Components)"
)

fig_2d.show()

# 3D Visualization
fig_3d = px.scatter_3d(
    x=drvImages_3d[:, 0],
    y=drvImages_3d[:, 1],
    z=drvImages_3d[:, 2],
    color=labels,
    labels={'x': 'PCA Component 1', 'y': 'PCA Component 2', 'z': 'PCA Component 3'},
    title="PCA Visualization (3 Components)"
)
fig_3d.show()




In [33]:
tsne_2d = TSNE(n_components=2, random_state=42, perplexity=30, learning_rate=200)
drvImages_2d = tsne_2d.fit_transform(vImages)

# t-SNE with 3 components
tsne_3d = TSNE(n_components=3, random_state=42, perplexity=30, learning_rate=200)
drvImages_3d = tsne_3d.fit_transform(vImages)

# 2D Visualization
fig_2d = px.scatter(
    x=drvImages_2d[:, 0],
    y=drvImages_2d[:, 1],
    color=labels,
    labels={'x': 't-SNE Dimension 1', 'y': 't-SNE Dimension 2'},
    title="t-SNE Visualization (2 Components)"
)
fig_2d.show()

# 3D Visualization
fig_3d = px.scatter_3d(
    x=drvImages_3d[:, 0],
    y=drvImages_3d[:, 1],
    z=drvImages_3d[:, 2],
    color=labels,
    labels={'x': 't-SNE Dimension 1', 'y': 't-SNE Dimension 2', 'z': 't-SNE Dimension 3'},
    title="t-SNE Visualization (3 Components)"
)
fig_3d.show()

In [34]:
class KMeans:
    def __init__(self, n_clusters: int = 3, max_iterations: int = 100_000):
        self.n_clusters = n_clusters
        self.max_iter = max_iterations

        # randomly initialize cluster centroids
        self.centroids = None
        self.labels = None

    def fit(self, X: np.ndarray) -> None:
        random_indices = np.random.choice(X.shape[0], self.n_clusters, replace=False)
        self.centroids = X[random_indices]

        for _ in range(self.max_iter):
            # create clusters by assigning the samples to the nearest centroids
            clusters = self.assign_clusters(self.centroids, X)
            # update centroids
            new_centroids = self.compute_means(clusters, X)
            if self.has_converged(self.centroids, new_centroids):
                break
            self.centroids = new_centroids

    def predict(self, X: np.ndarray) -> np.ndarray:
        # for each sample search for nearest centroids
        return self.assign_clusters(self.centroids, X)
    
    def fit_predict(self, X: np.ndarray) -> np.ndarray:
        self.fit(X)
        return self.predict(X)

    def assign_clusters(self, centroids: np.ndarray, X: np.ndarray) -> np.ndarray:
        """given input data X and cluster centroids assign clusters to samples"""
        distances = np.zeros((X.shape[0], self.n_clusters))
        for i, centroid in enumerate(centroids):
            distances[:, i] = np.linalg.norm(X - centroid, axis=1)
        return np.argmin(distances, axis=1)

    def compute_means(self, clusters: np.ndarray, X: np.ndarray) -> np.ndarray:
        """recompute cluster centroids"""
        new_centroids = np.zeros((self.n_clusters, X.shape[1]))
        for k in range(self.n_clusters):
            if np.any(clusters == k):  # Avoid division by zero for empty clusters
                new_centroids[k] = np.mean(X[clusters == k], axis=0)
        return new_centroids

    def euclidean_distance(self, a, b) -> float:
        """Calculates the euclidean distance between two vectors a and b"""
        return np.sqrt(np.sum(np.power(a - b, 2)))
    
    def has_converged(self, old_centroids: np.ndarray, new_centroids: np.ndarray) -> bool:
        """Check if centroids have stopped changing significantly."""
        return np.all(np.linalg.norm(old_centroids - new_centroids, axis=1) < 1e-4)
    
    def sum_squared_error(self, X: np.ndarray, clusters: np.ndarray) -> float:
        ''' Calculate sum of squared distances between samples and their cluster centroids '''
        sse = 0
        for k in range(self.n_clusters):
            errors = X[clusters == k] - self.centroids[k]
            sse += np.sum(np.sqrt(np.sum(np.power(errors, 2), axis=1)))
        return sse

In [35]:
# Perform clustering on the embeddings and visualize the results
clusterer = KMeans(n_clusters=3, max_iterations=100_000)
clusterer.fit(vImages)
cl_original = clusterer.predict(vImages)

kmeans_pca = KMeans(n_clusters=3, max_iterations=100_000)
kmeans_pca.fit(drvImages_3d)
cl_pca = kmeans_pca.predict(drvImages_3d)

# Visualize 2D and 3D embeddings of images and color points based on cluster label and original labels

fig_original = px.scatter_3d(
    x=vImages[:, 0],
    y=vImages[:, 1],
    z=vImages[:, 2],
    color=cl_original.astype(str),
    labels={'x': 'Original Dimension 1', 'y': 'Original Dimension 2', 'z': 'Original Dimension 3'},
    title="K-Means Clustering on Original Vectors"
)
fig_original.show()

fig_pca = px.scatter_3d(
    x=drvImages_3d[:, 0],
    y=drvImages_3d[:, 1],
    z=drvImages_3d[:, 2],
    color=cl_pca.astype(str),
    labels={'x': 'PCA Component 1', 'y': 'PCA Component 2', 'z': 'PCA Component 3'},
    title="K-Means Clustering on PCA-Reduced Vectors (3 Components)"
)
fig_pca.show()

Optimal Number of Clusters


In [36]:
from sklearn.metrics import silhouette_score
silhouette_scores = []
cluster_range = range(2, 10)
for k in cluster_range:
    kmeans = KMeans(n_clusters=k, max_iterations=100_000)
    kmeans.fit(drvImages_3d)
    labels_k = kmeans.predict(drvImages_3d)
    score = silhouette_score(drvImages_3d, labels_k)
    silhouette_scores.append(score)
    print(f"Silhouette score for k={k}: {score}")
best_clusters_num = cluster_range[np.argmax(silhouette_scores)]
print("Best", best_clusters_num, "clusters")

Silhouette score for k=2: 0.2572438418865204
Silhouette score for k=3: 0.24198909103870392
Silhouette score for k=4: 0.24319817125797272
Silhouette score for k=5: 0.2307439148426056
Silhouette score for k=6: 0.21804067492485046
Silhouette score for k=7: 0.2270670086145401
Silhouette score for k=8: 0.2223968505859375
Silhouette score for k=9: 0.22701111435890198
Best 2 clusters


Perform hierarchical clustering on the samples after PCA and visualize the results.

In [39]:
for cluster_num in range(2, 10, 2):
    for clusterer_name, clusterer_cls in [("Hierarchical", AgglomerativeClustering), ("KMeans", KMeans)]:
        clusterer = clusterer_cls(n_clusters=cluster_num,)
        predicted_clusters = clusterer.fit_predict(drvImages_3d)
        
        
        fig = px.scatter_3d(
            x=drvImages_3d[:, 0],
            y=drvImages_3d[:, 1],
            z=drvImages_3d[:, 2],
            color=predicted_clusters,
            title=f"{clusterer_name} Clustering with {cluster_num} clusters"
        )
        fig.show()

In [38]:
# DBSCAN outlier detection
clusterer = DBSCAN(eps=None)  # select good eps value !!!

# Create a copy of your trained data with cleaned outliers
# TODO


# Select few text descriptions and select nearest neighbors based on embeddings.
vText = vectorize(descriptions)
drvText = dimred.transform(vText)
# TODO

# Plot the results: text description, few nearest images
# TODO

NameError: name 'descriptions' is not defined