In [5]:
# pyright: reportAny=false
import scipy.stats as stats
import scipy.signal as signal
from scipy.interpolate import interp1d
from scipy.integrate import trapezoid

import warnings
warnings.filterwarnings('ignore')

from numpy.typing import ArrayLike
type FeatureDict = dict[str, np.ndarray]

In [6]:
############################ STAT FEATURES ##############################

def ecg_stat(array: ArrayLike) -> pd.DataFrame:
    x = np.array(array)
    df = pd.DataFrame(data = [x.max(), x.min(), x.mean(), x.std(),
                              stats.kurtosis(x), stats.skew(x), np.quantile(x,0.5),
                              np.quantile(x, 0.25), np.quantile(x,0.75),
                              np.quantile(x, 0.05), np.quantile(x, 0.95)]).T
    df.columns = ['max_ecg', 'min_ecg', 'mean_ecg', 'sd_ecg', 'ku_ecg', 'sk_ecg', 'median_ecg',
                  'q1_ecg', 'q3_ecg', 'q05_ecg', 'q95_ecg']
    
    return df

In [7]:
############################ HRV FEATURES ##############################    

def ecg_peaks(array: ArrayLike, sampling_rate: float=1000) -> np.ndarray:
    x = np.array(array)
    
    # distance : minimal horizontal distance (>= 1) in samples between neighbouring peaks. By default = None
    # height : required height of peaks. By default = None
    
    distance = sampling_rate / 3
    #height = x.max() / 2
    height = np.quantile(x, 0.99)/2 #use 99th quantile instead of max (to ignore outliers)
    r_peaks, _ = signal.find_peaks(x, distance= distance, height=height)
    
    return r_peaks
    
    

def heart_rate(r_peaks: ArrayLike, sampling_rate: float=1000, upsample_rate: int=4) -> tuple[np.ndarray, np.ndarray]:
    x = np.array(r_peaks)
    rri = np.diff(x)
    rri = 1000 * rri / sampling_rate #convert to ms
    hr = 1000 * 60 / rri
    
    hr_time = np.cumsum(rri) / 1000
    hr_time -= hr_time[0] 
    
    interpolation_f = interp1d(hr_time, hr, kind='cubic')
    
    x = np.arange(1, hr_time.max(), 1/upsample_rate)
    hr_interpolated = interpolation_f(x)
    return hr_interpolated, x
    
    
def ecg_time(array: ArrayLike, sampling_rate: float = 1000) -> pd.DataFrame:
    x = np.array(array)
    r_peaks = ecg_peaks(x, sampling_rate=sampling_rate)
    
    # RR intervals are expressed in number of samples and need to be converted into ms. By default, sampling_rate=1000
    # HR is given by: 60/RR intervals in seconds. 
    
    rri = np.diff(r_peaks) #RR intervals
    rri = 1000 * rri / sampling_rate #Convert to ms
    drri = np.diff(rri) #Difference between successive RR intervals
    hr = 1000*60 / rri #Heart rate
    
    meanHR = hr.mean()
    minHR = hr.min()
    maxHR = hr.max()
    sdHR = hr.std()
    modeHR =  maxHR - minHR
    nNN = rri.shape[0] / (x.shape[0]/sampling_rate/60) #to get the number of NN intervals per minute
    meanNN = rri.mean()
    SDSD = drri.std()
    CVNN = rri.std()
    SDNN = CVNN / meanNN
    pNN50 = np.sum(np.abs(drri)>50) / nNN * 100
    pNN20 = np.sum(np.abs(drri)>20) / nNN * 100
    RMSSD = np.sqrt(np.mean(drri**2))
    medianNN = np.quantile(rri, 0.5)
    q20NN = np.quantile(rri, 0.2)
    q80NN = np.quantile(rri, 0.8)
    minNN = rri.min()
    maxNN = rri.max()
    
    # HRV triagular index (HTI): The density distribution D of the RR intervals is estimated. The most frequent
    # RR interval lenght X is established. Y= D(X) is the maximum of the sample density distribution.
    # The HTI is then obtained as : (the number of total NN intervals)/Y

    bins = np.arange(meanNN - 3*CVNN , meanNN + 3*CVNN, 10)
    d,_ = np.histogram(rri, bins=bins)
    y = d.argmax()
    triHRV = nNN / y
    
    df = pd.DataFrame(data = [meanHR, minHR, maxHR, sdHR, modeHR, nNN, meanNN, 
                              SDSD, CVNN, SDNN, pNN50, pNN20, RMSSD, medianNN,
                              q20NN, q80NN, minNN, maxNN, triHRV]).T
    
    df.columns = ['meanHR', 'minHR', 'maxHR', 'sdHR', 'modeHR', 'nNN','meanNN', 'SDSD', 'CVNN', 
                  'SDNN', 'pNN50', 'pNN20', 'RMSSD', 'medianNN', 'q20NN','q80NN','minNN', 'maxNN', 'triHRV']
    return df

