In [None]:
%pip install import-ipynb
from google.colab import drive
import shutil
import os
from sklearn.cluster import OPTICS
import numpy as np
from collections import defaultdict, Counter
from sklearn.cluster import AgglomerativeClustering
from sklearn.metrics import adjusted_rand_score, normalized_mutual_info_score, silhouette_score, calinski_harabasz_score
import matplotlib.pyplot as plt
import pandas as pd
from scipy.stats import entropy

drive.mount('/content/drive')
DATASET_CHOICE = "grozi" # choose grozi / webmarket
GROUND_TRUTH_CHOICE = "grozi" # choose grozi / webmarket/ real
REAL_IMAGE_MODE = False # if Dutch Markets -> True, else then False

# ============================================
# Load YOLO Detection & Features
# ============================================
%run "/content/drive/MyDrive/Object_Recognition/detect_yolo.ipynb"
%run "/content/drive/MyDrive/Object_Recognition/feature_extraction.ipynb"

## Ground truth dataset setup


In [None]:
# ----- Ground Truth Dataset Setup -----
if GROUND_TRUTH_CHOICE == "webmarket":
    gt_dataset_path = "/content/datasets/Webmarket-5"
    gt_labels_folder = os.path.join(gt_dataset_path, "test/labels")
    if not os.path.exists(gt_dataset_path):
        rf = Roboflow(api_key="ILw5ia3CDLaVSgIHRYon")
        project = rf.workspace("final-project-mwyx2").project("webmarket-7inwv-e4bky")
        version = project.version(6)
        dataset = version.download("yolov5")
    else:
        print("Webmarket GT dataset already exists.")
elif GROUND_TRUTH_CHOICE == "grozi":
    gt_dataset_path = "/content/datasets/grozi-3.2k-testing-6"
    gt_labels_folder = os.path.join(gt_dataset_path, "test/labels")
    if not os.path.exists(gt_dataset_path):
        rf = Roboflow(api_key="ILw5ia3CDLaVSgIHRYon")
        project = rf.workspace("final-project-mwyx2").project("grozi-3.2k-testing-fzu9s")
        version = project.version(7)
        dataset = version.download("yolov5")
    else:
        print("Grozi GT dataset already exists.")
elif GROUND_TRUTH_CHOICE == "real":
    gt_dataset_path = "/content/datasets/Dutch-supermarket-grocery-2"
    gt_labels_folder = os.path.join(gt_dataset_path, "test/labels")
    if not os.path.exists(gt_dataset_path):
        rf = Roboflow(api_key="ILw5ia3CDLaVSgIHRYon")
        project = rf.workspace("final-project-mwyx2").project("dutch-supermarket-grocery")
        version = project.version(2)
        dataset = version.download("yolov5")
    else:
        print("Real GT dataset already exists.")

else:
    raise ValueError("Unsupported ground truth dataset.")

print("Ground Truth Labels Folder:", gt_labels_folder)

In [4]:
# ============================================
# 5. Clustering Functions
# ============================================

def optics_clustering(feature_file, image_name_file, xi, min_samples=2, min_cluster_size=None):
    """
    Parameter:
    - min_samples (int): The number of samples in a neighborhood for a point to be considered as a core point. Also, up and down steep regions
       can’t have more than min_samples consecutive non-steep points.
    - xi (float): Determines the minimum steepness on the reachability plot that constitutes a cluster boundary.
    - min_cluster_size (int): Minimum number of samples in an OPTICS cluster, expressed as an absolute number or a fraction of the number of
      samples (rounded to be at least 2). If None, the value of min_samples is used instead. Used only when cluster_method='xi'.

    Return:
    - cluster_results (dict): dict: {Original_image_name: {cropped_image_name: cluster_label}})
    """

    # Read `.npy` file
    features = np.load(feature_file)
    image_names = np.load(image_name_file)

    image_groups = defaultdict(list)
    feature_groups = defaultdict(list)

    for i, img_name in enumerate(image_names):
        base_name = img_name.split("_jpg")[0]  # get orinigal image name, ex: "175"  #"_jpg" (use in webmarket and grocery_product) "_" (used in real)
        image_groups[base_name].append(img_name)
        feature_groups[base_name].append(features[i])

    cluster_results = {}

    for base_name, feature_list in feature_groups.items():
        features_array = np.array(feature_list)

        # If Cropped Images less than 2，then it's not possible to do Clustering，directly noted as Cluster 0
        if len(features_array) < 2:
            cluster_labels = [0] * len(features_array)
        else:
            features_scaled = features_array

            # Using OPTICS
            print(f"Image {base_name} using OPTICS clustering...")
            model = OPTICS(min_samples=min_samples, xi=xi, min_cluster_size=min_cluster_size)
            cluster_labels = model.fit_predict(features_scaled)

        # Save the results
        cluster_results[base_name] = dict(zip(image_groups[base_name], cluster_labels))

    # Store the results to ".npy"
    np.save("per_image_optics_cluster_labels.npy", cluster_results)

    print("Clustering Complete, results saved to `per_image_optics_cluster_labels.npy`！")
    return cluster_results

