# Extract R-peaks from ECG files

In [1]:
%load_ext autoreload
%autoreload 2

In [2]:
import os
from pathlib import Path

import pandas as pd
from tqdm.auto import tqdm

from speech_study.process_ecg import process_ecg

In [3]:
# configure user
user = "jonas"  # set this to mitchel

if user.lower() == "jonas":
    BASE_PATH = Path("/users/jonvdrdo/jonas/data/aaa_contextaware/raw/uz_study/")
elif user.lower() == "mitchel":
    BASE_PATH = Path("D:/Data/EEG_Study_1/")
DATA_PATH = BASE_PATH.joinpath("aligned_data")

eeg_feat_stat_dir = Path(DATA_PATH).joinpath("EEG1_study_feat_stats")

For more information about the ECG processing, look at [this python script](../speech_study/process_ecg.py).

## Process the `edf_aligned` files

In [4]:
# First, process the aligned EDF files
for pqt in tqdm(list(BASE_PATH.rglob("*/edf_aligned/ecg*.parquet"))):
    print(pqt)

    # Read & process the ECG file
    df_parquet = (
        pd.read_parquet(pqt)
        .set_index("timestamp", drop=True)
        .rename(columns={"ECG_Raw": "ECG"})
    )
    df_rr = process_ecg(df_parquet["ECG"])

    # save the file in feat stat dir
    eeg_feat_stat_dir_user = eeg_feat_stat_dir.joinpath(pqt.parent.parent.name)
    if not eeg_feat_stat_dir_user.exists():
        os.mkdir(eeg_feat_stat_dir_user)
    df_rr.reset_index().to_parquet(
        eeg_feat_stat_dir_user.joinpath(
            f"rr_intervals_{'_'.join(pqt.name.split('_')[-3:])}"
        ),
        engine="fastparquet",
    )

    # save the file in the same folder as the ECG_file resides within
    df_rr.reset_index().to_parquet(
        pqt.parent.joinpath(f"rr_intervals_{'_'.join(pqt.name.split('_')[-3:])}"),
        engine="fastparquet",
    )

del df_parquet, pqt, eeg_feat_stat_dir_user, df_rr

  0%|          | 0/59 [00:00<?, ?it/s]

/users/jonvdrdo/jonas/data/aaa_contextaware/raw/uz_study/aligned_data/63/edf_aligned/ecg_2020_03_11.parquet
/users/jonvdrdo/jonas/data/aaa_contextaware/raw/uz_study/aligned_data/17/edf_aligned/ecg_2020_02_10.parquet
/users/jonvdrdo/jonas/data/aaa_contextaware/raw/uz_study/aligned_data/54/edf_aligned/ecg_2020_03_04.parquet
/users/jonvdrdo/jonas/data/aaa_contextaware/raw/uz_study/aligned_data/69/edf_aligned/ecg_2020_07_07.parquet
/users/jonvdrdo/jonas/data/aaa_contextaware/raw/uz_study/aligned_data/34/edf_aligned/ecg_2020_02_20.parquet
/users/jonvdrdo/jonas/data/aaa_contextaware/raw/uz_study/aligned_data/68/edf_aligned/ecg_2020_07_06.parquet
/users/jonvdrdo/jonas/data/aaa_contextaware/raw/uz_study/aligned_data/53/edf_aligned/ecg_2020_03_04.parquet
/users/jonvdrdo/jonas/data/aaa_contextaware/raw/uz_study/aligned_data/22/edf_aligned/ecg_2020_02_12.parquet
/users/jonvdrdo/jonas/data/aaa_contextaware/raw/uz_study/aligned_data/23/edf_aligned/ecg_2020_02_13.parquet
/users/jonvdrdo/jonas/data/a

## Process the non-aligned EDF_files

In [5]:
# First create list of the aligned parquet users
aligned_pqt_users = []
for pqt in BASE_PATH.rglob("*/edf_aligned/ecg*.parquet"):
    aligned_pqt_users.append(pqt.parent.parent.name)


