# LAION-CLAP Audio Embedding Analysis

This notebook preprocesses audio data, computes LAION-CLAP embeddings, and analyzes the results. It has been refactored for clarity, modularity, and open-source release.

In [None]:
# === Configuration ===
# Placeholder descriptor list; the "Data Loading" cell further down reassigns
# EQ_DESCRIPTORS with a longer list.
EQ_DESCRIPTORS = ['bass', 'mid', 'treble']  # Example descriptors

# === Audio File Configuration ===
# File suffix, including the leading dot, that identifies audio files.
AUDIO_EXT = ".wav"


In [None]:
# === Imports ===
import os
import re
import pathlib
import numpy as np
import pandas as pd
import torch
import torchaudio
from tqdm import tqdm


In [None]:

# === Utility Functions ===

def load_audio_file(filepath, target_sr=48000):
    """Load an audio file and return its waveform at ``target_sr``.

    Args:
        filepath: Path handed to ``torchaudio.load``.
        target_sr: Sample rate the returned waveform should have.

    Returns:
        The waveform from ``torchaudio.load``, passed through
        ``torchaudio.transforms.Resample`` when the file's sample rate
        differs from ``target_sr``.
    """
    audio, native_sr = torchaudio.load(filepath)
    if native_sr == target_sr:
        # Already at the requested rate; nothing to resample.
        return audio
    to_target = torchaudio.transforms.Resample(native_sr, target_sr)
    return to_target(audio)

def compute_embeddings(model, waveform):
    """Embed ``waveform`` with ``model`` while gradient tracking is disabled.

    Args:
        model: Object exposing ``get_audio_embedding_from_data(x=..., use_tensor=...)``.
        waveform: Audio data forwarded to the model as ``x``.

    Returns:
        Whatever ``model.get_audio_embedding_from_data`` returns when called
        with ``use_tensor=True``.
    """
    with torch.no_grad():
        embedding = model.get_audio_embedding_from_data(
            x=waveform,
            use_tensor=True,
        )
    return embedding

def summarize_embeddings(df, group_by='label'):
    """Average the numeric columns of ``df`` within each ``group_by`` group.

    Args:
        df: DataFrame containing the ``group_by`` column(s) plus value columns.
        group_by: Column name (or list of names) to group on.

    Returns:
        A DataFrame with one row per group; the grouping key(s) are regular
        columns again and every numeric column holds its per-group mean.
    """
    # numeric_only=True skips text columns (paths, stems, ...) instead of
    # raising TypeError on them, which is what pandas >= 2.0 does by default.
    return df.groupby(group_by).mean(numeric_only=True).reset_index()

def parse_filename(filename, pattern):
    """Match ``filename`` against a compiled regex and return its groups.

    Args:
        filename: String to match from its start (``pattern.match``).
        pattern: Compiled ``re.Pattern``.

    Returns:
        ``match.groups()`` on success; otherwise a tuple of ``None`` with one
        entry per capturing group, so callers can unpack either result the
        same way.
    """
    match = pattern.match(filename)
    if match is None:
        # pattern.groups counts every capturing group; groupindex would only
        # count the named ones and yield a tuple of the wrong length.
        return (None,) * pattern.groups
    return match.groups()

def collect_audio_files(root_dir, ext=".wav"):
    """Recursively list the files under ``root_dir`` whose name ends in ``ext``.

    Args:
        root_dir: Directory to search.
        ext: Filename suffix to match, including the leading dot.

    Returns:
        A sorted list of path strings. Directories that happen to match the
        suffix are excluded.
    """
    matches = pathlib.Path(root_dir).rglob(f"*{ext}")
    # rglob yields entries in filesystem order and includes directories; sort
    # and filter so downstream embedding order is reproducible.
    return sorted(str(p) for p in matches if p.is_file())


In [None]:
# === Data Loading ===
# === CONFIG (EQ only, LAION-CLAP) ===
# Descriptor words for the EQ condition. This reassigns the placeholder
# EQ_DESCRIPTORS list set in the configuration cell at the top of the file.
EQ_DESCRIPTORS = [
    'warm','cold','soft','loud','bright','soothing','harsh','heavy','cool',
    'smooth','calm','clear','tinny','sharp','hard','crisp','mellow','dark',
    'peaceful','gentle'
]

# NOTE(review): both paths are absolute and machine-specific; they have to be
# edited before the notebook can run on another machine.
EQ_DIR = "/Users/lindseydeng/Desktop/timbre_semantics_experiment2/test_audio/eq_audio"  # folder with guitar_eq_<desc>_<scale>.wav
EQ_ORIGINAL_PATH = "/Users/lindseydeng/Desktop/timbre_semantics_experiment2/test_audio/guitar.wav"


