In [68]:
import matplotlib.pyplot as plt
from keras.models import Sequential
from keras.layers import GRU, Dense, Dropout, BatchNormalization
from common import sns_cm, plot_loss_accuracy, get_set
from sklearn.model_selection import train_test_split
from keras.callbacks import Callback
import pandas as pd
import math
import tensorflow as tf
import numpy as np

#### Parse the hyperparameters (neurons, layers, window length) out of a model name.

In [69]:
def parse_key(title):
    """Extract the hyperparameters encoded in a model title.

    The title is expected to hold at least three ", "-separated fields, each
    of the form "<label>: <integer>", e.g.
    "model - neurons: 20, layers: 1, win_len: 25".

    Returns:
        A tuple (neurons, layers, timesteps) of ints, taken from the first,
        second and third field respectively.
    """
    fields = title.split(", ")

    # The number is whatever follows the first ": " inside each field.
    values = [int(fields[position].split(": ")[1]) for position in range(3)]

    return tuple(values)

### A callback to stop training once training and validation accuracy exceed 98% and training and validation loss drop below 0.20

In [70]:
class MyCallback(Callback):
    """Stop training as soon as the model is accurate enough on both splits.

    Training is stopped at the end of the first epoch in which 'accuracy' and
    'val_accuracy' are both above 0.98 and 'loss' and 'val_loss' are both
    below 0.20.
    """

    def on_epoch_end(self, epoch, logs=None):
        """Check the epoch's metrics and request a stop when all thresholds are met.

        Args:
            epoch: Index of the epoch that just finished (used in the message only).
            logs: Metric name -> value mapping for the epoch. May be None, and
                the validation entries are absent when fit() is called without
                validation data; in both cases the check is skipped.
        """
        logs = logs or {}
        tracked = [logs.get(name) for name in ('accuracy', 'val_accuracy', 'loss', 'val_loss')]

        # A missing metric cannot be compared against a threshold; keep training.
        if any(value is None for value in tracked):
            return

        accuracy, val_accuracy, loss, val_loss = tracked
        if accuracy > 0.98 and val_accuracy > 0.98 and loss < 0.20 and val_loss < 0.20:
            print(
                f"\nTraining stopped at epoch: {epoch}. Reached >98% accuracy and validation accuracy, and loss and validation loss are below 0.20.")
            self.model.stop_training = True


my_callback = MyCallback()

In [71]:
def map_to_100(data, return_data=False, random_state=None):
    """Balance `data` so every class has the same number of rows.

    The per-class row count is the size of the smallest class rounded down to
    a multiple of 100. That many rows are drawn at random (without
    replacement) from each class; the result is ordered class by class.

    Args:
        data: DataFrame with a 'class' column; all other columns are features.
        return_data: When True, return the balanced DataFrame itself instead
            of numpy arrays.
        random_state: Optional seed forwarded to the sampler so the draw is
            reproducible. The default (None) draws differently on every call.

    Returns:
        The balanced DataFrame when `return_data` is True, otherwise a tuple
        (features, labels): the feature columns as a 2-D array and the
        one-hot encoded 'class' column as a 2-D array.
    """
    count = math.floor(data['class'].value_counts().min() / 100) * 100

    # GroupBy.sample keeps the 'class' column in the result, unlike
    # groupby(...).apply(...), which newer pandas versions stop passing it to.
    # NOTE(review): rows are drawn in random order, so neighbouring rows of the
    # result are not neighbouring rows of `data` - confirm this is intended if
    # the caller later treats consecutive rows as a time sequence.
    balanced_data = (
        data.groupby('class')
        .sample(n=count, random_state=random_state)
        .reset_index(drop=True)
    )

    if return_data:
        return balanced_data

    features = balanced_data.drop('class', axis='columns').to_numpy()
    labels = pd.get_dummies(balanced_data['class']).to_numpy()
    return features, labels

In [72]:
# Movement classes to load; the length of this list is also the size of the softmax output layer built later.
movement_positions: list[str] = ['Walking', 'Jumping', 'Running', 'Squats']

# get_set comes from the project-local `common` module.
# Presumably it returns one DataFrame per position read from `folder` - TODO confirm.
training_sets = get_set(positions=movement_positions, folder='Class Dataframes')
testing_sets = get_set(positions=movement_positions, folder='Test')
# Stack the loaded frames row-wise into one test frame and one training frame.
testing_set = pd.concat(testing_sets, axis='rows', ignore_index=True)
df = pd.concat(training_sets, axis='rows', ignore_index=True)

