# Impact of varying the number of leads in ECGs classification using deep learning models

## 0. Data description

In [None]:
import seaborn as sns
import matplotlib.pyplot as plt
import wfdb
import numpy as np
import ecg_plot

### 0.1 Data visualization using WFDB package

In [None]:
record = wfdb.rdrecord("Training_Data/WFDB_ChapmanShaoxing/JS00001")
print(record.__dict__)

### 0.2 Sizes and samples for all datasets

In [None]:
directory_CSPC = "D:/ULB/MA2/Master thesis/Training_Data/WFDB_CPSC2018"
directory_CSPC2 = "D:/ULB/MA2/Master thesis/Training_Data/WFDB_CPSC2018_2"
directory_PTB = "D:/ULB/MA2/Master thesis/Training_Data/WFDB_PTB"
directory_PTBXL = "D:/ULB/MA2/Master thesis/Training_Data/WFDB_PTBXL"
directory_StPet = "D:/ULB/MA2/Master thesis/Training_Data/WFDB_StPetersburg"
dir_list = [directory_CSPC,directory_CSPC2,directory_PTB,directory_PTBXL,directory_StPet]


for i in range(len(dir_list)) :
    data_X, data_Y = load_challenge_data(dir_list[i])
    all_diag, data_Y_bin = binarizing(data_Y)
    var_name = dir_list[i].split("/")
    print(var_name[len(var_name)-1]+": ")
    print(len(data_X),len(data_X[0]))
    print(len(all_diag))
    print(np.shape(np.array(min(data_X, key=lambda x: np.shape(np.array(x))[1])))[1])
    print(np.shape(np.array(max(data_X, key=lambda x: np.shape(np.array(x))[1])))[1])

### 0.3 Distribution of the diagnoses in PTB and PTB-XL datasets

In [None]:
directory = "D:/ULB/MA2/Master thesis/Training_Data/WFDB_PTBXL"
max_numb_samples = 20000
train_X, train_Y, header_data = load_challenge_data(directory, max_numb_samples)
train_Y_plot = []
for lst in train_Y:
    for element in lst :
        train_Y_plot.append(element)
train_Y_occ = []
for i in range(len(all_diag)):
    train_Y_occ.append(np.count_nonzero(np.array(train_Y_plot) == all_diag[i]))

plt.rcParams["figure.figsize"] = (10,5.5)
plt.bar(all_diag,train_Y_occ)
plt.xticks(list(range(len(all_diag))), all_diag, rotation='vertical')
plt.xlabel("Diagnosis in dataset")
plt.ylabel("Number of occurences")
plt.show()

all_samp = []
for lst in train_X :
    all_samp.append(str(np.shape(np.array(lst))[1]))
all_samp_uni = np.unique(np.array(all_samp))
train_Y_samp = []
for element in all_samp_uni :
    train_Y_samp.append(np.count_nonzero(np.array(all_samp) == element))
plt.bar(list(all_samp_uni),train_Y_samp)
plt.xticks(list(range(len(all_samp_uni))), list(all_samp_uni), rotation='vertical')
plt.xlabel("Samples sizes")
plt.ylabel("Number of occurences")
plt.show()

### 0.4 ECG plots

In [None]:
ecg_plot.plot(train_X[0][:,0:3000]/1000,sample_rate=500,title='')

## 1. Loading data

In [None]:
from scipy.io import loadmat
import wfdb
import pandas as pd
import os
import numpy as np
from sklearn.model_selection import train_test_split
import copy

In [None]:
def load_challenge_data(directory,max_numb_samples):
    data_X = []
    data_Y = []
    header_data_lst = []
    i = 0
    for file in os.listdir(directory):
        if file.endswith(".mat"):
            i += 1
            if i > max_numb_samples :
                break
            x = loadmat(directory+"/"+file)
            data = np.asarray(x['val'], dtype=np.float64)
            new_file = file.replace('.mat','.hea')
            input_header_file = os.path.join(directory+"/"+new_file)
            with open(input_header_file,'r') as f:
                header_data=f.readlines()
            for line in header_data :
                if "#Dx: " in line :
                    data_Y_part = line.split("Dx: ")[1].split("\n")[0].split(",")
            header_data_lst.append(header_data)
            data_X.append(data)
            data_Y.append(data_Y_part)
    return data_X, data_Y, header_data_lst

def binarizing(dataset) :
    all_diag = []
    for i in range(len(dataset)):
        all_diag = list(set(all_diag + dataset[i]))
    bin_data = np.zeros((len(dataset),len(all_diag)))
    for i in range(len(dataset)) :
        for j in range(len(dataset[i])) :
            bin_data[i,np.where(np.array(all_diag) == str(dataset[i][j]))[0][0]] = 1
    return all_diag, np.asarray(bin_data, dtype = np.int8)

def keeping_bin_class(dataset, all_diag, spec_value):
    ind = np.where(np.array(all_diag) == spec_value)[0]
    return dataset[:,ind]

In [None]:
def load_challenge_data_2(dir_list,max_numb_samples,k):
    data_X = []
    data_Y = []
    if (k+1)*max_numb_samples >= len(dir_list) :
        maxi = len(dir_list)
    else :
        maxi = (k+1)*max_numb_samples
    for i in range(k*max_numb_samples,maxi):
        file = dir_list[i]
        x = loadmat(directory+"/"+file)
        data = np.asarray(x['val'], dtype=np.float64)
        new_file = file.replace('.mat','.hea')
        input_header_file = os.path.join(directory+"/"+new_file)
        with open(input_header_file,'r') as f:
            header_data=f.readlines()
        for line in header_data :
            if "#Dx: " in line :
                data_Y_part = line.split("Dx: ")[1].split("\n")[0].split(",")
        data_X.append(data)
        data_Y.append(data_Y_part)
    return data_X, data_Y

