In [None]:
import os
import random
import audeer
import audmetric
import numpy as np
import pandas as pd
import yaml
from sklearn.metrics import accuracy_score, roc_auc_score
from sklearn.model_selection import GridSearchCV, KFold
from sklearn.preprocessing import MinMaxScaler
from sklearn.svm import SVC

In [None]:
# Experiment configuration. Everything the experiment loop reads is defined
# here so a run can be changed without touching the loop itself.

# Random seeds; the loop runs the full experiment once per seed and writes
# each run into its own sub-directory of `base_path`.
seeds = [104, 105, 106]

# feature sets to evaluate
# (each name is also the stem of the CSV file the features are read from)
feature_sets = [
    "eGeMAPSv02",
    "ComParE_2016",
    "audeering-wav2vec2-large-robust-12-ft-emotion-msp-dim",
    "facebook-wav2vec2-large-xlsr-53-german",
]

# sessions: all, before ("vor"), after ("nach")
# (anything other than "all" is used as a substring filter on the file name)
datasets = ["all", "vor", "nach"]

# evaluation metrics, each called as func(y_true, y_pred)
metrics = {
    "accuracy": accuracy_score,
    "UAR": audmetric.unweighted_average_recall,
    "roc_auc": roc_auc_score,
}

# Root folder for results and root folder of the pre-computed feature CSVs.
base_path = "../data/results_trauma_classification/"
features_path = "../data/features"

In [None]:
# Leave-one-subject-out (LOSO) trauma classification.
#
# For every seed x feature set x session filter, an SVM is tuned by grid
# search on all subjects but one and evaluated on the held-out subject.
# Row-level predictions are written per subject; metrics are written once at
# row ("unit") level and once after a majority vote per subject ("session").

# The search space and the inner CV splitter do not depend on any loop
# variable, so they are built once.
# NOTE(review): "gamma" only affects the rbf kernel here, so every linear
# candidate is evaluated twice; kept as is to leave the search unchanged.
param_grid = {
    "kernel": ["rbf", "linear"],
    "C": [1e-4, 1e-3, 1e-1, 1, 5, 10],
    "gamma": ["auto", "scale"],
}
# NOTE(review): KFold splits rows, not subjects, so rows of one training
# subject can land on both sides of an inner split, and its random_state is
# fixed to 1 instead of following `seed` - confirm both are intended.
inner_cv = KFold(n_splits=3, shuffle=True, random_state=1)

for seed in seeds:
    np.random.seed(seed)
    random.seed(seed)

    for feature_set in feature_sets:
        is_wav2vec2 = "wav2vec2" in feature_set

        # load features once per feature set (the CSV does not depend on the
        # session, so re-reading it for every session is wasted work)
        features_dir = (
            "features_text" if "text" in feature_set else "features_interview"
        )
        features_file = f"{features_path}/{features_dir}/{feature_set}.csv"
        features_all = pd.read_csv(features_file)

        # derive subject & trauma labels
        if is_wav2vec2:
            # subject is the part of "file" before the first underscore
            features_all["subject"] = features_all["file"].apply(
                lambda x: x.split("_")[0]
            )
        else:
            # subject is taken from the parent folder of the file path
            features_all["subject"] = features_all["file"].apply(
                lambda x: x.split("/")[-2].split("_")[0]
            )
        # subjects whose ID contains "Px" form the positive (trauma) class
        features_all["trauma"] = (
            features_all["subject"].str.contains("Px").astype(int)
        )

        # columns that must not reach the classifier
        drop_cols = ["file"] if is_wav2vec2 else ["start", "end", "file"]

        for session in datasets:
            # prepare results directory
            # (os.path.join copes with the trailing slash of base_path)
            path = os.path.join(base_path, str(seed))
            suffix = (
                f"{feature_set}_results_only_{session}_text"
                if session != "all"
                else f"{feature_set}_results"
            )
            results_path = os.path.join(path, suffix)
            os.makedirs(results_path, exist_ok=True)

            # optional session filter (plain substring match on the file name)
            features = features_all
            if session != "all":
                features = features[
                    features["file"].str.contains(session, regex=False)
                ]
            if features.empty:
                # fail early with a clear message instead of an obscure
                # "No objects to concatenate" further down
                raise ValueError(
                    f"No rows left for feature set '{feature_set}' "
                    f"and session '{session}'"
                )

            # drop unused columns
            features = features.drop(columns=drop_cols)

            all_subjs = features["subject"].unique()
            all_results = []

            for subj in audeer.progress_bar(
                all_subjs, total=len(all_subjs), desc="LOSO"
            ):
                is_test = features["subject"] == subj

                # test split: all rows of the held-out subject
                df_test = features[is_test].set_index("subject")
                X_test = df_test.drop(columns="trauma")
                y_test = df_test["trauma"]

                # train split (shuffled): all remaining subjects
                df_train = (
                    features[~is_test]
                    .sample(frac=1, random_state=seed)
                    .set_index("subject")
                )
                X_train = df_train.drop(columns="trauma")
                y_train = df_train["trauma"]

                # create per-subject folder
                exp_folder = audeer.mkdir(os.path.join(results_path, subj))

                # scale features (fitted on the training rows only)
                scaler = MinMaxScaler()
                X_train = scaler.fit_transform(X_train)
                X_test = scaler.transform(X_test)

                # SVM + grid search
                svc = SVC(class_weight="balanced", random_state=seed)
                search = GridSearchCV(svc, param_grid, cv=inner_cv)
                search.fit(X_train, y_train)
                best_model = search.best_estimator_

                # predict & save row-level results
                df_test["prediction"] = best_model.predict(X_test)
                df_test[["trauma", "prediction"]].reset_index().to_csv(
                    os.path.join(exp_folder, "results.csv"), index=False
                )
                all_results.append(df_test[["trauma", "prediction"]])

            # aggregate unit-level metrics
            # NOTE(review): every metric, including roc_auc, is computed from
            # the hard class predictions, not from decision scores.
            results_df = pd.concat(all_results)
            # Metric functions may return NumPy scalars, which yaml.dump would
            # serialise with Python-specific tags; cast to plain floats.
            unit_results = {
                name: float(func(results_df["trauma"], results_df["prediction"]))
                for name, func in metrics.items()
            }
            with open(os.path.join(results_path, "results.yaml"), "w") as fp:
                yaml.dump(unit_results, fp)

            # session-level aggregation: one row per subject, predicted label
            # is the majority vote over that subject's rows
            results_df = results_df.reset_index()  # bring subject back
            session_rows = []
            # sort=False keeps subjects in their order of first appearance
            for subj, subj_df in results_df.groupby("subject", sort=False):
                true_label = subj_df["trauma"].iloc[0]
                # mode() is sorted, so a tie resolves to the smaller label
                pred_label = subj_df["prediction"].mode()[0]
                session_rows.append(
                    {"subject": subj, "trauma": true_label, "prediction": pred_label}
                )

            session_df = pd.DataFrame(session_rows)
            session_df.to_csv(
                os.path.join(results_path, "results_session.csv"), index=False
            )
            session_results = {
                name: float(func(session_df["trauma"], session_df["prediction"]))
                for name, func in metrics.items()
            }
            with open(os.path.join(results_path, "results_session.yaml"), "w") as fp:
                yaml.dump(session_results, fp)