def agglomerative_clustering(feature_file, image_name_file, distance_threshold):
    """
    Parameters:
    - distance_threshold (float): The linkage distance threshold at or above which clusters will not be merged.
      If not None, n_clusters must be None and compute_full_tree must be True.

    Return:
    - cluster_results (dict): Results of Agglomerative Clustering: dict: {Original_image_name: {cropped_image_name: cluster_label}})
    """

    # Read `.npy` file
    features = np.load(feature_file)
    image_names = np.load(image_name_file)

    image_groups = defaultdict(list)
    feature_groups = defaultdict(list)

    for i, img_name in enumerate(image_names):
        base_name = img_name.split("_jpg")[0] #"_jpg" (use in webmarket and grocery_product) "_"(used in real)
        image_groups[base_name].append(img_name)
        feature_groups[base_name].append(features[i])

    cluster_results = {}

    for base_name, feature_list in feature_groups.items():
        features_array = np.array(feature_list)

        if len(features_array) < 2:
            cluster_labels = [0] * len(features_array)
        else:
            print(f"Image {base_name} using Agglomerative Clustering...")

            # Using Agglomerative Clustering
            model = AgglomerativeClustering(n_clusters=None, distance_threshold=distance_threshold, linkage='ward')
            cluster_labels = model.fit_predict(features_array)

        # Save the results
        cluster_results[base_name] = dict(zip(image_groups[base_name], cluster_labels))

    # Store the results to ".npy"
    np.save("per_image_agglo_cluster_labels.npy", cluster_results)

    print("Agglomerative Clustering Complete, results saved to `per_image_agglo_cluster_labels.npy`！")
    return cluster_results

# ============================================
# 4. Utility Functions
# ============================================
def load_yolo_gt(gt_label_folder):
    """
    Read YOLOv5 ground truth to {image_name: [bbox information]}
    """
    gt_data = {}

    for label_file in os.listdir(gt_label_folder):
        image_name = label_file.replace(".txt", ".jpg")
        label_path = os.path.join(gt_label_folder, label_file)

        with open(label_path, "r") as f:
            gt_bboxes = []
            for line in f:
                parts = line.strip().split()
                class_id = int(parts[0])  # Ground turth label
                bbox = list(map(float, parts[1:]))  # bbox: [x_center, y_center, width, height]
                gt_bboxes.append((class_id, bbox))

        gt_data[image_name] = gt_bboxes

    return gt_data

def get_bbox_from_cropped_image(cropped_img_name):
    """
    Based on cropped image name to get bounding box ([x_center, y_center, width, height])
    """
    if cropped_img_name in spatial_features:
        return spatial_features[cropped_img_name][:4]  # Get first four value: [x_center, y_center, width, height]
    else:
        return None

def compute_iou(box1, box2):
    """
    Compute IoU (Intersection over Union) between ground truth bounding boxes and predicted bounding boxes.
    box = [x_center, y_center, width, height]
    """

    # Convert YOLO format to (x_min, y_min, x_max, y_max)
    x1_min, y1_min = box1[0] - box1[2] / 2, box1[1] - box1[3] / 2
    x1_max, y1_max = box1[0] + box1[2] / 2, box1[1] + box1[3] / 2

    x2_min, y2_min = box2[0] - box2[2] / 2, box2[1] - box2[3] / 2
    x2_max, y2_max = box2[0] + box2[2] / 2, box2[1] + box2[3] / 2

    # Compute intersection
    inter_x_min = max(x1_min, x2_min)
    inter_y_min = max(y1_min, y2_min)
    inter_x_max = min(x1_max, x2_max)
    inter_y_max = min(y1_max, y2_max)

    inter_area = max(0, inter_x_max - inter_x_min) * max(0, inter_y_max - inter_y_min)

    # Compute union
    box1_area = (x1_max - x1_min) * (y1_max - y1_min)
    box2_area = (x2_max - x2_min) * (y2_max - y2_min)
    union_area = box1_area + box2_area - inter_area

    # Compute IoU
    iou = inter_area / union_area if union_area > 0 else 0
    return iou

def match_predictions_to_gt(spatial_features, gt_base_grouped, iou_threshold=0.4):
    """
    Match cropped images (predictions) to ground truth objects based on IoU.
    """
    matched_results = {}

    for cropped_img_name, pred_bbox in spatial_features.items():
        base_name = cropped_img_name.split("_jpg")[0]  # Extract base name

        if base_name not in gt_base_grouped:
            matched_results[cropped_img_name] = -1  # No GT available
            continue

        gt_bboxes = gt_base_grouped[base_name]  # Get ground truth bboxes for this image
        best_match = -1
        best_iou = 0.0

        for gt_class, gt_bbox in gt_bboxes:
            iou = compute_iou(pred_bbox[:4], gt_bbox)  # Compare bounding boxes

            if iou > best_iou and iou >= iou_threshold:
                best_iou = iou
                best_match = gt_class  # Assign ground truth class

        matched_results[cropped_img_name] = best_match  # Store match

    return matched_results