import os, re, pathlib
import numpy as np
import pandas as pd
import torch
from laion_clap import CLAP_Module


# Build the CLAP model used for the EQ condition: fusion disabled, checkpoint
# chosen by load_ckpt() with no arguments, then eval() is called on it.
laion_eq = CLAP_Module(enable_fusion=False)
laion_eq.load_ckpt()  # default pretrained
laion_eq.eval()


AUDIO_EXT = ".wav"
# Filename layout: <stem>_eq_<descriptor>_<scale>.wav, where scale is written
# as 0.<digits>, 1.0 or 1. Matching ignores case.
EQ_FNAME_RE = re.compile(
    r"^(?P<stem>.+?)_eq_(?P<desc>[A-Za-z0-9\-]+)_(?P<scale>0\.\d+|1\.0|1)\.wav$",
    re.IGNORECASE
)

def list_eq_files(eq_dir: str):
    """Index the EQ-processed audio files found under ``eq_dir``.

    Walks ``eq_dir`` recursively and keeps files whose lower-cased suffix
    equals ``AUDIO_EXT`` and whose name matches ``EQ_FNAME_RE``.

    Args:
        eq_dir: Root directory to scan.

    Returns:
        A DataFrame with columns ``path``, ``stem``, ``descriptor``
        (lower-cased) and ``scale`` (float), sorted by descriptor, scale and
        path with a fresh 0..N-1 index. When nothing matches, an empty frame
        with the same columns is returned.
    """
    columns = ["path", "stem", "descriptor", "scale"]
    rows = []
    for root, _, files in os.walk(eq_dir):
        for f in files:
            if pathlib.Path(f).suffix.lower() != AUDIO_EXT:
                continue
            m = EQ_FNAME_RE.match(f)
            if not m:
                continue
            rows.append({
                "path": os.path.join(root, f),
                "stem": m.group("stem"),
                "descriptor": m.group("desc").lower(),
                "scale": float(m.group("scale"))
            })
    # Explicit columns keep the sort keys present even when no file matched;
    # without them an empty result raises KeyError in sort_values.
    return (
        pd.DataFrame(rows, columns=columns)
        .astype({"scale": "float64"})
        .sort_values(["descriptor", "scale", "path"])
        .reset_index(drop=True)
    )

# Index the EQ-processed files. The bare tuple on the next line has no effect
# other than being shown as cell output in a notebook.
eq_files_df = list_eq_files(EQ_DIR)
eq_files_df.head(10), len(eq_files_df)


# Build path list with original first (helps compute deltas)
eq_audio_paths = [EQ_ORIGINAL_PATH] + eq_files_df["path"].tolist()

# Get embeddings as numpy (use_tensor=False)
# NOTE(review): the slicing below assumes the model returns one row per input
# path, in input order - confirm against the laion_clap API.
eq_audio_embs = laion_eq.get_audio_embedding_from_filelist(x=eq_audio_paths, use_tensor=False)  # [1+N, D]
eq_orig_emb = eq_audio_embs[0:1, :]     # [1, D]
eq_manip_embs = eq_audio_embs[1:, :]    # [N, D]
eq_manip_embs.shape, eq_orig_emb.shape


import os, re, pathlib
import numpy as np
import pandas as pd
import torch
from laion_clap import CLAP_Module

# === CONFIG ===
# Descriptor words for the reverb condition.
RVB_DESCRIPTORS = [
    'echo', 'distant', 'warm', 'spacious', 'loud', 'deep',
    'muffled', 'church', 'big', 'distorted', 'hollow', 'sad', 'soft',
    'bass', 'strong', 'low', 'haunting', 'clear', 'tinny', 'hall',
]

# NOTE(review): both paths are absolute and machine-specific; they have to be
# edited before the notebook can run on another machine.
# The directory path previously began with "//"; a single leading slash is the
# intended absolute path and keeps the stored file paths in normal form.
RVB_DIR = "/Users/lindseydeng/Desktop/timbre_semantics_experiment2/test_audio/reverb"  # folder with <stem>_reverb_<desc>_<scale>.wav
RVB_ORIGINAL_PATH = "/Users/lindseydeng/Desktop/timbre_semantics_experiment2/test_audio/guitar.wav"