def binarizing(dataset) :
    all_diag = []
    for i in range(len(dataset)):
        all_diag = list(set(all_diag + dataset[i]))
    bin_data = np.zeros((len(dataset),len(all_diag)))
    for i in range(len(dataset)) :
        for j in range(len(dataset[i])) :
            bin_data[i,np.where(np.array(all_diag) == str(dataset[i][j]))[0][0]] = 1
    return all_diag, np.asarray(bin_data, dtype = np.int8)

def keeping_bin_class(dataset, all_diag, spec_value):
    ind = np.where(np.array(all_diag) == spec_value)[0]
    return dataset[:,ind]

## 2. Data preprocessing

In [None]:
from scipy.io import loadmat
import wfdb
import pandas as pd
import os
import numpy as np
from sklearn.model_selection import train_test_split
import copy
from math import ceil
from sklearn.utils.class_weight import compute_class_weight
from scipy.signal import kaiserord, firwin, butter, iirnotch, filtfilt, freqz, resample
from matplotlib import pyplot as plt
from copy import deepcopy

### 2.1 Data normalization

In [None]:
def data_normalization(dataset):
    max_value = np.amax(dataset)
    min_value = np.amin(dataset)
    return (dataset*2-max_value-min_value)/(max_value-min_value)

### 2.2 Data equalization

In [None]:
def data_equalizing(dataset_X,dataset_Y, eq_numb):
    all_diag = np.unique(dataset_Y,axis=0)
    new_dataset_Y = []
    new_dataset_X = []
    for i in range(len(all_diag)):
        j = 0
        for k in range(len(dataset_Y)):
            if np.array_equal(dataset_Y[k],all_diag[i]):
                new_dataset_Y.append(dataset_Y[k])
                new_dataset_X.append(dataset_X[k])
                j += 1
            if j >= min(np.count_nonzero(dataset_Y==all_diag[i]),eq_numb):
                break
    return np.array(new_dataset_X), np.array(new_dataset_Y)

### 2.3 Weights computation

In [None]:
def calculating_class_weights(dataset):
    number_dim = np.shape(dataset)[1]
    weights = np.empty([number_dim, 2])
    for i in range(number_dim):
        weights[i] = compute_class_weight(class_weight='balanced', classes=[0,1], y=dataset[:, i])
    keys = np.arange(0,len(weights),1)
    weight_dictionary = dict(zip(keys, weights.T[1]))
    return weight_dictionary

### 2.4 Data filtering

In [None]:
def data_filtering(dataset, sample_freq, cutoff_high, cutoff_low, powerline, type_bandpass):
    nyq_freq = sample_freq/2
    b, a = iirnotch(powerline, 20, fs=sample_freq)
    dataset = filtfilt(b, a, dataset)
    if type_bandpass == "bw" :
        d, c = butter(3, [cutoff_low/nyq_freq, cutoff_high/nyq_freq], 'band')
        dataset = filtfilt(d, c, dataset, method='gust')
    elif type_bandpass == "fir" :
        FIR_filter = firwin(40, [cutoff_low/nyq_freq, cutoff_high/nyq_freq], pass_zero=False)
        dataset_filt = []
        for i in range(np.shape(dataset)[0]):
            dataset_filt_part = []
            for j in range(np.shape(dataset)[1]):
                dataset_filt_part.append(np.convolve(FIR_filter, dataset[i][j], mode='valid'))
            dataset_filt.append(deepcopy(dataset_filt_part))
        dataset = np.array(dataset_filt)
    return dataset, b, a

### 2.5 Diagnoses choice

In [None]:
def diseases_choice(diseases_list, all_diag, dataset_Y, dataset_X):
    new_dataset_Y = []
    new_dataset_X = []
    new_new_dataset_Y = []
    for element in diseases_list :
        ind = all_diag.index(element)
        new_dataset_Y.append(dataset_Y[:,ind])
    new_dataset_Y = np.array(new_dataset_Y).T
    for j in range(len(diseases_list)) :
        for i in range(len(new_dataset_Y)) :
            if new_dataset_Y[i][j] == 1 :
                new_new_dataset_Y.append(new_dataset_Y[i])
                new_dataset_X.append(dataset_X[i])
    return np.array(new_dataset_X),np.array(new_new_dataset_Y)

### 2.6 Data augmentation (by cropping and downsampling)

In [None]:
def crop_augmentation(dataset_X, dataset_Y, inter_value):
    dataset_X_new = []
    dataset_Y_new = []
    for i in range(len(dataset_X)):
        init_value = 0
        data_size = np.shape(np.array(dataset_X[i]))[1]
        for j in range(ceil(data_size/inter_value)):
            if data_size <= init_value + inter_value :
                dataset_X_new.append(np.array(dataset_X[i])[:,data_size-inter_value:data_size])
            else :
                dataset_X_new.append(np.array(dataset_X[i])[:,init_value:init_value + inter_value])
            dataset_Y_new.append(dataset_Y[i])
            init_value += inter_value
    return np.array(dataset_X_new), np.array(dataset_Y_new)

In [None]:
def downsampling(data_X, data_Y):
    new_data_X = []
    new_data_Y = []
    for i in range(len(data_X)):
        new_data_X_part1 = []
        new_data_X_part2 = []
        for j in range(len(data_X[i])):
            new_data_X_part1.append(np.array(resample(data_X[i][j], int(len(data_X[i][j])/2))))
            new_data_X_part2.append(np.array(resample(data_X[i][j][1:len(data_X[i][j])], int(len(data_X[i][j])/2))))
        new_data_X.append(np.array(deepcopy(new_data_X_part1)))
        new_data_X.append(np.array(deepcopy(new_data_X_part2)))
        new_data_Y.append(data_Y[i])
        new_data_Y.append(data_Y[i])
    return np.array(new_data_X), np.array(new_data_Y)

### 2.7 Number of leads choice

