Dependencies
----------------

In [7]:
!pip install onnxruntime onnxruntime-gpu
!pip install gammatone
!pip install librosa
!pip install plotly
!pip install umap-learn

[0m

Clustering + Report
----------------------

In [8]:
# onnx_cnn_latent_pipeline.py
#!wget -O net_eff_with_latent.onnx "https://www.dropbox.com/scl/fi/ur5n2co355tus6kg3qp1v/net_eff_with_latent.onnx?rlkey=ecfzipwdtbxpc2b79782hathl&st=aap83t7o&dl=1"
#!pip install onnxruntime onnxruntime-gpu

import os
import numpy as np
import librosa
import onnxruntime as ort
from sklearn.model_selection import train_test_split
from sklearn.cluster import KMeans
from sklearn.metrics import (
    adjusted_rand_score,
    adjusted_mutual_info_score,
    silhouette_score,
    davies_bouldin_score,
    calinski_harabasz_score,
)
from scipy.optimize import linear_sum_assignment
from typing import Iterable, Tuple, Dict, Any, Optional

# CHANGED: safe UMAP import & plotting up front
import umap.umap_ as umap
import plotly.express as px

# ----------------------------
# 🎯 Metrics
# ----------------------------
def hungarian_accuracy(y_true, y_pred) -> float:
    """Clustering accuracy under the best one-to-one cluster/class matching.

    Builds the contingency table between predicted clusters and true classes,
    solves the assignment problem (Hungarian algorithm) to find the mapping
    that maximizes the number of agreeing samples, and returns that number
    divided by the sample count.

    Labels are first remapped to dense indices with ``np.unique``, so the
    inputs may be lists or arrays and may contain negative, non-contiguous or
    non-integer (e.g. string) labels.

    Args:
        y_true: 1-D array-like of ground-truth labels.
        y_pred: 1-D array-like of predicted cluster labels, same length.

    Returns:
        Fraction of samples matched by the optimal assignment, in [0, 1].

    Raises:
        ValueError: if the inputs are empty or have different lengths.
    """
    y_true = np.asarray(y_true).ravel()
    y_pred = np.asarray(y_pred).ravel()
    if y_true.shape[0] != y_pred.shape[0]:
        raise ValueError(
            f"y_true and y_pred must have the same length "
            f"(got {y_true.shape[0]} and {y_pred.shape[0]})"
        )
    if y_pred.shape[0] == 0:
        raise ValueError("hungarian_accuracy requires at least one sample")

    # Dense 0..K-1 indices: safe to use as matrix coordinates whatever the
    # original label values are (negative labels would otherwise wrap around).
    _, true_idx = np.unique(y_true, return_inverse=True)
    _, pred_idx = np.unique(y_pred, return_inverse=True)
    true_idx = np.asarray(true_idx).ravel()
    pred_idx = np.asarray(pred_idx).ravel()

    # Contingency table: rows = predicted clusters, columns = true classes.
    w = np.zeros((int(pred_idx.max()) + 1, int(true_idx.max()) + 1), dtype=np.int64)
    np.add.at(w, (pred_idx, true_idx), 1)

    # linear_sum_assignment minimizes cost, so flip counts into costs.
    row_ind, col_ind = linear_sum_assignment(w.max() - w)
    return float(w[row_ind, col_ind].sum() / y_pred.shape[0])

def _internal_indices(X: np.ndarray, labels: np.ndarray) -> Dict[str, float]:
    """Compute Silhouette, Davies‚ÄìBouldin, Calinski‚ÄìHarabasz on X with given labels.
       Returns NaN for invalid cases (e.g., 1 cluster)."""
    out = {
        "silhouette": float("nan"),
        "davies_bouldin": float("nan"),
        "calinski_harabasz": float("nan"),
    }
    uniq = np.unique(labels)
    if len(uniq) <= 1 or len(uniq) >= len(labels):
        return out
    try:
        out["silhouette"] = float(silhouette_score(X, labels))
    except Exception:
        pass
    try:
        out["davies_bouldin"] = float(davies_bouldin_score(X, labels))
    except Exception:
        pass
    try:
        out["calinski_harabasz"] = float(calinski_harabasz_score(X, labels))
    except Exception:
        pass
    return out

# ----------------------------
# 🎚️ Features (Log-Mel)
# ----------------------------
def extract_logmel(
    file_path: str,
    sr: int = 16000,
    n_mels: int = 64,
    target_shape: Tuple[int, int] = (224, 224),
):
    """Load a waveform saved as ``.npy`` and return a fixed-size log-mel image.

    The waveform is cast to float32 and peak-normalized (skipped when the peak
    is zero), converted to a mel spectrogram (n_fft=1024, hop_length=512) and
    then to dB. The result is copied into the top-left corner of a zero-filled
    ``target_shape`` array; anything beyond ``target_shape`` is cropped.

    Any exception raised along the way is reported on stdout and ``None`` is
    returned instead of propagating the error.
    """
    try:
        waveform = np.load(file_path).astype(np.float32)
        peak = np.max(np.abs(waveform))
        if peak > 0:
            waveform = waveform / peak

        mel_power = librosa.feature.melspectrogram(
            y=waveform, sr=sr, n_fft=1024, hop_length=512, n_mels=n_mels
        )
        mel_db = librosa.power_to_db(mel_power + 1e-8)

        # Crop/pad into the fixed canvas expected downstream.
        canvas = np.zeros(target_shape, dtype=np.float32)
        rows = min(mel_db.shape[0], target_shape[0])
        cols = min(mel_db.shape[1], target_shape[1])
        canvas[:rows, :cols] = mel_db[:rows, :cols]
    except Exception as e:
        print(f"Failed processing {file_path}: {e}", flush=True)
        return None
    return canvas

# ----------------------------
# 🧠 ONNX Inference → Latents
# ----------------------------
def extract_cnn_latents(
    X: np.ndarray,
    model_path: str = "net_eff_with_latent.onnx",
    output_name: str = "new_fc",
    providers: Iterable[str] = ("CUDAExecutionProvider", "CPUExecutionProvider"),
) -> np.ndarray:
    """Run every item of ``X`` through the ONNX model and stack the outputs.

    Each item is given a leading batch axis and repeated three times along a
    new channel axis ([H, W] -> [1, 3, H, W]) before being fed, as float32, to
    the model's first input. The tensor named ``output_name`` is fetched and
    squeezed.

    Items whose inference raises are reported on stdout and skipped, so the
    returned array can have fewer rows than ``X``.
    """
    sess = ort.InferenceSession(model_path, providers=list(providers))
    feed_key = sess.get_inputs()[0].name

    collected = []
    for position, sample in enumerate(X):
        # Add batch and channel axes in one step, then tile the channel axis to 3.
        tensor = np.repeat(np.asanyarray(sample)[np.newaxis, np.newaxis], 3, axis=1)
        try:
            fetched = sess.run([output_name], {feed_key: tensor.astype(np.float32)})
            collected.append(fetched[0].squeeze())
        except Exception as e:
            print(f"ONNX failed on index {position}: {e}", flush=True)

    print(f"‚úÖ Extracted {len(collected)} latent vectors", flush=True)
    return np.asarray(collected)

# ----------------------------
# 🔗 Clustering (+ optional UMAP)
# ----------------------------
def run_cnn_clustering(
    X_latent: np.ndarray,
    y: np.ndarray,
    *,
    kmeans_k: Optional[int] = None,        # choose k; if None, infer from y
    do_umap: bool = True,
    umap_components: int = 3,
    umap_html_path: str = "cnn_latent_umap.html",
    random_state: int = 42,
) -> Dict[str, Any]:
    """Cluster latent vectors with KMeans, score them, and optionally plot UMAP.

    Args:
        X_latent: latent vectors, one row per sample.
        y: ground-truth label per row of ``X_latent`` (array-like).
        kmeans_k: number of clusters; when ``None`` the number of distinct
            values in ``y`` is used.
        do_umap: when true, project ``X_latent`` with UMAP and write an
            interactive Plotly scatter (colored by ``y``) to ``umap_html_path``.
        umap_components: 2 or 3 (dimension of the UMAP projection).
        umap_html_path: output path of the HTML figure.
        random_state: seed passed to KMeans and UMAP.

    Returns:
        Dict with ``ari``, ``ami``, ``hungarian_accuracy``, ``silhouette``,
        ``davies_bouldin``, ``calinski_harabasz``, ``kmeans_k`` and, when a
        figure was written, ``umap_html_path``.

    Raises:
        ValueError: if ``umap_components`` is not 2 or 3 while ``do_umap`` is
            set, or if ``X_latent`` and ``y`` disagree on the sample count.
    """
    # Fail fast: validate arguments before the expensive KMeans/metric work
    # instead of discovering a bad umap_components after everything has run.
    if do_umap and umap_components not in (2, 3):
        raise ValueError("umap_components must be 2 or 3")

    X_latent = np.asarray(X_latent)
    y = np.asarray(y)  # y.astype(str) below needs an ndarray, not a list
    if X_latent.shape[0] != y.shape[0]:
        raise ValueError(
            f"X_latent has {X_latent.shape[0]} samples but y has {y.shape[0]} labels"
        )

    n_clusters = int(kmeans_k) if kmeans_k is not None else len(np.unique(y))
    kmeans = KMeans(n_clusters=n_clusters, n_init=10, random_state=random_state)
    preds = kmeans.fit_predict(X_latent)

    # External metrics (compare clusters against ground truth)
    ari = adjusted_rand_score(y, preds)
    ami = adjusted_mutual_info_score(y, preds)
    hacc = hungarian_accuracy(y, preds)

    # Internal indices (computed on the latent space itself)
    extra = _internal_indices(X_latent, preds)

    print(f"\nüìä CNN Latent Clustering (k={n_clusters})", flush=True)
    print(f"ARI: {ari:.4f} | AMI: {ami:.4f} | H-Acc: {hacc:.4f}", flush=True)
    print(f"Silhouette: {extra['silhouette']:.4f} | "
          f"Davies‚ÄìBouldin: {extra['davies_bouldin']:.4f} | "
          f"Calinski‚ÄìHarabasz: {extra['calinski_harabasz']:.2f}", flush=True)

    results: Dict[str, Any] = {
        "ari": float(ari),
        "ami": float(ami),
        "hungarian_accuracy": float(hacc),
        "silhouette": float(extra["silhouette"]),
        "davies_bouldin": float(extra["davies_bouldin"]),
        "calinski_harabasz": float(extra["calinski_harabasz"]),
        "kmeans_k": int(n_clusters),
    }

    # Optional UMAP visualization of the same latent space
    if do_umap:
        reducer = umap.UMAP(n_components=umap_components, random_state=random_state)
        X_umap = reducer.fit_transform(X_latent)

        if umap_components == 3:
            fig = px.scatter_3d(
                x=X_umap[:, 0], y=X_umap[:, 1], z=X_umap[:, 2],
                color=y.astype(str),
                title=(f"CNN Latent Clustering (k={n_clusters}) | "
                       f"ARI {ari:.4f}, AMI {ami:.4f}, H-Acc {hacc:.4f}")
            )
        else:
            fig = px.scatter(
                x=X_umap[:, 0], y=X_umap[:, 1],
                color=y.astype(str),
                title=(f"CNN Latent Clustering (k={n_clusters}) | "
                       f"ARI {ari:.4f}, AMI {ami:.4f}")
            )

        fig.write_html(umap_html_path)
        print(f"Saved UMAP visualization to '{umap_html_path}'", flush=True)
        results["umap_html_path"] = umap_html_path

    return results

# ----------------------------
# 🚀 Runner (TEST-ONLY, 20%)
# ----------------------------
def run_onnx_pipeline(
    dataset_path: str,
    do_clustering: int = 1,
    *,
    model_path: str = "net_eff_with_latent.onnx",
    output_name: str = "new_fc",
    providers: Iterable[str] = ("CUDAExecutionProvider", "CPUExecutionProvider"),
    # NOTE: enforced to 20% test set, as requested
    random_state: int = 42,
    sr: int = 16000,
    n_mels: int = 64,
    target_shape: Tuple[int, int] = (224, 224),
    do_umap: bool = True,
    umap_components: int = 3,
    umap_html_path: str = "cnn_latent_umap.html",
    kmeans_k: Optional[int] = None,  # passed through to clustering
) -> Dict[str, Any]:
    """End-to-end evaluation on a fixed, stratified 20% test split.

    Steps:
      - Lists the ``<int>.npy`` waveform files in ``dataset_path`` (sorted by
        their integer stem) and loads ``labels.npy`` from the same folder.
      - Splits out 20% of the indices as the test set (stratified by label).
      - Builds a log-mel input for every test file; files that fail to load
        are skipped together with their label.
      - Runs the ONNX model to get one latent vector per test input.
      - If ``do_clustering`` is truthy, runs KMeans (+ optional UMAP) on the
        latents and stores the metrics under ``results["metrics"]``.

    Note: all test log-mels are held in memory at once as a single float32
    array of shape (n_test, *target_shape).

    Raises:
        FileNotFoundError: if the dataset directory or ``labels.npy`` is missing.
        ValueError: if the number of labels differs from the number of files.
        RuntimeError: if no test sample could be loaded, or if ONNX inference
            dropped samples so latents no longer line up with the labels.
    """
    label_path = os.path.join(dataset_path, "labels.npy")
    if not os.path.isdir(dataset_path):
        raise FileNotFoundError(f"Dataset directory not found: {dataset_path}")
    if not os.path.isfile(label_path):
        raise FileNotFoundError(f"Label file not found: {label_path}")

    all_files = sorted(
        [f for f in os.listdir(dataset_path) if f.endswith(".npy") and f != "labels.npy"],
        key=lambda x: int(os.path.splitext(x)[0])
    )
    labels = np.load(label_path)
    # Explicit check rather than `assert`, which is stripped under `python -O`.
    if len(labels) != len(all_files):
        raise ValueError(
            f"Mismatch between files and labels! "
            f"({len(all_files)} files vs {len(labels)} labels)"
        )

    full_paths = [os.path.join(dataset_path, f) for f in all_files]
    indices = np.arange(len(full_paths))

    # FIXED 20% TEST SPLIT (stratified)
    _, test_idx = train_test_split(
        indices, test_size=0.2, stratify=labels, random_state=random_state
    )

    # Build TEST inputs; a failed file drops both its features and its label
    X_test, y_test = [], []
    for idx in test_idx:
        logmel = extract_logmel(full_paths[idx], sr=sr, n_mels=n_mels, target_shape=target_shape)
        if logmel is not None:
            X_test.append(logmel)
            y_test.append(labels[idx])

    if not X_test:
        raise RuntimeError("No valid samples after log-mel extraction.")

    X_test = np.asarray(X_test)
    y_test = np.asarray(y_test)
    print(f"‚úÖ Loaded TEST set (20%): X={X_test.shape}, y={y_test.shape}", flush=True)

    # ONNX -> latents (TEST)
    X_latent_test = extract_cnn_latents(
        X_test, model_path=model_path, output_name=output_name, providers=providers
    )
    # extract_cnn_latents skips samples whose inference failed; in that case
    # row i of the latents no longer corresponds to y_test[i].
    if len(X_latent_test) != len(y_test):
        raise RuntimeError(
            f"ONNX inference returned {len(X_latent_test)} latents for "
            f"{len(y_test)} test samples; latents and labels are misaligned."
        )

    results: Dict[str, Any] = {
        "test_samples": int(len(X_test)),
        "latents_shape": tuple(X_latent_test.shape),
        "split": "test (20%)",
        "model_path": model_path,
        "output_name": output_name,
    }

    # Optional clustering + UMAP on TEST latents
    if do_clustering:
        metrics = run_cnn_clustering(
            X_latent=X_latent_test,
            y=y_test,
            kmeans_k=kmeans_k,
            do_umap=do_umap,
            umap_components=umap_components,
            umap_html_path=umap_html_path,
            random_state=random_state,
        )
        results["metrics"] = metrics

    return results

In [9]:
# Evaluate the ONNX latents on the stratified 20% test split of the dataset:
# KMeans with k=6, plus a UMAP scatter written to cnn_latent_umap.html.
# NOTE(review): dataset_path is an absolute path specific to this environment.
res = run_onnx_pipeline(
    dataset_path="/notebooks/dataset_preprocessed",
    do_clustering=1,
    model_path="net_eff_with_latent.onnx",
    output_name="new_fc",
    kmeans_k=6,                    # set k explicitly (or None to infer)
    do_umap=True,
    umap_html_path="cnn_latent_umap.html",
)
# "metrics" is only present in the result when do_clustering is truthy.
print(res.get("metrics"))

‚úÖ Loaded TEST set (20%): X=(82655, 224, 224), y=(82655,)



Specified provider 'CUDAExecutionProvider' is not in available provider names.Available providers: 'AzureExecutionProvider, CPUExecutionProvider'



‚úÖ Extracted 82655 latent vectors

üìä CNN Latent Clustering (k=6)
ARI: 0.0821 | AMI: 0.1021 | H-Acc: 0.3051
Silhouette: 0.2338 | Davies‚ÄìBouldin: 1.2509 | Calinski‚ÄìHarabasz: 28019.18



n_jobs value 1 overridden to 1 by setting random_state. Use no seed for parallelism.



Saved UMAP visualization to 'cnn_latent_umap.html'
{'ari': 0.08212392486495881, 'ami': 0.10211889006053312, 'hungarian_accuracy': 0.30507531304821245, 'silhouette': 0.23382499814033508, 'davies_bouldin': 1.250871491520521, 'calinski_harabasz': 28019.17804328841, 'kmeans_k': 6, 'umap_html_path': 'cnn_latent_umap.html'}


Clustering + Report
----------------------

In [10]:
# =========================
# ===== Report Add-On =====
# =========================
import math, base64, time, contextlib
from io import BytesIO
from collections import Counter, OrderedDict
from typing import Sequence
import matplotlib.pyplot as plt
from sklearn.metrics import silhouette_samples
from sklearn.metrics.pairwise import cosine_similarity
from scipy.signal import spectrogram, get_window
import plotly.io as pio

# ---------- File utilities & spectrogram helpers ----------
def _list_audio_npy_files(folder):
    files = []
    for f in os.listdir(folder):
        if f.endswith(".npy") and f != "labels.npy":
            stem = os.path.splitext(f)[0]
            try:
                key = int(stem)
            except ValueError:
                key = stem
            files.append((key, os.path.join(folder, f)))
    files.sort(key=lambda t: t[0])
    return [p for _, p in files]

def _load_class_labels_if_any(folder, count):
    lbl_path = os.path.join(folder, "labels.npy")
    if os.path.isfile(lbl_path):
        try:
            arr = np.load(lbl_path, allow_pickle=True)
            if len(arr) < count:
                pad = np.array(["Unknown"] * (count - len(arr)), dtype=object)
                arr = np.concatenate([arr, pad], axis=0)
            elif len(arr) > count:
                arr = arr[:count]
            return arr
        except Exception:
            pass
    return np.array(["Unknown"] * count, dtype=object)

def _ali_spec(x, fs):
    Lframe2 = 1000
    po = 80
    lov = int(np.ceil((po / 100) * Lframe2))
    taper = get_window('hann', Lframe2)
    Nfft = 2 ** (int(np.floor(np.log2(Lframe2))) + 2)
    f, t, s = spectrogram(x, fs=fs, window=taper, noverlap=lov, nfft=Nfft, mode='complex')
    as_ = np.abs(s)
    as_max = np.max(as_) if np.isfinite(np.max(as_)) and np.max(as_) > 0 else 1.0
    sdb = 10 * np.log10(100 * as_ / as_max + 1e-10)
    min_inx = np.argmin(np.abs(f - 0))
    max_inx = np.argmin(np.abs(f - 800))
    return sdb[min_inx:max_inx+1, :], f[min_inx:max_inx+1], t

def _spec_img_base64_from_wave(audio, fs=10000, title="Spectrogram"):
    """Render the ``_ali_spec`` spectrogram of ``audio`` and return it as base64 PNG.

    The figure is always closed, even if plotting or saving raises, so that
    rendering many exemplars in a loop cannot accumulate open figures.
    """
    spec, f_axis, t_axis = _ali_spec(audio, fs)
    fig, ax = plt.subplots(figsize=(8, 3))
    try:
        ax.imshow(spec, aspect='auto', origin='lower',
                  extent=[t_axis[0], t_axis[-1], f_axis[0], f_axis[-1]], cmap='hsv')
        ax.set_title(title)
        ax.set_xlabel("Time (s)")
        ax.set_ylabel("Frequency (Hz)")
        fig.tight_layout()
        buf = BytesIO()
        fig.savefig(buf, format='png')
    finally:
        plt.close(fig)
    return base64.b64encode(buf.getvalue()).decode("utf-8")

def _spec_img_base64_from_matrix(S, title="Mel-Spectrogram"):
    """Render a 2-D matrix as an image and return it as base64-encoded PNG.

    A 3-D input has its leading axis squeezed out first (``S.squeeze(0)``,
    which raises if that axis is not of length 1). The figure is always
    closed, even if plotting or saving raises.
    """
    if S.ndim == 3:
        S = S.squeeze(0)
    fig, ax = plt.subplots(figsize=(8, 3))
    try:
        ax.imshow(S, aspect='auto', origin='lower', cmap='viridis')
        ax.set_title(title)
        ax.set_xlabel("Frames")
        ax.set_ylabel("Mel bins")
        fig.tight_layout()
        buf = BytesIO()
        fig.savefig(buf, format='png')
    finally:
        plt.close(fig)
    return base64.b64encode(buf.getvalue()).decode("utf-8")

def _carousel_html(cluster_id, scope, base64_imgs):
    cid = f"carousel_{scope}_{cluster_id}"
    indicators = "".join(
        f'<button type="button" data-bs-target="#{cid}" data-bs-slide-to="{i}" {"class=active" if i==0 else ""} aria-current="true" aria-label="Slide {i+1}"></button>'
        for i in range(len(base64_imgs))
    )
    items = "".join(
        f'<div class="carousel-item {"active" if i==0 else ""}"><div class="d-flex justify-content-center"><img class="d-block w-100" src="data:image/png;base64,{img}"></div></div>'
        for i, img in enumerate(base64_imgs)
    )
    return f"""
    <div id="{cid}" class="carousel slide" data-bs-interval="false" data-bs-touch="false">
      <div class="carousel-indicators">{indicators}</div>
      <div class="carousel-inner">{items}</div>
      <button class="carousel-control-prev" type="button" data-bs-target="#{cid}" data-bs-slide="prev">
        <span class="carousel-control-prev-icon" aria-hidden="true"></span>
        <span class="visually-hidden">Previous</span>
      </button>
      <button class="carousel-control-next" type="button" data-bs-target="#{cid}" data-bs-slide="next">
        <span class="carousel-control-next-icon" aria-hidden="true"></span>
        <span class="visually-hidden">Next</span>
      </button>
    </div>
    """

# ---------- Clustering choices & per-cluster metrics ----------
def _choose_clusterer(algorithm: str, embeddings: np.ndarray, n_clusters: int):
    if algorithm == 'kmeans':
        model = KMeans(n_clusters=n_clusters, n_init=10, random_state=42).fit(embeddings)
        return model.labels_, None
    elif algorithm == 'agglomerative':
        from sklearn.cluster import AgglomerativeClustering
        model = AgglomerativeClustering(n_clusters=n_clusters).fit(embeddings)
        return model.labels_, None
    elif algorithm == 'gmm':
        from sklearn.mixture import GaussianMixture
        gm = GaussianMixture(n_components=n_clusters, covariance_type='full', random_state=42).fit(embeddings)
        return gm.predict(embeddings), gm.predict_proba(embeddings)
    else:
        raise ValueError(f"Unsupported clustering algorithm: {algorithm}")

def _per_cluster_stats(embeddings, cluster_labels, top_n=10, samples_per_cluster=10, cosine_eps=1e-8):
    """Returns list of dicts for top-N clusters by size with metrics & exemplar indices."""
    embeddings = np.asarray(embeddings, dtype=np.float32)
    cluster_labels = np.asarray(cluster_labels)
    uniq, counts = np.unique(cluster_labels, return_counts=True)

    # Silhouette per-sample (if valid), then mean per cluster
    per_sample_sil = None
    if np.all(counts >= 2) and len(uniq) > 1 and embeddings.shape[0] > len(uniq):
        try:
            per_sample_sil = silhouette_samples(embeddings, cluster_labels)
        except Exception:
            per_sample_sil = None

    clusters = []
    for c in uniq:
        idxs = np.where(cluster_labels == c)[0]
        Xi = embeddings[idxs]
        size = len(idxs)
        ctr = Xi.mean(axis=0, keepdims=True)
        # Intra-cluster variance (mean L2^2 to centroid)
        d2 = np.sum((Xi - ctr) ** 2, axis=1)
        intra_var = float(np.mean(d2)) if size > 0 else 0.0
        # Mean cosine similarity to centroid
        Xi_n = Xi / (np.linalg.norm(Xi, axis=1, keepdims=True) + cosine_eps)
        ctr_n = ctr / (np.linalg.norm(ctr, axis=1, keepdims=True) + cosine_eps)
        mean_cos = float(np.mean((Xi_n @ ctr_n.T).squeeze())) if size > 0 else 1.0
        # Per-cluster silhouette
        sil = float(np.mean(per_sample_sil[idxs])) if per_sample_sil is not None else float("nan")
        # pick K nearest to centroid
        order = np.argsort(d2)
        exemplars = idxs[order[: min(samples_per_cluster, size)]]
        clusters.append({
            "cluster_id": int(c),
            "size": int(size),
            "silhouette": sil,
            "intra_variance": intra_var,
            "mean_cosine_to_centroid": mean_cos,
            "exemplar_indices": exemplars.tolist(),
        })

    clusters_sorted = sorted(clusters, key=lambda z: z["size"], reverse=True)[: top_n]
    return clusters_sorted

# ---------- Embedding extraction via your existing ONNX path ----------
def _extract_onnx_embeddings_subset(
    dataset_path: str,
    *,
    model_path: str,
    output_name: str,
    providers: Iterable[str],
    subset_fraction: float = 0.02,
    subset_seed: int = 42,
    subset_strategy: str = "random",  # "random" | "head" | "tail"
    sr: int = 16000,
    n_mels: int = 64,
    target_shape: Tuple[int, int] = (224, 224),
):
    """Pick a subset of the dataset files, build log-mels and run ONNX on them.

    Args:
        dataset_path: folder containing the ``.npy`` waveforms.
        model_path, output_name, providers: forwarded to ``extract_cnn_latents``.
        subset_fraction: fraction of files to use. At least one file is always
            selected, and the count is capped at the number of files so that a
            fraction above 1 selects everything instead of crashing.
        subset_seed: seed for the ``"random"`` strategy.
        subset_strategy: ``"random"`` (sampling without replacement, kept in
            file order), ``"head"`` (first files) or ``"tail"`` (last files).
        sr, n_mels, target_shape: forwarded to ``extract_logmel``.

    Returns:
        ``(latents, valid_files)`` where row ``i`` of ``latents`` belongs to
        ``valid_files[i]``.

    Raises:
        RuntimeError: if the folder has no usable files, if no file survives
            log-mel extraction, or if ONNX inference dropped samples so that
            latents and files would no longer be aligned.
        ValueError: if ``subset_strategy`` is unknown.
    """
    all_files = _list_audio_npy_files(dataset_path)
    if len(all_files) == 0:
        raise RuntimeError(f"No .npy files found in {dataset_path}")

    n_sub = max(1, int(math.ceil(len(all_files) * subset_fraction)))
    n_sub = min(n_sub, len(all_files))  # fraction > 1 must not over-sample

    if subset_strategy == "random":
        rng = np.random.RandomState(subset_seed)
        chosen_idx = np.sort(rng.choice(len(all_files), size=n_sub, replace=False))
    elif subset_strategy == "head":
        chosen_idx = np.arange(0, n_sub)
    elif subset_strategy == "tail":
        chosen_idx = np.arange(len(all_files) - n_sub, len(all_files))
    else:
        raise ValueError(f"Unknown subset_strategy: {subset_strategy}")

    chosen_files = [all_files[i] for i in chosen_idx]

    # Build log-mels, remembering which files actually produced one
    X = []
    valid_files = []
    for fp in chosen_files:
        logmel = extract_logmel(fp, sr=sr, n_mels=n_mels, target_shape=target_shape)
        if logmel is not None:
            X.append(logmel)
            valid_files.append(fp)
    if not X:
        raise RuntimeError("No valid samples after log-mel extraction.")
    X = np.asarray(X, dtype=np.float32)

    # ONNX -> latents
    latents = extract_cnn_latents(
        X, model_path=model_path, output_name=output_name, providers=providers
    )
    # extract_cnn_latents skips samples whose inference failed; returning a
    # shorter array would silently pair latents with the wrong files.
    if len(latents) != len(valid_files):
        raise RuntimeError(
            f"ONNX inference returned {len(latents)} latents for "
            f"{len(valid_files)} inputs; latents and files are misaligned."
        )
    return latents, valid_files

from collections import Counter
from sklearn.metrics.pairwise import cosine_similarity
from scipy.stats import entropy

def evaluate_cluster_metrics(embeddings, idxs, location_labels, location_entropy_base=None):
    """Compactness, location-mixing and composite scores for one cluster.

    Args:
        embeddings: (n_samples, dim) array of all embeddings.
        idxs: indices of the cluster's members within ``embeddings``.
        location_labels: location label per sample (same length as embeddings).
        location_entropy_base: logarithm base for the location entropy;
            defaults to the number of distinct values in ``location_labels``.

    Returns:
        Dict with:
          - ``variance``: mean squared L2 distance to the cluster centroid.
          - ``mean_sim``: mean cosine similarity over all distinct member pairs
            (1.0 for a single member).
          - ``entropy``: entropy of the cluster's location distribution in base
            ``base`` (0.0 when ``base`` <= 1).
          - ``quality``: ``mean_sim / variance * (1 - normalized entropy)``.
          - ``novelty``: ``normalized entropy * variance``.

    The entropy is computed in base ``base``, whose maximum over ``base``
    categories is already 1, so it is used directly as the normalized entropy
    (clipped to [0, 1]). Dividing it again by ``log2(base)`` would shrink it
    for any base other than 2.
    """
    embeddings = np.asarray(embeddings)
    location_labels = np.asarray(location_labels)

    X = embeddings[idxs]
    if X.shape[0] == 0:
        return {'variance': 0.0, 'mean_sim': 1.0, 'entropy': 0.0, 'quality': 0.0, 'novelty': 0.0}
    center = np.mean(X, axis=0, keepdims=True)
    variance = float(np.mean(np.sum((X - center) ** 2, axis=1)))

    n = X.shape[0]
    if n > 1:
        # Mean pairwise cosine without the n x n similarity matrix:
        #   sum_{i != j} u_i . u_j = ||sum_i u_i||^2 - sum_i ||u_i||^2
        # Zero-norm rows stay zero, i.e. similarity 0 to everything.
        Xd = X.astype(np.float64)
        norms = np.linalg.norm(Xd, axis=1, keepdims=True)
        unit = Xd / np.where(norms > 0, norms, 1.0)
        total = unit.sum(axis=0)
        mean_sim = float((total @ total - np.sum(unit * unit)) / (n * (n - 1)))
    else:
        mean_sim = 1.0

    loc_counts = Counter(location_labels[idxs])
    loc_probs = np.array(list(loc_counts.values()), dtype=np.float32)
    loc_probs /= max(loc_probs.sum(), 1e-8)
    base = int(location_entropy_base or len(set(location_labels)))

    if base > 1:
        loc_entropy = float(entropy(loc_probs, base=base))
        normalized_entropy = min(max(loc_entropy, 0.0), 1.0)
        entropy_score = 1.0 - normalized_entropy
        novelty = float(normalized_entropy * variance)
    else:
        # A single location: nothing to mix, so no entropy penalty or novelty.
        loc_entropy = 0.0
        entropy_score = 1.0
        novelty = 0.0
    quality = float((mean_sim / (variance + 1e-8)) * entropy_score)

    return {'variance': variance, 'mean_sim': mean_sim, 'entropy': loc_entropy,
            'quality': quality, 'novelty': novelty}

# ---------- Main analysis to HTML (mirrors your SimCLR script) ----------
def analyze_onnx_unsupervised_to_html(
    model_path: str,
    dataset_paths: Sequence[str],
    labels_list: Sequence[str],
    *,
    output_name: str = "new_fc",
    providers: Iterable[str] = ("CUDAExecutionProvider", "CPUExecutionProvider"),
    cluster_method: str = "kmeans",  # 'kmeans' | 'gmm' | 'agglomerative'
    n_clusters: int = 60,
    subset_fraction: float = 0.02,
    subset_seed: int = 42,
    subset_strategy: str = "random",
    sr: int = 16000,
    n_mels: int = 64,
    target_shape: Tuple[int, int] = (224, 224),
    top_n_clusters: int = 10,
    samples_per_cluster: int = 10,
):
    """Cluster ONNX latents from one or more datasets and return a report section.

    For each ``(dataset_path, location_label)`` pair a subset of files is
    embedded with the ONNX model. All embeddings are then projected to 3-D
    with UMAP (for display only), clustered in the original latent space with
    ``cluster_method``, scored globally (Silhouette / Davies-Bouldin /
    Calinski-Harabasz), and the ``top_n_clusters`` largest clusters are
    described with their location and class distributions, cluster metrics and
    a carousel of exemplar spectrograms.

    Args:
        model_path, output_name, providers: ONNX model settings.
        dataset_paths: dataset folders, one per location.
        labels_list: location label for each entry of ``dataset_paths``.
        cluster_method, n_clusters: clustering configuration.
        subset_fraction, subset_seed, subset_strategy: subset selection per dataset.
        sr, n_mels, target_shape: log-mel settings.
        top_n_clusters: number of largest clusters to detail.
        samples_per_cluster: number of exemplars shown per cluster.

    Returns:
        An HTML fragment (a ``<div class='section'>``) as a string.

    Raises:
        ValueError: if ``dataset_paths`` is empty or its length differs from
            ``labels_list`` (``zip`` would otherwise drop datasets silently).
    """
    from html import escape  # local import: only needed to escape label text

    dataset_paths = list(dataset_paths)
    labels_list = list(labels_list)
    if not dataset_paths:
        raise ValueError("dataset_paths must contain at least one dataset")
    if len(dataset_paths) != len(labels_list):
        raise ValueError(
            f"dataset_paths and labels_list must have the same length "
            f"(got {len(dataset_paths)} and {len(labels_list)})"
        )

    # 1) Collect embeddings + metadata across locations
    embeddings_all, loc_labels_all, class_labels_all, file_paths_all = [], [], [], []

    for ds_path, loc_label in zip(dataset_paths, labels_list):
        print(f"[INFO] Extracting ONNX latents from {ds_path}")
        H, chosen_files = _extract_onnx_embeddings_subset(
            ds_path,
            model_path=model_path,
            output_name=output_name,
            providers=providers,
            subset_fraction=subset_fraction,
            subset_seed=subset_seed,
            subset_strategy=subset_strategy,
            sr=sr, n_mels=n_mels, target_shape=target_shape,
        )
        embeddings_all.append(H)
        file_paths_all.extend(chosen_files)
        loc_labels_all.extend([loc_label] * H.shape[0])

        # Class labels from labels.npy if available; labels are matched to the
        # sorted file list by position, then looked up by file name.
        full_files_sorted = _list_audio_npy_files(ds_path)
        cls_full = _load_class_labels_if_any(ds_path, count=len(full_files_sorted))
        name_to_label = {os.path.basename(p): cls_full[i] for i, p in enumerate(full_files_sorted)}
        class_labels_all.extend([name_to_label.get(os.path.basename(p), "Unknown") for p in chosen_files])

    embeddings = np.vstack(embeddings_all).astype(np.float32)
    location_labels = np.array(loc_labels_all, dtype=object)
    class_labels = np.array(class_labels_all, dtype=object)
    original_paths = np.array(file_paths_all, dtype=object)
    print(f"[INFO] Total subset size: {embeddings.shape}")

    # 2) UMAP projection (3D), used for the scatter plot only
    print(f"[INFO] Running UMAP on {embeddings.shape[0]} embeddings (dim={embeddings.shape[1]})")
    reducer = umap.UMAP(n_components=3, n_neighbors=15, min_dist=0.1, metric="cosine", random_state=42)
    t0 = time.perf_counter()
    proj_3d = reducer.fit_transform(embeddings)
    t1 = time.perf_counter()
    print(f"[INFO] UMAP done in {t1 - t0:.2f}s")

    # 3) Clustering (on the latent space, not on the UMAP projection)
    print(f"[INFO] Clustering with {cluster_method}, k={n_clusters}")
    t2 = time.perf_counter()
    cluster_labels, _ = _choose_clusterer(cluster_method, embeddings, n_clusters)
    t3 = time.perf_counter()
    print(f"[INFO] Clustering done in {t3 - t2:.2f}s")

    # 4) Global metrics; NaN when undefined or when the scorer raises
    uniq = np.unique(cluster_labels)
    valid = (len(uniq) > 1) and (embeddings.shape[0] > len(uniq))
    def _safe(fn, X, y):
        try:
            return float(fn(X, y)) if valid else float("nan")
        except Exception:
            return float("nan")
    sil = _safe(silhouette_score, embeddings, cluster_labels)
    dbi = _safe(davies_bouldin_score, embeddings, cluster_labels)
    ch  = _safe(calinski_harabasz_score, embeddings, cluster_labels)
    print(f"[INFO] Silhouette={sil if sil==sil else 'nan'} | DBI={dbi if dbi==dbi else 'nan'} | CH={ch if ch==ch else 'nan'}")

    # 5) UMAP figure
    title_txt = f"{cluster_method.capitalize()} (Sil={sil:.3f} | DBI={dbi:.3f} | CH={ch:.1f})"
    umap_fig = px.scatter_3d(
        x=proj_3d[:, 0], y=proj_3d[:, 1], z=proj_3d[:, 2],
        color=[str(c) for c in cluster_labels],
        symbol=location_labels,
        hover_data={"Cluster": cluster_labels, "Class": class_labels},
        title=title_txt, opacity=0.85, height=800
    )
    umap_html = pio.to_html(umap_fig, include_plotlyjs="cdn", full_html=False)

    # 6) Per-cluster details (top-N by size)
    cluster_blocks = []
    top_clusters = _per_cluster_stats(
        embeddings, cluster_labels,
        top_n=top_n_clusters, samples_per_cluster=samples_per_cluster
    )
    base_for_entropy = len(set(location_labels))  # same for every cluster

    for c in top_clusters:
        cid = c["cluster_id"]
        idxs = np.where(cluster_labels == cid)[0]
        size = len(idxs)

        # Distributions of locations and classes inside this cluster
        loc_counts = Counter(location_labels[idxs])
        cls_counts = Counter(class_labels[idxs])

        metrics = evaluate_cluster_metrics(embeddings, idxs, location_labels,
                                           location_entropy_base=base_for_entropy)

        # Labels come from dataset files, so escape them before embedding in HTML.
        meta_html = "<p><strong>Location Distribution:</strong></p><ul>" + "".join(
            f"<li><b>{escape(str(loc))}</b>: {count} ({count/size:.1%})</li>"
            for loc, count in loc_counts.items()
        ) + "</ul>"

        meta_html += "<p><strong>Class Distribution:</strong></p><ul>" + "".join(
            f"<li>{escape(str(cls))}: {count}</li>" for cls, count in cls_counts.items()
        ) + "</ul>"

        meta_html += f"""
        <p><strong>Cluster Metrics:</strong></p>
        <ul>
          <li>Size: {size}</li>
          <li>Intra-Cluster Variance: {metrics['variance']:.4f}</li>
          <li>Mean Cosine Similarity: {metrics['mean_sim']:.4f}</li>
          <li>Location Entropy: {metrics['entropy']:.3f}</li>
          <li>Composite Quality Score: {metrics['quality']:.4f}</li>
          <li><strong>Novelty Score:</strong> {metrics['novelty']:.4f}</li>
        </ul>
        """

        # Exemplars: members nearest to the centroid (chosen by _per_cluster_stats)
        imgs64 = []
        for i, ex_idx in enumerate(c["exemplar_indices"]):
            try:
                x = np.load(original_paths[ex_idx], mmap_mode="r")
                title = f"#{i+1} | {location_labels[ex_idx]} | Class {class_labels[ex_idx]}"
                if x.ndim == 1:
                    # 1-D file: raw waveform, render its spectrogram
                    imgs64.append(_spec_img_base64_from_wave(x.astype(np.float32), title=title))
                else:
                    # otherwise the file is shown directly as a matrix
                    imgs64.append(_spec_img_base64_from_matrix(x, title=title))
            except Exception as e:
                # Keep the carousel complete: show a blank image titled with the error.
                imgs64.append(_spec_img_base64_from_matrix(np.zeros((64,64)), title=f"Error: {e}"))

        carousel_html = _carousel_html(cid, "onnx", imgs64)
        block = f"<div class='col-md-6 mb-4'><h4>Cluster {cid} (n={size})</h4>{meta_html}{carousel_html}</div>"
        cluster_blocks.append(block)

    # Lay the cluster blocks out two per row
    cluster_html = ""
    for i in range(0, len(cluster_blocks), 2):
        cluster_html += "<div class='row'>" + "".join(cluster_blocks[i:i+2]) + "</div>"

    # round() rather than int(): e.g. 0.29 * 100 == 28.999999999999996
    subset_pct = int(round(subset_fraction * 100))

    return f"""
    <div class='section'>
      <h2>ONNX Latent Clustering Analysis (subset {subset_pct}%)</h2>
      <div class="container">
        <div class="row justify-content-center mb-4">
          <div class="col-md-12 d-flex justify-content-center">{umap_html}</div>
        </div>
      </div>
      {cluster_html}
    </div>
    """

# ---------- Top-level HTML report ----------
def generate_full_onnx_report(
    model_path: str,
    dataset_paths: Sequence[str],
    labels_list: Sequence[str],
    *,
    output_name: str = "new_fc",
    providers: Iterable[str] = ("CUDAExecutionProvider", "CPUExecutionProvider"),
    cluster_method: str = "kmeans",
    n_clusters: int = 60,
    subset_fraction: float = 0.02,
    subset_seed: int = 42,
    subset_strategy: str = "random",
    sr: int = 16000,
    n_mels: int = 64,
    target_shape: Tuple[int, int] = (224, 224),
    top_n_clusters: int = 10,
    samples_per_cluster: int = 10,
    out_prefix: str = "onnx_unsup_report",
) -> str:
    """Write a standalone HTML clustering report and return its path.

    The page consists of a header (locations, subset settings) followed by the
    section produced by ``analyze_onnx_unsupervised_to_html``, to which all
    analysis arguments are forwarded. Bootstrap is loaded from a CDN.

    The output file is named
    ``{out_prefix}_{cluster_method}_subset{percent}.html`` where ``percent`` is
    ``subset_fraction`` expressed as a rounded percentage.

    Returns:
        Path of the written HTML file.
    """
    from html import escape as _escape  # local import; `html` is a local string below

    dataset_paths = list(dataset_paths)
    labels_list = list(labels_list)  # materialize once so it can be joined and forwarded
    # round() rather than int(): e.g. 0.29 * 100 == 28.999999999999996
    subset_pct = int(round(subset_fraction * 100))
    locations_txt = ', '.join(_escape(str(lbl)) for lbl in labels_list)

    # No separate plotly.js <script> here: the embedded figure is generated with
    # include_plotlyjs="cdn" and brings its own, version-matched script tag.
    html = f"""
    <!DOCTYPE html>
    <html lang="en"><head>
      <meta charset="UTF-8">
      <title>Unsupervised Clustering Report (ONNX Latents)</title>
      <link href="https://cdn.jsdelivr.net/npm/bootstrap@5.3.0/dist/css/bootstrap.min.css" rel="stylesheet">
      <script src="https://cdn.jsdelivr.net/npm/bootstrap@5.3.0/dist/js/bootstrap.bundle.min.js"></script>
      <style>
        body {{ font-family: Arial, sans-serif; padding: 20px; background-color: #f5f5f5; }}
        h1 {{ color: #2c3e50; }}
        h2, h4 {{ color: #34495e; }}
        hr {{ border-top: 2px solid #bbb; margin-top: 40px; margin-bottom: 40px; }}
        .section {{ margin-bottom: 60px; padding: 20px; background: white; border-radius: 8px; box-shadow: 0 2px 4px rgba(0,0,0,0.1); }}
      </style>
    </head><body>
      <h1 class='mb-4'>Unsupervised Clustering Report (ONNX CNN Latents)</h1>
      <p><strong>Locations:</strong> {locations_txt}</p>
      <p><strong>Subset:</strong> {subset_pct}% &bull; Strategy: {subset_strategy} &bull; Seed: {subset_seed}</p>
      <hr>
    """
    html += analyze_onnx_unsupervised_to_html(
        model_path=model_path,
        dataset_paths=dataset_paths,
        labels_list=labels_list,
        output_name=output_name,
        providers=providers,
        cluster_method=cluster_method,
        n_clusters=int(n_clusters),
        subset_fraction=subset_fraction,
        subset_seed=subset_seed,
        subset_strategy=subset_strategy,
        sr=sr, n_mels=n_mels, target_shape=target_shape,
        top_n_clusters=top_n_clusters,
        samples_per_cluster=samples_per_cluster,
    )
    html += "</body></html>"
    out_path = f"{out_prefix}_{cluster_method}_subset{subset_pct}.html"
    with open(out_path, "w", encoding="utf-8") as f:
        f.write(html)
    return out_path

In [11]:
# Build the HTML clustering report for a single location: KMeans with k=60 on
# a random 20% subset, detailing the 30 largest clusters.
# NOTE(review): the dataset path is an absolute path specific to this environment.
report_path = generate_full_onnx_report(
    model_path="net_eff_with_latent.onnx",
    dataset_paths=["/notebooks/dataset_preprocessed"],   # can pass multiple
    labels_list=["PR_U1137"],                             # one label per dataset
    cluster_method="kmeans",                              # 'kmeans' | 'gmm' | 'agglomerative'
    n_clusters=60,
    subset_fraction=0.2,                                 # 20% subset (the function default is 0.02, i.e. 2%)
    subset_seed=42,
    subset_strategy="random",
    top_n_clusters=30,
    samples_per_cluster=10,
    out_prefix="FADAR_unsup_report_k60"
)
print("‚úÖ Report saved to:", report_path)


[INFO] Extracting ONNX latents from /notebooks/dataset_preprocessed



Specified provider 'CUDAExecutionProvider' is not in available provider names.Available providers: 'AzureExecutionProvider, CPUExecutionProvider'



‚úÖ Extracted 82655 latent vectors
[INFO] Total subset size: (82655, 6)
[INFO] Running UMAP on 82655 embeddings (dim=6)



n_jobs value 1 overridden to 1 by setting random_state. Use no seed for parallelism.



[INFO] UMAP done in 63.73s
[INFO] Clustering with kmeans, k=60
[INFO] Clustering done in 13.40s
[INFO] Silhouette=0.14605672657489777 | DBI=1.3729855360610055 | CH=9004.832101821683
‚úÖ Report saved to: FADAR_unsup_report_k60_kmeans_subset20.html
