In [1]:
%load_ext autoreload
%autoreload 2

Libraries

In [2]:
import os
import pandas as pd
import numpy as np
import verbio as vb
from verbio import settings
from sklearn.metrics import f1_score, precision_score, recall_score, accuracy_score
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.svm import SVC
from collections import defaultdict

Constants

In [3]:
SUBJECT_LABELS = [f'P{p:03d}' for p in range(1, 74, 1)] # Participants 001-073
TRAIN_SESSIONS = ['TEST01','TEST02','TEST03','TEST04']
TEST_SESSIONS = ['TEST05','TEST06','TEST07','TEST08']
DATA_DIR = '/home/jason/workspace/hubbs/project_verbio/data/physio/'
EDA_FILENAME = 'E4_EDA_PPT.xlsx'
BVP_FILENAME = 'E4_BVP_PPT.xlsx'
ANNOTATION_FILENAME = 'MANUAL_ANNOTATION_PPT.xlsx'
WIN_LEN = 30
WIN_STRIDE = 15
ANNOTATION_THRESHOLD = 2.5
EDA_FILTER_SIZE = 8

Helper functions

In [4]:
def get_data(participant, session):
    eda_filepath = os.path.join(DATA_DIR, participant, session, EDA_FILENAME)
    bvp_filepath = os.path.join(DATA_DIR, participant, session, BVP_FILENAME)
    annotation_filepath = os.path.join(DATA_DIR, participant, session, ANNOTATION_FILENAME)
    
    if any(not os.path.exists(x) for x in (eda_filepath, bvp_filepath, annotation_filepath)): return None
    
    vbr = vb.readers.DataReader()
    
    eda_df = vbr.read_excel(eda_filepath)
    bvp_df = vbr.read_excel(bvp_filepath)
    annotation_df = vbr.read_excel(annotation_filepath)
    
    eda_fx = get_eda_fx(eda_df)
    bvp_fx = get_bvp_fx(bvp_df)
    annotation_fx = get_annotation_fx(annotation_df)

    min_len = min(len(annotation_fx), len(eda_fx), len(bvp_fx))
    annotation_fx = annotation_fx[:min_len]
    eda_fx = eda_fx.iloc[:min_len]
    bvp_fx = bvp_fx[:min_len]
    
    return pd.concat([eda_fx, bvp_fx], axis=1)
    
def get_eda_fx(eda_df):
    # Convert EDA signals to numpy
    eda_signal = eda_df['EDA'].to_numpy()
    eda_times = eda_df[vb.settings.time_key].to_numpy()
    
    # Get EDA features
    eda_fx = vb.features.eda_features(
        signal      = eda_signal, 
        times       = eda_times, 
        sr          = vb.settings.e4_eda_sr, 
        win_len     = WIN_LEN, 
        win_stride  = WIN_STRIDE,
        filter_size = EDA_FILTER_SIZE
    )[['SCR_Peaks', 'SCR_Amplitude', 'SCL']]
    
    return eda_fx

def get_bvp_fx(bvp_df):
    # Convert BVP signals to numpy
    bvp_signal = bvp_df['BVP'].to_numpy()
    bvp_times = bvp_df[vb.settings.time_key].to_numpy()
    
    # Get BVP features
    bvp_fx = vb.features.bvp_features(
        signal     = bvp_signal,
        times      = bvp_times,
        sr         = vb.settings.e4_bvp_sr,
        win_len    = WIN_LEN,
        win_stride = WIN_STRIDE
    )[['HR', 'HR_Grad']]
    
    return bvp_fx

def get_annotation_fx(annotation_df):
    # Convert annotation signals to numpy
    annotation_r1 = annotation_df['R1'].to_numpy()
    annotation_r2 = annotation_df['R2'].to_numpy()
    annotation_r4 = annotation_df['R4'].to_numpy()
    annotation_r5 = annotation_df['R5'].to_numpy()
    annotation_times = annotation_df[vb.settings.time_key].to_numpy()
    
    # Combine both annotators
    annotation_mixed = np.vstack([annotation_r1, annotation_r2, annotation_r4, annotation_r5])
    annotation_mean = np.mean(annotation_mixed, axis=0)
   
    # Window annotations
    annotation_fx = vb.preprocessing.window_timed(
        x=annotation_mean,
        times=annotation_times,
        win_len=WIN_LEN,
        win_stride=WIN_STRIDE,
        win_fn=lambda x: vb.preprocessing.binarize(np.mean(x), threshold=ANNOTATION_THRESHOLD)
    )
    annotation_fx = np.array(annotation_fx, dtype='int') 
    
    # Shift annotations back in time
    assert WIN_LEN % WIN_STRIDE < 0.1 # Assert that they're at least somewhat divisible
    shift_len = -int(WIN_LEN//WIN_STRIDE)
    
    return vb.temporal.shift(annotation_fx, shift_len)[:shift_len] # Shift back in time and truncate

Grab raw data from VerBIO dataset for training and testing sessions

In [5]:
test_dict = {}

for p in SUBJECT_LABELS:
    valid = True
    participant_test = []
    
    for s in TEST_SESSIONS:
        session_data = get_data(p, s)
        if session_data is None:
            valid = False
            break
        else:
            participant_test.append(session_data)
            
    if valid:
        print(f'Valid participant {p}')
        test_dict[p] = participant_test

Valid participant P004
Valid participant P005
Valid participant P008
Valid participant P016
Valid participant P020
Valid participant P021
Valid participant P023
Valid participant P032
Valid participant P035
Valid participant P037
Valid participant P039
Valid participant P041
Valid participant P042
Valid participant P044
Valid participant P047
Valid participant P050
Valid participant P051
Valid participant P053
Valid participant P060
Valid participant P061
Valid participant P062
Valid participant P065
Valid participant P071
Valid participant P073


Run experiment loop

In [29]:
ratio = 0.0
n_valid = 0
for target_p in test_dict.keys():
    
    p_data = pd.concat(test_dict[target_p][:3], axis=0)
    
    y_pred = sum((p_data['HR_Grad'].to_numpy() > 0) & (p_data['SCR_Peaks'].to_numpy() >= 9))
    n_valid += 1
    print(y_pred, len(p_data))
    ratio += (y_pred/len(p_data))

print(ratio/n_valid)        

30 54
3 31
6 22
7 47
4 20
9 18
19 45
21 46
0 53
9 37
12 39
4 22
1 36
13 24
11 48
11 31
16 45
14 28
2 33
2 20
19 38
16 36
15 32
2 24
0.2938179208699289
