# Imports and Initializations

## Import necessary Libraries

In [None]:
import cv2
import numpy as np
import os
import time
import mediapipe as mp
import uuid


from matplotlib import pyplot as plt
from sklearn.model_selection import train_test_split
from sklearn.metrics import multilabel_confusion_matrix, accuracy_score
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense
from tensorflow.keras.callbacks import TensorBoard
from tkinter import *
from PIL import Image, ImageTk
from scipy import stats

## Initialise Mediapipe variables

In [None]:
#Module-level handles to the MediaPipe solutions used by the cells below

#Drawing utilities: used to render detected landmarks and their connections onto an image
mp_drawing = mp.solutions.drawing_utils

#Holistic solution: provides pose, face and hand landmark detection through a single model
mp_holistic = mp.solutions.holistic

# Function definitions for Landmark Detection and Keypoint Extraction

In [None]:
def landmarks_detection(image, model):
    """Run a MediaPipe model on one BGR frame.

    Args:
        image: Frame in BGR channel order (OpenCV's default).
        model: Object exposing ``process(rgb_image)``; the callers in this
            file pass an ``mp_holistic.Holistic`` instance.

    Returns:
        Tuple ``(image, detection_results)``: the frame converted back to BGR
        (the array returned by ``cv2.cvtColor``) and whatever
        ``model.process`` returned.
    """
    #MediaPipe works on RGB input, whereas OpenCV delivers BGR
    rgb_frame = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

    #Read-only while processing: MediaPipe's examples describe this as an
    #optional hint that lets the image be passed by reference
    rgb_frame.flags.writeable = False
    detection_results = model.process(rgb_frame)
    rgb_frame.flags.writeable = True

    #Back to BGR so the frame can be drawn on and shown with OpenCV
    bgr_frame = cv2.cvtColor(rgb_frame, cv2.COLOR_RGB2BGR)
    return bgr_frame, detection_results

In [None]:
def draw_styled_landmarks(image, results):
    """Draw the face, pose and hand landmarks held in ``results`` onto ``image``.

    Each landmark group is passed to ``mp_drawing.draw_landmarks`` with its own
    colour, thickness and circle radius; the same drawing spec is used for the
    landmark points and for the connections between them.

    Args:
        image: Image array handed to ``mp_drawing.draw_landmarks``.
        results: Detection results exposing ``face_landmarks``,
            ``pose_landmarks``, ``left_hand_landmarks`` and
            ``right_hand_landmarks``.
    """
    #(results attribute, connection set, colour, line thickness, circle radius)
    layers = (
        ('face_landmarks', mp_holistic.FACEMESH_CONTOURS, (80, 110, 10), 1, 1),
        ('pose_landmarks', mp_holistic.POSE_CONNECTIONS, (80, 22, 10), 2, 4),
        ('left_hand_landmarks', mp_holistic.HAND_CONNECTIONS, (121, 22, 76), 2, 4),
        ('right_hand_landmarks', mp_holistic.HAND_CONNECTIONS, (245, 117, 66), 2, 4),
    )

    for attribute, connections, colour, thickness, radius in layers:
        spec = mp_drawing.DrawingSpec(color=colour, thickness=thickness, circle_radius=radius)
        mp_drawing.draw_landmarks(image, getattr(results, attribute), connections, spec, spec)



In [None]:
def get_landmarks(landmarks, num_landmarks, dimensions=4):
    """Flatten a landmark list into a 1-D feature vector.

    Args:
        landmarks: Object whose ``landmark`` attribute is an iterable of points
            exposing ``x``, ``y``, ``z`` (and ``visibility`` when
            ``dimensions`` is 4), or a falsy value when nothing was detected.
        num_landmarks: Number of landmarks expected; only used to size the
            zero-filled fallback.
        dimensions: How many values to take per landmark, in the order
            x, y, z, visibility. Must be between 1 and 4.

    Returns:
        1-D numpy array with ``dimensions`` values per landmark. When
        ``landmarks`` is falsy, a zero array of length
        ``num_landmarks * dimensions``.

    Raises:
        ValueError: If ``dimensions`` is not between 1 and 4.
    """
    fields = ('x', 'y', 'z', 'visibility')
    if not 1 <= dimensions <= len(fields):
        raise ValueError(f"dimensions must be between 1 and {len(fields)}, got {dimensions!r}")

    #Nothing detected: keep the feature vector length stable with zeros
    if not landmarks:
        return np.zeros(num_landmarks * dimensions)

    #Taking exactly `dimensions` fields keeps the detected and the fallback
    #vectors the same width for every supported value of `dimensions`
    selected = fields[:dimensions]
    return np.array(
        [[getattr(point, name) for name in selected] for point in landmarks.landmark]
    ).flatten()

