In [4]:
import cv2
import numpy as np
import os
from matplotlib import pyplot as plt
import time
import mediapipe as mp

mp_holistic = mp.solutions.holistic # Holistic model
mp_drawing = mp.solutions.drawing_utils # Drawing utilities

def mediapipe_detection(image, model):
    image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB) # COLOR CONVERSION BGR 2 RGB
    image.flags.writeable = False                  # Image is no longer writeable
    results = model.process(image)                 # Make prediction
    image.flags.writeable = True                   # Image is now writeable 
    image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR) # COLOR COVERSION RGB 2 BGR
    return image, results

def draw_styled_landmarks(image, results):
    # Draw pose connections
    mp_drawing.draw_landmarks(image, results.pose_landmarks, mp_holistic.POSE_CONNECTIONS,
                             mp_drawing.DrawingSpec(color=(80,22,10), thickness=2, circle_radius=4), 
                             mp_drawing.DrawingSpec(color=(80,44,121), thickness=2, circle_radius=2)
                             ) 
    # Draw left hand connections
    mp_drawing.draw_landmarks(image, results.left_hand_landmarks, mp_holistic.HAND_CONNECTIONS, 
                             mp_drawing.DrawingSpec(color=(121,22,76), thickness=2, circle_radius=4), 
                             mp_drawing.DrawingSpec(color=(121,44,250), thickness=2, circle_radius=2)
                             ) 
    # Draw right hand connections  
    mp_drawing.draw_landmarks(image, results.right_hand_landmarks, mp_holistic.HAND_CONNECTIONS, 
                             mp_drawing.DrawingSpec(color=(245,117,66), thickness=2, circle_radius=4), 
                             mp_drawing.DrawingSpec(color=(245,66,230), thickness=2, circle_radius=2)
                             ) 
def extract_keypoints(results):
    pose = np.array([[res.x, res.y, res.z, res.visibility] for res in results.pose_landmarks.landmark]).flatten() if results.pose_landmarks else np.zeros(33*4)
    lh = np.array([[res.x, res.y, res.z] for res in results.left_hand_landmarks.landmark]).flatten() if results.left_hand_landmarks else np.zeros(21*3)
    rh = np.array([[res.x, res.y, res.z] for res in results.right_hand_landmarks.landmark]).flatten() if results.right_hand_landmarks else np.zeros(21*3)
    return np.concatenate([pose, lh, rh])

In [5]:
#-------------------------------TEST TO SEE IF CV2 WORKS-----------------------------------------------------

cap = cv2.VideoCapture(0)
# Set mediapipe model 
with mp_holistic.Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5) as holistic:
    while cap.isOpened():

        # Read feed
        ret, frame = cap.read()

        # Make detections
        image, results = mediapipe_detection(frame, holistic)
        print(results)
        
        # Draw landmarks
        draw_styled_landmarks(image, results)

        # Show to screen
        cv2.imshow('OpenCV Feed', image)

        # Break gracefully
        if cv2.waitKey(10) & 0xFF == ord('q'):
            break
    cap.release()
    cv2.destroyAllWindows()
#----------------------------------------------------------------------------------------------------------------

<class 'mediapipe.python.solution_base.SolutionOutputs'>
<class 'mediapipe.python.solution_base.SolutionOutputs'>
<class 'mediapipe.python.solution_base.SolutionOutputs'>
<class 'mediapipe.python.solution_base.SolutionOutputs'>
<class 'mediapipe.python.solution_base.SolutionOutputs'>
<class 'mediapipe.python.solution_base.SolutionOutputs'>
<class 'mediapipe.python.solution_base.SolutionOutputs'>
<class 'mediapipe.python.solution_base.SolutionOutputs'>
<class 'mediapipe.python.solution_base.SolutionOutputs'>
<class 'mediapipe.python.solution_base.SolutionOutputs'>
<class 'mediapipe.python.solution_base.SolutionOutputs'>
<class 'mediapipe.python.solution_base.SolutionOutputs'>
<class 'mediapipe.python.solution_base.SolutionOutputs'>
<class 'mediapipe.python.solution_base.SolutionOutputs'>
<class 'mediapipe.python.solution_base.SolutionOutputs'>
<class 'mediapipe.python.solution_base.SolutionOutputs'>
<class 'mediapipe.python.solution_base.SolutionOutputs'>
<class 'mediapipe.python.soluti