for pqt in tqdm(list(BASE_PATH.rglob("*/edf/ecg*.parquet"))):
    # As aligned data takes precedence over non-aligned data, We will only process
    # non-aligned `edf`-files when we do not have an aligned equivalent
    if pqt.parent.parent.name not in aligned_pqt_users:
        print(pqt)

        df_parquet = (
            pd.read_parquet(pqt)
            .set_index("timestamp", drop=True)
            .rename(columns={"ECG_Raw": "ECG"})
        )
        df_rr = process_ecg(df_parquet["ECG"])

        # save the file in feat_stat_dir
        eeg_feat_stat_dir_user = eeg_feat_stat_dir.joinpath(pqt.parent.parent.name)
        if not eeg_feat_stat_dir_user.exists():
            os.mkdir(eeg_feat_stat_dir_user)
        df_rr.reset_index().to_parquet(
            eeg_feat_stat_dir_user.joinpath(
                f"rr_intervals_{'_'.join(pqt.name.split('_')[-3:])}"
            ),
            engine="fastparquet",
        )

        # save the file in the same folder as the ECG_file resides within
        df_rr.reset_index().to_parquet(
            pqt.parent.joinpath(f"rr_intervals_{'_'.join(pqt.name.split('_')[-3:])}"),
            engine="fastparquet",
        )


  0%|          | 0/75 [00:00<?, ?it/s]

/users/jonvdrdo/jonas/data/aaa_contextaware/raw/uz_study/aligned_data/4/edf/ecg_2020_01_27.parquet
/users/jonvdrdo/jonas/data/aaa_contextaware/raw/uz_study/aligned_data/13/edf/ecg_2020_02_04.parquet
/users/jonvdrdo/jonas/data/aaa_contextaware/raw/uz_study/aligned_data/20/edf/ecg_2020_02_12.parquet
/users/jonvdrdo/jonas/data/aaa_contextaware/raw/uz_study/aligned_data/11/edf/ecg_2020_02_03.parquet
/users/jonvdrdo/jonas/data/aaa_contextaware/raw/uz_study/aligned_data/37/edf/ecg_2020_02_21.parquet
/users/jonvdrdo/jonas/data/aaa_contextaware/raw/uz_study/aligned_data/8/edf/ecg_2020_01_31.parquet
/users/jonvdrdo/jonas/data/aaa_contextaware/raw/uz_study/aligned_data/9/edf/ecg_2020_01_31.parquet
/users/jonvdrdo/jonas/data/aaa_contextaware/raw/uz_study/aligned_data/15/edf/ecg_2020_02_06.parquet
/users/jonvdrdo/jonas/data/aaa_contextaware/raw/uz_study/aligned_data/14/edf/ecg_2020_02_05.parquet
/users/jonvdrdo/jonas/data/aaa_contextaware/raw/uz_study/aligned_data/10/edf/ecg_2020_02_03.parquet
/us

In [6]:
df_rr

Unnamed: 0_level_0,r_peak_agreement,RR_interval_ms,HRV_ms
timestamp,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1
2020-02-04 10:05:25.041000+01:00,0.1,,
2020-02-04 10:05:25.843000+01:00,1.0,,
2020-02-04 10:05:26.695000+01:00,1.0,852.0,
2020-02-04 10:05:27.582000+01:00,1.0,887.0,35.0
2020-02-04 10:05:28.435000+01:00,1.0,853.0,-34.0
...,...,...,...
2020-02-04 11:27:41.591000+01:00,1.0,765.0,50.0
2020-02-04 11:27:42.318000+01:00,1.0,727.0,-38.0
2020-02-04 11:27:43.029000+01:00,1.0,711.0,-16.0
2020-02-04 11:27:43.751000+01:00,1.0,722.0,11.0


In [11]:
import warnings
from typing import Dict

import neurokit2 as nk
import numpy as np
import pandas as pd
from context_aware.interfaces.empatica import EMPATICA_CONFIG
from context_aware.time_series_tools.processing.gsr.clean_gsr import clean_eda
# -------------------------------- TIME SERIES --------------------------------
from tsflex.chunking import chunk_data
from tsflex.processing import SeriesPipeline, SeriesProcessor, dataframe_func
from tsflex.processing.utils import process_chunks_multithreaded


