In [43]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import os

import keras_tuner as kt
import tensorflow as tf
from tensorflow import keras
from keras.models import Sequential
from keras.layers import Dense
from keras.layers import LSTM
from keras.layers import Dropout

from sklearn.metrics import classification_report, f1_score, precision_score, recall_score, accuracy_score, confusion_matrix
from sklearn.preprocessing import LabelEncoder, StandardScaler
from sklearn.model_selection import train_test_split
from tensorflow.keras.callbacks import EarlyStopping
from scipy.fft import fft



In [44]:
# Loading the training datasets
accelerometer_train_data = pd.read_csv('/home/mzero/main/uni_repo/machine_learning_fqs_2024/ml4qs-gesture-recognition/Combined_100min_data/Combined_accelerometer_100min_with_lowpass.csv')
gyroscope_train_data = pd.read_csv('/home/mzero/main/uni_repo/machine_learning_fqs_2024/ml4qs-gesture-recognition/Combined_100min_data/Combined_gyroscope_100min_with_lowpass.csv')
linear_accelerometer_train_data = pd.read_csv('/home/mzero/main/uni_repo/machine_learning_fqs_2024/ml4qs-gesture-recognition/Combined_100min_data/Combined_linear_accelerometer_100min_with_lowpass.csv')
magnetometer_train_data = pd.read_csv('/home/mzero/main/uni_repo/machine_learning_fqs_2024/ml4qs-gesture-recognition/Combined_100min_data/Combined_magnetometer_100min_with_lowpass.csv')

In [45]:
# Loading the test datasets
accelerometer_test_data = pd.read_csv('/home/mzero/main/uni_repo/machine_learning_fqs_2024/ml4qs-gesture-recognition/Combined_40min_test/Combined_accelerometer_40min_with_lowpass.csv')
gyroscope_test_data = pd.read_csv('/home/mzero/main/uni_repo/machine_learning_fqs_2024/ml4qs-gesture-recognition/Combined_40min_test/Combined_gyroscope_40min_with_lowpass.csv')
linear_accelerometer_test_data = pd.read_csv('/home/mzero/main/uni_repo/machine_learning_fqs_2024/ml4qs-gesture-recognition/Combined_40min_test/Combined_linear_accelerometer_40min_with_lowpass.csv')
magnetometer_test_data = pd.read_csv('/home/mzero/main/uni_repo/machine_learning_fqs_2024/ml4qs-gesture-recognition/Combined_40min_test/Combined_magnetometer_40min_with_lowpass.csv')

### Transformation

In [46]:
# Merge datasets on time and label columns - syncronizing to same time point. (train)
train_df_1 = pd.merge(accelerometer_train_data, gyroscope_train_data, on=['time', 'label'])
train_df_2 = pd.merge(train_df_1, linear_accelerometer_train_data, on=['time', 'label'])
train_df_3 = pd.merge(train_df_2, magnetometer_train_data, on=['time', 'label'])

In [47]:
# Merge datasets on time and label columns - syncronizing to same time point. (test)
test_df_1 = pd.merge(accelerometer_test_data, gyroscope_test_data, on=['time', 'label'])
test_df_2 = pd.merge(test_df_1, linear_accelerometer_test_data, on=['time', 'label'])
test_df_3 = pd.merge(test_df_2, magnetometer_test_data, on=['time', 'label'])

In [48]:
display(type(train_df_3))
display(type(test_df_3))

pandas.core.frame.DataFrame

pandas.core.frame.DataFrame

##### Fourier Transformation --> Normalization Method: X

###### Fourier Transform

In [49]:

# Applying Fast Fourier Transformation - Discrete fourier Transformation
def apply_dft(df):
    features_X = df.columns.difference(['time', 'label'])
    df[features_X] = np.abs(fft(df[features_X], axis=1))

    return df 

train_df_X = apply_dft(train_df_3)
test_df_X = apply_dft(test_df_3)      

###### Normalization

In [50]:

def normalizer(df):
    # Normalize the feature columns
    features_X = df.columns.difference(['time', 'label'])
    scaler = StandardScaler()
    df[features_X] = scaler.fit_transform(df[features_X])

    # Encode labels
    label_encoder = LabelEncoder()
    df['label'] = label_encoder.fit_transform(df['label'])

    # Verify the unique classes
    print(f"Classes: {label_encoder.classes_}")
    print(f"Number of classes: {label_encoder.classes_.shape[0]}")

    return df, label_encoder

train_data_X, train_label_encoder = normalizer(train_df_X)
test_data_X, test_label_encoder = normalizer(test_df_X)


Classes: ['clapping' 'handshake' 'highfive' 'waving']
Number of classes: 4
Classes: ['clapping' 'handshake' 'highfive' 'waving']
Number of classes: 4


##### Normalization -> Fourier Transformation Method: Y

###### Normalization

