<a href="https://colab.research.google.com/github/vshukl01/Neurova_Shield/blob/main/1_df_preprocess.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# ---------------------------------------------
# WESAD Batch Processing Pipeline for All Subjects (with Logging)
# ---------------------------------------------

import os
import zipfile
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import shutil

In [None]:
# ---------------------------------------------
# Set Base Directory Containing All Subject Folders
# ---------------------------------------------

from google.colab import drive
drive.mount('/content/drive')

# Root folder that holds one sub-folder per subject.
base_dir = '/content/drive/MyDrive/WESAD'


def _subject_order(folder_name):
    """Sort key: numeric part after the leading letter first (S2 before S10), then the name itself."""
    suffix = folder_name[1:]
    return (int(suffix) if suffix.isdecimal() else float('inf'), folder_name)


# os.listdir() returns entries in arbitrary order, so sort explicitly to make the
# batch order (and therefore the logs) reproducible from run to run.
subject_dirs = sorted(
    (d for d in os.listdir(base_dir)
     if os.path.isdir(os.path.join(base_dir, d)) and d.lower().startswith('s')),
    key=_subject_order,
)

Mounted at /content/drive


In [None]:
# ---------------------------------------------
# Function: Process Sensor Data for One Subject
# ---------------------------------------------
# Sampling rates (Hz) used to average each raw stream down to one value per second.
_E4_RATES_HZ = {'ACC': 32, 'BVP': 64, 'EDA': 4, 'TEMP': 4}


def _read_stream_start(csv_path):
    """Return the start time (unix seconds) stored in the first field of a stream CSV's first row.

    Raises:
        ValueError: if that field cannot be parsed as a float.
    """
    with open(csv_path, 'r') as f:
        first_row = f.readline().strip()
    try:
        return float(first_row.split(',')[0])
    except ValueError as exc:
        raise ValueError(f"Expected a float in the first value of {csv_path}, got: {first_row}") from exc


def _block_mean(samples, rate):
    """Average consecutive groups of `rate` samples along axis 0, dropping an incomplete trailing group."""
    usable = len(samples) // rate * rate
    return samples[:usable].reshape(-1, rate, *samples.shape[1:]).mean(axis=1)


def _read_subject_info(readme_path):
    """Return (height, weight, gender) parsed from `key: value` lines of the subject readme.

    Only the text before the first colon is searched for the keywords, and lines without a
    colon are ignored, so free-text notes that merely mention e.g. "Weight" cannot break parsing.
    Any field that is not found (or a missing readme) yields None.
    """
    height = weight = gender = None
    if os.path.exists(readme_path):
        with open(readme_path, 'r') as f:
            for line in f:
                key, sep, rest = line.partition(':')
                if not sep:
                    continue
                key = key.lower()
                value = rest.split(':')[0].strip()
                if 'height' in key:
                    height = int(value)
                elif 'weight' in key:
                    weight = int(value)
                elif 'gender' in key:
                    gender = value.lower()
    return height, weight, gender