In [None]:
def extract_keypoints(results):
    """Build one flat feature vector from holistic detection results.

    The pose, face, left-hand and right-hand landmarks are flattened with
    ``get_landmarks`` and concatenated in that order.

    Args:
        results: Detection results exposing ``pose_landmarks``,
            ``face_landmarks``, ``left_hand_landmarks`` and
            ``right_hand_landmarks``.

    Returns:
        1-D numpy array holding the four groups back to back.
    """
    #(results attribute, expected landmark count, values per landmark)
    layout = (
        ('pose_landmarks', 33, 4),        #pose: x, y, z, visibility
        ('face_landmarks', 468, 3),       #face: x, y, z
        ('left_hand_landmarks', 21, 3),   #left hand: x, y, z
        ('right_hand_landmarks', 21, 3),  #right hand: x, y, z
    )

    parts = [
        get_landmarks(getattr(results, attribute), count, dims)
        for attribute, count, dims in layout
    ]
    return np.concatenate(parts)

# Setting up Video Capture

## Make Detections and Draw Landmarks

In [None]:
#Initialising the webcam feed with OpenCV's VideoCapture object
#OpenCV picks the capture backend itself, matching the other capture cells in this notebook
cap = cv2.VideoCapture(0)

try:
    #Setting mediapipe's holistic model for simultaneous body, face and hand landmark detection
    #The minimum detection and tracking confidence parameters are both set to 0.5
    with mp_holistic.Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5) as holistic:

        #Continues as long as the webcam feed is active
        while cap.isOpened():

            #Reads a frame from the webcam
            ret, grabbed = cap.read()

            #A failed read yields no image; stop instead of crashing in the colour conversion
            if not ret:
                print('Failed to read a frame from the webcam; stopping preview.')
                break

            #Only overwrite `frame` with a valid image so the cells below can still use the last good one
            frame = grabbed

            #Making detections on the frame using 'landmarks_detection', which runs the holistic model on it
            image, results = landmarks_detection(frame, holistic)

            #Drawing the detected landmarks on the image using draw_styled_landmarks
            draw_styled_landmarks(image, results)

            #Displaying the processed frame with the detected landmarks drawn on it
            cv2.imshow('OpenCV Feed', image)

            #waitKey polls the keyboard for 10 ms; pressing 'q' ends the preview
            if cv2.waitKey(10) & 0xFF == ord('q'):
                break
finally:
    #Release the webcam and close all OpenCV windows even if an error occurred above
    cap.release()
    cv2.destroyAllWindows()

In [None]:
#Displays the detection results object left over from the last frame processed by the preview loop
results

In [None]:
#Draws the last detection results onto the last captured frame (the frame is passed in directly, not a copy)
draw_styled_landmarks(frame, results)

In [None]:
#Converts the frame from OpenCV's BGR order to RGB before handing it to matplotlib for display
plt.imshow(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))

# Data Collection for Action Recognition

## Define Actions and Parameters

In [None]:
#Root directory under which the exported keypoint data (numpy arrays) is stored
DATA_PATH = 'Data'

#Length of each captured sequence, in frames
sequence_length = 30

#Number of sequences (separate recorded examples) to capture for each action
sequence_count = 30

#Actions to be detected by the model
actions = np.array(('hello', 'thanks', 'iloveyou'))

## Creating necessary Directories

In [None]:
def create_directory(path):
    """Create ``path`` and any missing parent directories.

    Does nothing when the directory already exists. ``exist_ok=True`` replaces
    the separate existence check, so there is no window between checking and
    creating.

    Args:
        path: Directory path to create.

    Raises:
        FileExistsError: If ``path`` exists but is not a directory.
    """
    os.makedirs(path, exist_ok=True)


In [None]:
def get_new_directory_number(parent_directory):
    """Return the next unused integer name inside ``parent_directory``.

    Entries whose names are not plain decimal integers (for example hidden
    files such as ``.DS_Store``) are ignored rather than raising during the
    integer conversion.

    Args:
        parent_directory: Existing directory whose entries are inspected.

    Returns:
        ``0`` when no integer-named entry exists, otherwise the largest
        integer name plus one.
    """
    numbers = [
        int(name) for name in os.listdir(parent_directory) if name.isdecimal()
    ]
    return max(numbers) + 1 if numbers else 0


