In [13]:
import os
import pandas as pd
import numpy as np

from scipy.signal import butter, sosfiltfilt, iirnotch, filtfilt


## **Load Data**

**Load Raw Data Files from Each Participant**

In [14]:
# path to folder that holds participant data
# NOTE: this is an absolute, machine-specific path; update it when running elsewhere
data_path = "C:/Users/nicho/Desktop/Nick-Weiss-CSC-Thesis-2526/data"

# get all the folders inside data directory that hold individual trials
# os.listdir returns entries in arbitrary order, so sort them to keep the
# participant order (and the row order of anything built from `dfs`) reproducible
participant_folders = sorted(
    entry for entry in os.listdir(data_path)
    if os.path.isdir(os.path.join(data_path, entry))
)

# keep list of all dataframes (one per participant that has a finalized csv)
dfs = []

# folder name is the participant id
# ****MIGHT CHANGE THE FORMAT OF THE FOLDER LATER --> COME BACK TO THIS****
for participant_id in participant_folders:
    # get the path to the finalized data file
    participant_dir = os.path.join(data_path, participant_id)
    csv_path = os.path.join(participant_dir, "finalized_data.csv")

    # report and skip participants whose csv is missing
    if not os.path.exists(csv_path):
        print(f"Skipping Participant {participant_id}: finalized_data.csv not found in {participant_dir}")
        continue

    # read csv into dataframe
    participant_df = pd.read_csv(csv_path)

    # add participant id column and append to list
    participant_df['participant_id'] = participant_id
    dfs.append(participant_df)


**Clean Dataframes to ensure proper labels**

In [15]:
def clean_participant_df(df: pd.DataFrame) -> pd.DataFrame:
    """Return a cleaned copy of one participant's dataframe.

    Steps, in order:
      1. drop rows that are entirely empty
      2. verify the "Time (s)" and "Primitive" columns exist
      3. coerce "Time (s)" to numeric and drop rows where that fails
      4. sort by time and drop duplicated timestamps (first row in the
         original order wins)
      5. cast labels to stripped strings and drop rows with a missing/blank label
      6. reset the index

    Parameters
    ----------
    df : pd.DataFrame
        Raw participant data. A "participant_id" column, if present, is only
        used to label the error message.

    Returns
    -------
    pd.DataFrame
        New dataframe; the input is not modified.

    Raises
    ------
    ValueError
        If "Time (s)" or "Primitive" is missing.
    """
    time_col = "Time (s)"
    label_col = "Primitive"
    required_cols = [time_col, label_col]

    # drop rows that are entirely empty
    df = df.dropna(how="all").copy()

    # check that df includes required columns
    for col in required_cols:
        if col not in df.columns:
            # Look the id up defensively: indexing a missing column or an empty
            # frame here would hide the real problem behind a KeyError/IndexError.
            if "participant_id" in df.columns and len(df) > 0:
                who = df["participant_id"].iloc[0]
            else:
                who = "unknown participant"
            raise ValueError(f"{who}: missing required column '{col}'")

    # make sure time column is numeric (unparseable values become NaN)
    df[time_col] = pd.to_numeric(df[time_col], errors="coerce")

    # remove rows where Time (s) is NaN and order rows by increasing time.
    # A stable sort keeps rows with equal timestamps in their original order,
    # so keep="first" below really keeps the first occurrence from the file.
    df = df.dropna(subset=[time_col]).sort_values(time_col, kind="mergesort")

    # drop duplicated time rows (keep first occurrence)
    df = df.drop_duplicates(subset=[time_col], keep="first")

    # force labels to be text and strip whitespace
    df[label_col] = df[label_col].astype("string").str.strip()

    # only keep rows that have a valid label attached
    df = df[df[label_col].notna() & (df[label_col] != "")]

    # reset df indices so they run 0..n-1 after all the row drops
    return df.reset_index(drop=True)


**Estimating Sampling Rate**

