In [1]:
import os
import re
import glob 

import numpy as np
import pandas as pd
from scipy.spatial.distance import pdist, squareform
from sklearn.model_selection import LeaveOneGroupOut
from rsatoolbox.data import Dataset
from rsatoolbox.rdm.calc import calc_rdm
from rsatoolbox.rdm import RDMs

import seaborn as sns
import matplotlib.pyplot as plt

In [2]:
run = 8 # example run

In [3]:
deriv_dir = "/home/exp-psy/Desktop/study_face_tracks/derivatives"
df_path = os.path.join(deriv_dir, "reference_face-emotions", f"emotions_av_1s_events_run-{run}_events.tsv")

## First: Use the Annotation to create Character Events

In [4]:
in_df = pd.read_csv(df_path, sep="\t", index_col=0)
in_df.head(10)

Unnamed: 0_level_0,duration,character,arousal,valence_positive,valence_negative,c_audio,c_context,c_face,c_gesture,c_narrator,...,e_hope,e_love,e_pity/compassion,e_pride,e_relief,e_remorse,e_resent,e_sadness,e_satisfaction,e_shame
onset,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1,Unnamed: 21_level_1
0.56,2.0,JENNY,-0.444444,0.666667,0.111111,0.333333,0.111111,0.611111,0.222222,0.0,...,0.0,0.111111,0.0,0.0,0.111111,0.0,0.0,0.111111,0.0,0.0
9.56,43.0,JENNY,-0.777778,0.0,0.777778,0.111111,0.222222,0.777778,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.666667,0.0,0.0
20.56,11.0,FORREST,-0.555556,0.222222,0.444444,0.111111,0.111111,0.333333,0.0,0.0,...,0.0,0.111111,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
31.56,1.0,FORREST,-0.555556,0.222222,0.666667,0.0,0.444444,0.666667,0.0,0.0,...,0.0,0.333333,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
32.56,1.0,FORREST,-0.555556,0.222222,1.0,0.111111,0.666667,0.888889,0.0,0.0,...,0.0,0.333333,0.0,0.0,0.0,0.0,0.0,0.222222,0.0,0.0
33.56,16.0,FORREST,-0.333333,0.222222,0.888889,0.111111,0.666667,1.0,0.0,0.0,...,0.0,0.444444,0.0,0.0,0.0,0.0,0.0,0.222222,0.0,0.0
49.56,2.0,FORREST,-0.333333,0.333333,0.888889,0.111111,0.666667,1.0,0.0,0.0,...,0.0,0.555556,0.0,0.0,0.0,0.0,0.0,0.222222,0.0,0.0
51.56,1.0,FORREST,-0.444444,0.222222,0.777778,0.111111,0.555556,0.888889,0.0,0.0,...,0.0,0.444444,0.0,0.0,0.0,0.0,0.0,0.222222,0.0,0.0
52.56,3.0,JENNY,-0.777778,1.0,0.111111,0.111111,0.222222,1.0,0.0,0.0,...,0.0,0.555556,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
52.56,1.0,FORREST,-0.222222,0.222222,0.555556,0.111111,0.333333,0.666667,0.0,0.0,...,0.0,0.111111,0.0,0.0,0.0,0.0,0.0,0.222222,0.0,0.0


In [5]:
# correct faulty onset column
if in_df.index.name == "onset" or "onset" not in in_df.columns:
    in_df = in_df.reset_index()
in_df.columns = in_df.columns.str.replace(r'^e_', '', regex=True)
in_df = pd.DataFrame(in_df)

In [6]:
in_df.columns

Index(['onset', 'duration', 'character', 'arousal', 'valence_positive',
       'valence_negative', 'c_audio', 'c_context', 'c_face', 'c_gesture',
       'c_narrator', 'c_verbal', 'd_other', 'd_self', 'admiration',
       'anger/rage', 'contempt', 'disappointment', 'fear', 'fears_confirmed',
       'gloating', 'gratification', 'gratitude', 'happiness', 'happy-for',
       'hate', 'hope', 'love', 'pity/compassion', 'pride', 'relief', 'remorse',
       'resent', 'sadness', 'satisfaction', 'shame'],
      dtype='object')

In [7]:
in_df.drop(in_df[in_df["duration"] <= 2].index, inplace=True)
in_df.head(10)

