# ENV SETTING

In [3]:
import argparse
from omegaconf import OmegaConf

import os
import pandas as pd

import torch
import torch.nn as nn
from torch.utils.data import DataLoader, TensorDataset
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
from tqdm import tqdm, trange
from classifiers.linear_classifier import LinearClassifier

from msclap import CLAP

KeyboardInterrupt: 

In [None]:
# Locations of the run configuration and of the data / output roots.
model_cfg = "./configs/CLAP_linear_classifier.yaml"
base_dir = "./"
data_dir = "./data"

# Labels describing this experiment.
model_name = "CLAP23"
classifier = "linear"

# Parsed YAML configuration shared by the cells below.
cfg = OmegaConf.load(model_cfg)

# CLAP encoder used to compute the embeddings; constructed with use_cuda=False.
clap_model = CLAP(version='2023', use_cuda=False)

## Functions

In [None]:
def load_data(file_path):
    """Read one split CSV and return its three columns as plain lists.

    Args:
        file_path: Path of a CSV file containing the columns
            'Audio file', 'Hypothesis' and 'Label'.

    Returns:
        A tuple ``(audio_paths, text_prompts, labels)`` of Python lists,
        one entry per CSV row.
    """
    table = pd.read_csv(file_path)
    wanted_columns = ('Audio file', 'Hypothesis', 'Label')
    audio_column, prompt_column, label_column = (
        table[column].tolist() for column in wanted_columns
    )
    return audio_column, prompt_column, label_column

def process_embeddings(audio_paths, text_prompts):
    """Build one concatenated audio+text embedding per row.

    The audio embedding is recomputed only when the audio path differs from
    the most recently embedded one, so consecutive rows that share a file
    reuse the same audio embedding. Rows whose audio file does not exist are
    reported, recorded and skipped.

    Args:
        audio_paths: Sequence of audio file paths, one per row.
        text_prompts: Sequence of text prompts aligned with ``audio_paths``.

    Returns:
        ``(embeddings, missing_files)``: the list of concatenated embedding
        tensors for the rows that were kept, and the list of row indices that
        were skipped because their audio file was not found.
    """
    collected = []
    skipped_rows = []
    cached_path = None  # path whose audio embedding is currently held
    for row in tqdm(range(len(audio_paths)), desc="Processing Embeddings"):
        current_path = audio_paths[row]
        if current_path != cached_path:
            if not os.path.isfile(current_path):
                print(f"File not found: {current_path}")
                skipped_rows.append(row)
                continue
            audio_embeddings = clap_model.get_audio_embeddings([current_path])
            cached_path = current_path

        text_embeddings = clap_model.get_text_embeddings([text_prompts[row]])
        # Join the audio and text vectors along dim 1.
        collected.append(torch.cat((audio_embeddings, text_embeddings), dim=1))
    return collected, skipped_rows

# DATA PROCESSING

In [4]:

# Compute CLAP23 embeddings for the three Clotho entailment splits and wrap
# them in DataLoaders.
data_dir = os.path.join(cfg.data_dir, "Clotho/entailment")


def _drop_missing(labels, missing_indices):
    """Return ``labels`` without the rows listed in ``missing_indices``."""
    skipped = set(missing_indices)  # set lookup keeps the filter linear in len(labels)
    return [label for row, label in enumerate(labels) if row not in skipped]


def _stack_embeddings(embeddings, split_name):
    """Validate the per-row embeddings of one split and stack them.

    Raises:
        ValueError: if the split produced no embeddings at all.
        AssertionError: if the embeddings do not all share one shape.
    """
    if not embeddings:
        raise ValueError(f"No embeddings were produced for the {split_name} split.")
    expected_shape = embeddings[0].shape
    # Compare real tensor shapes; len() of a tensor only reports its first axis.
    assert all(emb.shape == expected_shape for emb in embeddings), "Input Sizes are not Consistent!!"
    return torch.stack(embeddings).squeeze(1)