In [16]:
def estimate_fs_from_time(df: pd.DataFrame) -> float:
    """Estimate the sampling rate from the "Time (s)" column.

    The rate is the reciprocal of the median strictly-positive difference
    between consecutive timestamps. Values that cannot be parsed as numbers
    are ignored.

    Raises
    ------
    ValueError
        If there is no positive difference between consecutive timestamps.
    """
    timestamps = pd.to_numeric(df["Time (s)"], errors="coerce").to_numpy(dtype=float)

    # discard unparseable/missing timestamps before differencing
    valid_timestamps = timestamps[np.logical_not(np.isnan(timestamps))]
    steps = np.diff(valid_timestamps)

    # zero or negative steps (duplicates, out-of-order rows) carry no rate information
    positive_steps = steps[steps > 0]
    if positive_steps.size == 0:
        raise ValueError("Cannot estimate fs: no positive time differences.")

    return float(1.0 / np.median(positive_steps))

**Filtering Helper Functions**

In [17]:
# handle missing values --> important so filters don't explode when they hit NaN
def col_to_float_with_small_gap_fill(series: pd.Series) -> np.ndarray:
    """Convert a column to a float array with missing values filled in.

    Non-numeric entries are coerced to NaN, then every NaN is filled by linear
    interpolation applied in both directions. No gap-length limit is applied,
    so gaps of any size are filled despite the function name.
    """
    return (
        pd.to_numeric(series, errors="coerce")
        .astype(float)
        .interpolate(method="linear", limit_direction="both")
        .to_numpy()
    )

# high-pass filter to remove drift
def highpass_filter(x: np.ndarray, fs: float, cutoff_hz: float = 20.0, order: int = 2) -> np.ndarray:
    """Apply a Butterworth high-pass filter forward and backward.

    Parameters
    ----------
    x : signal samples
    fs : sampling rate, in the same unit as cutoff_hz
    cutoff_hz : cutoff frequency (normalized by fs / 2 before design)
    order : order passed to the Butterworth design
    """
    normalized_cutoff = cutoff_hz / (0.5 * fs)
    sections = butter(order, normalized_cutoff, btype="highpass", output="sos")
    # sosfiltfilt applies the filter forward and backward, giving zero phase shift
    # (envelope timing must match video labels)
    return sosfiltfilt(sections, x)

# notch filter for power line interference (e.g. 60Hz and 120Hz)
def notch_filter(x: np.ndarray, fs: float, freq_hz: float, q: float = 35.0) -> np.ndarray:
    """Remove a narrow band around freq_hz with a forward-backward IIR notch.

    If freq_hz is at or above the Nyquist frequency (fs / 2) the input is
    returned unchanged, because such a notch cannot be designed.
    """
    nyquist = fs / 2.0
    if freq_hz >= nyquist:
        return x  # can't notch above Nyquist

    coefficients = iirnotch(freq_hz / nyquist, q)
    return filtfilt(*coefficients, x)

# band pass filter (keep EMG band)
def bandpass_filter(x: np.ndarray, fs: float, low_hz: float = 20.0, high_hz: float = 450.0, order: int = 4) -> np.ndarray:
    """Apply a forward-backward Butterworth band-pass between low_hz and high_hz.

    The upper edge is capped at 99% of the Nyquist frequency (fs / 2) so the
    design stays valid when high_hz is too high for the sampling rate.
    """
    nyquist = 0.5 * fs
    upper_hz = min(high_hz, 0.99 * nyquist)
    band = [low_hz / nyquist, upper_hz / nyquist]
    sections = butter(order, band, btype="bandpass", output="sos")
    return sosfiltfilt(sections, x)

# rectify
def rectify(x: np.ndarray) -> np.ndarray:
    """Full-wave rectify: return the element-wise absolute value of x."""
    return np.absolute(x)