In [None]:
def create_sequence_directories(parent_directory, sequence_count):
    """Create numbered sequence directories inside ``parent_directory``.

    Numbering starts at the value returned by ``get_new_directory_number`` and
    runs up to and including ``start + sequence_count``.

    NOTE(review): because the upper bound is inclusive, this creates
    ``sequence_count + 1`` directories. The collection loop in this notebook
    iterates ``range(sequence_count + 1)`` as well, so the two must be changed
    together if the extra directory is not wanted.

    Args:
        parent_directory: Existing directory that receives the new directories.
        sequence_count: Offset of the last directory number from the first.
    """
    first_number = get_new_directory_number(parent_directory)
    last_number = first_number + sequence_count

    number = first_number
    while number <= last_number:
        create_directory(os.path.join(parent_directory, str(number)))
        number += 1


In [None]:
#Prepares the on-disk layout DATA_PATH/<action>/<sequence number> for every action
for action in actions:
    
    #Each action gets its own directory under DATA_PATH
    action_directory = os.path.join(DATA_PATH, action)
    
    #Must exist before the next call, which lists its contents to pick the first sequence number
    create_directory(action_directory)
    
    #Numbered sequence directories are created within the action directory
    create_sequence_directories(action_directory, sequence_count)

# Collect keypoint Values for Training and Testing

In [None]:
#Initialising the webcam feed with OpenCV's VideoCapture object
video_capture = cv2.VideoCapture(0)

#Set when 'q' is pressed or the camera stops delivering frames, so that all three nested loops end
stop_collection = False

try:
    #Setting mediapipe's holistic model for simultaneous body, face and hand landmark detection
    #The minimum detection and tracking confidence parameters are both set to 0.5
    with mp_holistic.Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5) as holistic_model:

        #Loops over each action to be detected
        for action in actions:

            #Loops over the sequences for each action
            #NOTE: the "+ 1" bounds mirror create_sequence_directories, which creates sequence_count + 1 directories
            for sequence_index in range(sequence_count + 1):

                #Directory that receives the frames of this sequence
                sequence_directory = os.path.join(DATA_PATH, action, str(sequence_index))

                #Loops over each frame of a sequence
                for frame_index in range(sequence_length + 1):

                    #Capture a frame from the video stream
                    ret, frame = video_capture.read()

                    #A failed read yields no image; stop instead of crashing in the colour conversion
                    if not ret:
                        print('Failed to read a frame from the webcam; stopping collection.')
                        stop_collection = True
                        break

                    #Process the frame to detect and get the landmarks
                    image, results = landmarks_detection(frame, holistic_model)

                    #Visualize the landmarks on the image
                    draw_styled_landmarks(image, results)

                    #On the first frame of a sequence, announce the start of the collection
                    if frame_index == 0:
                        cv2.putText(image, 'STARTING COLLECTION', (120,200), cv2.FONT_HERSHEY_SIMPLEX, 1, (0,255, 0), 4, cv2.LINE_AA)

                    #Status line shown on every frame
                    cv2.putText(image, 'Collecting frames for {} Video Number {}'.format(action, sequence_index), (15,12),
                               cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 255), 1, cv2.LINE_AA)
                    cv2.imshow('OpenCV Feed', image)

                    #Pause on the first frame to give time to get into position
                    if frame_index == 0:
                        cv2.waitKey(500)

                    #Extracting the keypoints from the detection results
                    keypoints = extract_keypoints(results)

                    #Making sure the target directory exists, then saving the keypoints as <frame_index>.npy
                    os.makedirs(sequence_directory, exist_ok=True)
                    np.save(os.path.join(sequence_directory, str(frame_index)), keypoints)

                    #Stop the whole collection (not just this sequence) if 'q' is pressed
                    if cv2.waitKey(10) & 0xFF == ord('q'):
                        stop_collection = True
                        break

                if stop_collection:
                    break

            if stop_collection:
                break
finally:
    #Releasing the video capture object and destroying all OpenCV windows, even after an error
    video_capture.release()
    cv2.destroyAllWindows()



# Preprocessing and Data Organization

In [None]:
#Maps each action name to its integer class index (its position in `actions`)
label_map = dict(zip(actions, range(len(actions))))

## Load and Organise Data

