In [3]:
import os
import random
import numpy as np
import scipy.io as sio
import pandas as pd
from sklearn.metrics import accuracy_score, confusion_matrix, f1_score, precision_score, recall_score, roc_auc_score
from sklearn.model_selection import StratifiedKFold, train_test_split
import sklearn.preprocessing


import tensorflow as tf

def loadMatlabData(filePath):
    """Load the stretch/press dataset from a MATLAB ``.mat`` file.

    Args:
        filePath: Directory that contains ``stretch_press_data.mat``. It may
            be given with or without a trailing path separator.

    Returns:
        Tuple ``(x_data, y_data)`` holding the arrays stored under the
        ``'x_data'`` and ``'y_data'`` keys of the file.

    Raises:
        FileNotFoundError: If the ``.mat`` file does not exist in ``filePath``.
        KeyError: If the file lacks an ``'x_data'`` or ``'y_data'`` variable.
    """
    # os.path.join inserts the separator only when it is missing; plain string
    # concatenation produced a wrong path for a directory without a trailing '/'.
    fileName = os.path.join(filePath, 'stretch_press_data.mat')

    # Load the MATLAB file and pull out the two variables of interest.
    contentsMat = sio.loadmat(fileName)
    x_data = contentsMat['x_data']
    y_data = contentsMat['y_data']

    return x_data, y_data

def dnn_model():
    """Build the three-input, 8-class classifier.

    The ``(100, 1, 1)`` input goes through the convolutional feature extractor
    ``same_model``; its flattened features are concatenated with the two
    scalar inputs, passed through a 128-unit ReLU layer and an 8-way softmax.

    Returns:
        An uncompiled ``tf.keras`` ``Model`` taking the inputs in the order
        ``[stretch_press, amplitude, time]`` (by layer name).
    """
    input1 = tf.keras.layers.Input(shape=(100, 1, 1), name='stretch_press')
    # `(1,)` is a one-element tuple. The previous `(1)` was just the int 1,
    # which is not a valid shape tuple and is rejected by some Keras versions.
    # NOTE(review): K_fold feeds the scaled *time* column as the second input
    # and the *amplitude* column as the third, i.e. opposite to these layer
    # names. Inputs are matched by position, so training still works, but the
    # names are misleading -- confirm which side should change.
    input2 = tf.keras.layers.Input(shape=(1,), name='amplitude')
    input3 = tf.keras.layers.Input(shape=(1,), name='time')

    # Only the signal passes through the convolutional stack; both scalars
    # are appended to the flattened features unchanged.
    features = same_model(input1)

    x = tf.keras.layers.Concatenate()([features, input2, input3])
    x = tf.keras.layers.Dense(128, activation='relu', name='FC1')(x)
    output = tf.keras.layers.Dense(8, activation='softmax', name='Output')(x)

    model = tf.keras.models.Model(inputs=[input1, input2, input3], outputs=output)

    return model
def same_model(input):
    """Apply the convolutional feature extractor to ``input``.

    Two Conv2D -> BatchNormalization -> AveragePooling2D stages (16 then 32
    filters) followed by Dropout(0.2) and Flatten. The pooling layers are
    average pooling even though they are named 'MP1' and 'MP2'.

    Args:
        input: Keras tensor to feed into the first convolution.

    Returns:
        The flattened feature tensor.
    """
    layers = tf.keras.layers

    # Layers are instantiated in the same order in which they are applied.
    pipeline = [
        layers.Conv2D(filters=16, kernel_size=(2, 1), activation='relu', padding='same', name='CV1'),
        layers.BatchNormalization(),
        layers.AveragePooling2D(pool_size=(3, 1), strides=(2, 1), name='MP1'),
        layers.Conv2D(filters=32, kernel_size=(2, 1), activation='relu', padding='same', name='CV2'),
        layers.BatchNormalization(),
        layers.AveragePooling2D(pool_size=(3, 1), strides=(2, 1), name='MP2'),
        layers.Dropout(0.2),
        layers.Flatten(name='Flatten'),
    ]

    tensor = input
    for layer in pipeline:
        tensor = layer(tensor)

    return tensor