# low-pass filter
def lowpass_filter(x: np.ndarray, fs: float, cutoff_hz: float = 5.0, order: int = 4) -> np.ndarray:
    """Apply a forward-backward Butterworth low-pass filter.

    Parameters
    ----------
    x : signal samples
    fs : sampling rate, in the same unit as cutoff_hz
    cutoff_hz : cutoff frequency (normalized by fs / 2 before design)
    order : order passed to the Butterworth design

    Returns
    -------
    The filtered signal, or x unchanged when cutoff_hz is at or above the
    Nyquist frequency (fs / 2).
    """
    nyq = 0.5 * fs
    # A cutoff at or above Nyquist would keep the whole representable band, and
    # butter() rejects normalized cutoffs >= 1. Pass the signal through instead
    # of raising, the same way notch_filter skips frequencies above Nyquist.
    if cutoff_hz >= nyq:
        return x
    sos = butter(order, cutoff_hz / nyq, btype="lowpass", output="sos")
    return sosfiltfilt(sos, x)

# get emg envelope (rectified, then low-passed)
def emg_envelope(x_bandpassed: np.ndarray, fs: float, envelope_lp_hz: float = 5.0) -> np.ndarray:
    """Compute an envelope by rectifying the signal and low-pass filtering it.

    The low-pass stage uses a 4th-order filter with cutoff envelope_lp_hz.
    """
    return lowpass_filter(
        rectify(x_bandpassed),
        fs=fs,
        cutoff_hz=envelope_lp_hz,
        order=4,
    )

**EMG Feature Engineering**

In [18]:
def add_emg_features(
    df: pd.DataFrame,
    emg_columns: list[str],
    fs: float,
    hp_hz: float = 20.0,
    notch_freqs: tuple[float, ...] = (60.0, 120.0),
    bp_low_hz: float = 20.0,
    bp_high_hz: float = 450.0,
    env_lp_hz: float = 5.0,
) -> pd.DataFrame:
    """Filter EMG columns and add an envelope column for each one.

    For every name in emg_columns that exists in df, the column is gap-filled,
    high-passed, notch-filtered at each frequency in notch_freqs, and
    band-passed. The band-passed signal replaces the original column and its
    envelope is stored in a new "<col>_ENV" column. Names that are not in df
    are skipped silently.

    Returns a modified copy; df itself is left untouched.
    """
    out = df.copy()

    # want to apply feature engineering to all emg columns
    for col in emg_columns:
        if col not in out.columns:
            continue

        # interpolate missing values, then remove drift
        signal = col_to_float_with_small_gap_fill(out[col])
        signal = highpass_filter(signal, fs=fs, cutoff_hz=hp_hz, order=2)

        # apply notch filters one after another
        for notch_hz in notch_freqs:
            signal = notch_filter(signal, fs=fs, freq_hz=notch_hz, q=35.0)

        # apply bandpass filter
        bandpassed = bandpass_filter(
            signal, fs=fs, low_hz=bp_low_hz, high_hz=bp_high_hz, order=4
        )

        # envelope emg
        envelope = emg_envelope(bandpassed, fs=fs, envelope_lp_hz=env_lp_hz)

        out[col] = bandpassed
        out[f"{col}_ENV"] = envelope

    return out

**Accelerometer Feature Engineering**

