In [1]:
# Python packages
import numpy as np
import matplotlib.pyplot as plt
from tensorflow.keras.layers import Input, Conv1D, MaxPooling1D, Dropout, BatchNormalization, Flatten, Dense, ReLU, Add
from tensorflow.keras.models import Model
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import ReduceLROnPlateau, ModelCheckpoint, EarlyStopping

from sklearn.metrics import accuracy_score
from sklearn.metrics import multilabel_confusion_matrix
from sklearn.metrics import classification_report
import pandas as pd
import seaborn as sns

In [2]:
# Load data
data = np.load('dados.npz')

# Training set
X_train = data['X_train']
y_train = data['y_train']

# Test set
X_test = data['X_test']
y_test = data['y_test']

In [None]:
# Data summary
print(f'X_train = {X_train.shape}')
print(f'y_train = {y_train.shape}')
print(f'X_test = {X_test.shape}')
print(f'y_test = {y_test.shape}')

In [None]:
# Network implementation
rate_drop = 0.5
initializer='he_normal'

In [None]:
# Input layer
input_layer = Input(shape=X_train.shape[1:])

In [None]:
# First layer
conv_1 = Conv1D(kernel_size=16, filters=64, strides=1, padding="same", kernel_initializer=initializer)(input_layer)
bn_1 = BatchNormalization()(conv_1)
relu_1 = ReLU()(bn_1)

In [None]:
# Second layer
conv_2 = Conv1D(kernel_size=16, filters=64, strides=1, padding="same", kernel_initializer=initializer)(relu_1)
bn_2 = BatchNormalization()(conv_2)
relu_2 = ReLU()(bn_2)
drop_1 = Dropout(rate_drop)(relu_2)
conv_3 = Conv1D(kernel_size = 16, filters=64, strides=2, padding="same", kernel_initializer=initializer)(drop_1)

In [None]:
# Short connection
short_1 = MaxPooling1D(pool_size=1, strides=2)(relu_1)

In [None]:
# Adding layers
layers = Add()([conv_3, short_1])

In [None]:
def residual_blocks(x, stride=1, num_filter = 64):
    bn_1 = BatchNormalization()(x)
    relu_1 = ReLU()(bn_1)
    drop_1 = Dropout(rate_drop)(relu_1)
    conv_1 = Conv1D(kernel_size=16, filters=num_filter, strides=1, padding="same", kernel_initializer=initializer)(drop_1)
    bn_2 = BatchNormalization()(conv_1)
    relu_2 = ReLU()(bn_2)
    drop_2 = Dropout(rate_drop)(relu_2)
    conv_2 = Conv1D(kernel_size=16, filters=num_filter, strides=stride, padding="same", kernel_initializer=initializer)(drop_2)

    if i == 3 or i == 7 or i == 11:  #Verifica se houve mudança na quantidade de número de filtros
        #Short connection
        conv_aj = Conv1D(kernel_size=16, filters=num_filter, strides=1, padding="same")(x) #Ajustar o número de filtros
        short = MaxPooling1D(pool_size = 1, strides=2)(conv_aj)
    else:
        #Short connection
        short = MaxPooling1D(pool_size = 1, strides=stride)(x)

    # Adding layers
    return Add()([conv_2, short])

In [None]:
num_filter = np.array([64, 64, 64, 128, 128, 128, 128, 192, 192, 192, 192, 256, 256, 256, 256])
for i in range(15):
    #print(f"i = {i} STRIDE = {(i % 2)+1}, FILTER LENGHT = {num_filter[i]}")
    layers = residual_blocks(layers, stride=(i % 2)+1, num_filter = num_filter[i])

In [None]:
# Last layers
# The ﬁnal fully connected layer and sigmoid activation produce a distribution 
# over the 5 output superclasses for each time-step.
bn_x = BatchNormalization()(layers)
relu_x = ReLU()(bn_x)
flat_x = Flatten()(relu_x)
dense_x = Dense(32)(flat_x)
classification = Dense(5, activation='sigmoid')(dense_x)

In [None]:
# Constructing the model
model = Model(inputs=input_layer, outputs=classification)
# model.summary()

In [None]:
#Parâmetros de otimização
lr = 0.001
epochs = 10
batch_size = 32

In [None]:
#Otimizador Adam
opt = Adam(lr, beta_1=0.9, beta_2=0.999)

In [None]:
model.compile(loss='binary_crossentropy', optimizer=opt, metrics=['accuracy'])

In [None]:
callbacks = []

# filepath="weights-improvement-{epoch:02d}-{val_accuracy:.2f}.hdf5"

#Reduz a taxa de aprendizagem quando o erro de validação para de melhorar
callbacks.append(ReduceLROnPlateau(monitor='val_loss', factor=0.1, patience=10, min_lr=lr/10000))
# callbacks.append(EarlyStopping(monitor='val_accuracy', mode='max', verbose=1, patience=10))
# callbacks.append(ModelCheckpoint(filepath, monitor='val_accuracy', verbose=1, save_best_only=True, mode='max'))

In [None]:
callbacks