In [8]:
############################ FREQ FEATURES ##############################

def interpolate_Rpeaks(peaks: ArrayLike, sampling_rate: float=1000, upsample_rate: int=4) -> tuple[np.ndarray, np.ndarray]:
    # Input: An array (numpy, dataframe, list) 
    # -> Output : A numpy array 
    
    # The RR intervals are aranged over time, and the values are summed up to find the time points.
    # An interpolation function is defined, to use to sample from with any upsampling resolution. 
    # By default upsample_rate = 4 : 4 evenly spaced data points per seconds are added. 
    
    rr = np.diff(peaks)
    rr = 1000 * rr / sampling_rate # convert to ms
    rr_time = np.cumsum(rr) / 1000 # convert to s
    rr_time -= rr_time[0] # shift time axis so it starts at 0
    
    interpolation_f = interp1d(rr_time, rr, kind='cubic')
    
    x = np.arange(1, rr_time.max(), 1/upsample_rate)
    rr_interpolated = interpolation_f(x)
    
    return rr_interpolated, x
    
    
    
def ecg_freq(
    array: ArrayLike,
    sampling_rate: float=1000,
    upsample_rate: int=4,
    freqband_limits: tuple[float, float, float, float, float, float]=(.0, .0033,.04,.15,.4, .5)) -> pd.DataFrame:
    # FFT needs evenly sampled data, so the RR-interval can't be used directly and need to
    # be interpolated. Then the spectral density of the signal is computed using Welch method.
    
    x = np.array(array)
    r_peaks = ecg_peaks(x, sampling_rate=sampling_rate)
    rri, _ = interpolate_Rpeaks(r_peaks, upsample_rate=upsample_rate)
    freq, power = signal.welch(x=rri, fs=upsample_rate)
    print(freq)
    print(power)

    pd.set_option("display.max_columns", 10)
    pd.set_option("display.max_colwidth", None)
    hrv_freq = nk.hrv_frequency(r_peaks, sampling_rate=sampling_rate, normalize=False, interpolation_rate=upsample_rate)
    rlf = hrv_freq['HRV_LF'].iloc[0] / (hrv_freq['HRV_LF'].iloc[0] + hrv_freq['HRV_HF'].iloc[0]) * 100
    rhf = hrv_freq['HRV_HF'].iloc[0] / (hrv_freq['HRV_LF'].iloc[0] + hrv_freq['HRV_HF'].iloc[0]) * 100
    print("HRVFreq Total: {0:.4f}, ulf: {1:.2f}, vlf: {2:.2f}, lf: {3:.2f}, hf: {4:.2f}, vhf: {5:.2f}, lfhf: {6:.2f}, rlf: {7:.2f}, rhf: {8:.2f}".format(
        hrv_freq['HRV_TP'].iloc[0], hrv_freq['HRV_ULF'].iloc[0], hrv_freq['HRV_VLF'].iloc[0], hrv_freq['HRV_LF'].iloc[0],
        hrv_freq['HRV_HF'].iloc[0], hrv_freq['HRV_VHF'].iloc[0], hrv_freq['HRV_LFHF'].iloc[0], rlf, rhf)
    )
    
    lim_ulf= (freq >= freqband_limits[0]) & (freq < freqband_limits[1])
    lim_vlf = (freq >= freqband_limits[1]) & (freq < freqband_limits[2])
    lim_lf = (freq >= freqband_limits[2]) & (freq < freqband_limits[3])
    lim_hf = (freq >= freqband_limits[3]) & (freq < freqband_limits[4])
    lim_vhf = (freq >= freqband_limits[4]) & (freq < freqband_limits[5])
    print(f"ulf: freq {freq[lim_ulf]} power {power[lim_ulf]}")
    print(f"vlf: freq {freq[lim_vlf]} power {power[lim_vlf]}")
    print(f"lf: freq {freq[lim_lf]} power {power[lim_lf]}")
    print(f"hf: freq {freq[lim_hf]} power {power[lim_hf]}")
    print(f"vhf: freq {freq[lim_vhf]} power {power[lim_vhf]}")
    
    # The power (PSD) of each frequency band is obtained by integrating the spectral density 
    # by trapezoidal rule, using the scipy.integrate.trapz function.
    
    ulf = trapezoid(power[lim_ulf], freq[lim_ulf])
    vlf = trapezoid(power[lim_vlf], freq[lim_vlf])
    lf = trapezoid(power[lim_lf], freq[lim_lf])
    hf = trapezoid(power[lim_hf], freq[lim_hf])
    vhf = trapezoid(power[lim_vhf], freq[lim_vhf])
    totalpower = ulf + vlf + lf + hf + vhf
    lfhf = lf / hf
    rlf = lf / (lf + hf) * 100
    rhf = hf / (lf + hf) * 100

    print(f"Manual Total: {totalpower:.4f}, ulf: {ulf:.2f}, vlf: {vlf:.2f}, lf: {lf:.2f}, hf: {hf:.2f}, vhf: {vhf:.2f}, lfhf: {lfhf}, rlf: {rlf:.2f}, rhf: {rhf:.2f}")

    peaklf = freq[lim_lf][np.argmax(power[lim_lf])]
    peakhf = freq[lim_hf][np.argmax(power[lim_hf])]
    
    df = pd.DataFrame(data = [totalpower, lf, hf, ulf, vlf, vhf, lfhf,
                             rlf, rhf, peaklf, peakhf]).T
    
    df.columns = ['totalpower', 'LF', 'HF', 'ULF', 'VLF', 'VHF', 'LF/HF',
                 'rLF', 'rHF', 'peakLF', 'peakHF']

    return df