def compute_per_image_cluster_to_gt_mapping(cluster_results, matched_results, min_cluster_size=2):
    """
    - If Cluster size is less than one：
      - If GT Label only appears once, the Remap（Clustering correct）。
      - If GT Label has multiple samples in the original image，then annotate Clustering incorrect (`-1`)。
    -
    """
    per_image_cluster_gt_counts = defaultdict(lambda: defaultdict(lambda: defaultdict(int)))
    cluster_sizes = defaultdict(lambda: defaultdict(int))
    gt_label_counts = defaultdict(lambda: defaultdict(int))

    # Compute the number of GT Label in each Cluster & Cluster Size
    for base_name, cropped_images in cluster_results.items():
        for cropped_img, cluster_id in cropped_images.items():
            if cropped_img not in matched_results or matched_results[cropped_img] == -1:
                continue  # If cropped image doesn't match GT in IoU Matching, then pass

            gt_label = matched_results[cropped_img]

            per_image_cluster_gt_counts[base_name][cluster_id][gt_label] += 1
            cluster_sizes[base_name][cluster_id] += 1
            gt_label_counts[base_name][gt_label] += 1  # Count the number of GT Label in original image

    per_image_cluster_to_gt_mapping = {}

    for base_name, cluster_dict in per_image_cluster_gt_counts.items():
        per_image_cluster_to_gt_mapping[base_name] = {}

        for cluster_id, gt_count in cluster_dict.items():
            if cluster_id == -1:
                  per_image_cluster_to_gt_mapping[base_name][cluster_id] = -1 # Deak with noise cluster（cluster_id == -1）
                  continue
            cluster_size = cluster_sizes[base_name][cluster_id]
            best_gt_label = max(gt_count, key=gt_count.get)  # Most frequent GT Label in that cluster

            if cluster_size < min_cluster_size:
                # Check the number of GT Label in that orignial image
                gt_label_total_count = gt_label_counts[base_name][best_gt_label]

                if gt_label_total_count > 1:
                    # If this GT Label has multiple other samples in this original image, then marked as misclassified
                    per_image_cluster_to_gt_mapping[base_name][cluster_id] = -1
                else:
                    # If this GT Label in the original image only appears "once"，then allowed to Remap
                    per_image_cluster_to_gt_mapping[base_name][cluster_id] = best_gt_label
            else:
                # Choose the most frequent GT Label to cluster label
                per_image_cluster_to_gt_mapping[base_name][cluster_id] = best_gt_label

    return per_image_cluster_to_gt_mapping

def evaluate_clustering(cluster_results, matched_results, cluster_to_gt_mapping, features_all_path, image_names_path):

    y_true = []  # Ground Truth Labels
    y_pred = []  # Predicted Cluster Labels
    y_pred_raw = []  # Original cluster id，does not remap GT，used in silhouette（keep -1）
    image_names_flat = []

    # Covert Clustering Label and GT Label
    for base_name, cropped_images in cluster_results.items():
        if base_name not in cluster_to_gt_mapping:
            continue  # If this original images does not have the corresponding mapping，then pass

        for cropped_img, cluster_id in cropped_images.items():
            if cropped_img in matched_results and matched_results[cropped_img] != -1:
                gt_label = matched_results[cropped_img]  #  GT Label of this cropped images
                pred_label = cluster_to_gt_mapping[base_name].get(cluster_id, -1)  # Predicted Label of this cluster

                y_true.append(gt_label)
                y_pred.append(pred_label)
                y_pred_raw.append(cluster_id)
                image_names_flat.append(cropped_img)

    # Compute Adjusted Rand Index (ARI)
    ari = adjusted_rand_score(y_true, y_pred)

    # Compute Normalized Mutual Information (NMI)
    nmi = normalized_mutual_info_score(y_true, y_pred)

    # Compute Silhouette
    try:
        features_all = np.load(features_all_path)
        image_names = np.load(image_names_path)

        if isinstance(image_names[0], bytes):
            image_names = np.array([n.decode("utf-8") for n in image_names])
        image_name_list = image_names.tolist()

        used_indices = []
        for name in image_names_flat:
            if name in image_name_list:
                idx = image_name_list.index(name)
                used_indices.append(idx)

        selected_features = features_all[used_indices]

        # Silhouette (all)
        silhouette_all = -1
        if len(set(y_pred_raw)) > 1:
            silhouette_all = silhouette_score(selected_features, y_pred_raw)

        # Silhouette (clean)
        features_filtered = [f for f, cid in zip(selected_features, y_pred_raw) if cid != -1]
        preds_filtered = [cid for cid in y_pred_raw if cid != -1]
        silhouette_clean = -1
        if len(set(preds_filtered)) > 1:
            silhouette_clean = silhouette_score(features_filtered, preds_filtered)

    except Exception as e:
      print(f"Error computing clustering metrics: {e}")
      ari = -1
      nmi = -1
      silhouette = -1

    return ari, nmi, silhouette_all, silhouette_clean