In [None]:
def choosing_numb_leads(data_X,numb_leads):
    new_data_X = []
    if numb_leads == 8 :
        for i in range(len(data_X)):
            new_data_X.append(np.concatenate((data_X[i][0:2],data_X[i][6:12])))
    elif numb_leads == 2 :
        for i in range(len(data_X)):
            new_data_X.append(data_X[i][0:2])
    elif numb_leads == 12 :
        new_data_X = data_X
    else :
        print("Not a good number of leads, possibilities: 2, 8 or 12.")
    return np.array(new_data_X)

### 2.8 Transposition of the ECGs matrices

In [None]:
def transposition(dataset):
    new_dataset = []
    for i in range(len(dataset)):
        new_dataset.append(dataset[i].T)
    print(np.shape(new_dataset))
    return np.array(new_dataset)

### 2.9 Removing data both healthy and sick

In [None]:
def removing_11(data_X,data_Y):
    remov = np.array([1,1])
    ind_list = []
    for i in range(len(data_Y)) :
        if np.array_equal(data_Y[i],remov) :
            ind_list.append(i)
    for i in range(len(ind_list)-1,-1,-1):
        data_Y = np.delete(data_Y,ind_list[i],0)
        data_X = np.delete(data_X,ind_list[i],0)
    return data_X, data_Y

### 2.10 Dataset summarized preprocessing and saving

In [None]:
def preprocessing_bin(directory, max_numb_samples, eq_numb, diseases_list):
    dir_list = os.listdir(directory)
    dir_list = [element for element in dir_list if element.endswith(".mat")] 
    dir_list_len = len(dir_list)
    all_diag_fin = []
    for i in range(ceil(dir_list_len/max_numb_samples)):
        print("Batch number {0} out of {1} in preparation".format(i+1,ceil(dir_list_len/max_numb_samples)))
        dataset_X, dataset_Y = load_challenge_data_2(dir_list, max_numb_samples, i)
        all_diag, dataset_Y = binarizing(dataset_Y)
        all_diag_fin = list(set(all_diag_fin + deepcopy(all_diag)))
        dataset_X, b, a = data_filtering(dataset_X, 500, 60, 10, 50, "bw")
        dataset_X = data_normalization(dataset_X)
        if len(diseases_list) != 0:
            dataset_X, dataset_Y = diseases_choice(diseases_list, all_diag, dataset_Y, dataset_X)
        dataset_X, dataset_Y = data_equalizing(dataset_X,dataset_Y,eq_numb)
        weights = calculating_class_weights(dataset_Y)
        dataset_X_reshaped = dataset_X.reshape(dataset_X.shape[0], -1)
        np.savetxt("DATA PROCESSED/X/test_X_"+str(i)+".txt", dataset_X_reshaped)
        np.savetxt("DATA PROCESSED/Y/test_Y_"+str(i)+".txt", dataset_Y)
    print("Number of diagnoses : "+str(len(np.unique(all_diag_fin))))

In [None]:
def preprocessing_multi(directory, max_numb_samples, eq_numb, diseases_list,all_diag):
    dir_list = os.listdir(directory)
    dir_list = [element for element in dir_list if element.endswith(".mat")] 
    dir_list_len = len(dir_list)
    for i in range(ceil(dir_list_len/max_numb_samples)):
        print("Batch number {0} out of {1} in preparation".format(i+1,ceil(dir_list_len/max_numb_samples)))
        dataset_X, dataset_Y = load_challenge_data_2(dir_list, max_numb_samples, i)
        all_diag, dataset_Y = binarizing(dataset_Y,all_diag)
        dataset_X, b, a = data_filtering(dataset_X, 500, 60, 10, 50, "bw")
        dataset_X = deepcopy(data_normalization(dataset_X))
        if len(diseases_list) != 0:
            dataset_X, dataset_Y = diseases_choice(diseases_list, all_diag, dataset_Y, dataset_X)
        dataset_X, dataset_Y = data_equalizing(dataset_X,dataset_Y,eq_numb)
        weights = calculating_class_weights(dataset_Y)
        dataset_X_reshaped = dataset_X.reshape(dataset_X.shape[0], -1)
        np.savetxt("DATA PROCESSED MULTI/X/test_X_"+str(i)+".txt", dataset_X_reshaped)
        np.savetxt("DATA PROCESSED MULTI/Y/test_Y_"+str(i)+".txt", dataset_Y)

### 2.11 Loading preprocessed data and splitting in sets

