<a href="https://colab.research.google.com/github/jnguy450/Simple_ASL_Translation_TFModel/blob/main/03_18_23_ASL_alphabet.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Model Training

In [None]:
# Install the required modules.
!pip install mediapipe

In [None]:
# Import the required libraries.
import os
import cv2
import math
import random
import numpy as np
import datetime as dt
import tensorflow as tf
from collections import deque
import matplotlib.pyplot as plt
import mediapipe as mp
import time
import copy
import itertools

%matplotlib inline
 
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, confusion_matrix
 
from tensorflow.keras.layers import *
from tensorflow.keras.models import Sequential
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.callbacks import EarlyStopping
from tensorflow.keras.utils import plot_model

In [None]:
# Seed NumPy, Python's `random` module and TensorFlow with the same constant.
# The same constant is reused later as `random_state` for the train/test split.
seed_constant = 27
np.random.seed(seed_constant)
random.seed(seed_constant)
tf.random.set_seed(seed_constant)

In [None]:
# Mount Google Drive at /content/drive so the dataset archives stored there can be read.
# This import is only available inside Google Colab.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
# Discard the output of this cell.
%%capture
 
# Extract the Dataset (can use any zip file)
# !unzip "/content/drive/MyDrive/Colab Notebooks/ASL_Dataset_Clips_v1_0.zip"
!unzip "/content/drive/MyDrive/CPSLO/Courses/CPE/CPE461 462: Senior Project/ASL_Translation_Tool/Datasets/ASL_Dataset_Clips_v0.zip"
# !unzip "/content/drive/MyDrive/CPSLO/Courses/CPE/CPE461 462: Senior Project/ASL_Translation_Tool/Datasets/ASL_Dataset_Clips_v1.zip"
# !unzip "/content/drive/MyDrive/CPSLO/Courses/CPE/CPE461 462: Senior Project/ASL_Translation_Tool/Datasets/ASL_Dataset_Clips_v2.zip"
# !unzip "/content/drive/MyDrive/CPSLO/Courses/CPE/CPE461 462: Senior Project/ASL_Translation_Tool/Datasets/alphabet_vid.zip"

In [None]:
# Specify the height and width to which each video frame will be resized in our dataset.
IMAGE_HEIGHT , IMAGE_WIDTH = 700, 700
 
# Specify the maximum number of frames that are sampled from each video.
SEQUENCE_LENGTH = 25

# Shape of one training sample fed to the model:
#   NUM_LANDMARKS_LISTS        - number of per-frame landmark vectors kept for each video (time steps).
#   NUM_LANDMARKS_LIST_ENTRIES - length of one flattened landmark vector (x and y for each hand landmark).
NUM_LANDMARKS_LIST_ENTRIES = 42
NUM_LANDMARKS_LISTS = 5
 
# Specify the directory containing the extracted dataset (one sub-directory per class).
DATASET_DIR = "ASL_Dataset_Clips_v0"
 
# Build the list of class names from the sub-directories of the dataset directory.
# Keeping only directories drops stray files such as macOS's ".DS_Store" without raising
# when no such file exists (list.remove() raises ValueError for a missing element).
# NOTE(review): os.listdir() order is filesystem dependent, so the class -> index mapping is
# not guaranteed to be stable between machines; consider sorting once no previously saved
# model depends on the current order.
CLASSES_LIST = [
    entry_name
    for entry_name in os.listdir(DATASET_DIR)
    if os.path.isdir(os.path.join(DATASET_DIR, entry_name))
]

