1. Import and install Dependencies 

In [1]:
import cv2
import numpy as np
import os
from matplotlib import pyplot as plt
import time
import mediapipe as mp

2. Key points using MP Holistic 

In [2]:
mp_holistic = mp.solutions.holistic    #Holistic model
mp_drawing = mp.solutions.drawing_utils  #Drawing utilities

In [3]:
def mediapipe_detection(image, model):
    image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)  #Color conversion BGR to RGB
    image.flags.writeable = False                    #image is no longer writeable
    results = model.process(image)                  #Make prediction 
    image.flags.writeable = True                     # Image is now writeable
    image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)  #Color Conversion RGB to BGR
    return image, results 
    

In [4]:
def draw_landmarks(image, results):
    mp_drawing.draw_landmarks(image, results.face_landmarks, mp_holistic.FACEMESH_TESSELATION)  #Draw Face connection
    mp_drawing.draw_landmarks(image, results.pose_landmarks, mp_holistic.POSE_CONNECTIONS)  #Draw pose connections
    mp_drawing.draw_landmarks(image, results.left_hand_landmarks, mp_holistic.HAND_CONNECTIONS)  #Draw left hand connections
    mp_drawing.draw_landmarks(image, results.right_hand_landmarks, mp_holistic.HAND_CONNECTIONS)  #Draw right hand connections

In [5]:
def draw_style_landmarks(image, results):
    mp_drawing.draw_landmarks(image, results.face_landmarks, mp_holistic.FACEMESH_TESSELATION,
                             mp_drawing.DrawingSpec(color = (80,110,10), thickness = 1, circle_radius = 1),
                             mp_drawing.DrawingSpec(color = (80,110,10), thickness = 1, circle_radius = 1)
                             )  
    mp_drawing.draw_landmarks(image, results.pose_landmarks, mp_holistic.POSE_CONNECTIONS)
    mp_drawing.draw_landmarks(image, results.left_hand_landmarks, mp_holistic.HAND_CONNECTIONS) 
    mp_drawing.draw_landmarks(image, results.right_hand_landmarks, mp_holistic.HAND_CONNECTIONS) 

In [6]:
cap = cv2.VideoCapture(0) #this is capturing and accessing camera system
with mp_holistic .Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5) as holistic:
    while cap.isOpened():
       
        #Read feed
        ret, frame = cap.read()
        
        #Make detection 
        image, results = mediapipe_detection(frame, holistic)
        print(results)

        #Draw Landmarks 
        draw_style_landmarks(image, results)
        #Show to user
        cv2.imshow('OpenCV Feed', image)
    
        #Break out the loop
        if cv2.waitKey(10) & 0xFF == ord('q'):
            break 
    cap.release()
    cv2.destroyAllWindows()

<class 'mediapipe.python.solution_base.SolutionOutputs'>
<class 'mediapipe.python.solution_base.SolutionOutputs'>
<class 'mediapipe.python.solution_base.SolutionOutputs'>
<class 'mediapipe.python.solution_base.SolutionOutputs'>
<class 'mediapipe.python.solution_base.SolutionOutputs'>
<class 'mediapipe.python.solution_base.SolutionOutputs'>
<class 'mediapipe.python.solution_base.SolutionOutputs'>
<class 'mediapipe.python.solution_base.SolutionOutputs'>
<class 'mediapipe.python.solution_base.SolutionOutputs'>
<class 'mediapipe.python.solution_base.SolutionOutputs'>
<class 'mediapipe.python.solution_base.SolutionOutputs'>
<class 'mediapipe.python.solution_base.SolutionOutputs'>
<class 'mediapipe.python.solution_base.SolutionOutputs'>
<class 'mediapipe.python.solution_base.SolutionOutputs'>
<class 'mediapipe.python.solution_base.SolutionOutputs'>
<class 'mediapipe.python.solution_base.SolutionOutputs'>
<class 'mediapipe.python.solution_base.SolutionOutputs'>
<class 'mediapipe.python.soluti

In [None]:
draw_style_landmarks(frame, results)

In [None]:
plt.imshow(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))

3. Extract key point values

In [None]:
len(results.face_landmarks.landmark)

In [None]:
pose = []
for res in results.pose_landmarks.landmark:
    test = np.array([res.x,res.y,res.z,res.visibility])
    pose.append(test)

In [None]:
pose = np.array([[res.x,res.y,res.z,res.visibility] for res in results.pose_landmarks.landmark]).flatten() if results.pose_landmarks else np.zeros(33*4)

lh = np.array([[res.x,res.y,res.z] for res in results.left_hand_landmarks.landmark]).flatten() if results.left_hand_landmarks else np.zeros(21*3) 

rh = np.array([[res.x,res.y,res.z] for res in results.right_hand_landmarks.landmark]).flatten()  if results.right_hand_landmarks else np.zeros(21*3) 

face = np.array([[res.x,res.y,res.z] for res in results.face_landmarks.landmark]).flatten() if results.face_landmarks else np.zeros(468 * 3) 