# Build the CLAP model used for the reverb condition. It is constructed with
# the same arguments as laion_eq above.
# NOTE(review): this looks like a second copy of the same model; reusing
# laion_eq would likely avoid a second checkpoint load - confirm.
laion_rvb = CLAP_Module(enable_fusion=False)
laion_rvb.load_ckpt()  # default pretrained
laion_rvb.eval()


AUDIO_EXT = ".wav"
# Filename layout: <stem>_reverb_<descriptor>_<scale>.wav, where scale is
# written as 0.<digits>, 1.0 or 1. Matching ignores case.
RVB_FNAME_RE = re.compile(
    r"^(?P<stem>.+?)_reverb_(?P<desc>[A-Za-z0-9\-]+)_(?P<scale>0\.\d+|1\.0|1)\.wav$",
    re.IGNORECASE
)

def list_rvb_files(rvb_dir: str):
    """Index the reverb-processed audio files found under ``rvb_dir``.

    Walks ``rvb_dir`` recursively and keeps files whose lower-cased suffix
    equals ``AUDIO_EXT`` and whose name matches ``RVB_FNAME_RE``.

    Args:
        rvb_dir: Root directory to scan.

    Returns:
        A DataFrame with columns ``path``, ``stem``, ``descriptor``
        (lower-cased) and ``scale`` (float), sorted by descriptor, scale and
        path with a fresh 0..N-1 index. When nothing matches, an empty frame
        with the same columns is returned.
    """
    columns = ["path", "stem", "descriptor", "scale"]
    rows = []
    for root, _, files in os.walk(rvb_dir):
        for f in files:
            if pathlib.Path(f).suffix.lower() != AUDIO_EXT:
                continue
            m = RVB_FNAME_RE.match(f)
            if not m:
                continue
            rows.append({
                "path": os.path.join(root, f),
                "stem": m.group("stem"),
                "descriptor": m.group("desc").lower(),
                "scale": float(m.group("scale"))
            })
    # Explicit columns keep the sort keys present even when no file matched;
    # without them an empty result raises KeyError in sort_values.
    return (
        pd.DataFrame(rows, columns=columns)
        .astype({"scale": "float64"})
        .sort_values(["descriptor", "scale", "path"])
        .reset_index(drop=True)
    )

# Index the reverb-processed files. The bare tuple on the next line has no
# effect other than being shown as cell output in a notebook.
rvb_files_df = list_rvb_files(RVB_DIR)
rvb_files_df.head(10), len(rvb_files_df)

# Unprocessed original goes first so it can be sliced off as row 0 below.
rvb_audio_paths = [RVB_ORIGINAL_PATH] + rvb_files_df["path"].tolist()

# Get embeddings as numpy (use_tensor=False)
# NOTE(review): the slicing below assumes the model returns one row per input
# path, in input order - confirm against the laion_clap API.
rvb_audio_embs = laion_rvb.get_audio_embedding_from_filelist(x=rvb_audio_paths, use_tensor=False)  # [1+N, D]
rvb_orig_emb = rvb_audio_embs[0:1, :]     # [1, D]
rvb_manip_embs = rvb_audio_embs[1:, :]    # [N, D]
rvb_manip_embs.shape, rvb_orig_emb.shape


In [None]:
# === Embedding Audio ===
# NOTE(review): these statements repeat the embedding code already present in
# the "Data Loading" cell, so running both cells embeds every file twice.
# Build path list with original first (helps compute deltas)
eq_audio_paths = [EQ_ORIGINAL_PATH] + eq_files_df["path"].tolist()

# Get embeddings as numpy (use_tensor=False)
eq_audio_embs = laion_eq.get_audio_embedding_from_filelist(x=eq_audio_paths, use_tensor=False)  # [1+N, D]
eq_orig_emb = eq_audio_embs[0:1, :]     # [1, D]
eq_manip_embs = eq_audio_embs[1:, :]    # [N, D]
eq_manip_embs.shape, eq_orig_emb.shape


# Same steps for the reverb condition: original first, then processed files.
rvb_audio_paths = [RVB_ORIGINAL_PATH] + rvb_files_df["path"].tolist()

# Get embeddings as numpy (use_tensor=False)
rvb_audio_embs = laion_rvb.get_audio_embedding_from_filelist(x=rvb_audio_paths, use_tensor=False)  # [1+N, D]
rvb_orig_emb = rvb_audio_embs[0:1, :]     # [1, D]
rvb_manip_embs = rvb_audio_embs[1:, :]    # [N, D]
rvb_manip_embs.shape, rvb_orig_emb.shape