Unnamed: 0,onset,duration,character,arousal,valence_positive,valence_negative,c_audio,c_context,c_face,c_gesture,...,hope,love,pity/compassion,pride,relief,remorse,resent,sadness,satisfaction,shame
1,9.56,43.0,JENNY,-0.777778,0.0,0.777778,0.111111,0.222222,0.777778,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.666667,0.0,0.0
2,20.56,11.0,FORREST,-0.555556,0.222222,0.444444,0.111111,0.111111,0.333333,0.0,...,0.0,0.111111,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
5,33.56,16.0,FORREST,-0.333333,0.222222,0.888889,0.111111,0.666667,1.0,0.0,...,0.0,0.444444,0.0,0.0,0.0,0.0,0.0,0.222222,0.0,0.0
8,52.56,3.0,JENNY,-0.777778,1.0,0.111111,0.111111,0.222222,1.0,0.0,...,0.0,0.555556,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
10,53.56,3.0,FORREST,-0.333333,0.222222,0.444444,0.111111,0.222222,0.555556,0.0,...,0.0,0.111111,0.0,0.0,0.0,0.0,0.0,0.222222,0.0,0.0
12,57.56,5.0,FORREST,-0.222222,0.666667,0.111111,0.111111,0.333333,0.444444,0.0,...,0.0,0.333333,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
15,79.56,6.0,JENNY,-0.222222,0.666667,0.0,0.0,0.0,0.555556,0.222222,...,0.0,0.333333,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
17,87.56,4.0,JENNY,-0.555556,1.0,0.0,0.0,0.0,1.0,0.444444,...,0.0,0.444444,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
19,92.56,8.0,JENNY,-0.555556,1.0,0.0,0.111111,0.0,0.777778,0.555556,...,0.0,0.777778,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
20,92.56,7.0,FORREST,-0.333333,0.888889,0.0,0.0,0.222222,0.777778,0.333333,...,0.0,0.888889,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0


## Construct Emotion LS-A

In [8]:
emotion_cols = ['contempt', 'disappointment', 'fear', 'fears_confirmed', 'gloating', 
 'gratification', 'gratitude', 'happiness', 'happy-for', 'hate', 
 'hope', 'love', 'pity/compassion', 'pride', 'relief', 'remorse', 
 'resent', 'sadness', 'satisfaction', 'shame']

In [9]:
# Create a list to collect new rows
rows = []

# Iterate through each row and emotion column
for _, row in in_df.iterrows():
    for col in emotion_cols:
        if row[col] >= 0.2:
            rows.append({
                "onset": row["onset"],
                "duration": row["duration"],
                "trial_type": col
            })

In [10]:
emotion_events_df = pd.DataFrame(rows)
emotion_events_df

Unnamed: 0,onset,duration,trial_type
0,9.56,43.0,sadness
1,20.56,11.0,fear
2,33.56,16.0,fear
3,33.56,16.0,love
4,33.56,16.0,sadness
...,...,...,...
69,599.56,4.0,love
70,599.56,4.0,pride
71,603.56,10.0,pride
72,614.56,3.0,sadness


In [11]:
emotion_events_df.to_csv(
    os.path.join(deriv_dir, "reference_face-emotions", f"run-0{run}_lsa-emotions.tsv"), 
    sep="\t", 
    index=False
)

In [12]:
event_dfs = []

In [13]:
df_events = in_df[["onset", "duration", "character"]].copy()

## Now Create Valence LS-A

In [14]:
pos_vals = in_df.loc[in_df['valence_positive'] > 0, 'valence_positive']
neg_vals = in_df.loc[in_df['valence_negative'] > 0, 'valence_negative']

pos_median = pos_vals.median()
neg_median = neg_vals.median()

pos_median, neg_median

(np.float64(0.666666666667), np.float64(0.3333333333329999))

In [15]:
def determine_valence_median_split(row):
    pos_val = row.get("valence_positive", 0)
    neg_val = row.get("valence_negative", 0)
    
    pos = pos_val > 0
    neg = neg_val > 0
    
    # Ambiguous if both pos and neg present
    if pos and neg:
        return "ambiguous"
    
    # Positive valence only
    if pos and not neg:
        if pos_val > pos_median:
            return "highpositive"
        else:
            return "lowpositive"
    
    # Negative valence only
    if neg and not pos:
        if neg_val > neg_median:
            return "highnegative"
        else:
            return "lownegative"
    
    return "neutral"

In [16]:
in_df["trial_type"] = in_df.apply(determine_valence_median_split, axis=1)
valence_df = in_df[["onset", "duration", "trial_type"]]
valence_df

Unnamed: 0,onset,duration,trial_type
1,9.56,43.0,highnegative
2,20.56,11.0,ambiguous
5,33.56,16.0,ambiguous
8,52.56,3.0,ambiguous
10,53.56,3.0,ambiguous
...,...,...,...
107,596.56,3.0,ambiguous
108,599.56,4.0,ambiguous
109,603.56,10.0,ambiguous
111,614.56,3.0,ambiguous


In [17]:
valence_df.to_csv(
    os.path.join(deriv_dir, "reference_face-emotions", f"run-0{run}_lsa-valence.tsv"), 
    sep="\t", 
    index=False
)

In [18]:
break

SyntaxError: 'break' outside loop (668683560.py, line 1)