spatial_features = np.load("spatial_features.npy", allow_pickle=True).item()
ground_truth = load_yolo_gt(gt_labels_folder)

# Convert GT to Base Name Mapping**
gt_base_grouped = {}

for full_img_name, gt_objects in ground_truth.items():
    base_name = full_img_name.split("_jpg")[0]  # Extract base name

    if base_name not in gt_base_grouped:
        gt_base_grouped[base_name] = []

    gt_base_grouped[base_name].extend(gt_objects)  # Append GT objects

# Perform IoU-based matching
matched_results = match_predictions_to_gt(spatial_features, gt_base_grouped, iou_threshold=0.4)

# Save results
np.save("matched_results.npy", matched_results)


## Fixed threshold selection



*   Agglomerative Clustering



In [None]:
ari_results = defaultdict(list)
nmi_results = defaultdict(list)
silhouette_results = defaultdict(list)
# 定義特徵組合與檔案
feature_files = {
    "cnn": "cnn_only_features.npy",
    "color": "color_only_features.npy",
    "shape": "shape_only_features.npy",
    "texture": "texture_only_features.npy",
    "spatial": "spatial_only_features.npy",
    "text": "text_only_features.npy",
    "cnn_color": "cnn_color_features.npy",
    "cnn_shape": "cnn_shape_features.npy",
    "cnn_texture": "cnn_texture_features.npy",
    "cnn_spatial": "cnn_spatial_features.npy",
    "cnn_text": "cnn_text_features.npy",
    "color_shape": "color_shape_features.npy",
    "color_texture": "color_texture_features.npy",
    "color_spatial": "color_spatial_features.npy",
    "cnn_color_shape": "cnn_color_shape_features.npy",
    "cnn_color_texture": "cnn_color_texture_features.npy",
    "cnn_color_spatial": "cnn_color_spatial_features.npy",
    "cnn_color_text":"cnn_color_text_features.npy",
    "cnn_shape_texture": "cnn_shape_texture_features.npy",
    "cnn_shape_spatial": "cnn_shape_spatial_features.npy",
    "cnn_texture_spatial": "cnn_texture_spatial_features.npy",
    "cnn_color_shape_texture": "cnn_color_shape_texture_features.npy",
    "cnn_color_shape_spatial": "cnn_color_shape_spatial_features.npy",
    "color_shape_texture_spatial": "color_shape_texture_spatial_features.npy",
    "cnn_color_shape_texture_spatial": "all_features.npy",
    "cnn_color_shape_texture_spatial_text": "all_features_with_text.npy"
}

distance_thresholds = list(range(30, 161, 5))
matched_results = np.load("matched_results.npy", allow_pickle=True).item()
# # Run each threshold + differnt combined feature
for feature_name, feature_path in feature_files.items():
    print(f"\n Combined Feature: {feature_name}")

    if "text" in feature_name:
        image_name_file = "image_names_with_text.npy"
    else:
        image_name_file = "image_names.npy"

    for threshold in distance_thresholds:
        print(f"threshold = {threshold}")
        try:
            cluster_results = agglomerative_clustering(feature_path, image_name_file, threshold)
            cluster_to_gt = compute_per_image_cluster_to_gt_mapping(cluster_results, matched_results)
            ari, nmi, _, silhouette = evaluate_clustering(cluster_results, matched_results, cluster_to_gt, feature_path, image_name_file)
        except Exception as e:
            print(f"Error at feature {feature_name}, threshold {threshold}: {e}")
            ari = -1
            nmi = -1
            silhouette = -1
        ari_results[feature_name].append(ari)
        nmi_results[feature_name].append(nmi)
        silhouette_results[feature_name].append(silhouette)

# Save as DataFrame and CSV
ari_df = pd.DataFrame(ari_results, index=distance_thresholds)
ari_df.index.name = "Distance Threshold"
ari_df.to_csv("ari_across_thresholds.csv")
print("Save to ari_across_thresholds.csv")

nmi_df = pd.DataFrame(nmi_results, index=distance_thresholds)
nmi_df.index.name = "Distance Threshold"
nmi_df.to_csv("nmi_across_thresholds.csv")

silhouette_df = pd.DataFrame(silhouette_results, index=distance_thresholds)
silhouette_df.index.name = "Distance Threshold"
silhouette_df.to_csv("silhouette_across_thresholds.csv")

# Show best Thresholds
best_thresholds_ari = ari_df.idxmax()
best_aris = ari_df.max()

best_thresholds_nmi = nmi_df.idxmax()
best_nmis = nmi_df.max()

best_thresholds_silhouette = silhouette_df.idxmax()
best_silhouettes = silhouette_df.max()

print("Best Thresholds by Metric - Agglomerative")
for feat in ari_df.columns:
    print(f"\nFeature Combination: {feat}")
    print(f"  ARI      -> {best_aris[feat]:.4f}")
    print(f"  NMI      -> {best_nmis[feat]:.4f}")
    print(f"  Silhouette-> {best_silhouettes[feat]:.4f}")



*   OPTICS




In [None]:
ari_results_optics = defaultdict(list)
nmi_results_optics = defaultdict(list)
silhouette_results_optics = defaultdict(list)