def process_sensor_data(subject_folder, root_dir=None):
    """Build a 1 Hz sensor DataFrame for one subject from its E4 zip archive.

    The archive `<subject>_E4_Data.zip` is extracted to `<subject>_E4_unzipped`, ACC/BVP/EDA/TEMP
    are block-averaged to one sample per second, HR is used as-is (one row per second), and
    height/weight/gender from `<subject>_readme.txt` are broadcast onto every row.

    Args:
        subject_folder: Name of the subject folder (also the file-name prefix), e.g. "S11".
        root_dir: Folder containing the subject folders. Defaults to the notebook-level `base_dir`.

    Returns:
        DataFrame indexed 1..n with columns timestamp, ACC_x, ACC_y, ACC_z, BVP, EDA, TEMP, HR
        (sensor values rounded to 3 decimals), height_cm, weight_kg, gender.

    Raises:
        ValueError: if the first row of ACC.csv does not start with a float.
    """
    if root_dir is None:
        root_dir = base_dir  # notebook-level dataset root

    print(f"[INFO] Processing sensor data for {subject_folder}...")
    subject_path = os.path.join(root_dir, subject_folder)
    zip_path = os.path.join(subject_path, f"{subject_folder}_E4_Data.zip")
    extract_path = os.path.join(subject_path, f"{subject_folder}_E4_unzipped")
    os.makedirs(extract_path, exist_ok=True)
    with zipfile.ZipFile(zip_path, 'r') as zip_ref:
        zip_ref.extractall(extract_path)
    print("[INFO] Files extracted.")

    def load(name):
        # The first two rows of each stream file are header rows, not samples.
        return pd.read_csv(os.path.join(extract_path, name), skiprows=2, header=None).values

    acc = load('ACC.csv')
    bvp = load('BVP.csv').flatten()
    eda = load('EDA.csv').flatten()
    temp = load('TEMP.csv').flatten()
    hr = load('HR.csv').flatten()
    # IBI.csv is intentionally not read: it was never used, and loading it could
    # abort the whole subject when the file is missing or empty.
    print("[INFO] Sensor CSVs loaded.")

    acc_1hz = _block_mean(acc, _E4_RATES_HZ['ACC'])
    bvp_1hz = _block_mean(bvp, _E4_RATES_HZ['BVP'])
    eda_1hz = _block_mean(eda, _E4_RATES_HZ['EDA'])
    temp_1hz = _block_mean(temp, _E4_RATES_HZ['TEMP'])

    # The timeline is anchored on the start time stored in ACC.csv.
    acc_start = _read_stream_start(os.path.join(extract_path, 'ACC.csv'))

    # HR.csv carries its own start time. If it differs from ACC's (Empatica's derived HR
    # stream reportedly begins some seconds after the raw streams), shift HR so that each
    # value lands on the second it belongs to instead of being aligned by row number.
    try:
        hr_start = _read_stream_start(os.path.join(extract_path, 'HR.csv'))
    except ValueError:
        print("[WARN] Could not read HR start time; assuming HR starts together with ACC.")
        hr_start = acc_start
    hr_offset = int(round(hr_start - acc_start))
    if hr_offset > 0:
        hr = np.concatenate([np.full(hr_offset, np.nan), hr])  # no HR yet for these seconds
    elif hr_offset < 0:
        hr = hr[-hr_offset:]  # HR samples recorded before the ACC start are dropped

    min_len = min(len(acc_1hz), len(bvp_1hz), len(eda_1hz), len(temp_1hz), len(hr))

    # Naive UTC timestamps, one per second (replaces the deprecated datetime.utcfromtimestamp).
    start_time = pd.to_datetime(acc_start, unit='s')
    timestamps = start_time + pd.to_timedelta(np.arange(min_len), unit='s')

    df = pd.DataFrame({
        'timestamp': timestamps,
        'ACC_x': acc_1hz[:min_len, 0],
        'ACC_y': acc_1hz[:min_len, 1],
        'ACC_z': acc_1hz[:min_len, 2],
        'BVP': bvp_1hz[:min_len],
        'EDA': eda_1hz[:min_len],
        'TEMP': temp_1hz[:min_len],
        'HR': hr[:min_len]
    })

    # Add personal info (same value on every row).
    readme = os.path.join(subject_path, f"{subject_folder}_readme.txt")
    height, weight, gender = _read_subject_info(readme)
    df['height_cm'] = height
    df['weight_kg'] = weight
    df['gender'] = gender

    df.index = range(1, len(df)+1)  # 1-based row index
    sensor_cols = ['ACC_x', 'ACC_y', 'ACC_z', 'BVP', 'EDA', 'TEMP', 'HR']
    df[sensor_cols] = df[sensor_cols].round(3)
    print("[INFO] Sensor DataFrame assembled.")
    print(df.head())
    return df


