In [1]:
import numpy as np
from sklearn.cluster import AgglomerativeClustering, DBSCAN

from kmeans import KMeans, optimal_clusters
from cluster_utils import visualize_clusters, compare_clusters

In [2]:
input_path = "dataset"
n_clusters = 5
n_components = 3

In [3]:
# Load image data and text labels
import os
import numpy as np
import pandas as pd
from PIL import Image


def load_data(images_dir: str, labels_file: str):
    """Loads and resizes images to the specified target size, along with labels."""
    # Load labels and map 'animal' to 1, 'human' to 0
    labels_df = pd.read_csv(labels_file, sep="|")
    labels_df["class"] = labels_df["label"].map({"animal": 1, "human": 0})

    # Extract filenames and labels
    image_names = labels_df["image_name"].tolist()
    labels = labels_df["class"].to_numpy()
    labels_text = labels_df["label"].tolist()
    comments = labels_df["comment"].tolist()

    # Load and resize images
    images = []
    for img_name in image_names:
        img_path = os.path.join(images_dir, img_name.strip())
        img = Image.open(img_path)
        images.append(img)

    print(f"Loaded {len(images)} images and {len(labels)} labels")
    return images, labels, labels_text, comments


print("Loading data...")
images, labels, labels_text, comments = load_data(
    f"{input_path}/flickr30k_images", f"{input_path}/labels.csv"
)

Loading data...
Loaded 300 images and 300 labels


In [4]:
import clip
import torch


def vectorize_images(images: np.ndarray, device: str = "cpu") -> np.ndarray:
    print(f"device={device}")
    model, preprocess = clip.load("ViT-B/32", device=device)

    vectorized = []
    for image in images:
        image_tensor = preprocess(image).unsqueeze(0).to(device)
        with torch.no_grad():
            image_features = model.encode_image(image_tensor)
        vectorized.append(image_features)
    vectorized = np.vstack(vectorized)
    return vectorized


def vectorize_texts(texts: list[str], device: str = "cpu") -> np.ndarray:
    print(f"device={device}")
    model, _ = clip.load("ViT-B/32", device=device)
    text_inputs = torch.cat([clip.tokenize(f"a photo of a {t}") for t in texts]).to(
        device
    )

    with torch.no_grad():
        vectorized = model.encode_text(text_inputs)
    return vectorized


# Vectorize images and text labels
device = (
    "mps"
    if torch.backends.mps.is_available()
    else "cuda" if torch.cuda.is_available() else "cpu"
)



In [5]:
print("Vectorizing images...")
v_images = vectorize_images(images, device)

Vectorizing images...
device=cpu


In [6]:
print("Vectorizing descriptions...")
v_text = vectorize_texts(comments, device)

Vectorizing descriptions...
device=cpu


In [7]:
from sklearn.manifold import TSNE
from pca import PCA
from visualize import visualize_all


# PCA and t-SNE on images
print("Computing t-SNE...")
tsne = TSNE(n_components=n_components, random_state=42, perplexity=10, method="exact")
tsne_results = tsne.fit_transform(v_images)

print("Computing PCA...")
pca = PCA(n_components=n_components)
pca.fit(v_images)
pca_results = pca.transform(v_images)

# Visualize 2D and 3D embeddings of images and color points based on labels
print("Visualizing embeddings...")
visualize_all(
    {
        "Original": v_images[:, :n_components],
        "PCA": pca_results,
        "t-SNE": tsne_results,
    },
    labels,
    comments,
)

Computing t-SNE...
Computing PCA...
Visualizing embeddings...


In [8]:
# Perform clustering on the embeddings and visualize the results
kmeans_clusterer = KMeans(n_clusters=n_clusters, max_iterations=1_000_000)
clustering_input = pca_results  # or v_images
kmeans_clusterer.fit(clustering_input)
kmeans_results = kmeans_clusterer.predict(clustering_input)

clusterer = AgglomerativeClustering(n_clusters=n_clusters, linkage="ward")
hierarchical_results = clusterer.fit_predict(clustering_input)

# Visualize 2D and 3D embeddings of images and color points based on cluster label and original labels
visualize_clusters(clustering_input, labels, hierarchical_results, "Hierarchical Clustering", n_components=n_components)
compare_clusters(labels, kmeans_results, hierarchical_results)
optimal_clusters(clustering_input)

K-Means Clustering - ARI: 0.3437, NMI: 0.3546
Hierarchical Clustering - ARI: 0.3794, NMI: 0.3499


In [9]:
import plotly.graph_objects as go


# DBSCAN outlier detection
clusterer = DBSCAN(eps=7.5, min_samples=6)
dbscan_clusters = clusterer.fit_predict(v_images)
print("DBSCAN results:")
for i, count in zip(*np.unique(dbscan_clusters, return_counts=True)):
    print(f"\tCluster {i}: {count} samples")
fig = go.Figure()
if n_components == 2:
    fig.add_trace(
        go.Scatter(
            x=pca_results[:, 0],
            y=pca_results[:, 1],
            name="DBSCAN Clustering",
            mode="markers",
            marker=dict(
                size=6,
                opacity=0.8,
                color=dbscan_clusters,
                colorscale="Viridis",
                colorbar=dict(title="Cluster Label"),
            ),
        ))
    fig.update_layout(
        xaxis=dict(title="PCA Component 1"),
        yaxis=dict(title="PCA Component 2"),
    )