In [None]:
# === Postprocessing & Analysis ===
AUDIO_EXT = ".wav"
# Filename layout: <stem>_eq_<descriptor>_<scale>.wav, where scale is written
# as 0.<digits>, 1.0 or 1. Matching ignores case.
EQ_FNAME_RE = re.compile(
    r"^(?P<stem>.+?)_eq_(?P<desc>[A-Za-z0-9\-]+)_(?P<scale>0\.\d+|1\.0|1)\.wav$",
    re.IGNORECASE
)

def list_eq_files(eq_dir: str):
    """Index the EQ-processed audio files found under ``eq_dir``.

    Walks ``eq_dir`` recursively and keeps files whose lower-cased suffix
    equals ``AUDIO_EXT`` and whose name matches ``EQ_FNAME_RE``.

    Args:
        eq_dir: Root directory to scan.

    Returns:
        A DataFrame with columns ``path``, ``stem``, ``descriptor``
        (lower-cased) and ``scale`` (float), sorted by descriptor, scale and
        path with a fresh 0..N-1 index. When nothing matches, an empty frame
        with the same columns is returned.
    """
    columns = ["path", "stem", "descriptor", "scale"]
    rows = []
    for root, _, files in os.walk(eq_dir):
        for f in files:
            if pathlib.Path(f).suffix.lower() != AUDIO_EXT:
                continue
            m = EQ_FNAME_RE.match(f)
            if not m:
                continue
            rows.append({
                "path": os.path.join(root, f),
                "stem": m.group("stem"),
                "descriptor": m.group("desc").lower(),
                "scale": float(m.group("scale"))
            })
    # Explicit columns keep the sort keys present even when no file matched;
    # without them an empty result raises KeyError in sort_values.
    return (
        pd.DataFrame(rows, columns=columns)
        .astype({"scale": "float64"})
        .sort_values(["descriptor", "scale", "path"])
        .reset_index(drop=True)
    )

# Re-scan EQ_DIR and rebind eq_files_df.
# NOTE(review): the similarity table below indexes eq_manip_sims by row
# position, so this listing must have the same order as the one that was
# embedded - confirm the directory did not change in between.
eq_files_df = list_eq_files(EQ_DIR)
eq_files_df.head(10), len(eq_files_df)


def l2norm(x, axis=-1, eps=1e-12):
    """Scale ``x`` to (approximately) unit Euclidean length along ``axis``.

    Args:
        x: Array of vectors.
        axis: Axis along which the norm is taken.
        eps: Small constant added to the norm so all-zero vectors do not
            divide by zero.

    Returns:
        ``x`` divided by its norm plus ``eps``; same shape as ``x``.
    """
    magnitude = np.linalg.norm(x, axis=axis, keepdims=True)
    denominator = magnitude + eps
    return x / denominator

# Normalize
# NOTE(review): EQ_TEXT_EMBS and EQ_TEXT_LABELS are not defined in the visible
# part of this file; presumably they hold the text embeddings and labels of the
# descriptor words and are created in another cell - confirm.
A_orig = l2norm(eq_orig_emb)          # [1, D]
A_manip = l2norm(eq_manip_embs)       # [N, D]
T = l2norm(EQ_TEXT_EMBS)              # [M, D]

# Cosine similarities
# original vs all descriptors -> [M]
eq_orig_sims = (A_orig @ T.T)[0]

# manipulated vs all descriptors -> [N, M]
eq_manip_sims = A_manip @ T.T

# Map descriptor -> column index
eq_desc2idx = {d:i for i, d in enumerate(EQ_TEXT_LABELS)}

# Build tidy table: sim_target, sim_orig_target, delta_target
rows = []
# The index label i doubles as the row position in eq_manip_sims; this relies
# on eq_files_df carrying a 0..N-1 index in the same order as the embeddings.
for i, r in eq_files_df.iterrows():
    desc = r["descriptor"]
    # Raises KeyError when a filename descriptor is missing from EQ_TEXT_LABELS.
    idx = eq_desc2idx[desc]
    s_target = float(eq_manip_sims[i, idx])
    s_orig_target = float(eq_orig_sims[idx])
    rows.append({
        "path": r["path"],
        "stem": r["stem"],
        "descriptor": desc,
        "scale": float(r["scale"]),
        "sim_target": s_target,           # cosine(sim) in [-1, 1]
        "sim_orig_target": s_orig_target,
        "delta_target": s_target - s_orig_target
    })

