# Imports

In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import pandas as pd
import numpy as np
from pathlib import Path
import re
from collections import defaultdict
import matplotlib.pyplot as plt
import seaborn as sns
import matplotlib.patches as mpatches
from sklearn.decomposition import PCA
from sklearn.manifold import TSNE
import matplotlib.pyplot as plt
import seaborn as sns
import torch
from tqdm.auto import tqdm
from sentence_transformers import SentenceTransformer

2025-08-14 23:35:57.148033: W tensorflow/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libcudart.so.11.0'; dlerror: libcudart.so.11.0: cannot open shared object file: No such file or directory
2025-08-14 23:35:57.148059: I tensorflow/stream_executor/cuda/cudart_stub.cc:29] Ignore above cudart dlerror if you do not have a GPU set up on your machine.


In [2]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

Using device: cpu


# Prepare data

## Retrieve annotations by seconds

In [3]:
file_name = "A_12_20130925_1212_01_2.txt"

In [4]:
def load_annotations(file_name: str):
    df_raw = pd.read_csv(file_name, delimiter='\t', header=None)
    df = df_raw.iloc[:, [0, 2, 4, 6, 8]].copy()
    df.columns = ['Tier_Name', 'Start_Time', 'End_Time', 'Duration', 'Annotation_Value']
    return df

def filter_by_scene(df: pd.DataFrame):
    scene_records = df[df['Tier_Name'] == 'Scene']
    if scene_records.empty:
        print("Scene tier not found. Keeping all annotations.")
        return df

    # Retrieve the scene intervals
    scene_intervals = []
    for index, row in scene_records.iterrows():
        scene_intervals.append((row['Start_Time'], row['End_Time']))

    # Filter annotations based on these intervals
    df_filtered = pd.DataFrame()
    for start_time, end_time in scene_intervals:
        df_fragment = df[(df['Start_Time'] >= start_time) & (df['End_Time'] <= end_time)].copy()
        df_filtered = pd.concat([df_filtered, df_fragment])

    df = df_filtered.copy()
    return df

def time_string_to_ms(time_str: str):
    # Convert time string to milliseconds
    if pd.isna(time_str):
        return None
    h, m, s_ms = time_str.split(':')
    s, ms = s_ms.split('.')
    return int(h) * 3600000 + int(m) * 60000 + int(s) * 1000 + int(ms)

