<a href="https://colab.research.google.com/github/jamesstaub/Listen-Up/blob/main/audio_slicer_2.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [62]:
# @title 1. Install Dependencies & Import Libraries
!pip install -q umap-learn
!pip install -q librosa soundfile ipywidgets

import os
import glob
import numpy as np
import pandas as pd
import librosa
import soundfile as sf

from sklearn.preprocessing import StandardScaler
from sklearn.metrics.pairwise import cosine_similarity
from sklearn.cluster import KMeans
from sklearn.preprocessing import RobustScaler, MinMaxScaler
import umap
from scipy.spatial.distance import cdist


In [6]:
# @title 2. Mount Google Drive
# The `# @title` directive is kept on the first line of the cell, matching the
# other titled cell in this notebook.
from google.colab import drive

# Mount Google Drive at /content/drive; the paths in CONFIG point below it.
drive.mount('/content/drive')
print("‚úÖ Drive Mounted.")


Mounted at /content/drive
‚úÖ Drive Mounted.


In [16]:
# Root of the source corpus on the mounted Drive. Note that the cluster output
# folder (and the cached analysis CSV) live *inside* this input folder.
_CORPUS_DIR = "/content/drive/My Drive/audio/sp-tools-corpora/preparedpiano"
_CLUSTER_DIR = f"{_CORPUS_DIR}/clusters"

CONFIG = dict(
    # ---- paths ----
    input_folder=_CORPUS_DIR,                      # searched recursively for audio files
    output_folder=_CLUSTER_DIR,                    # exported slices go in Cluster_<id>/ below this
    data_file=f"{_CLUSTER_DIR}/analysis_data.csv",  # cached per-slice feature table

    # ---- behavior ----
    resume=False,          # load existing analysis if present
    clear_existing=True,   # nuke data + clusters before run

    # ---- audio ----
    sample_rate=22050,     # rate passed to librosa.load
    stereo_mode="sum",     # "sum", "split", "L" or "R"; anything else is treated as "sum"
    file_limit=3,          # analyze at most this many files (falsy = no limit)

    # Slices whose duration (seconds) falls outside this range are skipped.
    min_duration=0.1,
    max_duration=8.0,

    # ---- clustering ----
    # NOTE(review): not read by the code shown in this notebook; cluster() is
    # called with an explicit n_clusters instead.
    n_clusters=8,

    # ---- filtering ----
    # 0.95 -> keeps most slices, 0.05 -> keep only the most unique slices
    # NOTE(review): not read by any code shown in this notebook.
    similarity_threshold=0.9,

    # UMAP hyper-parameters.
    umap=dict(
        n_neighbors=15,
        min_dist=0.1,
    ),
)


In [2]:
def stats(x):
    """Summarize a numeric collection.

    Args:
        x: Any input accepted by ``np.mean`` / ``np.min`` / ``np.max`` / ``np.std``.

    Returns:
        dict with the keys "mean", "min", "max" and "std" (in that order),
        each converted to a plain Python float.
    """
    reducers = (
        ("mean", np.mean),
        ("min", np.min),
        ("max", np.max),
        ("std", np.std),
    )
    return {label: float(reduce_fn(x)) for label, reduce_fn in reducers}

def clear_existing_state(cfg):
    """Delete the cached analysis file and empty the output folder.

    Args:
        cfg: Mapping providing "data_file" and "output_folder" paths.

    The output folder itself is kept; only the files and sub-directories
    beneath it are removed. Paths that do not exist are silently skipped.
    """
    data_path = cfg["data_file"]
    if os.path.exists(data_path):
        print("üßπ Removing existing analysis data")
        os.remove(data_path)

    out_root = cfg["output_folder"]
    if not os.path.exists(out_root):
        return

    print("üßπ Removing existing cluster folders")
    # Bottom-up traversal: by the time a directory shows up in `subdirs`
    # its own contents have already been deleted, so rmdir succeeds.
    for parent, subdirs, filenames in os.walk(out_root, topdown=False):
        for filename in filenames:
            os.remove(os.path.join(parent, filename))
        for subdir in subdirs:
            os.rmdir(os.path.join(parent, subdir))