In [None]:
lh = np.array([[res.x,res.y,res.z] for res in results.left_hand_landmarks.landmark]).flatten() if results.left_hand_landmarks else np.zeros(21*3) 

In [None]:
rh = np.array([[res.x,res.y,res.z] for res in results.right_hand_landmarks.landmark]).flatten()  if results.right_hand_landmarks else np.zero(21*3) 

In [None]:
def extract_keypoints(results):
    pose = np.array([[res.x,res.y,res.z,res.visibility] for res in results.pose_landmarks.landmark]).flatten() if results.pose_landmarks else np.zeros(33 * 4)

    lh = np.array([[res.x,res.y,res.z] for res in results.left_hand_landmarks.landmark]).flatten() if results.left_hand_landmarks else np.zeros(21*3) 

    rh = np.array([[res.x,res.y,res.z] for res in results.right_hand_landmarks.landmark]).flatten()  if results.right_hand_landmarks else np.zeros(21*3) 

    face = np.array([[res.x,res.y,res.z] for res in results.face_landmarks.landmark]).flatten() if results.face_landmarks else np.zeros(468 * 3) 
    return np.concatenate([pose, face, lh, rh])
    

In [None]:
result_test = extract_keypoints(results)

In [None]:
np.save('0', result_test)

In [None]:
np.load('0.npy')

4. Setup Folders for Collection

In [None]:
#path fot exported data, numpy arrays
DATA_PATH = os.path.join("C:", "Users", "13475", "Downloads", "MP_Data")
#Actions that we try to detect
actions = np.array(['hello', 'thanks', 'iloveyou'])
#Thirty videos worth of data
no_sequences = 30
#Videos are going to be 30 frames in length
sequence_length = 30 


In [None]:
for action in actions:
    dirmax = np.max(np.array(os.listdir(os.path.join(DATA_PATH,action))).astype(int))
    for sequence in range(1, no_sequences+1):
       try:
           os.makedirs(os.path.join(DATA_PATH, action, str(dirmax+sequence)))
       except:
               pass

5. Collect keypoints values for training and testing

In [None]:
cap = cv2.VideoCapture(0) #this is capturing and accessing camera system
with mp_holistic.Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5) as holistic:
    #NEW lOOP
    #Loop through actions 
    for action in actions:
        #loop through sequences aka video
        for sequence in range(no_sequences):
            #Loop through video length aka sequences length 
            for frame_num in range(sequence_length):
       
                #Read feed
                ret, frame = cap.read()
                
                #Make detection 
                image, results = mediapipe_detection(frame, holistic)
                print(results)
        
                #Draw Landmarks 
                draw_style_landmarks(image, results)

                #NEW Apply wait logic 
                if frame_num == 0:
                    cv2.putText(image,'Starting collection', (120,200),
                            cv2.FONT_HERSHEY_COMPLEX, 1, (0,255,0), 4, cv2.LINE_AA)
                    cv2.putText(image,'Collecting frames for {} video numbers {}'.format(action,sequence), (15,12),
                            cv2.FONT_HERSHEY_COMPLEX, 0.5, (0,0,255), 1, cv2.LINE_AA)
                    cv2.waitKey(2000)
                else:
                    cv2.putText(image,'Collecting frames for {} video numbers {}'.format(action,sequence), (15,12),
                            cv2.FONT_HERSHEY_COMPLEX, 0.5, (0,0,255), 1, cv2.LINE_AA)
                     #Show to user
                    cv2.imshow('OpenCV Feed', image)
                
                #NEW export keypoints 
                keypoints = extract_keypoints(results)
                npy_path = os.path.join(DATA_PATH, action, str(sequence), str(frame_num))
                np.save(npy_path, keypoints)   
               
            
                #Break out the loop
                if cv2.waitKey(10) & 0xFF == ord('q'):
                    break 
    cap.release()
    cv2.destroyAllWindows()

In [None]:
cap.release()
cv2.destroyAllWindows()

6. Preprocess Data and Create Labels and Features

In [None]:
from sklearn.model_selection import train_test_split
from tensorflow.keras.utils import to_categorical

In [None]:
label_map = {label:num for num, label in enumerate(actions)}

In [None]:
label_map 

In [None]:
sequences, labels = [], []
for action in actions:
    for sequence in np.array(os.listdir(os.path.join(DATA_PATH, action))).astype(int):
        window = []
        for frame_num in range(sequence_length):
           res = np.load(os.path.join(DATA_PATH, action, str(sequence), "{}.npy".format(frame_num)))
           window.append(res)
        sequences.append(window)
        labels.append(label_map[action])

In [None]:
import os
import numpy as np

DATA_PATH = r"C:\Users\13475\Downloads\MP_Data"

sequences, labels = [], []
label_map = {label: num for num, label in enumerate(actions)}  # Assuming 'actions' is defined