In [None]:
#Treino do modelo
history = model.fit(X_train, y_train, batch_size=batch_size, epochs=epochs, validation_data=(X_test, y_test), callbacks=callbacks)

Test: open the hdf5 files

In [None]:
import h5py

In [None]:
hf = h5py.File('C:/Users/sarah/TCC/5-Arquiteturas/Weights improvement/weights-improvement-08-0.89.hdf5','r')
# ls = list(hf.keys())
# hf.close()

In [None]:
ls = list(hf.keys())
ls

In [None]:
model_weights = hf.get(ls[0])

In [None]:
optimizer_weights = hf.get(ls[1])

In [None]:
model_weight = np.array(model_weights)

In [None]:
optimizer_weight = np.array(optimizer_weights)

______________________

In [None]:
# Plot results
plt.plot(history.epoch, history.history['loss'], '-o')
plt.plot(history.epoch, history.history['val_loss'], '-*')

plt.xlabel('Epochs')
plt.ylabel('Cost')
plt.legend(['Training set', 'Validation set'])
local = 'E:/Usuários/Sarah/Documentos/UTFPR/TCC/Resultados/Gráficos/Custo_atual_callbacks'
plt.savefig(local)
plt.show()

In [None]:
plt.plot(history.epoch, history.history['accuracy'], '-o')
plt.plot(history.epoch, history.history['val_accuracy'], '-*')

plt.xlabel('Epochs')
plt.ylabel('Accuracy')
plt.legend(['Training set', 'Validation set'])
local = 'E:/Usuários/Sarah/Documentos/UTFPR/TCC/Resultados/Gráficos/Acuracia_atual_callbacks'
plt.savefig(local)
plt.show()

In [None]:
score = model.evaluate(X_test, y_test)
print(f"Custo de teste = {score[0]:.4f}")
print(f"Acurácia de teste = {100*score[1]:.2f}%")

In [None]:
#Prediction of the model
prediction = model.predict(X_test) #Realiza a predição das probabilidades de cada label
prediction_bin = np.array(prediction)

In [None]:
#Given the probabilities of each label, if it is higher or equal to 0.5, that is the class of the diagnostic
for indice in range(prediction_bin.shape[0]):
    for i in range(prediction_bin.shape[1]):
        if prediction_bin[indice][i] >= 0.5:
            prediction_bin[indice][i] = 1
        else:
            prediction_bin[indice][i] = 0
prediction_bin = np.array(prediction_bin,dtype='int')
#Example of index 67
print(f'Prediction: {prediction_bin[67]} \t\tReal Label: {y_test[67]}')

In [None]:
#List with the labels strings
label_string = ['NORM','MI','CD','STTC','HYP']

In [None]:
#Transform the binary vector in a list with strings (could be y_test or the predictions)
def get_strings(label_string,label_bin):
    label_bin_string = []
    for x in range(len(label_bin)):
        lst = []
        for y in range(len(label_string)):
            value = label_bin[x][y]
            if value == 1:
                lst.append(label_string[y])
        label_bin_string.append(lst)
    
    return label_bin_string

In [None]:
#Y_test string labels
y_test_string = get_strings(label_string, y_test)

#Predictions strings labels
prediction_string = get_strings(label_string, prediction_bin)

In [None]:
#Visualizing an example
index = 67
print(f'Index: {index}\n')

print(f'Diagnostic = {y_test[index]}')
print(f'Prediction = {prediction_bin[index]}\n')

print(f'Diagnostic = {y_test_string[index]}')
print(f'Prediction = {prediction_string[index]}')

In [None]:
#Accuracy from the example below
acc_index = accuracy_score(y_test[index],prediction_bin[index])
print(f'Example accuracy = {100 * acc_index:.2f}%')

In [None]:
#Plot the example with the diagnostic and prediction
valor_med = X_test[index, ].mean(axis=-1)
fig_s, ax_s = plt.subplots(figsize=(10,7))
ax_s.set_title(f'Diagnostic: {y_test_string[index]}       Prediction:{prediction_string[index]}')
ax_s.plot(valor_med)
plt.show()

In [None]:
#Another metrics
report = classification_report(y_test,prediction_bin,output_dict=True,target_names=['NORM', 'MI', 'CD', 'STTC', 'HYP'])
report_df = pd.DataFrame.from_dict(report, orient='index')
report_df

In [None]:
#Confusion marix
cm = multilabel_confusion_matrix(y_test, prediction_bin)

# https://www.kaggle.com/code/trolukovich/multi-label-classification-keras/notebook
# Plot confusion matrix 
fig = plt.figure(figsize = (14, 8))
for i, (label, matrix) in enumerate(zip(label_string, cm)):
    plt.subplot(f'23{i+1}')
    labels = [f'Not {label}', label]
    sns.heatmap(matrix, annot = True, square = True, fmt = 'd', cbar = False, cmap = 'Blues', 
                xticklabels = labels, yticklabels = labels, linecolor = 'black', linewidth = 1)
    plt.title(label)

plt.tight_layout()
plt.show()