In [1]:
import wfdb
import numpy as np
import os

import scipy.io
import tensorflow as tf
from tensorflow import  keras
from keras.callbacks import TensorBoard
from sklearn.preprocessing import MinMaxScaler,StandardScaler,scale
from skimage.measure import block_reduce
from tensorflow.keras import layers
from sklearn.model_selection import train_test_split
from datetime import datetime
import pandas as pd
import matplotlib.pyplot as plt
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv1D,LSTM, MaxPooling1D, Dense, Flatten, Dropout, GlobalMaxPooling1D,BatchNormalization
from keras.optimizers import Adam

from tensorflow.keras.callbacks import ReduceLROnPlateau, EarlyStopping

In [2]:
def function(muscle,disease, label_map):
    k=0
    for i in range (len(label_map)):
        if muscle in label_map[i]:
            if disease in label_map[i]:
                k=k+1
    return k

In [3]:
def scale_data_standard(X_train,X_test):
    #divide by variance and subtract mean for data 
    scaler = StandardScaler()
    data = scaler.fit_transform(X_train)
    X_test = scaler.transform(X_test)
    return X_train ,X_test

In [4]:
def gen_tensors_list(data,dim):
    dataset  = list()
    for i in range (len(data)):
        arg = tf.convert_to_tensor(data[i], dtype=tf.float32)
        arg = tf.reshape(arg,dim)
        dataset.append(arg)

    return dataset

In [5]:
def windowing_fun(data,window_size=23437,overlap=14062):
    windowed_data = [data[j][i : i + window_size] for j in range(0,len(data)) for i in range(0, len(data[j]), window_size-overlap)]
    final_data = [windowed_data[i] for i in range(len(windowed_data)) if len(windowed_data[i]) == window_size ]
    return final_data

In [6]:
def remove_nan_rows(arr,Y):
    # Find the rows with NaN values
    nan_rows = np.isnan(arr).any(axis=1)

    # Convert the boolean array to integer indices
    index_to_remove = np.where(nan_rows)[0]

    # Remove the rows with NaN values
    arr = np.delete(arr, index_to_remove, axis=0)
    Y = np.delete(Y, index_to_remove, axis=0)

    return arr,Y

In [7]:
def noise_for_samples(data, Y):
    data = np.array(data)
    noise = np.random.normal(0, 1, size=data.shape)
    noisy_data = np.add(data, noise)
    data = np.concatenate((data, noisy_data))
    Y = np.concatenate((Y, Y))
    return noisy_data 

In [8]:
def preprocessing(X_train,X_val,X_test,Y_train,Y_val,Y_test):
    
    X_train = windowing_fun(X_train, 23437)
    X_val = windowing_fun(X_val, 23437)
    X_test = windowing_fun(X_test, 23437)

   
    X_train,Y_train = remove_nan_rows(X_train,Y_train)
    X_val,Y_val = remove_nan_rows(X_val,Y_val)
    X_test,Y_test = remove_nan_rows(X_test,Y_test)
    
    tensors_list_train = gen_tensors_list(X_train,(23437,1))
    tensors_list_val = gen_tensors_list(X_val,(23437,1))
    tensors_list_test = gen_tensors_list(X_test,(23437,1))
    
    return tensors_list_train,tensors_list_val,tensors_list_test,Y_train,Y_val,Y_test

In [9]:
def outputs(Y, X):
    
    last_index_als = len(Y) - 1 - Y[::-1].index(0)
    last_index_normal = len(Y) - 1 - Y[::-1].index(1)
    
    windowed_als = len(windowing_fun(X[:last_index_als+1],23437))
    windowed_normal = len(windowing_fun(X[last_index_als+1:last_index_normal+1],23437))
    
    return windowed_normal, windowed_als

In [10]:
def last_index (Y):
    last_index_normal = len(Y) - 1 - Y[::-1].index(1)
    last_index_als = len(Y) - 1 - Y[::-1].index(0)
    return last_index_normal, last_index_als

In [11]:
def generate_dataframe(data,files):
    df = pd.DataFrame([], columns=['type_signal',"num_personne","muscle","num_enregistrement","signal"])
    windowed_data = [{"signal": data[j],"type_signal":files[j][5],"num_personne": files[j][6:8],"muscle": files[j][8:10],"num_enregistrement":files[j][10:12] } for j in range(0,len(data))]
    final_data = [windowed_data[i] for i in range(len(windowed_data))]
    df =df.append(final_data, ignore_index=True)
    #df =pd.concat([df, final_data])
    #df = pd.concat([df, pd.DataFrame.from_records(final_data)], ignore_index=True)
    return df