In [2]:
#--------------------------CREATION OF PATH AND STORE ACTIONS--------------------------------------------

# Path for exported data, numpy arrays
DATA_PATH = os.path.join('Actions_Data') 

# Actions that we try to detect
Actions = np.array(['GOOD MORNING'])

# Thirty videos worth of data
no_sequences = 20

# Videos are going to be 30 frames in length
sequence_length = 30

#Create folders
for action in Actions: 
    for sequence in range(no_sequences):
        try: 
            os.makedirs(os.path.join(DATA_PATH, action, str(sequence)))
        except:
            pass

In [3]:
#----------------------------------------COLLECTION OF ACTIONS TIMER------------------------------------------
cap = cv2.VideoCapture(0)
# Set mediapipe model 
with mp_holistic.Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5) as holistic:
    
    # NEW LOOP
    # Loop through actions
    for action in Actions:
        # Countdown before starting data collection
        for countdown in range(5, 0, -1):
            ret, frame = cap.read()
            cv2.putText(frame, f'Starting in {countdown}', (120, 200),
                        cv2.FONT_HERSHEY_SIMPLEX, 2, (0, 0, 255), 4, cv2.LINE_AA)
            cv2.imshow('OpenCV Feed', frame)
            cv2.waitKey(1000)
        # Loop through sequences aka videos
        for sequence in range(no_sequences): 
            # Loop through video length aka sequence length
            for frame_num in range(sequence_length):  
                # Read feed
                ret, frame = cap.read()

                # Make detections
                image, results = mediapipe_detection(frame, holistic)
                #print(results)

                # Draw landmarks
                draw_styled_landmarks(image, results)
                
                # NEW Apply wait logic
                if frame_num == 0: 
                    cv2.putText(image, 'STARTING DATA COLLECTION', (120,200), 
                               cv2.FONT_HERSHEY_SIMPLEX, 1, (0,255, 0), 4, cv2.LINE_AA)
                    cv2.putText(image, 'Collecting data frames for {} Video Number {}'.format(action, sequence), (15,12), 
                               cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 255), 1, cv2.LINE_AA)
                    # Show to screen
                    cv2.imshow('OpenCV Feed', image)
                    cv2.waitKey(2000)
                else: 
                    cv2.putText(image, 'Collecting data frames for {} Video Number {}'.format(action, sequence), (15,12), 
                               cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 255), 1, cv2.LINE_AA)
                    # Show to screen
                    cv2.imshow('OpenCV Feed', image)
                
                # NEW Export keypoints
                keypoints = extract_keypoints(results)
                npy_path = os.path.join(DATA_PATH, action, str(sequence), str(frame_num))
                np.save(npy_path, keypoints)

                # Break gracefully
                if cv2.waitKey(10) & 0xFF == ord('q'):
                    break
                    
    cap.release()
    cv2.destroyAllWindows()


In [None]:
#----------------------------------GET ACTION NAMES--------------------------------------------------------------------
def action_names(directory):
    # List to store subfolder names
    subfolders = []
    
    # Iterate through all the items in the directory
    for item in os.listdir(directory):
        # Construct full path
        item_path = os.path.join(directory, item)
        
        # Check if the item is a directory
        if os.path.isdir(item_path):
            subfolders.append(item)
    
    return subfolders

directory_path = 'Actions_Data'
actions = np.array(action_names(directory_path))
print(actions)

In [None]:
#---------------------------PREPROCESSING OF DATA FOR TRAINING--------------------------------------------

from sklearn.model_selection import train_test_split
#from tensorflow.keras.utils import to_categorical
from tensorflow import keras

label_map = {label:num for num, label in enumerate(actions)}

sequences, labels = [], []
for action in actions:
    for sequence in np.array(os.listdir(os.path.join(DATA_PATH, action))).astype(int):
        window = []
        for frame_num in range(sequence_length):
            res = np.load(os.path.join(DATA_PATH, action, str(sequence), "{}.npy".format(frame_num)))
            window.append(res)
        sequences.append(window)
        labels.append(label_map[action])
        
