In [27]:
import numpy as np
import pandas as pd
from tensorflow import keras
from keras.models import Sequential
from keras.layers import Dense, Activation, Flatten,Dropout
from keras.layers import Conv2D,LSTM, BatchNormalization,MaxPooling2D,Reshape, GRU
from tensorflow.keras.utils import to_categorical
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
import warnings
warnings.filterwarnings('ignore')

# Loading Data

In [28]:
def load_data(data_dir="data"):
    """Load the train/valid and test arrays from ``.npy`` files.

    Parameters
    ----------
    data_dir : str, optional
        Directory that contains ``X_test.npy``, ``y_test.npy``,
        ``person_train_valid.npy``, ``X_train_valid.npy``,
        ``y_train_valid.npy`` and ``person_test.npy``. Defaults to ``"data"``.

    Returns
    -------
    tuple
        ``(X_test, y_test, person_train_valid, X_train_valid, y_train_valid,
        person_test)``. Both ``person_*`` arrays are flattened to 1-D so they
        can be compared element-wise against a subject id and stacked with
        ``np.hstack`` alongside the 1-D label arrays.
    """

    def _load(name):
        return np.load(f"{data_dir}/{name}.npy")

    X_test = _load("X_test")
    y_test = _load("y_test")
    # reshape(-1) instead of a hard-coded trial count: works for any dataset size.
    person_train_valid = _load("person_train_valid").reshape(-1)
    X_train_valid = _load("X_train_valid")
    y_train_valid = _load("y_train_valid")
    # Flattened the same way as person_train_valid so both id arrays agree in shape.
    person_test = _load("person_test").reshape(-1)

    return X_test,y_test,person_train_valid,X_train_valid,y_train_valid,person_test

# Prep and Preprocessing

In [29]:
def data_prep(X,y,p,sub_sample,average,noise):
    """Trim the signals and build an augmented, time-downsampled dataset.

    ``X`` is indexed as ``(trial, channel, time)``. After keeping at most the
    first 500 time steps, the following variants are stacked along axis 0:

    1. max-pooled over windows of ``sub_sample`` time steps,
    2. mean-pooled over windows of ``average`` time steps, with Gaussian
       noise (std 0.5) always added,
    3. one strided sub-sampling ``X[:, :, i::sub_sample]`` for each
       ``i in range(sub_sample)``, with Gaussian noise (std 0.5) added only
       when ``noise`` is truthy.

    ``y`` and ``p`` are repeated once per variant so they stay aligned with
    the rows of the returned ``X``.

    Parameters
    ----------
    X : np.ndarray
        3-D array ``(trials, channels, time)``.
    y, p : np.ndarray
        Per-trial labels and per-trial subject ids.
    sub_sample : int
        Down-sampling factor for the max-pool and strided variants.
    average : int
        Window length for the mean-pool variant. Must equal ``sub_sample``,
        otherwise the variants have different lengths and cannot be stacked.
    noise : bool
        Whether to add noise to the strided sub-samples. The mean-pooled
        variant is noised regardless of this flag.

    Returns
    -------
    tuple
        ``(total_X, total_y, total_p)`` with ``(2 + sub_sample)`` times as many
        trials as the input.

    Raises
    ------
    ValueError
        If ``sub_sample`` is not positive, differs from ``average``, or does
        not evenly divide the trimmed time axis.

    Notes
    -----
    Noise is drawn from the global ``np.random`` state; seed it beforehand for
    reproducible output.
    """
    if sub_sample < 1:
        raise ValueError("sub_sample must be a positive integer, got %r" % (sub_sample,))
    if average != sub_sample:
        raise ValueError(
            "average (%r) must equal sub_sample (%r) so that all augmented "
            "variants share the same time length" % (average, sub_sample))

    # Trimming the data (sample,22,1000) -> (sample,22,500)
    X = X[:,:,0:500]
    print('Shape of X after trimming:',X.shape)

    if X.shape[2] % sub_sample != 0:
        raise ValueError(
            "trimmed time length %d is not divisible by sub_sample=%r"
            % (X.shape[2], sub_sample))

    n_trials, n_channels = X.shape[0], X.shape[1]

    # Maxpooling the data (sample,22,500) -> (sample,22,500/sub_sample)
    X_max = np.max(X.reshape(n_trials, n_channels, -1, sub_sample), axis=3)
    print('Shape of X after maxpooling:',X_max.shape)

    # Averaging + noise (the noise here is unconditional)
    X_average = np.mean(X.reshape(n_trials, n_channels, -1, average),axis=3)
    X_average = X_average + np.random.normal(0.0, 0.5, X_average.shape)

    pooled = np.vstack((X_max, X_average))
    print('Shape of X after averaging+noise and concatenating:',pooled.shape)

    # Subsampling: collect every phase offset, then concatenate once at the end
    # instead of re-copying the growing array on each iteration.
    variants = [pooled]
    for offset in range(sub_sample):
        X_subsample = X[:, :, offset::sub_sample]
        if noise:
            X_subsample = X_subsample + np.random.normal(0.0, 0.5, X_subsample.shape)
        variants.append(X_subsample)

    total_X = np.vstack(variants)

    # One copy of the labels / subject ids per variant: max, average, and
    # sub_sample strided copies.
    n_copies = 2 + sub_sample
    total_y = np.hstack([y] * n_copies)
    total_p = np.hstack([p] * n_copies)

    print('Shape of X after subsampling and concatenating:',total_X.shape)
    return total_X,total_y, total_p

