# Detecting Stress/Emotion Signals from Biometric Data

## Import Packages

In [37]:
import os
import pickle
import pandas as pd
import numpy as np
from scipy.signal import resample
from scipy.stats import pearsonr
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_absolute_error, mean_squared_error
import torch
from torch.utils.data import Dataset, DataLoader
import torch.nn as nn
import csv

## Loading Data

The WESAD dataset is a multimodal dataset for wearable stress and affect detection, collected from 15 subjects using two devices: a chest-worn RespiBAN and a wrist-worn Empatica E4. It includes synchronized physiological signals such as EDA, ECG, EMG, respiration, temperature, and acceleration, sampled at high resolution. Each subject underwent conditions like baseline, stress, and amusement, with corresponding labels provided in the data.

### Loading Signals and Conditions

#### Downsampling, windowing data

In [2]:
SAMPLING_RATE_ORIG = 700
SAMPLING_RATE_TARGET = 4
WINDOW_SIZE_SEC = 15
STRIDE_SEC = 5

# Maps canonical names to actual keys in the dataset
SIGNAL_MAP = {
    'EDA': {'chest': 'EDA',   'wrist': 'EDA'},
    'TEMP': {'chest': 'Temp', 'wrist': 'TEMP'},
    'RESP': {'chest': 'Resp', 'wrist': None},   # Not available on wrist
    'ECG': {'chest': 'ECG',   'wrist': None},   # Not available on wrist
    'ACC': {'chest': 'ACC',   'wrist': 'ACC'},
    'EMG': {'chest': 'EMG',   'wrist': None},
    'BVP': {'chest': None,    'wrist': 'BVP'}
}

In [3]:
def load_and_downsample(subject_path, signals, device='chest'):
    with open(subject_path, 'rb') as f:
        data = pickle.load(f, encoding='latin1')

    label = data['label']
    scale = SAMPLING_RATE_TARGET / SAMPLING_RATE_ORIG
    new_len = int(len(label) * scale)

    signal_list = []

    for sig in signals:
        sig_key = SIGNAL_MAP[sig]
        parts = []

        for part in ['chest', 'wrist']:
            if device in [part, 'both'] and sig_key[part]:
                source = data['signal'][part][sig_key[part]]
                if source.ndim == 1:
                    source = source[:, np.newaxis]
                s_down = resample(source, new_len, axis=0)
                parts.append(s_down)

        if parts:
            signal_list.append(np.concatenate(parts, axis=1))

    all_signals = np.concatenate(signal_list, axis=1)
    labels = resample(label.astype(float), new_len).round().astype(int)
    return all_signals, labels

def normalize(data):
    return StandardScaler().fit_transform(data)

def create_windows(X, y, window_size=WINDOW_SIZE_SEC * SAMPLING_RATE_TARGET, stride=STRIDE_SEC * SAMPLING_RATE_TARGET):
    windows, labels = [], []
    for i in range(0, len(X) - window_size, stride):
        win_x = X[i:i + window_size]
        win_y = y[i:i + window_size]
        if np.any(win_y == 0):
            continue
        majority_label = np.bincount(win_y).argmax()
        windows.append(win_x)
        labels.append(majority_label)
    return np.array(windows), np.array(labels)

In [4]:
PANAS_ITEMS = [
    'Active', 'Distressed', 'Interested', 'Inspired', 'Annoyed', 'Strong', 'Guilty',
    'Scared', 'Hostile', 'Excited', 'Proud', 'Irritable', 'Enthusiastic', 'Ashamed',
    'Alert', 'Nervous', 'Determined', 'Attentive', 'Jittery', 'Afraid', 'Stressed',
    'Frustrated', 'Happy', 'Angry', 'Irritated', 'Sad'
]

TARGET_PANAS = ['Stressed', 'Angry', 'Happy', 'Sad', 'Inspired', 'Excited', 'Nervous']  # you can modify this list

In [5]:
def extract_panas_scores(quest_path):
    condition_scores = []

    with open(quest_path, 'r') as f:
        reader = csv.reader(f, delimiter=';')
        rows = [r for r in reader if r and r[0].startswith('# PANAS')]

    for row in rows:
        try:
            # Clean and parse only non-empty fields
            scores = [int(val) for val in row[1:] if val.strip().isdigit()]
            if len(scores) < len(PANAS_ITEMS):
                continue
            condition_scores.append(dict(zip(PANAS_ITEMS, scores)))
        except Exception as e:
            print(f"Failed to parse PANAS in {quest_path}: {e}")
            continue

    return condition_scores

#### Loading to dataset