In [None]:
def frames_extraction(video_path):
    '''
    Sample up to SEQUENCE_LENGTH evenly spaced frames from a video and resize them.
    Args:
        video_path: The path of the video in the disk, whose frames are to be extracted.
    Returns:
        frames_list: A list containing the resized frames of the video (pixel values are
                     not normalized). The list is shorter than SEQUENCE_LENGTH, possibly
                     empty, when a frame cannot be read.
    '''

    # Declare a list to store video frames.
    frames_list = []

    # Read the Video File using the VideoCapture object.
    video_reader = cv2.VideoCapture(video_path)

    # The try/finally guarantees the capture is released even if reading or resizing raises.
    try:
        # Get the total number of frames in the video.
        video_frames_count = int(video_reader.get(cv2.CAP_PROP_FRAME_COUNT))

        # Interval between sampled frames; at least 1 so short videos are read frame by frame.
        skip_frames_window = max(video_frames_count // SEQUENCE_LENGTH, 1)

        for frame_counter in range(SEQUENCE_LENGTH):

            # Seek to the next sampled position and read that frame.
            video_reader.set(cv2.CAP_PROP_POS_FRAMES, frame_counter * skip_frames_window)
            success, frame = video_reader.read()

            # Stop at the first frame that cannot be read (e.g. past the end of the video).
            if not success:
                break

            # cv2.resize expects the target size as (width, height).
            resized_frame = cv2.resize(frame, (IMAGE_WIDTH, IMAGE_HEIGHT))
            frames_list.append(resized_frame)
    finally:
        # Release the VideoCapture object.
        video_reader.release()

    # Return the frames list.
    return frames_list

In [None]:
# Shortcuts to the MediaPipe hands solution, its drawing helpers and drawing styles.
mp_hands = mp.solutions.hands
mp_draw = mp.solutions.drawing_utils
mp_drawing_styles = mp.solutions.drawing_styles

# Single hand detector shared by every call to detect_hand_landmarks(); created with
# MediaPipe's default options.
# NOTE(review): the defaults presumably treat successive inputs as one video stream; since
# frames from many unrelated videos go through this object, static_image_mode=True may be
# more appropriate - confirm against the MediaPipe documentation.
hands = mp_hands.Hands()

In [None]:
def calculate_landmark_list(frame, hand_landmarks):
    '''
    Convert the landmarks of one hand into integer pixel coordinates.
    Args:
        frame:          Image whose shape (height, width, ...) defines the pixel grid.
        hand_landmarks: Object whose `landmark` attribute is an iterable of points with
                        `x` and `y` attributes that are scaled by the frame width/height.
    Returns:
        A list of [x, y] pixel pairs, one per landmark, each capped at the last valid
        column/row index of the frame.
    '''
    height, width = frame.shape[0], frame.shape[1]

    # Scale each point to pixels and cap it at the right/bottom edge of the frame.
    return [
        [min(int(point.x * width), width - 1), min(int(point.y * height), height - 1)]
        for point in hand_landmarks.landmark
    ]

In [None]:
def pre_process_landmark(landmark_list):
    '''
    Turn a list of landmark points into a flat, origin-relative, normalized vector.
    Args:
        landmark_list: A list of [x, y] points. The first point is used as the origin.
                       The input is not modified.
    Returns:
        A flat list [x0, y0, x1, y1, ...] of coordinates relative to the first point,
        divided by the largest absolute relative coordinate so every value lies in
        [-1, 1]. Returns an empty list for empty input, and all zeros when every point
        coincides with the first one (where a division by zero would otherwise occur).
    '''
    # Nothing to normalize.
    if not landmark_list:
        return []

    # Convert to relative coordinates, flattening to a one-dimensional list on the way.
    base_x, base_y = landmark_list[0][0], landmark_list[0][1]
    relative_values = []
    for point in landmark_list:
        relative_values.append(point[0] - base_x)
        relative_values.append(point[1] - base_y)
        # Any entries beyond x and y are carried over unchanged.
        relative_values.extend(point[2:])

    # Normalization by the largest magnitude.
    max_value = max(map(abs, relative_values))
    if max_value == 0:
        # All points are identical: there is no scale to normalize by.
        return [0.0] * len(relative_values)

    return [value / max_value for value in relative_values]

In [None]:
def pre_process_point_history(frame, point_history):
    '''
    Express a history of points relative to its first point, scaled by the frame size.
    Args:
        frame:         Image whose shape (height, width, ...) provides the scale factors.
        point_history: A sequence of mutable [x, y] points. The input is not modified;
                       a deep copy is processed instead.
    Returns:
        A flat list of the processed points' values: x offsets divided by the frame
        width and y offsets divided by the frame height.
    '''
    height, width = frame.shape[0], frame.shape[1]

    history = copy.deepcopy(point_history)

    # The first point is the origin; it is captured before being overwritten.
    origin = None
    for point in history:
        if origin is None:
            origin = (point[0], point[1])

        point[0] = (point[0] - origin[0]) / width
        point[1] = (point[1] - origin[1]) / height

    # Flatten to a one-dimensional list.
    return [value for point in history for value in point]

In [None]:
def detect_hand_landmarks(frame):
    '''
    Run the shared MediaPipe `hands` detector on one frame.
    Args:
        frame: An image that is converted with cv2.COLOR_BGR2RGB and flipped around
               the vertical axis before detection.
    Returns:
        The pre-processed (relative, normalized, flattened) landmark list of the last
        hand reported for the frame, or an empty list when no hand is detected.
    '''
    # Convert the colour order and mirror the image horizontally.
    rgb_mirrored = cv2.flip(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB), 1)

    # Mark the image read-only while MediaPipe processes it.
    rgb_mirrored.flags.writeable = False
    results = hands.process(rgb_mirrored)
    rgb_mirrored.flags.writeable = True

    if not results.multi_hand_landmarks:
        return []

    frame_landmark_list = []
    # Every detected hand is processed; each result replaces the previous one.
    for hand_landmarks, _ in zip(results.multi_hand_landmarks, results.multi_handedness):
        pixel_points = calculate_landmark_list(rgb_mirrored, hand_landmarks)
        frame_landmark_list = pre_process_landmark(pixel_points)

    return frame_landmark_list

