In [19]:
import os
import pandas as pd
import numpy as np
from scipy import interpolate, stats

CASE_DIR = 'data/CASE/data/interpolated'
PHYS_DIR = os.path.join(CASE_DIR, 'physiological')
ANN_DIR  = os.path.join(CASE_DIR, 'annotations')

In [20]:
files = os.listdir(PHYS_DIR)
subs  = sorted({f.split('_')[1].split('.')[0] for f in files})
print("Found subjects:", subs)

Found subjects: ['1', '10', '11', '12', '13', '14', '15', '16', '17', '18', '19', '2', '20', '21', '22', '23', '24', '25', '26', '27', '28', '29', '3', '30', '4', '5', '6', '7', '8', '9', 'physiological']


In [None]:
sid      = subs[0]  # change index to try different subjects
physio_fp = os.path.join(PHYS_DIR, f'sub_{sid}.csv')
annot_fp  = os.path.join(ANN_DIR,  f'sub_{sid}.csv')

df_phys = pd.read_csv(physio_fp)
df_ann  = pd.read_csv(annot_fp)

print(f"Subject {sid}:")
print("  physio shape:     ", df_phys.shape)
print("  annotation shape: ", df_ann.shape)

display(df_phys.head())
display(df_ann.head())

In [13]:
import pickle
import numpy as np
from scipy import interpolate, stats
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from collections import Counter
import os
from typing import Dict, Tuple, List, Optional
import warnings
import torch
from sklearn.preprocessing import StandardScaler
from torch.utils.data import Dataset, DataLoader
warnings.filterwarnings('ignore')

In [14]:
CASE_DIR     = 'data/CASE/data/interpolated'
PHYS_DIR     = os.path.join(CASE_DIR, 'physiological')
ANN_DIR      = os.path.join(CASE_DIR, 'annotations')
TARGET_FS    = 32    # Hz
ORIG_FS      = 1000  # Hz
WINDOW_SIZE  = 160   # 5 s @ 32 Hz
STRIDE       = 32    # 1 s
BATCH_SIZE   = 64

In [15]:
def list_subjects():
    """Return ['1','2',...,'30'] based on sub_<n>.csv filenames."""
    files = os.listdir(ANN_DIR)
    subs = sorted({f.split('_')[1].split('.')[0] for f in files})
    return subs

def load_case_interpolated(subject_id: str) -> pd.DataFrame:
    """
    Read sub_<id>.csv from both physio & annotation folders,
    assert they align, merge valence/arousal onto physio DataFrame.
    """
    physio_fp = os.path.join(PHYS_DIR,     f'sub_{subject_id}.csv')
    annot_fp  = os.path.join(ANN_DIR,      f'sub_{subject_id}.csv')
    df_phys   = pd.read_csv(physio_fp)
    df_ann    = pd.read_csv(annot_fp)

    assert len(df_phys) == len(df_ann), "Interpolation mismatch!"
    # pick only valence/arousal from annotation
    df = df_phys.copy()
    df['valence'] = df_ann['valence']
    df['arousal'] = df_ann['arousal']
    return df

def downsample_to_32Hz(df: pd.DataFrame) -> pd.DataFrame:
    """
    Linearly resample every column (except 'video') from 1kHz → 32Hz.
    Drop 'daqtime', 'jstime', and keep only sensor + valence/arousal.
    """
    # drop the time and video columns for interpolation
    to_drop = [c for c in ('daqtime','jstime','video') if c in df.columns]
    data = df.drop(columns=to_drop).values
    orig_len, nchan = data.shape

    new_len = int(orig_len * TARGET_FS / ORIG_FS)
    t_orig  = np.linspace(0,1,orig_len)
    t_new   = np.linspace(0,1,new_len)

    out = np.zeros((new_len, nchan))
    for i in range(nchan):
        f = interpolate.interp1d(t_orig, data[:,i],
                                 kind='linear',
                                 fill_value='extrapolate')
        out[:,i] = f(t_new)

    return pd.DataFrame(out, columns=df.drop(columns=to_drop).columns)

def create_windows(X: np.ndarray, y: np.ndarray,
                   window_size=WINDOW_SIZE, stride=STRIDE):
    Xw, yw = [], []
    for start in range(0, len(X)-window_size+1, stride):
        win = X[start:start+window_size]
        lbls = y[start:start+window_size]
        mode = stats.mode(lbls, keepdims=False).mode
        Xw.append(win)
        yw.append(mode)
    return np.array(Xw), np.array(yw)


In [18]:
list_subjects()

['1',
 '10',
 '11',
 '12',
 '13',
 '14',
 '15',
 '16',
 '17',
 '18',
 '19',
 '2',
 '20',
 '21',
 '22',
 '23',
 '24',
 '25',
 '26',
 '27',
 '28',
 '29',
 '3',
 '30',
 '4',
 '5',
 '6',
 '7',
 '8',
 '9',
 'annotations']

In [16]:
class TSData(Dataset):
    def __init__(self, X: np.ndarray, y: np.ndarray):
        self.X = torch.tensor(X, dtype=torch.float32)
        self.y = torch.tensor(y, dtype=torch.long)
    def __len__(self):
        return len(self.y)
    def __getitem__(self, idx):
        return self.X[idx], self.y[idx]

def prepare_case_dataloaders(test_size=0.3, random_state=42):
    from sklearn.model_selection import train_test_split

    # 1) load & merge all subjects
    all_X, all_y = [], []
    for sid in list_subjects():
        df = load_case_interpolated(sid)
        df32 = downsample_to_32Hz(df)

        # binarize valence into 2 classes (example)
        v = df32['valence'].values
        labels = (v > v.mean()).astype(int)

        # build windows
        Xw, yw = create_windows(df32.drop(columns=['valence','arousal']).values,
                                labels)
        all_X.append(Xw);  all_y.append(yw)

    X = np.vstack(all_X)
    y = np.hstack(all_y)

    # 2) normalize
    scaler = StandardScaler()
    flat = X.reshape(-1, X.shape[2])
    flat = scaler.fit_transform(flat)
    X_norm = flat.reshape(X.shape)

    # 3) split
    X_tr, X_tmp, y_tr, y_tmp = train_test_split(X_norm, y,
                                                test_size=test_size,
                                                random_state=random_state,
                                                stratify=y)
    X_val, X_te, y_val, y_te = train_test_split(X_tmp, y_tmp,
                                                test_size=0.5,
                                                random_state=random_state,
                                                stratify=y_tmp)

    # 4) wrap
    loaders = {}
    for split, (X_s, y_s) in zip(
        ['train','val','test'],
        [(X_tr,y_tr),(X_val,y_val),(X_te,y_te)]
    ):
        ds = TSData(X_s, y_s)
        loaders[split] = DataLoader(ds, batch_size=BATCH_SIZE,
                                    shuffle=(split=='train'))
    return loaders

In [17]:
loaders = prepare_case_dataloaders()
for name, loader in loaders.items():
    print(f"{name} batches:", len(loader), 
            "->", next(iter(loader))[0].shape)
# Now you can feed `loaders['train']`, etc. into your train_model/test_model.

AssertionError: Interpolation mismatch!