In [None]:
%matplotlib inline
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

import torch
from pytorch_lightning import Trainer
from pytorch_lightning.callbacks import ModelCheckpoint, EarlyStopping
from torch.utils.data import DataLoader
from utils import MultipleRegressionWithSoftmax, EmbeddingsDataset2

from sklearn.model_selection import KFold, train_test_split
from sklearn.metrics import r2_score, accuracy_score, f1_score
from sklearn.preprocessing import LabelEncoder
from scipy.stats import pearsonr

import os
# Order GPUs by PCI bus id and expose only device "3" to this process.
os.environ.update({
    "CUDA_DEVICE_ORDER": "PCI_BUS_ID",
    "CUDA_VISIBLE_DEVICES": "3",
})

In [None]:
# --- Experiment configuration -------------------------------------------
n_classes = 3       # number of target classes used for classification
modality = 'music'  # 'music', 'speech', or 'video'
which = 'openl3'    # 'mfcc', 'msd' or 'openl3' for music, 'slow_fast' for video, 'hubert' for speech
voice = True        # False selects the '_novoice' embeddings directory when loading

# Filename suffix of the embedding files, per modality and embedding type.
fn_suffix = dict(
    music=dict(
        mfcc='',
        msd='_backend',
        openl3='_music',  # '_music' or '_env'
    ),
    video=dict(
        slow_fast='_slow',  # '_slow' or '_fast'
    ),
    speech=dict(
        hubert='_wave_encoder',  # '_wave_encoder' or '_transformer'
    ),
)

# Dimensions that depend on which variant of the embedding was selected above.
_slow_fast_dim = {'_slow': 2048}.get(fn_suffix['video']['slow_fast'], 256)
_hubert_dim = {'_transformer': 1024}.get(fn_suffix['speech']['hubert'], 512)

# Size of one embedding vector, per modality and embedding type.
embedding_dimensions = dict(
    video=dict(slow_fast=_slow_fast_dim),
    music=dict(mfcc=60, msd=256, openl3=512),
    speech=dict(hubert=_hubert_dim),
)

## Load ground truth

In [None]:
# Ground-truth table, indexed by stimulus id.
groundtruth_df = pd.read_csv("groundtruth_merged.csv").set_index("stimulus_id")
groundtruth_df.head()

In [None]:
# Mid-level feature table (regression targets), indexed by stimulus id.
# The "target" column is dropped so that only the feature columns remain.
mid_level_features = (
    pd.read_csv("mid_level_features.csv")
    .drop(columns=["target"])
    .set_index("stimulus_id")
)
mid_level_features.head()

In [None]:
# Verify that an embedding file exists for every ground-truth stimulus.
# The directory must mirror the one the loading cell reads from, including the
# '_novoice' variant selected by `voice`; otherwise this check could pass
# (or fail) on a directory that is never actually loaded.
embeddings_dir = f"{modality}/embeddings_{which}{'' if voice else '_novoice'}"
suffix = fn_suffix[modality][which]

missing_ids = [
    stimulus_id
    for stimulus_id in groundtruth_df.index
    if not os.path.exists(f"{embeddings_dir}/{stimulus_id}{suffix}.npy")
]
for stimulus_id in missing_ids:
    print(f"Embedding for {stimulus_id} not found")

not_found = len(missing_ids)
assert not_found == 0, f"{not_found} embedding file(s) missing under {embeddings_dir}/"

## Load embeddings

In [None]:
embedding_dim = embedding_dimensions[modality][which]
embeddings_dir = f"{modality}/embeddings_{which}{'' if voice else '_novoice'}"
suffix = fn_suffix[modality][which]

# Both arrays hold one row per ground-truth stimulus, because the loop below
# iterates over groundtruth_df.index. Sizing y_reg by mid_level_features'
# own row count would leave uninitialised rows (or overflow) whenever the two
# tables differ in length.
n_stimuli = groundtruth_df.shape[0]
X = np.empty((n_stimuli, embedding_dim))
y_reg = np.empty((n_stimuli, mid_level_features.shape[1]))

for i, stimulus_id in enumerate(groundtruth_df.index):
    embedding = np.load(f"{embeddings_dir}/{stimulus_id}{suffix}.npy")
    # Average over the first axis to get one vector per stimulus
    # (assumes the file holds a (n_frames, embedding_dim) array — TODO confirm).
    X[i] = embedding.mean(axis=0)
    # Raises KeyError if the stimulus has no mid-level feature row.
    y_reg[i] = mid_level_features.loc[stimulus_id].values

X.shape, y_reg.shape

In [None]:
# Keep only the stimuli whose target is one of the classes under study.
classes = ["Girls/women", "Boys/men"] if n_classes==2 else ["Girls/women", "Mixed", "Boys/men"]
mask = groundtruth_df.target.isin(classes)
keep = mask.to_numpy()

# This cell filters X / y_reg in place, so it must run exactly once after the
# loading cell; fail loudly instead of mis-indexing on a second run.
assert len(X) == len(keep) and len(y_reg) == len(keep), \
    "X / y_reg no longer match groundtruth_df; re-run the embedding loading cell first"

# Apply the SAME mask to features, regression targets and class labels so the
# three arrays stay row-aligned for the cross-validation indices.
X = X[keep]
y_reg = y_reg[keep]
y_cls = groundtruth_df.target[keep].values

# convert to integers
# NOTE: LabelEncoder assigns integers in sorted label order (see le.classes_),
# which is not the order of `classes` above; use le.classes_ for display names.
le = LabelEncoder()
y_cls = le.fit_transform(y_cls)
y_cls.shape

In [None]:
# 5-fold cross-validation of the joint regression + classification model.
# For each fold a fresh model is trained, the checkpoint with the lowest
# monitored 'val_loss' is reloaded, and classification (accuracy, weighted F1)
# and per-feature regression (R2, Pearson's r) scores are collected.
params = {
    "input_dim": X.shape[1],
    "n_regressions": y_reg.shape[1],
    "output_dim": n_classes
    }

kf = KFold(n_splits=5, shuffle=True, random_state=42)

# Seed torch's global RNG so model initialisation and DataLoader shuffling are
# repeatable across runs (GPU kernels may still add small nondeterminism).
torch.manual_seed(42)

accuracies = []
f1s = []
r2s = []
pearsons = []
for train_index, test_index in kf.split(X):
    X_train, X_test = X[train_index], X[test_index]
    y_reg_train, y_reg_test = y_reg[train_index], y_reg[test_index]
    y_cls_train, y_cls_test = y_cls[train_index], y_cls[test_index]

    train_dataset = EmbeddingsDataset2(X_train, y_reg_train, y_cls_train)
    test_dataset = EmbeddingsDataset2(X_test, y_reg_test, y_cls_test)

    train_loader = DataLoader(train_dataset, batch_size=8, shuffle=True, num_workers=10)
    test_loader = DataLoader(test_dataset, batch_size=8, shuffle=False, num_workers=10)

    model = MultipleRegressionWithSoftmax(**params)

    # NOTE(review): the test fold is also passed as the validation loader, so
    # early stopping and checkpoint selection see the data the scores below
    # are computed on. Consider carving a validation split out of the training
    # fold to avoid optimistically biased estimates.
    checkpoint_callback = ModelCheckpoint(monitor='val_loss')
    trainer = Trainer(max_epochs=100,
                    callbacks=[checkpoint_callback, EarlyStopping(monitor='val_loss', patience=20)],
                    enable_progress_bar = False)
    trainer.fit(model, train_loader, test_loader)

    # load best model
    # load_from_checkpoint is a classmethod: call it on the class (newer
    # Lightning releases reject instance calls) and map the weights to CPU so
    # the CPU input tensor below matches the model's device.
    model = MultipleRegressionWithSoftmax.load_from_checkpoint(
        checkpoint_callback.best_model_path, map_location="cpu", **params)

    model.eval()
    with torch.no_grad():
        out_reg, out_cls = model(torch.from_numpy(X_test).float())

    # Hand plain NumPy arrays to sklearn / scipy rather than torch tensors.
    y_reg_pred = out_reg.cpu().numpy()
    y_cls_pred = torch.argmax(out_cls, dim=1).cpu().numpy()

    accuracies.append(accuracy_score(y_cls_test, y_cls_pred))
    f1s.append(f1_score(y_cls_test, y_cls_pred, average='weighted'))

    # One R2 value per regression target.
    r2_values = r2_score(y_reg_test, y_reg_pred, multioutput='raw_values')
    r2s.append(r2_values)

    # One Pearson correlation per regression target.
    r = [pearsonr(y_reg_test[:,i], y_reg_pred[:,i])[0] for i in range(y_reg_test.shape[1])]
    pearsons.append(r)


In [None]:
# Mean ± standard deviation of each classification metric over the CV folds.
for label, fold_scores in (("Accuracy", accuracies), ("F1", f1s)):
    print(f"{label}: {np.mean(fold_scores):.2f} ± {np.std(fold_scores):.2f}")

In [None]:
# Summarise the regression metrics per mid-level feature (mean ± std over folds).
# The per-fold lists `r2s` / `pearsons` are left untouched and the transposed
# views get their own names, so re-running this cell gives the same output
# instead of flipping the arrays back on every second run.
r2_by_feature = np.asarray(r2s).T            # rows: features, columns: folds
pearson_by_feature = np.asarray(pearsons).T  # rows: features, columns: folds

for i, r2 in enumerate(r2_by_feature):
    print(f"R2 for {mid_level_features.columns[i]}: {np.mean(r2):.2f} ± {np.std(r2):.2f}")

for i, r in enumerate(pearson_by_feature):
    print(f"Pearson's r for {mid_level_features.columns[i]}: {np.mean(r):.2f} ± {np.std(r):.2f}")