In [None]:
def create_dataset():
    '''
    Build the landmark dataset for every class in CLASSES_LIST.

    For each video, frames are sampled with frames_extraction(), hand landmarks are
    detected per frame, and frames without a detected hand are dropped. A video is kept
    only if at least NUM_LANDMARKS_LISTS landmark vectors remain; exactly
    NUM_LANDMARKS_LISTS of them, spread over the sequence, become one sample.

    Returns:
        features:          A numpy array with one entry per accepted video, each holding
                           NUM_LANDMARKS_LISTS landmark vectors.
        labels:            A numpy array with the class index of each accepted video.
        video_files_paths: A list containing the paths of the accepted videos in the disk.
    '''

    # Declared Empty Lists to store the features, labels and video file path values.
    features = []
    labels = []
    video_files_paths = []

    # Iterating through all the classes mentioned in the classes list
    for class_index, class_name in enumerate(CLASSES_LIST):

        # Display the name of the class whose data is being extracted.
        print(f'Extracting Data of Class: {class_name}')

        # os.listdir() returns entries in arbitrary order; sorting keeps the sample order
        # (and therefore the seeded train/test split) the same from run to run.
        files_list = sorted(os.listdir(os.path.join(DATASET_DIR, class_name)))

        # Per-class statistics that are printed after the class has been processed.
        landmarks_list_counter = 0           # videos with enough frames to be considered
        landmarks_list_accepted_counter = 0  # videos that ended up in the dataset
        landmarks_list_history = []          # number of frames with a detected hand, per video

        for file_name in files_list:

            # Get the complete video path.
            video_file_path = os.path.join(DATASET_DIR, class_name, file_name)

            # Extract the frames of the video file.
            frames = frames_extraction(video_file_path)

            # Ignore videos that yield fewer frames than one sample needs.
            if len(frames) < NUM_LANDMARKS_LISTS:
                continue

            # Keep the landmark vectors of the frames in which a hand was detected.
            landmarks_list = []
            for frame in frames:
                landmarks = detect_hand_landmarks(frame)
                if landmarks:
                    landmarks_list.append(landmarks)

            landmarks_list_history.append(len(landmarks_list))
            landmarks_list_counter += 1

            # Ignore videos in which too few frames contained a detectable hand.
            if len(landmarks_list) < NUM_LANDMARKS_LISTS:
                continue

            landmarks_list_accepted_counter += 1

            # Take every step_val-th vector, then the first NUM_LANDMARKS_LISTS of those.
            # step_val is at least 1 here, and a step of 1 simply keeps the leading vectors.
            step_val = len(landmarks_list) // NUM_LANDMARKS_LISTS
            features.append(landmarks_list[::step_val][:NUM_LANDMARKS_LISTS])
            labels.append(class_index)
            video_files_paths.append(video_file_path)

        print("Length: " + str(landmarks_list_counter) + " | Accepted: " + str(landmarks_list_accepted_counter))
        print(landmarks_list_history)

    # Converting the list to numpy arrays
    features = np.asarray(features)
    labels = np.array(labels)

    # Return the landmark sequences, class indexes, and video file paths.
    return features, labels, video_files_paths

