In [1]:
import configparser
import os
import os.path as osp
import pandas as pd
from collections import defaultdict
import pickle
from typing import List, Dict
from tqdm import tqdm

# Initialize data paths


In [2]:
# Get dataset path from config.ini, looked up in the parent of the current working directory
config_path = osp.join(osp.abspath(os.pardir), 'config.ini')
parser = configparser.ConfigParser()
# ConfigParser.read() silently skips files it cannot open and returns the list of files
# it actually parsed, so fail loudly here instead of hitting an opaque KeyError below.
if not parser.read(config_path):
    raise FileNotFoundError('Config file not found or unreadable: {}'.format(config_path))
dcu_nvt_dataset_path = parser['DATA_PATH']['dcu_nvt_dataset_path']
eda_dataset_path = osp.join(dcu_nvt_dataset_path, 'GSR')

# List of user ids: each user is a sub-directory of the GSR folder.
# os.listdir() returns entries in arbitrary order and may include stray files,
# so keep directories only and sort them to make every run reproducible.
user_ids = sorted(
    entry for entry in os.listdir(eda_dataset_path)
    if osp.isdir(osp.join(eda_dataset_path, entry))
)

# Objective ground-truth file path
ground_truth_path = osp.join(dcu_nvt_dataset_path, 'Ground-Truth.csv')

# Transform dataset into a predefined format

In [3]:
# Read the objective ground-truth table from disk and preview its first five rows
ground_truth = pd.read_csv(filepath_or_buffer=ground_truth_path)
ground_truth.head(n=5)

Unnamed: 0,INSTANCE,LABEL,STRESS_LEVEL,ASSUME_LABEL
0,A_Relax_06012020_165451_5Hz.csv,0,0,0
1,A_Relax_06012020_170941_5Hz.csv,0,0,0
2,A_Task01-1_06012020_163149_5Hz.csv,0,1,1
3,A_Task01-2_06012020_163835_5Hz.csv,0,1,1
4,A_Task02_06012020 164432_5Hz.csv,1,3,1


In [4]:
# Build the EDA lookup so signals can be accessed as eda[user_id][task_id]
eda = defaultdict(dict)
for user_id in tqdm(user_ids):
    user_data_path = osp.join(eda_dataset_path, user_id)
    if not osp.isdir(user_data_path):
        continue  # Skip stray files sitting next to the per-user folders
    # Only CSV recordings are data files; ignore anything else found in the folder
    csv_file_names = [name for name in os.listdir(user_data_path) if name.lower().endswith('.csv')]
    # Sort by the trailing name index. The full file name is used as a tie-breaker so the
    # order stays deterministic when every file shares the same suffix (os.listdir order is arbitrary).
    file_names = sorted(csv_file_names, key=lambda file_name: (file_name.split('_')[-1], file_name))
    for file_name in file_names:
        data_file_path = osp.join(user_data_path, file_name)
        # Load only the EDA column instead of parsing the whole file
        eda_signal = pd.read_csv(data_file_path, usecols=['MICROSIEMENS'])['MICROSIEMENS'].values.tolist()
        task_id = file_name  # Task id is the file name --> this is the key used to retrieve the ground-truth
        eda[user_id][task_id] = eda_signal

100%|██████████| 11/11 [00:00<00:00, 26.57it/s]


In [5]:
# Index the stress levels by task id (the INSTANCE file name) for retrieval.
# A new name is used instead of overwriting `ground_truth`, so this cell can be re-run
# without failing on the already-consumed 'INSTANCE' column.
# STRESS_LEVEL is used rather than LABEL: in the previewed rows LABEL is already binary,
# so applying the `< 2` threshold to it would turn every label into 0.
stress_level_by_task = ground_truth.set_index('INSTANCE')['STRESS_LEVEL']
gt = defaultdict(dict)
for user_id, user_signals in tqdm(eda.items()):
    for task_id, eda_signal in user_signals.items():
        stress_level = stress_level_by_task.loc[task_id]  # Get task ground-truth (KeyError if the task is unlabeled)
        # Map levels of stress to binary labels of stress or non-stress.
        # Original levels: 0 - Non-stress, 1 - Mild stress, 2 - Stress, 3 - Very stress
        binary_label = 0 if stress_level < 2 else 1
        # Duplicate the task label so that every EDA sample has its own label
        gt[user_id][task_id] = [binary_label] * len(eda_signal)

100%|██████████| 11/11 [00:00<00:00, 423.10it/s]


In [6]:
# Check that the ground-truth and the data have no errors (problems are reported, not raised)
for user_id, data in tqdm(eda.items()):
    for task_id, eda_signal in data.items():
        len_eda_signal = len(eda_signal)
        len_gt = len(gt[user_id][task_id])
        # The ground-truth must have exactly one label per EDA sample
        if len_eda_signal != len_gt:
            print(user_id, task_id, 'Length not equal')
            print(len_eda_signal, len_gt)
        # Does the signal have missing values? pandas loads empty CSV cells as float NaN,
        # which an `is None` test never matches; pd.isna() detects both NaN and None.
        if pd.isna(eda_signal).any():
            print(user_id, task_id, 'Has None value')

100%|██████████| 11/11 [00:00<00:00, 784.97it/s]


In [7]:
# Dump dataset to file
output_file_path = osp.join(dcu_nvt_dataset_path, 'DCU_NVT_EXP1_dataset.pkl')
data = { 'eda': eda, 'ground_truth': gt }
# Use a context manager so the file is flushed and closed even if pickling fails
with open(output_file_path, 'wb') as output_file:
    pickle.dump(data, output_file)