def one_hot(y_, n_classes=6):
    """Encode integer class indexes as one-hot rows.

    Example:
        ``one_hot([[5], [0], [3]], n_classes=6)`` returns
        ``[[0, 0, 0, 0, 0, 1], [1, 0, 0, 0, 0, 0], [0, 0, 0, 1, 0, 0]]``.

    Args:
        y_: Array-like of class indexes (list, nested list or ndarray); it is
            flattened, so ``(n,)`` and ``(n, 1)`` inputs are both accepted.
        n_classes: Number of classes, i.e. the width of each output row.

    Returns:
        Float ndarray of shape ``(n, n_classes)``.

    Raises:
        IndexError: If an index is outside ``[-n_classes, n_classes)``.
    """
    # np.asarray lets plain (nested) lists work, as the example promises;
    # the previous `y_.reshape(...)` only worked on ndarrays.
    indexes = np.asarray(y_, dtype=np.int32).reshape(-1)
    return np.eye(n_classes)[indexes]  # Returns FLOATS

def decode_one_hot(y_, n_classes=6):
    """Convert one-hot rows or per-class scores back to class indexes.

    Args:
        y_: Array-like of shape ``(n, n_classes)``. A 1-D input is treated as
            consecutive rows of ``n_classes`` values.
        n_classes: Number of columns per sample.

    Returns:
        Float ndarray of shape ``(n,)`` with the index of the largest value in
        each row; ties resolve to the lowest index.

    Raises:
        ValueError: If the width of ``y_`` does not match ``n_classes``.
    """
    scores = np.asarray(y_)
    if scores.ndim != 2:
        # e.g. a single squeezed prediction; reshape raises ValueError when
        # the element count is not a multiple of n_classes.
        scores = scores.reshape(-1, n_classes)
    if scores.shape[1] != n_classes:
        # Previously this case ran off the end of the array (IndexError) or
        # silently ignored the extra columns.
        raise ValueError(
            'decode_one_hot: expected {} columns, got {}'.format(n_classes, scores.shape[1])
        )

    # argmax returns the first maximum, matching the strict `<` comparison of
    # the former hand-written loop.
    return np.argmax(scores, axis=1).astype(float)  # Returns FLOATS