In [12]:
def NaN_values(signal_data_array):
    # Check if there are any NaN values in the data
    nan_indices = np.argwhere(np.isnan(signal_data_array))

    # Check if there are any empty values in the data
    empty_indices = np.argwhere(np.array([len(x) == 0 for x in signal_data_array]))
    np.set_printoptions(threshold=np.inf)

     #Print the indices of any NaN or empty values
    if len(nan_indices) > 0:
        print("Found NaN values ")
    else:
        print("none")
    if len(empty_indices) > 0:
        print("Found empty values")
    else:
        print("none")
    nan_indices.shape

In [13]:
import os
import wfdb
import numpy as np

database_dir = '/kaggle/input/emglab'

hea_files = [f for f in os.listdir(database_dir)]

# Initialize empty arrays to store the signal data and record names
signal_data_list = []
record_name_list = []
# Loop over each .hea file and read its signal data
for file in hea_files:
    if file.endswith(".hea"):
        record_name = os.path.splitext(file)[0] # Get the record name from the file name
        if ((record_name[5] == 'A') or (record_name[5] == 'C')): # Add the condition to check for the sixth character of the record name to be 'M' or 'C'
            record_path = os.path.join(database_dir, record_name) # Get the full path to the record
            record = wfdb.rdrecord(record_path) # Read the record
            signal_data = np.array(record.p_signal) # Get the signal data as a numpy array
            signal_data_list.append(signal_data) # Append the signal data to the list
            record_name_list.append(record_name) # Append the record name to the list

# Convert the list of signal data arrays to a 3D numpy array
signal_data_array = np.stack(signal_data_list)

print(record_name_list)