for action in actions:
    action_path = os.path.join(DATA_PATH, action)
    if not os.path.exists(action_path):
        print(f"Directory does not exist: {action_path}")
        continue  # Skip this iteration if the action directory doesn't exist

    for sequence_str in os.listdir(action_path):
        # Skip non-integer directories (e.g., .ipynb_checkpoints)
        if not sequence_str.isdigit():
            print(f"Skipping non-integer directory name: {sequence_str}")
            continue

        sequence_path = os.path.join(action_path, sequence_str)
        window = []

        for frame_num in range(sequence_length):
            frame_path = os.path.join(sequence_path, f"{frame_num}.npy")
            if not os.path.exists(frame_path):
                print(f"File does not exist: {frame_path}")
                break  # Skip this frame if the file doesn't exist

            res = np.load(frame_path)
            window.append(res)

        if len(window) == sequence_length:
            sequences.append(window)
            labels.append(label_map[action])
        else:
            print(f"Incomplete data for sequence: {sequence_path}, expected {sequence_length} frames, got {len(window)}")

sequences = np.array(sequences, dtype=object)  # Use dtype=object for variable-length sequences
labels = np.array(labels)


In [None]:
np.array(sequences).shape

In [None]:
np.array(labels).shape

In [None]:
x = np.array(sequences)

In [None]:
x.shape

In [None]:
y = to_categorical(labels).astype(int)

In [None]:
y

In [None]:
X_train, X_test, y_train, y_test = train_test_split(x,y,test_size=0.05)

In [None]:
y_test.shape

7.Build and Train LSTM Neural Network 

In [None]:
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense
from tensorflow.keras.callbacks import TensorBoard

In [None]:
log_dir = os.path.join('Logs')
tb_callback = TensorBoard(log_dir=log_dir)

In [None]:
model = Sequential()
model.add(LSTM(64, return_sequences = True, activation = 'relu', input_shape = (30,1662)))
model.add(LSTM(128, return_sequences = True, activation = 'relu'))
model.add(LSTM(64, return_sequences = False, activation = 'relu'))
model.add(Dense(64, activation = 'relu'))
model.add(Dense(32, activation = 'relu'))
model.add(Dense(actions.shape[0], activation = 'softmax'))

In [None]:
res = [.7,0.2,0.1]

In [None]:
actions[np.argmax(res)]

Reason why we used Mediapipe holistics and LSTM models rather than state of the art models that uses number of CNN layers followed by a number of LSTM.
1. Less data required to create a hyperactive model
2. Faster to train, denser network
3. Faster detection in real time 

In [None]:
model.compile(optimizer='Adam', loss='categorical_crossentropy', metrics=['categorical_accuracy'])
# loss function must be equal to categorical cross entropy. We must use this when you have a multi-class classification model 
# If youre using binary classificationthen we use binary cross entropy

In [None]:
X_train = X_train.astype('float32')  # or 'float64' as needed
y_train = y_train.astype('float32')  # Adjust this based on your specific requirements


In [None]:
model.fit(X_train,y_train, epochs=2000, callbacks=[tb_callback])

In [None]:
model.summary()

8. Make Predictions

In [None]:
print(X_test.dtype)


In [None]:
X_test = X_test.astype('float32')  # or 'float64' as needed


In [None]:
print(X_test.shape)

In [None]:
# Check for NaN values
if np.isnan(X_test).any():
    print("NaN values found")
    X_test = np.nan_to_num(X_test)  # Replace NaN with 0 and Inf with large finite numbers

# Check for Inf values
if np.isinf(X_test).any():
    print("Inf values found")
    X_test = np.nan_to_num(X_test)  # Replace Inf with large finite numbers


In [None]:
res = model.predict(X_test)

In [None]:
actions[np.argmax(res[4])]

In [None]:
actions[np.argmax(y_test[4])]

9. Save Weights

In [None]:
model.save('action.h5')

9. Evaluation using Confusion Matrix and Accuracy

In [None]:
from sklearn.metrics import multilabel_confusion_matrix, accuracy_score

In [None]:
yhat = model.predict(X_train)

In [None]:
ytrue = np.argmax(y_train, axis = 1).tolist()
yhat = np.argmax(yhat, axis=1).tolist()

In [None]:
multilabel_confusion_matrix(ytrue,yhat)

In [None]:
accuracy_score(ytrue,yhat)

11. Test in Real Time 

In [None]:
#1. New Detection variables
sequence = []
sentence = []
threshold = 0.4


cap = cv2.VideoCapture(0) #this is capturing and accessing camera system
with mp_holistic .Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5) as holistic:
    while cap.isOpened():
       
        #Read feed
        ret, frame = cap.read()
        
        #Make detection 
        image, results = mediapipe_detection(frame, holistic)
        print(results)

        #Draw Landmarks 
        draw_style_landmarks(image, results)

        #2. Prediction Logic
        keypoints = extract_keypoints(results)
        sequence.append(keypoints)
        sequence = sequence[:30]

        if len(sequence) == 30:
            res = model.prediction(np.expand_dims(sequence, axis = 0))[0]
            print(res)

        
        #Show to user
        cv2.imshow('OpenCV Feed', image)
    
        #Break out the loop
        if cv2.waitKey(10) & 0xFF == ord('q'):
            break 
    cap.release()
    cv2.destroyAllWindows()