xi_thresholds = [i / 100 for i in range(1, 11)] # [0.01, 0.02, ..., 0.10]
# Run each threshold + differnt combined features
for feature_name, feature_path in feature_files.items():
    print(f"\n Combined Feature: {feature_name}")

    if "text" in feature_name:
        image_name_file = "image_names_with_text.npy"
    else:
        image_name_file = "image_names.npy"
    for threshold in xi_thresholds:
        print(f"threshold = {threshold}")
        try:
            cluster_results_optics = optics_clustering(feature_path, image_name_file, threshold)
            cluster_to_gt_optics = compute_per_image_cluster_to_gt_mapping(cluster_results_optics, matched_results)
            ari_optics, nmi_optics, _, silhouette_optics  = evaluate_clustering(cluster_results_optics, matched_results, cluster_to_gt_optics, feature_path, image_name_file)
        except Exception as e:
            print(f"Error: {e}")
            ari_optics = -1
        ari_results_optics[feature_name].append(ari_optics)
        nmi_results_optics[feature_name].append(nmi_optics)
        silhouette_results_optics[feature_name].append(silhouette_optics)


ari_optics_df = pd.DataFrame(ari_results_optics, index=xi_thresholds)
nmi_optics_df = pd.DataFrame(nmi_results_optics, index=xi_thresholds)
silhouette_optics_df = pd.DataFrame(silhouette_results_optics, index=xi_thresholds)

ari_optics_df.index.name = "Xi Threshold"
nmi_optics_df.index.name = "Xi Threshold"
silhouette_optics_df.index.name = "Xi Threshold"

ari_optics_df.to_csv("ari_across_thresholds_optics.csv")
nmi_optics_df.to_csv("nmi_across_thresholds_optics.csv")
silhouette_optics_df.to_csv("silhouette_across_thresholds_optics.csv")

best_thresholds_ari = ari_optics_df.idxmax()
best_aris = ari_optics_df.max()

best_thresholds_nmi = nmi_optics_df.idxmax()
best_nmis = nmi_optics_df.max()

best_thresholds_silhouette = silhouette_optics_df.idxmax()
best_silhouettes = silhouette_optics_df.max()

print("Best Xi Thresholds by Metric - OPTICS")
for feat in ari_optics_df.columns:
    print(f"\nFeature Combination: {feat}")
    print(f"  ARI      -> {best_aris[feat]:.4f}")
    print(f"  NMI      -> {best_nmis[feat]:.4f}")
    print(f"  Silhouette-> {best_silhouettes[feat]:.4f}")

## Threshold optimaization selection

### Silhouette Score selection



*   Agglomerative Clustering




In [None]:
def agglomerative_clustering_auto_threshold_silhouette(feature_file, image_name_file, threshold_candidates, visualize=True):
    features = np.load(feature_file)
    image_names = np.load(image_name_file)

    image_groups = defaultdict(list)
    feature_groups = defaultdict(list)

    for i, img_name in enumerate(image_names):
        base_name = img_name.split("_jpg")[0]
        image_groups[base_name].append(img_name)
        feature_groups[base_name].append(features[i])

    cluster_results = {}

    for base_name, feature_list in feature_groups.items():
        features_array = np.array(feature_list)

        if len(features_array) < 2:
            cluster_labels = [0] * len(features_array)
        else:
            print(f"Image {base_name} - Running Agglomerative Clustering with multiple thresholds...")

            silhouette_scores = []
            valid_thresholds = []
            label_candidates = []

            for threshold in threshold_candidates:
                try:
                    model = AgglomerativeClustering(n_clusters=None, distance_threshold=threshold, linkage='ward')
                    labels = model.fit_predict(features_array)

                    n_clusters = len(set(labels)) - (1 if -1 in labels else 0)
                    if n_clusters <= 1 or n_clusters >= len(features_array):
                        continue

                    score = silhouette_score(features_array, labels)
                    silhouette_scores.append(score)
                    valid_thresholds.append(threshold)
                    label_candidates.append(labels)
                except Exception as e:
                    continue

            if not valid_thresholds:
                print(f"Image {base_name} - No valid clustering found, fallback to single cluster.")
                cluster_labels = [0] * len(features_array)
            else:
                idx = np.argmax(silhouette_scores)
                best_threshold = valid_thresholds[idx]
                cluster_labels = label_candidates[idx]

                print(f"Image {base_name} - Best threshold selected: {best_threshold} with Silhouette Score {silhouette_scores[idx]:.4f}")

                if visualize:
                    plt.figure()
                    plt.plot(valid_thresholds, silhouette_scores, 'bo-')
                    plt.xlabel('Distance Threshold')
                    plt.ylabel('Silhouette Score')
                    plt.title(f'Silhouette Curve - {base_name}')
                    plt.axvline(x=best_threshold, color='red', linestyle='--')
                    plt.grid()
                    plt.show()

        cluster_results[base_name] = dict(zip(image_groups[base_name], cluster_labels))

    np.save("per_image_agglo_silhouette_cluster_labels.npy", cluster_results)
    print("Agglomerative Clustering complete. Results saved to `per_image_agglo_silhouette_cluster_labels.npy`！")
    return cluster_results


