In [45]:
import scipy.io
import tensorflow as tf
import numpy as np
from tensorflow import  keras
from keras.callbacks import TensorBoard
from sklearn.preprocessing import MinMaxScaler,StandardScaler,scale
from skimage.measure import block_reduce
from tensorflow.keras import layers
from sklearn.model_selection import train_test_split
from datetime import datetime
import pandas as pd
import os
import matplotlib.pyplot as plt
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv1D, MaxPooling1D, Dense, Flatten, Dropout

In [2]:
def load_data():
    als = scipy.io.loadmat('C:/Users/lenovo/OneDrive/Documents/drive-download-20230214T190624Z-001/ALS.mat',)["data"].transpose()  #(262134, 321)
    normal = scipy.io.loadmat('C:/Users/lenovo/OneDrive/Documents/drive-download-20230214T190624Z-001/normal.mat')['data'].transpose() # (262134, 270) 
    myopathie = scipy.io.loadmat('C:/Users/lenovo/OneDrive/Documents/drive-download-20230214T190624Z-001/myopathie.mat')['data'].transpose() #(262134, 315)
    return als,normal,myopathie

In [3]:

def scale_data_minmax(data):
#type of scaler that scales the minimum and maximum values to be 0 and 1 respectively. 
    scaler = MinMaxScaler()
#fit_transform() then it will calculate the mean(μ) and standard deviation(σ) of the feature F at a time it will transform the data points of the feature F.    
    normalized_data =scaler.fit_transform(data)


    return normalized_data

In [4]:
def scale_data_standard(data):
    scaled_data = scale(data,axis=1)
    return scaled_data

In [5]:
def gen_tensors_list(data,dim):
    dataset  = list()
    for i in range (len(data)):
        arg = tf.convert_to_tensor(data[i], dtype=tf.float32)
        arg = tf.reshape(arg,dim)
        dataset.append(arg)

    return dataset

In [6]:
def windowing_fun(data,window_size=1000,overlap=100):
    windowed_data = [data[j][i : i + window_size] for j in range(0,len(data)) for i in range(0, len(data[j]), window_size-overlap)]
    final_data = [windowed_data[i] for i in range(len(windowed_data)) if len(windowed_data[i]) == window_size ]
    return final_data

In [7]:
def load_file_names():
    als = scipy.io.loadmat('C:/Users/lenovo/OneDrive/Documents/drive-download-20230214T190624Z-001/ALS.mat',)["files"]  #(262134, 321)
    normal = scipy.io.loadmat('C:/Users/lenovo/OneDrive/Documents/drive-download-20230214T190624Z-001/normal.mat')['files'] # (262134, 270) 
    myopathie = scipy.io.loadmat('C:/Users/lenovo/OneDrive/Documents/drive-download-20230214T190624Z-001/myopathie.mat')['files'] #(262134, 315)
    return als,normal,myopathie

In [16]:
def generate_dataframe(data,files):
    df = pd.DataFrame([], columns=['type_signal',"num_personne","muscle","num_enregistrement","signal"])
    windowed_data = [{"signal": data[j],"type_signal":files[0][j][0][5],"num_personne": files[0][j][0][6:8],"muscle": files[0][j][0][8:10],"num_enregistrement":files[0][j][0][10:12] } for j in range(0,len(data)) ]
    final_data = [windowed_data[i] for i in range(len(windowed_data))]
    df =df.append(final_data, ignore_index=True)
    #df =pd.concat([df, final_data])
    #df = pd.concat([df, pd.DataFrame.from_records(final_data)], ignore_index=True)
    return df

In [17]:
als, normal , myopathie = load_data()

In [18]:
alsfiles,normalfiles,myopathiefiles = load_file_names()

In [19]:
df = generate_dataframe(np.vstack((normal,als,myopathie)),np.concatenate((normalfiles,alsfiles,myopathiefiles),axis=1))

In [20]:
def f(row):
    return {"C":0,"A":1,"M":2}[row["type_signal"]]

df["num_class"] =df.apply(f, axis=1) 

In [21]:
df = df.query("muscle=='BB'")