laion_eq_target_df = pd.DataFrame(rows).sort_values(["descriptor","scale"]).reset_index(drop=True)
laion_eq_target_df.head(12)


# Mean delta_target per scale, across all descriptors.
laion_eq_overall = (
    laion_eq_target_df.groupby("scale")["delta_target"]
    .mean()
    .reset_index()
    .sort_values("scale")
)
# Mean delta_target per (descriptor, scale) pair.
laion_eq_by_desc = (
    laion_eq_target_df.groupby(["descriptor","scale"])["delta_target"]
    .mean()
    .reset_index()
    .sort_values(["descriptor","scale"])
)

laion_eq_overall, laion_eq_by_desc.head(60)


import numpy as np
import pandas as pd

# Scales every descriptor is expected to have, in increasing order.
EXPECTED_SCALES = [0.3, 0.6, 1.0]
# Tolerance used when comparing neighbouring deltas.
EPS = 1e-6

def classify_trend(df_desc):
    """Label how ``delta_target`` changes across ``EXPECTED_SCALES``.

    Args:
        df_desc: DataFrame for one descriptor with ``scale`` and
            ``delta_target`` columns. Several rows may share a scale; their
            deltas are averaged before classification.

    Returns:
        ``"Insufficient data"`` when any expected scale has no value,
        ``"Monotonic up"`` / ``"Monotonic down"`` when the three deltas are
        non-decreasing / non-increasing within ``EPS``, otherwise a
        ``"Peak ..."`` label naming the scale with the largest delta.
    """
    # Averaging per scale keeps the index unique; set_index + reindex alone
    # raises ValueError as soon as one scale appears twice (e.g. several stems).
    per_scale = (
        df_desc.groupby("scale")["delta_target"].mean().reindex(EXPECTED_SCALES)
    )
    if per_scale.isna().any():
        return "Insufficient data"
    d0, d1, d2 = per_scale.tolist()

    # monotonic checks with tolerance
    if d0 <= d1 + EPS and d1 <= d2 + EPS:
        return "Monotonic up"
    if d0 >= d1 - EPS and d1 >= d2 - EPS:
        return "Monotonic down"

    # peak location
    peak_idx = int(np.argmax([d0, d1, d2]))
    return {0: "Peak low (0.3)", 1: "Peak mid (0.6)", 2: "Peak high (1.0)"}[peak_idx]

# build trend table
# One classify_trend label per descriptor in the EQ table.
trend_rows = []
for desc, g in laion_eq_target_df.groupby("descriptor", as_index=False):
    trend = classify_trend(g[["scale","delta_target"]].copy())
    trend_rows.append({"descriptor": desc, "trend_type": trend})

# NOTE: the reverb section later rebinds laion_trend_df and laion_trend_counts,
# so these names hold the EQ results only until that code runs.
laion_trend_df = pd.DataFrame(trend_rows).sort_values("trend_type").reset_index(drop=True)
# display() is not imported in this file; presumably the IPython builtin.
display(laion_trend_df)

# quick counts
laion_trend_counts = laion_trend_df["trend_type"].value_counts()
display(laion_trend_counts)


# Rebuild any EQ summary table that is missing from the notebook session.
if "laion_eq_overall" not in globals():
    laion_eq_overall = (
        laion_eq_target_df.groupby("scale")["delta_target"]
        .mean()
        .reset_index()
        .sort_values("scale")
    )

if "laion_eq_by_desc" not in globals():
    laion_eq_by_desc = (
        laion_eq_target_df.groupby(["descriptor","scale"])["delta_target"]
        .mean()
        .reset_index()
        .sort_values(["descriptor","scale"])
    )

if "laion_trend_counts" not in globals():
    # The per-descriptor trend table in this file is laion_trend_df; the name
    # used here before (laion_eq_trends) is not defined anywhere in view.
    laion_trend_counts = (
        laion_trend_df.groupby("trend_type")
        .size()
        .rename("count")
        .reset_index()
        .sort_values("count", ascending=False)
    )

display(laion_eq_overall)
display(laion_eq_by_desc.head(20))
display(laion_trend_counts)

# value_counts() yields a Series whose trend labels live in the index, so
# to_csv(index=False) would write the counts without their labels. Turn it
# into a two-column frame before saving.
if isinstance(laion_trend_counts, pd.Series):
    eq_trend_counts_out = laion_trend_counts.rename_axis("trend_type").reset_index(name="count")
else:
    eq_trend_counts_out = laion_trend_counts

