In [203]:
import sys
from pathlib import Path

# === 2. Import modules inside concept/ ===
# concept/ MUST contain __init__.py
from Preprocess import audio_to_mel_spectrogram
from PreprocessParams import TARGET_FRAMES, FREQUENCY_BIN_COUNT
from concepts_creation import generate_random_pattern_spectrogram

# === 3. Other required imports ===
from typing import List, Optional, Dict
import numpy as np
import pandas as pd
import torch
from captum.concept import TCAV, Concept
from torch.utils.data import DataLoader, Dataset
from tqdm import tqdm
import librosa


# For ECAPA model
from speechbrain.pretrained import EncoderClassifier


In [204]:
CONCEPT_UNIQUE_NAMES = [
    "long-constant-thick",
    "long-dropping-flat-thick",
    "long-dropping-steep-thick",
    "long-dropping-steep-thin",
    "long-rising-flat-thick",
    "long-rising-steep-thick",
    "long-rising-steep-thin",
    "short-constant-thick",
    "short-dropping-steep-thick",
    "short-dropping-steep-thin",
    "short-rising-steep-thick",
    "short-rising-steep-thin"
]

label_2_index = {
    "eden": 0,
    "idan": 1,
    "yoav": 2,
}


In [205]:
class PreGeneratedRandomSpectrogramDataset(Dataset):
    def __init__(self, n_samples: int, freq_count=FREQUENCY_BIN_COUNT, frames=TARGET_FRAMES, rng_seed=None):
        self.n_samples = n_samples
        self.freq_count = freq_count
        self.frames = frames
        self.rng = np.random.default_rng(rng_seed)

        self.data = np.array([
            generate_random_pattern_spectrogram(freq_count, frames, rng=self.rng)
            for _ in range(n_samples)
        ])
        self.data = torch.tensor(self.data, dtype=torch.float32)

    def __getitem__(self, idx):
        return self.data[idx].unsqueeze(0)  # [1, H, W]

    def __len__(self):
        return len(self.data)


In [206]:
class PreGeneratedConceptDataset(Dataset):
    def __init__(self, n_samples: int, concept_name: str, root_concept_dir: Path,
                 freq_count=FREQUENCY_BIN_COUNT, frames_count=TARGET_FRAMES, rng_seed=None):
        
        self.concept_name = concept_name
        self.root_concept_dir = Path(root_concept_dir)
        concept_dir = self.root_concept_dir / concept_name
        concept_dir.mkdir(exist_ok=True)

        self.data = [
            np.load(f) for f in concept_dir.glob("*.npy")
        ]
        self.data = torch.tensor(np.array(self.data), dtype=torch.float32)

    def __getitem__(self, idx):
        return self.data[idx].unsqueeze(0)

    def __len__(self):
        return len(self.data)


In [207]:
def init_tcav_with_pamalia_dict(model_path: Path, concept_samples_count=100):
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    ecapa = EncoderClassifier.from_hparams(
        source=str(model_path),
        savedir=str(model_path),
        run_opts={"device": str(device)}
    )
    ecapa.eval()

    # Make embedding_model accessible at top level
    ecapa.embedding_model = ecapa.mods.embedding_model
    encoder = ecapa.embedding_model

    # ---------- DYNAMIC LAYER DETECTION ----------
    # Find last block index
    last_block_index = len(encoder.blocks) - 1
    target_layer_obj = encoder.blocks[last_block_index]

    # Expose block under valid attribute name
    block_attr_name = f"block{last_block_index}"
    setattr(encoder, block_attr_name, target_layer_obj)

    # TCAV layer name (string path)
    layer_name = f"embedding_model.{block_attr_name}"
    # --------------------------------------------

    tcav = TCAV(
        model=ecapa,
        layers=[layer_name],
        layer_dict={layer_name: target_layer_obj},
        test_split_ratio=0.33
    )

    # Load concepts
    concepts_root = Path.cwd() / "positive concepts dataset"
    if not concepts_root.exists():
        raise FileNotFoundError(f"Concepts directory not found: {concepts_root}")

    positive_concepts = [
        Concept(
            id=i,
            name=name,
            data_iter=DataLoader(
                PreGeneratedConceptDataset(
                    n_samples=concept_samples_count,
                    concept_name=name,
                    root_concept_dir=concepts_root
                ),
                shuffle=False
            )
        )
        for i, name in enumerate(CONCEPT_UNIQUE_NAMES)
    ]

    random_dataset = PreGeneratedRandomSpectrogramDataset(n_samples=concept_samples_count)
    random_concept = Concept(
        id=len(positive_concepts),
        name="random",
        data_iter=DataLoader(random_dataset, shuffle=False)
    )

    return {
        "tcav": tcav,
        "positive-concepts": positive_concepts,
        "random-concept": random_concept,
        "layer": target_layer_obj,
        "layer_name": layer_name,
        "ecapa": ecapa
    }