In [69]:


class SampleBrain:
    def __init__(self, cfg):
        self.cfg = cfg
        self.sr = cfg["sample_rate"]
        self.df = pd.DataFrame()
        self.features = None

    def find_zero_crossing(self, y, idx):
        if idx <= 0 or idx >= len(y) - 1:
            return idx
        win = int(0.02 * self.sr)
        s = max(0, idx - win)
        e = min(len(y), idx + win)
        zc = np.where(np.diff(np.signbit(y[s:e])))[0]
        if len(zc):
            return s + zc[np.argmin(np.abs(zc - (idx - s)))]
        return idx

    def apply_envelope(self, y, fade_ms=5):
        n = int((fade_ms / 1000) * self.sr)
        if len(y) < 2 * n:
            return y
        env = np.ones(len(y))
        env[:n] = np.linspace(0, 1, n)
        env[-n:] = np.linspace(1, 0, n)
        return y * env

    def compute_umap(self, descriptor, n_neighbors=15, min_dist=0.1):
        cols = self.feature_columns(descriptor)
        if not cols:
            raise RuntimeError(f"No features found for {descriptor}")

        X = self.df[cols].fillna(0).values
        X = StandardScaler().fit_transform(X)

        reducer = umap.UMAP(
            n_neighbors=n_neighbors,
            min_dist=min_dist,
            metric="cosine",
            random_state=42
        )
        emb = reducer.fit_transform(X)

        self.df[f"umap_{descriptor}_x"] = emb[:, 0]
        self.df[f"umap_{descriptor}_y"] = emb[:, 1]

        print(f"üó∫Ô∏è Built {descriptor} UMAP")


    def feature_columns(self, descriptor):
        if descriptor == "timbral":
            return [
                c for c in self.df.columns
                if c.startswith("mfcc_") or c.startswith("mfcc_delta_")
            ]
        elif descriptor == "tonal":
            return [
                c for c in self.df.columns
                if c.startswith("chroma_") or c.startswith("chroma_delta_")
            ]
        else:
            raise ValueError(f"Unknown descriptor: {descriptor}")


    def analyze(self):
        if self._try_resume():
            return

        files = self._gather_audio_files()
        rows = []

        for i, path in enumerate(files):
            print(f"üîç [{i+1}/{len(files)}] {os.path.basename(path)}")
            rows.extend(self._process_file(path))

        self.df = pd.DataFrame(rows)
        self.save_dataframe()

        print(f"‚ú® Extracted {len(self.df)} slices")

        print("üó∫Ô∏è Computing UMAPs...")
        self.compute_umap("timbral")
        self.compute_umap("tonal")

    def _try_resume(self):
        if self.cfg.get("resume") and os.path.exists(self.cfg["data_file"]):
            self.load_dataframe()
            print(f"‚úÖ Loaded {len(self.df)} existing slices")

            if "umap_timbral_x" not in self.df.columns:
                self.compute_umap("timbral")
            if "umap_tonal_x" not in self.df.columns:
                self.compute_umap("tonal")

            return True
        return False

    def _gather_audio_files(self):
        cfg = self.cfg
        files = []

        for ext in ("wav", "mp3", "aiff"):
            files += glob.glob(
                os.path.join(cfg["input_folder"], "**", f"*.{ext}"),
                recursive=True
            )

        if cfg.get("file_limit"):
            files = files[: cfg["file_limit"]]

        print(f"üìÇ Found {len(files)} audio files")
        return files

    def _process_file(self, path):
        rows = []

        try:
            y, _ = librosa.load(path, sr=self.sr, mono=False)
            for ch, sig in self._iter_channels(y):
                rows.extend(self._process_channel(path, ch, sig))
        except Exception as e:
            print(f"‚ö†Ô∏è Error processing {os.path.basename(path)}: {e}")

        return rows

    def _iter_channels(self, y):
        cfg = self.cfg

        if y.ndim == 1:
            return [("mono", y)]

        if cfg["stereo_mode"] == "sum":
            return [("sum", librosa.to_mono(y))]
        if cfg["stereo_mode"] == "split":
            return [("L", y[0]), ("R", y[1])]
        if cfg["stereo_mode"] == "L":
            return [("L", y[0])]
        if cfg["stereo_mode"] == "R":
            return [("R", y[1])]

        return [("sum", librosa.to_mono(y))]

    def _process_channel(self, path, ch, sig):
        rows = []
        cfg = self.cfg

        onsets = librosa.onset.onset_detect(
            y=sig, sr=self.sr, units="samples"
        )
        if len(onsets) == 0:
            onsets = [0]

        for j in range(len(onsets)):
            s = onsets[j]
            e = onsets[j + 1] if j + 1 < len(onsets) else len(sig)

            s = self.find_zero_crossing(sig, s)
            e = self.find_zero_crossing(sig, e)

            dur = (e - s) / self.sr
            if not (cfg["min_duration"] <= dur <= cfg["max_duration"]):
                continue

            row = self._extract_slice_features(
                path, ch, sig, s, e, dur
            )
            if row:
                rows.append(row)

        return rows

    def _extract_slice_features(self, path, ch, sig, s, e, dur):
        slice_y = self.apply_envelope(sig[s:e])
        y_fix = librosa.util.fix_length(slice_y, size=10240)

        row = {
            "file_path": path,
            "channel": ch,
            "duration": dur,
            "start_sample": int(s),
            "end_sample": int(e),
            "rolloff": 0.0,
            "attack_slope": 0.0,
            "loudness": float(
                np.mean(librosa.feature.rms(y=y_fix))
            ),
        }

        self._add_mfcc_features(row, y_fix)
        self._add_chroma_features(row, y_fix)
        self._add_rolloff(row, y_fix)
        self._add_attack_slope(row, slice_y)

        return row

    def _add_mfcc_features(self, row, y):
        mfcc = librosa.feature.mfcc(y=y, sr=self.sr, n_mfcc=13)
        mfcc_d = librosa.feature.delta(mfcc)

        for k in range(13):
            row[f"mfcc_mean_{k}"] = float(np.mean(mfcc[k]))
            row[f"mfcc_delta_mean_{k}"] = float(np.mean(mfcc_d[k]))

    def _add_chroma_features(self, row, y):
        chroma = librosa.feature.chroma_stft(y=y, sr=self.sr)
        chroma_d = librosa.feature.delta(chroma)

        for k in range(12):
            row[f"chroma_mean_{k}"] = float(np.mean(chroma[k]))
            row[f"chroma_delta_mean_{k}"] = float(np.mean(chroma_d[k]))

    def _add_rolloff(self, row, y):
        try:
            ro = librosa.feature.spectral_rolloff(y=y, sr=self.sr)
            if ro.size > 0:
                row["rolloff"] = float(np.mean(ro))
        except Exception:
            pass

    def _add_attack_slope(self, row, slice_y):
        attack_len = int(0.05 * self.sr)
        attack = slice_y[:attack_len]

        if len(attack) > 8:
            rms_env = librosa.feature.rms(y=attack)[0]
            row["attack_slope"] = float(
                np.polyfit(
                    np.arange(len(rms_env)),
                    rms_env,
                    1
                )[0]
            )



    def cluster(self, descriptor="timbral", n_clusters=8):
        cols = self.feature_columns(descriptor)
        X = self.df[cols].fillna(0).values
        X = StandardScaler().fit_transform(X)

        print(f"üß† Clustering ({descriptor}) {len(X)} slices...")

        km = KMeans(
            n_clusters=n_clusters,
            random_state=42,
            n_init="auto"
        )
        self.df["cluster"] = km.fit_predict(X)

        print("üìä Cluster distribution:")
        print(self.df["cluster"].value_counts().sort_index())


    # --- Robust scaling helper ---
    def robust_scale_features(X):
        """Scale features using median/IQR, robust to outliers."""
        scaler = RobustScaler()
        return scaler.fit_transform(X)


    def thin(self, descriptor="timbral", percentile=90):
        """
        Thins slices per cluster based on distance from cluster centroid.

        Args:
            descriptor: "timbral" or "tonal"
            percentile: keep slices above this percentile distance within each cluster
                        (0‚Äì100, higher = keep more distant slices)
        """
        coords_cols = [f"umap_{descriptor}_x", f"umap_{descriptor}_y"]
        X = self.df[coords_cols].values
        X = MinMaxScaler().fit_transform(X)  # global scaling

        keep_mask = np.zeros(len(self.df), dtype=bool)
        print(f"üîç Thinning similar slices using {descriptor} UMAP (percentile={percentile})...")

        for cid, idx in self.df.groupby("cluster").groups.items():
            cluster_X = X[list(idx)]
            centroid = cluster_X.mean(axis=0, keepdims=True)
            distances = np.linalg.norm(cluster_X - centroid, axis=1)

            # Compute the threshold distance for this cluster
            thresh = np.percentile(distances, percentile)
            keep_mask[list(idx)] = distances >= thresh

        before = len(self.df)
        self.df = self.df[keep_mask].reset_index(drop=True)
        print(f"‚úÇÔ∏è Reduced {before} -> {len(self.df)} slices after thinning")



    def sort(self, descriptor="timbral", mode="similarity"):
        if mode == "duration":
            self.df = self.df.sort_values("duration", ascending=False).reset_index(drop=True)
            return

        if mode == "pitch":
            self.df = self.df.sort_values("f0_mean", ascending=False).reset_index(drop=True)
            return

        if mode != "similarity":
            raise ValueError(f"Unknown sort mode: {mode}")

        coords_cols = [f"umap_{descriptor}_x", f"umap_{descriptor}_y"]
        X = self.df[coords_cols].values
        X = MinMaxScaler().fit_transform(X)  # global scaling

        distances = np.zeros(len(self.df))
        for cid, idx in self.df.groupby("cluster").groups.items():
            cluster_X = X[list(idx)]
            centroid = cluster_X.mean(axis=0, keepdims=True)
            distances[list(idx)] = np.linalg.norm(cluster_X - centroid, axis=1)

        self.df["distance"] = distances
        self.df = self.df.sort_values(["cluster", "distance"], ascending=[True, True]).reset_index(drop=True)
        print(f"üìê Sorted by {descriptor} similarity (distance to cluster centroid)")
        print(f"distance min/max: {self.df['distance'].min():.5f} / {self.df['distance'].max():.5f}")



    def save_dataframe(self):
        path = self.cfg["data_file"]
        print(f"üíæ Saving analysis data to {path}")

        df = self.df.copy()
        # df["raw_audio"] = df["raw_audio"].apply(lambda x: x.tolist())
        df.to_csv(path, index=False)

    def load_dataframe(self):
        path = self.cfg["data_file"]
        print(f"üìÇ Loading analysis data from {path}")

        df = pd.read_csv(path)
        # df["raw_audio"] = df["raw_audio"].apply(lambda x: np.array(eval(x)))
        self.df = df


    def export(self, sort_mode="timbral"):
        """
        Write slices to cluster directories, filenames include:
            {global_index}-{sort_mode}-{distance:.3f}.wav
        """
        out = self.cfg["output_folder"]
        os.makedirs(out, exist_ok=True)

        if "cluster" not in self.df.columns:
            raise RuntimeError("No clusters found. Run cluster() first.")

        if "distance" not in self.df.columns:
            print("‚ö†Ô∏è Distance column not found. Running sort first...")
            self.sort(descriptor=sort_mode, mode="similarity")

        print(f"üíæ Writing slices to {out}")

        audio_cache = {}

        for i, row in enumerate(self.df.itertuples()):
            path = row.file_path
            start_sample = row.start_sample
            end_sample = row.end_sample
            cluster = row.cluster
            channel = row.channel
            distance = getattr(row, "distance", 0.0)

            if path not in audio_cache:
                y, _ = librosa.load(path, sr=self.sr, mono=True)
                audio_cache[path] = y
            else:
                y = audio_cache[path]

            slice_y = y[start_sample:end_sample]

            # Create cluster folder
            c_dir = os.path.join(out, f"Cluster_{cluster}")
            os.makedirs(c_dir, exist_ok=True)

            # Filename: global index - sort mode - distance
            fn = f"{i}-{sort_mode}-{distance:.3f}.wav"
            sf.write(os.path.join(c_dir, fn), slice_y, self.sr)

        print("‚úÖ Export complete")