def reshape_input(x_train, y_train, x_valid, y_valid, x_test, y_test):
    """Reshape the three data splits and standardise their two scalar columns.

    Each feature array is reshaped to ``(n, 102, 1)`` and each label array to
    ``(n, 1, 1)``. Column 100 ("time") and column 101 ("amplitude") are
    extracted as ``(n, 1)`` arrays and standardised with a ``StandardScaler``
    fitted on the training split only. The reshaped feature arrays themselves
    are returned unscaled.

    Returns:
        ``(x_train, y_train, x_valid, y_valid, x_test, y_test,
        train_time, valid_time, test_time,
        train_amplitude, valid_amplitude, test_amplitude)``
    """
    def _as_samples(features):
        # One sample per row of 102 values, plus a trailing channel axis.
        return features.reshape(features.size // 102, 102, 1)

    def _as_labels(labels):
        return labels.reshape(labels.size, 1, 1)

    def _column(features, index):
        # Extract one column as an (n, 1) array, the layout StandardScaler expects.
        values = features[:, index, :].flatten()
        return values.reshape(values.size, 1)

    def _standardise(train_col, valid_col, test_col):
        # Fit on the training split only, then apply to all three splits.
        scaler = sklearn.preprocessing.StandardScaler()
        scaler.fit(train_col)
        return scaler.transform(train_col), scaler.transform(valid_col), scaler.transform(test_col)

    x_train, x_valid, x_test = _as_samples(x_train), _as_samples(x_valid), _as_samples(x_test)
    y_train, y_valid, y_test = _as_labels(y_train), _as_labels(y_valid), _as_labels(y_test)

    train_time, valid_time, test_time = _standardise(
        _column(x_train, 100), _column(x_valid, 100), _column(x_test, 100)
    )
    train_amplitude, valid_amplitude, test_amplitude = _standardise(
        _column(x_train, 101), _column(x_valid, 101), _column(x_test, 101)
    )

    return (x_train, y_train, x_valid, y_valid, x_test, y_test,
            train_time, valid_time, test_time,
            train_amplitude, valid_amplitude, test_amplitude)
    
def K_fold(x_train, y_train, x_test, y_test):
    """Train and evaluate the model with 5-fold stratified cross-validation.

    For every fold the training part is balanced with ``data_aug``, shuffled,
    reshaped/scaled with ``reshape_input`` and used to fit a fresh
    ``dnn_model``. The best weights by validation loss are written to
    ``model_best_fold<k>.h5``. Each fold is then scored on the held-out test
    set, and its predictions are written to
    ``./Stretch_press/Result/Result<k>.csv``.

    Args:
        x_train: Training features, one row of 102 values per sample.
        y_train: Training labels; 1 is subtracted so they start at 0
            (presumably the raw labels are 1..8 -- confirm against the data).
        x_test: Held-out test features, same layout as ``x_train``.
        y_test: Held-out test labels, shifted the same way as ``y_train``.

    Returns:
        ``(aug_xtrain, aug_ytrain, x_test1, y_test1, test_time, test_amplitude)``
        as produced in the LAST fold.
    """
    class_labels = [0, 1, 2, 3, 4, 5, 6, 7]
    result_dir = "./Stretch_press/Result/"
    # to_csv does not create missing directories.
    os.makedirs(result_dir, exist_ok=True)

    num_folds = 0
    str_kf = StratifiedKFold(n_splits=5, shuffle=True, random_state=44)
    accs = []
    y_train = y_train - 1
    y_test = y_test - 1

    for train_idx, valid_idx in str_kf.split(x_train, y_train):
        num_folds += 1
        print(f'--------------------{num_folds}번째 KFold-------------------')
        print(f'train_idx_len : {len(train_idx)} / valid_idx_len : {len(valid_idx)}')

        data_train, data_valid = x_train[train_idx], x_train[valid_idx]
        label_train, label_valid = y_train[train_idx], y_train[valid_idx]

        # Data augmentation: oversample minority classes of the training part.
        aug_xtrain, aug_ytrain = data_aug(data_train, label_train, class_labels)

        # Data shuffle: permute an index list and apply it to both arrays.
        # random.shuffle's permutation depends only on the list length, so
        # this yields the same order as shuffling a list of (x, y) pairs.
        order = list(range(len(aug_xtrain)))
        random.shuffle(order)
        aug_xtrain = aug_xtrain[order]
        aug_ytrain = aug_ytrain[order]

        (aug_xtrain, aug_ytrain, data_valid, label_valid, x_test1, y_test1,
         train_time, valid_time, test_time,
         train_amplitude, valid_amplitude, test_amplitude) = reshape_input(
            aug_xtrain, aug_ytrain, data_valid, label_valid, x_test, y_test)

        # model
        model = dnn_model()

        callback_list = [
            tf.keras.callbacks.EarlyStopping(monitor='val_loss', mode='min', verbose=0, patience=10),
            tf.keras.callbacks.ModelCheckpoint(filepath='model_best_fold' + str(num_folds) + '.h5', monitor='val_loss', mode='min', verbose=0, save_best_only=True, save_weights_only=True),
        ]

        # The first 100 values are the signal; the two scalar inputs are the
        # standardised columns returned by reshape_input.
        train_inputs = [aug_xtrain[:, 0:100, :], train_time, train_amplitude]
        valid_inputs = [data_valid[:, 0:100, :], valid_time, valid_amplitude]
        test_inputs = [x_test1[:, 0:100, :], test_time, test_amplitude]

        model.compile(optimizer='adam', loss='sparse_categorical_crossentropy', metrics=['accuracy'])
        model.fit(train_inputs, aug_ytrain, batch_size=32, validation_data=(valid_inputs, label_valid), epochs=100, callbacks=callback_list)

        # NOTE(review): the model is scored with the weights from the end of
        # training, not the best checkpoint saved above -- confirm intent.
        vPred = model.predict(test_inputs)
        loss, acc = model.evaluate(test_inputs, y_test1)
        accs.append(acc)
        print(acc)

        test_labels = y_test1.flatten()
        Pred = decode_one_hot(vPred, len(class_labels))
        result = pd.DataFrame({'Pred': Pred, 'label': test_labels})
        result.to_csv(result_dir + 'Result' + str(num_folds) + '.csv')

        # Score the test predictions against the TEST labels. The previous
        # code passed the validation labels and re-decoded with the default
        # of 6 classes, which fails on this 8-class output.
        accuracy, precision, recall, spec, roc_auc, bal_acc, f1 = get_clf_eval(test_labels, Pred, vPred, class_labels)

    print(f'mean test accuracy over {num_folds} folds: {np.mean(accs):.4f}')

    return aug_xtrain, aug_ytrain, x_test1, y_test1, test_time, test_amplitude
        
def data_aug(x_data, y_data, labels):
    """Oversample minority classes with randomly circular-shifted copies.

    Every class in ``labels`` is brought up to the size of the largest class.
    A new sample is a randomly chosen existing sample of that class whose
    signal part (all columns except the last two) is circularly shifted by a
    random offset in ``[-5, 5]``; the last two columns are copied unchanged.

    The global ``random`` generator is re-seeded with 1 on every call, so the
    result is deterministic and the generator state is reset for the caller.

    Args:
        x_data: 2-D array, one sample per row (102 columns in this notebook).
        y_data: Labels of shape ``(n, 1)``, aligned with ``x_data`` rows.
        labels: Iterable of class values to balance.

    Returns:
        ``(x_data, y_data)`` with the new samples appended after the original
        rows, grouped by class in the order given by ``labels``.

    Raises:
        ValueError: If a class needs new samples but has no existing sample
            to copy from.
    """
    random.seed(1)

    n_trailing = 2                      # last two columns are not shifted
    signal_len = x_data.shape[1] - n_trailing
    max_shift = 5

    # Count samples per class. np.where(...)[0] holds the matching row
    # indices; sizing the whole np.where tuple would double the count for
    # (n, 1) labels.
    counts = {}
    for i in labels:
        counts[i] = np.size(np.where(y_data == i)[0])
        print(counts[i])
    max_count = max(counts.values(), default=0)

    # Collect new rows in lists and concatenate once at the end, instead of
    # re-allocating the whole arrays for every new sample.
    new_rows = []
    new_labels = []
    for i in labels:
        members = np.where(y_data == i)[0]
        missing = max_count - counts[i]
        counts[i] = max_count           # a repeated label is balanced only once
        if missing > 0 and members.size == 0:
            raise ValueError('data_aug: class {} has no samples to augment from'.format(i))

        for _ in range(missing):
            # Draw order (sample first, then shift) is part of the seeded sequence.
            source = x_data[members[random.randrange(members.size)]]
            shift = random.randrange(-max_shift, max_shift + 1)

            row = np.zeros(x_data.shape[1])
            # Positive shift rotates the signal left, negative rotates it right.
            row[:signal_len] = np.roll(source[:signal_len], -shift)
            row[signal_len:] = source[signal_len:]
            new_rows.append(row)
            new_labels.append(i)

    if not new_rows:
        return x_data, y_data

    x_data = np.concatenate((x_data, np.array(new_rows)))
    y_data = np.concatenate((y_data, np.array(new_labels, dtype=float).reshape(-1, 1)))

    return x_data, y_data
        
def get_clf_eval(y_test, pred=None, pred_proba=None, labels=[0, 1]):
    """Compute and print classification metrics for binary or multiclass data.

    Args:
        y_test: True labels (any shape; flattened).
        pred: Predicted labels (any shape; flattened).
        pred_proba: Predicted scores. For two labels, a 1-D score vector or a
            2-D array whose last column belongs to ``labels[-1]``. For more
            labels, an ``(n, len(labels))`` array of class probabilities.
        labels: Class values, in the column order of ``pred_proba``.

    Returns:
        ``(accuracy, precision, recall, spec, roc_auc, bal_acc, f1)``.
        ``accuracy``, ``precision``, ``roc_auc`` and ``f1`` are scalars
        (precision/F1 use ``labels[-1]`` as the positive class for two labels
        and macro averaging otherwise). ``recall``, ``spec`` and ``bal_acc``
        are per-class lists ordered like ``labels``; an entry is NaN when its
        denominator is zero.
    """
    labels = list(labels)
    y_true = np.asarray(y_test).ravel()
    y_pred = np.asarray(pred).ravel()
    proba = np.asarray(pred_proba)

    # `labels` must be passed by keyword to confusion_matrix.
    confusion = confusion_matrix(y_true, y_pred, labels=labels)
    accuracy = accuracy_score(y_true, y_pred)

    if len(labels) == 2:
        # Binary case: score the last label as the positive class.
        precision = precision_score(y_true, y_pred, pos_label=labels[-1])
        f1 = f1_score(y_true, y_pred, pos_label=labels[-1])
        scores = proba[:, -1] if proba.ndim == 2 else proba
        roc_auc = roc_auc_score(y_true, scores)
    else:
        # The metric defaults are binary-only; multiclass needs an explicit
        # averaging mode and a one-vs-rest AUC.
        precision = precision_score(y_true, y_pred, labels=labels, average='macro')
        f1 = f1_score(y_true, y_pred, labels=labels, average='macro')
        roc_auc = roc_auc_score(y_true, proba, multi_class='ovr', labels=labels)

    # Per-class statistics from the confusion matrix (rows = true class,
    # columns = predicted class), indexed by position so that label values
    # need not be 0..n-1.
    total = confusion.sum()
    true_pos = np.diag(confusion)
    true_count = confusion.sum(axis=1)          # row sums
    pred_count = confusion.sum(axis=0)          # column sums
    false_pos = pred_count - true_pos
    true_neg = total - true_count - pred_count + true_pos

    with np.errstate(divide='ignore', invalid='ignore'):
        recall_arr = true_pos / true_count                  # TP / (TP + FN)
        spec_arr = true_neg / (true_neg + false_pos)        # TN / (TN + FP)
    bal_acc_arr = (recall_arr + spec_arr) / 2

    recall = [float(v) for v in recall_arr]
    spec = [float(v) for v in spec_arr]
    bal_acc = [float(v) for v in bal_acc_arr]

    # Printed header means "confusion matrix". The summary line shows the
    # class-averaged values of the per-class lists, since a list cannot be
    # formatted with '.4f'.
    print('오차 행렬')
    print(confusion)
    print('정확도: {0:.4f}, 정밀도: {1:.4f}, 재현율(Sensitivity): {2:.4f}, 특이성(Specificity): {3:.4f}, F1: {4:.4f}, AUC: {5:.4f}, bal_acc: {6:.4f}'.format(accuracy, precision, np.mean(recall), np.mean(spec), f1, roc_auc, np.mean(bal_acc)))
    return accuracy, precision, recall, spec, roc_auc, bal_acc, f1
        

In [4]:
# Directory for prediction CSVs; to_csv does not create missing directories.
RESULT_DIR = "./Stretch_press/Result/"
os.makedirs(RESULT_DIR, exist_ok=True)

# Fold whose checkpoint is re-loaded for the final prediction.
# NOTE(review): fold 2 is hard-coded with no recorded reason -- confirm.
BEST_FOLD = 2

x_data, y_data = loadMatlabData("./Stretch_press/data/")
x_train, x_test, y_train, y_test = train_test_split(x_data, y_data, test_size=0.2, stratify=y_data, shuffle=True, random_state=99)

#### Kfold
# The names are rebound to the arrays produced in the last fold: augmented
# and reshaped training data, reshaped test data with labels shifted by -1,
# and the standardised test scalars.
x_train, y_train, x_test, y_test, test_time, test_amplitude = K_fold(x_train, y_train, x_test, y_test)

#### load weight
model = dnn_model()
model.load_weights('model_best_fold' + str(BEST_FOLD) + '.h5')
# No squeeze() here: it would collapse a single-sample prediction to 1-D,
# and decode_one_hot expects one row of scores per sample.
vPred = model.predict([x_test[:, 0:100, :], test_time, test_amplitude])
Pred = decode_one_hot(vPred, 8)
result = pd.DataFrame({'Pred': Pred, 'label': y_test.flatten()})
result.to_csv(RESULT_DIR + 'Result.csv')

FileNotFoundError: [Errno 2] No such file or directory: './Stretch_press/data/stretch_press_data.mat'