# Save CSVs
# NOTE(review): OUT is not defined in the visible part of this file; it is
# presumably an output directory set in another cell - confirm.
laion_eq_overall.to_csv(f"{OUT}/laionclap_eq_mean_delta_by_scale.csv", index=False)
laion_eq_by_desc.to_csv(f"{OUT}/laionclap_eq_delta_by_descriptor_and_scale.csv", index=False)
eq_trend_counts_out.to_csv(f"{OUT}/laionclap_eq_trend_counts.csv", index=False)


def spearman_corr(x, y):
    """Spearman rank correlation between two equal-length sequences.

    Both inputs are converted to average ranks (ties share the mean rank) and
    the Pearson correlation of those ranks is returned.

    Returns:
        The correlation as a float, or ``nan`` when either rank vector is
        constant and the correlation is therefore undefined.
    """
    ranks = [
        pd.Series(values).rank(method="average").to_numpy()
        for values in (x, y)
    ]
    if any(np.std(r) < 1e-12 for r in ranks):
        return np.nan
    rank_x, rank_y = ranks
    return float(np.corrcoef(rank_x, rank_y)[0, 1])

# Spearman correlation between scale and delta_target, one value per descriptor.
rows = []
# Iterate the EQ table built in this file (laion_eq_target_df); the name used
# here before (laion_eq_targets) is not defined anywhere in view.
for desc, g in laion_eq_target_df.groupby("descriptor"):
    g = g.sort_values("scale")
    if g["scale"].nunique() >= 3:   # expect 0.3, 0.6, 1.0
        rho = spearman_corr(g["scale"].to_numpy(), g["delta_target"].to_numpy())
    else:
        # Too few distinct scales for a meaningful rank correlation.
        rho = np.nan
    rows.append({"descriptor": desc, "spearman_delta_vs_scale": rho})

laion_eq_spearman = pd.DataFrame(rows).sort_values("spearman_delta_vs_scale", ascending=False)
display(laion_eq_spearman.head(10))
display(laion_eq_spearman.tail(10))

# NOTE(review): OUT is not defined in the visible part of this file; it is
# presumably an output directory set in another cell - confirm.
laion_eq_spearman.to_csv(f"{OUT}/laionclap_eq_spearman_delta_vs_scale.csv", index=False)


AUDIO_EXT = ".wav"
# Filename layout: <stem>_reverb_<descriptor>_<scale>.wav, where scale is
# written as 0.<digits>, 1.0 or 1. Matching ignores case.
RVB_FNAME_RE = re.compile(
    r"^(?P<stem>.+?)_reverb_(?P<desc>[A-Za-z0-9\-]+)_(?P<scale>0\.\d+|1\.0|1)\.wav$",
    re.IGNORECASE
)

def list_rvb_files(rvb_dir: str):
    """Index the reverb-processed audio files found under ``rvb_dir``.

    Walks ``rvb_dir`` recursively and keeps files whose lower-cased suffix
    equals ``AUDIO_EXT`` and whose name matches ``RVB_FNAME_RE``.

    Args:
        rvb_dir: Root directory to scan.

    Returns:
        A DataFrame with columns ``path``, ``stem``, ``descriptor``
        (lower-cased) and ``scale`` (float), sorted by descriptor, scale and
        path with a fresh 0..N-1 index. When nothing matches, an empty frame
        with the same columns is returned.
    """
    columns = ["path", "stem", "descriptor", "scale"]
    rows = []
    for root, _, files in os.walk(rvb_dir):
        for f in files:
            if pathlib.Path(f).suffix.lower() != AUDIO_EXT:
                continue
            m = RVB_FNAME_RE.match(f)
            if not m:
                continue
            rows.append({
                "path": os.path.join(root, f),
                "stem": m.group("stem"),
                "descriptor": m.group("desc").lower(),
                "scale": float(m.group("scale"))
            })
    # Explicit columns keep the sort keys present even when no file matched;
    # without them an empty result raises KeyError in sort_values.
    return (
        pd.DataFrame(rows, columns=columns)
        .astype({"scale": "float64"})
        .sort_values(["descriptor", "scale", "path"])
        .reset_index(drop=True)
    )

# Re-scan RVB_DIR and rebind rvb_files_df.
# NOTE(review): the similarity table below indexes rvb_manip_sims by row
# position, so this listing must have the same order as the one that was
# embedded - confirm the directory did not change in between.
rvb_files_df = list_rvb_files(RVB_DIR)
rvb_files_df.head(10), len(rvb_files_df)