# ------------------------------ GSR PROCESSING ---------------------------------
def clean_empatica_eda(
    gsr_series: pd.Series,
    valid_window: int = 3,
    max_interpolate: int = 30,
    min_valid_len: int = 5 * 60,
    eda_sqi_smoothen_win_size_s: float = 4,
) -> pd.DataFrame:
    fs = EMPATICA_CONFIG.SENSOR_FREQUENCIES["gsr"]

    out = clean_eda(
        gsr_series.to_frame(),
        "EDA",
        fs=fs,
        valid_window=valid_window,
        max_interpolate=max_interpolate,
        min_valid_len=min_valid_len,
    )

    out["EDA_SQI_smoothened"] = (
        out["EDA_SQI"]
        .rolling(
            int(fs * eda_sqi_smoothen_win_size_s + 1),  # center=True
        )
        .sum()
        / (fs * eda_sqi_smoothen_win_size_s + 1)
    )
    # if SQI is 0 -> retain zero
    out["EDA_SQI_smoothened"] *= out["EDA_SQI"].map(int)
    return out


def decompose_eda_empatica(df_cleaned: pd.Series, method="cvxEDA") -> pd.DataFrame:
    df_cleaned_ = df_cleaned.dropna()
    if len(df_cleaned_):
        sanitized_eda: pd.Series = nk.signal_sanitize(df_cleaned_)
        return nk.eda_phasic(
            sanitized_eda,
            sampling_rate=EMPATICA_CONFIG.SENSOR_FREQUENCIES["gsr"],
            method=method,
        ).set_index(df_cleaned_.index)
    else:
        print(
            "\tcleaned shape: ",
            df_cleaned.shape,
            "\tcleaned dropna shape: ",
            df_cleaned_.shape,
            flush=True,
        )
        warnings.warn(
            f"no not-Nan data `decompose_eda_empatica` will return an "
            + "empty dataframe"
        )
        return pd.DataFrame(columns=["EDA_Phasic", "EDA_Tonic"])
    # TODO -> add slope features


def find_peaks_empatica(df_phasic: pd.Series, method="neurokit") -> pd.DataFrame:
    # note -> GAMBOA2008 it's peaks seem to be exactly the same as neurokit!
    df_phasic = df_phasic.dropna()
    if not len(df_phasic):
        warnings.warn("find_peaks_empatica -> `df_phasic` has no not-nan values!")
        return pd.DataFrame(
            columns=[
                "SCR_RiseTime",
                f"SCR_Peaks_{method}",
                "SCR_RecoveryTime",
                "SCR_Amplitude",
            ]
        )

    peak_signal, _ = nk.eda_peaks(
        df_phasic,
        sampling_rate=EMPATICA_CONFIG.SENSOR_FREQUENCIES["gsr"],
        amplitude_min=0,  # TODO -> play more with this parameter
        method=method,
    )
    return peak_signal.set_index(df_phasic.index).rename(
        columns={"SCR_Peaks": f"SCR_Peaks_{method}"}
    )