In [73]:
# Balance both frames (same number of rows per class) and turn them into feature / one-hot label arrays.
training_X, training_y = map_to_100(df)
X_test, y_test = map_to_100(testing_set)

#### -1 to make the min_class_count = 4600

In [74]:
# min_class_count = df['class'].value_counts().min() - 1

### Use groupby and apply to sample min_class_count rows from each class

In [75]:
#balanced_df = df.groupby('class').apply(lambda x: x.sample(min_class_count)).reset_index(drop=True)

### Training set

In [76]:
# X = balanced_df.drop('class', axis='columns')
# y = balanced_df['class']
# 
# X = X.to_numpy()
# y = pd.get_dummies(y, columns=['class']).to_numpy()

### Testing set

In [77]:
# testing_set = m_knn.testing_set
# test_min_class_count = math.floor(testing_set['class'].value_counts().min() / 100) * 100
# balanced_df_test = testing_set.groupby('class').apply(lambda x: x.sample(test_min_class_count)).reset_index(drop=True)

In [78]:
# X_test = balanced_df_test.drop('class', axis='columns').to_numpy()
# y_test = pd.get_dummies(balanced_df_test['class'], columns=['class']).to_numpy()

### New raw data file.

In [79]:
# data_file = pd.read_pickle("neural_test_set.pkl")
# data_file_min_class_count = math.floor(data_file['class'].value_counts().min() / 100) * 100
# balanced_data_file = data_file.groupby('class').apply(lambda x: x.sample(data_file_min_class_count)).reset_index(drop=True)
# 
# X_new = balanced_data_file.drop('class', axis='columns').to_numpy()
# y_new = pd.get_dummies(balanced_data_file['class'], columns=['class']).to_numpy()

### This reshapes the data and makes sure that each window has data from the same class

In [80]:
def _window_by_class(time_step, X, y):
    """Cut (X, y) into windows of `time_step` rows that each hold a single class."""
    class_ids = np.argmax(y, axis=1)
    X_windows = []
    y_windows = []

    for class_id in np.unique(class_ids):
        in_class = class_ids == class_id

        # Keep only as many rows as fill whole windows; the remainder is dropped.
        usable = int(in_class.sum()) // time_step * time_step

        X_windows.append(X[in_class][:usable].reshape(-1, time_step, X.shape[1]))
        y_windows.append(y[in_class][:usable].reshape(-1, time_step, y.shape[1]))

    if not X_windows:
        # No rows at all: return correctly shaped empty arrays rather than
        # letting np.concatenate fail on an empty list.
        return (X[:0].reshape(0, time_step, X.shape[1]),
                y[:0].reshape(0, time_step, y.shape[1]))

    return np.concatenate(X_windows), np.concatenate(y_windows)


def reshape_data(time_step, X_train, X_val, y_train, y_val):
    """Reshape flat train / validation rows into fixed-length same-class windows.

    Rows are grouped by class (argmax of the one-hot label row), each group is
    truncated to a multiple of `time_step` and reshaped, and the groups are
    concatenated in ascending class order. The train and validation splits are
    processed independently, so a class present in only one of them is kept
    for that split.

    Args:
        time_step: Number of consecutive rows per window.
        X_train, X_val: 2-D feature arrays (rows, features).
        y_train, y_val: 2-D one-hot label arrays (rows, classes).

    Returns:
        (X_train, X_val, y_train, y_val), with X arrays shaped
        (windows, time_step, features) and y arrays shaped
        (windows, time_step, classes).
    """
    X_train_windows, y_train_windows = _window_by_class(time_step, X_train, y_train)
    X_val_windows, y_val_windows = _window_by_class(time_step, X_val, y_val)

    return X_train_windows, X_val_windows, y_train_windows, y_val_windows


In [81]:
def reshape_test(time_step, _X_test, _y_test):
    """Reshape flat test rows into fixed-length windows that hold one class each.

    Rows are grouped by class (argmax of the one-hot label row). Each group is
    cut down to a multiple of `time_step` rows, reshaped into windows, and the
    groups are concatenated in ascending class order.

    Returns:
        (X, y) with X shaped (windows, time_step, features) and y shaped
        (windows, time_step, classes).
    """
    labels = np.argmax(_y_test, axis=1)
    n_features = _X_test.shape[1]
    n_classes = _y_test.shape[1]

    feature_windows = []
    label_windows = []

    for class_id in np.unique(labels):
        in_class = labels == class_id
        features = _X_test[in_class]
        targets = _y_test[in_class]

        # Rows beyond the last complete window are discarded.
        keep = len(features) // time_step * time_step

        feature_windows.append(features[:keep].reshape(-1, time_step, n_features))
        label_windows.append(targets[:keep].reshape(-1, time_step, n_classes))

    return np.concatenate(feature_windows), np.concatenate(label_windows)
        