audio_paths, text_prompts, train_labels = load_data(os.path.join(data_dir, 'flattened_clotho_development_gpt4.csv'))
train_embeddings, missing_train = process_embeddings(audio_paths, text_prompts)
val_audio_paths, val_text_prompts, val_labels = load_data(os.path.join(data_dir, 'flattened_clotho_validation_gpt4.csv'))
val_embeddings, missing_val = process_embeddings(val_audio_paths, val_text_prompts)
eval_audio_paths, eval_text_prompts, eval_labels = load_data(os.path.join(data_dir, 'flattened_clotho_evaluation_gpt4.csv'))
eval_embeddings, missing_eval = process_embeddings(eval_audio_paths, eval_text_prompts)

# Keep labels aligned with the embeddings: rows with a missing audio file
# produced no embedding, so their labels are dropped as well.
train_labels = _drop_missing(train_labels, missing_train)
val_labels = _drop_missing(val_labels, missing_val)
eval_labels = _drop_missing(eval_labels, missing_eval)

train_embeddings_tensor = _stack_embeddings(train_embeddings, "train")
val_embeddings_tensor = _stack_embeddings(val_embeddings, "validation")
eval_embeddings_tensor = _stack_embeddings(eval_embeddings, "evaluation")

train_labels_tensor = torch.tensor(train_labels, dtype=torch.long)
val_labels_tensor = torch.tensor(val_labels, dtype=torch.long)
test_labels_tensor = torch.tensor(eval_labels, dtype=torch.long)

# Create DataLoaders (only the training split is shuffled).
train_dataset = TensorDataset(train_embeddings_tensor, train_labels_tensor)
train_loader = DataLoader(train_dataset,
                          batch_size=cfg.batch_size,
                          shuffle=True,
                          num_workers=cfg.num_workers)
val_dataset = TensorDataset(val_embeddings_tensor, val_labels_tensor)
val_loader = DataLoader(val_dataset,
                        batch_size=cfg.batch_size,
                        num_workers=cfg.num_workers)
test_dataset = TensorDataset(eval_embeddings_tensor, test_labels_tensor)
test_loader = DataLoader(test_dataset,
                         batch_size=cfg.batch_size,
                         num_workers=cfg.num_workers)

NameError: name 'cfg' is not defined

In [None]:
def save_checkpoint(name='latest.pt', ckpt_path=None, cfg=None, model=None, optimizer=None, lr_scheduler=None, cur_epoch=None):
    """Write a training checkpoint to ``<ckpt_path>/<name>``.

    The optimizer and scheduler are stored as state dicts (not as live
    objects) so that the saved entries can be handed straight to
    ``load_state_dict`` when resuming.

    Args:
        name: File name of the checkpoint.
        ckpt_path: Directory the checkpoint is written into.
        cfg: Run configuration, stored unchanged under the 'cfg' key.
        model: Module whose ``state_dict()`` is saved.
        optimizer: Optimizer to snapshot; ``None`` is stored as ``None``.
        lr_scheduler: Scheduler to snapshot; ``None`` is stored as ``None``.
        cur_epoch: Epoch counter stored under the 'cur_epoch' key.
    """
    ckpt_path = os.path.join(ckpt_path, f"{name}")
    ckpt = {
        'cfg': cfg,
        'model': model.state_dict(),
        'optimizer': optimizer.state_dict() if optimizer is not None else None,
        'lr_scheduler': lr_scheduler.state_dict() if lr_scheduler is not None else None,
        'cur_epoch': cur_epoch,
    }
    torch.save(ckpt, ckpt_path)
    print(f"{ckpt_path} has been saved.")