In [9]:
############################ NONLIN FEATURES ##############################  


def apEntropy(array: ArrayLike, m: int=2, r: float | None=None) -> float:
    # Input: An array (numpy, dataframe, list) 
    # -> Output : A Float
    
    # m : a positive integer representing the length of each compared run of data (a window).
    # By default m = 2
    # r : a positive real number specifying a filtering level. By default r = 0.2 * sd.
    
    x = np.array(array)
    N = len(x)
    r = 0.2 * x.std() if r == None else r == r
    
    # A sequence of vectors z(1),..., z(N-m+1) is formed from a time series of N equally 
    # spaced raw data values x(1),…,x(N), such that z(i) = x(1),...,x(i+m-1).
    
    # For each i in {1,..., N-m+1}, C = [number of z(j) such that d(x(i),x(j)) < r]/[N-m+1]
    # is computed, with d(z(i),z(j)) = max|x(i)-x(j)|
    
    # phi_m(r) = (N-m+1)^-1 x sum log(Ci) is computed, and the ap Entropy is given by: 
    # phi_m(r) - phi_m+1(r)

    def _maxdist(zi, zj):
        return max([abs(xi - xj) for xi, xj in zip(zi, zj)])
    
    def _phi(m):
        z = [[x[j] for j in range(i, i + m - 1 + 1)] for i in range(N - m + 1)]
        C = [len([1 for zj in z if _maxdist(zi, zj) <= r]) / (N - m + 1.0)
        for zi in z]
        return (N - m + 1.0) ** (-1) * sum(np.log(C))
    
    apEn = abs(_phi(m + 1) - _phi(m))
    return apEn
    
    