In [19]:
def add_accel_features(
    df: pd.DataFrame,
    sensor_ids: list[str],
    fs: float,
    accel_lowpass_hz: float = 20.0,
    dyn_highpass_hz: float = 0.3,
) -> pd.DataFrame:
    """Smooth accelerometer axes and add dynamic-component and magnitude columns.

    For each sensor id that has all of "<sid>_AccelX/Y/Z" in df:
      - each axis column is replaced by its low-passed version
      - "<axis>_DYN" holds the high-passed version of that low-passed axis
      - "<sid>_AccelMag" is the Euclidean norm of the low-passed axes
      - "<sid>_AccelMag_DYN" is the Euclidean norm of the "_DYN" axes
    Sensors with any axis column missing are skipped.

    Returns a modified copy; df itself is left untouched.
    """
    out = df.copy()

    for sid in sensor_ids:
        axis_cols = [f"{sid}_Accel{axis}" for axis in ("X", "Y", "Z")]

        # skip if missing
        if any(name not in out.columns for name in axis_cols):
            continue

        # filter each axis (lowpass), then compute dynamic component (highpass)
        for name in axis_cols:
            raw = col_to_float_with_small_gap_fill(out[name])
            smoothed = lowpass_filter(raw, fs=fs, cutoff_hz=accel_lowpass_hz, order=4)
            dynamic = highpass_filter(smoothed, fs=fs, cutoff_hz=dyn_highpass_hz, order=2)

            out[name] = smoothed
            out[f"{name}_DYN"] = dynamic

        ax, ay, az = axis_cols

        # magnitudes (use low-passed signals)
        out[f"{sid}_AccelMag"] = np.sqrt(out[ax] ** 2 + out[ay] ** 2 + out[az] ** 2)

        # optional: magnitude of dynamic component
        dyn_x, dyn_y, dyn_z = (out[f"{name}_DYN"] for name in axis_cols)
        out[f"{sid}_AccelMag_DYN"] = np.sqrt(dyn_x ** 2 + dyn_y ** 2 + dyn_z ** 2)

    return out

**Gyroscope Feature Engineering**

In [20]:
def add_gyro_features(
    df: pd.DataFrame,
    sensor_ids: list[str],
    fs: float,
    gyro_lowpass_hz: float = 20.0,
) -> pd.DataFrame:
    """Smooth gyroscope axes and add a magnitude column per sensor.

    For each sensor id that has all of "<sid>_GyroX/Y/Z" in df, every axis
    column is replaced by its low-passed version and "<sid>_GyroMag" is set to
    the Euclidean norm of the three low-passed axes. Sensors with any axis
    column missing are skipped.

    Returns a modified copy; df itself is left untouched.
    """
    out = df.copy()

    for sid in sensor_ids:
        axis_cols = [f"{sid}_Gyro{axis}" for axis in ("X", "Y", "Z")]

        # skip if missing
        if any(name not in out.columns for name in axis_cols):
            continue

        # low-pass each axis
        for name in axis_cols:
            raw = col_to_float_with_small_gap_fill(out[name])
            out[name] = lowpass_filter(raw, fs=fs, cutoff_hz=gyro_lowpass_hz, order=4)

        # magnitude (use low-passed axes)
        gx, gy, gz = axis_cols
        out[f"{sid}_GyroMag"] = np.sqrt(out[gx] ** 2 + out[gy] ** 2 + out[gz] ** 2)

    return out

**Magnetometer Feature Engineering**

In [21]:
def add_mag_features(
    df: pd.DataFrame,
    sensor_ids: list[str],
    fs: float,
    mag_lowpass_hz: float = 10.0,
) -> pd.DataFrame:
    """Smooth magnetometer axes and add a magnitude column per sensor.

    For each sensor id that has all of "<sid>_MagX/Y/Z" in df, every axis
    column is replaced by its low-passed version and "<sid>_MagnetMag" is set
    to the Euclidean norm of the three low-passed axes. Sensors with any axis
    column missing are skipped.

    Returns a modified copy; df itself is left untouched.
    """
    out = df.copy()

    for sid in sensor_ids:
        axis_cols = [f"{sid}_Mag{axis}" for axis in ("X", "Y", "Z")]

        # skip if missing
        if any(name not in out.columns for name in axis_cols):
            continue

        # low-pass each axis
        for name in axis_cols:
            raw = col_to_float_with_small_gap_fill(out[name])
            out[name] = lowpass_filter(raw, fs=fs, cutoff_hz=mag_lowpass_hz, order=4)

        # magnitude (use low-passed axes)
        mx, my, mz = axis_cols
        out[f"{sid}_MagnetMag"] = np.sqrt(out[mx] ** 2 + out[my] ** 2 + out[mz] ** 2)

    return out