## Create Valence RDM over all Runs


In [None]:
all_dfs = []
groups = []

in_files = sorted(
    glob.glob(
        os.path.join(deriv_dir, "reference_face-emotions", f"emotions_av_1s_events_run-*_events.tsv")
    )
)
in_files

In [None]:
for run_idx, file in enumerate(in_files, 1):
    df = pd.read_csv(file, sep='\t')
    df["run"] = run_idx
    all_dfs.append(df)
    groups.extend([run_idx] * len(df))

df_all = pd.concat(all_dfs, ignore_index=True)
df_all.head(10)

In [None]:
def assign_valence(row):
    if row['valence_positive'] > row['valence_negative']:
        return 'positive'
    elif row['valence_negative'] > row['valence_positive']:
        return 'negative'
    else:
        return 'ambiguous'

In [None]:
df_all['valence_label'] = df_all.apply(assign_valence, axis=1)
df_all.head(10)

In [None]:
valence_groups = df_all.groupby("valence_label")[["valence_positive", "valence_negative"]].mean()
print("\nMean vectors per valence category:\n", valence_groups)

In [None]:
dist_matrix = squareform(pdist(valence_groups.values, metric="euclidean"))
rdm_df = pd.DataFrame(dist_matrix, 
                      index=valence_groups.index, 
                      columns=valence_groups.index)

In [None]:
sns.heatmap(rdm_df, annot=True, cmap="viridis", square=True, cbar_kws={'label': 'Euclidean Distance'})
plt.title("Model RDM (Valence-based)")
plt.tight_layout()
plt.show()

## Create Single-Face DF

In [None]:
for character, group in df_events.groupby("character"):
    group = group.reset_index(drop=True)
    group["trial_type"] = [f"{character}{i+1}" for i in range(len(group))]
    event_dfs.append(group[["onset", "duration", "trial_type"]])

In [None]:
individual_faces_df = pd.concat(event_dfs).sort_values("onset").reset_index(drop=True)
individual_faces_df.to_csv(
    os.path.join(deriv_dir, "reference_face-emotions", f"run-0{run}_adjusted-single-characters.tsv"), 
    sep="\t", 
    index=False
)

In [None]:
individual_faces_df.head(10)

## Create no-Faces DF

In [None]:
gaps = []

# Check for a gap at the beginning
if individual_faces_df.loc[0, "onset"] > 0:
    gaps.append({
        "onset": 0,
        "duration": individual_faces_df.loc[0, "onset"],
        "trial_type": "GAP"
    })

# Loop through events to find in-between gaps
for i in range(len(individual_faces_df) - 1):
    current_end = individual_faces_df.loc[i, "onset"] + individual_faces_df.loc[i, "duration"]
    next_start = individual_faces_df.loc[i + 1, "onset"]
    if next_start > current_end:
        gaps.append({
            "onset": current_end,
            "duration": next_start - current_end,
            "trial_type": "GAP"
        })

# Optional: Check if there is a gap at the end (define total duration)
total_duration = individual_faces_df["onset"].max() + individual_faces_df.loc[individual_faces_df["onset"].idxmax(), "duration"]
max_possible_time = 900  # for example, end of your scanning run
if total_duration < max_possible_time:
    gaps.append({
        "onset": total_duration,
        "duration": max_possible_time - total_duration,
        "trial_type": "GAP"
    })

# Create DataFrame from gaps
df_gaps = pd.DataFrame(gaps)
df_gaps

# Create DataFrame from gaps
df_gaps = pd.DataFrame(gaps)
df_gaps.drop(df_gaps[df_gaps["duration"] <= 2].index, inplace=True)
df_gaps["trial_type"] = [f"GAP{i+1}" for i in range(len(df_gaps))]

df_gaps.to_csv(
    os.path.join(deriv_dir, "reference_face-emotions", f"run-0{run}_adjusted-no-characters.tsv"), 
    sep="\t", 
    index=False
)

In [None]:
# loop over all to get character events
for run in range(1, 9):
    df_path = os.path.join(deriv_dir, "reference_face-emotions", f"emotions_av_1s_events_run-{run}_events.tsv")
    in_df = pd.read_csv(df_path, sep="\t", index_col=0)
    
    # correct faulty onset column
    if in_df.index.name == "onset" or "onset" not in in_df.columns:
        in_df = in_df.reset_index()
        
    event_dfs = []
    df_events = in_df[["onset", "duration", "character"]].copy()
    
    for character, group in df_events.groupby("character"):
        group = group.reset_index(drop=True)
        group["trial_type"] = [f"{character}{i+1}" for i in range(len(group))]
        event_dfs.append(group[["onset", "duration", "trial_type"]])

    individual_faces_df = pd.concat(event_dfs).sort_values("onset").reset_index(drop=True)
    
    # drop rows smaller than or equal to 2
    individual_faces_df.drop(individual_faces_df[individual_faces_df["duration"] <= 2].index, inplace=True)

    individual_faces_df = individual_faces_df[
    ~individual_faces_df["trial_type"].str.contains("VO|CROWD", case=False, na=False)
    ].reset_index(drop=True)
    
    individual_faces_df.to_csv(
        os.path.join(deriv_dir, "reference_face-emotions", f"run-0{run}_adjusted-single-characters.tsv"), 
        sep="\t", 
        index=False
    )