X = np.array(sequences)
y = keras.utils.to_categorical(labels).astype(int)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.05)

In [None]:
X.shape[2]

In [None]:
#---------------------------TRAINING OF MODEL------------------------------------------------------------------

import tensorflow as tf
from tensorflow import keras

log_dir = os.path.join('Logs')
tb_callback = keras.callbacks.TensorBoard(log_dir=log_dir)

log_dir = os.path.join('Logs')

# Check if the log directory exists
if os.path.exists(log_dir):
    # Delete the log directory if it exists
    os.system(f"rm -rf {log_dir}")
    print(f"Deleted existing log directory: {log_dir}")

# Create a new log directory
os.makedirs(log_dir, exist_ok=True)

model = keras.models.Sequential()
model.add(keras.layers.Input(shape=(X.shape[1], X.shape[2])))
model.add(keras.layers.LSTM(64, return_sequences=True, activation='relu'))
model.add(keras.layers.LSTM(128, return_sequences=True, activation='relu'))
model.add(keras.layers.LSTM(64, return_sequences=False, activation='relu'))
model.add(keras.layers.Dense(64, activation='relu'))
model.add(keras.layers.Dense(32, activation='relu'))
model.add(keras.layers.Dense(actions.shape[0], activation='softmax'))
model.compile(optimizer='Adam', loss='categorical_crossentropy', metrics=['categorical_accuracy'])


In [None]:
from tensorflow import tensorboard   #GET DATA FROM TRAINING

In [None]:
try:
    model.fit(X_train, y_train, epochs=2000, callbacks=[tb_callback])
    print("Training complete")
except Exception as e:
    print(f"An unexcepted error occurred: {e}")
    
res = model.predict(X_test)
model.save('Action.h5')

In [None]:
#--------------------------------EVALUATIONS-------------------------------------------------------------------
from sklearn.metrics import multilabel_confusion_matrix, accuracy_score

yhat = model.predict(X_test)
ytrue = np.argmax(y_test, axis=1).tolist()
yhat = np.argmax(yhat, axis=1).tolist()
multilabel_confusion_matrix(ytrue, yhat)
accuracy_score(ytrue, yhat)

In [None]:
#-------------------------------------TESTING  AND PREDICTION---------------------------------------------------------------
import cv2
import numpy as np
import mediapipe as mp
from tensorflow import keras

def action_names(directory):
    # List to store subfolder names
    subfolders = []
    
    # Iterate through all the items in the directory
    for item in os.listdir(directory):
        # Construct full path
        item_path = os.path.join(directory, item)
        
        # Check if the item is a directory
        if os.path.isdir(item_path):
            subfolders.append(item)
    
    return subfolders

previous_size = 0
last_action=None
    
