<a href="https://colab.research.google.com/github/sehab1611251/3D-Group-Detection-Using-Distance-Based-Clustering/blob/main/Clustering.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

**DBSCAN clustering performance on JRDB Ground Truth 3D Bounding Boxes**

In [None]:
import json
from collections import defaultdict, Counter

# Path to the input ground truth JSON file
json_path = "/content/clark-center-2019-02-28_0.json"

# Output file path for processed GT.txt
gt_output_path = "/content/GT.txt"  # Format: <frame>, <group_id>, <x>, <y>, <z>, <length>, <width>, <height>, <group_label>

# Initialize containers
ground_truth_detections = []
unique_frames = set()
detections_per_frame = defaultdict(int)
groups_per_frame = defaultdict(set)
group_entries_per_frame = defaultdict(list)

# Load the JSON data
with open(json_path, 'r') as f:
    data = json.load(f)

# parse valid pedestrian detections and collect all necessary info
for frame_file, objects in data["labels"].items():
    frame_number = int(frame_file.replace(".pcd", ""))
    unique_frames.add(frame_number)

    for obj in objects:
        if obj.get("attributes", {}).get("no_eval", True):
            continue  # Skip invalid detections

        box = obj.get("box", {})
        cx, cy, cz = box.get("cx"), box.get("cy"), box.get("cz")
        length, width, height = box.get("l"), box.get("w"), box.get("h")

        social_group = obj.get("social_group", {})
        group_id = social_group.get("cluster_ID", None)

        # Store parsed info
        gt_entry = {
            "frame": frame_number,
            "label_id": obj.get("label_id"),
            "cx": cx,
            "cy": cy,
            "cz": cz,
            "length": length,
            "width": width,
            "height": height,
            "group_gt": group_id
        }
        ground_truth_detections.append(gt_entry)
        detections_per_frame[frame_number] += 1

        if group_id is not None:
            groups_per_frame[frame_number].add(group_id)
            group_entries_per_frame[frame_number].append((group_id, cx, cy, cz, length, width, height))

# assign local frame-wise group labels and write output
with open(gt_output_path, 'w') as gt_file:
    for frame_number, detections in sorted(group_entries_per_frame.items()):
        # Count how many times each group ID appears in this frame
        group_id_counts = Counter([g[0] for g in detections])

        # Assign new group label
        group_map = {}
        next_group_label = 1
        for gid, count in group_id_counts.items():
            group_map[gid] = next_group_label if count >= 2 else 0
            if count >= 2:
                next_group_label += 1

        # Write out each detection line in new format
        for group_id, cx, cy, cz, length, width, height in detections:
            group_label = group_map[group_id]
            gt_file.write(f"{frame_number},{group_id},{cx},{cy},{cz},{length},{width},{height},{group_label}\n")

# Statistics and sample display
print(f"Total valid ground truth detections: {len(ground_truth_detections)}")
print(f"Total number of unique frames: {len(unique_frames)}")

print("\nDetections and group count per frame (first 10 frames):")
for frame in sorted(detections_per_frame)[:10]:
    num_dets = detections_per_frame[frame]
    num_groups = len(groups_per_frame[frame])
    print(f"Frame {frame}: {num_dets} detections, {num_groups} unique groups")

print("\nSample entries:")
for entry in ground_truth_detections[:5]:
    print(entry)


Total valid ground truth detections: 21437
Total number of unique frames: 578

Detections and group count per frame (first 10 frames):
Frame 0: 21 detections, 17 unique groups
Frame 1: 21 detections, 17 unique groups
Frame 2: 21 detections, 17 unique groups
Frame 3: 21 detections, 17 unique groups
Frame 4: 22 detections, 18 unique groups
Frame 5: 22 detections, 18 unique groups
Frame 6: 22 detections, 18 unique groups
Frame 7: 22 detections, 18 unique groups
Frame 8: 22 detections, 18 unique groups
Frame 9: 22 detections, 18 unique groups