In [22]:
X = df['signal'].tolist()
Y = df['num_class'].tolist()

In [23]:
X=X[170:]
Y=Y[170:]

In [24]:
del df

In [None]:
random_noise_1 = np.random.normal(0,1,len(X[0]))
random_noise_2 = np.random.normal(0,1,len(X[0]))
random_noise_3 = np.random.normal(0,1,len(X[0]))
random_noise_4 = np.random.normal(0,1,len(X[0]))
np.shape(random_noise_1)

In [17]:
data = [*X]
#data, np.shape(data)

In [None]:
data  += list(map(lambda x : x + random_noise_1, X))
data  += list(map(lambda x : x + random_noise_2, X))
data  += list(map(lambda x : x + random_noise_3, X))
#data  += list(map(lambda x : x + random_noise_4, X))
print(np.asarray(data).shape)
#print(data)

In [18]:
del  als,normal,myopathie,alsfiles,normalfiles,myopathiefiles 

In [27]:
data = scale_data_standard(data)

In [28]:
last_index_normal = len(Y) - 1 - Y[::-1].index(0)

In [29]:
last_index_als = len(Y) - 1 - Y[::-1].index(1)

In [30]:
last_index_myopathie = len(Y) - 1 - Y[::-1].index(2)

In [31]:
windowed_normal = len(windowing_fun(X[:last_index_normal+1],10000))
windowed_als = len(windowing_fun(X[last_index_normal+1:last_index_als+1],10000))
windowed_myopathie = len(windowing_fun(X[last_index_als+1:],10000))

In [32]:
Y = [0]*(windowed_normal)+[1]*(windowed_als)+[2]*(windowed_myopathie)

In [33]:
Y  = Y*4

In [34]:
X = windowing_fun(data,10000)

In [35]:
del data

In [36]:
X = scale_data_standard(X)

In [37]:
tensors_list = gen_tensors_list(X,(1,10000))

In [38]:
X_train, X_test, y_train, y_test = train_test_split( tensors_list, Y, test_size=0.04,random_state=20)
np.shape(X_train), np.shape(X_test)

((7612, 1, 10000), (318, 1, 10000))

In [39]:
model = Sequential([
    #Conv2D(#of filters, filters size, activation function)
    #in keras for the first layer we always need to mention the input shape
    Conv1D(64, 3, activation = 'relu', padding ="same", input_shape = (1,10000)), 
    
    #MaxPooling2D(filter size stride and pad can also be added)
    MaxPooling1D(2, strides = 2, padding ="same"),
    
    #LAYER2
    Conv1D(32, 3, activation = 'relu', padding ="same"),
    MaxPooling1D(2, strides = 2, padding ="same"),
    
    #FLATTEN
    Flatten(),
    
    #FC Layers:
    
    #LAYER3
    #Dense : fully connected  Dense(#neurons, activation function )
    Dense(64, activation = 'relu'),
    Dropout(0.6),
    
    #LAYER4
    Dense(3, activation = 'softmax'),
    
    
])


In [40]:
logdir = "drive/MyDrive/PFE_TENSORBOARD_DATA/augmentation0/CNN1D/logs/scalars/" + datetime.now().strftime("%Y%m%d-%H%M%S")
callbacks = [ tf.keras.callbacks.TensorBoard(log_dir=logdir),tf.keras.callbacks.EarlyStopping(monitor='val_sparse_categorical_accuracy',patience =60 ,min_delta=1e-3)]


In [41]:
model.compile(loss = 'sparse_categorical_crossentropy', optimizer = 'adam', metrics = ['accuracy'])

In [42]:
X_train = np.asarray(X_train)
y_train = np.asarray(y_train)
X_text = np.asarray(X_test)
y_test = np.asarray(y_test)

In [43]:
model.fit(X_train, y_train, epochs =50, batch_size = 128)

