In [None]:
from imblearn.under_sampling import CondensedNearestNeighbour,RandomUnderSampler
from sklearn.preprocessing import OneHotEncoder, minmax_scale
from sklearn.model_selection import train_test_split

from tensorflow.keras import layers, models,Model
from sklearn.metrics import confusion_matrix
from imblearn.over_sampling import SMOTE


import matplotlib.pyplot as plt
from keras import backend as K
import tensorflow as tf
import seaborn as sns
import pandas as pd
import numpy as np
import glob

#####

from hopefullnet_files.general_processor import Utils
from hopefullnet_files.models import HopefullNet
from custom_utils import CustomUtils


> **!! The template for normalization and oversampling is not mine. HopefullNet is a neural network for motor-imagery EEG classification, described here: https://github.com/Kubasinska/MI-EEG-1D-CNN/blob/master/docs/hopefullnet.png !!**

***
## extract data:
data extraction variables:

In [None]:

# Framing parameters handed to CustomUtils.get_data_from_csv.
#   SAMP_RATE   - sampling-rate value (presumably in Hz - confirm against the recordings)
#   FRAME_WIDTH - frame length in samples; 640 is also the length the model is
#                 built with (input_shape=(None,640,2)), it is NOT 4*SAMP_RATE
#   FRAME_STEP  - jump between two consecutive frames, in samples
SAMP_RATE, FRAME_WIDTH, FRAME_STEP = 250, 640, 10


In [None]:
# Accumulators handed to CustomUtils.get_data_from_csv for every recording
# (presumably filled in place - confirm in custom_utils).
frames, classes = [], []

# Every file under "./dataset Nathan/<folder>/". glob returns matches in
# arbitrary, filesystem-dependent order, so sort them: the frame order feeds
# the seeded train/test split and must be the same on every machine.
files = sorted(glob.glob("./dataset Nathan/*/*"))
if not files:
    raise FileNotFoundError('no recording matched "./dataset Nathan/*/*"')

# Sanity check: show the parent folder of one recording (guarded so a
# single-file dataset does not raise IndexError).
if len(files) > 1:
    print(files[1].split("/")[-2])

# Label name -> integer class id.
classes_names = {'main gauche':0,'main droite':1,'deux mains':2,'deux pieds':3}

# Extract frames from every CSV file, keeping only the two listed EEG columns.
for filename in files:
    CustomUtils.get_data_from_csv(filename, frames, classes, ['EEG 3', 'EEG 4'],
                                  SAMP_RATE, FRAME_WIDTH, FRAME_STEP, classes_names)
 

visualisation:

In [None]:
# Display the last extracted frame, transposed before being handed to the
# plotting helper. NOTE(review): -100 / 100 are presumably the y-axis limits -
# confirm in CustomUtils.viz.
last_frame = np.array(frames[-1])
CustomUtils.viz(last_frame.T, -100, 100)

In [None]:
# Freeze the accumulated Python lists into arrays and report the data shape.
x = np.array(frames)
y = np.array(classes)
print(x.shape)

### normalization + oversampling
reshaping puis division en train + test:

In [None]:

# One-hot encode the integer labels.
y_one_hot = Utils.to_one_hot(y, by_sub=False)

# Flatten each frame to a single row (axis 1 and axis 2 merged) so that the
# scaler and SMOTE, which work on 2-D matrices, can be applied.
flat_width = x.shape[1] * x.shape[2]
reshaped_x = x.reshape(x.shape[0], flat_width)

# Keep 80% for training; the remaining 20% is held out for validation + test.
# Stratified on the labels, fixed seed for reproducibility.
(x_train_raw,
 x_valid_test_raw,
 y_train_raw,
 y_valid_test_raw) = train_test_split(
    reshaped_x,
    y_one_hot,
    test_size=0.20,
    stratify=y_one_hot,
    random_state=42,
)

normalization (scaling):

In [None]:
# Min-max scale the training part and the held-out part separately.
# axis=1 rescales every row (one flattened frame) on its own, so no statistic
# is shared between rows or between the two splits.
x_train_scaled_raw, x_test_valid_scaled_raw = (
    minmax_scale(split, axis=1) for split in (x_train_raw, x_valid_test_raw)
)



division du jeu de test en validation + test,
oversampling (équilibrer le jeu de données, algo **SMOTE**)

In [None]:

# Split the held-out part in two equal halves: validation and test.
# Stratified on the labels, fixed seed for reproducibility.
x_valid_raw, x_test_raw, y_valid, y_test = train_test_split(x_test_valid_scaled_raw,
                                                    y_valid_test_raw,
                                                    stratify=y_valid_test_raw,
                                                    test_size=0.50,
                                                    random_state=42)