feature_files = {
    "cnn": "cnn_only_features.npy",
    "color": "color_only_features.npy",
    "shape": "shape_only_features.npy",
    "texture": "texture_only_features.npy",
    "spatial": "spatial_only_features.npy",
    "text": "text_only_features.npy",
    "cnn_color": "cnn_color_features.npy",
    "cnn_shape": "cnn_shape_features.npy",
    "cnn_texture": "cnn_texture_features.npy",
    "cnn_spatial": "cnn_spatial_features.npy",
    "cnn_text": "cnn_text_features.npy",
    "color_shape": "color_shape_features.npy",
    "color_texture": "color_texture_features.npy",
    "color_spatial": "color_spatial_features.npy",
    "cnn_color_shape": "cnn_color_shape_features.npy",
    "cnn_color_texture": "cnn_color_texture_features.npy",
    "cnn_color_spatial": "cnn_color_spatial_features.npy",
    "cnn_color_text":"cnn_color_text_features.npy",
    "cnn_shape_texture": "cnn_shape_texture_features.npy",
    "cnn_shape_spatial": "cnn_shape_spatial_features.npy",
    "cnn_texture_spatial": "cnn_texture_spatial_features.npy",
    "cnn_color_shape_texture": "cnn_color_shape_texture_features.npy",
    "cnn_color_shape_spatial": "cnn_color_shape_spatial_features.npy",
    "color_shape_texture_spatial": "color_shape_texture_spatial_features.npy",
    "cnn_color_shape_texture_spatial": "all_features.npy",
    "cnn_color_shape_texture_spatial_text": "all_features_with_text.npy"
}

results_summary = []

for feature_key, feature_path in feature_files.items():
    print(f"\n=== Processing Feature Set: {feature_key} ===")

    if "text" in feature_key:
        image_name_file = "image_names_with_text.npy"
    else:
        image_name_file = "image_names.npy"

    cluster_results = agglomerative_clustering_auto_threshold_silhouette(
        feature_file=feature_path,
        image_name_file=image_name_file,
        threshold_candidates=np.linspace(20, 200, num=30), # distance threshold range
        visualize=False
    )
    cluster_to_gt_mapping = compute_per_image_cluster_to_gt_mapping(cluster_results, matched_results)

    ari, nmi, sil_all, sil_clean = evaluate_clustering(
        cluster_results,
        matched_results,
        cluster_to_gt_mapping,
        features_all_path=feature_path,
        image_names_path=image_name_file
    )

    results_summary.append({
        "feature": feature_key,
        "ARI": ari,
        "NMI": nmi,
        "Silhouette_All": sil_all,
        "Silhouette_Clean": sil_clean
    })

df_results = pd.DataFrame(results_summary)
# Save to CSV
df_results.to_csv("clustering_evaluation_summary.csv", index=False)

# Print preview
print(df_results.to_string(index=False))

*   OPTICS

In [None]:
def optics_clustering_auto_xi_silhouette(feature_file, image_name_file, xi_candidates, visualize=True):
    features = np.load(feature_file)
    image_names = np.load(image_name_file)

    image_groups = defaultdict(list)
    feature_groups = defaultdict(list)

    for i, img_name in enumerate(image_names):
        base_name = img_name.split("_jpg")[0]
        image_groups[base_name].append(img_name)
        feature_groups[base_name].append(features[i])

    cluster_results = {}

    for base_name, feature_list in feature_groups.items():
        features_array = np.array(feature_list)

        if len(features_array) < 2:
            cluster_labels = [0] * len(features_array)
        else:
            print(f"Image {base_name} - Running OPTICS Clustering with multiple xi...")

            silhouette_scores = []
            valid_xi = []
            label_candidates = []

            for xi in xi_candidates:
                try:
                    model = OPTICS(min_samples=2, xi=xi)
                    labels = model.fit_predict(features_array)

                    unique_labels = set(labels)
                    if len(unique_labels) <= 1 or len(unique_labels) >= len(features_array):
                        continue

                    score = silhouette_score(features_array, labels)
                    silhouette_scores.append(score)
                    valid_xi.append(xi)
                    label_candidates.append(labels)
                except Exception as e:
                    continue

            if not valid_xi:
                print(f"Image {base_name} - No valid xi produced meaningful clusters. Fallback to single cluster.")
                cluster_labels = [0] * len(features_array)
            else:
                idx = np.argmax(silhouette_scores)
                best_xi = valid_xi[idx]
                cluster_labels = label_candidates[idx]

                print(f"Image {base_name} - Best xi: {best_xi} with Silhouette Score {silhouette_scores[idx]:.4f}")

                if visualize:
                    plt.figure()
                    plt.plot(valid_xi, silhouette_scores, 'go-')
                    plt.xlabel('xi')
                    plt.ylabel('Silhouette Score')
                    plt.title(f'Silhouette Curve (OPTICS) - {base_name}')
                    plt.axvline(x=best_xi, color='red', linestyle='--')
                    plt.grid()
                    plt.show()

        cluster_results[base_name] = dict(zip(image_groups[base_name], cluster_labels))

    np.save("per_image_optics_silhouette_cluster_labels.npy", cluster_results)
    print("OPTICS Clustering complete. Results saved to `per_image_optics_silhouette_cluster_labels.npy`！")
    return cluster_results