In [35]:
## Preprocessing the dataset
def preprocessing(X_test,y_test,person_train_valid,X_train_valid,y_train_valid,person_test):
    """Turn raw arrays into model-ready train / validation / test tensors.

    Steps: shift the labels down by 769, augment both splits with
    ``data_prep(..., 2, 2, True)``, carve a validation set out of the augmented
    train/valid data (stratified on subject id + label), one-hot encode the
    labels into 4 classes, and rearrange every ``X`` from
    ``(trials, channels, time)`` to ``(trials, time, 1, channels)``.

    The input arrays are not modified.

    Returns
    -------
    tuple
        ``(y_train, y_valid, y_test, x_train, x_valid, x_test)``.
    """
    # Out-of-place subtraction: `-=` would silently rewrite the caller's label
    # arrays, so calling this twice on the same data would shift them twice.
    y_train_valid = y_train_valid - 769
    y_test = y_test - 769

    X_train_valid_prep,y_train_valid_prep,person_train_valid_prep = data_prep(X_train_valid,y_train_valid,person_train_valid,2,2,True)
    X_test_prep,y_test_prep,person_test_prep = data_prep(X_test,y_test,person_test,2,2,True)

    # One stratification key per augmented trial: "<subject id><label>".
    stratif_labels = [
        str(person.astype('int')) + str(label)
        for person, label in zip(person_train_valid_prep, y_train_valid_prep)
    ]
    print(y_train_valid_prep.shape)
    print(X_test_prep.shape)
    print(y_test_prep.shape)

    # Fraction of the augmented train/valid data that is held out for validation.
    valid_fraction = 0.1773
    total_size = y_train_valid_prep.shape[0]
    num_samples = int(total_size*valid_fraction)

    # NOTE(review): the split happens after augmentation, so different augmented
    # copies of the same original trial can end up on both sides of the split.
    # Validation accuracy may therefore be optimistic -- confirm this is intended.
    x_train, x_valid, y_train, y_valid = train_test_split(
        X_train_valid_prep,
        y_train_valid_prep,
        test_size=num_samples/total_size,
        stratify=stratif_labels,
    )

    print('Shape of training set:',x_train.shape)
    print('Shape of validation set:',x_valid.shape)
    print('Shape of training labels:',y_train.shape)
    print('Shape of validation labels:',y_valid.shape)

    # Converting the labels to categorical variables for multiclass classification
    y_train = to_categorical(y_train, 4)
    y_valid = to_categorical(y_valid, 4)
    y_test = to_categorical(y_test_prep, 4)
    print('Shape of training labels after categorical conversion:',y_train.shape)
    print('Shape of validation labels after categorical conversion:',y_valid.shape)
    print('Shape of test labels after categorical conversion:',y_test.shape)

    # Adding width of the segment to be 1: (N, C, T) -> (N, C, T, 1).
    # Each array uses its own shape (the original sized x_valid from x_train).
    x_train = np.expand_dims(x_train, axis=3)
    x_valid = np.expand_dims(x_valid, axis=3)
    x_test = np.expand_dims(X_test_prep, axis=3)
    print('Shape of training set after adding width info:',x_train.shape)
    print('Shape of validation set after adding width info:',x_valid.shape)
    print('Shape of test set after adding width info:',x_test.shape)

    # Reshaping the datasets: (N, C, T, 1) -> (N, T, 1, C).
    # A single transpose is equivalent to swapaxes(1, 3) followed by swapaxes(1, 2).
    model_axes = (0, 2, 3, 1)
    x_train = np.transpose(x_train, model_axes)
    x_valid = np.transpose(x_valid, model_axes)
    x_test = np.transpose(x_test, model_axes)
    print('Shape of training set after dimension reshaping:',x_train.shape)
    print('Shape of validation set after dimension reshaping:',x_valid.shape)
    print('Shape of test set after dimension reshaping:',x_test.shape)
    return y_train, y_valid, y_test, x_train, x_valid, x_test