def load_checkpoint(name='latest.pt', ckpt_path=None, model=None, optimizer=None, lr_scheduler=None, cur_epoch=None):
    """Restore training state from ``<ckpt_path>/<name>``.

    Args:
        name: File name of the checkpoint.
        ckpt_path: Directory that contains the checkpoint.
        model: Module that receives the saved 'model' state dict.
        optimizer: Optimizer to restore; skipped when ``None``.
        lr_scheduler: Scheduler to restore; skipped when ``None``.
        cur_epoch: Ignored; replaced by the value stored in the checkpoint.

    Returns:
        ``(model, optimizer, lr_scheduler, cur_epoch)`` with the restored state.
    """
    ckpt_path = os.path.join(ckpt_path, f"{name}")
    # NOTE(review): the checkpoint also carries the 'cfg' object as it was
    # passed at save time; depending on the installed PyTorch version,
    # torch.load may refuse non-plain objects by default -- confirm.
    ckpt = torch.load(ckpt_path, map_location='cpu')
    print(model.load_state_dict(ckpt['model']))

    def _as_state_dict(saved):
        # Entries may hold a whole optimizer/scheduler object instead of its
        # state dict; accept both layouts.
        return saved.state_dict() if hasattr(saved, 'state_dict') else saved

    optimizer_state = _as_state_dict(ckpt['optimizer'])
    if optimizer is not None and optimizer_state is not None:
        optimizer.load_state_dict(optimizer_state)

    scheduler_state = _as_state_dict(ckpt['lr_scheduler'])
    if lr_scheduler is not None and scheduler_state is not None:
        lr_scheduler.load_state_dict(scheduler_state)

    cur_epoch = ckpt['cur_epoch']
    return model, optimizer, lr_scheduler, cur_epoch
    
# Directory that holds the checkpoints of this run.
ckpt_dir = os.path.join(data_dir, "checkpoints")
os.makedirs(ckpt_dir, exist_ok=True)

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# Import Classifier Model, currently only Linear
# The feature size is the last axis of a concatenated audio+text embedding.
model = LinearClassifier(input_dim=train_embeddings[0].shape[-1]).to(device)
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(
    model.parameters(),
    lr=cfg.lr
)

lr_scheduler = None  # not implemented
cur_epoch = 0

# Resume Training
# ckpt_path is the full path of the checkpoint file; load_checkpoint() joins
# directory and file name itself, so it receives the directory, not this path.
ckpt_path = os.path.join(ckpt_dir, f"{cfg.resume_ckpt}")
if cfg.resume and os.path.exists(ckpt_path):
    print(f"Resume from checkpoint {ckpt_path}")
    model, optimizer, lr_scheduler, cur_epoch = load_checkpoint(cfg.resume_ckpt, ckpt_dir, model, optimizer, lr_scheduler, cur_epoch)

# TRAIN

In [74]:
@torch.no_grad()
def evaluate(dataloader):
    """Score the global ``model`` on every batch of ``dataloader``.

    Switches the model to eval mode (and leaves it there), predicts the
    arg-max class for each sample and compares against the labels.

    Args:
        dataloader: Iterable yielding ``(embeddings, labels)`` batches.

    Returns:
        ``(f1, accuracy, precision, recall)``; F1, precision and recall are
        macro-averaged.
    """
    model.eval()
    predicted_classes = []
    true_classes = []
    # Gradients are already disabled for the whole function by the decorator.
    for batch_embeddings, batch_labels in dataloader:
        logits = model(batch_embeddings.to(device))
        batch_predictions = logits.argmax(dim=1)
        predicted_classes.extend(batch_predictions.cpu().numpy())
        true_classes.extend(batch_labels.numpy())

    f1 = f1_score(true_classes, predicted_classes, average='macro')
    accuracy = accuracy_score(true_classes, predicted_classes)
    precision = precision_score(true_classes, predicted_classes, average='macro')
    recall = recall_score(true_classes, predicted_classes, average='macro')
    return f1, accuracy, precision, recall