In [51]:
def normalizer(df):
    # Normalize the feature columns
    features_Y = df.columns.difference(['time', 'label'])
    scaler = StandardScaler()
    df[features_Y] = scaler.fit_transform(df[features_Y])

    # Encode labels
    label_encoder = LabelEncoder()
    df['label'] = label_encoder.fit_transform(df['label'])

    # Verify the unique classes
    print(f"Classes: {label_encoder.classes_}")
    print(f"Number of classes: {label_encoder.classes_.shape[0]}")
    
    return df

train_data_Y = normalizer(train_df_3)
test_data_Y = normalizer(test_df_3)

Classes: [0 1 2 3]
Number of classes: 4
Classes: [0 1 2 3]
Number of classes: 4


###### Fourier Transform

In [52]:

# Applying Fast Fourier Transformation - Discrete fourier Transformation
def apply_dft(df):
    features_Y = df.columns.difference(['time', 'label'])
    df[features_Y] = np.abs(fft(df[features_Y], axis=1))
    
    return df 

train_data_Y = apply_dft(train_data_Y)
test_data_Y = apply_dft(test_data_Y)     

### Long Short-term Memory Model Implimentation

###### Manual Method: Creating sequence and sorting data: method Y


In [53]:
# # Example feature columns and label column
# features = train_data_Y.columns.difference(['time', 'label'])

# # Prepare training data
# time_steps = 50  # Number of time steps to look back
# X_train = []
# y_train = []

# for i in range(time_steps, len(train_data_X)):
#     X_train.append(train_data_Y.iloc[i-time_steps:i][features].values)
#     y_train.append(train_data_Y.iloc[i]['label'])

# X_train, y_train = np.array(X_train), np.array(y_train)

# # Prepare test data
# X_test = []
# y_test = []

# for i in range(time_steps, len(test_data_X)):
#     X_test.append(test_data_Y.iloc[i-time_steps:i][features].values)
#     y_test.append(test_data_Y.iloc[i]['label'])

# X_test, y_test = np.array(X_test), np.array(y_test)


###### Manual Method: Creating sequence and sorting data: method X



In [54]:
# Example feature columns and label column
features = train_data_X.columns.difference(['time', 'label'])

# Prepare training data
time_steps = 50  # Number of time steps to look back
X_train = []
y_train = []

for i in range(time_steps, len(train_data_X)):
    X_train.append(train_data_X.iloc[i-time_steps:i][features].values)
    y_train.append(train_data_X.iloc[i]['label'])

X_train, y_train = np.array(X_train), np.array(y_train)

# Prepare test data
X_test = []
y_test = []

for i in range(time_steps, len(test_data_X)):
    X_test.append(test_data_X.iloc[i-time_steps:i][features].values)
    y_test.append(test_data_X.iloc[i]['label'])

X_test, y_test = np.array(X_test), np.array(y_test)



###### LSTM Model - Tenserflow & Keras