Epoch 1/50
Epoch 2/50
Epoch 3/50
Epoch 4/50
Epoch 5/50
Epoch 6/50
Epoch 7/50
Epoch 8/50
Epoch 9/50
Epoch 10/50
Epoch 11/50
Epoch 12/50
Epoch 13/50
Epoch 14/50
Epoch 15/50
Epoch 16/50
Epoch 17/50
Epoch 18/50
Epoch 19/50
Epoch 20/50
Epoch 21/50
Epoch 22/50
Epoch 23/50
Epoch 24/50
Epoch 25/50
Epoch 26/50
Epoch 27/50
Epoch 28/50
Epoch 29/50
Epoch 30/50
Epoch 31/50
Epoch 32/50
Epoch 33/50
Epoch 34/50
Epoch 35/50
Epoch 36/50
Epoch 37/50
Epoch 38/50
Epoch 39/50
Epoch 40/50
Epoch 41/50
Epoch 42/50
Epoch 43/50
Epoch 44/50
Epoch 45/50
Epoch 46/50
Epoch 47/50
Epoch 48/50
Epoch 49/50
Epoch 50/50


<keras.callbacks.History at 0x2e5abe75b50>

In [44]:
score = model.evaluate(np.asarray(X_test),np.asarray (y_test))



In [198]:
%reload_ext tensorboard

In [199]:
%tensorboard --logdir drive/MyDrive/PFE_TENSORBOARD_DATA/augmentation0/CNN1D/logs/scalars/

Reusing TensorBoard on port 6006 (pid 12412), started 4:22:36 ago. (Use '!kill 12412' to kill it.)

In [None]:
logdir = "drive/MyDrive/PFE_TENSORBOARD_DATA/augmentation0/CNN1D/logs/scalars/" + datetime.now().strftime("%Y%m%d-%H%M%S")
callbacks = [ tf.keras.callbacks.TensorBoard(log_dir=logdir),tf.keras.callbacks.EarlyStopping(monitor='val_sparse_categorical_accuracy',patience =60 ,min_delta=1e-3)]

model.compile(
    optimizer="adam",
    loss="sparse_categorical_crossentropy",
    metrics=["accuracy"],
)
history = model.fit(
    
    x = np.asarray(X_train)  ,
    y= np.asarray(y_train) ,
    validation_split=0.1,
    epochs=50,
    batch_size=64,
    callbacks=callbacks

    )

In [None]:
score = model.evaluate(np.asarray(X_test),np.asarray (y_test) ) 

In [None]:
%load_ext tensorboard

In [None]:
%tensorboard --logdir drive/MyDrive/PFE_TENSORBOARD_DATA/augmentation0/CNN1D/logs/scalars/

In [None]:
model.save("/content/drive/MyDrive/PFE_RESULTS_DATA/augmentation0/scaling/CNN1D/CNN1D.h5")

In [None]:
import json

with open('/content/drive/MyDrive/PFE_RESULTS_DATA/augmentation0/scaling/CNN1D/CNN1D_score.json', 'w+') as file:

    json.dump(score, file)

In [None]:
model= tf.keras.models.load_model("/content/drive/MyDrive/PFE_RESULTS_DATA/augmentation0/scaling/CNN1D/CNN1D.h5")

In [None]:
tf.keras.utils.plot_model(
    model,
    show_shapes=True
    )

In [None]:
y_pred = model.predict(np.asarray(X_test))

In [None]:
y_predicted_labels = [{0:"Normal",1:"Neuropathie",2:"Myopathie"}[i] for i in np.argmax(y_pred,axis=1)]

In [None]:
y_true_labels = [{0:"Normal",1:"Neuropathie",2:"Myopathie"}[i] for i in y_test]

In [None]:
import matplotlib.pyplot as plt
import seaborn as sns

con_mat = tf.math.confusion_matrix(labels=y_test, predictions=np.argmax(y_pred,axis=1)).numpy()
con_mat_norm = np.around(con_mat.astype('float') / con_mat.sum(axis=1)[:, np.newaxis], decimals=2)

con_mat_df = pd.DataFrame(con_mat_norm,
                     index = ["Normal","Neuropathie"], 
                     columns = ["Normal","Neuropathie"])

figure = plt.figure(figsize=(5,5))
sns.heatmap(con_mat_df, annot=True,cmap=plt.cm.Blues)
plt.tight_layout()
plt.ylabel('True label')
plt.xlabel('Predicted label')
plt.show()