# Oversampler applied to the training data only.
sm = SMOTE(random_state=42)

# NOTE(review): this under-sampler is created but never applied anywhere in
# the visible notebook - confirm whether it can be dropped.
undersample = RandomUnderSampler(sampling_strategy='all')

# Balance the training classes with SMOTE; validation and test stay untouched.
x_train_smote_raw, y_train = sm.fit_resample(x_train_scaled_raw, y_train_raw)

# Per-class sample counts (column sums of the one-hot labels), reported once.
print('classes count')
print ('before oversampling = {}'.format(y_train_raw.sum(axis=0)))
print ('after oversampling = {}'.format(y_train.sum(axis=0)))


remise en forme d'origine:

In [None]:
def _unflatten(flat):
    """Reshape flattened rows to (rows, row_width / 2, 2) and cast to float64."""
    return flat.reshape(flat.shape[0], flat.shape[1] // 2, 2).astype(np.float64)


# Undo the flattening done before scaling / SMOTE for the three splits.
x_train = _unflatten(x_train_smote_raw)
x_valid = _unflatten(x_valid_raw)
x_test = _unflatten(x_test_raw)

***
# transfer learning
à partir du modèle entrainé ***hopefullnet***
* chargement du modèle, de ses poids
* remplacement de la dernière couche
* essai de blocage des poids des premières couches

In [None]:
# Instantiate the network class imported from hopefullnet_files.models.
# `HopefullNet_res` is neither imported nor defined in this notebook, so it
# raised a NameError on a fresh kernel.
# NOTE(review): if the checkpoint below was produced by a different
# architecture variant, import that class explicitly and use it here instead.
model = HopefullNet()

# Create the variables for inputs of 640 samples x 2 channels so that the
# pretrained weights can be loaded into them.
model.build(input_shape=(None,640,2))
model.load_weights('./modelcheckpts/')
#print(model.summary())

In [None]:
# Transfer learning: replace the model's `out` attribute with a fresh 4-unit
# softmax layer. NOTE(review): assumes the network's forward pass ends with
# `self.out` - confirm in hopefullnet_files.models.
model.out = layers.Dense(units=4, activation='softmax')

# Disabled experiment: freeze the first layers / restart from a previous run.
#for layer in model.layers[:3]:
#  layer.trainable = False
#model.load_weights('./unicornsave_3/')

***
# entrainement du modèle 
hopefull net

In [None]:
# The output layer already applies softmax, so the loss keeps its default
# from_logits=False (from_logits=True would embed the softmax in the loss).
loss_fn = tf.keras.losses.CategoricalCrossentropy()
model.compile(optimizer='adam', loss=loss_fn, metrics=['accuracy'])

In [None]:
# Checkpoint callback: save the full model whenever val_loss improves.
# The deprecated `period=100` argument is replaced by `save_freq='epoch'`:
# with early stopping after 10 stale epochs, a 100-epoch period meant the
# best model was practically never written to disk.
checkpoint = tf.keras.callbacks.ModelCheckpoint(
    "./unicornsave",          # where the model is saved
    monitor='val_loss',       # metric that decides what "best" means
    verbose=1,                # log each save
    save_best_only=True,      # keep only the best model seen so far
    save_weights_only=False,  # save the whole model, not just the weights
    mode='auto',              # min/max direction inferred from the metric name
    save_freq='epoch',        # evaluate the save condition after every epoch
    )

# Early stopping: stop once val_loss has not improved by at least min_delta
# for `patience` consecutive epochs, then roll back to the best weights.
earlystopping = tf.keras.callbacks.EarlyStopping(
    monitor='val_loss',
    min_delta=0.0001,
    patience=10,
    restore_best_weights=True,
    )

# Callbacks passed to model.fit.
callbacksList = [checkpoint, earlystopping]

In [None]:

# Override the optimizer's learning rate before fine-tuning.
K.set_value(model.optimizer.learning_rate, 0.0001)

# Train on the oversampled training set; the epoch count is only an upper
# bound, the callbacks decide when training actually stops.
history = model.fit(
    x_train,
    y_train,
    epochs=1000,
    batch_size=10,
    validation_data=(x_valid, y_valid),
    callbacks=callbacksList,
)

courbes, losses and accuracies:

In [None]:
# One figure per metric, training vs. validation curve, with a legend and
# axis labels so the two curves can be told apart.
for metric in ('loss', 'accuracy'):
    plt.plot(history.history[metric], label='train')
    plt.plot(history.history['val_' + metric], label='validation')
    plt.xlabel('epoch')
    plt.ylabel(metric)
    plt.title(metric + ' per epoch')
    plt.legend()
    plt.show()

In [None]:
# Persist the fine-tuned model under 'unicornmodel' (relative to the working directory).
model.save('unicornmodel')

In [None]:
# Loss / accuracy on the held-out test split.
testLoss, testAcc = model.evaluate(x_test, y_test)

# One-hot labels and predicted probabilities -> class indices.
Y_test = np.argmax(y_test, axis=1)
test_probs = model.predict(x_test)
y_pred = np.argmax(test_probs, axis=1)

# Confusion matrix, computed once: printed as numbers, then drawn as a heatmap.
# (heatmap options: annot=True annotates the cells, fmt='g' disables
# scientific notation)
test_cm = confusion_matrix(Y_test, y_pred)
print(test_cm)
sns.heatmap(test_cm);

In [None]:
# Optional (disabled): drop the in-memory model and reload it from the
# "./unicornmodel/" directory instead of retraining.
# del(model)
# model = tf.keras.models.load_model("./unicornmodel/")

- Let's try with other recordings (recordings that were not mixed into the train/test splits)

In [None]:
# Accumulators for the independent recordings (never seen during training).
frames_t, classes_t = [], []

# NOTE(review): this pattern is absolute ("/test_data"), unlike the relative
# "./dataset Nathan" used for training - confirm the intended location.
# Sorted so the frame order does not depend on the filesystem.
files = sorted(glob.glob("/test_data/*/*"))
if not files:
    raise FileNotFoundError('no recording matched "/test_data/*/*"')

# Extract frames exactly as for the training data, including the same
# label-name -> class-id mapping, so the labels line up with the model outputs.
for filename in files:
    CustomUtils.get_data_from_csv(filename, frames_t, classes_t, ['EEG 3', 'EEG 4'],
                                  SAMP_RATE, FRAME_WIDTH, FRAME_STEP, classes_names)

xt,yt = np.array(frames_t),np.array(classes_t)

yt_one_hot  = Utils.to_one_hot(yt, by_sub=False)

# Same preprocessing as the training pipeline: flatten each frame, min-max
# scale every row on its own, then restore the (frames, samples, 2) layout.
reshaped_xt = xt.reshape(xt.shape[0], xt.shape[1] * xt.shape[2])
xt_raw = minmax_scale(reshaped_xt, axis=1)
xt_final = xt_raw.reshape(xt_raw.shape[0], int(xt_raw.shape[1]/2), 2).astype(np.float64)

eval, confusion matrix?

In [None]:
# Loss / accuracy on the independent recordings.
testLoss, testAcc = model.evaluate(xt_final, yt_one_hot)

# One-hot labels and predicted probabilities -> class indices.
Y_test = np.argmax(yt_one_hot, axis=1)
external_probs = model.predict(xt_final)
y_pred = np.argmax(external_probs, axis=1)

external_cm = confusion_matrix(Y_test, y_pred)
print(external_cm)

---

#### do we have a chance to understand what is learnt?

displaying feature importance... over 1d data

In [None]:
# Pick ONE test sample and its matching label, each wrapped in a batch of
# size 1. (The label used to be sliced as y_test[0:10], i.e. ten labels that
# did not correspond to the selected sample.)
sample_idx = 10
img = np.array([x_test[sample_idx]])
itsclass = np.array([y_test[sample_idx]])

# A tf.Variable is watched by the tape automatically, so gradients with
# respect to the input can be requested below.
images = tf.Variable(img, dtype=float)

with tf.GradientTape() as tape:
    pred = model(images, training=False)
    # Class indices ordered from most to least probable.
    class_idxs_sorted = np.argsort(pred.numpy().flatten())[::-1]
    # Score of the top predicted class: the quantity being differentiated.
    loss = pred[0][class_idxs_sorted[0]]

# Sensitivity of the top-class score to every input value.
grads = tape.gradient(loss, images)
dgrad_abs = tf.math.abs(grads)
# Largest absolute gradient over the last axis, for the single batch element.
dgrad_max_ = np.max(dgrad_abs, axis=2)[0]

In [None]:
# Min-max normalise the saliency values; the tiny constant keeps the division
# defined when all values are equal.
arr_min = np.min(dgrad_max_)
arr_max = np.max(dgrad_max_)
grad_eval = (dgrad_max_ - arr_min) / (arr_max - arr_min + 1e-18)

In [None]:
# Index list over the batch axis of `img` (not used by the plot below).
t = list(range(len(img)))

# `img` holds a single sample; plotting it draws one line per column of that
# sample, amplified by 5 for readability.
for trace in img:
    plt.plot(trace * 5)

# Normalised saliency, stretched by 2 and shifted down by 5 so it sits below
# the signal traces.
plt.plot(grad_eval * 2 - 5, alpha=0.8)
plt.ylim(-10, 10)
plt.show()