In [None]:
# Train from the current epoch counter up to cfg.epochs, validating after
# every epoch, then checkpoint and score the test split.
tq = trange(
    cur_epoch,
    cfg.epochs,
    desc='Training'
)
for epoch in tq:
    # evaluate() switches the model to eval mode, so training mode has to be
    # re-enabled at the start of every epoch, not just once before the loop.
    model.train()
    for (embeddings, labels) in train_loader:
        embeddings, labels = embeddings.to(device), labels.to(device)
        # Forward pass
        outputs = model(embeddings)
        loss = criterion(outputs, labels)

        # Backward and optimize
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        if lr_scheduler is not None:
            lr_scheduler.step()

    # Number of completed epochs; used for logging, the checkpoint name and
    # as the starting point when training is resumed.
    cur_epoch = epoch + 1

    val_f1, val_accuracy, val_precision, val_recall = evaluate(val_loader)
    print(f"Epoch [{cur_epoch}/{cfg.epochs}], F1 : {val_f1:.4f} | Accuracy: {val_accuracy:.4f} | Precision : {val_precision:.4f} | Recall : {val_recall:.4f}")

save_checkpoint(name=f"CLAP23_Linear_{cur_epoch}_{cfg.lr}.pt", ckpt_path=base_dir, cfg=cfg, model=model, optimizer=optimizer, lr_scheduler=lr_scheduler, cur_epoch=cur_epoch)
test_f1, test_accuracy, test_precision, test_recall = evaluate(test_loader)
print(f"Test Scores _ F1 : {test_f1} | accuracy : {test_accuracy} | precision : {test_precision} | recall : {test_recall}")

## Evaluation on Test set

In [76]:
# Collect per-sample predictions on the test split; they are kept in
# test_predictions / test_labels_list for the per-class breakdown below.
test_predictions = []
test_labels_list = []
# Put the model in eval mode explicitly instead of relying on whatever mode
# a previously executed cell happened to leave it in.
model.eval()
with torch.no_grad():
    for embeddings, labels in test_loader:
        embeddings = embeddings.to(device)
        outputs = model(embeddings)
        _, predicted = torch.max(outputs, 1)
        test_predictions.extend(predicted.cpu().numpy())
        test_labels_list.extend(labels.numpy())

# Calculate evaluation metrics (precision, recall and F1 are macro-averaged)
test_accuracy = accuracy_score(test_labels_list, test_predictions)
test_precision = precision_score(test_labels_list, test_predictions, average='macro')
test_recall = recall_score(test_labels_list, test_predictions, average='macro')
test_f1 = f1_score(test_labels_list, test_predictions, average='macro')

print(f"Test Accuracy: {test_accuracy:.4f}")
print(f"Test Precision: {test_precision:.4f}")
print(f"Test Recall: {test_recall:.4f}")
print(f"Test F1 Score: {test_f1:.4f}")


Test Accuracy: 0.8463
Test Precision: 0.8464
Test Recall: 0.8463
Test F1 Score: 0.8463


In [79]:
import numpy as np

# Per-class accuracy: among the test samples whose true label is a given
# class, the fraction that was predicted as that class.
# Convert once, outside the loop, and count with vectorised numpy sums.
test_predictions_arr = np.asarray(test_predictions)
test_labels_arr = np.asarray(test_labels_list)

class_accuracies = {}
for class_label in [0, 1, 2]:  # Assuming 0: Entailment, 1: Neutral, 2: Contradiction
    is_class = test_labels_arr == class_label
    total_class_samples = int(is_class.sum())
    correct_class_preds = int(((test_predictions_arr == class_label) & is_class).sum())
    # A class without any test sample is reported as 0.0 instead of dividing by zero.
    class_accuracies[class_label] = correct_class_preds / total_class_samples if total_class_samples > 0 else 0.0


In [None]:
# Report the per-class accuracies, one line per class.
class_report_lines = [
    f"  Entailment (0): {class_accuracies[0]:.4f}",
    f"  Neutral (1): {class_accuracies[1]:.4f}",
    f"  Contradiction (2): {class_accuracies[2]:.4f}",
]
for report_line in class_report_lines:
    print(report_line)

  Entailment (0): 0.8392
  Neutral (1): 0.8230
  Contradiction (2): 0.8766