def sampEntropy(array: ArrayLike, m: int=2, r: float | None=None) -> float:
    # Input: An array (numpy, dataframe, list) 
    # -> Output : A Float
    
    # m: embedding dimension
    # r: tolerance distance to consider two data points as similar. By default r = 0.2 * sd.
    
    # SampEn is the negative logarithm of the probability that if two sets of simultaneous 
    # data points of length m have distance < r then two sets of simultaneous data points of 
    # length m + 1 also have distance < r. 
    
    x = np.array(array)
    N = len(x)
    r = 0.2 * x.std() if r == None else r == r
    
    # All templates vector of length m are defined. Distances d(xmi, xmj) are computed
    # and all matches such that d < r are saved. Same for the distances d(xm+1i, xm+1j).
    
    xmi = np.array([x[i : i + m] for i in range(N - m)])
    xmj = np.array([x[i : i + m] for i in range(N - m + 1)])
    B = np.sum([np.sum(np.abs(xmii - xmj).max(axis=1) <= r) - 1 for xmii in xmi])
    m += 1
    xm = np.array([x[i : i + m] for i in range(N - m + 1)])
    A = np.sum([np.sum(np.abs(xmi - xm).max(axis=1) <= r) - 1 for xmi in xm])
    
    return -np.log(A / B)
    
    
def ecg_nonlinear(array: ArrayLike, sampling_rate: float=1000, m: int=2, r: float | None=None) -> pd.DataFrame:
    # The Poincaré ellipse plot is diagram in which each RR intervals are plotted as a function 
    # of the previous RR interval value. SD1 is the standard deviation spread orthogonally 
    # to the identity line (y=x) and is the ellipse width. SD2 is the standard deviation spread
    # along the identity line and specifies the length of the ellipse.
    
    x = np.array(array)
    r_peaks = ecg_peaks(x, sampling_rate=sampling_rate)
    rri = np.diff(r_peaks)
    rr1 = rri[:-1]
    rr2 = rri[1:]
    SD1 =  np.std(rr2 - rr1) / np.sqrt(2)
    SD2 =  np.std(rr2 + rr1) / np.sqrt(2)
    SD1SD2 = SD1/SD2
    apEn = apEntropy(r_peaks, m=2)
    sampEn = sampEntropy(r_peaks, m=2)
    
    df = pd.DataFrame(data = [SD1, SD2, SD1SD2, apEn, sampEn]).T
    
    df.columns = ['SD1', 'SD2', 'SD1SD2', 'apEn', 'sampEn']
    return df

In [10]:
############################ ECG API ##############################

def ecg_stat_features(dictionary: FeatureDict) -> pd.DataFrame:
    data = dictionary.copy()
    df_stat = ecg_stat( list(data.values())[0] )
    names = [list(data.keys())[0]]
    
    items = iter(data.items())
    _ = next(items)
    
    for k,v in items:
        df_stat = pd.concat([df_stat, ecg_stat(v)], axis=0) 
        names.append(k)
    
    return df_stat.set_axis(names)

def ecg_time_features(dictionary: FeatureDict, sampling_freq: float =500) -> pd.DataFrame:
    data = dictionary.copy()
    df_time = ecg_time(list(data.values())[0] , sampling_freq)
    names = [list(data.keys())[0]]
    
    items = iter(data.items())
    _ = next(items)
    
    for k,v in items:
        df_time = pd.concat([df_time, ecg_time(v, sampling_freq)], axis=0)
        names.append(k)
        
    return df_time.set_axis(names)