## Classification of the four movement classes

### You should use an RNN as the classifier. To save time we will just use the GRU neuron. The hyperparameters you can change for optimizing are:
##### — Number of neurons in each layer. You need to provide results for at least three different layer sizes.
##### — Number of layers. We can limit ourselves to 1 and 2 layers.
##### — Window length. You need to provide results for at least 3 different window lengths.
##### This means that you need to train 2 x 3 x 3 models.

### Hyperparameters

In [82]:
# Hyperparameter grid: one model is built for every (neurons, layers, time_steps) combination.
neurons = [20, 40, 60, 80, 100]  # GRU units per layer
layers = [1, 2, 3, 4, 5]  # number of stacked GRU layers
time_steps = [25, 50, 100]  # Number of previous time steps used to predict each step.
# Filled later with the model.evaluate(...) result of each model, keyed by model name.
training_scores = {}

### Model information

In [83]:
models = {}  # model name -> Keras model
histories = {}  # model name -> per-epoch metrics returned by fit()
sets = {}  # window length (and, once models are built, model name) -> (X_train, X_val, y_train, y_val)
test_sets = {}  # window length -> (windowed X_test, windowed y_test)

### Reshape the training data, pre-populate sets

In [84]:
# training_X / training_y come out of map_to_100 grouped class by class, so a
# single unshuffled 70/30 split would leave the last class out of the training
# part and most classes out of the validation part. Split every class 70/30 on
# its own instead (still unshuffled, so row order inside a class is kept) and
# stitch the per-class pieces back together.
class_ids = np.argmax(training_y, axis=1)
per_class_splits = [
    train_test_split(
        training_X[class_ids == class_id],
        training_y[class_ids == class_id],
        test_size=0.30,
        shuffle=False,
    )
    for class_id in np.unique(class_ids)
]
X_train_flat, X_val_flat, y_train_flat, y_val_flat = (
    np.concatenate(pieces) for pieces in zip(*per_class_splits)
)

# The split does not depend on the window length, so only the windowing is repeated.
for i in time_steps:
    X_train, X_val, y_train, y_val = reshape_data(i, X_train_flat, X_val_flat, y_train_flat, y_val_flat)
    sets[i] = X_train, X_val, y_train, y_val

### Reshape the testing data, pre-populate test_sets

In [85]:
# Window the balanced test data once per window length so each model can be scored on input of matching shape.
for i in time_steps:
    _X_test, _y_test = reshape_test(i, X_test, y_test)
    test_sets[i] = _X_test, _y_test

### Reshape the new raw data, pre-populate test_sets

### Create the models

In [86]:
def build_gru_model(units, n_layers, window_length, n_features):
    """Assemble one (uncompiled) stacked-GRU classifier.

    The first GRU layer defines the input shape. Every additional layer is a
    relu GRU followed by Dropout(0.3) and BatchNormalization. All GRU layers
    return full sequences, so the softmax Dense head emits one class
    distribution per time step.
    """
    network = Sequential()
    network.add(GRU(units=units, return_sequences=True, input_shape=(window_length, n_features)))

    for _ in range(n_layers - 1):
        network.add(GRU(units=units, activation='relu', return_sequences=True))
        network.add(Dropout(rate=0.3))
        network.add(BatchNormalization())

    network.add(Dense(len(movement_positions), activation='softmax'))
    return network


for n in neurons:
    for layer in layers:
        for timestep in time_steps:
            X_train, X_val, y_train, y_val = sets[timestep]
            model = build_gru_model(n, layer, timestep, X_train.shape[2])

            # The same name indexes both the model and the data split it is trained on.
            model_key = f"model - neurons: {n}, layers: {layer}, win_len: {timestep}"
            sets[model_key] = (X_train, X_val, y_train, y_val)
            models[model_key] = model

### Show the models

In [87]:
# Print the layer-by-layer summary of every model in the grid.
for model in models.values():
    model.summary()

Model: "sequential_150"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 gru_450 (GRU)               (None, 25, 20)            1680      
                                                                 
 dense_150 (Dense)           (None, 25, 4)             84        
                                                                 
Total params: 1,764
Trainable params: 1,764
Non-trainable params: 0
_________________________________________________________________
Model: "sequential_151"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 gru_451 (GRU)               (None, 50, 20)            1680      
                                                                 
 dense_151 (Dense)           (None, 50, 4)             84        
                                                                 