In [None]:
# ---------------------------------------------
# Function: Process Questionnaire Data for One Subject
# ---------------------------------------------
def process_questionnaire(subject_folder, root_dir=None):
    """Build one row per study condition from `<subject>_quest.csv`.

    The `# ORDER`, `# START` and `# END` lines provide the first five conditions and their
    start/end times. The `# PANAS`, `# STAI` and `# DIM` lines are read in file order and
    attached to the conditions in that same order. `# SSSQ` answers, when present, are attached
    to the 'TSST' row(s); all other rows get NaN in the SSSQ columns.

    Args:
        subject_folder: Name of the subject folder (also the file-name prefix), e.g. "S11".
        root_dir: Folder containing the subject folders. Defaults to the notebook-level `base_dir`.

    Returns:
        DataFrame with columns Condition, Start_Time_min, End_Time_min, the PANAS, STAI and
        SAM items and, if the file has an SSSQ line, the SSSQ items.

    Raises:
        ValueError: if the ORDER/START/END line is missing, a start/end value is not numeric,
            or a questionnaire block does not have one row per condition.
    """
    if root_dir is None:
        root_dir = base_dir  # notebook-level dataset root

    print(f"[INFO] Processing questionnaire for {subject_folder}...")
    PANAS = ['Active', 'Distressed', 'Interested', 'Inspired', 'Annoyed', 'Strong', 'Guilty','Scared', 'Hostile', 'Excited', 'Proud', 'Irritable', 'Enthusiastic', 'Ashamed','Alert', 'Nervous', 'Determined', 'Attentive', 'Jittery', 'Afraid','Stressed', 'Frustrated', 'Happy', 'Sad']
    STAI = ['I feel at ease', 'I feel nervous', 'I am jittery','I am relaxed', 'I am worried', 'I feel pleasant']
    SAM = ['Valence', 'Arousal']
    SSSQ = ['Committed to goals', 'Wanted to succeed', 'Motivated','Reflected about self', 'Worried what others think', 'Concerned about impression']

    quest_path = os.path.join(root_dir, subject_folder, f"{subject_folder}_quest.csv")
    # Context manager so the handle is closed even if reading fails.
    with open(quest_path, 'r') as f:
        lines = [l.strip() for l in f.read().splitlines() if l.strip()]

    def parse_block(tag, length):
        """Return one list of `length` answers per line starting with `tag`.

        Values that are not integers (e.g. 'Nan' or an empty field) become np.nan, and rows
        with fewer than `length` fields are padded with np.nan so every row has equal width.
        """
        parsed_data = []
        for line in lines:
            if line.startswith(tag):
                processed_values = []
                for v in line.split(';')[1:length+1]:
                    try:
                        processed_values.append(int(v))
                    except ValueError:
                        processed_values.append(np.nan)
                processed_values += [np.nan] * (length - len(processed_values))
                parsed_data.append(processed_values)
        return parsed_data

    def header_fields(tag):
        """Return fields 1..5 of the first line starting with `tag`."""
        for line in lines:
            if line.startswith(tag):
                return line.split(';')[1:6]
        raise ValueError(f"Missing '{tag}' line in {quest_path}")

    conds = header_fields('# ORDER')
    # NOTE(review): the WESAD readme reportedly gives these times as minutes.seconds rather
    # than decimal minutes - confirm before doing arithmetic on these two columns.
    starts = list(map(float, header_fields('# START')))
    ends = list(map(float, header_fields('# END')))

    df = pd.DataFrame({'Condition': conds, 'Start_Time_min': starts, 'End_Time_min': ends})
    panas_df = pd.DataFrame(parse_block('# PANAS', len(PANAS)), columns=PANAS)
    stai_df = pd.DataFrame(parse_block('# STAI', len(STAI)), columns=STAI)
    sam_df = pd.DataFrame(parse_block('# DIM', len(SAM)), columns=SAM)
    # Row k of every questionnaire block belongs to condition k; label the rows so the
    # blocks can be joined onto the condition table.
    panas_df['Condition'] = conds
    stai_df['Condition'] = conds
    sam_df['Condition'] = conds

    df = df.merge(panas_df, on='Condition', how='left').merge(stai_df, on='Condition', how='left').merge(sam_df, on='Condition', how='left')

    sssq = parse_block('# SSSQ', len(SSSQ))
    if sssq:
        # SSSQ rows are consumed in file order, one per 'TSST' condition. Indexing them by
        # the condition's row position (as before) never matched, because TSST is not row 0
        # while the file holds a single SSSQ line - so the answers were silently dropped.
        remaining = iter(sssq)
        blank = [np.nan] * len(SSSQ)
        sssq_data = [
            next(remaining, blank) if condition == 'TSST' else blank
            for condition in df['Condition']
        ]
        sssq_full = pd.DataFrame(sssq_data, columns=SSSQ)
        df = pd.concat([df, sssq_full], axis=1)

    print("[INFO] Questionnaire DataFrame assembled.")
    print(df.head())
    return df