In [208]:
def _compute_cav_accuracy_df(tcav, positive_concepts, random_concept, float_precision=3):
    rows = []

    experimental_sets = [[c, random_concept] for c in positive_concepts]
    cavs = tcav.compute_cavs(experimental_sets)

    for key, layer_dict in cavs.items():
        pos_id = int(str(key).split("-")[0])
        concept_name = positive_concepts[pos_id].name

        for layer_name, cav_obj in layer_dict.items():
            if cav_obj is None or cav_obj.stats is None:
                continue

            acc = cav_obj.stats["accs"]
            if isinstance(acc, torch.Tensor):
                acc = acc.item()

            rows.append({
                "concept name": concept_name,
                "layer name": layer_name,
                "cav acc": round(acc, float_precision)
            })

    return pd.DataFrame(rows)


In [209]:
def _get_tcav_dict_per_sample(tcav_raw, df, model_path, label_2_index):
    tcav = tcav_raw["tcav"]
    pos = tcav_raw["positive-concepts"]
    rnd = tcav_raw["random-concept"]

    output = {}

    for _, row in tqdm(df.iterrows(), total=len(df)):
        wav_path = row["path"]
        label_name = row["predicted_label"]

        if label_name not in label_2_index:
            continue

        mel = audio_to_mel_spectrogram(Path(wav_path))   # shape [80, T]
        x = torch.tensor(mel, dtype=torch.float32).unsqueeze(0)   # shape [1, 80, T]


        scores = tcav.interpret(
            inputs=x,
            experimental_sets=[[c, rnd] for c in pos],
            target=label_2_index[label_name]
        )

        output[wav_path] = scores

    return output


In [210]:
def _tcav_dict_per_sample_to_df(tcav_raw, scores_by_sample, concept_names):
    rows = []

    for wav_path, exp_sets in scores_by_sample.items():
        for key, layer_dict in exp_sets.items():
            pos_id = int(key.split("-")[0])
            concept_name = concept_names[pos_id]

            for layer_name, metrics in layer_dict.items():
                sc = metrics["sign_count"]
                mg = metrics["magnitude"]

                sc = sc[0].item() if isinstance(sc, torch.Tensor) else sc[0]
                mg = mg[0].item() if isinstance(mg, torch.Tensor) else mg[0]

                rows.append({
                    "path": wav_path,
                    "concept name": concept_name,
                    "layer name": layer_name,
                    "positive percentage": sc,
                    "magnitude": mg
                })

    return pd.DataFrame(rows)


In [211]:
def get_tcav_per_sample(attribute_csv, model_path, label_2_index, target_layer_path=None):
    df = pd.read_csv(attribute_csv)

    df.drop(columns=df.filter(regex="^prob ").columns, inplace=True)

    tcav_raw = init_tcav_with_pamalia_dict(model_path=model_path)

    scores = _get_tcav_dict_per_sample(tcav_raw, df, model_path, label_2_index)

    df_tcav = _tcav_dict_per_sample_to_df(tcav_raw, scores, CONCEPT_UNIQUE_NAMES)

    df_merged = df_tcav.merge(df, on="path", how="left")

    return df_merged


In [212]:
ATTRIBUTE_CSV = Path("./user_final_predictions.csv")
MODEL_PATH = Path("../ecapa_pretrained")  # SpeechBrain format directory
OUTPUT = Path("./outputs/tcav_per_sample_output.csv")

# Create output directory if it doesn't exist
OUTPUT.parent.mkdir(parents=True, exist_ok=True)

df_result = get_tcav_per_sample(
    attribute_csv=ATTRIBUTE_CSV,
    model_path=MODEL_PATH,
    label_2_index=label_2_index
)

df_result.to_csv(OUTPUT, index=False)
df_result


  torch.load(path, map_location=device), strict=False
  stats = torch.load(path, map_location=device)
  0%|          | 0/90 [00:00<?, ?it/s]



RuntimeError: stft(torch.FloatTensor[1, 1, 80, 680], n_fft=400, hop_length=160, win_length=400, window=torch.FloatTensor{[400]}, normalized=0, onesided=1, return_complex=0) : expected a 1D or 2D tensor