Sample entries:
{'frame': 0, 'label_id': 'pedestrian:96', 'cx': 6.48283, 'cy': -8.3539, 'cz': 0.03798, 'length': 1.06, 'width': 0.52, 'height': 1.93596, 'group_gt': 36}
{'frame': 0, 'label_id': 'pedestrian:9', 'cx': 4.74, 'cy': -5.97, 'cz': -0.00499, 'length': 0.89, 'width': 0.51, 'height': 1.85003, 'group_gt': 14}
{'frame': 0, 'label_id': 'pedestrian:63', 'cx': 15.04, 'cy': -7.83, 'cz': 2.79201, 'length': 0.94, 'width': 0.5, 'height': 1.82403, 'grou

In [None]:
# Apply Clustering

import os
import numpy as np
import pandas as pd
from sklearn.cluster import DBSCAN
from collections import defaultdict

# Load and Parse Ground Truth Data
def load_ground_truth(filename):
    column_names = [
        'frame', 'group_id', 'x', 'y', 'z',
        'length', 'width', 'height', 'group_label'
    ]

    gt_data = pd.read_csv(filename, header=None, names=column_names)
    print(f"Loaded {len(gt_data)} ground truth entries across {gt_data['frame'].nunique()} frames.")

    # Calculate bounding box min/max bounds
    gt_data['x_min'] = gt_data['x'] - gt_data['length'] / 2
    gt_data['x_max'] = gt_data['x'] + gt_data['length'] / 2
    gt_data['y_min'] = gt_data['y'] - gt_data['width'] / 2
    gt_data['y_max'] = gt_data['y'] + gt_data['width'] / 2
    gt_data['z_min'] = gt_data['z'] - gt_data['height'] / 2
    gt_data['z_max'] = gt_data['z'] + gt_data['height'] / 2

    return gt_data


# Compute Distances and Apply Clustering
def detect_groups(gt_data, distance_threshold=2.0):
    result = gt_data.copy()
    result['group'] = 0  # Initialize all as ungrouped (group=0)

    unique_frames = result['frame'].unique()
    print(f"Processing {len(unique_frames)} frames...")

    total_groups = 0

    for frame in unique_frames:
        frame_data = result[result['frame'] == frame]
        frame_indices = frame_data.index

        if len(frame_data) <= 1:
            continue  # Skip frames with only one entry

        # Use box centers for clustering
        coords = frame_data[['x', 'y', 'z']].values

        # Apply DBSCAN clustering
        clustering = DBSCAN(eps=distance_threshold, min_samples=2, metric='euclidean').fit(coords)
        labels = clustering.labels_

        # Convert DBSCAN labels to group IDs (noise = 0)
        group_ids = np.zeros_like(labels)
        label_to_group = {}
        next_group_id = 1

        for i, label in enumerate(labels):
            if label == -1:
                group_ids[i] = 0
            else:
                if label not in label_to_group:
                    label_to_group[label] = next_group_id
                    next_group_id += 1
                group_ids[i] = label_to_group[label]

        result.loc[frame_indices, 'group'] = group_ids

        frame_groups = len(set(group_ids) - {0})
        total_groups += frame_groups

    print(f"Detected {total_groups} groups across all frames.")
    return result


# Save Group Detection Results
def save_group_detections(detections, output_file):
    detections.to_csv(output_file, header=False, index=False)
    print(f"Saved group detection results to {output_file}")


# Main Function
def main():
    input_file = "/content/GT.txt"
    output_file = "/content/gt_group_detections_with_bbox.txt"
    distance_threshold = 2.0  # meters

    print(" Loading and parsing ground truth data...")
    gt_data = load_ground_truth(input_file)

    print("\n Computing distances and clustering into groups...")
    grouped_detections = detect_groups(gt_data, distance_threshold)

    print("\n Saving group detection results...")
    save_group_detections(grouped_detections, output_file)

    # Print Summary
    total_detections = len(grouped_detections)
    grouped = grouped_detections[grouped_detections['group'] > 0]
    ungrouped = grouped_detections[grouped_detections['group'] == 0]

    print("\nResults Summary:")
    print(f"Total entries: {total_detections}")
    print(f"Grouped entries: {len(grouped)} ({len(grouped) / total_detections * 100:.1f}%)")
    print(f"Ungrouped entries: {len(ungrouped)} ({len(ungrouped) / total_detections * 100:.1f}%)")

# Run the script
if __name__ == "__main__":
    main()


Step 1: Loading and parsing ground truth data...
Loaded 21363 ground truth entries across 578 frames.

Step 2 & 3: Computing distances and clustering into groups...
Processing 578 frames...
Detected 4088 groups across all frames.

Step 4: Saving group detection results...
Saved group detection results to /content/gt_group_detections_with_bbox.txt

Results Summary:
Total entries: 21363
Grouped entries: 16982 (79.5%)
Ungrouped entries: 4381 (20.5%)


In [None]:
# Clustering performance

import pandas as pd
import numpy as np
from sklearn.metrics import silhouette_score, davies_bouldin_score, calinski_harabasz_score

# Load Grouped Ground Truth with DBSCAN Results
def load_group_detections(file_path):
    col_names = ["frame", "group_id", "x", "y", "z",
                 "length", "width", "height", "group_label",
                 "x_min", "x_max", "y_min", "y_max", "z_min", "z_max", "group"]
    df = pd.read_csv(file_path, header=None, names=col_names)

    # Ensure group label is integer
    df["group"] = df["group"].astype(int)

    print(f"Loaded {len(df)} entries from {file_path}.")
    return df

# Load output file with clustered groups from DBSCAN applied on GT
dbscan_results = load_group_detections("/content/gt_group_detections_with_bbox.txt")


# Compute Clustering Metrics Per Frame
def compute_clustering_metrics(detections):
    silhouette_scores = []
    db_scores = []
    ch_scores = []

    for frame in detections["frame"].unique():
        frame_data = detections[detections["frame"] == frame]
        clustered_data = frame_data[frame_data["group"] != 0]

        if len(set(clustered_data["group"])) > 1:
            # Use the 3D coordinates (bounding box centers)
            X = clustered_data[["x", "y", "z"]].to_numpy()
            labels = clustered_data["group"].to_numpy()

            try:
                sil_score = silhouette_score(X, labels)
                db_index = davies_bouldin_score(X, labels)
                ch_index = calinski_harabasz_score(X, labels)

                silhouette_scores.append(sil_score)
                db_scores.append(db_index)
                ch_scores.append(ch_index)
            except Exception as e:
                print(f"Skipping frame {frame} due to error: {e}")

    return {
        "silhouette_score": np.mean(silhouette_scores) if silhouette_scores else None,
        "davies_bouldin_index": np.mean(db_scores) if db_scores else None,
        "calinski_harabasz_index": np.mean(ch_scores) if ch_scores else None
    }

# Print Evaluation Results
dbscan_metrics = compute_clustering_metrics(dbscan_results)

print("\n===== DBSCAN-Based Clustering Evaluation on GT Bounding Boxes =====")
if dbscan_metrics["silhouette_score"] is not None:
    print(f"Silhouette Score: {dbscan_metrics['silhouette_score']:.3f} (Higher is better)")
else:
    print("Silhouette Score: Not available")

if dbscan_metrics["davies_bouldin_index"] is not None:
    print(f"Davies-Bouldin Index: {dbscan_metrics['davies_bouldin_index']:.3f} (Lower is better)")
else:
    print("Davies-Bouldin Index: Not available")

if dbscan_metrics["calinski_harabasz_index"] is not None:
    print(f"Calinski-Harabasz Index: {dbscan_metrics['calinski_harabasz_index']:.3f} (Higher is better)")
else:
    print("Calinski-Harabasz Index: Not available")


Loaded 21363 entries from /content/gt_group_detections_with_bbox.txt.

===== DBSCAN-Based Clustering Evaluation on GT Bounding Boxes =====
Silhouette Score: 0.635 (Higher is better)
Davies-Bouldin Index: 0.301 (Lower is better)
Calinski-Harabasz Index: 111.928 (Higher is better)


**Clustering performance on prediction [(RPEA or DCCLA) + (DBSCAN or KMeans or AHC)]**

**DBSCAN**

In [None]:
import os
import numpy as np
import pandas as pd
from sklearn.cluster import DBSCAN
from collections import defaultdict


# Load and Parse Input Data
def load_detections(filename):

    column_names = [
        'frame', 'id', 'x_center', 'y_center', 'z_center',
        'length', 'width', 'height', 'heading', 'score'
    ]

    # Read the detection file
    detections = pd.read_csv(filename, header=None, names=column_names)
    print(f"Loaded {len(detections)} detections across {detections['frame'].nunique()} frames.")

    return detections


# Compute Distances and Apply Clustering
def detect_groups(detections, distance_threshold=1.0):

    result = detections.copy()
    result['group'] = 0  # Initialize all detections as ungrouped (group=0)

    unique_frames = result['frame'].unique()
    print(f"Processing {len(unique_frames)} frames...")

    total_groups = 0

    for frame in unique_frames:
        frame_detections = result[result['frame'] == frame] # Extract detections for this frame
        frame_indices = result[result['frame'] == frame].index # example output [0, 1, 2, 3, 4, . .]

        if len(frame_detections) <= 1:
            continue  # Skip frames with only one detection

        centers = frame_detections[['x_center', 'y_center', 'z_center']].values

        # Apply DBSCAN clustering
        clustering = DBSCAN(eps=distance_threshold, min_samples=2, metric='euclidean').fit(centers)
        labels = clustering.labels_ # example output [0, 0, 1, 1, -1] Here -1 is noise.

        # Count the number of detections in each group (excluding noise)
        label_counts = defaultdict(int)
        for label in labels:
            if label != -1:
                label_counts[label] += 1

        #If labels =[0, 0, 1, 1, -1, 2, 2, 2, -1], then group_ids =[0, 0, 0, 0, 0, 0, 0, 0, 0]. Default group is 0 (no group)
        group_ids = np.zeros_like(labels)
        label_to_group = {}
        next_group_id = 1

        for i, label in enumerate(labels):
            if label == -1:
                group_ids[i] = 0  # Noise or single detections remain ungrouped
            else:
                if label not in label_to_group:
                    label_to_group[label] = next_group_id
                    next_group_id += 1
                group_ids[i] = label_to_group[label]

        result.loc[frame_indices, 'group'] = group_ids

        frame_groups = len(set(group_ids) - {0})  # Count groups in this frame (excluding 0)
        total_groups += frame_groups

    print(f"Detected {total_groups} groups across all frames.")

    return result


# Save Group Detection Results
def save_group_detections(detections, output_file):
    detections.to_csv(output_file, header=False, index=False)
    print(f"Saved group detection results to {output_file}")


# Main Function
def main():
    input_file = "/content/det.txt"
    output_file = "/content/group_detections.txt"
    distance_threshold = 2.0 # meters

    print("Loading and parsing detection data...")
    detections = load_detections(input_file)

    print("\n Computing distances and clustering into groups...")
    grouped_detections = detect_groups(detections, distance_threshold)

    print("\n Saving group detection results...")
    save_group_detections(grouped_detections, output_file)

    # Print Summary
    total_detections = len(grouped_detections)
    grouped = grouped_detections[grouped_detections['group'] > 0]
    ungrouped = grouped_detections[grouped_detections['group'] == 0]

    print("\nResults Summary:")
    print(f"Total detections: {total_detections}")
    print(f"Detections in groups: {len(grouped)} ({len(grouped) / total_detections * 100:.1f}%)")
    print(f"Ungrouped detections: {len(ungrouped)} ({len(ungrouped) / total_detections * 100:.1f}%)")

# Run the script
if __name__ == "__main__":
    main()


Step 1: Loading and parsing detection data...
Loaded 14441 detections across 578 frames.

Step 2 & 3: Computing distances and clustering into groups...
Processing 578 frames...
Detected 3053 groups across all frames.

Step 4: Saving group detection results...
Saved group detection results to /content/group_detections.txt

Results Summary:
Total detections: 14441
Detections in groups: 7446 (51.6%)
Ungrouped detections: 6995 (48.4%)


**KMeans with Elbow method**

In [None]:
!pip install kneed

Collecting kneed
  Downloading kneed-0.8.5-py3-none-any.whl.metadata (5.5 kB)
Downloading kneed-0.8.5-py3-none-any.whl (10 kB)
Installing collected packages: kneed
Successfully installed kneed-0.8.5


In [None]:
# Elbow Method using KMeans
import os
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.cluster import KMeans
from kneed import KneeLocator # Automatically detects the "elbow" point
from collections import defaultdict

# Load and Parse Input Data
def load_detections(filename):
    column_names = [
        'frame', 'id', 'x_center', 'y_center', 'z_center',
        'length', 'width', 'height', 'heading', 'score'
    ]
    detections = pd.read_csv(filename, header=None, names=column_names)
    print(f"Loaded {len(detections)} detections across {detections['frame'].nunique()} frames.")
    return detections

# Clustering Using KMeans + Elbow
def detect_groups_elbow(detections, min_k=2, max_k=6, plot_dir="elbow_plots"):
    os.makedirs(plot_dir, exist_ok=True)

    result = detections.copy()
    result['group'] = 0
    unique_frames = result['frame'].unique()
    print(f"Processing {len(unique_frames)} frames...")

    total_groups = 0

    for frame in unique_frames:
        frame_data = result[result['frame'] == frame]
        frame_indices = frame_data.index

        if len(frame_data) < min_k:
            print(f"Frame {frame}: Not enough detections to cluster.")
            continue

        coords = frame_data[['x_center', 'y_center', 'z_center']].to_numpy()
        inertias = [] # Inertia = Within-Cluster Sum of Squares (WCSS)
        k_values = list(range(min_k, min(max_k + 1, len(coords) + 1)))

        for k in k_values:
            kmeans = KMeans(n_clusters=k, n_init='auto', random_state=0)
            kmeans.fit(coords)
            inertias.append(kmeans.inertia_)

        best_k = None
        if len(inertias) >= 2:
            try:
                kl = KneeLocator(k_values, inertias, curve='convex', direction='decreasing')
                best_k = kl.elbow
            except:
                best_k = None

        # Plot elbow
        plt.figure()
        plt.plot(k_values, inertias, 'bo-')
        plt.xlabel("Number of Clusters (k)")
        plt.ylabel("Inertia")
        plt.title(f"Elbow Plot - Frame {frame}")
        plt.grid(True)
        plt.savefig(os.path.join(plot_dir, f"frame_{frame:04d}.png"))
        plt.close()

        if best_k is None:
            print(f"Frame {frame}: Elbow point not found. Marking all as ungrouped.")
            result.loc[frame_indices, 'group'] = 0
        else:
            kmeans = KMeans(n_clusters=best_k, n_init='auto', random_state=0)
            labels = kmeans.fit_predict(coords)
            result.loc[frame_indices, 'group'] = labels + 1  # groups start from 1
            print(f"Frame {frame}: Optimal k = {best_k}, Inertia = {kmeans.inertia_:.2f}")
            total_groups += len(set(labels))

    print(f"Detected {total_groups} groups across all frames.")
    return result

# Save Results
def save_group_detections(detections, output_file):
    detections.to_csv(output_file, header=False, index=False)
    print(f"Saved group detection results to {output_file}")

# MAIN Function
def main():
    input_file = "/content/det.txt"
    output_file = "/content/group_detections_elbow.txt"
    plot_dir = "/content/elbow_plots"

    print(" Loading and parsing detection data...")
    detections = load_detections(input_file)

    print("\n Clustering into groups using Elbow Method...")
    grouped_detections = detect_groups_elbow(detections, plot_dir=plot_dir)

    print("\n Saving group detection results...")
    save_group_detections(grouped_detections, output_file)

    total_detections = len(grouped_detections)
    grouped = grouped_detections[grouped_detections['group'] > 0]
    ungrouped = grouped_detections[grouped_detections['group'] == 0]
    print("\nResults Summary:")
    print(f"Total detections: {total_detections}")
    print(f"Detections in groups: {len(grouped)} ({len(grouped) / total_detections * 100:.1f}%)")
    print(f"Ungrouped detections: {len(ungrouped)} ({len(ungrouped) / total_detections * 100:.1f}%)")

# Run the script
if __name__ == "__main__":
    main()


Step 1: Loading and parsing detection data...
Loaded 14441 detections across 578 frames.

Step 2 & 3: Clustering into groups using Elbow Method...
Processing 578 frames...
Frame 0: Optimal k = 4, Inertia = 898.31
Frame 1: Elbow point not found. Marking all as ungrouped.
Frame 2: Optimal k = 3, Inertia = 887.52
Frame 3: Optimal k = 4, Inertia = 650.93
Frame 4: Elbow point not found. Marking all as ungrouped.
Frame 5: Optimal k = 3, Inertia = 1219.43
Frame 6: Optimal k = 3, Inertia = 1564.84
Frame 7: Elbow point not found. Marking all as ungrouped.
Frame 8: Elbow point not found. Marking all as ungrouped.
Frame 9: Elbow point not found. Marking all as ungrouped.
Frame 10: Elbow point not found. Marking all as ungrouped.
Frame 11: Optimal k = 4, Inertia = 667.76
Frame 12: Elbow point not found. Marking all as ungrouped.
Frame 13: Elbow point not found. Marking all as ungrouped.
Frame 14: Elbow point not found. Marking all as ungrouped.
Frame 15: Optimal k = 3, Inertia = 906.64
Frame 16: O

**Hierarchical Agglomerative Clustering (AHC) with k via Elbow**

In [None]:
import os
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.cluster import AgglomerativeClustering
from sklearn.metrics import pairwise_distances
from scipy.cluster.hierarchy import linkage
from kneed import KneeLocator


# Load and Parse Input Data
def load_detections(filename):
    column_names = [
        'frame', 'id', 'x_center', 'y_center', 'z_center',
        'length', 'width', 'height', 'heading', 'score'
    ]
    detections = pd.read_csv(filename, header=None, names=column_names)
    print(f"Loaded {len(detections)} detections across {detections['frame'].nunique()} frames.")
    return detections


# Clustering Using Agglomerative + Elbow
def detect_groups_hierarchical_elbow(detections, min_k=2, max_k=6, plot_dir="hierarchical_elbow_plots"):
    os.makedirs(plot_dir, exist_ok=True)

    result = detections.copy()
    result['group'] = 0
    unique_frames = result['frame'].unique()
    print(f"Processing {len(unique_frames)} frames...")

    total_groups = 0

    for frame in unique_frames:
        frame_data = result[result['frame'] == frame]
        frame_indices = frame_data.index

        if len(frame_data) < min_k:
            print(f"Frame {frame}: Not enough detections to cluster.")
            continue

        coords = frame_data[['x_center', 'y_center', 'z_center']].to_numpy()
        k_values = list(range(min_k, min(max_k + 1, len(coords) + 1)))

        inertias = []
        Z = linkage(coords, method='ward')  # hierarchical linkage

        for k in k_values:
            clustering = AgglomerativeClustering(n_clusters=k)
            labels = clustering.fit_predict(coords)
            centers = np.array([coords[labels == i].mean(axis=0) for i in range(k)])
            inertia = np.sum([
                np.sum(np.linalg.norm(coords[labels == i] - centers[i], axis=1) ** 2)
                for i in range(k)
            ])
            inertias.append(inertia)

        best_k = None
        if len(inertias) >= 2:
            try:
                kl = KneeLocator(k_values, inertias, curve='convex', direction='decreasing')
                best_k = kl.elbow
            except:
                best_k = None

        # Plot Elbow
        plt.figure()
        plt.plot(k_values, inertias, 'ro-')
        plt.xlabel("Number of Clusters (k)")
        plt.ylabel("Inertia (Within-cluster sum of squares)")
        plt.title(f"Hierarchical Elbow - Frame {frame}")
        plt.grid(True)
        plt.savefig(os.path.join(plot_dir, f"frame_{frame:04d}.png"))
        plt.close()

        if best_k is None:
            print(f"Frame {frame}: Elbow point not found. Marking all as ungrouped.")
            result.loc[frame_indices, 'group'] = 0
        else:
            clustering = AgglomerativeClustering(n_clusters=best_k)
            labels = clustering.fit_predict(coords)
            result.loc[frame_indices, 'group'] = labels + 1
            print(f"Frame {frame}: Optimal k = {best_k}, Inertia = {inertias[best_k - min_k]:.2f}")
            total_groups += len(set(labels))

    print(f"Detected {total_groups} groups across all frames.")
    return result


# Save Results
def save_group_detections(detections, output_file):
    detections.to_csv(output_file, header=False, index=False)
    print(f"Saved group detection results to {output_file}")


# MAIN Function
def main():
    input_file = "/content/det.txt"
    output_file = "/content/group_detections_hierarchical.txt"
    plot_dir = "/content/hierarchical_elbow_plots"

    print("Loading and parsing detection data...")
    detections = load_detections(input_file)

    print("\n Clustering into groups using Hierarchical Clustering + Elbow Method...")
    grouped_detections = detect_groups_hierarchical_elbow(detections, plot_dir=plot_dir)

    print("\n Saving group detection results...")
    save_group_detections(grouped_detections, output_file)

    total_detections = len(grouped_detections)
    grouped = grouped_detections[grouped_detections['group'] > 0]
    ungrouped = grouped_detections[grouped_detections['group'] == 0]
    print("\nResults Summary:")
    print(f"Total detections: {total_detections}")
    print(f"Detections in groups: {len(grouped)} ({len(grouped) / total_detections * 100:.1f}%)")
    print(f"Ungrouped detections: {len(ungrouped)} ({len(ungrouped) / total_detections * 100:.1f}%)")


# Run the script
if __name__ == "__main__":
    main()


Step 1: Loading and parsing detection data...
Loaded 14441 detections across 578 frames.

Step 2 & 3: Clustering into groups using Hierarchical Clustering + Elbow Method...
Processing 578 frames...
Frame 0: Optimal k = 4, Inertia = 898.31
Frame 1: Elbow point not found. Marking all as ungrouped.
Frame 2: Optimal k = 4, Inertia = 560.35
Frame 3: Optimal k = 3, Inertia = 1069.99
Frame 4: Elbow point not found. Marking all as ungrouped.
Frame 5: Elbow point not found. Marking all as ungrouped.
Frame 6: Optimal k = 3, Inertia = 1505.12
Frame 7: Optimal k = 4, Inertia = 1035.08
Frame 8: Optimal k = 4, Inertia = 829.55
Frame 9: Optimal k = 4, Inertia = 704.20
Frame 10: Elbow point not found. Marking all as ungrouped.
Frame 11: Optimal k = 4, Inertia = 667.76
Frame 12: Elbow point not found. Marking all as ungrouped.
Frame 13: Optimal k = 3, Inertia = 833.67
Frame 14: Elbow point not found. Marking all as ungrouped.
Frame 15: Optimal k = 4, Inertia = 530.75
Frame 16: Elbow point not found. Ma

**Clustering Evaluation Script**

In [None]:
import pandas as pd
import numpy as np
from sklearn.metrics import silhouette_score, davies_bouldin_score, calinski_harabasz_score

def load_group_detections(file_path):

    # Load group detection results from a .txt file.
    col_names = ["frame", "id", "x_center", "y_center", "z_center",
                 "length", "width", "height", "heading", "score", "group"]
    df = pd.read_csv(file_path, header=None, names=col_names)
    df["group"] = df["group"].astype(int)
    print(f"Loaded {len(df)} detections from {file_path}.")
    return df

def compute_clustering_metrics(detections):

    # Compute silhouette, Davies-Bouldin, and Calinski-Harabasz scores per frame.

    silhouette_scores = []
    db_scores = []
    ch_scores = []

    for frame in detections["frame"].unique():
        frame_data = detections[detections["frame"] == frame]
        clustered_data = frame_data[frame_data["group"] != 0]

        if len(set(clustered_data["group"])) > 1:
            X = clustered_data[["x_center", "y_center", "z_center"]].to_numpy()
            labels = clustered_data["group"].to_numpy()

            try:
                sil_score = silhouette_score(X, labels)
                db_index = davies_bouldin_score(X, labels)
                ch_index = calinski_harabasz_score(X, labels)

                silhouette_scores.append(sil_score)
                db_scores.append(db_index)
                ch_scores.append(ch_index)
            except Exception as e:
                print(f"Skipping frame {frame} due to error: {e}")

    return {
        "silhouette_score": np.mean(silhouette_scores) if silhouette_scores else None,
        "davies_bouldin_index": np.mean(db_scores) if db_scores else None,
        "calinski_harabasz_index": np.mean(ch_scores) if ch_scores else None
    }

def evaluate_clustering(file_path, method_name):

    # Evaluate and print clustering performance given a file path and method name.
    detections = load_group_detections(file_path)
    metrics = compute_clustering_metrics(detections)

    print(f"\n===== {method_name} Clustering Evaluation =====")
    if metrics["silhouette_score"] is not None:
        print(f"Silhouette Score: {metrics['silhouette_score']:.3f} (Higher is better)")
    else:
        print("Silhouette Score: Not available")

    if metrics["davies_bouldin_index"] is not None:
        print(f"Davies-Bouldin Index: {metrics['davies_bouldin_index']:.3f} (Lower is better)")
    else:
        print("Davies-Bouldin Index: Not available")

    if metrics["calinski_harabasz_index"] is not None:
        print(f"Calinski-Harabasz Index: {metrics['calinski_harabasz_index']:.3f} (Higher is better)")
    else:
        print("Calinski-Harabasz Index: Not available")

In [None]:
# Call for all methods
evaluate_clustering("/content/group_detections.txt", "DBSCAN-Based")
evaluate_clustering("/content/group_detections_elbow.txt", "KMeans-Based")
evaluate_clustering("/content/group_detections_hierarchical.txt", "Hierarchical Agglomerative")

Loaded 14441 detections from /content/group_detections.txt.

===== DBSCAN-Based Clustering Evaluation =====
Silhouette Score: 0.814 (Higher is better)
Davies-Bouldin Index: 0.193 (Lower is better)
Calinski-Harabasz Index: 521.517 (Higher is better)
Loaded 14441 detections from /content/group_detections_elbow.txt.

===== KMeans-Based Clustering Evaluation =====
Silhouette Score: 0.505 (Higher is better)
Davies-Bouldin Index: 0.624 (Lower is better)
Calinski-Harabasz Index: 35.680 (Higher is better)
Loaded 14441 detections from /content/group_detections_hierarchical.txt.

===== Hierarchical Agglomerative Clustering Evaluation =====
Silhouette Score: 0.519 (Higher is better)
Davies-Bouldin Index: 0.595 (Lower is better)
Calinski-Harabasz Index: 36.194 (Higher is better)