## Calculate pairwise distances for Emotion Annotations

In [None]:
df_dropped = in_df.drop(columns=["character"])
columns = ["arousal", "valence_positive", "valence_negative", "e_sadness", "e_happiness"]

In [None]:
output_dir = "/home/exp-psy/Desktop/study_face_tracks/derivatives/model_rdms/emotion_rdms"
os.makedirs(output_dir, exist_ok=True)

In [None]:
# loop over all runs and all target columns
for run in range(1, 9):
    df_path = os.path.join(deriv_dir, "reference_face-emotions", f"emotions_av_1s_events_run-{run}_events.tsv")
    in_df = pd.read_csv(df_path, sep="\t", index_col=0)
    # print(in_df.head(10))
    # correct faulty onset column
    if in_df.index.name == "onset" or "onset" not in in_df.columns:
        in_df = in_df.reset_index()
        
    # drop rows smaller than or equal to 2
    in_df.drop(in_df[in_df["duration"] <= 2].index, inplace=True)

    in_df = in_df[
    ~in_df["character"].str.contains("VO|CROWD", case=False, na=False)
    ].reset_index(drop=True)
    
    for col in columns:
        data = in_df[[col]]
        dist_array = pdist(data, metric="euclidean")
        dist_matrix = squareform(dist_array)

        print(f"shape of the matrix:\t {dist_matrix.shape}")
    
        npy_path = os.path.join(output_dir, f"run-{run}_{col}_distance-matrix.npy")
        np.save(npy_path, dist_matrix)
        print(f"saved at {npy_path}")
    
        # plot each heatmapmerged_df = pd.concat([pd.read_csv(fp, sep='\t') for fp in file_paths], ignore_index=True)
        plt.figure(figsize=(8, 6))
        sns.heatmap(dist_matrix, cmap="RdBu_r", square=True, cbar_kws={"label": "Correlation distance"})
        plt.title(f"run-{run}: {col}")
        plt.xlabel("")
        plt.ylabel("")
    
        plot_path = os.path.join(output_dir, f"run-{run}_{col}_distance-matrix.png")
        plt.savefig(plot_path)
        plt.close()

## Create single-character Hypothesis RDMs

In [None]:
def create_character_rdm(df, save_path="character_rdm.png"):
    """
    Create and plot a symmetric RDM where each character is maximally dissimilar
    from others and self-similar across their trials.

    Parameters
    ----------
    df : DataFrame
        Must contain a 'trial_type' column with string labels like 'FORREST25'.
    save_path : str
        Path to save the RDM image.

    Returns
    -------
    rdm : ndarray
        Symmetric RDM (n_trials x n_trials) with 0 for same character, 1 for different.
    """
    df.drop(df[df["duration"] <= 2].index, inplace=True)
    trial_types = df["trial_type"].tolist()
    characters = [re.match(r"[A-Z]+", t).group(0) for t in trial_types]

    # Create a (n x n) matrix: 0 if same character, 1 if different
    n = len(characters)
    rdm = np.zeros((n, n))

    for i in range(n):
        for j in range(n):
            rdm[i, j] = 0 if characters[i] == characters[j] else 1 
    return rdm, trial_types

In [None]:
for run in range(1, 9):
    # get face identity matrix
    npy_path = os.path.join(output_dir, f"run-{run}_face-identity_distance-matrix.npy")
    single_char_rdm, trial_types = create_character_rdm(
        pd.read_csv(
        os.path.join(deriv_dir, "reference_face-emotions", f"run-0{run}_adjusted-single-characters.tsv"), 
    sep="\t")
    )
    print(f"shape of the matrix:\t {single_char_rdm.shape}")
    np.save(npy_path, single_char_rdm)
    print(f"saved at {npy_path}")

    # save corresponding plot
    plot_path = os.path.join(output_dir, f"run-{run}_face-identity_distance-matrix.png")
    plt.figure(figsize=(12, 10))
    sns.heatmap(single_char_rdm, 
                square=True, 
                # annot=True, 
                cmap="RdBu_r",
                xticklabels=trial_types, 
                yticklabels=trial_types,
                cbar_kws={"label": "Dissimilarity"})
    plt.title("Character Identity RDM (0 = same, 1 = different)")
    plt.tight_layout()
    plt.savefig(plot_path, dpi=300)
    plt.show()