In [70]:
if __name__ == "__main__":
    # Constructing the analyzer only stores the configuration; no files are
    # touched until analyze() runs.
    brain = SampleBrain(CONFIG)

    # Optionally wipe the cached analysis CSV and previously exported
    # cluster folders so the run starts from scratch.
    if CONFIG["clear_existing"]:
        clear_existing_state(CONFIG)

    # Slice the input files, extract features and build both UMAP embeddings
    # (or load the cached table when resuming).
    brain.analyze()


üßπ Removing existing analysis data
üßπ Removing existing cluster folders
üìÇ Found 3 audio files
üîç [1/3] prepared piano study #9.mp3


  return pitch_tuning(


üîç [2/3] The Lily in a Crystal (after Herrick) for electromagnetically prepare piano.mp3
üîç [3/3] Improvisation for Prepared Piano - Richard Melkonian.mp3
üíæ Saving analysis data to /content/drive/My Drive/audio/sp-tools-corpora/preparedpiano/clusters/analysis_data.csv
‚ú® Extracted 446 slices
üó∫Ô∏è Computing UMAPs...


  warn(


üó∫Ô∏è Built timbral UMAP


  warn(


üó∫Ô∏è Built tonal UMAP


In [73]:
# Group slices into 5 K-Means clusters on the standardized MFCC ("timbral")
# features; writes the labels to brain.df["cluster"].
brain.cluster(descriptor="timbral", n_clusters=5)
# Within each cluster keep only slices whose distance to the cluster centroid
# (in the scaled timbral UMAP space) is at or above that cluster's median.
# NOTE: thin() replaces brain.df with the reduced table, so re-running this
# cell thins the already-thinned data again. Re-run the analysis cell first
# to start over from the full slice table.
brain.thin(descriptor="timbral", percentile=50)
# Order by cluster, then by ascending distance to the cluster centroid in the
# scaled *tonal* UMAP space; stores that distance in brain.df["distance"].
brain.sort(descriptor="tonal", mode="similarity")



üß† Clustering (timbral) 421 slices...
üìä Cluster distribution:
cluster
0    155
1     62
2    109
3     31
4     64
Name: count, dtype: int64
üîç Thinning similar slices using timbral UMAP (percentile=50)...
‚úÇÔ∏è Reduced 421 -> 212 slices after thinning
üìê Sorted by tonal similarity (distance to cluster centroid)
distance min/max: 0.00999 / 0.82399


In [74]:
# The preceding sort used the tonal descriptor, so label the exported files
# accordingly. The default label "timbral" would misdescribe the distance
# embedded in each file name.
brain.export(sort_mode="tonal")

üíæ Writing slices to /content/drive/My Drive/audio/sp-tools-corpora/preparedpiano/clusters
‚úÖ Export complete