['N2001C03BB51', 'N2001C07BB51', 'N2001C06BB65', 'N2001C07BB63', 'N2001C07BB76', 'N2001A03BB66', 'N2001C09BB51', 'N2001A04BB62', 'N2001C01BB61', 'N2001C06BB51', 'N2001A04BB60', 'N2001A06BB11', 'N2001C03BB80', 'N2001C04BB73', 'N2001C09BB67', 'N2001C06BB57', 'N2001C04BB66', 'N2001A04BB66', 'N2001A06BB14', 'N2001C06BB61', 'N2001C03BB64', 'N2001A06BB09', 'N2001A07BB02', 'N2001C07BB57', 'N2001C09BB59', 'N2001A04BB51', 'N2001C04BB52', 'N2001C02BB76', 'N2001A06BB07', 'N2001C06BB73', 'N2001C01BB74', 'N2001C04BB63', 'N2001C09BB57', 'N2001C02BB70', 'N2001C05BB54', 'N2001C02BB65', 'N2001C06BB77', 'N2001A06BB18', 'N2001C07BB66', 'N2001C01BB68', 'N2001C08BB60', 'N2001C07BB59', 'N2001A03BB54', 'N2001A01BB05', 'N2001A03BB51', 'N2001C08BB80', 'N2001A08BB03', 'N2001C01BB59', 'N2001C09BB78', 'N2001C04BB64', 'N2001C03BB72', 'N2001A08BB02', 'N2001C04BB56', 'N2001C01BB72', 'N2001C02BB59', 'N2001C04BB51', 'N2001A03BB58', 'N2001C04BB68', 'N2001A08BB10', 'N2001C05BB61', 'N2001C06BB53', 'N2001A08BB01', 'N2001C

In [14]:
f=function('BB','N2001A' , record_name_list)
print('number of signals taken from patients with ALS from the biceps brachii muscle ',f)

f=function('BB','N2001C' , record_name_list)
print('number of signals taken from normal people from the biceps brachii muscle ',f)

number of signals taken from patients with ALS from the biceps brachii muscle  98
number of signals taken from normal people from the biceps brachii muscle  270


In [15]:
signal_data_array.shape

(368, 262124, 1)

In [16]:
# Combine the labels and signals arrays into a list of tuples
data = list(zip(record_name_list, signal_data_array))

# Sort the list of tuples by the first element (the labels)
sorted_data = sorted(data, key=lambda x: x[0])

# Extract the sorted signals into a new array
signal_data_array = [x[1] for x in sorted_data]

record_name_list = [x[0] for x in sorted_data]

In [17]:
signal_data_array = np.delete(signal_data_array,np.s_[290:368], axis=0)

In [18]:
record_name_list = np.delete(record_name_list,np.s_[290:368], axis=0)

In [19]:
f=function('BB','N2001A' , record_name_list)
print('number of signals taken from patients with ALS from the biceps brachii muscle ',f)

f=function('BB','N2001C' , record_name_list)
print('number of signals taken from normal people from the biceps brachii muscle ',f)

number of signals taken from patients with ALS from the biceps brachii muscle  98
number of signals taken from normal people from the biceps brachii muscle  192


In [20]:
print(np.shape(signal_data_array))
print(np.shape(record_name_list))

(290, 262124, 1)
(290,)


In [21]:
#check if there are NaN values
NaN_values(signal_data_array)

Found NaN values 
none


In [22]:
df = generate_dataframe(signal_data_array,record_name_list)

In [23]:
df

Unnamed: 0,type_signal,num_personne,muscle,num_enregistrement,signal
0,A,01,BB,01,"[[144.19775692378118], [145.3421835660334], [1..."
1,A,01,BB,02,"[[-5.41695277332723], [0.30518043793392846], [..."
2,A,01,BB,03,"[[-19.15007248035401], [-19.607843137254903], ..."
3,A,01,BB,04,"[[2.2125581750209813], [3.9673456931410698], [..."
4,A,01,BB,05,"[[20.752269779507134], [19.378957808804458], [..."
...,...,...,...,...,...
285,C,07,BB,58,"[[-57.06874189364462], [-58.213168535896855], ..."
286,C,07,BB,59,"[[-5.340657663843748], [-7.095445181963837], [..."
287,C,07,BB,60,"[[46.38742656595713], [45.01411459525445], [45..."
288,C,07,BB,61,"[[95.59777218280308], [91.02006561379416], [93..."


In [24]:
def f(row):
    return {"C":1,"A":0}[row["type_signal"]]

df["num_class"] =df.apply(f, axis=1) 

In [25]:
df

Unnamed: 0,type_signal,num_personne,muscle,num_enregistrement,signal,num_class
0,A,01,BB,01,"[[144.19775692378118], [145.3421835660334], [1...",0
1,A,01,BB,02,"[[-5.41695277332723], [0.30518043793392846], [...",0
2,A,01,BB,03,"[[-19.15007248035401], [-19.607843137254903], ...",0
3,A,01,BB,04,"[[2.2125581750209813], [3.9673456931410698], [...",0
4,A,01,BB,05,"[[20.752269779507134], [19.378957808804458], [...",0
...,...,...,...,...,...,...
285,C,07,BB,58,"[[-57.06874189364462], [-58.213168535896855], ...",1
286,C,07,BB,59,"[[-5.340657663843748], [-7.095445181963837], [...",1
287,C,07,BB,60,"[[46.38742656595713], [45.01411459525445], [45...",1
288,C,07,BB,61,"[[95.59777218280308], [91.02006561379416], [93...",1


In [26]:
del signal_data_array,record_name_list

In [27]:
for signal in df['type_signal'].unique():
    print("",f"signal {signal}:", sep="\n")
    print("pat count")
    print(df[df['type_signal']==signal]['num_personne'].value_counts())


signal A:
pat count
03    20
04    20
06    18
08    15
01    12
07    10
05     3
Name: num_personne, dtype: int64

signal C:
pat count
01    30
02    30
03    30
04    30
05    30
06    30
07    12
Name: num_personne, dtype: int64


In [31]:
df_train = df[~df["num_personne"].isin(["01","05"])]

df_train["num_personne"].value_counts()

03    50
04    50
06    48
02    30
07    22
08    15
Name: num_personne, dtype: int64

In [32]:
df_train['type_signal'].value_counts()

C    132
A     83
Name: type_signal, dtype: int64

In [33]:
df_val = df[df["num_personne"] == "01"]

df_val['type_signal'].value_counts()

C    30
A    12
Name: type_signal, dtype: int64

In [34]:
df_test = df[df["num_personne"] == "05"]

df_test['type_signal'].value_counts()

C    30
A     3
Name: type_signal, dtype: int64

In [35]:
X_train = df_train['signal'].tolist()
Y_train = df_train['num_class'].tolist()
print(Y_train)
print(np.asarray(Y_train).shape)
print(np.asarray(Y_train).shape)

[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]
(215,)
(215,)


In [36]:
X_val = df_val['signal'].tolist()
Y_val = df_val['num_class'].tolist()
print(Y_val)
print(np.asarray(Y_val).shape)
print(np.asarray(X_val).shape)

[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]
(42,)
(42, 262124, 1)


In [37]:
X_test = df_test['signal'].tolist()
Y_test = df_test['num_class'].tolist()
print(Y_test)
print(np.asarray(Y_test).shape)
print(np.asarray(X_test).shape)

[0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]
(33,)
(33, 262124, 1)


In [38]:
last_index_normal, last_index_als = last_index (Y_train)
last_index_normal, last_index_als

(214, 82)

In [39]:
del df,df_train,df_test

In [40]:
windowed_normal_train,windowed_als_train = outputs(Y_train, X_train)
windowed_normal_val,windowed_als_val = outputs(Y_val, X_val)
windowed_normal_test,windowed_als_test = outputs(Y_test, X_test)

In [41]:
Y_train = [0]*(windowed_als_train)+[1]*(windowed_normal_train)
print(np.asarray(Y_train).shape)

Y_val = [0]*(windowed_als_val)+[1]*(windowed_normal_val)
print(np.asarray(Y_val).shape)

Y_test = [0]*(windowed_als_test)+[1]*(windowed_normal_test)
print(np.asarray(Y_test).shape)


(5590,)
(1092,)
(858,)


In [42]:
print(np.asarray(X_train).shape)
print(np.asarray(X_val).shape)
print(np.asarray(X_test).shape)

(215, 262124, 1)
(42, 262124, 1)
(33, 262124, 1)


In [43]:
X_train = windowing_fun(X_train, 23437)
X_val = windowing_fun(X_val, 23437)
X_test = windowing_fun(X_test, 23437)
print(np.shape(X_test))
print(np.shape(X_train))

print(np.shape(X_val))
print(np.shape(X_val))

print(np.shape(Y_test))
print(np.shape(Y_train))

(858, 23437, 1)
(5590, 23437, 1)
(1092, 23437, 1)
(1092, 23437, 1)
(858,)
(5590,)


In [44]:
X_train,Y_train = remove_nan_rows(X_train,Y_train)
X_val,Y_val = remove_nan_rows(X_val,Y_val)
X_test,Y_test = remove_nan_rows(X_test,Y_test)

print(np.shape(X_test))
print(np.shape(X_train))

print(np.shape(X_val))
print(np.shape(X_val))

print(np.shape(Y_test))
print(np.shape(Y_train))

(856, 23437, 1)
(5553, 23437, 1)
(1092, 23437, 1)
(1092, 23437, 1)
(856,)
(5553,)


In [45]:
# Convert Y_train_resampled to a pandas Series object
Y_train_resampled_series = pd.Series(Y_train)

# Check the new class distribution
print(Y_train_resampled_series.value_counts())

1    3432
0    2121
dtype: int64


In [46]:
num_samples, num_timesteps, num_channels = X_train.shape
X_train = X_train.reshape(num_samples, num_channels * num_timesteps)
X_train.shape

(5553, 23437)

In [47]:
from imblearn.over_sampling import SMOTE

# Apply SMOTE to the training set
smote = SMOTE()
X_train_resampled, Y_train_resampled = smote.fit_resample(X_train, Y_train)


In [48]:
# Check the new class distribution
# Convert Y_train_resampled to a pandas Series object
Y_train_resampled_series = pd.Series(Y_train_resampled)

# Check the new class distribution
print(Y_train_resampled_series.value_counts())


0    3432
1    3432
dtype: int64


In [49]:
X_train_resampled.shape

(6864, 23437)

In [50]:
X_train_resampled = gen_tensors_list(X_train_resampled,(23437,1))
X_val = gen_tensors_list(X_val,(23437,1))
X_test = gen_tensors_list(X_test,(23437,1))

In [51]:
print(np.shape(X_test))
print(np.shape(X_train_resampled))
print(np.shape(X_val))

print(np.shape(Y_test))
print(np.shape(Y_train_resampled))
print(np.shape(Y_val))

(856, 23437, 1)
(6864, 23437, 1)
(1092, 23437, 1)
(856,)
(6864,)
(1092,)


In [52]:
from keras.regularizers import l2
model = Sequential([
    
    #Conv1D(#of filters, filters size, activation function)
    #in keras for the first layer we always need to mention the input shape
    Conv1D(32,  kernel_size=13, padding='valid', dilation_rate=1,activation = 'relu', input_shape = (23437,1),kernel_initializer='he_normal'), 
    
    BatchNormalization(),
    Dropout(0.2),
    #LAYER2
    Conv1D(64, kernel_size=11, padding='valid' , dilation_rate=2, activation = 'relu',kernel_initializer='he_normal',kernel_regularizer=l2(0.0001)),
    
    BatchNormalization(),
 
    Dropout(0.3),
    #LAYER3
    Conv1D(128, kernel_size=9, padding='valid', dilation_rate=3, activation = 'relu',kernel_initializer='he_normal',kernel_regularizer=l2(0.001)),
    
    BatchNormalization(),
    
    Dropout(0.4),
    
    Conv1D(256, kernel_size=7, padding='valid', dilation_rate=4, activation = 'relu',kernel_initializer='he_normal',kernel_regularizer=l2(0.01)),
    
    BatchNormalization(),
    
    #GlobalMaxPooling1D layer
    GlobalMaxPooling1D(data_format = 'channels_last'),
    
   # Flatten(),
    
    #FC Layers:
    
    #LAYER4
    #Dense : fully connected  Dense(#neurons, activation function )
    
    Dense(128, activation = 'relu'),
    
    BatchNormalization(),
    
    Dropout(0.3),
    
    #LAYER5
    Dense(1, activation = 'sigmoid')
    
    
])

model.summary()


Model: "sequential"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 conv1d (Conv1D)             (None, 23425, 32)         448       
                                                                 
 batch_normalization (BatchN  (None, 23425, 32)        128       
 ormalization)                                                   
                                                                 
 dropout (Dropout)           (None, 23425, 32)         0         
                                                                 
 conv1d_1 (Conv1D)           (None, 23405, 64)         22592     
                                                                 
 batch_normalization_1 (Batc  (None, 23405, 64)        256       
 hNormalization)                                                 
                                                                 
 dropout_1 (Dropout)         (None, 23405, 64)         0

In [None]:
# Set the initial learning rate
lr = 1e-3
opt = Adam(learning_rate=lr, beta_1=0.9,beta_2=0.999, amsgrad= False, epsilon=None)


# Create the ReduceLROnPlateau callback to decrease the learning rate by a factor of 10
# when the validation loss doesn't improve for 5 consecutive epochs
reduce_lr = ReduceLROnPlateau(monitor='val_loss', factor=0.1, patience=5, verbose=1, min_lr=1e-10)

# Create the EarlyStopping callback to stop training if the validation loss doesn't improve
# for 20 consecutive epochs
early_stop = EarlyStopping(monitor='val_loss', patience=25, verbose=1)

# Compile the model
model.compile(optimizer=opt, loss='binary_crossentropy', metrics=['accuracy'])
# Train the model with early stopping
history = model.fit(
    
    x = np.asarray(X_train_resampled),
    y = np.asarray(Y_train_resampled),
    #validation_split = 0.25,
    validation_data = (np.asarray(X_val), np.asarray(Y_val)),
    shuffle=True,
    epochs=100,
    batch_size=8,
    callbacks=[reduce_lr, early_stop]

)

Epoch 1/100
Epoch 2/100
Epoch 3/100

In [None]:

# Plot training & validation loss values
plt.plot(history.history['loss'])
plt.plot(history.history['val_loss'])
plt.title('Model loss')
plt.ylabel('Loss')
plt.xlabel('Epoch')
plt.legend(['Train', 'Val'], loc='upper right')
plt.show()

# Plot training & validation accuracy values
plt.plot(history.history['accuracy'])
plt.plot(history.history['val_accuracy'])
plt.title('Model accuracy')
plt.ylabel('Accuracy')
plt.xlabel('Epoch')
plt.legend(['Train', 'Val'], loc='upper left')
plt.show()
  
print(history.history.keys())


In [None]:
score = model.evaluate(np.asarray(X_test),np.asarray (Y_test) ) 

In [None]:
#model.save("subject-independent-als-normal.h5")

In [None]:
y_pred = model.predict(np.asarray(X_test))

In [None]:
y_predicted_labels = [{0:"Neuropathie",1:"Normal"}[i] for i in np.argmax(y_pred,axis=1)]

In [None]:
y_true_labels = [{0:"Neuropathie",1:"Normal"}[i] for i in Y_test]

In [None]:
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
from sklearn.metrics import confusion_matrix

y_pred = y_pred >= 0.5
# Generate confusion matrix
con_mat = confusion_matrix(Y_test,y_pred)

# Normalize confusion matrix
con_mat_norm = np.around(con_mat.astype('float') / con_mat.sum(axis=1)[:, np.newaxis], decimals=2)

# Create DataFrame from normalized confusion matrix
con_mat_df = pd.DataFrame(con_mat_norm,
                           index=['ALS', 'Normal'],
                           columns=['ALS', 'Normal'])

# Create heatmap from DataFrame
sns.heatmap(con_mat_df, annot=True, cmap='Blues')

# Add labels and title
plt.xlabel('Predicted Class')
plt.ylabel('True Class')
plt.title('Confusion Matrix')

# Show plot
plt.show()


In [None]:
from sklearn.metrics import classification_report
print(classification_report(Y_test,y_pred))