def l2norm(x, axis=-1, eps=1e-12):
    """Divide ``x`` by its Euclidean norm along ``axis``.

    Args:
        x: Array of vectors.
        axis: Axis along which the norm is taken.
        eps: Small constant added to the norm so all-zero vectors do not
            divide by zero.

    Returns:
        An array shaped like ``x`` whose vectors along ``axis`` have
        (approximately) unit length.
    """
    length = np.linalg.norm(x, axis=axis, keepdims=True)
    safe_length = length + eps
    return x / safe_length

# Normalize
# NOTE(review): RVB_TEXT_EMBS and RVB_TEXT_LABELS are not defined in the
# visible part of this file; presumably they hold the text embeddings and
# labels of the descriptor words and are created in another cell - confirm.
A_orig = l2norm(rvb_orig_emb)          # [1, D]
A_manip = l2norm(rvb_manip_embs)       # [N, D]
T = l2norm(RVB_TEXT_EMBS)              # [M, D]

# Cosine similarities
# original vs all descriptors -> [M]
rvb_orig_sims = (A_orig @ T.T)[0]

# manipulated vs all descriptors -> [N, M]
rvb_manip_sims = A_manip @ T.T

# Map descriptor -> column index
rvb_desc2idx = {d:i for i, d in enumerate(RVB_TEXT_LABELS)}
# Build tidy table: sim_target, sim_orig_target, delta_target
rows = []
# The index label i doubles as the row position in rvb_manip_sims; this relies
# on rvb_files_df carrying a 0..N-1 index in the same order as the embeddings.
for i, r in rvb_files_df.iterrows():
    desc = r["descriptor"]
    # Raises KeyError when a filename descriptor is missing from RVB_TEXT_LABELS.
    idx = rvb_desc2idx[desc]
    s_target = float(rvb_manip_sims[i, idx])
    s_orig_target = float(rvb_orig_sims[idx])
    rows.append({
        "path": r["path"],
        "stem": r["stem"],
        "descriptor": desc,
        "scale": float(r["scale"]),
        "sim_target": s_target,           # cosine(sim) in [-1, 1]
        "sim_orig_target": s_orig_target,
        "delta_target": s_target - s_orig_target
    })

laion_rvb_target_df = pd.DataFrame(rows).sort_values(["descriptor","scale"]).reset_index(drop=True)
laion_rvb_target_df.head(12)



# Mean delta_target per scale, across all descriptors.
laion_rvb_overall = (
    laion_rvb_target_df.groupby("scale")["delta_target"]
    .mean()
    .reset_index()
    .sort_values("scale")
)
# Mean delta_target per (descriptor, scale) pair.
laion_rvb_by_desc = (
    laion_rvb_target_df.groupby(["descriptor","scale"])["delta_target"]
    .mean()
    .reset_index()
    .sort_values(["descriptor","scale"])
)

laion_rvb_overall, laion_rvb_by_desc.head(60)


# Scales every descriptor is expected to have, in increasing order.
EXPECTED_SCALES = [0.3, 0.6, 1.0]
# Tolerance used when comparing neighbouring deltas.
EPS = 1e-6

def classify_trend(df_desc):
    """Label how ``delta_target`` changes across ``EXPECTED_SCALES``.

    Args:
        df_desc: DataFrame for one descriptor with ``scale`` and
            ``delta_target`` columns. Several rows may share a scale; their
            deltas are averaged before classification.

    Returns:
        ``"Insufficient data"`` when any expected scale has no value,
        ``"Monotonic up"`` / ``"Monotonic down"`` when the three deltas are
        non-decreasing / non-increasing within ``EPS``, otherwise a
        ``"Peak ..."`` label naming the scale with the largest delta.
    """
    # Averaging per scale keeps the index unique; set_index + reindex alone
    # raises ValueError as soon as one scale appears twice (e.g. several stems).
    per_scale = (
        df_desc.groupby("scale")["delta_target"].mean().reindex(EXPECTED_SCALES)
    )
    if per_scale.isna().any():
        return "Insufficient data"
    d0, d1, d2 = per_scale.tolist()

    # monotonic checks with tolerance
    if d0 <= d1 + EPS and d1 <= d2 + EPS:
        return "Monotonic up"
    if d0 >= d1 - EPS and d1 >= d2 - EPS:
        return "Monotonic down"

    # peak location
    peak_idx = int(np.argmax([d0, d1, d2]))
    return {0: "Peak low (0.3)", 1: "Peak mid (0.6)", 2: "Peak high (1.0)"}[peak_idx]