In [None]:
# ---------------------------------------------
# Batch Execution: Run Pipeline for All Subjects
# ---------------------------------------------
# For every subject folder: build the sensor and questionnaire tables, then write
# both as CSV files into <base_dir>/<subject>/df_files.
for subject in subject_dirs:
    print(f"\n[START] Processing {subject}...")

    # Both tables are built before the output folder is created or any file is written.
    sensor_df = process_sensor_data(subject)
    quest_df = process_questionnaire(subject)

    save_dir = os.path.join(base_dir, subject, 'df_files')
    os.makedirs(save_dir, exist_ok=True)

    # Output names are prefixed with the lower-cased subject folder name;
    # the DataFrame index is written out as a column called 'index'.
    sensor_file = os.path.join(save_dir, f"{subject.lower()}_1hz_df.csv")
    sensor_df.to_csv(sensor_file, index_label='index')

    quest_file = os.path.join(save_dir, f"{subject.lower()}_quest_combined_df.csv")
    quest_df.to_csv(quest_file, index_label='index')

    print(f"[DONE] ✅ Saved all outputs for {subject}\n")



[START] Processing S11...
[INFO] Processing sensor data for S11...
[INFO] Files extracted.
[INFO] Sensor CSVs loaded.
[INFO] Sensor DataFrame assembled.
            timestamp   ACC_x  ACC_y   ACC_z     BVP    EDA    TEMP     HR  \
1 2017-07-25 11:14:19 -26.156  0.000  57.656  15.760  0.876  382.21  82.00   
2 2017-07-25 11:14:20 -26.312  0.031  57.688 -41.960  1.448   30.37  70.00   
3 2017-07-25 11:14:21 -26.344  0.000  57.594  26.305  1.453   30.35  71.33   
4 2017-07-25 11:14:22 -26.062  0.031  57.844  -5.872  1.452   30.35  69.25   
5 2017-07-25 11:14:23 -26.062  0.219  57.750   6.168  1.449   30.37  68.60   

   height_cm  weight_kg  gender  
1        171         54  female  
2        171         54  female  
3        171         54  female  
4        171         54  female  
5        171         54  female  
[INFO] Processing questionnaire for S11...
[INFO] Questionnaire DataFrame assembled.
  Condition  Start_Time_min  End_Time_min  Active  Distressed  Interested  \
0      Base

In [None]:
# ---------------------------------------------
# Combine all df_files into one central folder
# ---------------------------------------------
# Central folder that receives a copy of every subject's df_files folder.
combined_folder = os.path.join(base_dir, 'df_files')
os.makedirs(combined_folder, exist_ok=True)

skipped_subjects = []
for subject in subject_dirs:
    source_dir = os.path.join(base_dir, subject, 'df_files')
    # A subject without outputs should not abort the copy for everyone else.
    if not os.path.isdir(source_dir):
        skipped_subjects.append(subject)
        continue
    target_dir = os.path.join(combined_folder, f"{subject}_df_files")
    os.makedirs(target_dir, exist_ok=True)
    for file in os.listdir(source_dir):
        src_file = os.path.join(source_dir, file)
        dst_file = os.path.join(target_dir, file)
        # Only regular files are copied; sub-folders are ignored.
        if os.path.isfile(src_file):
            shutil.copy2(src_file, dst_file)

# The original message stopped after "folders named" without giving the name pattern.
print(f"\n✅ All subject df_files copied to: {combined_folder}, organized in folders named '<subject>_df_files'")
if skipped_subjects:
    print(f"[WARN] No df_files folder found for: {', '.join(skipped_subjects)}")



✅ All subject df_files copied to: /content/drive/MyDrive/WESAD/df_files, organized in folders named