In [None]:
# Create the dataset: landmark sequences, their class indexes and the source video paths.
# This processes every video of every class in CLASSES_LIST.
features, labels, video_files_paths = create_dataset()

In [None]:
# View the shape of features
features.shape

In [None]:
# Using Keras's to_categorical method to convert labels into one-hot-encoded vectors.
# num_classes is passed explicitly so the width always equals the number of classes the
# model's output layer is built for; otherwise it is inferred from the largest label
# present, which is too small when the last class contributed no accepted videos.
one_hot_encoded_labels = to_categorical(labels, num_classes=len(CLASSES_LIST))

In [None]:
# Split the Data into Train ( 80% ) and Test Set ( 20% ).
# The split is stratified on the integer labels and seeded with seed_constant.
features_train, features_test, labels_train, labels_test = train_test_split(
    features,
    one_hot_encoded_labels,
    test_size = 0.2,
    shuffle = True,
    random_state = seed_constant,
    stratify = labels,
)

In [None]:
# Check number of items in each class for training
print("labels_train")
# Each row of labels_train is one-hot, so summing the rows gives the number of samples
# per class. This starts from zero (np.empty() would leave arbitrary values in the
# counters) and its length follows the label width instead of a hard-coded 26.
total_train_arr = labels_train.sum(axis=0)
print(total_train_arr)

In [None]:
# Check number of items in each class for testing
print("labels_test")
# Each row of labels_test is one-hot, so summing the rows gives the number of samples
# per class. This starts from zero (np.empty() would leave arbitrary values in the
# counters) and its length follows the label width instead of a hard-coded 26.
total_test_arr = labels_test.sum(axis=0)
print(total_test_arr)

In [None]:
def create_LSTM_model():
    '''
    Build the landmark-sequence classifier and print its summary.
    Returns:
        model: An uncompiled Sequential model that takes inputs of shape
               (NUM_LANDMARKS_LISTS, NUM_LANDMARKS_LIST_ENTRIES) and outputs one
               softmax score per entry of CLASSES_LIST.
    '''
    # LSTM over the landmark sequence -> dropout -> dense hidden layer -> class scores.
    layer_stack = [
        LSTM(100, input_shape=(NUM_LANDMARKS_LISTS, NUM_LANDMARKS_LIST_ENTRIES)),
        Dropout(0.5),
        Dense(100, activation='relu'),
        Dense(len(CLASSES_LIST), activation='softmax'),
    ]
    model = Sequential(layer_stack)

    model.summary()

    return model

In [None]:
# Construct the required model (this also prints the model summary).
LSTM_model = create_LSTM_model()
 
# Display the success message.
print("Model Created Successfully!")

In [None]:
# Plot the structure of the constructed model and save it as a PNG file.
plot_model(LSTM_model, to_file = 'LSTM_model_structure_plot.png', show_shapes = True, show_layer_names = True)