@dataframe_func
def remove_false_positives_acc(
    df,
    min_rise_time_s=1,
    min_recovery_time_s=1,
    max_rise_time_s=15,
    max_recovery_time_s=7,
    min_scr_amplitude=0.02,
    scr_amplitude_ratio=0.03,
    eda_sqi_smoothened_threshold=0.6,
    max_acc_std=45,
) -> pd.DataFrame:
    assert min_rise_time_s < max_rise_time_s
    assert min_recovery_time_s < max_recovery_time_s
    acc_cols = ["ACC_x", "ACC_y", "ACC_z"]
    scr_cols = [
        "SCR_RiseTime",
        "SCR_Peaks_neurokit",  # not that loosely coupled
        "SCR_RecoveryTime",
        "SCR_Amplitude",
        # additional EDA metadata
        "EDA_Tonic",
        "EDA_SQI_smoothened",
    ]

    df_scr = df[scr_cols].dropna(how="all").copy()
    df_scr = df_scr.fillna(0)
    if not len(df_scr):
        warnings.warn("remove_false_positives_acc -> `df_scr` has no not-nan values!")
        return pd.DataFrame(columns=scr_cols).add_suffix("_reduced_acc")

    df_acc = df[acc_cols].dropna()
    # calculate the std
    win_duration_s = 1
    win_size = win_duration_s * EMPATICA_CONFIG.SENSOR_FREQUENCIES["acc"]
    win_size += 1 if win_size % 2 == 0 else 0  # make sure if win_size is odd
    fs_gsr = 4
    df_acc_std = (
        pd.concat(
            [df_acc[col].rolling(win_size, center=True).std() for col in acc_cols],
            axis=1,
            keys=acc_cols,
        )
        .max(axis=1)
        .rename("ACC_std")
        .resample(f"{int(1000 // fs_gsr)}ms")
        .mean()
        .rolling(2 * fs_gsr + 1, center=True)
        .max()
        .dropna()
    )

    df_scr = pd.merge_asof(
        df_scr, df_acc_std, left_index=True, right_index=True, direction="nearest"
    )

    df_scr.loc[df_scr["SCR_RiseTime"] < min_rise_time_s, scr_cols] = 0
    df_scr.loc[df_scr["SCR_RiseTime"] > max_rise_time_s, scr_cols] = 0

    # df_scr.loc[df_scr["SCR_RecoveryTime"] < min_recovery_time_s, scr_cols] = 0
    df_scr.loc[df_scr["SCR_RecoveryTime"] > max_recovery_time_s, scr_cols] = 0

    # additional acc & amplitude related processing (compared to vic's processing)
    # 1. relative SCR_Amplitude processing
    df_scr.loc[
        df_scr["SCR_Amplitude"]
        < np.clip(
            scr_amplitude_ratio * df_scr["EDA_Tonic"],
            a_min=min_scr_amplitude,
            a_max=None,
        ),
        scr_cols,
    ] = 0

    # 2. Remove peaks detected near low EDA sqi data
    df_scr.loc[
        df_scr["EDA_SQI_smoothened"] < eda_sqi_smoothened_threshold, scr_cols
    ] = 0

    df_scr.loc[df_scr["ACC_std"] > max_acc_std, scr_cols] = 0
    # todo -> do not add _reduce_acc suffix to the 'add_std_col'
    return pd.concat(
        [
            df_scr.drop(
                columns=["EDA_Tonic", "ACC_std", "EDA_SQI_smoothened"]
            ).add_suffix("_reduced_acc"),
            df_scr["ACC_std"],
        ],
        axis=1,
        ignore_index=False,
    )


# -------------------------- The processing pipelines
gsr_processing_pipeline = SeriesPipeline(
    processors=[
        SeriesProcessor(series_names=["EDA"], function=clean_empatica_eda),
        SeriesProcessor(
            series_names=["EDA"],
            function=decompose_eda_empatica,
        ),
        SeriesProcessor(
            series_names=["EDA_Phasic"],
            function=find_peaks_empatica,
            # additional kwargs
            method="neurokit",
        ),
        SeriesProcessor(
            series_names=tuple(
                [
                    "SCR_RiseTime",
                    "SCR_Peaks_neurokit",
                    "SCR_RecoveryTime",
                    "SCR_Amplitude",
                    "EDA_Tonic",
                    "EDA_SQI_smoothened",
                    "ACC_x",
                    "ACC_y",
                    "ACC_z",
                ]
            ),
            function=remove_false_positives_acc,
        ),
    ]
)