def ecg_freq_features(dictionary: FeatureDict, sampling_freq: float =500) -> pd.DataFrame:
    data = dictionary.copy()
    print(f"analyzing {list(data.keys())[0]}")
    df_freq = ecg_freq(list(data.values())[0], sampling_freq)
    names = [list(data.keys())[0]]
    items = iter(data.items())
    _ = next(items)
    
    for k,v in items:
        print(f"analyzing {k}, length: {len(v)}")
        df_freq = pd.concat([df_freq, ecg_freq(v, sampling_freq)], axis=0)
        names.append(k)
        
    return df_freq.set_axis(names)

def ecg_nonlinear_features(dictionary: FeatureDict, sampling_freq: float =500) -> pd.DataFrame:
    data = dictionary.copy()
    df_nonlinear = ecg_nonlinear( list(data.values())[0], sampling_freq)
    names = [list(data.keys())[0]]
    
    items = iter(data.items())
    _ = next(items)
    
    for k,v in items:
        df_nonlinear = pd.concat([df_nonlinear, ecg_nonlinear(v, sampling_freq)], axis=0)
        names.append(k)
        
    return df_nonlinear.set_axis(names)


def get_ecg_features(dictionary: FeatureDict, sampling_freq: float=500) -> pd.DataFrame:
    df_stat = ecg_stat_features(dictionary)
    df_time = ecg_time_features(dictionary, sampling_freq)
    df_freq = ecg_freq_features(dictionary, sampling_freq)
    df_nonlinear = ecg_nonlinear_features(dictionary, sampling_freq)
    
    df = pd.concat([df_time, df_stat, df_freq, df_nonlinear], axis=1)
    #df.set_axis(names, inplace=True)
    return df

In [11]:
import numpy as np
import pandas as pd
import glob

import neurokit2 as nk

from eda_features import get_eda_features

EXPECTED_NUM_FILES = 21
dataset_path = "../../../experiment-data"

type FeatureDict = dict[str, np.ndarray]

In [12]:
filelist = glob.glob(f"{dataset_path}/*.Annotated.csv")
if len(filelist) != EXPECTED_NUM_FILES:
    raise ValueError(f"Expected {EXPECTED_NUM_FILES} files, found: {len(filelist)}")

data_ecg: FeatureDict = dict()
data_eda: FeatureDict = dict()

filelist.sort()

In [13]:
def split_by_label(df: pd.DataFrame) -> list[tuple[str, pd.DataFrame]]:
    output: list[tuple[str, pd.DataFrame]] = []
    labels = ["Baseline", "AmusementClip", "StressClip", "EmoReset", "FormL", "FormM", "Debriefing"]
    start_idx = end_idx = 0
    for label in labels:
        if label == "FormL":
            if "FormLRead" in df["Event"].values:
                start_idx = df.index.get_loc(df[df["Event"] == "FormLRead"].index[0])
                end_idx = df.index.get_loc(df[df["Event"] == "L15"].index[-1])
            else:
                start_idx = df.index.get_loc(df[df["Event"] == "FormL"].index[0])
                end_idx = df.index.get_loc(df[df["Event"] == "FormL"].index[-1])
        elif label == "FormM":
            if "FormMRead" in df["Event"].values:
                start_idx = df.index.get_loc(df[df["Event"] == "FormMRead"].index[0])
                end_idx = df.index.get_loc(df[df["Event"] == "M15"].index[-1])
            else:
                start_idx = df.index.get_loc(df[df["Event"] == "FormM"].index[0])
                end_idx = df.index.get_loc(df[df["Event"] == "FormM"].index[-1])
        else:
            start_idx = df.index.get_loc(df[df["Event"] == label].index[0])
            end_idx = df.index.get_loc(df[df["Event"] == label].index[-1])
        output.append((label, df[start_idx:end_idx]))
    return output

