In [None]:
import os
import matplotlib.pyplot as plt
from sklearn.metrics import confusion_matrix
import numpy as np
import albumentations as A

import keras
from keras import layers, models
import h5py
import shutil
import gc


os.environ["KERAS_BACKEND"] = "torch"

In [None]:
from google.colab import drive
drive.mount('/content/drive')

In [None]:
def getHDF5Files(path):
    hdf5_files = []
    data_folders = os.listdir(path)
    filter_file = lambda x: x.endswith('.h5')
    #filter_file = lambda x: x.endswith('.h5') and not ("ToiletSit2" in x) and not ("ToiletSit3" in x) and not ("ToiletStand2" in x) and not ("ToiletStand3" in x)
    chunks = list(filter(filter_file, os.listdir(path)))
    for chunk in chunks:
        filePath = os.path.join(path, chunk)
        hdf5_files.append(filePath)
    return hdf5_files


def normalize_middle_dimensions(data):
    """
    Normalize only the middle dimensions (64, 256) of the data with shape (X, 64, 256, 1)
    using standard normalization (z-score normalization).

    Args:
        data (numpy array): The input data of shape (X, 64, 256, 1).

    Returns:
        numpy array: Normalized data where only the (64, 256) dimensions are normalized.
    """
    # Compute mean and standard deviation along the (64, 256) dimensions
    mean = np.mean(data, axis=(1, 2), keepdims=True)
    std = np.std(data, axis=(1, 2), keepdims=True)

    # Avoid division by zero by ensuring std is not zero
    epsilon = 1e-8
    std_adjusted = np.where(std > 0, std, epsilon)

    # Apply standard normalization
    normalized_data = (data - mean) / std_adjusted

    return normalized_data