def action_recognition():
    global previous_size
    global last_action
    # Load the trained model
    model = keras.models.load_model('Action.h5')
    model.compile(optimizer='Adam', loss='categorical_crossentropy', metrics=['categorical_accuracy'])  
    
    directory_path = 'Actions_Data'
    actions = np.array(action_names(directory_path))

    # Set up the MediaPipe Holistic model
    mp_holistic = mp.solutions.holistic
    holistic = mp_holistic.Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5)

    # Initialize variables
    sequence = []
    sentence = []
    predictions = []
    threshold = 0.6
    window_size = 30  # Adjust the window size to improve accuracy
    window_stride = 10  # Adjust the window stride to improve accuracy
   
    # Capture video from the webcam
    cap = cv2.VideoCapture(0)

    while cap.isOpened():
        ret, frame = cap.read()  # Read a frame from the webcam
        if not ret:
            break

        # Make detections
        image, results = mediapipe_detection(frame, holistic)

        # Draw landmarks
        draw_styled_landmarks(image, results)

        # Extract keypoints
        keypoints = extract_keypoints(results)
        sequence.append(keypoints)

        # Apply windowing to improve accuracy
        if len(sequence) >= window_size:
            window_sequence = sequence[-window_size:]
            res = model.predict(np.expand_dims(window_sequence, axis=0))[0]
            action = actions[np.argmax(res)]
            confidence = res[np.argmax(res)]

            if confidence > threshold:
                if len(sentence) > 0:
                    if action != sentence[-1]:
                        sentence.append(action)
                        last_action=action
                        print("New action detected:", action)  # Print the new action
                else:
                    sentence.append(action)
                    print("First action detected:", action)
        
            # Update the predictions
            predictions.append(np.argmax(res))

            # Remove old frames from the sequence
            sequence = sequence[-window_stride:]

        # Convert the image back to BGR for display
        image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)
        
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

        # Display results on the screen
        cv2.rectangle(image, (0, 0), (640, 40), (245, 117, 16), -1)
        cv2.putText(image, ' '.join(sentence), (3, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2, cv2.LINE_AA)
        cv2.imshow('OpenCV Feed', image)
        output=np.array(sentence)
        print(sentence)
        print(last_action)
            
        # Break gracefully
        if cv2.waitKey(10) & 0xFF == ord('q'):
            break
        
    cap.release()
    cv2.destroyAllWindows()
    holistic.close()  # Properly close the holistic model
    print('Action recognition complete.')

if __name__ == '__main__':
    action_recognition()


In [None]:
import cv2
import numpy as np
import mediapipe as mp
from tensorflow import keras
from PyQt6.QtGui import QImage, QPixmap
import os

def action_recognition(self):
    # Load the trained model
    model = keras.models.load_model('Action.h5')
    model.compile(optimizer='Adam', loss='categorical_crossentropy', metrics=['categorical_accuracy'])  
    
    directory_path = 'Actions_Data'
    actions = np.array(action_names(directory_path))

    # Set up the MediaPipe Holistic model
    mp_holistic = mp.solutions.holistic
    holistic = mp_holistic.Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5)

    # Initialize variables
    sequence = []
    sentence = []
    predictions = []
    threshold = 0.6
    window_size = 30  # Adjust the window size to improve accuracy
    window_stride = 10  # Adjust the window stride to improve accuracy
    
    # Capture video from the webcam
    cap = cv2.VideoCapture(0)

    # Add a flag to control the recognition loop
    self.running = True

    while cap.isOpened() and self.running:
        ret, frame = cap.read()  # Read a frame from the webcam
        if not ret:
            break

        # Make detections
        image, results = mediapipe_detection(frame, holistic)

        # Draw landmarks
        draw_styled_landmarks(image, results)

        # Extract keypoints
        keypoints = extract_keypoints(results)
        sequence.append(keypoints)

        # Apply windowing to improve accuracy
        if len(sequence) >= window_size:
            window_sequence = sequence[-window_size:]
            res = model.predict(np.expand_dims(window_sequence, axis=0))[0]
            action = actions[np.argmax(res)]
            confidence = res[np.argmax(res)]

            # Update the sentence
            if confidence > threshold:
                if len(sentence) > 0:
                    if action != sentence[-1]:
                        sentence.append(action)
                else:
                    sentence.append(action)

            # Update the predictions
            predictions.append(np.argmax(res))

            # Remove old frames from the sequence
            sequence = sequence[-window_stride:]

        # Convert the image to RGB
        image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

        # Convert the image to a QImage
        height, width, channel = image.shape
        bytes_per_line = 3 * width
        q_img = QImage(image.data, width, height, bytes_per_line, QImage.Format.Format_RGB888)
        
        # Convert QImage to QPixmap and display it in QLabel
        self.vid_label.setPixmap(QPixmap.fromImage(q_img))

        # Update GUI
        cv2.rectangle(image, (0, 0), (640, 40), (245, 117, 16), -1)
        cv2.putText(image, ' '.join(sentence), (3, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2, cv2.LINE_AA)
        
        output = np.array(sentence)
        print(sentence)
        
        if sentence:
            output = np.array(sentence)
            if len(output) > 1:
                if output[-1] != output[-2]:
                    print("Prediction output", output[-1])
            else: 
                print("Prediction first output", output[-1])

        # Process PyQt events
        QtWidgets.QApplication.processEvents()
        
        # Check if 'q' is pressed to stop
        if cv2.waitKey(10) & 0xFF == ord('q'):
            break

    cap.release()
    cv2.destroyAllWindows()
    holistic.close()  # Properly close the holistic model

    print('Action recognition complete.')

# Method to stop the action recognition
def stop_action_recognition(self):
    self.running = False
