In [None]:
import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)
from matplotlib import pyplot as plt
from tensorflow.keras.models import Sequential
from tensorflow.keras.preprocessing import sequence
from tensorflow.keras.layers import LSTM
from tensorflow.keras.layers import Dropout
from tensorflow.keras.layers import Dense
from sklearn.preprocessing import MinMaxScaler
import tensorflow.lite
# tf.lite.TFLiteConverter.from_keras_model
print(tensorflow.__version__)
from sklearn.metrics import confusion_matrix
import seaborn as sns
tensorflow.random.set_seed(2137)
from keras.optimizers import Adam
from sklearn.metrics import accuracy_score
from sklearn.metrics import precision_score
# from sklearn.metrics import recall_score
from sklearn.metrics import f1_score

In [None]:
# utils
def print_signals(signal:np.ndarray):
    '''
    @Deprecated helper that draws a 9-column signal as three stacked panels.

    Columns 0-2, 3-5 and 6-8 each get their own subplot; inside every panel the
    first, second and third column are drawn in red, green and blue.
    :param signal: 2-D array with one complete sequence (any number of rows, 9 columns)
    '''
    plt.figure(num=1,figsize=(30,40))
    line_formats = ('r','g','b')
    for panel in range(3):
        # subplot indices are 1-based, column triples are 0-based
        plt.subplot(3,1,panel + 1)
        for offset,fmt in enumerate(line_formats):
            plt.plot(signal[:,3 * panel + offset],fmt)
    plt.show()


def printSignals(signal):
    '''
    Plot one recorded sequence as three stacked panels with legends and axis labels.

    Panel 1 draws columns 0-2 (labelled 'Acc x/y/z'), panel 2 draws columns 3-5
    (labelled 'Rot x/y/z') and panel 3 draws the last two columns (labelled
    'Roll' and 'Yaw').  The last two columns are addressed from the end of the
    row, so both an 8-column signal (indices 6, 7) and a 9-column signal
    (indices 7, 8) work.  The previous hard-coded indices 7 and 8 raised
    IndexError for the 8-column signals this helper is documented for.
    :param signal: 2-D array with one complete sequence (any number of rows, at least 8 columns)
    '''
    fig, ax = plt.subplots(nrows=3, ncols=1, figsize=(10, 5))

    # One entry per panel: (column index, legend label) for every trace.
    panels = (
        ((0, 'Acc x'), (1, 'Acc y'), (2, 'Acc z')),
        ((3, 'Rot x'), (4, 'Rot y'), (5, 'Rot z')),
        ((-2, 'Roll'), (-1, 'Yaw')),
    )
    y_labels = ('Wielokrotności g', 'stopnie/s', 'stopnie')

    for axis, traces, y_label in zip(ax, panels, y_labels):
        for column, label in traces:
            axis.plot(signal[:, column], label=label)
        axis.legend(loc='upper right')
        axis.set_ylabel(y_label)
        axis.grid(True)

    fig.suptitle('Wybrany przebieg sygnału odpowiadający codziennej czynności')
    plt.tight_layout()
    plt.show()


def getDataPredictions(data,someModel):
    '''
    Run the model on every sample separately and return hard 0/1 decisions.

    Each sample gets a leading batch axis of size 1 derived from its own shape,
    so the helper no longer depends on the notebook-level window length or on a
    hard-coded feature count.
    :param data: iterable of equally shaped sample arrays (e.g. an array of shape (N, window, features))
    :param someModel: object with a Keras-style predict(batch, verbose=0) whose result is indexable as [0][0]
    :returns: list of ints, 1 where the predicted value is strictly greater than 0.5, otherwise 0
    '''
    y_predicted = []
    for elem in data:
        batch_of_one = np.asarray(elem)[np.newaxis, ...]
        score = someModel.predict(batch_of_one,verbose=0)[0][0]
        y_predicted.append(int(score > 0.5))
    return y_predicted

def plotConf(mat):
    '''
    Show a confusion matrix as an annotated seaborn heatmap.

    Calls sns.set(font_scale=1.2), which changes seaborn's global settings.
    :param mat: 2-D matrix of integer counts (cells are formatted with "d")
    '''
    sns.set(font_scale=1.2)
    plt.figure(figsize=(8, 6))
    heatmap_options = dict(annot=True, fmt="d", cmap="Blues", annot_kws={"size": 40}, cbar=False)
    sns.heatmap(mat, **heatmap_options)

    # Axis captions and title, each with its own font size
    captions = (
        (plt.xlabel, "Predicted Labels", 16),
        (plt.ylabel, "True Labels", 16),
        (plt.title, "Confusion Matrix", 18),
    )
    for apply_caption, text, size in captions:
        apply_caption(text, fontsize=size)
    plt.show()

In [None]:
# Notebook-level containers shared by the cells below.
# The x_*/y_* lists are filled by prepare_all_data(); later cells rebind the
# same names to numpy arrays.
x_train = []
y_train = []

x_val = []
y_val = []

x_test = []
y_test = []

# rewrite() appends rows from every subject file here; the min/max used for
# scaling are computed from this collection afterwards.
lst_to_compute_scaling =[]
# Number of rows every sequence is cut/padded to before it is fed to the model.
window_size_in_samples = 70


In [None]:
def rewrite(num:int) -> tuple:
    '''
    Load one subject's raw CSV and split it into per-sequence arrays plus labels.

    The 'Timestamp' and 'Pitch' columns are removed, swapAxisInDf() is applied,
    and columns 1..8 of every row are appended to the notebook-level
    lst_to_compute_scaling.  Rows are then grouped by 'Feature Line'; each
    group becomes one sequence.
    :param num: index of subject file, must be between 1 and 17
    :returns x_falls: list with one 2-D value array per sequence ('Feature Line' and 'Fall' removed)
    :returns y_falls: list of np.uint8 labels taken from the first row of each sequence
    '''
    df = pd.read_csv(f'fall-dataset/fall-dataset-raw/Subject{num}-raw.csv')
    df = df.drop(columns=['Timestamp', 'Pitch'])

    swapAxisInDf(df)
    lst_to_compute_scaling.extend(df.values[:,1:9])

    x_falls, y_falls = [], []
    for _, group in df.groupby('Feature Line'):
        labelled = group.drop(columns='Feature Line')
        # Label = last column of the first row (presumably 'Fall' - the column removed right after).
        y_falls.append(np.uint8(labelled.iloc[0,-1]))
        x_falls.append(labelled.drop(columns='Fall').values)
    return x_falls, y_falls

def swapAxisInDf(df:pd.DataFrame):
    '''
    Re-map the accelerometer and gyroscope axes in place (phone axis orientation).

    For both the 'Acc' and the 'Rot' column triple:
        new X = -old Y,   new Y = old Z,   new Z = old X
    The old X column is copied before it is overwritten.  The previous version
    read X after overwriting it, so Z ended up equal to the new X (-old Y) and
    the original X data was lost.
    NOTE(review): weights trained on data produced by the previous version saw
    Z == -Y; re-train or re-validate saved weights after this fix.
    :param df: frame with columns 'Acc(X)', 'Acc(Y)', 'Acc(Z)', 'Rot(X)', 'Rot(Y)', 'Rot(Z)'; modified in place
    '''
    for sensor in ('Acc', 'Rot'):
        x_col, y_col, z_col = f'{sensor}(X)', f'{sensor}(Y)', f'{sensor}(Z)'
        old_x = df[x_col].copy()
        df[x_col] = df[y_col] * -1
        df[y_col] = df[z_col]
        df[z_col] = old_x

In [None]:
def prepare_all_data(nums_for_validation:list,nums_for_testing):
    '''
    Load subjects 1..17 with rewrite() and distribute their sequences over the
    notebook-level train / validation / test containers.

    Subjects not listed in either argument go to the training set.  The lists
    should not share a number (the sets are meant to be independent); if they
    do, the subject is used for testing because that list is checked first.
    Nothing is loaded when any container already holds data.
    param: nums_for_validation: subject numbers whose data is used for validating the algorithm
    param: nums_for_testing: subject numbers whose data is used for testing the algorithm
    '''
    containers = (x_test,y_test,x_val,y_val,x_train,y_train)
    # len() instead of truthiness: later cells rebind these names to numpy
    # arrays, and bool() of a multi-element array raises ValueError.
    if any(len(part) > 0 for part in containers):
        print('data already initialized')
        return
    for i in range(1,18):
        x_part, y_part = rewrite(i)

        if i in nums_for_testing:
            x_test.extend(x_part)
            y_test.extend(y_part)
        elif i in nums_for_validation:
            x_val.extend(x_part)
            y_val.extend(y_part)
        else:
            x_train.extend(x_part)
            y_train.extend(y_part)

In [None]:
# Subjects 4, 17, 9 and 15 form the validation set, subjects 2, 6 and 12 the
# test set; every other subject in 1..17 is used for training.
prepare_all_data([4,17,9,15],[2,6,12])

In [None]:
# Per-column value range used for scaling: min_max_of_col[i] is the
# (min, max) pair of column i over every row collected by rewrite().
lst_to_compute_scaling = np.array(lst_to_compute_scaling)
min_max_of_col = [
    (column.min(), column.max())
    for column in (lst_to_compute_scaling[:, column_idx] for column_idx in range(8))
]
print(min_max_of_col)

In [None]:
def transform_column(col,min_,max_):
    '''Map every element of a column linearly so that min_ -> -1 and max_ -> 1.
    :parameter col: iterable with the column values
    :parameter min_: value that is mapped to -1
    :parameter max_: value that is mapped to 1
    :returns list with one transformed value per input element
    '''
    return [2 * (value - min_)/ (max_ - min_) -1 for value in col]

def scale(x_lst,min_max_of_col):
    '''Rescale every column of every sequence in place with transform_column().

    Sequences may differ in length, but min_max_of_col needs an entry for every column.
    param: x_lst: list of 2-D sequences; each one is overwritten with its scaled values
    param: min_max_of_col: list of (min, max) tuples, one per column, in column order'''
    for record in x_lst:
        _, column_count = record.shape
        for col_idx in range(column_count):
            bounds = min_max_of_col[col_idx]
            record[:,col_idx] = transform_column(record[:,col_idx],bounds[0],bounds[1])


In [None]:
# Apply the same per-column (min, max) scaling to all three sets.
# scale() overwrites the sequences in place, so running this cell a second time
# would scale data that is already scaled.
scale(x_train,min_max_of_col)
scale(x_val,min_max_of_col) 
scale(x_test,min_max_of_col)

In [None]:
def myCustomPad(x_arr,window_size_in_samples):
    '''
    Bring every sequence to exactly window_size_in_samples rows and stack them.

    Longer sequences keep their first window_size_in_samples rows; shorter ones
    are zero-padded at the end.  Narrower sequences are zero-padded on the right
    to the widest column count in x_arr.  Each sequence is padded straight to
    the window length, so the result always has window_size_in_samples rows.
    The earlier pad-to-longest-then-slice approach returned fewer rows when
    every sequence was shorter than the window.
    :param x_arr: non-empty list of 2-D arrays (rows x columns)
    :param window_size_in_samples: number of rows per sequence in the result
    :returns: array of shape (len(x_arr), window_size_in_samples, max columns)
    :raises ValueError: if x_arr is empty
    '''
    if len(x_arr) == 0:
        raise ValueError('myCustomPad() needs at least one sequence')
    max_cols = max(arr.shape[1] for arr in x_arr)
    padded_array = []
    for sequence in x_arr:
        clipped = sequence[:window_size_in_samples]
        rows_to_pad = window_size_in_samples - clipped.shape[0]
        cols_to_pad = max_cols - clipped.shape[1]
        padded_sequence = np.pad(clipped, ((0, rows_to_pad), (0, cols_to_pad)), mode='constant', constant_values=0)
        padded_array.append(padded_sequence)
    return np.stack(padded_array,axis=0)

In [None]:
# Turn every list of sequences into one stacked 3-D array cut to
# window_size_in_samples rows per sequence.  The names are rebound from lists
# to arrays here.
x_train = myCustomPad(x_train,window_size_in_samples)
x_test = myCustomPad(x_test,window_size_in_samples)
x_val = myCustomPad(x_val,window_size_in_samples)

# Labels become float32 arrays.
y_val = np.array(y_val,dtype=np.float32)
y_train = np.array(y_train,dtype=np.float32)
y_test = np.array(y_test,dtype=np.float32)

In [None]:
class SaveBestModel(tensorflow.keras.callbacks.Callback):
    '''
    Keras callback that keeps an in-memory copy of the weights of the best epoch.

    After every epoch the monitored metric is compared with the best value seen
    so far; on improvement the current weights are stored in best_weights and a
    message with the (0-based) epoch index is printed.
    :param save_best_metric: key of the monitored value in the epoch logs
    :param this_max: True if larger values are better, False if smaller values are better
    '''
    def __init__(self, save_best_metric='val_loss', this_max=False):
        super().__init__()
        self.save_best_metric = save_best_metric
        self.max = this_max
        self.best = float('-inf') if this_max else float('inf')
        # Stays None until some epoch improves on the initial +/-inf value.
        self.best_weights = None

    def on_epoch_end(self, epoch, logs=None):
        '''Store the current weights if the monitored metric improved.
        :raises KeyError: if the monitored metric is not present in logs'''
        logs = logs or {}
        if self.save_best_metric not in logs:
            raise KeyError(f'metric {self.save_best_metric!r} not found in epoch logs: {sorted(logs)}')
        metric_value = logs[self.save_best_metric]
        if self.max:
            improved = metric_value > self.best
        else:
            improved = metric_value < self.best
        if improved:
            self.best = metric_value
            self.best_weights = self.model.get_weights()
            print(f'got best model in: {epoch} epoch')

In [None]:
# Specify here which model variant to build (1, 2 or 3 - see the build cell below).
build = 1
# Model input: window_size_in_samples time steps with 8 values each.
input_shape = (window_size_in_samples,8)
# Set to 1 to write a diagram of the model with plot_model().
plotModel = 0

In [None]:
# Build the model variant selected by `build`, compile it and load its stored
# weights.  The model.fit(...) / set_weights(...) lines are kept commented out:
# uncomment them (and comment load_weights) to train instead of loading.

# `history` stays None while the fit calls are commented out, so code reading
# it later can tell "weights loaded from disk" from "trained in this session".
history = None
model = Sequential()
if build ==1:
    # Variant 1: one LSTM layer (40 units) -> dropout 0.4 -> sigmoid output.
    model.add(LSTM(units=40, return_sequences=False, input_shape=input_shape,stateful=False))
    model.add(Dropout(0.4))
    model.add(Dense(1, activation='sigmoid'))
    optimizer = Adam(learning_rate=0.0005)
    model.compile(loss='binary_crossentropy', optimizer=optimizer, metrics=['accuracy'])
    save_best_model = SaveBestModel()
    # history = model.fit(x_train,y_train,epochs=50,validation_data=(x_val, y_val),batch_size=45,verbose=0,callbacks=[save_best_model])
    # model.set_weights(save_best_model.best_weights)
    model.load_weights('Weights_folder1/Weights')
    
elif build ==2:
    # Variant 2: two stacked LSTM layers (20 units each), dropout 0.3 after each.
    model.add(LSTM(units=20, return_sequences=True, input_shape=input_shape,stateful=False))
    model.add(Dropout(0.3))
    model.add(LSTM(units=20, return_sequences=False, input_shape=input_shape,stateful=False))
    model.add(Dropout(0.3))
    model.add(Dense(1, activation='sigmoid'))
    optimizer = Adam(learning_rate=0.0002)
    model.compile(loss='binary_crossentropy', optimizer=optimizer, metrics=['accuracy'])
    save_best_model = SaveBestModel()
    # history = model.fit(x_train,y_train,epochs=60,validation_data=(x_val, y_val),batch_size=45,verbose=0,callbacks=[save_best_model])
    # model.set_weights(save_best_model.best_weights)
    model.load_weights('Weights_folder2/Weights')

elif build ==3:
    # Variant 3: three stacked LSTM layers (20 units each), dropout 0.2 after each.
    model.add(LSTM(units=20, return_sequences=True, input_shape=input_shape,stateful=False))
    model.add(Dropout(0.2))
    model.add(LSTM(units=20, return_sequences=True, input_shape=input_shape,stateful=False))
    model.add(Dropout(0.2))
    model.add(LSTM(units=20, return_sequences=False, input_shape=input_shape,stateful=False))
    model.add(Dropout(0.2))
    model.add(Dense(1, activation='sigmoid'))
    optimizer = Adam(learning_rate=0.001)
    model.compile(loss='binary_crossentropy', optimizer=optimizer, metrics=['accuracy'])
    save_best_model = SaveBestModel()
    # history = model.fit(x_train,y_train,epochs=50,validation_data=(x_val, y_val),batch_size=45,verbose=0,callbacks=[save_best_model])
    # model.set_weights(save_best_model.best_weights)
    model.load_weights('Weights_folder3/Weights')

else:
    # Fail here instead of carrying an empty, uncompiled Sequential into the
    # evaluation cells.
    raise ValueError(f'unknown build={build!r}; expected 1, 2 or 3')

if plotModel ==1:
    # Write a diagram of the selected model to modelPics/model<build>.png.
    tensorflow.keras.utils.plot_model(
        model,
        to_file=f'modelPics/model{build}.png',
        show_shapes=True,
        show_dtype=False,
        show_layer_names=False,
        rankdir='TB',
        expand_nested=False,
        dpi=96,
        layer_range=None,
        show_layer_activations=True,
        show_trainable=False
    )

In [None]:
# Evaluate the model (loss and the compiled 'accuracy' metric) on the
# training, validation and test sets, in that order.
model.evaluate(x_train,y_train,verbose=1)
model.evaluate(x_val,y_val,verbose=1)
model.evaluate(x_test,y_test,verbose=1)

In [None]:
# Training curves.  `history` is only bound when a model.fit(...) call has been
# executed in this kernel, so it is looked up defensively: when it is missing
# or None (weights were loaded from disk) this cell plots nothing.
# The original condition `history in not None` was a syntax error.
training_history = globals().get('history')
if training_history is not None:
    for metric in ('accuracy', 'loss'):
        plt.plot(training_history.history[metric])
        plt.plot(training_history.history[f'val_{metric}'])
        plt.title(f'model {metric}')
        plt.ylabel(metric)
        plt.xlabel('epoch')
        plt.legend(['trening', 'walidacja'], loc='upper left')
        plt.show()

In [None]:
# Predict a 0/1 label for every test sequence, then build and plot the
# confusion matrix of true labels against predicted labels.
y_predicted = getDataPredictions(x_test,model)
cm1 = confusion_matrix(y_test,y_predicted)
plotConf(cm1)

In [None]:
# Summary metrics on the test set.
accuracy = accuracy_score(y_test, y_predicted)
print ('Accuracy : ', accuracy)

# sklearn's confusion_matrix has true labels on the rows and predicted labels
# on the columns, with labels in sorted order.  With class 1 as the positive
# class (the default pos_label of precision_score / f1_score below):
#   cm1[0,0] = TN, cm1[0,1] = FP, cm1[1,0] = FN, cm1[1,1] = TP
# Sensitivity (recall of class 1) = TP / (TP + FN)
sensitivity1 = cm1[1,1]/(cm1[1,0]+cm1[1,1])
print('Sensitivity : ', sensitivity1 )

# Specificity (recall of class 0) = TN / (TN + FP)
specificity1 = cm1[0,0]/(cm1[0,0]+cm1[0,1])
print('Specificity : ', specificity1)

prec = precision_score(y_test,y_predicted)
print('precision : ',prec)

f1 = f1_score(y_test,y_predicted)
print('f1 : ',f1)

In [None]:
# Optional export of the current model to TensorFlow Lite.
# Set saveModel to 1 to run the conversion; any other value only prints a notice.
saveModel = 0
if saveModel != 1:
    print('not set to save anything')
