In [1]:
import os
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'

import numpy as np
import pandas as pd
import scipy.io as sio
import matplotlib.pyplot as plt
# import seaborn as sns

In [8]:
# Directory holding the .mat recordings and the REFERENCE.csv label file.
DATA_PATH = 'data/training2017/'
LABEL_PATH = os.path.join(DATA_PATH, 'REFERENCE.csv')

# Lowercase aliases for any code that still refers to the paths by these names.
data_path = DATA_PATH
label_path = LABEL_PATH

# lower bound of the length of the signal (shorter recordings are filtered out)
LB_LEN_MAT = 100

# upper bound of the length of the signal (signals are cut or padded to this length)
UB_LEN_MAT = 10100

# Labels with their own class id; any other label is mapped to one extra class.
LABELS = ["N", "A", "O"]
n_classes = len(LABELS) + 1

# Seed NumPy's global RNG so the shuffle below is repeatable.
np.random.seed(7)

In [3]:
def value_of_mat(mat_filename):
    """
    Read a .mat file and return the first row of its 'val' entry.

    sio.loadmat yields a dict; the array stored under the 'val' key is
    indexed with [0, :], so a 1-D array comes back.
    """
    mat_contents = sio.loadmat(mat_filename)
    val_matrix = mat_contents["val"]
    return val_matrix[0, :]

def len_of_mat(mat_filename):
    """Return the length of the array that value_of_mat loads from mat_filename."""
    signal = value_of_mat(mat_filename)
    return len(signal)

def plot_ecg(mat_filename, time_interval=1000):
    """
    Plot the leading part of the signal stored in a .mat file.

    :param: mat_filename: path handed on to value_of_mat.
    :param: time_interval: number of leading values to draw (default 1000).
    """
    leading_values = list(value_of_mat(mat_filename))[:time_interval]
    plt.plot(leading_values)

In [4]:
# Regular files in the data directory. Sorted because os.listdir returns
# entries in arbitrary order, which would otherwise make the seeded shuffle
# and the train/test split differ from one machine to the next.
files = sorted(f for f in os.listdir(DATA_PATH) if os.path.isfile(os.path.join(DATA_PATH, f)))

# keep the .mat files whose name starts with "A"
mat_files = [f for f in files if f.startswith("A") and f.endswith('.mat')]

# filter out short mat_files
mat_files = [f for f in mat_files if len_of_mat(os.path.join(DATA_PATH, f)) >= LB_LEN_MAT]

n_sample = len(mat_files)
print('Total training size is ', n_sample)

Total training size is  8528


# load signals as X

In [5]:
def duplicate_padding(signals, UB_LEN_MAT):
    """
    padding the signals not with zeros but the copy of the signal.

    Every signal is brought to exactly UB_LEN_MAT values: longer signals are
    cut after UB_LEN_MAT values, shorter ones are repeated from their start
    until the row is full. An empty signal has nothing to repeat, so its row
    is left as zeros.

    :param: signals: list of np.array with 1 dimension.
        more general, any sequence NumPy can turn into a 1-D array.
    :param: UB_LEN_MAT: int, number of values in every row of the result.
    :return: np.ndarray of shape (len(signals), UB_LEN_MAT).
    """

    X = np.zeros((len(signals), UB_LEN_MAT))
    for i, sig in enumerate(signals):
        # Guard the empty case explicitly: repeating zero values can never
        # fill the row.
        if len(sig) == 0:
            continue

        # np.resize truncates when the target is shorter and fills with
        # repeated copies of the input when it is longer.
        X[i, :] = np.resize(sig, UB_LEN_MAT)
    return X

In [9]:
# Load the signal of every retained .mat file, in the order of mat_files.
signals = [
    value_of_mat(os.path.join(DATA_PATH, mat_name))
    for mat_name in mat_files
]

In [10]:
# One row per signal, each cut or repeated to exactly UB_LEN_MAT values.
X = duplicate_padding(signals=signals, UB_LEN_MAT=UB_LEN_MAT)

# load labels as Y

In [None]:
def num2onehot(number, length):
    """
    Build a one-hot vector.

    :param: number: index of the entry that is set to 1.
    :param: length: number of entries in the returned vector.
    :return: np.ndarray of zeros with a 1 at position ``number``.
    """
    onehot = np.zeros(length)
    onehot[number] = 1
    return onehot

def num2onehot_for_list(a_list, length=None):
    """
    One-hot encode a list of class indices, one row per index.

    :param: a_list: sequence of integer class indices.
    :param: length: width of every one-hot row. When omitted it is inferred
        as max(a_list) + 1, which comes out too narrow if the highest class
        happens to be absent from a_list; pass the number of classes to get
        a fixed width.
    :return: np.ndarray of shape (len(a_list), length). An empty a_list gives
        an array with zero rows (and zero columns when no length is given).
    """
    indices = np.asarray(a_list, dtype=int)

    if length is None:
        # No index to infer a width from: return an empty encoding instead of
        # failing on the maximum of an empty sequence.
        if indices.size == 0:
            return np.zeros((0, 0))
        length = int(indices.max()) + 1

    onehot = np.zeros((indices.size, length))
    # Row r gets its 1 in column indices[r].
    onehot[np.arange(indices.size), indices] = 1
    return onehot