In [None]:
# Create an Instance of Early Stopping Callback.
# Training stops once 'val_loss' has not improved for 100 epochs, and the weights of the
# best epoch are restored.
early_stopping_callback = EarlyStopping(monitor = 'val_loss', patience = 100, mode = 'min', restore_best_weights = True)
 
# Compile the model and specify loss function, optimizer and metrics to the model.
LSTM_model.compile(loss = 'categorical_crossentropy', optimizer = 'Adam', metrics = ["accuracy"])
 
# Start training the model.
# 20% of the training data is held out for validation (validation_split); the test set
# is not used during training.
LSTM_model_training_history = LSTM_model.fit(x = features_train, y = labels_train, epochs = 300, batch_size = 4 , shuffle = True, validation_split = 0.2, callbacks = [early_stopping_callback])
print("Model has been trained")

# Evaluate the trained model on the held-out test set.
model_evaluation_history = LSTM_model.evaluate(features_test, labels_test)
print("Model has been evaluated")

# Get the loss and accuracy from model_evaluation_history.
# The unpacking relies on exactly one metric ("accuracy") having been passed to compile().
model_evaluation_loss, model_evaluation_accuracy = model_evaluation_history
 
# Define the string date format.
# Get the current Date and Time in a DateTime Object.
# Convert the DateTime object to string according to the style mentioned in date_time_format string.
date_time_format = '%Y_%m_%d__%H_%M_%S'
current_date_time_dt = dt.datetime.now()
current_date_time_string = dt.datetime.strftime(current_date_time_dt, date_time_format)
    
# Define a useful name for our model to make it easy for us while navigating through multiple saved models.
# The name embeds the timestamp plus the test loss and accuracy.
model_file_name = f'LSTM_model___Date_Time_{current_date_time_string}___Loss_{model_evaluation_loss}___Accuracy_{model_evaluation_accuracy}.h5'
 
# Save the Model (written to the current working directory).
LSTM_model.save(model_file_name)
print("Model has been saved")

In [None]:
def plot_metric(model_training_history, metric_name_1, metric_name_2, plot_name):
    '''
    Draw two recorded training metrics against the epoch index on the current figure.
    Args:
        model_training_history: Object whose `history` attribute maps metric names to
                                per-epoch value sequences.
        metric_name_1:          Name of the metric drawn in blue.
        metric_name_2:          Name of the metric drawn in red.
        plot_name:              The title of the graph.
    '''
    recorded = model_training_history.history

    # Look both series up first, so a missing metric fails before anything is drawn.
    first_series = recorded[metric_name_1]
    second_series = recorded[metric_name_2]

    # The x-axis is the epoch index, taken from the length of the first series.
    epochs = range(len(first_series))

    curves = (
        (first_series, 'blue', metric_name_1),
        (second_series, 'red', metric_name_2),
    )
    for series, colour, series_label in curves:
        plt.plot(epochs, series, colour, label = series_label)

    # Title and legend.
    plt.title(str(plot_name))
    plt.legend()

In [None]:
# Visualize the training and validation loss metrics.
plot_metric(LSTM_model_training_history, 'loss', 'val_loss', 'Total Loss vs Total Validation Loss')

In [None]:
# Predict the class scores of the test set.
# The classification report and confusion matrix are computed from these in the next cell.
labels_pred = LSTM_model.predict(features_test)

In [None]:
# Print F1 scores and confusion matrix
# Convert score vectors / one-hot vectors to class indexes.
new_labels_pred = np.argmax(labels_pred, axis=1)
new_labels_test = np.argmax(labels_test, axis=1)

# Pass the full list of class indexes explicitly: target_names must match the reported
# labels one-to-one, and without `labels` only the classes that actually occur in the test
# labels or predictions are reported, which raises when any class is absent.
class_indexes = np.arange(len(CLASSES_LIST))
print(classification_report(new_labels_test, new_labels_pred, labels=class_indexes, target_names=CLASSES_LIST))
print(confusion_matrix(new_labels_test, new_labels_pred, labels=class_indexes))