# ------------- PIPELINES WRAPPERS
def process_gsr_pipeline(
    data_dict: Dict[str, pd.DataFrame], multiprocessing=True, show_progress=True
) -> pd.DataFrame:
    if multiprocessing:
        df_out_gsr = pd.concat(
            process_chunks_multithreaded(
                same_range_chunks_list=chunk_data(
                    # only process a sub-chunk of the data dict
                    data=[data_dict[k] for k in ["gsr", "acc"]],
                    fs_dict=EMPATICA_CONFIG.SIGNAL_FREQUENCIES,
                    chunk_range_margin_s=10,
                    min_chunk_dur_s=60 * 5,
                    max_chunk_dur_s=60 * 60,
                    sub_chunk_overlap_s=60 * 5,
                    verbose=False,
                    copy=False,
                ),
                series_pipeline=gsr_processing_pipeline,
                n_jobs=16,
                show_progress=show_progress,
                drop_keys=["ACC_x", "ACC_y", "ACC_z"],
                return_all_series=True,
                return_df=True,
            )
        )
    else:
        df_out_gsr = gsr_processing_pipeline.process(
            data=list(data_dict.values()),
            return_all_series=True,
            return_df=True,
            drop_keys=["ACC_x", "ACC_y", "ACC_z"],
        )

    df_out_gsr = (
        df_out_gsr.rename(columns={"EDA": "EDA_Processed"})
        .dropna(axis=0, subset=["EDA_SQI_smoothened"], how="all")  # must be all, right?
        .reset_index()
        .drop_duplicates(subset="timestamp", keep="first")
        .set_index("timestamp", drop=True)
        .sort_index()
        # TODO -> fix `Na` investigation in `clean_eda`
        .fillna(method="backfill")
    )

    # note -> with real-not continuous data we will need to work on a gap-like basis ...
    # assert pd.infer_freq(df_out_gsr.index) is not None
    return df_out_gsr

In [12]:
for pqt in tqdm(list(aligned_data_dir.rglob("*/empatica/acc*.parquet"))):
    print(pqt)
    df_acc = pd.read_parquet(pqt).set_index("timestamp", drop=True)
    df_gsr = pd.read_parquet(
        pqt.parent.joinpath(f"gsr_{'_'.join(pqt.name.split('_')[-3:])}")
    ).set_index("timestamp", drop=True)

    df_out_gsr = process_gsr_pipeline(
        data_dict={"acc": df_acc, "gsr": df_gsr}, multiprocessing=False
    )
    eeg_feat_stat_dir_user = eeg_feat_stat_dir.joinpath(pqt.parent.parent.name)
    if not eeg_feat_stat_dir_user.exists():
        os.mkdir(eeg_feat_stat_dir_user)
    print(
        eeg_feat_stat_dir_user.joinpath(
            f"gsr_processed_{'_'.join(pqt.name.split('_')[-3:])}"
        )
    )
    df_out_gsr.reset_index().to_parquet(
        eeg_feat_stat_dir_user.joinpath(
            f"gsr_processed_{'_'.join(pqt.name.split('_')[-3:])}"
        ),
        engine="fastparquet",
    )

  0%|          | 0/65 [00:00<?, ?it/s]

../../data/raw/uz_study/aligned_data/45/empatica/acc_2020_02_27.parquet
../../data/raw/uz_study/aligned_data/EEG1_study_feat_stats/45/gsr_processed_2020_02_27.parquet
../../data/raw/uz_study/aligned_data/63/empatica/acc_2020_03_11.parquet
../../data/raw/uz_study/aligned_data/EEG1_study_feat_stats/63/gsr_processed_2020_03_11.parquet
../../data/raw/uz_study/aligned_data/17/empatica/acc_2020_02_10.parquet
../../data/raw/uz_study/aligned_data/EEG1_study_feat_stats/17/gsr_processed_2020_02_10.parquet
../../data/raw/uz_study/aligned_data/54/empatica/acc_2020_03_04.parquet
../../data/raw/uz_study/aligned_data/EEG1_study_feat_stats/54/gsr_processed_2020_03_04.parquet
../../data/raw/uz_study/aligned_data/69/empatica/acc_2020_07_07.parquet
../../data/raw/uz_study/aligned_data/EEG1_study_feat_stats/69/gsr_processed_2020_07_07.parquet
../../data/raw/uz_study/aligned_data/34/empatica/acc_2020_02_20.parquet
../../data/raw/uz_study/aligned_data/EEG1_study_feat_stats/34/gsr_processed_2020_02_20.parque



