In [61]:
import csv
import os
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import warnings
import scipy.stats

import pywt
import biosppy.signals.ecg as ecg
import neurokit2 as nk

from sklearn.ensemble import RandomForestClassifier
from sklearn.pipeline import Pipeline
from sklearn.model_selection import GridSearchCV, RandomizedSearchCV
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
import scipy.stats as stats

In [62]:
# Data import ---------------------------------------------------------------------------------------------------
# Each CSV is read with its 'id' column as the DataFrame index.

y_train_raw, X_train_raw, X_test_raw = (
    pd.read_csv(csv_path, index_col='id')
    for csv_path in ('data/y_train.csv', 'data/X_train.csv', "data/X_test.csv")
)

In [63]:
def generate_hos_features(ecg_signal):
    """Compute mean higher-order statistics (HOS) over the heartbeats of one ECG recording.

    Processing steps:
      1. drop NaN entries and convert the signal to a float32 array,
      2. pass it through ``nk.ecg_invert`` (sampling rate 300, ``force=False``),
      3. detect R-peaks with biosppy's Engzee segmenter and cut heartbeat templates,
      4. for every beat, compute the (biased) skewness and Pearson kurtosis on
         five fixed sample windows,
      5. average each statistic over all beats (pandas ``mean`` skips NaN values).

    Parameters
    ----------
    ecg_signal : pandas.Series
        One recording. Only ``dropna()`` and ``to_numpy()`` are required of it.

    Returns
    -------
    pandas.DataFrame
        A single row (index ``0``) with columns ``hos_skew0..4`` followed by
        ``hos_kurtosis0..4``. If any step raises an ``Exception`` the row is
        all-NaN and a ``RuntimeWarning`` describing the failure is issued.
        ``KeyboardInterrupt`` / ``SystemExit`` are NOT swallowed, so a long
        extraction run can still be interrupted.
    """
    # Sample windows [lower, upper) inside a heartbeat template.
    # NOTE(review): the upper bound of 180 presumably matches the template length
    # produced by extract_heartbeats at 300 Hz - confirm against biosppy defaults.
    lower = [0, 40, 60, 100, 140]
    upper = [30, 60, 80, 130, 180]
    n_intervals = len(lower)

    col_names = (['hos_skew' + str(i) for i in range(n_intervals)]
                 + ['hos_kurtosis' + str(i) for i in range(n_intervals)])

    try:
        ecg_signal = ecg_signal.dropna().to_numpy(dtype='float32')

        # inversion of flipped signals
        ecg_signal, _ = nk.ecg_invert(ecg_signal, sampling_rate=300, force=False, show=False)
        ecg_signal = pd.Series(ecg_signal)

        # extract heartbeats
        r_peaks = ecg.engzee_segmenter(ecg_signal, 300)['rpeaks']
        beats = ecg.extract_heartbeats(ecg_signal, r_peaks, 300)['templates']

        if len(beats) == 0:
            # Without beats there is nothing to average; fall through to the NaN row.
            raise ValueError("no heartbeats detected")

        hos_all_beats = []
        for beat in beats:
            # layout: [skew per window ..., kurtosis per window ...]
            hos_beat = np.empty(2 * n_intervals)
            for j, (start, stop) in enumerate(zip(lower, upper)):
                interval = beat[start:stop]
                hos_beat[j] = scipy.stats.skew(interval, axis=0, bias=True)  # Skewness
                hos_beat[j + n_intervals] = scipy.stats.kurtosis(
                    interval, axis=0, fisher=False, bias=True)  # Kurtosis (Pearson)

            hos_all_beats.append(hos_beat)

        features = pd.DataFrame(hos_all_beats, columns=col_names)
        features = pd.DataFrame(features.mean()).transpose()

    except Exception as exc:
        # Best effort: keep the pipeline going with a NaN row, but say why.
        warnings.warn(
            "HOS feature extraction failed (" + type(exc).__name__ + ": " + str(exc)
            + "); returning a NaN row",
            RuntimeWarning,
        )
        features = pd.DataFrame(np.nan, index=[0], columns=col_names)

    return features

In [None]:
# loop over the rows ------------------------------------------------------------------------------------------
# Extract HOS features for every training recording and save them to CSV.
# Each output row is labelled with the recording's real 'id' (the index of
# X_train_raw), not its position, so the file stays aligned with the labels
# even if the ids are not exactly 0..n-1.

n_train_rows = X_train_raw.shape[0]
train_feature_rows = []

for row_idx, row_id in enumerate(X_train_raw.index):

    # show progress
    if (row_idx % 100) == 0:
        print(round(row_idx / n_train_rows * 100, 1), "% completed")

    # compute the features (always a single-row DataFrame)
    features = generate_hos_features(X_train_raw.iloc[row_idx])

    # label the row with the recording's id
    features.index = [row_id]
    train_feature_rows.append(features)


X_features = pd.concat(train_feature_rows)
X_features.index.name = 'id'
X_features.to_csv("data/X_train_HOS.csv")

In [None]:
# loop over the rows ------------------------------------------------------------------------------------------
# Extract HOS features for every test recording and save them to CSV.
# Each output row is labelled with the recording's real 'id' (the index of
# X_test_raw), not its position, so predictions can be matched back to the
# original ids even if they are not exactly 0..n-1.

n_test_rows = X_test_raw.shape[0]
test_feature_rows = []

for row_idx, row_id in enumerate(X_test_raw.index):

    # show progress
    if (row_idx % 100) == 0:
        print(round(row_idx / n_test_rows * 100, 1), "% completed")

    # compute the features (always a single-row DataFrame)
    features = generate_hos_features(X_test_raw.iloc[row_idx])

    # label the row with the recording's id
    features.index = [row_id]
    test_feature_rows.append(features)


X_features = pd.concat(test_feature_rows)
X_features.index.name = 'id'
X_features.to_csv("data/X_test_HOS.csv")