def preprocess_times(df: pd.DataFrame):
    df = df.copy()
    df['Start_Time_ms'] = df['Start_Time'].apply(time_string_to_ms)
    df['End_Time_ms'] = df['End_Time'].apply(time_string_to_ms)
    df['Start'] = (df['Start_Time_ms'] // 100).astype('Int64')
    df['End'] = (df['End_Time_ms'] // 100).astype('Int64')

    df = df.dropna()

    for c in ['Tier_Name', 'Annotation_Value']:
        df[c] = df[c].astype(str).str.strip()

    #Remove reference numbers in annotations, e.g. "14_mom spits the ball out of the mouth"
    df['Annotation_Value'] = df['Annotation_Value'].apply(lambda x: re.sub(r'^\d+_', '', x) if isinstance(x, str) else x)

    return df

def annotations_per_s(df: pd.DataFrame):
    rows = []
    max_t = int(df['End'].max())

    for t in range(0, max_t + 1):
        active_rows = df[(df['Start'] <= t) & (df['End'] > t)]
        if not active_rows.empty:
            for _, row in active_rows.iterrows():
              if row['Tier_Name'] not in ["Round", "Scene", "Comment"]:
                rows.append({'Time': t, 'Tier': row['Tier_Name'], 'Annotation': row['Annotation_Value']})

    return pd.DataFrame(rows)


def export(df: pd.DataFrame, output_name: str):
    df = df.dropna()
    df.to_csv(output_name, index=False)
    print(f"Per-second annotations saved to {Path(output_name).resolve()}")


def process_annotations(file_name: str, output_name: str = "annotations_per_t.csv"):
    df = load_annotations(file_name)
    df = filter_by_scene(df)
    df = preprocess_times(df)
    per_t_df = annotations_per_s(df)
    export(per_t_df, output_name)


process_annotations(file_name, "annotations_per_t.csv")


Per-second annotations saved to /home/elizaveta/Documents/RedBallGame/annotations_per_t.csv


## Annotations categorization

In [5]:
df = pd.read_csv("annotations_per_t.csv")

In [6]:
# Categorize Facial@MOT annotations

facial_mot_counts = df[df['Tier']=='Facial@MOT'].value_counts('Annotation')
print(facial_mot_counts)

facial_mapping = {
    "smile showing teeth": "positive",
    "smile": "positive",
    "widely opened eyes": "positive",
    "O-shaped mouth": "neutral",
    "neutral": "neutral",
    "biting lower lip": "neutral",
    "invisible": "invisible"
}

def categorize_facial_mot(facial_annotation):
    return facial_mapping.get(facial_annotation, "neutral")

Annotation
invisible              1302
neutral                 503
smile showing teeth     436
widely opened eyes      208
smile                   197
O-shaped mouth          104
biting lower lip         18
dtype: int64


In [9]:
# Categorize Utterance@MOT annotations
utterance_mot_counts = df[df['Tier']=='Utterance@MOT'].value_counts('Annotation')
print(utterance_mot_counts)

# Utterance categories
questions = [
    "Ready?",
    "Can I please have a ball?",
    "What even set off you laughing so hard?"
]
commands = [
    "Drop it.",
    "Drop it in.",
    "Put it in here.",
    "Can you sit up?",
]
affection = [
    "I love you."
]

playful_sounds = [
    "chooga",
    "... tickle you",
    "Yeaaah!"
]

statements = [
    "She’s laughing so hard that she literally can’t even stay seated.",
    "I hope this is not the laugh before the cry.",
    "Really far.",
    "Too far.",
    "I take this one.",
    "This is harmonious to hear.",
    "Keep falling."
]

utterance_lists = {
    'question': questions,
    'statement': statements,
    'affection': affection,
    'playful_sounds': playful_sounds
}

utterance_category_mapping = {value: category for category, values in utterance_lists.items() for value in values}

Annotation
She’s laughing so hard that she literally can’t even stay seated.    66
This is harmonious to hear.                                          24
What even set off you laughing so hard?                              22
I hope this is not the laugh before the cry.                         22
Ready?                                                               18
... tickle you                                                       15
chooga                                                               15
Yeaaah!                                                              12
Keep falling.                                                        11
Can I please have a ball?                                             9
Really far.                                                           9
Drop it.                                                              9
I love you.                                                           8
Can you sit up?                                      

In [10]:
def categorize_utterance_mot(value):
    return utterance_category_mapping.get(value, "statement")

In [None]:
df['Annotation'] = df.apply(
    lambda row: categorize_facial_mot(row['Annotation']) if row['Tier'] == 'Facial@MOT' else
                categorize_utterance_mot(row['Annotation']) if row['Tier'] == 'Utterance@MOT' else
                row['Annotation'],
    axis=1
)

## Create objects

In [None]:
tiers = [
    "Action@MOT",
    "Gaze@MOT",
    "Utterance@MOT",
    "Prosody@MOT",
    "Facial@MOT",
    "Laughter@MOT",
    "Laughter@CHI",
    "Gaze@CHI",
    "GazePattern"
]

pivot_df = df.pivot_table(
    index='Time',
    columns='Tier',
    values='Annotation',
    aggfunc=lambda x: x.iloc[0]  # if multiple rows for same tier & second, take first
)

# Create GazePattern column before selecting tiers
records = pivot_df.to_dict(orient='records')
for record in records:
    mot_target = record.get("Gaze@MOT")
    chi_target = record.get("Gaze@CHI")

    if chi_target == "mom" and mot_target == "child":
        label = "MA"  # MutualAttention
    elif mot_target == "invisible":
        label = "<UNK>"  # Unknown
    elif chi_target == "mom":
        label = "SA(CHI)"  # SingleAttention(CHI)
    elif mot_target == "child":
        label = "SA(MOT)"  # SingleAttention(MOT)
    elif chi_target == mot_target and chi_target is not None:
        label = "ShA"  # SharedAttention
    elif chi_target != mot_target:
        label = "L"  # LostAttention
    else:
        label = "<UNK>"  # Unknown

    record["GazePattern"] = label

# Convert records back to DataFrame to select tiers
pivot_df = pd.DataFrame(records)

pivot_df = pivot_df[tiers]

pivot_df = pivot_df.where(pd.notnull(pivot_df), None)

records = pivot_df.to_dict(orient='records')

In [14]:
# Instead of dividing data for input and output before the training,
# this will done within the model: creating targets lists for the expected output

X = records.copy()

## Retrieve vocabs

In [None]:
values = defaultdict(set)

for r in records:
    for key in r.keys():
        value = r[key]
        if value is None or value == '' or value == 'invisible':
            value = "<UNK>"  # use <UNK> token for missing
        values[key].add(value)

vocab_action    = {token: idx for idx, token in enumerate(sorted(values['Action@MOT']))}
vocab_gaze      = {token: idx for idx, token in enumerate(sorted(values['Gaze@MOT']))}
vocab_utterance = {token: idx for idx, token in enumerate(sorted(values['Utterance@MOT']))}
vocab_prosody   = {token: idx for idx, token in enumerate(sorted(values['Prosody@MOT']))}
vocab_facial    = {token: idx for idx, token in enumerate(sorted(values['Facial@MOT']))}
vocab_laughter  = {token: idx for idx, token in enumerate(sorted(values['Laughter@MOT']))}
vocab_laughter_chi = {token: idx for idx, token in enumerate(sorted(values['Laughter@CHI']))}
vocab_gaze_chi  = {token: idx for idx, token in enumerate(sorted(values['Gaze@CHI']))}
vocab_gazerelation  = {token: idx for idx, token in enumerate(sorted(values['GazePattern']))}

## Divide dataset on train and test

In [17]:
# timeline_length = len(records)
# train_end = int(0.75 * timeline_length)
# validation_end = int(0.90 * timeline_length) # 75% for train, 15% for validation, 10% for test

# X_train = X[:train_end]
# X_validation = X[train_end:validation_end]
# X_test  = X[validation_end:]

In [43]:
def split_datset (X, n_parts, train_size, val_size):
    n_parts = 4

    n = len(X)
    part_size = n // n_parts

    train_idx, val_idx, test_idx = [], [], []

    for i in range(n_parts):
        start = i * part_size
        end = (i + 1) * part_size if i < n_parts - 1 else n

        part_indices = np.arange(start, end)

        n_train = int(len(part_indices) * train_size)
        n_val = int(len(part_indices) * val_size)
        # n_test = len(part_indices) - n_train - n_val  # остаток

        # Индексы для этой части (сохраняем порядок)
        train_idx.extend(part_indices[:n_train])
        val_idx.extend(part_indices[n_train:n_train + n_val])
        test_idx.extend(part_indices[n_train + n_val:])

    # Собираем выборки
    X_train = [X[i] for i in train_idx]
    X_validation = [X[i] for i in val_idx]
    X_test = [X[i] for i in test_idx]

    return X_train, X_validation, X_test


In [19]:
print(len(X_train))
print(len(X_validation))
print(len(X_test))

2326
496
504


# Baseline Model

## Model

In [None]:
class SocialActionPredictor(nn.Module):
    def __init__(self,
                 # passing the vocabularies for embeddings
                 vocab_gaze, vocab_utterance, vocab_prosody,
                 vocab_facial, vocab_laughter, vocab_action,
                 vocab_laughter_chi, vocab_gaze_chi,
                 hidden_dim = 16
                 ):
        super().__init__()
        # Vocabs
        self.vocab_gaze = vocab_gaze
        self.vocab_utterance = vocab_utterance
        self.vocab_prosody = vocab_prosody
        self.vocab_facial = vocab_facial
        self.vocab_laughter = vocab_laughter
        self.vocab_action = vocab_action
        self.vocab_laughter_chi = vocab_laughter_chi
        self.vocab_gaze_chi = vocab_gaze_chi

        # Embedding layers
        self.embed_gaze = nn.Embedding(len(vocab_gaze), 3) # nn.Embedding(num_embeddings, embedding_dim)
        self.embed_utterance = nn.Embedding(len(vocab_utterance), 3)
        self.embed_prosody = nn.Embedding(len(vocab_prosody), 2)
        self.embed_facial = nn.Embedding(len(vocab_facial), 3)
        self.embed_laughter = nn.Embedding(len(vocab_laughter), 2)
        self.embed_action = nn.Embedding(len(vocab_action), 3)

        self.embed_laughter_chi = nn.Embedding(len(vocab_laughter_chi), 2)
        self.embed_gaze_chi = nn.Embedding(len(vocab_gaze_chi), 3)

        # LSTM
        input_dim = 3 + 3 + 2 + 3 + 2 + 3 # considering only mother's tiers

        self.hidden_dim = hidden_dim
        self.lstm_cell = nn.LSTMCell(input_dim, hidden_dim)
        self.dropout = nn.Dropout(0.3)
        self.proj = nn.Linear(input_dim, hidden_dim)

        # Decoder to predict child gaze, laughter and mother's action
        self.decoder_gaze = nn.Linear(hidden_dim, len(vocab_gaze_chi))
        self.decoder_laughter = nn.Linear(hidden_dim, len(vocab_laughter_chi))

    # method for finding value's index in list
    def lookup(self, vocab, value):
        if value is None:
            value = "<UNK>"
        return vocab.get(value, vocab["<UNK>"])

    # converting inputs into indices
    def encode_input(self, input, device):
        gaze = torch.tensor([self.lookup(self.vocab_gaze, input.get("Gaze@MOT"))]).to(device)
        utterance = torch.tensor([self.lookup(self.vocab_utterance, input.get("Utterance@MOT"))]).to(device)
        prosody = torch.tensor([self.lookup(self.vocab_prosody, input.get("Prosody@MOT"))]).to(device)
        facial = torch.tensor([self.lookup(self.vocab_facial, input.get("Facial@MOT"))]).to(device)
        laughter = torch.tensor([self.lookup(self.vocab_laughter, input.get("Laughter@MOT"))]).to(device)
        action = torch.tensor([self.lookup(self.vocab_action, input.get("Action@MOT"))]).to(device)

        # concatenating the embeddings from all modalities
        return torch.cat([
            self.embed_gaze(gaze),
            self.embed_utterance(utterance),
            self.embed_prosody(prosody),
            self.embed_facial(facial),
            self.embed_laughter(laughter),
            self.embed_action(action)
        ], dim=-1)

    def forward(self, sequence_of_dicts, device):
        # initializating short (h) and long (c) term memory
        h_t = torch.zeros(1, self.hidden_dim).to(device)
        c_t = torch.zeros(1, self.hidden_dim).to(device)

        # predicted by LSTM cell
        logits_gaze_per_step = []
        logits_laughter_per_step = []

        # expected results
        targets_gaze_per_step = []
        targets_laughter_per_step = []

        for step_dict in sequence_of_dicts:
            x_t = self.encode_input(step_dict, device) # input
            h_t, c_t = self.lstm_cell(x_t, (h_t, c_t)) # short and long term memory

            # Predict child gaze and laughter
            logits_gaze = self.decoder_gaze(h_t)
            logits_laughter = self.decoder_laughter(h_t)

            # These are gaze and laughter that are predicted by LSTM
            logits_gaze_per_step.append(logits_gaze)
            logits_laughter_per_step.append(logits_laughter)

            # Store actual child gaze and laughter indices for loss calculation
            targets_gaze_per_step.append(self.lookup(self.vocab_gaze_chi, step_dict.get("Gaze@CHI")))
            targets_laughter_per_step.append(self.lookup(self.vocab_laughter_chi, step_dict.get("Laughter@CHI")))


        # Concatenate the list of tensors along the first dimension (sequence length)
        logits_gaze_per_step = torch.cat(logits_gaze_per_step, dim=0)
        logits_laughter_per_step = torch.cat(logits_laughter_per_step, dim=0)

        # Convert the list of target indices to single tensors
        targets_gaze_per_step = torch.tensor(targets_gaze_per_step, dtype=torch.long).to(device)
        targets_laughter_per_step = torch.tensor(targets_laughter_per_step, dtype=torch.long).to(device)

        return (logits_gaze_per_step, # predicted with LSTM
                logits_laughter_per_step, # predicted with LSTM
                targets_gaze_per_step, # expected (from input)
                targets_laughter_per_step # expected (from input)
                )

## Training

In [None]:
model = SocialActionPredictor(
    vocab_gaze, vocab_utterance, vocab_prosody,
    vocab_facial, vocab_laughter, vocab_action,
    vocab_laughter_chi, vocab_gaze_chi,
    hidden_dim = 8,
).to(device)

optimizer = optim.Adam(model.parameters(), lr=0.001, weight_decay=1e-5)
criterion = nn.CrossEntropyLoss()

num_epochs = 50

# Lists to store metrics per epoch
train_losses = []
train_gaze_accuracies = []
train_laughter_accuracies = []
val_losses = []
val_gaze_accuracies = []
val_laughter_accuracies = []


print("Starting training...")
for epoch in range(num_epochs):
    model.train()
    total_loss = 0

    # Training
    (logits_gaze, logits_laughter,
     targets_gaze, targets_laughter) = model(X_train, device)

    # Calculate loss for child gaze and laughter prediction
    loss_gaze = criterion(logits_gaze, targets_gaze)
    loss_laughter = criterion(logits_laughter, targets_laughter)

    # Combine losses
    loss = loss_gaze + loss_laughter

    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

    total_loss = loss.item()
    train_losses.append(total_loss) # Store training loss

    pred_gaze = torch.argmax(logits_gaze, dim=1)
    correct_gaze = (pred_gaze == targets_gaze).sum().item()
    accuracy_gaze = correct_gaze / targets_gaze.size(0)
    train_gaze_accuracies.append(accuracy_gaze) # Store training accuracy

    pred_laughter = torch.argmax(logits_laughter, dim=1)
    correct_laughter = (pred_laughter == targets_laughter).sum().item()
    accuracy_laughter = correct_laughter / targets_laughter.size(0)
    train_laughter_accuracies.append(accuracy_laughter) # Store training accuracy

    # Validation
    model.eval()
    with torch.no_grad():
        (val_logits_gaze, val_logits_laughter,
         val_targets_gaze, val_targets_laughter) = model(X_validation, device)

        val_loss_gaze = criterion(val_logits_gaze, val_targets_gaze)
        val_loss_laughter = criterion(val_logits_laughter, val_targets_laughter)
        val_total_loss = val_loss_gaze + val_loss_laughter
        val_losses.append(val_total_loss.item()) # Store validation loss

        val_pred_gaze = torch.argmax(val_logits_gaze, dim=1)
        val_correct_gaze = (val_pred_gaze == val_targets_gaze).sum().item()
        val_accuracy_gaze = val_correct_gaze / val_targets_gaze.size(0)
        val_gaze_accuracies.append(val_accuracy_gaze) # Store validation accuracy

        val_pred_laughter = torch.argmax(val_logits_laughter, dim=1)
        val_correct_laughter = (val_pred_laughter == val_targets_laughter).sum().item()
        val_accuracy_laughter = val_correct_laughter / val_targets_laughter.size(0)
        val_laughter_accuracies.append(val_accuracy_laughter) # Store validation accuracy


    print(f"Epoch [{epoch+1}/{num_epochs}], "
          f"Train Loss: {total_loss:.4f}, Train Gaze Acc: {accuracy_gaze:.4f}, Train Laughter Acc: {accuracy_laughter:.4f}, "
          f"Val Loss: {val_total_loss:.4f}, Val Gaze Acc: {val_accuracy_gaze:.4f}, Val Laughter Acc: {val_accuracy_laughter:.4f}"
          )

print("Training finished.")

# Evaluate on Test Set
model.eval()
with torch.no_grad():
    (test_logits_gaze, test_logits_laughter,
     test_targets_gaze, test_targets_laughter) = model(X_test, device)

    test_loss_gaze = criterion(test_logits_gaze, test_targets_gaze)
    test_loss_laughter = criterion(test_logits_laughter, test_targets_laughter)
    test_total_loss = test_loss_gaze + test_loss_laughter

    test_pred_gaze = torch.argmax(test_logits_gaze, dim=1)
    test_correct_gaze = (test_pred_gaze == test_targets_gaze).sum().item()
    test_accuracy_gaze = test_correct_gaze / test_targets_gaze.size(0)

    test_pred_laughter = torch.argmax(test_logits_laughter, dim=1)
    test_correct_laughter = (test_pred_laughter == test_targets_laughter).sum().item()
    test_accuracy_laughter = test_correct_laughter / test_targets_laughter.size(0)

print("\nTest Set Evaluation:")
print(f"Test Loss: {test_total_loss:.4f}, Test Gaze Acc: {test_accuracy_gaze:.4f}, Test Laughter Acc: {test_accuracy_laughter:.4f}")

Starting training...
Epoch [1/50], Train Loss: 2.2623, Train Gaze Acc: 0.1836, Train Laughter Acc: 0.6580, Val Loss: 2.2567, Val Gaze Acc: 0.0240, Val Laughter Acc: 0.8176
Epoch [2/50], Train Loss: 2.2564, Train Gaze Acc: 0.1832, Train Laughter Acc: 0.6580, Val Loss: 2.2521, Val Gaze Acc: 0.0240, Val Laughter Acc: 0.8176
Epoch [3/50], Train Loss: 2.2505, Train Gaze Acc: 0.1848, Train Laughter Acc: 0.6620, Val Loss: 2.2476, Val Gaze Acc: 0.0261, Val Laughter Acc: 0.8176
Epoch [4/50], Train Loss: 2.2447, Train Gaze Acc: 0.1848, Train Laughter Acc: 0.6672, Val Loss: 2.2432, Val Gaze Acc: 0.0281, Val Laughter Acc: 0.8196
Epoch [5/50], Train Loss: 2.2389, Train Gaze Acc: 0.1885, Train Laughter Acc: 0.6692, Val Loss: 2.2388, Val Gaze Acc: 0.0281, Val Laughter Acc: 0.8357
Epoch [6/50], Train Loss: 2.2332, Train Gaze Acc: 0.1937, Train Laughter Acc: 0.6708, Val Loss: 2.2345, Val Gaze Acc: 0.0281, Val Laughter Acc: 0.8357
Epoch [7/50], Train Loss: 2.2275, Train Gaze Acc: 0.1981, Train Laughter 

## Cross Validation

### 10-k CV

In [None]:
from sklearn.model_selection import KFold

X_combined = X_train + X_validation + X_test

# Initialize KFold
kf = KFold(n_splits=10, shuffle=True, random_state=42)

test_losses = []
test_gaze_accuracies = []
test_laughter_accuracies = []

print("Starting 10-Fold Cross-Validation...")

# Iterate over each fold
for fold, (train_index, test_index) in enumerate(kf.split(X_combined)):
    print(f"\n--- Fold {fold+1}/10 ---")

    # Split data into train and test sets for the current fold
    X_train_fold = [X_combined[i] for i in train_index]
    X_test_fold = [X_combined[i] for i in test_index]

    # Initialize a new model for each fold to ensure independent training
    model = SocialActionPredictor(
        vocab_gaze, vocab_utterance, vocab_prosody,
        vocab_facial, vocab_laughter, vocab_action,
        vocab_laughter_chi, vocab_gaze_chi,
        hidden_dim = 8
    ).to(device)

    optimizer = optim.Adam(model.parameters(), lr=0.001, weight_decay=1e-5)
    criterion = nn.CrossEntropyLoss()

    # Training loop for the current fold
    num_epochs_fold = 50
    for epoch in range(num_epochs_fold):
        model.train()
        (logits_gaze, logits_laughter,
         targets_gaze, targets_laughter) = model(X_train_fold, device)

        loss_gaze = criterion(logits_gaze, targets_gaze)
        loss_laughter = criterion(logits_laughter, targets_laughter)
        loss = loss_gaze + loss_laughter

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

    # Evaluate on the test set for the current fold
    model.eval()
    with torch.no_grad():
        (test_logits_gaze, test_logits_laughter,
         test_targets_gaze, test_targets_laughter) = model(X_test_fold, device)

        test_loss_gaze = criterion(test_logits_gaze, test_targets_gaze)
        test_loss_laughter = criterion(test_logits_laughter, test_targets_laughter)
        test_total_loss = test_loss_gaze + test_loss_laughter

        test_pred_gaze = torch.argmax(test_logits_gaze, dim=1)
        test_correct_gaze = (test_pred_gaze == test_targets_gaze).sum().item()
        test_accuracy_gaze = test_correct_gaze / test_targets_gaze.size(0)

        test_pred_laughter = torch.argmax(test_logits_laughter, dim=1)
        test_correct_laughter = (test_pred_laughter == test_targets_laughter).sum().item()
        test_accuracy_laughter = test_correct_laughter / test_targets_laughter.size(0)

    test_losses.append(test_total_loss.item())
    test_gaze_accuracies.append(test_accuracy_gaze)
    test_laughter_accuracies.append(test_accuracy_laughter)

    print(f"Fold {fold+1} - Test Loss: {test_total_loss:.4f}, Test Gaze Acc: {test_accuracy_gaze:.4f}, Test Laughter Acc: {test_accuracy_laughter:.4f}")

# Report average performance across all folds
print("\n--- Cross-Validation Results ---")
print(f"Average Test Loss: {np.mean(test_losses):.4f}")
print(f"Average Test Gaze Accuracy: {np.mean(test_gaze_accuracies):.4f}")
print(f"Average Test Laughter Accuracy: {np.mean(test_laughter_accuracies):.4f}")


Starting 10-Fold Cross-Validation...

--- Fold 1/10 ---
Fold 1 - Test Loss: 2.0747, Test Gaze Acc: 0.3183, Test Laughter Acc: 0.6486

--- Fold 2/10 ---
Fold 2 - Test Loss: 1.9815, Test Gaze Acc: 0.5465, Test Laughter Acc: 0.6306

--- Fold 3/10 ---
Fold 3 - Test Loss: 1.9408, Test Gaze Acc: 0.5015, Test Laughter Acc: 0.7538

--- Fold 4/10 ---
Fold 4 - Test Loss: 2.0069, Test Gaze Acc: 0.4835, Test Laughter Acc: 0.6637

--- Fold 5/10 ---
Fold 5 - Test Loss: 2.1932, Test Gaze Acc: 0.4685, Test Laughter Acc: 0.6517

--- Fold 6/10 ---
Fold 6 - Test Loss: 2.0848, Test Gaze Acc: 0.3183, Test Laughter Acc: 0.6547

--- Fold 7/10 ---
Fold 7 - Test Loss: 2.0518, Test Gaze Acc: 0.5060, Test Laughter Acc: 0.6386

--- Fold 8/10 ---
Fold 8 - Test Loss: 2.0496, Test Gaze Acc: 0.4307, Test Laughter Acc: 0.6687

--- Fold 9/10 ---
Fold 9 - Test Loss: 2.1336, Test Gaze Acc: 0.4247, Test Laughter Acc: 0.6205

--- Fold 10/10 ---
Fold 10 - Test Loss: 2.0785, Test Gaze Acc: 0.4337, Test Laughter Acc: 0.7199



# Embeddings

## Pretrained sentence embedding

### Action and Gaze

#### Model

In [36]:
class SocialActionPredictor(nn.Module):
    def __init__(self,
                 vocabs,
                 isGazeRelation,
                 hidden_dim = 192,
                 num_layers = 2,
                 dropout = 0.3,
                 input_size = 40,
                 bidirectional = False
                 ):
        super().__init__()

        self.isGazeRelation = isGazeRelation

        # Vocabs
        self.vocab_gaze = vocabs["gaze"]
        self.vocab_utterance = vocabs["utterance"]
        self.vocab_prosody = vocabs["prosody"]
        self.vocab_facial = vocabs["facial"]
        self.vocab_laughter = vocabs["laughter"]
        self.vocab_gaze_chi = vocabs["gaze_chi"]
        self.vocab_laughter_chi = vocabs["laughter_chi"]

        if self.isGazeRelation:
            self.vocab_gazerelation = vocabs.vocab_gazerelation

        # Embedding layers
        self.embed_utterance = nn.Embedding(len(self.vocab_utterance), 8)
        self.embed_prosody = nn.Embedding(len(self.vocab_prosody), 4)
        self.embed_facial = nn.Embedding(len(self.vocab_facial), 8)
        self.embed_laughter = nn.Embedding(len(self.vocab_laughter), 4)
        self.sentence_encoder = SentenceTransformer('all-MiniLM-L6-v2')
        self.embed_laughter_chi = nn.Embedding(len(self.vocab_laughter_chi), 4)

        if self.isGazeRelation:
            self.embed_gazepattern = nn.Embedding(len(self.vocab_gazerelation), 4)

        # LSTM
        self.hidden_dim = hidden_dim
        self.lstm = nn.LSTM(
            input_size=input_size,
            hidden_size=hidden_dim,
            num_layers=num_layers,
            batch_first=True,    # so input shape is (batch, seq_len, input_dim)
            dropout=dropout,     # optional dropout between layers
            bidirectional=bidirectional
        )

        # Determine the input size for the decoder based on bidirectionality
        decoder_input_dim = hidden_dim * (2 if bidirectional else 1)

        # Decoder to predict child gaze, laughter and mother's action
        self.decoder_laughter = nn.Linear(decoder_input_dim, len(self.vocab_laughter_chi))
        if self.isGazeRelation:
            self.decoder_gazerelation = nn.Linear(decoder_input_dim, len(self.vocab_gazerelation))
        self.project_gaze = nn.Linear(decoder_input_dim, self.sentence_encoder.get_sentence_embedding_dimension()) # Corrected input dimension

    def lookup(self, vocab, value):
        if value is None:
            value = "<UNK>"
        return vocab.get(value, vocab["<UNK>"])

    def encode_input(self, input, device):
        gaze = torch.tensor([self.lookup(self.vocab_gaze, input.get("Gaze@MOT"))]).to(device)
        utterance = torch.tensor([self.lookup(self.vocab_utterance, input.get("Utterance@MOT"))]).to(device)
        prosody = torch.tensor([self.lookup(self.vocab_prosody, input.get("Prosody@MOT"))]).to(device)
        facial = torch.tensor([self.lookup(self.vocab_facial, input.get("Facial@MOT"))]).to(device)
        laughter = torch.tensor([self.lookup(self.vocab_laughter, input.get("Laughter@MOT"))]).to(device)

        action_text = input.get("Action@MOT")

        if action_text is None or not isinstance(action_text, str):
            action_text = "<UNK>"
        action = torch.tensor(self.sentence_encoder.encode(action_text)).to(device).unsqueeze(0)

        gaze_text = input.get("Gaze@MOT")

        if gaze_text is None or not isinstance(gaze_text, str):
            gaze_text = "<UNK>"
        gaze = torch.tensor(self.sentence_encoder.encode(gaze_text)).to(device).unsqueeze(0)

        return torch.cat([
            gaze,
            self.embed_utterance(utterance),
            self.embed_prosody(prosody),
            self.embed_facial(facial),
            self.embed_laughter(laughter),
            action
        ], dim=-1)

    def forward(self, sequence_of_dicts, device):
        encoded_steps = []
        targets_gaze_per_step = []
        targets_laughter_per_step = []
        
        if self.isGazeRelation:
            targets_gazepattern_per_step = []

        for step_dict in sequence_of_dicts:
            encoded_steps.append(self.encode_input(step_dict, device))

            gaze_chi_value = step_dict.get("Gaze@CHI")

            if gaze_chi_value is None or not isinstance(gaze_chi_value, str):
                gaze_chi_value = "<UNK>"
            gaze_chi_embedding = torch.tensor(self.sentence_encoder.encode(gaze_chi_value)).to(device).unsqueeze(0)
            targets_gaze_per_step.append(gaze_chi_embedding.squeeze(0)) # Remove the batch dimension

            targets_laughter_per_step.append(self.lookup(self.vocab_laughter_chi, step_dict.get("Laughter@CHI")))

            if self.isGazeRelation:
                targets_gazepattern_per_step.append(self.lookup(self.vocab_gazepattern, step_dict.get("GazePattern")))

        x = torch.cat(encoded_steps, dim=0)
        x = x.unsqueeze(0)

        output, (h_n, c_n) = self.lstm(x)

        predicted_gaze_embeddings = self.project_gaze(output[0])
        logits_laughter_per_step = self.decoder_laughter(output[0])

        targets_gaze_per_step = torch.stack(targets_gaze_per_step, dim=0)

        targets_laughter_per_step = torch.tensor(targets_laughter_per_step, dtype=torch.long).to(device)
        
        if self.isGazeRelation:
            logits_gazepattern_per_step = self.decoder_gazepattern(output[0])
            targets_gazepattern_per_step = torch.tensor(targets_gazepattern_per_step, dtype=torch.long).to(device)


            return (predicted_gaze_embeddings, logits_laughter_per_step, logits_gazepattern_per_step,
            targets_gaze_per_step, targets_laughter_per_step, targets_gazepattern_per_step)
        
        return (predicted_gaze_embeddings, logits_laughter_per_step,
                targets_gaze_per_step, targets_laughter_per_step)
     

#### Training


In [38]:
vocabs = {"action": vocab_action, "gaze": vocab_gaze, "utterance": vocab_utterance,
             "prosody": vocab_prosody, "facial": vocab_facial, "laughter": vocab_laughter,
             "gazerelation": vocab_gazerelation, "laughter_chi": vocab_laughter_chi, "gaze_chi": vocab_gaze_chi}

isGazeRelation=False

In [41]:
def training_loop(isGazeRelation, hidden_dim, num_layers, dropout, bidirectional, train_set, validation_set, test_set):
        model = SocialActionPredictor(
            vocabs = vocabs,
            isGazeRelation = isGazeRelation,
            hidden_dim = hidden_dim,
            num_layers = num_layers,
            dropout = dropout,
            bidirectional = bidirectional,
            input_size=792
        ).to(device)

        optimizer = optim.Adam(model.parameters(), lr=0.003, weight_decay=1e-5)
        criterion = nn.CrossEntropyLoss()
        criterion_emb = nn.CosineEmbeddingLoss()
        cosine_similarity = nn.CosineSimilarity(dim=1)

        num_epochs = 5

        train_losses = []
        train_gaze_similarities = [] # Store gaze similarities
        train_laughter_accuracies = []
        val_losses = []
        val_gaze_similarities = [] # Store gaze similarities
        val_laughter_accuracies = []

        print("Starting training...")
        for epoch in tqdm(range(num_epochs), desc="Training Epochs"):
            model.train()
            total_loss = 0

            # Training
            if isGazeRelation:
                (predicted_gaze, logits_laughter, logits_gazerelation,
                targets_gaze, targets_laughter, targets_gazerelation) = model(train_set, device)
            else:
                (predicted_gaze, logits_laughter,
                targets_gaze, targets_laughter) = model(train_set, device)

            similarity_target = torch.ones(predicted_gaze.size(0), device=device)  # all similar

            loss_gaze = criterion_emb(predicted_gaze, targets_gaze, similarity_target)
            loss_laughter = criterion(logits_laughter, targets_laughter)

            if isGazeRelation:
                loss_gazerelation = criterion(logits_gazerelation, targets_gazerelation)

            # Combine losses
            loss = loss_gaze + loss_laughter
            if isGazeRelation:
                loss += loss_gazerelation

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            total_loss = loss.item()
            train_losses.append(total_loss) # Store training loss

            # Calculate training gaze similarity
            train_sim_gaze = cosine_similarity(predicted_gaze, targets_gaze).mean().item()
            train_gaze_similarities.append(train_sim_gaze)

            pred_laughter = torch.argmax(logits_laughter, dim=1)
            correct_laughter = (pred_laughter == targets_laughter).sum().item()
            accuracy_laughter = correct_laughter / targets_laughter.size(0)
            train_laughter_accuracies.append(accuracy_laughter)

            # Validation
            model.eval()
            with torch.no_grad():
                if isGazeRelation:
                    (val_predicted_gaze, val_logits_laughter, val_logits_gazerelation,
                    val_targets_gaze, val_targets_laughter, val_targets_gazerelation) = model(validation_set, device)
                else:
                    (val_predicted_gaze, val_logits_laughter,
                    val_targets_gaze, val_targets_laughter) = model(validation_set, device)
                
                val_loss_gaze = criterion_emb(val_predicted_gaze, val_targets_gaze, torch.ones(val_predicted_gaze.size(0), device=device))
                val_loss_laughter = criterion(val_logits_laughter, val_targets_laughter)
                if isGazeRelation:
                    val_loss_gazerelation = criterion(val_logits_gazerelation, val_targets_gazerelation)
                val_total_loss = val_loss_gaze + val_loss_laughter
                if isGazeRelation:
                    val_total_loss += val_loss_gazerelation
                val_losses.append(val_total_loss.item()) # Store validation loss

                # Calculate validation gaze similarity
                val_sim_gaze = cosine_similarity(val_predicted_gaze, val_targets_gaze).mean().item()
                val_gaze_similarities.append(val_sim_gaze)

                val_pred_laughter = torch.argmax(val_logits_laughter, dim=1)
                val_correct_laughter = (val_pred_laughter == val_targets_laughter).sum().item()
                val_accuracy_laughter = val_correct_laughter / val_targets_laughter.size(0)
                val_laughter_accuracies.append(val_accuracy_laughter) # Store validation accuracy

            print(f"Epoch [{epoch+1}/{num_epochs}], "
                f"Train Loss: {total_loss:.4f}, Train Gaze Sim: {train_sim_gaze:.4f}, Train Laughter Acc: {accuracy_laughter:.4f}, "
                f"Val Loss: {val_total_loss:.4f}, Val Gaze Sim: {val_sim_gaze:.4f}, Val Laughter Acc: {val_accuracy_laughter:.4f}"
                )

        print("Training finished.")

        # Evaluate on Test Set
        model.eval()
        with torch.no_grad():
            if isGazeRelation:
                (test_predicted_gaze, test_logits_laughter, test_logits_gazerelation,
                test_targets_gaze, test_targets_laughter, test_targets_gazerelation) = model(test_set, device)
            else:
                (test_predicted_gaze, test_logits_laughter,
                test_targets_gaze, test_targets_laughter) = model(test_set, device)

            test_loss_gaze = criterion_emb(test_predicted_gaze, test_targets_gaze, torch.ones(test_predicted_gaze.size(0), device=device))
            test_loss_laughter = criterion(test_logits_laughter, test_targets_laughter)

            if isGazeRelation:
                test_loss_gazepattern = criterion(test_logits_gazerelation, test_targets_gazerelation)
            test_total_loss = test_loss_gaze + test_loss_laughter

            if isGazeRelation:
                test_total_loss += test_loss_gazepattern

            # Calculate test gaze similarity
            test_sim_gaze = cosine_similarity(test_predicted_gaze, test_targets_gaze).mean().item()

            test_pred_laughter = torch.argmax(test_logits_laughter, dim=1)
            test_correct_laughter = (test_pred_laughter == test_targets_laughter).sum().item()
            test_accuracy_laughter = test_correct_laughter / test_targets_laughter.size(0)


        print("\nTest Set Evaluation:")
        print(f"Test Loss: {test_total_loss:.4f}, Test Gaze Sim: {test_sim_gaze:.4f}, Test Laughter Acc: {test_accuracy_laughter:.4f}")

In [45]:
train_set, validation_set, test_set = split_datset(X, 4, 0.75, 0.15)

training_loop(isGazeRelation=False, hidden_dim=256, num_layers=1, dropout=0.3, bidirectional=False, train_set=train_set, validation_set=validation_set, test_set=test_set)



Starting training...


Training Epochs:  20%|██        | 1/5 [02:00<08:02, 120.72s/it]

Epoch [1/5], Train Loss: 1.6834, Train Gaze Sim: 0.0268, Train Laughter Acc: 0.3406, Val Loss: 1.2487, Val Gaze Sim: 0.4919, Val Laughter Acc: 0.4315


Training Epochs:  40%|████      | 2/5 [03:48<05:39, 113.19s/it]

Epoch [2/5], Train Loss: 1.0844, Train Gaze Sim: 0.4917, Train Laughter Acc: 0.7296, Val Loss: 1.2479, Val Gaze Sim: 0.6688, Val Laughter Acc: 0.4315


Training Epochs:  60%|██████    | 3/5 [05:39<03:44, 112.07s/it]

Epoch [3/5], Train Loss: 0.8616, Train Gaze Sim: 0.6706, Train Laughter Acc: 0.7296, Val Loss: 1.1079, Val Gaze Sim: 0.7168, Val Laughter Acc: 0.4315


Training Epochs:  80%|████████  | 4/5 [07:31<01:52, 112.22s/it]

Epoch [4/5], Train Loss: 0.7585, Train Gaze Sim: 0.7250, Train Laughter Acc: 0.7296, Val Loss: 1.0271, Val Gaze Sim: 0.7207, Val Laughter Acc: 0.5645


Training Epochs: 100%|██████████| 5/5 [09:23<00:00, 112.75s/it]

Epoch [5/5], Train Loss: 0.7204, Train Gaze Sim: 0.7366, Train Laughter Acc: 0.7850, Val Loss: 1.1438, Val Gaze Sim: 0.7381, Val Laughter Acc: 0.5484
Training finished.






Test Set Evaluation:
Test Loss: 1.4615, Test Gaze Sim: 0.6234, Test Laughter Acc: 0.4154


#### 10-fold CV

In [None]:
from sklearn.model_selection import KFold

# Combine train and test data for cross-validation
X_combined = X_train + X_validation + X_test

# Initialize KFold
kf = KFold(n_splits=10, shuffle=True, random_state=42)

test_losses = []
test_gaze_similarities = []
test_laughter_accuracies = []

print("Starting 10-Fold Cross-Validation...")

# Iterate over each fold
for fold, (train_index, test_index) in tqdm(enumerate(kf.split(X_combined)), total=kf.get_n_splits(), desc="Cross-Validation Progress"):
    print(f"\n--- Fold {fold+1}/10 ---")

    # Split data into train and test sets for the current fold
    X_train_fold = [X_combined[i] for i in train_index]
    X_test_fold = [X_combined[i] for i in test_index]

    # Initialize a new model for each fold to ensure independent training
    model = SocialActionPredictor(
        vocab_gaze, vocab_utterance, vocab_prosody,
        vocab_facial, vocab_laughter, vocab_action,
        vocab_laughter_chi, vocab_gaze_chi, vocab_gazepattern,
        hidden_dim = 256, num_layers = 1,
        dropout = 0.5,
        bidirectional=True, input_size=792
    ).to(device)

    optimizer = optim.Adam(model.parameters(), lr=0.003, weight_decay=1e-5)
    criterion = nn.CrossEntropyLoss()
    criterion_emb = nn.CosineEmbeddingLoss()
    cosine_similarity = nn.CosineSimilarity(dim=1)

    num_epochs_fold = 10
    for epoch in range(num_epochs_fold):
        model.train()
        (predicted_gaze, logits_laughter, logits_gazepattern,
         targets_gaze, targets_laughter, targets_gazepattern) = model(X_train_fold, device)

        similarity_target = torch.ones(predicted_gaze.size(0), device=device)  # all similar

        loss_gaze = criterion_emb(predicted_gaze, targets_gaze, similarity_target)
        loss_laughter = criterion(logits_laughter, targets_laughter)
        loss_gazepattern = criterion(logits_gazepattern, targets_gazepattern)
        loss = loss_gaze + loss_laughter + loss_gazepattern

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

    # Evaluate on the test set for the current fold
    model.eval()
    with torch.no_grad():
        (test_predicted_gaze, test_logits_laughter, test_logits_gazepattern,
         test_targets_gaze, test_targets_laughter, test_targets_gazepattern) = model(X_test_fold, device)

        test_loss_gaze = criterion_emb(test_predicted_gaze, test_targets_gaze, torch.ones(test_predicted_gaze.size(0), device=device))
        test_loss_laughter = criterion(test_logits_laughter, test_targets_laughter)
        test_loss_gazepattern = criterion(test_logits_gazepattern, test_targets_gazepattern)
        test_total_loss = test_loss_gaze + test_loss_laughter + test_loss_gazepattern

        # Calculate validation gaze similarity
        test_sim_gaze = cosine_similarity(test_predicted_gaze, test_targets_gaze).mean().item()
        # test_gaze_similarities.append(test_sim_gaze)

        test_pred_laughter = torch.argmax(test_logits_laughter, dim=1)
        test_correct_laughter = (test_pred_laughter == test_targets_laughter).sum().item()
        test_accuracy_laughter = test_correct_laughter / test_targets_laughter.size(0)

        test_pred_gazepattern = torch.argmax(test_logits_gazepattern, dim=1)
        test_correct_gazepattern = (test_pred_gazepattern == test_targets_gazepattern).sum().item()
        test_accuracy_gazepattern = test_correct_gazepattern / test_targets_gazepattern.size(0)


    test_losses.append(test_total_loss.item())
    test_gaze_similarities.append(test_sim_gaze)
    test_laughter_accuracies.append(test_accuracy_laughter)

    print(f"Fold {fold+1} - Test Loss: {test_total_loss:.4f}, Test Gaze Sim: {test_sim_gaze:.4f}, Test Laughter Acc: {test_accuracy_laughter:.4f}, Test GazePattern Acc: {test_accuracy_gazepattern:.4f}")

# Report average performance across all folds
print("\n--- Cross-Validation Results ---")
print(f"Average Test Loss: {np.mean(test_losses):.4f}")
print(f"Average Test Gaze Similarity: {np.mean(test_gaze_similarities):.4f}")
print(f"Average Test Laughter Accuracy: {np.mean(test_laughter_accuracies):.4f}")

Starting 10-Fold Cross-Validation...


Cross-Validation Progress:   0%|          | 0/10 [00:00<?, ?it/s]


--- Fold 1/10 ---
Fold 1 - Test Loss: 1.4926, Test Gaze Sim: 0.8339, Test Laughter Acc: 0.7718, Test GazePattern Acc: 0.7147

--- Fold 2/10 ---
Fold 2 - Test Loss: 1.5404, Test Gaze Sim: 0.8182, Test Laughter Acc: 0.7898, Test GazePattern Acc: 0.6637

--- Fold 3/10 ---
Fold 3 - Test Loss: 1.5271, Test Gaze Sim: 0.8161, Test Laughter Acc: 0.7868, Test GazePattern Acc: 0.6697

--- Fold 4/10 ---
Fold 4 - Test Loss: 1.4678, Test Gaze Sim: 0.8330, Test Laughter Acc: 0.7778, Test GazePattern Acc: 0.6577

--- Fold 5/10 ---
Fold 5 - Test Loss: 1.4638, Test Gaze Sim: 0.8394, Test Laughter Acc: 0.8378, Test GazePattern Acc: 0.6907

--- Fold 6/10 ---
Fold 6 - Test Loss: 1.5554, Test Gaze Sim: 0.8236, Test Laughter Acc: 0.8078, Test GazePattern Acc: 0.6547

--- Fold 7/10 ---
Fold 7 - Test Loss: 1.4757, Test Gaze Sim: 0.8385, Test Laughter Acc: 0.7711, Test GazePattern Acc: 0.6928

--- Fold 8/10 ---
Fold 8 - Test Loss: 1.4921, Test Gaze Sim: 0.8135, Test Laughter Acc: 0.7922, Test GazePattern Acc: