<a href="https://colab.research.google.com/github/mojtabaSefidi/Fall-Detection-System/blob/master/Fall_Detection_System_v2.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
from google.colab import drive
drive.mount("/content/gdrive")

Mounted at /content/gdrive


Import libraries and frameworks

In [2]:
!pip install -q tqdm

In [3]:
from tqdm import tqdm
from sklearn import preprocessing
import tensorflow as tf
from tensorflow import keras
from math import sqrt
import pandas as pd
import numpy as np
import glob
import os
window_size = 200
from sklearn.utils import compute_class_weight
import matplotlib.pyplot as plt
import seaborn as sn
sn.set()
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, confusion_matrix
from sklearn.preprocessing import StandardScaler
import random
from itertools import groupby, chain
import pickle

Get the sisfall and sisfall_enhanced dataset

In [4]:
# !unzip /content/gdrive/MyDrive/Datasets/SisFall_dataset.zip
# !unzip /content/gdrive/MyDrive/Datasets/SisFall_enhanced.zip

# Implementation

In [5]:
def plot_confusion_matrix(confusion_matrix):
    df = pd.DataFrame(confusion_matrix, range(len(confusion_matrix)), range(len(confusion_matrix)))
    plt.figure(figsize=(6,4))
    plt.title('Confusion Matrix')
    sn.set(font_scale=1) # for label size
    sn.heatmap(df, annot=True, annot_kws={"size": 12},fmt='.0f',cmap='Greens') # font size
    plt.show()

Get all addresses

In [6]:
def get_file_name(path, ratio=0.8):
  allfiles = []
  allFolders = sorted(glob.glob(path + "*"))
  for files in allFolders:
    allfiles.append(sorted(glob.glob(files+"/*.txt")))
  if 'desktop.ini' in allfiles:
        allfiles.remove('desktop.ini')

  dataset = np.hstack(allfiles)
  start = dataset[0].rfind('/') + 1
  end = dataset[0][start:].find('_') + start
  dataset = [list(g) for k, g in groupby(dataset, key=lambda x: x[start:end])]
  train = []
  test = []
  for data in dataset:
    if len(data) == 1:
      if random.randint(1,100)>=81:
        test.extend(data)
      else:
        train.extend(data)

    else:
      random.shuffle(data)
      train.extend(data[:int(len(data)*ratio)])
      test.extend(data[int(len(data)*ratio):])

  return train, test


read dataset from address path

In [7]:
class DatasetProcessor():

  def __init__(self,
               train_dataset_saving_path=None,
               test_dataset_saving_path=None
               ):
    self.train_dataset_saving_path = train_dataset_saving_path
    self.test_dataset_saving_path = test_dataset_saving_path

  def __read_data(self, data_path):
    data = pd.read_csv(data_path, header=None)
    data.columns = ['ADXL345_x', 'ADXL345_y', 'ADXL345_z', 'ITG3200_x', 'ITG3200_y', 'ITG3200_z', 'MMA8451Q_x',
                    'MMA8451Q_y', 'MMA8451Q_z']
    data['MMA8451Q_z'] = data['MMA8451Q_z'].map(lambda x: str(x)[:-1])
    for name in data.columns :
      data[name] = data[name].astype(float)
    return data

  def __add_label(self, data_path, merge_feature=False):

    dataset = self.__read_data(data_path)

    if not merge_feature:
      dataset['label'] = self.__get_label(data_path)
      return dataset.to_numpy()

    else:
      new_dataset = pd.DataFrame()
      new_dataset['acc_1'] = dataset.apply(
          lambda row: sqrt((row.ADXL345_x ** 2 + row.ADXL345_y ** 2 + row.ADXL345_z ** 2)), axis=1)
      new_dataset['acc_2'] = dataset.apply(
          lambda row: sqrt((row.MMA8451Q_x ** 2 + row.MMA8451Q_y ** 2 + row.MMA8451Q_z ** 2)), axis=1)
      new_dataset['geo'] = dataset.apply(
          lambda row: sqrt((row.ITG3200_x ** 2 + row.ITG3200_y ** 2 + row.ITG3200_z ** 2)), axis=1)
      new_dataset['label'] = self.__get_label(data_path)

      return np.round(new_dataset.to_numpy(), 2)

  def __get_label(self, data_path):
    label = data_path[54]
    if label =='D':
      return int(0)
    elif label =='F':
      label_path = data_path.replace('dataset', 'enhanced')
      labels = pd.read_csv(label_path, header=None)
      # labels[labels == 2] = 1
      return labels

  def datasets_to_nparray(self, datasets_address_array, outputsize=20000000, column_dimension=10):
    result = np.zeros((outputsize, column_dimension), 'int16')
    first_index = 0
    for address in tqdm(datasets_address_array, ncols=50):
      feature = self.__add_label(address)
      # print(len(feature), first_index)
      result[first_index : (first_index+len(feature))] = feature
      first_index += len(feature)

    return result[result.sum(axis=1) != 0]

  def windowing2d(self, dataset, window_size=200):
    window = window_size * (dataset.shape[1]-1)
    cut = dataset.shape[0] % window_size
    feature = dataset[:-cut,0:-1]
    label = dataset[:-cut,-1]
    feature = feature.ravel().reshape(feature.size//window,window)
    label = label.reshape(label.size// window_size, window_size)
    label = label.sum(axis=1)
    label[label > 0] = 1
    # return feature, label
    feature = np.roll((np.roll(feature, -1, axis=0) - feature), 1, axis=0)
    feature[0] = 0
    return feature, label.ravel()

  def windowing3d(self, dataset, window_size=200):
    n_windows = len(dataset) // window_size
    cut = dataset.shape[0] % window_size
    feature = dataset[:-cut,0:-1]
    label = dataset[:-cut,-1]
    feature = feature.reshape(n_windows, window_size, dataset.shape[1]-1)
    label = label.reshape(n_windows, window_size, 1)
    label = label.sum(axis=1)
    label[label > 0] = 1
    # return feature, label.ravel()
    feature = np.roll((np.roll(feature, -1, axis=0) - feature), 1, axis=0)
    feature[0] = 0
    return feature, label.ravel()

  def normalizer(self, scaler, X_train, X_test):
    X_train = scaler.fit_transform(X_train.reshape(-1, X_train.shape[-1])).reshape(X_train.shape)
    X_test = scaler.transform(X_test.reshape(-1, X_test.shape[-1])).reshape(X_test.shape)
    return X_train, X_test

  def dataset_to_tensor(self, window_size, dataset, saving_path):
    features, labels = self.windowing(self.__datasets_to_nparray(dataset), window_size)
    return features, labels
    # return np.savez(saving_path, inputs=features, targets=labels)

  def dataset_loader(self, data_path):
    npz = np.load(data_path)
    return npz["inputs"].astype(np.float), npz["targets"].astype(np.int)

  def downsampling(self, dataset, down_sampleing_factor):
      positive = dataset[dataset['targets']==1]
      negative = dataset[dataset['targets']==0].sample(n=int(len(positive)* down_sampleing_factor))
      return pd.concat([positive, negative], ignore_index=True).sample(frac=1).reset_index(drop=True)

  def generate_class_weight(self, label):
    class_weights = compute_class_weight(class_weight = "balanced",
                                         classes = np.unique(y_train),
                                         y = y_train)
    return dict(zip(np.unique(y_train), class_weights))


In [None]:
%%time

dp = DatasetProcessor()

print('1.Split Adresses...')
train, test = get_file_name('/content/gdrive/MyDrive/Datasets/SisFall_dataset/')

print('2.Extract Features and Labels...')
print('------------------------Train Dataset')
train_dataset = dp.datasets_to_nparray(train)
print('------------------------Test Dataset')
test_dataset = dp.datasets_to_nparray(test)

print('3.Windowing...')
print('------------------------Train Dataset')
X_train, y_train = dp.windowing3d(train_dataset)
print('------------------------Test Dataset')
X_test, y_test = dp.windowing3d(test_dataset)

print('4.Normalizing...')
X_train, X_test = dp.normalizer(StandardScaler(), X_train, X_test)

print('5.Calculate Class Weight...')
class_weight = dp.generate_class_weight(y_train)


### Save/Load train & test dataset

In [8]:
save = False
if save:
  with open('/content/gdrive/MyDrive/Inputs/X_train_fall.pkl','wb') as f:
    pickle.dump(X_train, f)

  with open('/content/gdrive/MyDrive/Inputs/y_train_fall.pkl','wb') as f:
    pickle.dump(y_train, f)

  with open('/content/gdrive/MyDrive/Inputs/X_test_fall.pkl','wb') as f:
    pickle.dump(X_test, f)

  with open('/content/gdrive/MyDrive/Inputs/y_test_fall.pkl','wb') as f:
    pickle.dump(y_test, f)

else:
  with open('/content/gdrive/MyDrive/Inputs/X_train_fall.pkl','rb') as f:
    X_train = pickle.load(f)

  with open('/content/gdrive/MyDrive/Inputs/y_train_fall.pkl','rb') as f:
    y_train = pickle.load(f)

  with open('/content/gdrive/MyDrive/Inputs/X_test_fall.pkl','rb') as f:
    X_test = pickle.load(f)

  with open('/content/gdrive/MyDrive/Inputs/y_test_fall.pkl','rb') as f:
    y_test = pickle.load(f)
    dp = DatasetProcessor()
    class_weight = dp.generate_class_weight(y_train)


# Main

Data Pre-processing

Neural Network Training

In [13]:
METRICS = [
      keras.metrics.TruePositives(name='TP'),
      keras.metrics.FalsePositives(name='FP'),
      keras.metrics.TrueNegatives(name='TN'),
      keras.metrics.FalseNegatives(name='FN'),
      keras.metrics.BinaryAccuracy(name='Accuracy'),
      keras.metrics.Precision(name='Precision'),
      keras.metrics.Recall(name='Recall'),
      keras.metrics.AUC(name='AUC'),
      keras.metrics.AUC(name='PRC', curve='PR')
]

early_stopping = keras.callbacks.EarlyStopping(
    monitor='Precision',
    verbose=1,
    patience=10,
    mode='max',
    restore_best_weights=True)

# checkpoint_path = "/content/gdrive/MyDrive/Model Checkpoints/TrendDetection.ckpt"
# # save_freq = 'epoch'

# cp_callback = tensorflow.keras.callbacks.ModelCheckpoint(
#    checkpoint_path, verbose=1, save_weights_only=True,
#    # Save weights, every epoch.
#    save_freq = 10*3030)

In [14]:
X_test[0].shape

(200, 9)

In [17]:
class Define_Train():

  def __init__(self,
               X_train,
               y_train,
               X_test,
               y_test,
               ):

        self.X_train = X_train
        self.y_train = y_train
        self.X_test = X_test
        self.y_test = y_test

  def define_cnn(self, input_size, units=512, drop_rate=0.25, filter=16, kernel_size=(3*3), output_size=1):
    input = keras.layers.Input((input_size))
    x = keras.layers.Conv1D(filters=filter, kernel_size=kernel_size, padding='same', activation='relu', name="conv1")(input)
    x = keras.layers.MaxPooling1D(padding="same")(x)
    x = keras.layers.Conv1D(filters=filter*4, kernel_size=kernel_size, padding='same', activation='relu', name="conv2")(x)
    x = keras.layers.MaxPooling1D(padding="same")(x)
    x = keras.layers.Conv1D(filters=filter*16, kernel_size=kernel_size, padding='same', activation='relu', name="conv3")(x)
    x = keras.layers.GlobalAveragePooling1D()(x)
    classifier = keras.layers.Dense(units, activation='relu')(x)
    classifier = keras.layers.Dropout(drop_rate)(classifier)
    output = keras.layers.Dense(output_size, activation='sigmoid')(classifier)
    model = keras.Model(inputs=input, outputs=output)
    # print(model.summary())
    return model

  def define_lstm(self, input_size, units=128, drop_rate=0.25, lstm_units=16, output_size=1):
    input = keras.layers.Input((input_size))
    x = keras.layers.LSTM(units=lstm_units, input_shape=input_size, return_sequences=True, name="lstm1")(input)
    x = keras.layers.LSTM(units=lstm_units*4, input_shape=input_size, return_sequences=True, name="lstm2")(x)
    x = keras.layers.GlobalAveragePooling1D()(x)
    classifier = keras.layers.Dense(units, activation='relu')(x)
    classifier = keras.layers.Dropout(drop_rate)(classifier)
    output = keras.layers.Dense(output_size, activation='sigmoid')(classifier)
    model = keras.Model(inputs=input, outputs=output)
    # print(model.summary())
    return model
  def train_deep_models(self, model, class_weight, epochs=100, batch_size=256, learning_rate=0.01, validation_split=0.2):

    model.compile(optimizer = tf.optimizers.Adam(learning_rate = learning_rate),
                  loss='binary_crossentropy',
                  metrics=METRICS)

    history = model.fit(self.X_train,
                        self.y_train,
                        batch_size = batch_size,
                        epochs = epochs,
                        shuffle = True,
                        class_weight = class_weight,
                        validation_split=validation_split,
                        callbacks = [early_stopping],
                        verbose = 1)
    return history, model

  def define_mlp(self, input_size, hidden_layer_size, output_size):

    self.mlp_model = tf.keras.Sequential([
                                tf.keras.layers.Dense(input_size,activation="relu"),
                                tf.keras.layers.Dense(hidden_layer_size, activation="relu"),
                                tf.keras.layers.Dense(hidden_layer_size,activation="relu"),
                                tf.keras.layers.Dense(output_size, activation='sigmoid')
                                ])

    self.mlp_model.compile(optimizer='adam', loss='binary_crossentropy', metrics=METRICS)


  def evaluation(self, model, batch_size=None):
    if not batch_size is None:
      prediction = model.predict(self.X_test, batch_size=batch_size)
    else:
      prediction = model.predict(self.X_test)
    print(classification_report(self.y_test, prediction))
    plot_confusion_matrix(confusion_matrix(self.y_test, prediction))
    return prediction

  def data_process_lstm(self):
    lstm_X_train = self.X_train.reshape(self.X_train.shape[0], self.X_train.shape[-1], 1)
    self.lstm_X_test = self.X_test.reshape(self.X_test.shape[0], self.X_test.shape[-1], 1)

    self.train_dataset_sequence = tf.data.Dataset.from_tensor_slices((lstm_X_train , self.y_train))
    self.train_dataset_sequence = self.train_dataset_sequence.cache().shuffle(self.lstm_buffer_size).batch(self.lstm_batch_size).repeat()

    # self.test_dataset_sequence = tf.data.Dataset.from_tensor_slices((lstm_X_test , self.y_test))
    # self.test_dataset_sequence = self.test_dataset_sequence.cache().shuffle(self.lstm_buffer_size).batch(self.lstm_batch_size).repeat()

  def define_lstm_model(self, input_shape, output_shape=1):
    self.lstm_model = tf.keras.models.Sequential([tf.keras.layers.LSTM(units=32, return_sequences=True, input_shape=input_shape),
                                            tf.keras.layers.LSTM(units=64, return_sequences=True),
                                            tf.keras.layers.LSTM(units=32),
                                            tf.keras.layers.Dense(1,activation='sigmoid')])

    self.lstm_model.compile(loss = 'binary_crossentropy', metrics = ['accuracy'])

  def train_classic_model(self, model):
    return model.fit(self.X_train, self.y_train)


In [18]:
models_obj = Define_Train(X_train,
                          y_train,
                          X_test,
                          y_test)


cnn = models_obj.define_cnn(input_size = X_test[0].shape)
history, model = models_obj.train_deep_models(cnn, class_weight)

Epoch 1/100

KeyboardInterrupt: ignored

Prediction based on Ensemble Concept

Models' Summary

In [None]:
conclusion = pd.DataFrame([['Neural Network',precision_1,recall_1,f1Score_1],
              ['Logistic Regression',0.53,0.17,0.26],
              ['SVM',0.38,0.01,0.03],
              ['KNN',0.94,0.56,0.70],
              ['Neural Network after Balancing',precision_2,recall_2,f1Score_2],
              ['Logistic Regression after Balancing',0.08,0.93,0.15],
              ['SVM after Balancing',0.06,0.97,0.11],
              ['KNN after Balancing',0.73,0.73,0.73],
              ['Ensemble concept after Balancing',0.65,0.81,0.72]],
              columns=["Algorithm","Precision","Recall","F1score"])
conclusion = conclusion.set_index('Algorithm')
conclusion.style.background_gradient(cmap="YlOrRd")


Unnamed: 0_level_0,Precision,Recall,F1score
Algorithm,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1
Neural Network,0.99,0.34,0.51
Logistic Regression,0.53,0.17,0.26
SVM,0.38,0.01,0.03
KNN,0.94,0.56,0.7
Neural Network after Balancing,0.88,0.58,0.7
Logistic Regression after Balancing,0.08,0.93,0.15
SVM after Balancing,0.06,0.97,0.11
KNN after Balancing,0.73,0.73,0.73
Ensemble concept after Balancing,0.65,0.81,0.72


As you see After Blancing the dataset Ensemble concept and KNN model do the best !