In [None]:
def data_load_and_split_bin():
    shape = 5000
    data_X = []
    data_Y = []
    for file in os.listdir("DATA PROCESSED/X"):
        data_part = np.loadtxt("DATA PROCESSED/X/"+file)
        data_part = data_part.reshape(data_part.shape[0], data_part.shape[1] // shape, shape)
        data_X.append(data_part)
    data_X = np.vstack(data_X)
    for file in os.listdir("DATA PROCESSED/Y"):
        data_part = np.loadtxt("DATA PROCESSED/Y/"+file)
        data_Y.append(data_part)
    data_Y = np.vstack(data_Y)
    return data_X, data_Y

In [None]:
def data_load_and_split_multi():
    shape = 5000
    data_X = []
    data_Y = []
    for file in os.listdir("DATA PROCESSED MULTI/X"):
        print(file)
        data_part = np.loadtxt("DATA PROCESSED MULTI/X/"+file)
        data_part = data_part.reshape(data_part.shape[0], data_part.shape[1] // shape, shape)
        data_part = crop_augmentation(data_part,500)
        data_part = downsampling(data_part)
        data_X.append(data_part)
    data_X = np.vstack(data_X)
    for file in os.listdir("DATA PROCESSED MULTI/Y"):
        print(file)
        data_part = np.loadtxt("DATA PROCESSED MULTI/Y/"+file)
        data_Y.append(data_part)
    data_Y = np.vstack(data_Y)
    return data_X, data_Y

## 3. Models implementation

In [None]:
import itertools
import sklearn
from sklearn.metrics import confusion_matrix
from keras.layers import Conv1D, MaxPooling1D, Flatten, BatchNormalization, Dense, Dropout, GlobalAveragePooling1D, MaxPool1D
from keras.utils.np_utils import to_categorical
from keras.models import Sequential
from keras.optimizers import Adam
import tensorflow as tf
from tensorflow import keras
from statistics import mean

### 3.1 Convolutional neural network (CNN)

In [None]:
def cnn_bin(input_size, output_size,lr):
    inputlayer = keras.layers.Input(shape=input_size) 
    conv1 = keras.layers.Conv1D(filters=128, kernel_size=8,input_shape=(input_size), padding='same')(inputlayer)
    conv1 = keras.layers.BatchNormalization()(conv1)
    conv1 = keras.layers.Activation(activation='relu')(conv1)
    conv2 = keras.layers.Conv1D(filters=256, kernel_size=5, padding='same')(conv1)
    conv2 = keras.layers.BatchNormalization()(conv2)
    conv2 = keras.layers.Activation('relu')(conv2)
    conv3 = keras.layers.Conv1D(128, kernel_size=3,padding='same')(conv2)
    conv3 = keras.layers.BatchNormalization()(conv3)
    conv3 = keras.layers.Activation('relu')(conv3)
    gap_layer = keras.layers.GlobalAveragePooling1D()(conv3)
    outputlayer = keras.layers.Dense(output_size, activation='sigmoid')(gap_layer)
    model = keras.Model(inputs=inputlayer, outputs=outputlayer)
    model.compile(loss=tf.keras.losses.BinaryCrossentropy(), optimizer=tf.keras.optimizers.Adam(learning_rate=lr), metrics=[tf.keras.metrics.BinaryAccuracy(name='accuracy', dtype=None, threshold=0.5),tf.keras.metrics.Recall(name='Recall'),tf.keras.metrics.Precision(name='Precision'), tf.keras.metrics.AUC(num_thresholds=200,curve="ROC",summation_method="interpolation",name="AUC",dtype=None,thresholds=None,multi_label=True,label_weights=None,)])
    return model

In [None]:
def cnn_multi(input_size, output_size,lr):
    inputlayer = keras.layers.Input(shape=input_size) 
    conv1 = keras.layers.Conv1D(filters=128, kernel_size=8,input_shape=(input_size), padding='same')(inputlayer)
    conv1 = keras.layers.BatchNormalization()(conv1)
    conv1 = keras.layers.Activation(activation='relu')(conv1)
    conv2 = keras.layers.Conv1D(filters=256, kernel_size=5, padding='same')(conv1)
    conv2 = keras.layers.BatchNormalization()(conv2)
    conv2 = keras.layers.Activation('relu')(conv2)
    conv3 = keras.layers.Conv1D(128, kernel_size=3,padding='same')(conv2)
    conv3 = keras.layers.BatchNormalization()(conv3)
    conv3 = keras.layers.Activation('relu')(conv3)
    gap_layer = keras.layers.GlobalAveragePooling1D()(conv3)
    outputlayer = keras.layers.Dense(output_size, activation='softmax')(gap_layer)
    model = keras.Model(inputs=inputlayer, outputs=outputlayer)
    model.compile(loss=tf.keras.losses.CategoricalCrossentropy(), optimizer=tf.keras.optimizers.Adam(learning_rate=lr), metrics=[tf.keras.metrics.CategoricalAccuracy(name='accuracy', dtype=None),tf.keras.metrics.Recall(name='Recall'),tf.keras.metrics.Precision(name='Precision'), tf.keras.metrics.AUC(num_thresholds=200,curve="ROC",summation_method="interpolation",name="AUC",dtype=None,thresholds=None,multi_label=True,label_weights=None,)])
    return model

### 3.2 Residual neural network (ResNet)

In [None]:
def res_net_block(input_layer,nb_filters):
    x = keras.layers.Conv1D(filters=nb_filters, kernel_size=8, padding='same')(input_layer)
    x = keras.layers.BatchNormalization()(x)
    x = keras.layers.Activation('relu')(x)
    x = keras.layers.Conv1D(filters=nb_filters, kernel_size=5, padding='same')(x)
    x = keras.layers.BatchNormalization()(x)
    x = keras.layers.Activation('relu')(x)
    x = keras.layers.Conv1D(filters=nb_filters, kernel_size=3, padding='same')(x)
    x = keras.layers.BatchNormalization()(x)
    return x

In [None]:
def residual_network_1d_bin(input_size,output_size,nb_filters):
    input_shape = input_size
    input_layer = keras.layers.Input(input_shape)

    # BLOCK 1
    x = res_net_block(input_layer,nb_filters)
    shortcut = keras.layers.Conv1D(filters=nb_filters, kernel_size=1, padding='same')(input_layer)
    shortcut = keras.layers.BatchNormalization()(shortcut)
    output_block_1 = keras.layers.Concatenate()([shortcut, x])
    output_block_1 = keras.layers.Activation('relu')(output_block_1)

    # BLOCK 2
    x = res_net_block(output_block_1,nb_filters*2)
    shortcut = keras.layers.Conv1D(filters=nb_filters*2, kernel_size=1, padding='same')(output_block_1)
    shortcut = keras.layers.BatchNormalization()(shortcut)
    output_block_2 = keras.layers.Concatenate()([shortcut, x])
    output_block_2 = keras.layers.Activation('relu')(output_block_2)

    # BLOCK 3
    x = res_net_block(output_block_2,nb_filters*2)
    # shortcut = keras.layers.Conv1D(filters=nb_filters*2, kernel_size=1, padding='same')(output_block_2)
    shortcut = keras.layers.BatchNormalization()(output_block_2)
    output_block_3 = keras.layers.Concatenate()([shortcut, x])
    output_block_3 = keras.layers.Activation('relu')(output_block_3)

    # OUTPUT BLOCK
    gap_layer = keras.layers.GlobalAveragePooling1D()(output_block_3)
    output_layer = keras.layers.Dense(output_size, activation='sigmoid')(gap_layer)
    model = keras.models.Model(inputs=input_layer, outputs=output_layer)
    model.compile(loss=tf.keras.losses.BinaryCrossentropy(), optimizer=tf.keras.optimizers.Adam(learning_rate=0.000001), metrics=[tf.keras.metrics.BinaryAccuracy(name='accuracy', dtype=None, threshold=0.5),tf.keras.metrics.Recall(name='Recall'),tf.keras.metrics.Precision(name='Precision'), tf.keras.metrics.AUC(num_thresholds=200,curve="ROC",summation_method="interpolation",name="AUC",dtype=None,thresholds=None,multi_label=True,label_weights=None,)])
    return model

In [None]:
def residual_network_1d_multi(input_size,output_size,nb_filters):
    input_shape = input_size
    input_layer = keras.layers.Input(input_shape)

    # BLOCK 1
    x = res_net_block(input_layer,nb_filters)
    shortcut = keras.layers.Conv1D(filters=nb_filters, kernel_size=1, padding='same')(input_layer)
    shortcut = keras.layers.BatchNormalization()(shortcut)
    output_block_1 = keras.layers.Concatenate()([shortcut, x])
    output_block_1 = keras.layers.Activation('relu')(output_block_1)

    # BLOCK 2
    x = res_net_block(output_block_1,nb_filters*2)
    shortcut = keras.layers.Conv1D(filters=nb_filters*2, kernel_size=1, padding='same')(output_block_1)
    shortcut = keras.layers.BatchNormalization()(shortcut)
    output_block_2 = keras.layers.Concatenate()([shortcut, x])
    output_block_2 = keras.layers.Activation('relu')(output_block_2)

    # BLOCK 3
    x = res_net_block(output_block_2,nb_filters*2)
    # shortcut = keras.layers.Conv1D(filters=nb_filters*2, kernel_size=1, padding='same')(output_block_2)
    shortcut = keras.layers.BatchNormalization()(output_block_2)
    output_block_3 = keras.layers.Concatenate()([shortcut, x])
    output_block_3 = keras.layers.Activation('relu')(output_block_3)

    # OUTPUT BLOCK
    gap_layer = keras.layers.GlobalAveragePooling1D()(output_block_3)
    output_layer = keras.layers.Dense(output_size, activation='softmax')(gap_layer)
    model = keras.models.Model(inputs=input_layer, outputs=output_layer)
    model.compile(loss=tf.keras.losses.CategoricalCrossentropy(), optimizer=tf.keras.optimizers.Adam(learning_rate=0.000001), metrics=[tf.keras.metrics.CategoricalAccuracy(name='accuracy', dtype=None),tf.keras.metrics.Recall(name='Recall'),tf.keras.metrics.Precision(name='Precision'), tf.keras.metrics.AUC(num_thresholds=200,curve="ROC",summation_method="interpolation",name="AUC",dtype=None,thresholds=None,multi_label=True,label_weights=None,)])
    return model

### 3.3 Transformer encoder 

In [None]:
def encoder_model_bin(input_size,output_size,lr):
    input_layer = keras.layers.Input(shape=input_size)
    if input_size[1] == 2 :
        pool = 1
    else :
        pool = 2

     # conv block -1
    conv1 = keras.layers.Conv1D(filters=128,kernel_size=5,strides=1,padding='same')(input_layer)
    conv1 = tfa.layers.InstanceNormalization()(conv1)
    conv1 = keras.layers.PReLU(shared_axes=[1])(conv1)
    conv1 = keras.layers.Dropout(rate=0.2)(conv1)
    conv1 = keras.layers.MaxPooling1D(pool_size=pool)(conv1)
    # conv block -2
    conv2 = keras.layers.Conv1D(filters=256,kernel_size=11,strides=1,padding='same')(conv1)
    conv2 = tfa.layers.InstanceNormalization()(conv2)
    conv2 = keras.layers.PReLU(shared_axes=[1])(conv2)
    conv2 = keras.layers.Dropout(rate=0.2)(conv2)
    conv2 = keras.layers.MaxPooling1D(pool_size=pool)(conv2)
    # conv block -3
    conv3 = keras.layers.Conv1D(filters=512,kernel_size=21,strides=1,padding='same')(conv2)
    conv3 = tfa.layers.InstanceNormalization()(conv3)
    conv3 = keras.layers.PReLU(shared_axes=[1])(conv3)
    conv3 = keras.layers.Dropout(rate=0.5)(conv3)
    # split for attention
    attention_data = keras.layers.Lambda(lambda x: x[:,:,:256])(conv3)
    attention_softmax = keras.layers.Lambda(lambda x: x[:,:,256:])(conv3)
    # attention mechanism
    attention_softmax = keras.layers.Softmax()(attention_softmax)
    multiply_layer = keras.layers.Multiply()([attention_softmax,attention_data])
    # last layer
    dense_layer = keras.layers.Dense(units=256,activation='sigmoid')(multiply_layer)
    dense_layer = tfa.layers.InstanceNormalization()(dense_layer)
    # output layer
    flatten_layer = keras.layers.Flatten()(dense_layer)
    output_layer = keras.layers.Dense(units=output_size,activation='sigmoid')(flatten_layer)
    model = keras.models.Model(inputs=input_layer, outputs=output_layer)
    model.compile(loss=tf.keras.losses.BinaryCrossentropy(), optimizer=tf.keras.optimizers.Adam(learning_rate=lr), metrics=[tf.keras.metrics.BinaryAccuracy(name='accuracy', dtype=None, threshold=0.5),tf.keras.metrics.Recall(name='Recall'),tf.keras.metrics.Precision(name='Precision'), tf.keras.metrics.AUC(num_thresholds=200,curve="ROC",summation_method="interpolation",name="AUC",dtype=None,thresholds=None,multi_label=True,label_weights=None,)])
    return model

In [None]:
def encoder_model_multi(input_size,output_size,lr):
    input_layer = keras.layers.Input(shape=input_size)
    if input_size[1] == 2 :
        pool = 1
    else :
        pool = 2

     # conv block -1
    conv1 = keras.layers.Conv1D(filters=128,kernel_size=5,strides=1,padding='same')(input_layer)
    conv1 = tfa.layers.InstanceNormalization()(conv1)
    conv1 = keras.layers.PReLU(shared_axes=[1])(conv1)
    conv1 = keras.layers.Dropout(rate=0.2)(conv1)
    conv1 = keras.layers.MaxPooling1D(pool_size=pool)(conv1)
    # conv block -2
    conv2 = keras.layers.Conv1D(filters=256,kernel_size=11,strides=1,padding='same')(conv1)
    conv2 = tfa.layers.InstanceNormalization()(conv2)
    conv2 = keras.layers.PReLU(shared_axes=[1])(conv2)
    conv2 = keras.layers.Dropout(rate=0.2)(conv2)
    conv2 = keras.layers.MaxPooling1D(pool_size=pool)(conv2)
    # conv block -3
    conv3 = keras.layers.Conv1D(filters=512,kernel_size=21,strides=1,padding='same')(conv2)
    conv3 = tfa.layers.InstanceNormalization()(conv3)
    conv3 = keras.layers.PReLU(shared_axes=[1])(conv3)
    conv3 = keras.layers.Dropout(rate=0.5)(conv3)
    # split for attention
    attention_data = keras.layers.Lambda(lambda x: x[:,:,:256])(conv3)
    attention_softmax = keras.layers.Lambda(lambda x: x[:,:,256:])(conv3)
    # attention mechanism
    attention_softmax = keras.layers.Softmax()(attention_softmax)
    multiply_layer = keras.layers.Multiply()([attention_softmax,attention_data])
    # last layer
    dense_layer = keras.layers.Dense(units=256,activation='softmax')(multiply_layer)
    dense_layer = tfa.layers.InstanceNormalization()(dense_layer)
    # output layer
    flatten_layer = keras.layers.Flatten()(dense_layer)
    output_layer = keras.layers.Dense(units=output_size,activation='softmax')(flatten_layer)
    model = keras.models.Model(inputs=input_layer, outputs=output_layer)
    model.compile(loss=tf.keras.losses.CategoricalCrossentropy(), optimizer=tf.keras.optimizers.Adam(learning_rate=lr), metrics=[tf.keras.metrics.CategoricalAccuracy(name='accuracy', dtype=None),tf.keras.metrics.Recall(name='Recall'),tf.keras.metrics.Precision(name='Precision'), tf.keras.metrics.AUC(num_thresholds=200,curve="ROC",summation_method="interpolation",name="AUC",dtype=None,thresholds=None,multi_label=True,label_weights=None,)])
    return model

### 3.4 Bidirectional gated recurrent unit (BiGRU)

In [None]:
def rnn_bin(input_size,output_size,lr):
    input_layer = keras.layers.Input(shape=input_size)
    x = tf.keras.layers.Bidirectional(tf.keras.layers.GRU(units=256, recurrent_dropout=0.1, dropout=0.1, return_sequences=True))(input_layer)
    x = tf.keras.layers.Bidirectional(tf.keras.layers.GRU(units=256, recurrent_dropout=0.1, dropout=0.1, return_sequences=True))(x)
    x = tf.keras.layers.Bidirectional(tf.keras.layers.GRU(units=256, recurrent_dropout=0.1, dropout=0.1, return_sequences=True))(x)
    x = tf.keras.layers.Bidirectional(tf.keras.layers.GRU(units=128, recurrent_dropout=0.1, dropout=0.1, return_sequences=True))(x)
    x = tf.keras.layers.Bidirectional(tf.keras.layers.GRU(units=128, recurrent_dropout=0.1, dropout=0.1, return_sequences=True))(x)
    x = tf.keras.layers.Bidirectional(tf.keras.layers.GRU(units=128, recurrent_dropout=0.1, dropout=0.1, return_sequences=True))(x)
    x = tf.keras.layers.Flatten()(x)
    x = tf.keras.layers.Dense(256,activation='relu')(x)
    x = tf.keras.layers.Dropout(0.3)(x)
    output_layer = tf.keras.layers.Dense(output_size,activation='sigmoid')(x)
    model = keras.Model(inputs=input_layer, outputs=output_layer)
    model.compile(loss=tf.keras.losses.BinaryCrossentropy(), optimizer=tf.keras.optimizers.Adam(learning_rate=lr), metrics=[tf.keras.metrics.BinaryAccuracy(name='accuracy', dtype=None, threshold=0.5),tf.keras.metrics.Recall(name='Recall'),tf.keras.metrics.Precision(name='Precision'), tf.keras.metrics.AUC(num_thresholds=200,curve="ROC",summation_method="interpolation",name="AUC",dtype=None,thresholds=None,multi_label=True,label_weights=None,)])
    return model

In [None]:
def rnn_multi(input_size,output_size,lr):
    input_layer = keras.layers.Input(shape=input_size)
    x = tf.keras.layers.Bidirectional(tf.keras.layers.GRU(units=256, recurrent_dropout=0.1, dropout=0.1, return_sequences=True))(input_layer)
    x = tf.keras.layers.Bidirectional(tf.keras.layers.GRU(units=256, recurrent_dropout=0.1, dropout=0.1, return_sequences=True))(x)
    x = tf.keras.layers.Bidirectional(tf.keras.layers.GRU(units=256, recurrent_dropout=0.1, dropout=0.1, return_sequences=True))(x)
    x = tf.keras.layers.Bidirectional(tf.keras.layers.GRU(units=128, recurrent_dropout=0.1, dropout=0.1, return_sequences=True))(x)
    x = tf.keras.layers.Bidirectional(tf.keras.layers.GRU(units=128, recurrent_dropout=0.1, dropout=0.1, return_sequences=True))(x)
    x = tf.keras.layers.Bidirectional(tf.keras.layers.GRU(units=128, recurrent_dropout=0.1, dropout=0.1, return_sequences=True))(x)
    x = tf.keras.layers.Flatten()(x)
    x = tf.keras.layers.Dense(256,activation='relu')(x)
    x = tf.keras.layers.Dropout(0.3)(x)
    output_layer = tf.keras.layers.Dense(output_size,activation='softmax')(x)
    model = keras.Model(inputs=input_layer, outputs=output_layer)
    model.compile(loss=tf.keras.losses.CategoricalCrossentropy(), optimizer=tf.keras.optimizers.Adam(learning_rate=lr), metrics=[tf.keras.metrics.CategoricalAccuracy(name='accuracy', dtype=None),tf.keras.metrics.Recall(name='Recall'),tf.keras.metrics.Precision(name='Precision'), tf.keras.metrics.AUC(num_thresholds=200,curve="ROC",summation_method="interpolation",name="AUC",dtype=None,thresholds=None,multi_label=True,label_weights=None,)])
    return model

## 4. Results and graphs

In [None]:
def accuracy_plots(hist_list,name):
    label_list = ['Training accuracy - 2 leads','Validation accuracy - 2 leads','Training accuracy - 8 leads','Validation accuracy - 8 leads','Training accuracy - 12 leads','Validation accuracy - 12 leads']
    color_list = ['b','r','g','c','m','k']
    plt.figure(figsize=[8,6])
    for i in range(len(hist_list)):
        history = hist_list[i]
        plt.plot(history.history['accuracy'],color_list[i*2],linewidth=3.0,label=label_list[i*2])
        plt.plot(history.history['val_accuracy'],color_list[i*2+1],linewidth=3.0,label=label_list[i*2+1])
    plt.xlabel('Epochs ',fontsize=16)
    plt.ylabel('Accuracy',fontsize=16)
    plt.title('Accuracy Curves - '+name,fontsize=16)
    plt.ylim([0.5,1])
    plt.legend()
    plt.show()

In [None]:
def loss_plots(hist_list,name):
    label_list = ['Training loss - 2 leads','Validation Loss - 2 leads','Training loss - 8 leads','Validation Loss - 8 leads','Training loss - 12 leads','Validation Loss - 12 leads']
    color_list = ['b','r','g','c','m','k']
    plt.figure(figsize=[8,6])
    for i in range(len(hist_list)):
        history = hist_list[i]
        plt.plot(history.history['loss'],color_list[i*2],linewidth=3.0,label=label_list[i*2])
        plt.plot(history.history['val_loss'],color_list[i*2+1],linewidth=3.0,label=label_list[i*2+1])
    plt.xlabel('Epochs ',fontsize=16)
    plt.ylabel('Loss',fontsize=16)
    plt.title('Loss Curves - '+name,fontsize=16)
    plt.ylim([0,3])
    plt.legend()
    plt.show()

In [None]:
def plots_for_all_leads_multi():
    nb_leads_list = [2,8,12]
    for i in range(3,4):
        hist_list = []
        for nb_leads in nb_leads_list :
            data_X_leads = choosing_numb_leads(data_X,nb_leads)
            if len(np.shape(data_Y)) > 1 :
                output_size = np.shape(data_Y)[1]
            else :
                output_size = 1
            input_size = (np.shape(data_X_leads)[1],np.shape(data_X_leads)[2])
            
            if i == 0 :
                data_X_leads = transposition(data_X_leads)
                input_size = (np.shape(data_X_leads)[1],np.shape(data_X_leads)[2])
                model = cnn_multi(input_size,output_size,0.000002)
                nb_epochs = 200
                name = "CNN"
                x_train,x_rem,y_train,y_rem = train_test_split(data_X_leads,data_Y,train_size=.80, shuffle=True, random_state=42)
                x_val, x_test, y_val, y_test = train_test_split(x_rem,y_rem,train_size=.80, shuffle=True, random_state=42)
            elif i == 1 :
                data_X_leads = transposition(data_X_leads)
                input_size = (np.shape(data_X_leads)[1],np.shape(data_X_leads)[2])
                model = encoder_model_multi(input_size,output_size,0.00005)
                nb_epochs = 50
                name = "Transformer encoder"
                x_train,x_rem,y_train,y_rem = train_test_split(data_X_leads,data_Y,train_size=.80, shuffle=True, random_state=42)
                x_val, x_test, y_val, y_test = train_test_split(x_rem,y_rem,train_size=.80, shuffle=True, random_state=42)
            elif i == 2 :
                data_X_leads = choosing_numb_leads(data_X,nb_leads)
                input_size = (np.shape(data_X_leads)[1],np.shape(data_X_leads)[2])
                model = residual_network_1d_multi(input_size,output_size,64)
                nb_epochs = 200
                name = "ResNet"
                x_train,x_rem,y_train,y_rem = train_test_split(data_X_leads,data_Y,train_size=.80, shuffle=True, random_state=42)
                x_val, x_test, y_val, y_test = train_test_split(x_rem,y_rem,train_size=.80, shuffle=True, random_state=42)
            elif i == 3 :
                data_X_leads = choosing_numb_leads(data_X,nb_leads)
                input_size = (np.shape(data_X_leads)[1],np.shape(data_X_leads)[2])
                model = rnn_multi(input_size,output_size,0.00001)
                nb_epochs = 50
                name = "BiGRU"
                x_train,x_rem,y_train,y_rem = train_test_split(data_X_leads,data_Y,train_size=.80, shuffle=True, random_state=42)
                x_val, x_test, y_val, y_test = train_test_split(x_rem,y_rem,train_size=.80, shuffle=True, random_state=42)
            history = model.fit(x_train, y_train, epochs=nb_epochs, validation_data=(x_val,y_val), validation_freq=1)
            hist_list.append(history)
            predictions = model.predict(x_test)
            print("Accuracy on test set for {0} model with {1} leads: {2} %".format(name,nb_leads,accuracy_score(y_test,np.around(predictions))))
        accuracy_plots(hist_list,name)
        loss_plots(hist_list,name)

In [None]:
def plots_for_all_leads():
    nb_leads_list = [2,8,12]
    for i in range(2,4):
        hist_list = []
        for nb_leads in nb_leads_list :
            data_X_leads = choosing_numb_leads(data_X,nb_leads)
            if len(np.shape(data_Y)) > 1 :
                output_size = np.shape(data_Y)[1]
            else :
                output_size = 1
            input_size = (np.shape(data_X_leads)[1],np.shape(data_X_leads)[2])
            
            if i == 0 :
                data_X_leads = transposition(data_X_leads)
                input_size = (np.shape(data_X_leads)[1],np.shape(data_X_leads)[2])
                model = cnn_bin(input_size,output_size,0.000002)
                nb_epochs = 10
                name = "CNN"
                x_train,x_rem,y_train,y_rem = train_test_split(data_X_leads,data_Y,train_size=.80, shuffle=True, random_state=100,stratify=data_Y)
                x_val, x_test, y_val, y_test = train_test_split(x_rem,y_rem,train_size=.80, shuffle=True, random_state=100,stratify=y_rem)
            elif i == 1 :
                data_X_leads = transposition(data_X_leads)
                input_size = (np.shape(data_X_leads)[1],np.shape(data_X_leads)[2])
                model = encoder_model_bin(input_size,output_size,0.00005)
                nb_epochs = 50
                name = "Transformer encoder"
                x_train,x_rem,y_train,y_rem = train_test_split(data_X_leads,data_Y,train_size=.80, shuffle=True, random_state=100,stratify=data_Y)
                x_val, x_test, y_val, y_test = train_test_split(x_rem,y_rem,train_size=.80, shuffle=True, random_state=100,stratify=y_rem)
            elif i == 2 :
                data_X_leads = choosing_numb_leads(data_X,nb_leads)
                input_size = (np.shape(data_X_leads)[1],np.shape(data_X_leads)[2])
                model = residual_network_1d_bin(input_size,output_size,64)
                nb_epochs = 50
                name = "ResNet"
                x_train,x_rem,y_train,y_rem = train_test_split(data_X_leads,data_Y,train_size=.80, shuffle=True, random_state=100,stratify=data_Y)
                x_val, x_test, y_val, y_test = train_test_split(x_rem,y_rem,train_size=.80, shuffle=True, random_state=100,stratify=y_rem)
            elif i == 3 :
                data_X_leads = choosing_numb_leads(data_X,nb_leads)
                input_size = (np.shape(data_X_leads)[1],np.shape(data_X_leads)[2])
                model = rnn_bin(input_size,output_size,0.00001)
                nb_epochs = 50
                name = "BiGRU"
                x_train,x_rem,y_train,y_rem = train_test_split(data_X_leads,data_Y,train_size=.80, shuffle=True, random_state=100,stratify=data_Y)
                x_val, x_test, y_val, y_test = train_test_split(x_rem,y_rem,train_size=.80, shuffle=True, random_state=100,stratify=y_rem)
            # x_train,x_val,y_train,y_val = train_test_split(x_train_val,y_train_val,train_size=.66, shuffle=True, random_state=100)
            history = model.fit(x_train, y_train, epochs=nb_epochs, validation_data=(x_val,y_val), validation_freq=1)
            hist_list.append(history)
            predictions = model.predict(x_test)
            matrix = confusion_matrix(y_test, np.around(predictions))
            print("Accuracy on test set for {0} model with {1} leads: {2} %".format(name,nb_leads,(matrix[0][0]+matrix[1][1])*100/(matrix[0][0]+matrix[1][1]+matrix[0][1]+matrix[1][0])))
        accuracy_plots(hist_list,name)
        loss_plots(hist_list,name)

# 5. Filtering effect

## 5.1 Notch filter

In [None]:
train_X_filt, b, a = data_filtering(train_X, 500, 60, 10, 50, "fir")
freq, h = freqz(b, a, fs=2*np.pi)
samp_freq = 500
plt.plot(freq*samp_freq/(2*np.pi), 20 * np.log10(abs(h)),'r', label='Bandpass filter', linewidth='2')
plt.xlabel('Frequency [Hz]', fontsize=20)
plt.ylabel('Magnitude [dB]', fontsize=20)
plt.title('Notch Filter', fontsize=20)
plt.grid()

## 5.2 FIR and Butterworth

In [None]:
train_X_filt_bw, b, a = data_filtering(data_X[0], 500, 100, 3, 50, "bw")
train_X_filt_bw2, b, a = data_filtering(data_X[0], 500, 60, 10, 50, "bw")
plt.plot(np.arange(len(data_X[0][0][0:500]))/500,data_X[0][0][0:500],'b',label="Original data")
plt.plot(np.arange(len(data_X[0][0][0:500]))/500,train_X_filt_bw[0][0:500],'g',label="Filtered data in the range [3,100] Hz")
plt.plot(np.arange(len(data_X[0][0][0:500]))/500,train_X_filt_bw2[0][0:500],'r',label="Filtered data in the range [10,60] Hz")
plt.xlabel('Time (s.)')
plt.ylabel('Signal amplitude normalized')
plt.legend()

In [None]:
n = len(train_X_norm[0][0])
k = np.arange(n)
T = n/500
frq = k/T
frq = frq[:len(frq)//2]

Y_1 = np.fft.fft(train_X_norm[0][0])/n
Y_1 = Y_1[:n//2]
Y_2 = np.fft.fft(train_X_filt_bw[0])/n 
Y_2 = Y_2[:n//2]
Y_3 = np.fft.fft(train_X_filt_fir[0][0])/n 
Y_3 = Y_3[:n//2]
plt.plot(frq,abs(Y_1),'b',label="Original signal",alpha=0.7) 
plt.plot(frq,abs(Y_2),'g',label="Filtered by butterworth data",alpha=1) 
plt.plot(frq,abs(Y_3),'r',label="Filtered by FIR data",alpha=0.6) 
plt.xlabel('Frequence (Hz)')
plt.ylabel('|Signal(frequence)|')
plt.title("Fourier transforms of the signal and filtered signals")
plt.legend()