In [6]:
def load_wesad_dataset(root_path, selected_signals=('EDA', 'TEMP', 'RESP', 'ECG', 'ACC'), device='chest'):
    all_x, all_y_cls, all_y_reg, all_subject_ids = [], [], [], []

    for subject_dir in sorted(os.listdir(root_path)):

        if not subject_dir.startswith("S"):
            continue

        pkl_path = os.path.join(root_path, subject_dir, f"{subject_dir}.pkl")
        quest_path = os.path.join(root_path, subject_dir, f"{subject_dir}_quest.csv")

        if not os.path.exists(pkl_path) or not os.path.exists(quest_path):
            print(f"Skipping {subject_dir}")
            continue

        try:
            X, y = load_and_downsample(pkl_path, selected_signals, device)
            X = normalize(X)
            win_x, win_y = create_windows(X, y)

            # === Load PANAS Scores ===
            panas_per_condition = extract_panas_scores(quest_path)
            y_reg = []

            for label in win_y:
                condition_idx = label - 1  # 1=Base, 2=Stress, ...
                if condition_idx >= len(panas_per_condition):
                    y_reg.append([0.0] * len(TARGET_PANAS))  # fallback
                    continue

                score_vec = [panas_per_condition[condition_idx][key] for key in TARGET_PANAS]
                y_reg.append(score_vec)

            all_x.append(win_x)
            all_y_cls.append(win_y)
            all_y_reg.append(np.array(y_reg))
            
            subject_ids = [subject_dir] * len(win_x)  # same ID for all windows from this subject
            all_subject_ids.append(np.array(subject_ids))

        except Exception as e:
            print(f"Error loading {subject_dir}: {e}")
    
    return (
        np.concatenate(all_x),
        np.concatenate(all_y_cls),
        np.concatenate(all_y_reg),
        np.concatenate(all_subject_ids)
    )

#### Define class for dataset

In [7]:
class WESADDataset(Dataset):
    def __init__(self, X, y):
        self.X = torch.tensor(X, dtype=torch.float32)
        self.y = torch.tensor(y, dtype=torch.long)

    def __len__(self):
        return len(self.X)

    def __getitem__(self, idx):
        return self.X[idx], self.y[idx]

#### Load and filter

In [17]:
X, y_cls, y_reg, subject_ids = load_wesad_dataset(
    root_path="../data/WESAD",
    selected_signals=('EDA', 'TEMP', 'RESP', 'ECG', 'ACC'),
    device='both'
)

print("Features:", X.shape)
print("Classification labels:", y_cls.shape)
print("PANAS regression targets:", y_reg.shape)

Features: (9120, 60, 12)
Classification labels: (9120,)
PANAS regression targets: (9120, 7)


In [9]:
# Save for later regression task
X_reg = X
subject_ids_reg = subject_ids

# Define mask
valid_mask = (y_cls >= 1) & (y_cls <= 4)

# Apply masking
X = X[valid_mask]
y_cls = y_cls[valid_mask] - 1

subject_ids = np.array(subject_ids)[valid_mask]

## Modeling - Condition Classification

### Split and Load Datasets

In [10]:
# Unique subjects
unique_subjects = sorted(set(subject_ids))
train_subjects, val_subjects = train_test_split(unique_subjects, test_size=0.2, random_state=42)

# Subject-level masks
train_mask = np.isin(subject_ids, train_subjects)
val_mask = np.isin(subject_ids, val_subjects)

In [11]:
X_train, y_cls_train = X[train_mask], y_cls[train_mask]
X_val,   y_cls_val   = X[val_mask],   y_cls[val_mask]

In [12]:
class WESADClassificationDataset(Dataset):
    def __init__(self, X, y):
        self.X = torch.tensor(X, dtype=torch.float32)
        self.y = torch.tensor(y, dtype=torch.long)

    def __len__(self):
        return len(self.X)

    def __getitem__(self, idx):
        return self.X[idx], self.y[idx]

train_ds = WESADClassificationDataset(X_train, y_cls_train)
val_ds   = WESADClassificationDataset(X_val, y_cls_val)

train_loader = DataLoader(train_ds, batch_size=64, shuffle=True)
val_loader = DataLoader(val_ds, batch_size=64)

### Define Model