# Generating Models

In [36]:
# Building the CNN model using sequential class
def generate_basic_cnn_model(input_shape=(250, 1, 22), num_classes=4):
    """Build the (uncompiled) 4-block CNN classifier.

    Each of the four blocks is
    ``Conv2D(100, (10, 1), same, elu) -> MaxPooling2D((3, 1), same) ->
    BatchNormalization -> Dropout(0.5)``; the blocks are followed by
    ``Flatten`` and a softmax ``Dense`` output layer.

    Parameters
    ----------
    input_shape : tuple, optional
        Shape of one input sample, passed to the first convolution. The default
        ``(250, 1, 22)`` is the shape this model was originally hard-coded for.
    num_classes : int, optional
        Number of softmax output units. Defaults to 4.

    Returns
    -------
    Sequential
        The assembled model; the caller is responsible for compiling it.
    """
    num_conv_blocks = 4
    basic_cnn_model = Sequential()

    for block_index in range(num_conv_blocks):
        conv_kwargs = dict(filters=100, kernel_size=(10,1), padding='same', activation='elu')
        if block_index == 0:
            # Only the first layer of the model declares the input shape.
            conv_kwargs['input_shape'] = input_shape

        basic_cnn_model.add(Conv2D(**conv_kwargs))
        basic_cnn_model.add(MaxPooling2D(pool_size=(3,1), padding='same'))
        basic_cnn_model.add(BatchNormalization())
        basic_cnn_model.add(Dropout(0.5))

    # Output layer with Softmax activation
    basic_cnn_model.add(Flatten()) # Flattens the input
    basic_cnn_model.add(Dense(num_classes, activation='softmax')) # Output FC layer with softmax activation

    return basic_cnn_model

In [37]:
def generate_cnn_lstm_hybrid_model(input_shape=(250, 1, 22), num_classes=4):
    """Build the (uncompiled) CNN + LSTM hybrid classifier.

    Three convolutional blocks
    (``Conv2D(100, (10, 1), same, elu) -> MaxPooling2D((3, 1), same) ->
    BatchNormalization -> Dropout(0.5)``) feed a linear ``Dense(100)`` layer
    whose output is reshaped to ``(100, 1)`` and passed through two stacked
    LSTMs (100 then 70 units) before the softmax output layer.

    Parameters
    ----------
    input_shape : tuple, optional
        Shape of one input sample, passed to the first convolution. The default
        ``(250, 1, 22)`` is the shape this model was originally hard-coded for.
    num_classes : int, optional
        Number of softmax output units. Defaults to 4.

    Returns
    -------
    Sequential
        The assembled model; the caller is responsible for compiling it.
    """
    num_conv_blocks = 3
    fc_units = 100
    hybrid_cnn_lstm_model = Sequential()

    for block_index in range(num_conv_blocks):
        conv_kwargs = dict(filters=100, kernel_size=(10,1), padding='same', activation='elu')
        if block_index == 0:
            # Only the first layer of the model declares the input shape.
            conv_kwargs['input_shape'] = input_shape

        hybrid_cnn_lstm_model.add(Conv2D(**conv_kwargs))
        hybrid_cnn_lstm_model.add(MaxPooling2D(pool_size=(3,1), padding='same'))
        hybrid_cnn_lstm_model.add(BatchNormalization())
        hybrid_cnn_lstm_model.add(Dropout(0.5))

    # FC+LSTM layers
    hybrid_cnn_lstm_model.add(Flatten()) # Flatten the CNN feature maps
    hybrid_cnn_lstm_model.add(Dense(fc_units)) # FC layer (no activation)
    # Present the FC output to the LSTM as fc_units time steps of one feature each.
    hybrid_cnn_lstm_model.add(Reshape((fc_units,1)))
    # No input_shape here: this is not the first layer, so its input shape is
    # inferred from the preceding Reshape.
    # NOTE(review): TF/Keras documents that recurrent_dropout > 0 rules out the
    # fused cuDNN LSTM kernel -- confirm this is acceptable if training is slow.
    hybrid_cnn_lstm_model.add(LSTM(100, dropout=0.6, recurrent_dropout=0.1, return_sequences=True))
    hybrid_cnn_lstm_model.add(LSTM(70, dropout=0.6, recurrent_dropout=0.1, return_sequences=False))

    # Output layer with Softmax activation
    hybrid_cnn_lstm_model.add(Dense(num_classes, activation='softmax')) # Output FC layer with softmax activation

    return hybrid_cnn_lstm_model