# build trend table
# One classify_trend label per descriptor in the reverb table.
trend_rows = []
for desc, g in laion_rvb_target_df.groupby("descriptor", as_index=False):
    trend = classify_trend(g[["scale","delta_target"]].copy())
    trend_rows.append({"descriptor": desc, "trend_type": trend})

# NOTE: laion_trend_df and laion_trend_counts are also assigned by the EQ
# section earlier in the file; these assignments replace the EQ results.
laion_trend_df = pd.DataFrame(trend_rows).sort_values("trend_type").reset_index(drop=True)
# display() is not imported in this file; presumably the IPython builtin.
display(laion_trend_df)

# quick counts
laion_trend_counts = laion_trend_df["trend_type"].value_counts()
display(laion_trend_counts)


# Rebuild any reverb summary table that is missing from the notebook session.
if "laion_rvb_overall" not in globals():
    laion_rvb_overall = (
        laion_rvb_target_df.groupby("scale")["delta_target"]
        .mean()
        .reset_index()
        .sort_values("scale")
    )

if "laion_rvb_by_desc" not in globals():
    laion_rvb_by_desc = (
        laion_rvb_target_df.groupby(["descriptor","scale"])["delta_target"]
        .mean()
        .reset_index()
        .sort_values(["descriptor","scale"])
    )

if "laion_trend_counts" not in globals():
    # The per-descriptor trend table in this file is laion_trend_df; the name
    # used here before (laion_rvb_trends) is not defined anywhere in view.
    laion_trend_counts = (
        laion_trend_df.groupby("trend_type")
        .size()
        .rename("count")
        .reset_index()
        .sort_values("count", ascending=False)
    )

display(laion_rvb_overall)
display(laion_rvb_by_desc.head(20))
display(laion_trend_counts)

# value_counts() yields a Series whose trend labels live in the index, so
# to_csv(index=False) would write the counts without their labels. Turn it
# into a two-column frame before saving.
if isinstance(laion_trend_counts, pd.Series):
    rvb_trend_counts_out = laion_trend_counts.rename_axis("trend_type").reset_index(name="count")
else:
    rvb_trend_counts_out = laion_trend_counts

# Save CSVs
# NOTE(review): OUT is not defined in the visible part of this file; it is
# presumably an output directory set in another cell - confirm.
laion_rvb_overall.to_csv(f"{OUT}/laionclap_rvb_mean_delta_by_scale.csv", index=False)
laion_rvb_by_desc.to_csv(f"{OUT}/laionclap_rvb_delta_by_descriptor_and_scale.csv", index=False)
rvb_trend_counts_out.to_csv(f"{OUT}/laionclap_rvb_trend_counts.csv", index=False)


def spearman_corr(x, y):
    """Spearman rank correlation between two equal-length sequences.

    Both inputs are converted to average ranks (ties share the mean rank) and
    the Pearson correlation of those ranks is returned.

    Returns:
        The correlation as a float, or ``nan`` when either rank vector is
        constant and the correlation is therefore undefined.
    """
    rank_x, rank_y = (
        pd.Series(values).rank(method="average").to_numpy()
        for values in (x, y)
    )
    x_is_flat = np.std(rank_x) < 1e-12
    if x_is_flat or np.std(rank_y) < 1e-12:
        return np.nan
    correlation_matrix = np.corrcoef(rank_x, rank_y)
    return float(correlation_matrix[0, 1])


# Spearman correlation between scale and delta_target, one value per descriptor.
rows = []
# Iterate the reverb table built in this file (laion_rvb_target_df); the name
# used here before (laion_rvb_targets) is not defined anywhere in view.
for desc, g in laion_rvb_target_df.groupby("descriptor"):
    g = g.sort_values("scale")
    if g["scale"].nunique() >= 3:   # expect 0.3, 0.6, 1.0
        rho = spearman_corr(g["scale"].to_numpy(), g["delta_target"].to_numpy())
    else:
        # Too few distinct scales for a meaningful rank correlation.
        rho = np.nan
    rows.append({"descriptor": desc, "spearman_delta_vs_scale": rho})

laion_rvb_spearman = pd.DataFrame(rows).sort_values("spearman_delta_vs_scale", ascending=False)
display(laion_rvb_spearman.head(10))
display(laion_rvb_spearman.tail(10))

# NOTE(review): OUT is not defined in the visible part of this file; it is
# presumably an output directory set in another cell - confirm.
laion_rvb_spearman.to_csv(f"{OUT}/laionclap_rvb_spearman_delta_vs_scale.csv", index=False)