In [None]:
sensor_ids = ["A5F2", "A19E"]

# two EMG channels per sensor: <sid>_EMG1 and <sid>_EMG2
# (comprehension avoids a loop variable named `id`, which shadows the builtin)
emg_cols = [f"{sid}_EMG{channel}" for sid in sensor_ids for channel in (1, 2)]

# Collect processed frames in a NEW list instead of overwriting `dfs` in place.
# `dfs` keeps the raw data, so re-running this cell does not filter data that
# has already been filtered.
processed_dfs = []
for raw_df in dfs:
    df = clean_participant_df(raw_df)

    # get sampling rate (estimated per participant from the time column)
    fs = estimate_fs_from_time(df)

    df = add_emg_features(df, emg_cols, fs)
    df = add_accel_features(df, sensor_ids, fs)
    df = add_gyro_features(df, sensor_ids, fs)
    df = add_mag_features(df, sensor_ids, fs)
    processed_dfs.append(df)

# pd.concat raises ValueError on an empty list; raise the same type with a clearer message
if not processed_dfs:
    raise ValueError("No participant data was loaded; check data_path and the participant folders.")

full_df = pd.concat(processed_dfs, ignore_index=True)

**Separate Features and Labels and Split Data**

In [None]:
# separate features, labels, and groups
label_col = "Primitive"
group_col = "participant_id"

# every column except the label, the group id, the timestamp and the event marker is a feature
non_feature_cols = {label_col, group_col, "Time (s)", "Event_Marker"}
feature_cols = [c for c in full_df.columns if c not in non_feature_cols]


# independence is defined at the participant level:
# all samples from a participant must stay together in either train or test.
# this prevents: identity leakage, movement-style memorization, sensor placement bias



Index(['A5F2_AccelX', 'A5F2_AccelY', 'A5F2_AccelZ', 'A5F2_EMG1', 'A5F2_EMG2',
       'A5F2_GyroX', 'A5F2_GyroY', 'A5F2_GyroZ', 'A5F2_MagX', 'A5F2_MagY',
       'A5F2_MagZ', 'Event_Marker', 'A19E_AccelX', 'A19E_AccelY',
       'A19E_AccelZ', 'A19E_EMG1', 'A19E_EMG2', 'A19E_GyroX', 'A19E_GyroY',
       'A19E_GyroZ', 'A19E_MagX', 'A19E_MagY', 'A19E_MagZ', 'Time (s)',
       'Primitive', 'A5F2_EMG1_ENV', 'A5F2_EMG2_ENV', 'A19E_EMG1_ENV',
       'A19E_EMG2_ENV', 'A5F2_AccelX_DYN', 'A5F2_AccelY_DYN',
       'A5F2_AccelZ_DYN', 'A19E_AccelX_DYN', 'A19E_AccelY_DYN',
       'A19E_AccelZ_DYN', 'A5F2_AccelMag', 'A5F2_GyroMag', 'A5F2_MagnetMag',
       'A19E_AccelMag', 'A19E_GyroMag', 'A19E_MagnetMag', 'A5F2_AccelX_Z',
       'A5F2_AccelY_Z', 'A5F2_AccelZ_Z', 'A19E_AccelX_Z', 'A19E_AccelY_Z',
       'A19E_AccelZ_Z', 'A5F2_GyroX_Z', 'A5F2_GyroY_Z', 'A5F2_GyroZ_Z',
       'A19E_GyroX_Z', 'A19E_GyroY_Z', 'A19E_GyroZ_Z', 'A5F2_MagX_Z',
       'A5F2_MagY_Z', 'A5F2_MagZ_Z', 'A19E_MagX_Z', 'A19E_MagY_Z',