results_summary_optics = []
xi_candidates = np.linspace(0.01, 0.2, 20) # xi_threshold range

for feature_key, feature_path in feature_files.items():
    print(f"\n=== Processing Feature Set: {feature_key} ===")

    if "text" in feature_key:
        image_name_file = "image_names_with_text.npy"
    else:
        image_name_file = "image_names.npy"

    cluster_results = optics_clustering_auto_xi_silhouette(
            feature_file=feature_path,
            image_name_file=image_name_file,
            xi_candidates=xi_candidates,
            visualize=False
        )
    cluster_to_gt_mapping = compute_per_image_cluster_to_gt_mapping(cluster_results, matched_results)

    ari, nmi, sil_all, sil_clean = evaluate_clustering(
        cluster_results,
        matched_results,
        cluster_to_gt_mapping,
        features_all_path=feature_path,
        image_names_path=image_name_file
    )

    results_summary_optics.append({
        "feature": feature_key,
        "ARI": ari,
        "NMI": nmi,
        "Silhouette_All": sil_all,
        "Silhouette_Clean": sil_clean
    })

df_results = pd.DataFrame(results_summary_optics)
# Save to CSV
df_results.to_csv("optics_clustering_evaluation_summary.csv", index=False)

# Print preview
print(df_results.to_string(index=False))

### Entropy selection



*   Agglomerative Clustering



In [None]:
def compute_internal_entropy(labels):
    from collections import Counter
    label_counts = Counter(labels)
    total = sum(label_counts.values())
    probs = np.array(list(label_counts.values())) / total
    return entropy(probs, base=2)

def agglomerative_clustering_auto_threshold_entropy(feature_file, image_name_file, matched_results, threshold_candidates, visualize=False):
    features = np.load(feature_file)
    image_names = np.load(image_name_file)

    image_groups = defaultdict(list)
    feature_groups = defaultdict(list)
    gt_label_groups = defaultdict(list)

    for i, img_name in enumerate(image_names):
        base_name = img_name.split("_jpg")[0]
        image_groups[base_name].append(img_name)
        feature_groups[base_name].append(features[i])

        gt = matched_results.get(img_name, -1)
        gt_label_groups[base_name].append(gt)

    cluster_results = {}

    for base_name in feature_groups:
        features_array = np.array(feature_groups[base_name])
        gt_labels = gt_label_groups[base_name]

        if len(features_array) < 2 or all(l == -1 for l in gt_labels):
            cluster_labels = [0] * len(features_array)
        else:
            entropies = []
            thresholds = []
            label_candidates = []

            for threshold in threshold_candidates:
                try:
                    model = AgglomerativeClustering(n_clusters=None, distance_threshold=threshold, linkage='ward')
                    labels = model.fit_predict(features_array)

                    if len(set(labels)) <= 1 or len(set(labels)) >= len(features_array):
                        continue

                    e_score = compute_internal_entropy(labels)

                    entropies.append(e_score)
                    thresholds.append(threshold)
                    label_candidates.append(labels)
                except:
                    continue

            if not entropies:
                cluster_labels = [0] * len(features_array)
            else:
                best_idx = np.argmin(entropies)
                cluster_labels = label_candidates[best_idx]

                if visualize:
                    plt.figure()
                    plt.plot(thresholds, entropies, 'go-')
                    plt.xlabel("Threshold")
                    plt.ylabel("Entropy")
                    plt.title(f"Entropy Curve - {base_name}")
                    plt.axvline(x=thresholds[best_idx], color='red', linestyle='--')
                    plt.grid()
                    plt.show()

        cluster_results[base_name] = dict(zip(image_groups[base_name], cluster_labels))

    return cluster_results

# ----- ENTROPY Clustering evaluation -----
matched_results = np.load("matched_results.npy", allow_pickle=True).item()
feature_files = {
    "cnn": "cnn_only_features.npy",
    "color": "color_only_features.npy",
    "shape": "shape_only_features.npy",
    "texture": "texture_only_features.npy",
    "spatial": "spatial_only_features.npy",
    "text": "text_only_features.npy",
    "cnn_color": "cnn_color_features.npy",
    "cnn_shape": "cnn_shape_features.npy",
    "cnn_texture": "cnn_texture_features.npy",
    "cnn_spatial": "cnn_spatial_features.npy",
    "cnn_text": "cnn_text_features.npy",
    "color_shape": "color_shape_features.npy",
    "color_texture": "color_texture_features.npy",
    "color_spatial": "color_spatial_features.npy",
    "cnn_color_shape": "cnn_color_shape_features.npy",
    "cnn_color_texture": "cnn_color_texture_features.npy",
    "cnn_color_spatial": "cnn_color_spatial_features.npy",
    "cnn_color_text":"cnn_color_text_features.npy",
    "cnn_shape_texture": "cnn_shape_texture_features.npy",
    "cnn_shape_spatial": "cnn_shape_spatial_features.npy",
    "cnn_texture_spatial": "cnn_texture_spatial_features.npy",
    "cnn_color_shape_texture": "cnn_color_shape_texture_features.npy",
    "cnn_color_shape_spatial": "cnn_color_shape_spatial_features.npy",
    "color_shape_texture_spatial": "color_shape_texture_spatial_features.npy",
    "cnn_color_shape_texture_spatial": "all_features.npy",
    "cnn_color_shape_texture_spatial_text": "all_features_with_text.npy"
}