def extract_data_from_hdf5(file_paths, reshaped_dim, n_channels, timesteps, UseWeights = False, weights= None, shuffle = True, stride=3, Augmentation=True):

    #azimuth = None
    elevation = None
    doppler = None
    labels = None

    augmentation_pipeline = A.Compose([
        A.VerticalFlip(p=0.5)  # Vertical flip with a 50% probability
    ])

    for file_path in file_paths:
        try:
            with h5py.File(file_path, 'r') as hdf5:
                #azimuth_data = hdf5["Azimuth"][:]
                elevation_data = hdf5["Elevation"][:]
                doppler_data = hdf5["Doppler"][:]
                label_data = hdf5["label"][:]
                length = int(len(elevation_data)//stride) - timesteps

                #azimuth_reshaped = np.empty((length, timesteps,  *reshaped_dim, n_channels), dtype=np.float32)
                elevation_reshaped = np.empty((length, timesteps, *reshaped_dim, n_channels), dtype=np.float32)
                doppler_reshaped = np.empty((length, timesteps, *reshaped_dim, n_channels), dtype=np.float32)
                label_reshaped = np.empty((length), dtype=np.int32)

                for i in range(length):
                  for step in range(timesteps):
                    #azimuth_reshaped[i, step] = np.reshape(azimuth_data[stride*i + step], (*reshaped_dim, n_channels))
                    elevation_reshaped[i, step] = np.reshape(elevation_data[stride*i + step], (*reshaped_dim, n_channels))
                    doppler_reshaped[i, step] = np.reshape(doppler_data[stride*i + step], (*reshaped_dim, n_channels))

                    if Augmentation:
                      #azimuth_reshaped[i, step] = augmentation_pipeline(image=azimuth_reshaped[i, step])["image"]
                      elevation_reshaped[i, step] = augmentation_pipeline(image=elevation_reshaped[i, step])["image"]
                      doppler_reshaped[i, step] = augmentation_pipeline(image=doppler_reshaped[i, step])["image"]

                  label_reshaped[i] = label_data[stride*i]
                  if (label_reshaped[i] == 6) or (label_reshaped[i] == 7) or (label_reshaped[i] == 8):
                    label_reshaped[i] = 6

                  if UseWeights:
                    elevation_reshaped[i] = elevation_reshaped[i] * weights[label_reshaped[i]][0]
                    doppler_reshaped[i] = doppler_reshaped[i] *  weights[label_reshaped[i]][1]



                if elevation is None:
                    #azimuth = azimuth_reshaped
                    elevation = elevation_reshaped
                    doppler = doppler_reshaped
                    labels = label_reshaped
                else:
                    #azimuth = np.concatenate((azimuth, azimuth_reshaped), axis=0)
                    elevation = np.concatenate((elevation, elevation_reshaped), axis=0)
                    doppler = np.concatenate((doppler, doppler_reshaped), axis=0)
                    labels = np.concatenate((labels, label_reshaped), axis=0)

                #del azimuth_reshaped
                del elevation_reshaped
                del doppler_reshaped
                del label_reshaped
                gc.collect()

        except Exception as e:
            print(f"Error processing file {file_path}: {e}")
    if shuffle:
      indices = np.arange(elevation.shape[0])  # Assuming shape[0] is 'length'
      np.random.shuffle(indices)
      #azimuth = azimuth[indices]
      elevation = elevation[indices]
      doppler = doppler[indices]
      labels = labels[indices]

    #normalized_azimuth = normalize_middle_dimensions(azimuth)
    normalized_elevation = normalize_middle_dimensions(elevation)
    normalized_doppler = normalize_middle_dimensions(doppler)



    #del azimuth
    del elevation
    del doppler
    gc.collect()

    #return [normalized_azimuth, normalized_elevation, normalized_doppler], labels
    return [normalized_elevation, normalized_doppler], labels


In [None]:
#Defining model inputs
# Define the model
#del model


timesteps = 10

input1 = keras.Input(shape=(timesteps, 32, 256, 1))
input2 = keras.Input(shape=(timesteps, 32, 256, 1))
#input3 = keras.Input(shape=(timesteps, 32, 256, 1))

#Applying Convolutional Layers


#For input 1
conv1 = layers.TimeDistributed(layers.Conv2D(filters=32, kernel_size=(3, 3), activation='relu', padding='same' ))(input1)
#conv1 = layers.TimeDistributed(layers.Dropout(0.1))(conv1)
conv1 = layers.TimeDistributed(layers.MaxPooling2D(pool_size=(2, 2)))(conv1)
conv1 = layers.TimeDistributed(layers.Conv2D(filters=64, kernel_size=(3, 3), activation='relu', padding='same' ))(conv1)
#conv1 = layers.TimeDistributed(layers.Dropout(0.1))(conv1)
conv1 = layers.TimeDistributed(layers.MaxPooling2D(pool_size=(2, 2)))(conv1)
conv1 = layers.TimeDistributed(layers.Conv2D(filters=128, kernel_size=(3, 3), activation='relu', padding='same' ))(conv1)
#conv1 = layers.TimeDistributed(layers.Dropout(0.1))(conv1)
conv1 = layers.TimeDistributed(layers.MaxPooling2D(pool_size=(2, 2)))(conv1)


conv1 = keras.ops.expand_dims(conv1, axis=-1)

#For input 2
conv2 = layers.TimeDistributed(layers.Conv2D(filters=32, kernel_size=(3, 3), activation='relu', padding='same' ))(input2)
#conv2 = layers.TimeDistributed(layers.Dropout(0.1))(conv2)
conv2 = layers.TimeDistributed(layers.MaxPooling2D(pool_size=(2, 2)))(conv2)
conv2 = layers.TimeDistributed(layers.Conv2D(filters=64, kernel_size=(3, 3), activation='relu', padding='same' ))(conv2)
#conv2 = layers.TimeDistributed(layers.Dropout(0.1))(conv2)
conv2 = layers.TimeDistributed(layers.MaxPooling2D(pool_size=(2, 2)))(conv2)
conv2 = layers.TimeDistributed(layers.Conv2D(filters=128, kernel_size=(3, 3), activation='relu', padding='same' ))(conv2)
#conv2 = layers.TimeDistributed(layers.Dropout(0.1))(conv2)
conv2 = layers.TimeDistributed(layers.MaxPooling2D(pool_size=(2, 2)))(conv2)

conv2 = keras.ops.expand_dims(conv2, axis=-1)

'''
#For input 3
conv3 = layers.TimeDistributed(layers.Conv2D(filters=16, kernel_size=(3, 3), activation='relu', padding='same' ))(input3)
conv3 = layers.TimeDistributed(layers.Dropout(0.1))(conv3)
conv3 = layers.TimeDistributed(layers.MaxPooling2D(pool_size=(2, 2)))(conv3)
conv3 = layers.TimeDistributed(layers.Conv2D(filters=32, kernel_size=(3, 3), activation='relu', padding='same' ))(conv3)
conv3 = layers.TimeDistributed(layers.Dropout(0.1))(conv3)
conv3 = layers.TimeDistributed(layers.MaxPooling2D(pool_size=(2, 2)))(conv3)
conv3 = layers.TimeDistributed(layers.Conv2D(filters=64, kernel_size=(3, 3), activation='relu', padding='same' ))(conv3)
conv3 = layers.TimeDistributed(layers.Dropout(0.1))(conv3)
conv3 = layers.TimeDistributed(layers.MaxPooling2D(pool_size=(2, 2)))(conv3)

conv3 = keras.ops.expand_dims(conv3, axis=-1)
'''

#Concatenating each Convolutional Layer
concatenated = layers.Concatenate(axis=-1)([conv1, conv2])
#concatenated = layers.Concatenate(axis=-1)([conv1, conv2, conv3])

x = layers.TimeDistributed(layers.Flatten())(concatenated)

x = layers.TimeDistributed(layers.Dense(128, activation='relu'))(x)
x = layers.BatchNormalization()(x)
#x = layers.Dropout(0.1)(x)

x = layers.LSTM(128, return_sequences=False, activation='relu')(x)
#x = layers.BatchNormalization()(x)
#x = layers.Dropout(0.1)(x)


x = layers.Dense(96, activation='relu')(x)
x = layers.BatchNormalization()(x)
#x = layers.Dropout(0.1)(x)


x = layers.Dense(64, activation='relu')(x)
x = layers.BatchNormalization()(x)
#x = layers.Dropout(0.1)(x)


x = layers.Dense(32, activation='relu')(x)
x = layers.BatchNormalization()(x)
#x = layers.Dropout(0.1)(x)


output = layers.Dense(7, activation='softmax')(x)

# Create the model
model = models.Model(inputs=[input1, input2], outputs=output)
#model = models.Model(inputs=[input1, input2, input3], outputs=output)

# Print model summary
model.summary()

# Setting learning rate
optimizer = keras.optimizers.Adam(learning_rate=0.000001)

# Compiling model and history
model.compile(optimizer=optimizer,
            loss='sparse_categorical_crossentropy',
            metrics=['accuracy'])

In [None]:
# Training the model
#keras.mixed_precision.set_global_policy('float32')
#print(model.dtype_policy)
Trainpath = '/content/drive/MyDrive/Josh/ProcessedMyData10fps/Unshuffled/Training'
Train_hdf5_files = getHDF5Files(Trainpath)
Valpath = '/content/drive/MyDrive/Josh/ProcessedMyData10fps/Unshuffled/Validation'
Val_hdf5_files = getHDF5Files(Valpath)

batch_size = 16
reshaped_dim = (32, 256)
n_channels = 1
timesteps =   10
#order of classes 0-toiletSit, 1-ToiletStand, 2-WashHands, 3-Floor, 4-Walk, 5-MultiPerson, 6-Clean
#Weights in form [Elevation, Doppler]
weights = [[0.9, 0.1], [0.9, 0.1], [0.5, 0.5], [0.9, 0.1], [0.1, 0.9], [0.5, 0.5], [0.2, 0.8]]

print("Processing Training Data")
x_train, y_train = extract_data_from_hdf5(Train_hdf5_files, reshaped_dim, n_channels, timesteps, UseWeights=True, weights = weights, stride=4, Augmentation=True)
print("Processing Validation Data")
x_val, y_val = extract_data_from_hdf5(Val_hdf5_files, reshaped_dim, n_channels, timesteps, UseWeights=False, weights = None, stride=timesteps, Augmentation=False)



from sklearn.utils import class_weight

class_weights = class_weight.compute_class_weight(class_weight='balanced',
                                                 classes=np.unique(y_train),
                                                 y=y_train)

# Convert class weights to a dictionary format
class_weight_dict = dict(enumerate(class_weights))

#callback = keras.callbacks.EarlyStopping(monitor='val_loss', patience=6)
history = model.fit(x_train, y_train, batch_size=batch_size, epochs=200, validation_data=(x_val, y_val), shuffle=True, class_weight=class_weight_dict)

In [None]:

model = keras.saving.load_model("/content/drive/MyDrive/Josh/Results/LSTM-Tuning/FinalModel/Final-Augment/Model.keras")


reshaped_dim = (32, 256)
n_channels = 1
timesteps =   10


In [None]:
savePath = "/content/drive/MyDrive/Josh/Results/LSTM-Tuning/FinalModel/Final-Augment"
if not os.path.exists(savePath):
  os.makedirs(savePath)
model.save(os.path.join(savePath, "Model.keras"))

In [None]:
Valpath = '/content/drive/MyDrive/Josh/ProcessedMyData10fps/Unshuffled/Validation'

Val_hdf5_files = getHDF5Files(Valpath)


reshaped_dim = (32, 256)
n_channels = 1
timesteps =   10
#x_val, y_val = extract_data_from_hdf5(Val_hdf5_files, reshaped_dim, n_channels, timesteps, UseWeights=False, weights = None, stride=timesteps, Augmentation=False)

#text_x = x_val
#text_y = y_val

test_loss, test_acc = model.evaluate(x_val, y_val)
print(f"Test Loss: {test_loss}, Test Accuracy: {test_acc}")

# Predict the labels for the test data
y_pred = np.argmax(model.predict(x_val), axis=-1)

# Compute the confusion matrix
cm = confusion_matrix(y_val, y_pred)

# Define the save path
save_path = '/content/drive/MyDrive/Josh/Results/LSTM-Tuning/FinalModel/Final-Normal'

# Compute and plot the confusion matrix
plt.figure(figsize=(10, 8))
plt.imshow(cm, interpolation='nearest', cmap=plt.cm.Blues)
plt.title('Confusion Matrix')
plt.colorbar()
tick_marks = np.arange(7)  # Adjust this to the number of classes
class_labels = ['ToiletSit', 'ToiletStand', 'WashHands', 'WashroomSit', 'WashroomWalk', "Multiperson", "Clean"]
plt.xticks(tick_marks, class_labels, rotation=45)
plt.yticks(tick_marks, class_labels)

# Annotate the confusion matrix
fmt = 'd'
thresh = cm.max() / 2.
for i in range(cm.shape[0]):
    for j in range(cm.shape[1]):
        plt.text(j, i, format(cm[i, j], fmt),
                 horizontalalignment="center",
                 color="white" if cm[i, j] > thresh else "black")
accuracy = test_acc * 100

# Overlay accuracy and voted accuracy below the plot
plt.figtext(0.5, 0.01, f"Accuracy: {accuracy:.2f}%",
            fontsize=12, color="black", horizontalalignment='center')

# Finalize the plot
plt.tight_layout(rect=[0, 0.05, 1, 1])  # Adjust layout to make space for text
plt.ylabel('True label')
plt.xlabel('Predicted label')
plt.savefig(os.path.join(save_path, 'confusion_matrix.png'))  # Save figure
plt.show()

# Plot training and validation accuracy
plt.figure()
plt.plot(history.history['accuracy'], label='Training Accuracy')
plt.plot(history.history['val_accuracy'], label='Validation Accuracy')
plt.xlabel('Epoch')
plt.ylabel('Accuracy')
plt.title('Training and Validation Accuracy')
plt.legend()
plt.savefig(os.path.join(save_path, 'accuracy_plot.png'))  # Save figure
plt.show()

# Plot training and validation loss
plt.figure()
plt.plot(history.history['loss'], label='Training Loss')
plt.plot(history.history['val_loss'], label='Validation Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.title('Training and Validation Loss')
plt.legend()
plt.savefig(os.path.join(save_path, 'loss_plot.png'))  # Save figure
plt.show()

In [None]:
#Voting Algorithm
from collections import Counter
def replace_with_most_common(data, chunk_size=3):
    # Ensure the input list length is a multiple of chunk_size


    result = []

    for i in range(len(data)//chunk_size):
        chunk = data[chunk_size*i:(i+1)*chunk_size]
        # Find the most common value in the chunk
        most_common = Counter(chunk).most_common(1)[0][0]
        # Append the most common value for the whole chunk
        result.extend([most_common] * chunk_size)

    remainder = len(data) % chunk_size
    if remainder > 0:
        result.extend(data[-remainder:])

    return result

In [None]:
#Graph of predicted Label vs Real Label over time for each Activity File

savePath = "/content/drive/MyDrive/Josh/Results/LSTM-Tuning/FinalModel/Final-Augment/ByFileValidation"
basePath = '/content/drive/MyDrive/Josh/ProcessedMyData10fps/Unshuffled/Validation'

if os.path.exists(savePath):
  shutil.rmtree(savePath)
os.makedirs(savePath)


file_names = getHDF5Files(basePath)

MetaRealLabels = []
MetaPredictedLabels = []
MetaPredictedVotedLabels = []

for filePath in file_names:
  real_label = []
  predicted_label = []
  voted_label = []
  correct_count = 0
  voted_correct_count = 0


  file_name = os.path.basename(filePath)
  file_name = file_name[:-3]

  if not os.path.exists(filePath):
    print("file not found: ", filePath)
    continue

  batch_size = 16
  reshaped_dim = (32, 256)
  n_channels = 1
  timesteps =   10


  test_data, test_labels = extract_data_from_hdf5([filePath], reshaped_dim, n_channels, timesteps, UseWeights=False, weights = None, stride=timesteps, Augmentation=False)

  #test_data = np.array(test_data)
  test_labels = np.array(test_labels)

  # Predict the labels for the test data
  y_pred = np.argmax(model.predict(test_data), axis=-1)

  predicted_classes = y_pred
  # Extract the predicted class for each sample
  length = len(predicted_classes)

  voted1 = replace_with_most_common(predicted_classes, chunk_size=5)
  voted = replace_with_most_common(voted1, chunk_size=30)

  MetaRealLabels.extend(test_labels)
  MetaPredictedLabels.extend(predicted_classes)
  MetaPredictedVotedLabels.extend(voted)



  # Print only the predicted class for each sample
  for i in range(length):

      predicted_class = predicted_classes[i]
      real_class = test_labels[i]
      voted_class = voted[i]


      if predicted_class == real_class:
          correct = "yes"
          correct_count += 1
      if voted_class == real_class:
          voted_correct_count += 1
      else:
          correct = "no"

      real_label.append(real_class)
      predicted_label.append(predicted_class)
      voted_label.append(voted_class)


  accuracy = (correct_count / length) * 100
  voted_accuracy = (voted_correct_count / length) * 100




  #Plotting Code

  xaxis = list(range(1, length + 1))

  fig, ax = plt.subplots()

  # Plot the first list against the x-axis list
  ax.plot(xaxis, real_label, label='Real Label', color='b',  linewidth=10)

  # Plot the second list against the same x-axis list
  ax.plot(xaxis, predicted_label, label='Predicted Label', color='r',linewidth=6)

  # Plot the third list against the same x-axis
  ax.plot(xaxis, voted_label, label='Voted Label', color='g', linewidth=2)

  # Adding labels and title
  ax.set_xlabel('Samples (1.66 per second)')
  ax.set_ylabel('Label')
  ax.set_title(f"predictions for {file_name}")

  # Adding legend
  ax.legend()

  # Y Axis
  y_ticks = [0, 1, 2, 3, 4, 5, 6]
  y_labels = ['ToiletSit', "ToiletStand", "WashHands", "Fall", "Walk", 'MultiPerson', "Clean"]
  ax.set_yticks(y_ticks)
  ax.set_yticklabels(y_labels)

  # Set the y-axis to start at -1
  ax.set_ylim(-1, max(y_ticks) + 1)

  ax.text(0.05, 0.95, f"\nAccuracy: {accuracy}%\n\n", transform=ax.transAxes,
          verticalalignment='top', horizontalalignment='left',
          color='green', fontsize=12,  alpha=0.8)

  ax.text(0.05, 1, f"\nVoted Accuracy: {voted_accuracy}%\n\n", transform=ax.transAxes,
          verticalalignment='top', horizontalalignment='left',
          color='green', fontsize=12,  alpha=0.8)

  # Show plot
  #plt.show()
  figurepath = savePath + "/Predictions_" + file_name + ".png"
  fig.savefig(figurepath)
  plt.close(fig)





In [None]:
#Confusion Matrix for Voted Labels
length = len(MetaPredictedVotedLabels)

correct_count = 0
voted_correct_count = 0

for i in range(length):

    real_class = MetaRealLabels[i]
    predicted_class = MetaPredictedLabels[i]
    voted_class = MetaPredictedVotedLabels[i]

    if predicted_class == real_class:
        correct_count += 1
    if voted_class == real_class:
        voted_correct_count += 1

accuracy = (correct_count / length) * 100
print("The Accuracy is ", accuracy, "%")
voted_accuracy = (voted_correct_count / length) * 100
print("The Voted Accuracy is ", voted_accuracy, "%")

# Compute the confusion matrix
cm = confusion_matrix(MetaRealLabels, MetaPredictedVotedLabels)

# Define the save path
save_path = '/content/drive/MyDrive/Josh/Results/LSTM-Tuning/FinalModel/Final-Normal'

# Compute and plot the confusion matrix
plt.figure(figsize=(10, 8))
plt.imshow(cm, interpolation='nearest', cmap=plt.cm.Blues)
plt.title('Confusion Matrix')
plt.colorbar()
tick_marks = np.arange(7)  # Adjust this to the number of classes
class_labels = ['ToiletSit', 'ToiletStand', 'WashHands', 'WashroomSit', 'WashroomWalk', "Multiperson", "Clean"]
plt.xticks(tick_marks, class_labels, rotation=45)
plt.yticks(tick_marks, class_labels)

# Annotate the confusion matrix
fmt = 'd'
thresh = cm.max() / 2.
for i in range(cm.shape[0]):
    for j in range(cm.shape[1]):
        plt.text(j, i, format(cm[i, j], fmt),
                 horizontalalignment="center",
                 color="white" if cm[i, j] > thresh else "black")

# Overlay accuracy and voted accuracy below the plot
plt.figtext(0.5, 0.01, f"Accuracy: {accuracy:.2f}%, Voted Accuracy: {voted_accuracy:.2f}%",
            fontsize=12, color="black", horizontalalignment='center')

# Finalize the plot
plt.tight_layout(rect=[0, 0.05, 1, 1])  # Adjust layout to make space for text
plt.ylabel('True label')
plt.xlabel('Predicted label')
plt.savefig(os.path.join(save_path, 'voted_confusion_matrix.png'))  # Save figure
plt.show()





In [None]:

keras.backend.clear_session()
gc.collect()
del model