In [74]:
class LstmPipline():
    """
    This is a high-level overview of what the class does.

    More detailed description if necessary.
    """

    def __init__(self, X_train, X_test, y_train, y_test,train_label_encoder):
        self.train_label_encoder = None
        self.test_label_encoder = None
        self.X_train = X_train
        self.X_test = X_test
        self.y_train = y_train
        self.y_test = y_test
        self.tuner = None
        self.best_model = None
        self.best_configs = None
        
    

    def model_builder(self, hp):
        """
        Perform some action with param3.

        Args:
            param3 (type): Description of param3.

        Returns:
            return_type: Description of the return value.
        """

        model = Sequential()
        model.add(LSTM(units=hp.Int('units', min_value=32, max_value=128, step=16), 
                    input_shape=(X_train.shape[1], X_train.shape[2])))# Tuning the number of units in the first LSTM layer
        model.add(Dropout(rate=hp.Float('dropout', min_value=0.0, max_value=0.5, step=0.1))) # Tuning the dropout rate
        model.add(Dense(units=4, activation='softmax'))  # Output layer
        model.compile(optimizer=keras.optimizers.Adam(learning_rate=hp.Float('learning_rate', min_value=1e-4, max_value=1e-2, sampling='log')),
                    loss=keras.losses.SparseCategoricalCrossentropy(),
                    metrics=['accuracy']) # Compile the model
        
        return model


    def tuner_initiator(self):
        """
        Perform some action with param3.

        Args:
            param3 (type): Description of param3.

        Returns:
            return_type: Description of the return value.
        """

        self.tuner = kt.Hyperband(
            self.model_builder,
            objective='val_accuracy',
            max_epochs=50,
            factor=3,
            directory='/home/mzero/main/uni_repo/machine_learning_fqs_2024/ml4qs-gesture-recognition/lstm_section/results',
            project_name='ml4qs_gesture_recognition'
        )


    def parameter_search(self):
        """
        Perform some action with param3.

        Args:
            param3 (type): Description of param3.

        Returns:
            return_type: Description of the return value.
        """

        self.tuner.search(self.X_train, self.y_train, epochs=50, validation_split=0.3, callbacks=[EarlyStopping(monitor='val_loss', patience=5)]) # Search for the best parameters 
        best_configs= self.tuner.get_best_hyperparameters(num_trials=1)[0]# Get the optimal hyperparameters

        return best_configs


    def output_optimimal_configs(self, best_configs):
        """
        Perform some action with param3.

        Args:
            param3 (type): Description of param3.

        Returns:
            return_type: Description of the return value.
        """

        units = best_configs.get('units')
        dropout = best_configs.get('dropout')
        learning_rate = best_configs.get('learning_rate')
        optimal_epochs = best_configs.get('tuner/epochs')

        print(f"""
        Optimal hyperparameters:
        - Units in LSTM layer: {units}
        - Dropout rate: {dropout}
        - Learning rate: {learning_rate}
        - Optimal number of epochs: {optimal_epochs} """)


    def evaluation_initiator(self):
        num_iterations = 10
        random_seeds = np.random.randint(0, 1000, size=num_iterations)

        f1_scores = []

        for seed in random_seeds:
            f1 = self.train_evaluate_model(best_configs, X_train, y_train, X_test, y_test)
            f1_scores.append(f1)

        f1_scores = np.array(f1_scores)

        return f1_score


    def train_evaluate_model(self, best_configs, train_label_encoder):
        """
        Perform some action with param3.

        Args:
            param3 (type): Description of param3.

        Returns:
            return_type: Description of the return value.
        """
        # Convert label encoder classes to strings for classification report
        target_names = [str(cls) for cls in test_label_encoder.classes_]
        
        
        optimal_epochs = best_configs.get('tuner/epochs')

        hypermodel = self.tuner.hypermodel.build(best_configs)
        hypermodel.fit(self.X_train, self.y_train, epochs=optimal_epochs, validation_split=0.3)
        y_pred_lstm = hypermodel.predict(self.X_test)
        y_pred_classes_lstm = np.argmax(y_pred_lstm, axis=1)

        accuracy_lstm = accuracy_score(self.y_test, y_pred_classes_lstm)
        conf_matrix_lstm = confusion_matrix(self.y_test, y_pred_classes_lstm)
        class_report_lstm = classification_report(self.y_test, y_pred_classes_lstm, target_names=target_names)
        # f1 = f1_score(self.y_test, y_pred_lstm, average='weighted')

        # Print the metrics
        print(f"Accuracy: {accuracy_lstm:.4f}")
        print("Confusion Matrix:")
        print(conf_matrix_lstm)
        print("Classification Report:")
        print(class_report_lstm)
        print("F1 Score:")
        # print(f1)



    def run(self):
        self.tuner_initiator()
        best_configs = self.parameter_search()
        self.output_optimimal_configs(best_configs)
        # f1_scores  = self.evaluation_initiator() # closed

        # Model 1: Pre-processed Sensor Data
        print("Model 1: LowPass Data")
        lstm_model = self.train_evaluate_model(best_configs, train_label_encoder)
        


    

In [75]:
tester = LstmPipline(X_train, X_test, y_train, y_test, train_label_encoder)
print(tester.run())

Reloading Tuner from /home/mzero/main/uni_repo/machine_learning_fqs_2024/ml4qs-gesture-recognition/lstm_section/results/ml4qs_gesture_recognition/tuner0.json

        Optimal hyperparameters:
        - Units in LSTM layer: 96
        - Dropout rate: 0.0
        - Learning rate: 0.0015717490429428354
        - Optimal number of epochs: 6 
Model 1: LowPass Data
Epoch 1/6


  super().__init__(**kwargs)


[1m236/236[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m6s[0m 21ms/step - accuracy: 0.7437 - loss: 0.6054 - val_accuracy: 0.2390 - val_loss: 3.9786
Epoch 2/6
[1m236/236[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m5s[0m 20ms/step - accuracy: 0.9311 - loss: 0.2002 - val_accuracy: 0.1051 - val_loss: 5.6137
Epoch 3/6
[1m236/236[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m5s[0m 20ms/step - accuracy: 0.9708 - loss: 0.1132 - val_accuracy: 0.2074 - val_loss: 5.2077
Epoch 4/6
[1m236/236[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m5s[0m 20ms/step - accuracy: 0.9859 - loss: 0.0563 - val_accuracy: 0.1066 - val_loss: 6.6642
Epoch 5/6
[1m236/236[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m5s[0m 20ms/step - accuracy: 0.9918 - loss: 0.0301 - val_accuracy: 0.2188 - val_loss: 6.1687
Epoch 6/6
[1m236/236[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m5s[0m 20ms/step - accuracy: 0.9910 - loss: 0.0333 - val_accuracy: 0.1891 - val_loss: 7.3809
[1m148/148[0m [32m━━━━━━━━━━━━━━━━━

  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))