results_summary = []

# entropy-based clustering evaluation for different feature combinations
for feature_key, feature_path in feature_files.items():
    print(f"\n Processing Feature Set: {feature_key}")
    image_name_file = "image_names_with_text.npy" if "text" in feature_key else "image_names.npy"

    cluster_results = agglomerative_clustering_auto_threshold_entropy(
        feature_file=feature_path,
        image_name_file=image_name_file,
        matched_results=matched_results,
        threshold_candidates=np.linspace(20, 200, num=30),
        visualize=False
    )

    cluster_to_gt_mapping = compute_per_image_cluster_to_gt_mapping(cluster_results, matched_results)

    # Evaluation
    ari, nmi, sil_all, sil_clean = evaluate_clustering(
        cluster_results,
        matched_results,
        cluster_to_gt_mapping,
        features_all_path=feature_path,
        image_names_path=image_name_file
    )

    results_summary.append({
        "feature": feature_key,
        "ARI": ari,
        "NMI": nmi,
        "Silhouette_All": sil_all,
        "Silhouette_Clean": sil_clean
    })

# Print the output
df_entropy_results = pd.DataFrame(results_summary)
print(df_entropy_results.to_string(index=False))

*   OPTICS

In [None]:
def compute_internal_entropy(labels):
    from collections import Counter
    label_counts = Counter(labels)
    total = sum(label_counts.values())
    probs = np.array(list(label_counts.values())) / total
    return entropy(probs, base=2)

def optics_clustering_auto_xi_entropy(feature_file, image_name_file, matched_results, xi_candidates, min_samples=2):
    features = np.load(feature_file)
    image_names = np.load(image_name_file)

    if isinstance(image_names[0], bytes):
        image_names = np.array([n.decode("utf-8") for n in image_names])

    image_groups = defaultdict(list)
    feature_groups = defaultdict(list)

    for i, img_name in enumerate(image_names):
        base_name = img_name.split("_jpg")[0]
        image_groups[base_name].append(img_name)
        feature_groups[base_name].append(features[i])

    cluster_results = {}

    for base_name in feature_groups:
        feats = np.array(feature_groups[base_name])
        if len(feats) < 2:
            cluster_labels = [0] * len(feats)
            cluster_results[base_name] = dict(zip(image_groups[base_name], cluster_labels))
            continue

        best_entropy = float("inf")
        best_labels = None

        for xi in xi_candidates:
            try:
                model = OPTICS(min_samples=min_samples, xi=xi)
                labels = model.fit_predict(feats)
                valid_labels = [lbl for lbl in labels if lbl != -1]
                if len(set(valid_labels)) <= 1:
                    continue
                e_score = compute_internal_entropy(valid_labels)
                if e_score < best_entropy:
                    best_entropy = e_score
                    best_labels = labels
            except:
                continue

        if best_labels is None:
            best_labels = [0] * len(feats)

        cluster_results[base_name] = dict(zip(image_groups[base_name], best_labels))

    return cluster_results

# ----- ENTROPY Clustering Evaluation -----
matched_results = np.load("matched_results.npy", allow_pickle=True).item()

results_summary = []

# entropy-based clustering evaluation for different feature combinations
for feature_key, feature_path in feature_files.items():
    print(f"\n Processing Feature Set: {feature_key}")
    image_name_file = "image_names_with_text.npy" if "text" in feature_key else "image_names.npy"

    cluster_results = optics_clustering_auto_xi_entropy(
    feature_file=feature_path,
    image_name_file=image_name_file,
    matched_results=matched_results,
    xi_candidates=np.linspace(0.01, 0.2, 20)
    )

    cluster_to_gt_mapping = compute_per_image_cluster_to_gt_mapping(cluster_results, matched_results)

    # Evaluation
    ari, nmi, sil_all, sil_clean = evaluate_clustering(
        cluster_results,
        matched_results,
        cluster_to_gt_mapping,
        features_all_path=feature_path,
        image_names_path=image_name_file
    )

    results_summary.append({
        "feature": feature_key,
        "ARI": ari,
        "NMI": nmi,
        "Silhouette_All": sil_all,
        "Silhouette_Clean": sil_clean
    })

# Print the result
df_entropy_results = pd.DataFrame(results_summary)
print(df_entropy_results.to_string(index=False))