for item in filelist:
    file: pd.DataFrame = pd.read_csv(
        item,
        delimiter=";",
        parse_dates=["Datetime", "Timestamp"],
        index_col=["Datetime",],
        dtype={
            "Timestamp": float,
            "Event": str,
            "ExtraEvent": str,
            "AccelLN_X": float,
            "AccelLN_Y": float,
            "AccelLN_Z": float,
            "Battery": float,
            "GSR_Range": int,
            "Skin_Conductance": float,
            "Skin_Resistance": float,
            "Gyro_X": float,
            "Gyro_Y": float,
            "Gyro_Z": float,
            "PPG": float,
            "Pressure": float,
            "Temperature": float,
            "AccelLN_X_Uncal": int,
            "AccelLN_Y_Uncal": int,
            "AccelLN_Z_Uncal": int,
            "Skin_Conductance_Uncal": int,
            "PPG_Uncal": int,}
        )
    filename = item.split("/")[-1]
    participant_id = filename.split("-")[1]

    for label, event_df in split_by_label(file):
        sample_name = f"{participant_id}-{label}"
        file_ecg = np.array(event_df['PPG_Uncal'])
        file_eda = np.array(event_df['Skin_Conductance_Uncal'])

        data_ecg[sample_name] = (file_ecg - file_ecg.mean())/file_ecg.std()
        data_eda[sample_name] = (file_eda - file_eda.mean())/file_eda.std()

In [14]:
####### CLEAN USING NK
ecg_clean = data_ecg.copy()
eda_clean = data_eda.copy()

for ecg,eda in zip(data_ecg.items(), data_eda.items()):
    #ecg_clean[ecg[0]] = nk.ecg_clean(ecg[1], sampling_rate=51.2, method="neurokit")
    ecg_clean[ecg[0]] = ecg[1].copy()
    eda_clean[eda[0]] = nk.eda_clean(eda[1], sampling_rate=51.2, method='neurokit')


    
######## EXTRACT FEATURES BY MODALITY    
df_eda_features = get_eda_features(eda_clean, 51.2)
print('EDA: {0:2d} trials and {1:2d} features'.format(df_eda_features.shape[0], df_eda_features.shape[1]))

df_ecg_features = get_ecg_features(ecg_clean, 51.2)
print('ECG : {0:2d} trials and {1:2d} features'.format(df_ecg_features.shape[0], df_ecg_features.shape[1]))

EDA: 147 trials and 24 features
analyzing 01-Baseline
[0.         0.08888889 0.17777778 0.26666667 0.35555556 0.44444444
 0.53333333 0.62222222 0.71111111 0.8        0.88888889 0.97777778
 1.06666667 1.15555556 1.24444444 1.33333333 1.42222222 1.51111111
 1.6        1.68888889 1.77777778 1.86666667 1.95555556]
[  9552.25884825 175675.95000592 198696.08233784 169619.18070705
  89398.86506345 118371.52157842  26586.20339689 115248.82544164
 110034.33854843  90531.31796941 104408.69384876  68280.83122011
  73086.63139454  83690.90481564  81547.14216108  57692.56472063
  68782.33977607  35664.30224444  47443.12332468  65130.0152752
  14794.6622236     854.07581677   3978.08847063]
HRVFreq Total: 50567656.3827, ulf: nan, vlf: 22138796.01, lf: 27881761.20, hf: 536343.86, vhf: 10755.31, lfhf: 51.98, rlf: 98.11, rhf: 1.89
ulf: freq [0.] power [9552.25884825]
vlf: freq [] power []
lf: freq [0.08888889] power [175675.95000592]
hf: freq [0.17777778 0.26666667 0.35555556] power [198696.08233784 16

ValueError: attempt to get argmax of an empty sequence

In [None]:
######## MERGE
df_features = pd.concat([df_ecg_features, df_eda_features], axis=1)


####### EXPORT
df_eda_features.to_csv(f'{dataset_path}/extracted-features/eda_features.csv', sep=";", index=True)
df_ecg_features.to_csv(f'{dataset_path}/extracted-features/ecg_features.csv', sep=";", index=True)
df_features.to_csv(f'{dataset_path}/extracted-features/all_features.csv', sep=";", index=True)