def onehot2num_for_list(onehot_array):
    """
    Convert one-hot rows back to class indices.

    :param: onehot_array: iterable of one-hot rows.
    :return: list holding, for each row, the position of its first entry
        equal to 1.
    """
    numbers = []
    for row in onehot_array:
        numbers.append(list(row).index(1))
    return numbers

In [None]:
# The label file is read without a header row; its two columns are named
# here and the signal ID becomes the index so labels can be looked up by ID.
df_label = pd.read_csv(LABEL_PATH, sep=',', header=None, names=None)
df_label.columns = ["sigID", "label"]
df_label = df_label.set_index("sigID")

In [None]:
# The signal ID is the file name up to its first dot; fetch the label of each
# ID in the same order as mat_files.
signal_IDs = [mat_name.partition(".")[0] for mat_name in mat_files]
labels = [df_label.loc[sig_id, "label"] for sig_id in signal_IDs]

In [None]:
# Each label becomes its position in LABELS; every other label gets class 3.
label_to_id = {label: idx for idx, label in enumerate(LABELS)}
label_ids = [label_to_id.get(l, 3) for l in labels]
Y = num2onehot_for_list(label_ids)

# data preprocessing: standardize X and add a trailing axis

In [None]:
# Standardize with the mean and standard deviation taken over the whole array.
# NOTE(review): these statistics are computed before the train/test split, so
# the held-out rows contribute to them.
X = (X - X.mean())/(X.std())

# Add a trailing axis of size 1. Guarded so that re-running this cell does not
# keep appending axes.
if X.ndim == 2:
    X = np.expand_dims(X, axis=2)

# shuffle data

In [None]:
# Draw one random ordering of the row indices and apply it to X and Y alike,
# so every signal stays paired with its label.
values = list(range(len(X)))
permutations = np.random.permutation(values)
X, Y = X[permutations, :], Y[permutations, :]

# train test split

In [None]:
# Fraction of the samples that goes into the training part; the rest is held out.
train_test_ratio = 0.9

# Row index at which the training part ends.
n_train = int(train_test_ratio * n_sample)

X_train, X_test = X[:n_train, :], X[n_train:, :]
Y_train, Y_test = Y[:n_train, :], Y[n_train:, :]

# load model and train it

In [None]:
from models.Conv1d import Conv1d
from keras.callbacks import ModelCheckpoint

In [None]:
# Build the project's Conv1d model from the padded signal length
# (presumably its input length — confirm against models/Conv1d).
model = Conv1d(UB_LEN_MAT)

In [None]:
# Make sure the output directory exists before anything is written into it.
os.makedirs('./trained_models', exist_ok=True)

# Write the model to disk only when the monitored metric improves.
# NOTE(review): the validation accuracy is reported as 'val_acc' or as
# 'val_accuracy' depending on the Keras version and on how the model was
# compiled — confirm the name matches, otherwise no checkpoint is saved.
checkpointer = ModelCheckpoint(filepath='./trained_models/Best_model.h5',
                               monitor='val_acc',
                               verbose=1,
                               save_best_only=True)

# NOTE(review): X_test/Y_test double as the validation set, so the checkpoint
# is selected on the same rows that are scored afterwards.
hist = model.fit(X_train, Y_train,
                 validation_data=(X_test, Y_test),
                 batch_size=275,
                 epochs=3,
                 verbose=2,
                 shuffle=True,
                 callbacks=[checkpointer])

# evaluation

In [None]:
from sklearn.metrics import confusion_matrix, accuracy_score

In [None]:
# Model outputs for the held-out signals; the cells below take argmax over
# axis 1 to turn them into class ids.
predictions = model.predict(X_test)

In [None]:
# Accuracy of the model as it stands after the last epoch, on the held-out rows.
y_true = onehot2num_for_list(Y_test)
y_pred = predictions.argmax(axis=1)
score = accuracy_score(y_true, y_pred)
print('Last epoch\'s validation score is ', score)

# save some results

In [None]:
# Store the predicted class id of every held-out signal, one per line, in a
# file whose name carries the accuracy rounded to four decimals.
preds_csv_path = './trained_models/Preds_' + format(score, '.4f') + '.csv'
df = pd.DataFrame(predictions.argmax(axis=1))
df.to_csv(preds_csv_path, index=None, header=None)

In [None]:
# Bind the result to its own name: assigning it to `confusion_matrix` would
# overwrite the imported sklearn function and make this cell fail on re-run.
conf_mat = confusion_matrix(onehot2num_for_list(Y_test), predictions.argmax(axis=1))
df = pd.DataFrame(conf_mat)
df.to_csv('./trained_models/Result_Conf' + str(format(score, '.4f')) + '.csv', index=None, header=None)