In [None]:
def load_data(DATA_PATH, actions, sequence_length, label_map):
    """Load the saved keypoint sequences and their one-hot labels.

    Expects the layout ``DATA_PATH/<action>/<sequence>/<frame>.npy``. For each
    sequence directory the frames ``0 .. sequence_length - 1`` are loaded.

    Args:
        DATA_PATH: Root directory of the collected data.
        actions: Iterable of action names (sub-directories of ``DATA_PATH``).
        sequence_length: Number of frames to load per sequence.
        label_map: Mapping from action name to integer class index.

    Returns:
        Tuple ``(X, y)``: ``X`` is an array built from the per-sequence lists
        of frame arrays; ``y`` is the integer one-hot encoding of the labels
        with ``max(label_map.values()) + 1`` columns.
    """
    import warnings

    #Initialise lists for sequences and labels
    sequences, labels = [], []

    for action in actions:
        action_directory = os.path.join(DATA_PATH, action)

        #Sorted so the sample order does not depend on the file system's listing order
        for sequence in sorted(os.listdir(action_directory)):
            sequence_directory = os.path.join(action_directory, sequence)

            #Skip entries that are not directories
            if not os.path.isdir(sequence_directory):
                continue

            frame_paths = [
                os.path.join(sequence_directory, f"{frame_num}.npy")
                for frame_num in range(sequence_length)
            ]

            #Sequence directories can exist without (all) frames, e.g. when directories
            #were created but the recording was never made or was interrupted
            if not all(os.path.isfile(frame_path) for frame_path in frame_paths):
                warnings.warn(f"Skipping incomplete sequence directory: {sequence_directory}")
                continue

            sequences.append([np.load(frame_path) for frame_path in frame_paths])
            labels.append(label_map[action])

    #Fixing the class count keeps the label width stable even if a class has no samples
    num_classes = max(label_map.values()) + 1
    return np.array(sequences), to_categorical(labels, num_classes=num_classes).astype(int)


# Build and Train LSTM Neural Network

In [None]:
def create_model(input_shape, actions):
    """Build and compile the LSTM classifier.

    Three stacked LSTM layers (64, 128, 64 units) are followed by two dense
    layers (64, 32 units) and a softmax output with one unit per action.

    Args:
        input_shape: Shape passed as ``input_shape`` to the first LSTM layer.
        actions: Sequence of actions; its length sets the output layer size.

    Returns:
        The compiled ``Sequential`` model (Adam with learning rate 0.0001,
        categorical cross-entropy loss, categorical accuracy metric).
    """
    layers = [
        #Recurrent stack; only the last LSTM collapses the sequence to a single vector
        LSTM(64, return_sequences=True, activation='relu', input_shape=input_shape),
        LSTM(128, return_sequences=True, activation='relu'),
        LSTM(64, return_sequences=False, activation='relu'),
        #Dense head ending in one softmax unit per action
        Dense(64, activation='relu'),
        Dense(32, activation='relu'),
        Dense(len(actions), activation='softmax'),
    ]
    model = Sequential(layers)

    model.compile(
        optimizer=Adam(learning_rate=0.0001),
        loss='categorical_crossentropy',
        metrics=['categorical_accuracy'],
    )
    return model

In [None]:
#Loads the saved keypoint sequences (X) and their one-hot labels (y) from DATA_PATH
X, y = load_data(DATA_PATH, actions, sequence_length, label_map)

In [None]:
#Splits the data, holding out 5% of the sequences for testing
#NOTE(review): no random_state is passed, so the split differs between runs
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.05)

In [None]:
#Creates the model; the input shape is (frames per sequence, size of X's third axis)
model = create_model((sequence_length, X.shape[2]), actions)

In [None]:
#Defines the TensorBoard callback, which writes its logs to the 'Logs' directory
log_dir = 'Logs'
tb_callback = TensorBoard(log_dir=log_dir)

In [None]:
#Trains the model on the training split for 2000 epochs, logging through the TensorBoard callback
model.fit(X_train, y_train, epochs=2000, callbacks=[tb_callback])


In [None]:
#Prints the model summary
model.summary()

# Saving the trained model


In [None]:
#Saves the trained model to 'action.h5' in the current working directory
model.save('action.h5')

# Making Predictions

In [None]:
#Runs the model on the held-out test sequences
predictions = model.predict(X_test)

# Evaluation using Confusion Matrix and Accuracy

In [None]:
#Collapses each one-hot label row / predicted score row to its class index, as plain Python lists
true_labels = y_test.argmax(axis=1).tolist()
predicted_labels = predictions.argmax(axis=1).tolist()