In [38]:
def get_model_results(model, x_train, y_train, x_valid, y_valid, x_test, y_test,
                      learning_rate=2e-3, epochs=50, batch_size=200):
    """Compile, train and evaluate ``model``; return its test accuracy.

    The model is compiled with categorical cross-entropy, an Adam optimizer
    and the ``accuracy`` metric, fitted on the training data while monitoring
    the validation data, and finally evaluated on the test data.

    Parameters
    ----------
    model
        An uncompiled Keras model (compiled in place by this function).
    x_train, y_train, x_valid, y_valid, x_test, y_test
        Inputs and one-hot labels for the three splits.
    learning_rate : float, optional
        Adam learning rate. Defaults to ``2e-3``.
    epochs : int, optional
        Number of training epochs. Defaults to 50.
    batch_size : int, optional
        Mini-batch size. Defaults to 200.

    Returns
    -------
    float
        The second entry of ``model.evaluate(...)``, i.e. the value of the
        single ``accuracy`` metric on the test set (the first entry is the loss).
    """
    optimizer = keras.optimizers.Adam(learning_rate)

    # Compiling the model
    model.compile(loss='categorical_crossentropy',
                  optimizer=optimizer,
                  metrics=['accuracy'])

    # Training and validating the model (the returned History is not needed here)
    model.fit(x_train,
              y_train,
              batch_size=batch_size,
              epochs=epochs,
              validation_data=(x_valid, y_valid),
              verbose=True)

    score = model.evaluate(x_test, y_test, verbose=0)
    return score[1]

# Function to Calculate Accuracies by Subject

In [39]:
def train_by_subjects(subject_ids=None):
    """Train and evaluate models on all subjects at once or one subject at a time.

    Parameters
    ----------
    subject_ids : sequence of int, optional
        Subject ids to train on individually. When omitted or empty, a single
        CNN+LSTM hybrid model is trained on the data of all subjects.

    Returns
    -------
    float or dict
        With no subject ids: the test accuracy of the hybrid model.
        Otherwise: ``{subject_id: test accuracy}``, where each entry comes from
        a fresh basic CNN trained and tested only on that subject's trials.
    """
    # Load once; the per-subject subsets below are fancy-indexed copies, so the
    # loaded arrays are never altered between iterations.
    X_test,y_test,person_train_valid,X_train_valid,y_train_valid,person_test = load_data()

    # If no subject_ids are provided, the model will be trained on all subjects
    if subject_ids is None or len(subject_ids) == 0:
        y_train, y_valid, y_test, x_train, x_valid, x_test = preprocessing(X_test,y_test,person_train_valid,X_train_valid,y_train_valid,person_test)
        cnn_lstm_hybrid_model = generate_cnn_lstm_hybrid_model()
        return get_model_results(cnn_lstm_hybrid_model, x_train, y_train, x_valid, y_valid, x_test, y_test)

    test_accuracies = {}

    for subject in subject_ids:
        # Row indices of this subject's trials in each split.
        train_rows = np.where(person_train_valid == subject)[0]
        test_rows = np.where(person_test == subject)[0]

        y_train, y_valid, y_test_subject, x_train, x_valid, x_test = preprocessing(
            X_test[test_rows],
            y_test[test_rows],
            person_train_valid[train_rows],
            X_train_valid[train_rows],
            y_train_valid[train_rows],
            person_test[test_rows],
        )

        cnn_model = generate_basic_cnn_model()
        test_accuracies[subject] = get_model_results(cnn_model, x_train, y_train, x_valid, y_valid, x_test, y_test_subject)

    return test_accuracies

In [40]:
# Train one basic CNN per subject (ids 0 through 8); the cell's value is the
# {subject_id: test accuracy} dict returned by train_by_subjects.
train_by_subjects(list(range(9)))

Shape of X after trimming: (237, 22, 500)
Shape of X after maxpooling: (237, 22, 250)
Shape of X after averaging+noise and concatenating: (474, 22, 250)
Shape of X after subsampling and concatenating: (948, 22, 250)
Shape of X after trimming: (50, 22, 500)
Shape of X after maxpooling: (50, 22, 250)
Shape of X after averaging+noise and concatenating: (100, 22, 250)
Shape of X after subsampling and concatenating: (200, 22, 250)
(948,)
(200, 22, 250)
(200,)
Shape of training set: (780, 22, 250)
Shape of validation set: (168, 22, 250)
Shape of training labels: (780,)
Shape of validation labels: (168,)
Shape of training labels after categorical conversion: (780, 4)
Shape of validation labels after categorical conversion: (168, 4)
Shape of test labels after categorical conversion: (200, 4)
Shape of training set after adding width info: (780, 22, 250, 1)
Shape of validation set after adding width info: (168, 22, 250, 1)
Shape of test set after adding width info: (200, 22, 250, 1)
Shape of tra

KeyboardInterrupt: 