../../data/raw/uz_study/aligned_data/EEG1_study_feat_stats/20/gsr_processed_2020_02_12.parquet
../../data/raw/uz_study/aligned_data/42/empatica/acc_2020_02_26.parquet
../../data/raw/uz_study/aligned_data/EEG1_study_feat_stats/42/gsr_processed_2020_02_26.parquet
../../data/raw/uz_study/aligned_data/48/empatica/acc_2020_03_02.parquet
../../data/raw/uz_study/aligned_data/EEG1_study_feat_stats/48/gsr_processed_2020_03_02.parquet
../../data/raw/uz_study/aligned_data/74/empatica/acc_2020_07_16.parquet
../../data/raw/uz_study/aligned_data/EEG1_study_feat_stats/74/gsr_processed_2020_07_16.parquet
../../data/raw/uz_study/aligned_data/37/empatica/acc_2020_02_21.parquet
../../data/raw/uz_study/aligned_data/EEG1_study_feat_stats/37/gsr_processed_2020_02_21.parquet
../../data/raw/uz_study/aligned_data/66/empatica/acc_2020_07_02.parquet
../../data/raw/uz_study/aligned_data/EEG1_study_feat_stats/66/gsr_processed_2020_07_02.parquet
../../data/raw/uz_study/aligned_data/55/empatica/acc_2020_03_06.parque



../../data/raw/uz_study/aligned_data/EEG1_study_feat_stats/31/gsr_processed_2020_02_19.parquet
../../data/raw/uz_study/aligned_data/71/empatica/acc_2020_07_13.parquet
../../data/raw/uz_study/aligned_data/EEG1_study_feat_stats/71/gsr_processed_2020_07_13.parquet
../../data/raw/uz_study/aligned_data/33/empatica/acc_2020_02_20.parquet
../../data/raw/uz_study/aligned_data/EEG1_study_feat_stats/33/gsr_processed_2020_02_20.parquet
../../data/raw/uz_study/aligned_data/35/empatica/acc_2020_02_20.parquet
../../data/raw/uz_study/aligned_data/EEG1_study_feat_stats/35/gsr_processed_2020_02_20.parquet
../../data/raw/uz_study/aligned_data/77/empatica/acc_2020_08_20.parquet
../../data/raw/uz_study/aligned_data/EEG1_study_feat_stats/77/gsr_processed_2020_08_20.parquet
../../data/raw/uz_study/aligned_data/56/empatica/acc_2020_03_09.parquet
../../data/raw/uz_study/aligned_data/EEG1_study_feat_stats/56/gsr_processed_2020_03_09.parquet
../../data/raw/uz_study/aligned_data/39/empatica/acc_2020_02_24.parque



../../data/raw/uz_study/aligned_data/EEG1_study_feat_stats/30/gsr_processed_2020_02_18.parquet
../../data/raw/uz_study/aligned_data/43/empatica/acc_2020_02_26.parquet
../../data/raw/uz_study/aligned_data/EEG1_study_feat_stats/43/gsr_processed_2020_02_26.parquet
../../data/raw/uz_study/aligned_data/80/empatica/acc_2020_09_16.parquet
../../data/raw/uz_study/aligned_data/EEG1_study_feat_stats/80/gsr_processed_2020_09_16.parquet
../../data/raw/uz_study/aligned_data/21/empatica/acc_2020_02_12.parquet
../../data/raw/uz_study/aligned_data/EEG1_study_feat_stats/21/gsr_processed_2020_02_12.parquet
../../data/raw/uz_study/aligned_data/76/empatica/acc_2020_07_28.parquet
../../data/raw/uz_study/aligned_data/EEG1_study_feat_stats/76/gsr_processed_2020_07_28.parquet
../../data/raw/uz_study/aligned_data/82/empatica/acc_2020_09_17.parquet
../../data/raw/uz_study/aligned_data/EEG1_study_feat_stats/82/gsr_processed_2020_09_17.parquet


In [13]:
for pqt in tqdm(list(eeg_feat_stat_dir.rglob("*/predictions.parquet"))):
    print(pqt)
    os.remove(pqt)

0it [00:00, ?it/s]