In [608]:
import pandas as pd
import numpy as np
from constants import *
from preprocessing_utils import *
from hots_utils import *
import neurokit2 as nk
from sklearn.metrics import classification_report, confusion_matrix

In [613]:
def generate_peaks_annotations(row, ann_df):
    peaks = row['r_peaks']
    filename = row['file']
    signal_extrasystoles = ann_df.loc[ann_df['file']==filename]
    annotation = []
    
    n_extrasystoles = signal_extrasystoles.shape[0]
    n_peaks_affected_to_extrasystoles = 0
    extrasystoles_affected_index = []
    
    for peak in peaks:
        peak_annotation_found_in_extrasystoles = False
        for i in range(signal_extrasystoles.shape[0]):
            extrasystole = signal_extrasystoles.iloc[i]
            if (peak >= extrasystole.start) and (peak <= extrasystole.end):
                annotation.append(extrasystole.type)
                
                peak_annotation_found_in_extrasystoles = True
                n_peaks_affected_to_extrasystoles+=1
                extrasystoles_affected_index.append(i)
                
                break
        if peak_annotation_found_in_extrasystoles==False:
            annotation.append('N')
    if n_peaks_affected_to_extrasystoles!=n_extrasystoles:
        print('file {} : there is {} extrasystole(s) not affected to peaks'\
              .format(filename, n_extrasystoles-n_peaks_affected_to_extrasystoles))
        print('Total extrasystoles present in file :{}'.format(n_extrasystoles))
        print('indexes of extrasystoles affected '+str(extrasystoles_affected_index))
        print()
    return annotation


def add_annotation(sig_df, ann_df):
    ann_sig_df = sig_df.copy()
    ann_sig_df['annotation'] = ann_sig_df.apply(lambda x: generate_peaks_annotations(x, ann_df), axis=1)
    return ann_sig_df


def binarize_annotation_list(row):
    ann_list = row['annotation']
    binary_ann_list = [0 if ann=='N' else 1 for ann in ann_list]
    return binary_ann_list

def binarize_annotation(data):
    data_ = data.copy()
    data_['annotation'] = data_.apply(lambda x: binarize_annotation_list(x), axis=1)
    return data_

def compute_rr(i, samples):
    previous_sample = samples[i-1]
    current_sample = samples[i]
    next_sample = samples[i+1]
    rr_1 = current_sample - previous_sample
    rr_2 = next_sample - current_sample
    rr_1_per = rr_1 / (rr_1 + rr_2)
    rr_2_per = rr_2 / (rr_1 + rr_2)
    return rr_1_per, rr_2_per

def determine_annotation(i, ann, normal_beats, extrasystole_beats):
    annotation = -1
    if (ann[i] in extrasystole_beats) or (ann[i+1] in extrasystole_beats):
        annotation = 1
    elif (ann[i] in normal_beats) and (ann[i+1] in normal_beats) and \
            (ann[i-1] in normal_beats):
        annotation = 0
    return annotation


def split_signal(row, normal_beats, extrasystole_beats, fs):
    dataframe = pd.DataFrame()
    record_name = row['file']
    signal = row['signal']
    ann = row['annotation']
    samples = list(row['r_peaks'])
    allowed_beats = normal_beats + extrasystole_beats
    for i in range(1, len(ann)-2):
        anns_seq = [ann[i-1], ann[i], ann[i+1], ann[i+2]]
        if all(e in allowed_beats for e in anns_seq):
            template_signal = extract_template_signal(i, samples, signal, fs)
            rr_1, rr_2 = compute_rr(i, samples)
            annotation = determine_annotation(
                i, ann, normal_beats, extrasystole_beats)
            if annotation in [0, 1]:
                dataframe = dataframe.append({'record_name': record_name,
                                              'template': template_signal,
                                              'rr_1': rr_1,
                                              'rr_2': rr_2,
                                              'label': annotation},
                                             ignore_index=True)
    return dataframe

def generate_templates(data, normal_beats, extrasystole_beats, fs):
    templates_df = pd.DataFrame()
    list_of_df = list(data.apply(
        lambda x: split_signal(x, normal_beats, extrasystole_beats, fs),
        axis=1))
    for df in list_of_df:
        templates_df = pd.concat([templates_df, df], ignore_index=True)
    return templates_df


def scale_template(row):
    scaler = MinMaxScaler(feature_range=(-1, 1))
    signal = np.array(row['template'])
    scaled_signal = scaler.fit_transform(signal.reshape(-1, 1)).flatten()
    return scaled_signal


def minmax_scale(data):
    data_ = data.copy()
    data_['template'] = data_.apply(lambda x: scale_template(x), axis=1)
    return data_

In [610]:
# read annotations from xls file
annotation_df = pd.read_excel('Marc Annotation/annotation/annotations.xlsx')

In [612]:
# detect R peaks
signal_df = pd.DataFrame()
for ecg_file in ecg_files:
    ecg_folder = ecg_file.split('_')[0]
    ecg = list(pd.read_csv('Marc annotation/signaux/'+ecg_folder+'/'+ecg_file).value)
    _, r_peaks = nk.ecg_peaks(pd.Series(ecg, name='ECG'), sampling_rate=CHRONOLIFE_FS)
    r_peaks = r_peaks['ECG_R_Peaks']
    signal_df = signal_df.append({'file': ecg_file, 'signal': ecg, 'r_peaks': r_peaks}, ignore_index=True)
    

In [615]:
# generate signal beat annotation vector based on Marc annotation
annotated_signal_df = add_annotation(signal_df, annotation_df)

In [617]:
# confuse ventricular and atrial extrasystoles
annotated_signal_df = binarize_annotation(annotated_signal_df)

In [620]:
# generate templates and their annotations
templates_df = generate_templates(data=annotated_signal_df, 
                                  normal_beats=[0], 
                                  extrasystole_beats=[1], 
                                  fs=CHRONOLIFE_FS)

In [621]:
# scale templates
templates_df = minmax_scale(templates_df)

In [623]:
#load centers
with open('centers.pkl','rb') as f:
    centers = pickle.load(f)

In [624]:
# generate hots features
#templates_df = generate_hots_test_features(templates_df, centers, **CHR_HOTS_PARAMS)
templates_df = generate_hots_test_features(templates_df.groupby('label').sample(148), centers, **CHR_HOTS_PARAMS)

  context = np.array(context)
  context = np.array(context)


In [625]:
#load classifier
with open('classifier.pkl','rb') as f:
    clf = pickle.load(f)

In [626]:
x_test = templates_df.drop(columns=['record_name', 'template', 'label'], axis=1)
y_test = templates_df['label']
y_pred = clf.predict(x_test)
print(classification_report(y_test, y_pred))
print(confusion_matrix(y_test, y_pred))

              precision    recall  f1-score   support

         0.0       0.92      0.99      0.95       148
         1.0       0.99      0.92      0.95       148

    accuracy                           0.95       296
   macro avg       0.95      0.95      0.95       296
weighted avg       0.95      0.95      0.95       296



array([[146,   2],
       [ 12, 136]], dtype=int64)