Ensemble

In [None]:
# Import the required libraries
from keras.models import load_model
from sklearn.metrics import accuracy_score

In [None]:
# Load models
# NOTE(review): these are hard-coded paths to model files saved by earlier training runs
# (the file names embed each run's timestamp, loss and accuracy). They are not produced by
# running this notebook once, so the files must already be present under /content.
model1 = load_model("/content/LSTM_model___Date_Time_2023_01_25__09_04_37___Loss_0.5613618493080139___Accuracy_0.875.h5")
model2 = load_model("/content/LSTM_model___Date_Time_2023_01_25__09_51_04___Loss_0.19653922319412231___Accuracy_0.9757281541824341.h5")
model3 = load_model("/content/LSTM_model___Date_Time_2023_01_25__10_25_19___Loss_0.0022365914192050695___Accuracy_1.0.h5")

In [None]:
# Set the datasets for testing
# The ensemble is evaluated on the test split created above (one-hot encoded labels).
TEST_SET = features_test
TEST_LABELS = labels_test

In [None]:
# Get individual model predictions
# argmax over the last axis turns each score vector into a predicted class index.
pred1 = np.argmax(model1.predict(TEST_SET), axis=-1)
pred2 = np.argmax(model2.predict(TEST_SET), axis=-1)
pred3 = np.argmax(model3.predict(TEST_SET), axis=-1)

In [None]:
# Print confidence vectors for individual models
# Each call runs the model over the whole test set again and prints the raw score matrix.
print(model1.predict(TEST_SET))
print(model2.predict(TEST_SET))
print(model3.predict(TEST_SET))

In [None]:
# Get accuracy scores for individual models
# The predictions are class indexes, so the one-hot test labels are converted to class
# indexes first. This is done in this cell because tf_labels_test must exist before the
# accuracy calls below; otherwise a fresh top-to-bottom run fails with a NameError.
tf_labels_test = np.argmax(TEST_LABELS, axis=1)
acc1 = accuracy_score(tf_labels_test, pred1)
acc2 = accuracy_score(tf_labels_test, pred2)
acc3 = accuracy_score(tf_labels_test, pred3)

In [None]:
# Create an array of models with weights
# Each model's weight is its own accuracy on the test set, in the same order as `models`.
models = [model1, model2, model3]
weights = [acc1, acc2, acc3]

In [None]:
# Get ensemble predictions
# Weighted vote: scale every model's score matrix by that model's weight, add the scaled
# matrices together and pick the class with the highest combined score per sample.
preds = []
for model, weight in zip(models, weights):
  weighted_scores = weight * model.predict(TEST_SET)
  preds.append(weighted_scores)

preds = np.array(preds)
summed = preds.sum(axis=0)
ensemble_pred = summed.argmax(axis=1)

In [None]:
# Get accuracy score for ensemble
# Convert the one-hot test labels to class indexes so they can be compared with the
# predicted class indexes.
tf_labels_test = np.argmax(TEST_LABELS, axis=1)
ensemble_acc = accuracy_score(tf_labels_test, ensemble_pred)

In [None]:
# Print accuracy scores
# All four scores are measured on the same test set, which is also the set the ensemble
# weights were derived from.
print("Accuracy Score for Ensemble = ", ensemble_acc)
print("Accuracy Score for model1 = ", acc1)
print("Accuracy Score for model2 = ", acc2)
print("Accuracy Score for model3 = ", acc3)

Citations:

Anwar, Taha, et al. “Human Activity Recognition Using Tensorflow (CNN + LSTM).” Bleed AI, 24 Aug. 2022, https://bleedai.com/human-activity-recognition-using-tensorflow-cnn-lstm/

Goncharov, Ivan. “Custom Hand Gesture Recognition with Hand Landmarks Using Google's Mediapipe + Opencv in Python.” YouTube, 13 Mar. 2022, https://www.youtube.com/watch?v=a99p_fAr6e4&ab_channel=IvanGoncharov.