Total params: 1,764
Traina

### Compile the models

In [88]:
# Softmax output with one-hot targets, hence categorical cross-entropy; accuracy is tracked for the plots further down.
for model in models.values():
    model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])

### Train the models

In [89]:
# Number of epochs passed to fit() for every model.
epochs = 150

In [None]:
%%capture
# Train every model on the windowed split stored under its own name and keep the per-epoch metrics.
for key, model in models.items():
    X_train, X_val, y_train, y_val = sets[key]

    # Optional: pass callbacks=[my_callback] to fit() to stop as soon as its thresholds are reached.
    history = model.fit(X_train, y_train, validation_data=(X_val, y_val), epochs=epochs).history
    histories[key] = history

### Plot loss and accuracy for validation and training set.

In [None]:
%%capture
# Draw the loss / accuracy curves of every trained model (plot_loss_accuracy comes from the local `common` module).
# NOTE(review): this cell starts with %%capture, which also captures rich display output - confirm the figures are still shown or saved somewhere.
for key, history in histories.items():
    plot_loss_accuracy(history=history, name=key, same_graph=True)

### Accuracy as a function of parameters.

In [None]:
def plot_metrics(metric, is_train=False):
    """Plot the highest value of `metric` against each hyperparameter.

    For every distinct neuron count, layer count and window length, the
    highest metric value among all models sharing that setting is plotted in
    three side-by-side panels. For loss metrics the highest value is the
    worst one, matching the "Max" wording of the titles.

    Reads the notebook-level dicts `models`, `histories` and
    `training_scores`.

    Args:
        metric: With `is_train=False`, a key of the fit history (e.g.
            'accuracy', 'val_accuracy'); the maximum over the epochs is used
            as the model's value. With `is_train=True`, 'accuracy' selects
            element 1 of the stored score and any other value selects
            element 0.
        is_train: When True, read each model's value from `training_scores`
            instead of `histories`. In this notebook `training_scores` is
            filled with model.evaluate() results on the test windows.
    """
    best_by_neurons = {}
    best_by_layers = {}
    best_by_timesteps = {}

    for key in models:
        neuron, layer, timestep = parse_key(key)

        if is_train:
            scores = training_scores[key]
            value = scores[1] if metric == 'accuracy' else scores[0]
        else:
            value = max(histories[key][metric])

        # Keep the maximum over every model that shares the setting, not just
        # the value of the first such model encountered.
        for best, setting in (
                (best_by_neurons, neuron),
                (best_by_layers, layer),
                (best_by_timesteps, timestep),
        ):
            best[setting] = max(best.get(setting, value), value)

    panels = (
        ('Neurons', best_by_neurons),
        ('Layers', best_by_layers),
        ('Timesteps', best_by_timesteps),
    )

    plt.figure(figsize=(20, 5))

    for position, (label, best) in enumerate(panels, start=1):
        settings = sorted(best)

        plt.subplot(1, 3, position)
        plt.plot(settings, [best[setting] for setting in settings])
        plt.title(f'Max {metric} vs {label}')
        plt.xlabel(label)
        plt.ylabel(f'Max {metric}')

    plt.tight_layout()
    plt.show()


In [None]:
# Fit-history accuracy versus each hyperparameter: training accuracy first, then validation accuracy.
plot_metrics(metric='accuracy')
plot_metrics(metric='val_accuracy')

### Evaluate every model on the test set (loss and accuracy)

In [None]:
%%capture
# Score every model on the test windows that match its window length.
# NOTE: despite its name, training_scores therefore holds results computed on the test set.
for key, model in models.items():
    _, _, timestep = parse_key(key)
    _x, _y = test_sets[timestep]
    training_scores[key] = model.evaluate(_x, _y)

### Plot accuracy and loss as a function of each parameter.

In [None]:
# is_train=True makes plot_metrics read training_scores (the model.evaluate results) instead of the fit histories.
plot_metrics(metric='accuracy', is_train=True)
plot_metrics(metric='loss', is_train=True)

### Predictions

In [None]:
# One confusion matrix per model, computed on the test windows of matching length.
for key, model in models.items():
    _, _, timestep = parse_key(key)
    X_tes, y_tes = test_sets[timestep]

    # Labels and predictions are indexed (window, time step, class): argmax over the class axis, then flatten to one label per time step.
    predicated_probabilities = model.predict(X_tes)
    sns_cm(y_true= y_tes.argmax(axis=2).flatten(), y_pred= predicated_probabilities.argmax(axis=2).flatten())
    plt.title(key)
    plt.show()