elif n_components == 3:
    fig.add_trace(go.Scatter3d(
        x=pca_results[:, 0],
        y=pca_results[:, 1],
        z=pca_results[:, 2],
        name="DBSCAN Clustering",
        mode="markers",
        marker=dict(
            size=6,
            opacity=0.8,
            color=dbscan_clusters,
            colorscale="Viridis",
            colorbar=dict(title="Cluster Label"),
        ),
    ))
    fig.update_layout(
        scene=dict(
            xaxis=dict(title="PCA Component 1"),
            yaxis=dict(title="PCA Component 2"),
            zaxis=dict(title="PCA Component 3"),
        )
    )
else:
    raise ValueError("Supports only 2 and 3 components")
fig.show()

DBSCAN results:
	Cluster -1: 209 samples
	Cluster 0: 91 samples


In [10]:
import textwrap
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, f1_score
import plotly.express as px


def neighbour_search(
    text_req: np.ndarray, v_images: np.ndarray, top_k: int = 5
) -> np.ndarray:
    """search for top_k nearest neightbours in the space"""

    # Ensure both are numpy arrays
    if not isinstance(text_req, np.ndarray):
        text_req = text_req.detach().cpu().numpy()
    if not isinstance(v_images, np.ndarray):
        v_images = v_images.detach().cpu().numpy()

    distances = np.linalg.norm(v_images - text_req, axis=1)
    top_k_idx = np.argsort(distances)[:top_k]
    return distances[top_k_idx], top_k_idx


def eval_performance(data_description, X, y):
    """Train and evaluate a Logistic Regression model."""
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2, stratify=y, random_state=42
    )
    model = LogisticRegression()
    model.fit(X_train, y_train)
    y_pred = model.predict(X_test)
    accuracy = accuracy_score(y_test, y_pred)
    f1 = f1_score(y_test, y_pred)
    print(f"{data_description} data:")
    print(f"\tAccuracy: {accuracy:.2f}")
    print(f"\tF1: {f1:.2f}")


def get_dimensionality_reduction_data(cls, v_images, v_text, cls_kwargs=None):
    """Apply dimensionality reduction if a class is provided."""
    if cls:
        dimred = cls(n_components=20, **(cls_kwargs or {}))
        drv_data = dimred.fit_transform(np.vstack((v_images, v_text)))
        return drv_data[: v_images.shape[0]], drv_data[v_images.shape[0] :]
    return v_images, v_text


def create_visualization(
    nearest_idx, distances, comments, labels_text, images, name, title_text
):
    """Create and display a visualization for nearest images."""
    original_comments = [
        "<br>".join(textwrap.wrap(comments[i], 60)) for i in nearest_idx
    ]
    original_labels = [labels_text[i] for i in nearest_idx]
    original_images = [images[i].resize((300, 300)) for i in nearest_idx]
    annotations = list(zip(original_comments, original_labels, distances))

    fig = px.imshow(
        np.array([np.array(img) for img in original_images]),
        facet_col=0,
        facet_col_wrap=3,
        facet_row_spacing=0.2,
        facet_col_spacing=0.2,
        binary_string=True,
        labels={"facet_col": "text"},
        title=f"{name} Nearest Images for<br>'{title_text}'",
    )

    for annotation in fig.layout.annotations:
        annotation_idx = int(annotation.text.split("=")[-1])
        comment, label, distance = annotations[annotation_idx]
        if annotation_idx == 0:
            annotation.text = (
                f"<b>GROUND TRUTH</b><br>{comment}<br>({label} : {distance:.2f})"
            )
        else:
            annotation.text = f"{comment}<br>({label} : {distance:.2f})"

    fig.update_layout(
        title={
            "x": 0.5,
            "y": 0.95,
            "xanchor": "center",
            "yanchor": "top",
            "font": {"size": 16},
        },
        margin={"t": 150},
    )
    fig.show()


def find_nearest_images(
    v_vext,
    v_images,
    text_req_idx,
    comments,
    labels_text,
    images,
    n_closest,
    name,
    cls=None,
    cls_kwargs=None,
):
    """Compute nearest images and create a visualization."""
    if cls:
        drvImages, drvText = get_dimensionality_reduction_data(
            cls, v_images, v_vext, cls_kwargs
        )
        query_vector = drvText[text_req_idx]
        data_images = drvImages
    else:
        query_vector = v_vext[text_req_idx]
        data_images = v_images

    distances, nearest_idx = neighbour_search(query_vector, data_images, n_closest)
    ground_truth_distance, _ = neighbour_search(
        query_vector, data_images[text_req_idx].reshape(1, -1), 1
    )
    distances = np.concatenate((ground_truth_distance, distances))
    nearest_idx = np.concatenate(([text_req_idx], nearest_idx))

    title_text = "<br>".join(textwrap.wrap(comments[text_req_idx], 100))
    create_visualization(
        nearest_idx, distances, comments, labels_text, images, name, title_text
    )

In [11]:
# Model evaluation
eval_performance("Original", v_images, labels)
eval_performance(
    "Filtered", v_images[kmeans_results != -1], labels[kmeans_results != -1]
)

Original data:
	Accuracy: 0.87
	F1: 0.84
Filtered data:
	Accuracy: 0.87
	F1: 0.84


In [12]:
# Nearest image visualization
tsne_kwargs = {"random_state": 42, "perplexity": 10, "method": "exact"}
text_req_idx = np.random.choice(len(comments))

for name, cls, cls_kwargs in [
    ("Original", None, None),
    ("PCA", PCA, None),
    ("t-SNE", TSNE, tsne_kwargs),
]:
    find_nearest_images(
        v_text,
        v_images,
        text_req_idx,
        comments,
        labels_text,
        images,
        8,
        name,
        cls,
        cls_kwargs,
    )