In [28]:
class CNNEmotionClassifier(nn.Module):
    def __init__(self, input_dim=12, num_classes=4):
        super().__init__()
        self.net = nn.Sequential(
            nn.Conv1d(input_dim, 64, kernel_size=5, padding=2),
            nn.ReLU(),
            nn.MaxPool1d(2),
            nn.Conv1d(64, 128, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.MaxPool1d(2),
            nn.Conv1d(128, 256, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.AdaptiveAvgPool1d(1),
            nn.Flatten(),
            nn.Dropout(0.5)
        )
        self.classifier = nn.Linear(256, num_classes)

    def forward(self, x):
        x = x.permute(0, 2, 1)  # (B, 12, 60)
        x = self.net(x)
        x = x.squeeze(-1)       # (B, 256)
        return self.classifier(x)

### Define Parameters

In [14]:
print("Unique labels:", np.unique(y_cls))

Unique labels: [0 1 2 3]


In [30]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [None]:
model = CNNEmotionClassifier(input_dim=12, num_classes=4).to(device)
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3, weight_decay=1e-4)

### Set Up Training / Evalution

In [16]:
def train(model, loader, optimizer, criterion):
    model.train()
    total_loss, correct, total = 0, 0, 0
    for X, y in loader:
        X, y = X.to(device), y.to(device)

        optimizer.zero_grad()
        logits = model(X)
        loss = criterion(logits, y)
        loss.backward()
        optimizer.step()

        total_loss += loss.item() * X.size(0)
        preds = logits.argmax(dim=1)
        correct += (preds == y).sum().item()
        total += y.size(0)

    return total_loss / total, correct / total

In [17]:
@torch.no_grad()
def evaluate(model, loader, criterion):
    model.eval()
    total_loss, correct, total = 0, 0, 0
    for X, y in loader:
        X, y = X.to(device), y.to(device)
        logits = model(X)
        loss = criterion(logits, y)
        total_loss += loss.item() * X.size(0)
        preds = logits.argmax(dim=1)
        correct += (preds == y).sum().item()
        total += y.size(0)
    return total_loss / total, correct / total

### Run Training / Evaluation

In [18]:
for epoch in range(1, 26):
    train_loss, train_acc = train(model, train_loader, optimizer, criterion)
    val_loss, val_acc = evaluate(model, val_loader, criterion)

    print(f"Epoch {epoch:02d} | Train Loss: {train_loss:.4f} | Acc: {train_acc:.4f} | "
          f"Val Loss: {val_loss:.4f} | Acc: {val_acc:.4f}")

Epoch 01 | Train Loss: 0.3137 | Acc: 0.8947 | Val Loss: 0.8289 | Acc: 0.7955
Epoch 02 | Train Loss: 0.0646 | Acc: 0.9826 | Val Loss: 1.0672 | Acc: 0.7994
Epoch 03 | Train Loss: 0.0310 | Acc: 0.9922 | Val Loss: 1.0234 | Acc: 0.8040
Epoch 04 | Train Loss: 0.0226 | Acc: 0.9942 | Val Loss: 0.9121 | Acc: 0.7994
Epoch 05 | Train Loss: 0.0162 | Acc: 0.9953 | Val Loss: 0.8207 | Acc: 0.8466
Epoch 06 | Train Loss: 0.0124 | Acc: 0.9963 | Val Loss: 1.2622 | Acc: 0.8074
Epoch 07 | Train Loss: 0.0063 | Acc: 0.9990 | Val Loss: 1.1858 | Acc: 0.7869
Epoch 08 | Train Loss: 0.0051 | Acc: 0.9984 | Val Loss: 1.1133 | Acc: 0.8091
Epoch 09 | Train Loss: 0.0023 | Acc: 0.9994 | Val Loss: 1.1484 | Acc: 0.8102
Epoch 10 | Train Loss: 0.0071 | Acc: 0.9981 | Val Loss: 1.7754 | Acc: 0.7824
Epoch 11 | Train Loss: 0.0104 | Acc: 0.9979 | Val Loss: 1.3348 | Acc: 0.8239
Epoch 12 | Train Loss: 0.0088 | Acc: 0.9979 | Val Loss: 1.0663 | Acc: 0.8551
Epoch 13 | Train Loss: 0.0012 | Acc: 0.9999 | Val Loss: 1.0458 | Acc: 0.8222

In [19]:
torch.save(model.state_dict(), "cnn_emotion_classifier.pth")

## Modeling - Emotion Regressor

### Split and Load Datasets

In [23]:
class WESADRegressionDataset(Dataset):
    def __init__(self, X, y_reg):
        self.X = torch.tensor(X, dtype=torch.float32)
        self.y = torch.tensor(y_reg, dtype=torch.float32)

    def __len__(self):
        return len(self.X)

    def __getitem__(self, idx):
        return self.X[idx], self.y[idx]

In [24]:
# Include subject ids
subject_ids = np.array(subject_ids_reg)

# Normalize score
y_reg = (y_reg - 1) / 4.0

train_mask = np.isin(subject_ids, train_subjects)
val_mask   = np.isin(subject_ids, val_subjects)

X_train, y_reg_train = X_reg[train_mask], y_reg[train_mask]
X_val,   y_reg_val   = X_reg[val_mask],   y_reg[val_mask]

In [25]:
train_ds = WESADRegressionDataset(X_train, y_reg_train)
val_ds   = WESADRegressionDataset(X_val,   y_reg_val)

train_loader = DataLoader(train_ds, batch_size=64, shuffle=True)
val_loader   = DataLoader(val_ds, batch_size=64)

### Define Model

In [26]:
class PANASRegressor(nn.Module):
    def __init__(self, input_dim=12, output_dim=7):
        super().__init__()
        self.encoder = nn.Sequential(
            nn.Conv1d(input_dim, 64, kernel_size=5, padding=2),
            nn.ReLU(),
            nn.MaxPool1d(2),
            nn.Conv1d(64, 128, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.MaxPool1d(2),
            nn.Conv1d(128, 256, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.AdaptiveAvgPool1d(1)
        )
        self.regressor = nn.Linear(256, output_dim)

    def forward(self, x):
        x = x.permute(0, 2, 1)     # (B, 12, 60)
        x = self.encoder(x)        # (B, 256, 1)
        x = x.squeeze(-1)          # (B, 256)
        return self.regressor(x)   # (B, 3)

In [31]:
model = PANASRegressor(input_dim=12, output_dim=7).to(device)
criterion = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)

### Set Up Training / Evaluation

In [32]:
def train_regression(model, loader, optimizer, criterion):
    model.train()
    total_loss = 0
    for X, y in loader:
        X, y = X.to(device), y.to(device)

        optimizer.zero_grad()
        pred = model(X)
        loss = criterion(pred, y)
        loss.backward()
        optimizer.step()

        total_loss += loss.item() * X.size(0)

    return total_loss / len(loader.dataset)

In [49]:
@torch.no_grad()
def evaluate_regression(model, loader, criterion):
    model.eval()
    total_loss = 0
    all_preds, all_targets, all_inputs = [], [], []

    for X, y in loader:
        X, y = X.to(device), y.to(device)
        pred = model(X)

        loss = criterion(pred, y)
        total_loss += loss.item() * X.size(0)

        all_inputs.append(X.cpu().numpy())
        all_preds.append(pred.cpu().numpy())
        all_targets.append(y.cpu().numpy())

    # Concatenate all batches
    all_inputs = np.concatenate(all_inputs)
    all_preds = np.concatenate(all_preds)
    all_targets = np.concatenate(all_targets)

    # Prepare columns
    x_vecs = [x.flatten() for x in all_inputs]
    pred_vecs = [x.flatten() for x in all_preds]

    # === PANAS emotions ===
    panas_emotions = ['Stressed', 'Angry', 'Happy', 'Sad', 'Inspired', 'Excited', 'Nervous']

    # Extract top moods from predictions
    def extract_top_moods(pred_vec, top_n=3):
        indices = np.argsort(pred_vec)[-top_n:][::-1]
        return [(panas_emotions[i], round(pred_vec[i], 4)) for i in indices]

    top_moods = [extract_top_moods(vec) for vec in pred_vecs]

    # Build final DataFrame
    df = pd.DataFrame({
        "x_vec": x_vecs,
        "pred": pred_vecs,
        "top_moods": top_moods,
        "mood_1": [m[0][0] for m in top_moods],
        "mood_1_score": [m[0][1] for m in top_moods],
        "mood_2": [m[1][0] for m in top_moods],
        "mood_2_score": [m[1][1] for m in top_moods],
        "mood_3": [m[2][0] for m in top_moods],
        "mood_3_score": [m[2][1] for m in top_moods],
    })

    # Metrics
    mse = mean_squared_error(all_targets, all_preds)
    mae = mean_absolute_error(all_targets, all_preds)
    pearsons = [pearsonr(all_preds[:, i], all_targets[:, i])[0] for i in range(all_targets.shape[1])]

    return total_loss / len(loader.dataset), mse, mae, pearsons, df

### Run Training / Evaluation

In [50]:
for epoch in range(1, 26):
    train_loss = train_regression(model, train_loader, optimizer, criterion)
    val_loss, val_mse, val_mae, val_corr, pred_df = evaluate_regression(model, val_loader, criterion)

    print(f"Epoch {epoch:02d} | Train Loss: {train_loss:.4f} | "
          f"Val Loss: {val_loss:.4f} | MSE: {val_mse:.4f} | MAE: {val_mae:.4f} | "
          f"Pearson r: {['%.3f' % r for r in val_corr]}")

Epoch 01 | Train Loss: 0.0000 | Val Loss: 0.0045 | MSE: 0.0045 | MAE: 0.0359 | Pearson r: ['0.766', '0.800', '0.674', '0.838', '0.748', '0.695', '0.823']
Epoch 02 | Train Loss: 0.0001 | Val Loss: 0.0038 | MSE: 0.0038 | MAE: 0.0379 | Pearson r: ['0.787', '0.844', '0.645', '0.856', '0.655', '0.705', '0.829']
Epoch 03 | Train Loss: 0.0002 | Val Loss: 0.0044 | MSE: 0.0044 | MAE: 0.0369 | Pearson r: ['0.637', '0.786', '0.785', '0.714', '0.810', '0.616', '0.730']
Epoch 04 | Train Loss: 0.0000 | Val Loss: 0.0046 | MSE: 0.0046 | MAE: 0.0380 | Pearson r: ['0.748', '0.754', '0.663', '0.795', '0.748', '0.660', '0.795']
Epoch 05 | Train Loss: 0.0000 | Val Loss: 0.0051 | MSE: 0.0051 | MAE: 0.0401 | Pearson r: ['0.680', '0.754', '0.623', '0.761', '0.719', '0.632', '0.754']
Epoch 06 | Train Loss: 0.0000 | Val Loss: 0.0047 | MSE: 0.0047 | MAE: 0.0388 | Pearson r: ['0.722', '0.813', '0.614', '0.795', '0.719', '0.673', '0.789']
Epoch 07 | Train Loss: 0.0000 | Val Loss: 0.0049 | MSE: 0.0049 | MAE: 0.0390

In [51]:
# Pair names with r values
emotion_r = list(zip(TARGET_PANAS, val_corr))

# Sort descending by correlation
emotion_r_sorted = sorted(emotion_r, key=lambda x: x[1], reverse=True)

print("Per-Emotion Pearson Correlation (sorted):")
for name, r in emotion_r_sorted:
    print(f"{name:10s} | r = {r:.3f}")

Per-Emotion Pearson Correlation (sorted):
Angry      | r = 0.843
Sad        | r = 0.833
Nervous    | r = 0.819
Inspired   | r = 0.764
Stressed   | r = 0.753
Excited    | r = 0.718
Happy      | r = 0.659


In [52]:
torch.save(model.state_dict(), "cnn_emotion_regressor.pth")

In [69]:
pred_df.to_csv('Biometric_Preds.csv', index = False)

In [105]:
pred_df.iloc[922].pred

array([-0.764921  , -0.4475805 , -0.82929504,  0.5212836 ,  0.50040215,
       -0.92199504, -0.36001477,  0.44702625, -0.87836605, -0.01312813,
        0.6364317 ,  1.2824786 , -0.7653725 , -0.43644914, -0.83486634,
        0.5212836 ,  0.48305476,  1.3990307 , -0.3486122 ,  0.46409488,
       -0.8671883 , -0.01120681,  0.6335896 ,  1.2833139 , -0.7630118 ,
       -0.4457255 , -0.8287748 ,  0.5566924 ,  0.26409566,  0.8168606 ,
       -0.32449073,  0.46885985, -0.8543507 , -0.01298389,  0.6354041 ,
        1.2824144 , -0.7635935 , -0.4327377 , -0.83064765,  0.5566924 ,
        0.07390724, -0.61315817, -0.3058746 ,  0.4831434 , -0.84214777,
       -0.0113356 ,  0.6341799 ,  1.2834722 , -0.76508826, -0.43644914,
       -0.8332962 ,  0.5566924 , -0.1570733 , -0.21044455, -0.30179775,
        0.4825566 , -0.83459693, -0.01287169,  0.6350271 ,  1.2820122 ,
       -0.76225907, -0.43459415, -0.831026  ,  0.5566924 , -0.36315736,
        1.2195939 , -0.282875  ,  0.49737942, -0.82534254, -0.01

In [None]:
stress_nervous = [0.00653729, -0.18582821, -0.23318774, -0.2487897,  -0.23372887, -0.12444461, 0.00557272]

excited_stressed = [-0.14335369, -0.2734212, -0.21269707, -0.26963243, -0.22057566, -0.07807975, -0.14732485]

excited_inspired_happy = [-0.25548002, -0.29832116, -0.25125208, -0.28806356, -0.24982394,
       -0.24082947, -0.2577177 ]