In [None]:
#Prints the result of multilabel_confusion_matrix on the class indices, then the overall accuracy
print(multilabel_confusion_matrix(true_labels, predicted_labels))
print(accuracy_score(true_labels, predicted_labels))

# Using the Model for Real-Time Action Recognition

In [None]:
#Defines the bar colours for the probability visualisation, one colour tuple per action
color_palette = [(245,117,16), (117,245,16), (16,117,245)]


In [None]:
def visualise_probabilities(predictions, actions, input_frame, color_palette):
    """Draw one horizontal probability bar per action onto a copy of the frame.

    Args:
        predictions: Iterable of probabilities, one per action.
        actions: Sequence of action names, indexed like ``predictions``.
        input_frame: Image to annotate; it is copied, not modified.
        color_palette: Non-empty sequence of colour tuples. Colours are reused
            cyclically when there are more actions than colours.

    Returns:
        The annotated copy of ``input_frame``.
    """
    #Work on a copy so the caller's frame stays untouched
    output_frame = input_frame.copy()

    for num, prob in enumerate(predictions):

        #Each action gets a 30 px high row, 40 px below the previous one
        top = 60 + num * 40

        #Bar width is proportional to the probability (100 px for probability 1.0)
        rect_width = int(prob * 100)

        #Cycling through the palette avoids an IndexError when there are more actions than colours
        bar_colour = color_palette[num % len(color_palette)]

        #Filled bar, then the action's name on top of it
        cv2.rectangle(output_frame, (0, top), (rect_width, top + 30), bar_colour, -1)
        cv2.putText(output_frame, actions[num], (0, top + 25), cv2.FONT_HERSHEY_SIMPLEX, 1, (255,255,255), 2, cv2.LINE_AA)

    return output_frame

In [None]:
#Minimum probability a prediction needs before it may be added to the sentence
threshold = 0.5

#State for the real-time loop: recent keypoint frames, recognised actions and predicted class indices
#Each name gets its own empty list
sequence, sentence, predictions = [], [], []

In [None]:
#Initialise the camera feed
cap = cv2.VideoCapture(0)

#Number of consecutive identical predictions required before an action is accepted
stable_window = 10

try:
    #Initialises the mediapipe holistic model with specified confidence parameters
    with mp_holistic.Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5) as holistic:

        #Starts a loop that continues as long as the camera is opened
        while cap.isOpened():

            #Captures each frame from the camera feed
            ret, frame = cap.read()

            #A failed read yields no image; stop instead of crashing in the colour conversion
            if not ret:
                print('Failed to read a frame from the webcam; stopping recognition.')
                break

            #Uses the landmarks_detection function to detect landmarks on the frame
            image, results = landmarks_detection(frame, holistic)

            #Draws the detected landmarks on the frame
            draw_styled_landmarks(image, results)

            #Extracts keypoints from the detected landmarks
            keypoints = extract_keypoints(results)

            #Keeps a sliding window of the most recent sequence_length keypoint frames
            sequence.append(keypoints)
            sequence = sequence[-sequence_length:]

            #Once the window is full, predict the action
            if len(sequence) == sequence_length:

                #verbose=0 avoids printing a progress bar for every frame
                res = model.predict(np.expand_dims(sequence, axis=0), verbose=0)[0]
                best = int(np.argmax(res))

                #Only the most recent predictions are needed, so the history is kept bounded
                predictions.append(best)
                predictions = predictions[-stable_window:]

                #Accept the action only if every one of the last stable_window predictions agrees
                is_stable = len(predictions) == stable_window and all(p == best for p in predictions)

                if is_stable and res[best] > threshold:

                    #Only add new actions to the sentence
                    if not sentence or actions[best] != sentence[-1]:
                        sentence.append(actions[best])

                #Limits the sentence length to the last 5 actions
                sentence = sentence[-5:]

                #Visualise the prediction probabilities on the frame
                image = visualise_probabilities(res, actions, image, color_palette)

            #Display the sentence on a banner spanning the full width of the frame
            cv2.rectangle(image, (0, 0), (image.shape[1], 40), (245, 117, 16), -1)
            cv2.putText(image, ' '.join(sentence), (3, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2, cv2.LINE_AA)

            #Shows the frame on the screen
            cv2.imshow('OpenCV Feed', image)

            #Breaks the loop if 'q' is pressed
            if cv2.waitKey(10) & 0xFF == ord('q'):
                break
finally:
    #Releases the camera and closes all windows, even after an error
    cap.release()
    cv2.destroyAllWindows()