else:
    # path=f'Weights_folder{build}/Weights'
    # model.save_weights(path)

    converter = tensorflow.lite.TFLiteConverter.from_keras_model(model)
    # Allow both the built-in TensorFlow Lite ops and selected TensorFlow ops.
    converter.target_spec.supported_ops = [
        tensorflow.lite.OpsSet.TFLITE_BUILTINS,
        tensorflow.lite.OpsSet.SELECT_TF_OPS,
    ]
    tflite_model = converter.convert()

    # Write the converted model to disk
    with open(f'tflites/model.tflite', 'wb') as f:
        f.write(tflite_model)
    # with open(f'tflites/best{build}Layer.tflite', 'wb') as f:
    #     f.write(tflite_model)

In [None]:
'''Results here are highly volatile and sometimes unreliable;
 use them only to get general information about model behaviour.
The best params were picked here and later manually adjusted and tested in the code above.
 Watch out: one LSTM row executes for around 20 min on a one-layer model.
 On the (-1, 1) range of values the model behaves much better. '''
class SaveBestModel(tensorflow.keras.callbacks.Callback):
    '''
    Silent variant of the best-weights callback used by the grid search below.

    This definition replaces the SaveBestModel class defined earlier in the
    notebook once this cell has run; the only behavioural difference is that no
    message is printed when a new best epoch is found.
    :param save_best_metric: key of the monitored value in the epoch logs
    :param this_max: True if larger values are better, False if smaller values are better
    '''
    def __init__(self, save_best_metric='val_loss', this_max=False):
        super().__init__()
        self.save_best_metric = save_best_metric
        self.max = this_max
        self.best = float('-inf') if this_max else float('inf')
        # Stays None until some epoch improves on the initial +/-inf value.
        self.best_weights = None

    def on_epoch_end(self, epoch, logs=None):
        '''Store the current weights if the monitored metric improved.
        :raises KeyError: if the monitored metric is not present in logs'''
        logs = logs or {}
        if self.save_best_metric not in logs:
            raise KeyError(f'metric {self.save_best_metric!r} not found in epoch logs: {sorted(logs)}')
        metric_value = logs[self.save_best_metric]
        if self.max:
            improved = metric_value > self.best
        else:
            improved = metric_value < self.best
        if improved:
            self.best = metric_value
            self.best_weights = self.model.get_weights()
            # print(f'got best model in: {epoch} epoch')


# Exhaustive sweep over LSTM width, dropout rate, learning rate and batch size
# for the three-layer architecture.  Every combination is trained for 50
# epochs, reset to its best-validation-loss weights and scored on the test set.
# The combinations are enumerated in the same order as four nested loops
# (batch size varies fastest).
search_grid = [
    (lstm, dropout, learning, batchSize)
    for lstm in [5, 10, 15, 20, 25, 30, 40, 50, 90]
    for dropout in [0.1, 0.2, 0.3, 0.4]
    for learning in [0.001, 0.0005, 0.00025]
    for batchSize in [15, 45, 90]
]

for lstm, dropout, learning, batchSize in search_grid:
    model = Sequential()
    model.add(LSTM(units=lstm, return_sequences=True, input_shape=input_shape,stateful=False))
    model.add(Dropout(dropout))
    model.add(LSTM(units=lstm, return_sequences=True,stateful=False))
    model.add(Dropout(dropout))
    model.add(LSTM(units=lstm, return_sequences=False,stateful=False))
    model.add(Dropout(dropout))
    model.add(Dense(1, activation='sigmoid'))
    optimizer = Adam(learning_rate=learning)
    model.compile(loss='binary_crossentropy', optimizer=optimizer, metrics=['accuracy'])

    save_best_model = SaveBestModel()
    history = model.fit(x_train,y_train,epochs=50,validation_data=(x_val, y_val),batch_size=batchSize,verbose=0,callbacks=[save_best_model])
    # Go back to the weights of the epoch with the best monitored metric.
    model.set_weights(save_best_model.best_weights)

    y_predicted = getDataPredictions(x_test,model)
    cm1 = confusion_matrix(y_test,y_predicted)
    accuracy = accuracy_score(y_test, y_predicted)
    print (f'lstm={lstm}, dropout={dropout},leaning={learning}, batch={batchSize} Accuracy : ', accuracy)
                