## 1. Install and import dependencies

In [None]:
!pip install scikit-learn mediapipe matplotlib

In [1]:
import cv2
import numpy as np
import matplotlib.pyplot as plt
import os
import mediapipe as mp
import time

## 2. Keypoints using MP Holistic

In [2]:
mp_drawing = mp.solutions.drawing_utils #drawing utilities
mp_holistic = mp.solutions.holistic #holistic model

In [3]:
def mediapipe_detection(image, model):
    image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    image.flags.writeable = False
    results = model.process(image) # making predictions
    image.flags.writeable = True
    image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)
    return image, results

In [4]:
def draw_styled_landmarks(image, results):
    mp_drawing.draw_landmarks(image, results.face_landmarks, mp_holistic.FACEMESH_CONTOURS, 
                             mp_drawing.DrawingSpec(color=(80,110,10), thickness=1, circle_radius=1), 
                             mp_drawing.DrawingSpec(color=(80,256,121), thickness=1, circle_radius=1)
                             ) 
    # Draw pose connections
    mp_drawing.draw_landmarks(image, results.pose_landmarks, mp_holistic.POSE_CONNECTIONS,
                             mp_drawing.DrawingSpec(color=(80,22,10), thickness=2, circle_radius=4), 
                             mp_drawing.DrawingSpec(color=(80,44,121), thickness=2, circle_radius=2)
                             ) 
    # Draw left hand connections
    mp_drawing.draw_landmarks(image, results.left_hand_landmarks, mp_holistic.HAND_CONNECTIONS, 
                             mp_drawing.DrawingSpec(color=(121,22,76), thickness=2, circle_radius=4), 
                             mp_drawing.DrawingSpec(color=(121,44,250), thickness=2, circle_radius=2)
                             ) 
    # Draw right hand connections  
    mp_drawing.draw_landmarks(image, results.right_hand_landmarks, mp_holistic.HAND_CONNECTIONS, 
                             mp_drawing.DrawingSpec(color=(245,117,66), thickness=2, circle_radius=4), 
                             mp_drawing.DrawingSpec(color=(245,66,230), thickness=2, circle_radius=2)
                             ) 

In [None]:
cap = cv2.VideoCapture(0)
with mp_holistic.Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5) as holistic:
    while cap.isOpened():
        ret, frame = cap.read()

        image, results = mediapipe_detection(frame, holistic)
        
        
        draw_styled_landmarks(image, results)
        
        cv2.imshow("OpenCV feed", image)

        if cv2.waitKey(10) & 0xFF == ord('q'):
            break

    cap.release()
    cv2.destroyAllWindows()

## 3. Extract keypoint values

In [5]:
def extract_keypoints(results):
    pose = np.array([[res.x, res.y, res.z, res.visibility] for res in results.pose_landmarks.landmark]).flatten() if results.pose_landmarks else np.zeros(33*4)
    face = np.array([[res.x, res.y, res.z] for res in results.face_landmarks.landmark]).flatten() if results.face_landmarks else np.zeros(468*3)
    lh = np.array([[res.x, res.y, res.z] for res in results.left_hand_landmarks.landmark]).flatten() if results.left_hand_landmarks else np.zeros(21*3)
    rh = np.array([[res.x, res.y, res.z] for res in results.right_hand_landmarks.landmark]).flatten() if results.right_hand_landmarks else np.zeros(21*3)
    return np.concatenate([pose, face, lh, rh])

## 4. Setup folders for collection

In [6]:
DATA_PATH = os.path.join('MP_data')
actions = np.array(['hello', 'peace', 'all the best', 'wakanda forever', 'dedicate your hearts'])
no_sequences = 100
sequence_length = 40


In [None]:
for action in actions:
    for sequence in range(no_sequences):
        try:
            os.makedirs(os.path.join(DATA_PATH, action, str(sequence)))
        except:
            pass

 ## 5. Collect keypoint values for testing and training

In [None]:
cap = cv2.VideoCapture(0)
with mp_holistic.Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5) as holistic:
    
    for action in actions:
        for sequence in range(no_sequences):
            for frame_num in range(sequence_length):
                  
                ret, frame = cap.read()

                image, results = mediapipe_detection(frame, holistic) # making detection

                draw_styled_landmarks(image, results) # drawing landmarks
                
                if frame_num == 0:
                    cv2.putText(image, 'STARTING COLLECTION', (120,200),
                               cv2.FONT_HERSHEY_COMPLEX, 2, (0,0,0), 1, cv2.LINE_AA)
                    cv2.putText(image, f'Collecting frames for {action} video number {sequence}', (15,12),
                               cv2.FONT_HERSHEY_COMPLEX, 0.5, (0,0,0), 1, cv2.LINE_AA)

                    cv2.waitKey(500)
                else:
                    cv2.putText(image, f'Collecting frames for {action} video number {sequence}', (15,12),
                               cv2.FONT_HERSHEY_COMPLEX, 0.5, (0,0,0), 1, cv2.LINE_AA)
                    
                
                # export keypoints
                keypoints = extract_keypoints(results)
                npy_path = os.path.join(DATA_PATH, action, str(sequence), str(frame_num))
                np.save(npy_path, keypoints)
                                        
                
                cv2.imshow("OpenCV feed", image) 

                if cv2.waitKey(10) & 0xFF == ord('q'):
                    break

    cap.release()
    cv2.destroyAllWindows()

## 6. Preprocessing of data and label and feature creation

In [None]:
from sklearn.model_selection import train_test_split
from tensorflow.keras.utils import to_categorical

In [None]:
label_map = {label:num for num, label in enumerate(actions)}

In [None]:
sequences, labels = [], []
for action in actions:
    for sequence in range(no_sequences):
        window = []
        for frame_num in range(sequence_length):
            res = np.load(os.path.join(DATA_PATH, action, str(sequence), "{}.npy".format(frame_num)))
            window.append(res)
        sequences.append(window)
        labels.append(label_map[action])

In [None]:
np.array(sequences).shape

In [None]:
X = np.array(sequences)

In [None]:
np.array(labels).shape

In [None]:
y = np.array(labels)
y

In [None]:
y = to_categorical(y).astype(int)
y

In [None]:
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size = 0.05, random_state=42)

In [None]:
X_train.shape

In [None]:
from sklearn.preprocessing import StandardScaler
import numpy as np

# Compute the mean and standard deviation along the third dimension
mean = np.mean(X_train, axis=2, keepdims=True)
std = np.std(X_train, axis=2, keepdims=True)

# Normalize the training data
X_train_normalized = (X_train - mean) / std

In [None]:
# Compute the mean and standard deviation along the third dimension
mean_test = np.mean(X_test, axis=2, keepdims=True)
std_test = np.std(X_test, axis=2, keepdims=True)

# Normalize the test data using the mean and standard deviation of the test data
X_test_normalized = (X_test - mean_test) / std_test

In [None]:
len(X_test_normalized)

## 7. Build and train LSTM Model

In [None]:
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense
from tensorflow.keras.callbacks import TensorBoard

In [None]:
log_dir = os.path.join('Logs')
tb_callback = TensorBoard(log_dir=log_dir)

In [None]:
model = Sequential()
model.add(LSTM(64, return_sequences=True, activation='relu', input_shape=(40,1662)))
model.add(LSTM(128, return_sequences=True, activation='relu'))
model.add(LSTM(64, return_sequences=True, activation='relu'))
model.add(LSTM(32, return_sequences=True, activation='relu'))
model.add(LSTM(16, return_sequences=False, activation='relu'))
model.add(Dense(64, activation='relu'))
model.add(Dense(32, activation='relu'))
model.add(Dense(actions.shape[0], activation='softmax'))

model.compile(optimizer='Adam', loss='categorical_crossentropy', metrics=['categorical_accuracy'])
history = model.fit(X_train, y_train, epochs=700, callbacks=[tb_callback], validation_data=(X_test, y_test))

In [None]:
# checking is input has null values
if not np.any(np.isnan(X_train)):
    print("There are no NaN values in X_train")
else:
    print("There are NaN values in X_train")


In [None]:
model.summary()

##  8. Making predictions

In [None]:
res = model.predict(X_test)

In [None]:
actions[np.argmax(res[17])]

In [None]:
actions[np.argmax(y_test[17])]

## 9. Save model

In [None]:
model.save('action2.h5')

## 10. Evaluation using Confusion Matrix and Accuracy

In [None]:
from sklearn.metrics import multilabel_confusion_matrix, accuracy_score
from tensorflow.keras.models import load_model

In [None]:
yhat = model.predict(X_test)
ytrue = np.argmax(y_test, axis=1).tolist()
yhat = np.argmax(yhat, axis=1).tolist()

In [None]:
multilabel_confusion_matrix(ytrue, yhat)

In [None]:
 accuracy_score(ytrue, yhat)

## 11. Test in real time

In [7]:
from scipy import stats
from tensorflow.keras.models import load_model

In [8]:
# load model
model = load_model('action1.h5')
model.summary()

Model: "sequential_14"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
lstm_34 (LSTM)               (None, 40, 64)            442112    
_________________________________________________________________
lstm_35 (LSTM)               (None, 40, 128)           98816     
_________________________________________________________________
lstm_36 (LSTM)               (None, 40, 64)            49408     
_________________________________________________________________
lstm_37 (LSTM)               (None, 40, 32)            12416     
_________________________________________________________________
lstm_38 (LSTM)               (None, 16)                3136      
_________________________________________________________________
dense_23 (Dense)             (None, 64)                1088      
_________________________________________________________________
dense_24 (Dense)             (None, 32)              

In [9]:
colors = [(245,117,16), (245,117,16), (245,117,16), (245,117,16), (245,117,16), (245,117,16), (117,245,16), (16,117,245)]
def prob_viz(res, actions, input_frame, colors):
    output_frame = input_frame.copy()
    for num, prob in enumerate(res):
        cv2.rectangle(output_frame, (0,60+num*40), (int(prob*100), 90+num*40), colors[num], -1)
        cv2.putText(output_frame, actions[num], (0, 85+num*40), cv2.FONT_HERSHEY_SIMPLEX, 1, (255,255,255), 2, cv2.LINE_AA)
        
    return output_frame

In [14]:
# 1. New detection variables
sequence = []
sentence = []
predictions = []
threshold = 0.5

cap = cv2.VideoCapture('wakanda forever.mp4')
# Set mediapipe model 
with mp_holistic.Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5) as holistic:
    while cap.isOpened():

        # Read feed
        ret, frame = cap.read()
        
        if not ret:
            cap.set(cv2.CAP_PROP_POS_FRAMES, 0)
            continue
                
        # Make detections
        image, results = mediapipe_detection(frame, holistic)
        
        # Draw landmarks
        draw_styled_landmarks(image, results)
        
        # 2. Prediction logic
        keypoints = extract_keypoints(results)
        sequence.append(keypoints)
        sequence = sequence[-30:]
        
        if len(sequence) == 30:
            res = model.predict(np.expand_dims(sequence, axis=0))[0]
            print(actions[np.argmax(res)])
            predictions.append(np.argmax(res))
            
            
        #3. Viz logic
            if np.unique(predictions[-10:])[0]==np.argmax(res): 
                if res[np.argmax(res)] > threshold: 
                    
                    if len(sentence) > 0: 
                        if actions[np.argmax(res)] != sentence[-1]:
                            sentence.append(actions[np.argmax(res)])
                    else:
                        sentence.append(actions[np.argmax(res)])
                        
            if len(sentence) > 5: 
                sentence = sentence[-5:]

            # Viz probabilities
            image = prob_viz(res, actions, image, colors)
            
        cv2.rectangle(image, (0,0), (640, 40), (245, 117, 16), -1)
        cv2.putText(image, ' '.join(sentence), (3,30), 
                       cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2, cv2.LINE_AA)
        
        cv2.imshow('OpenCV Feed', image)

        if cv2.waitKey(10) & 0xFF == ord('q'):
            break
            
    cap.release()
    cv2.destroyAllWindows()

wakanda forever
wakanda forever
wakanda forever
dedicate your hearts
wakanda forever
wakanda forever
wakanda forever
wakanda forever
wakanda forever
wakanda forever
wakanda forever
wakanda forever
wakanda forever
wakanda forever
wakanda forever
wakanda forever
wakanda forever
wakanda forever
wakanda forever
wakanda forever
wakanda forever
wakanda forever
wakanda forever
wakanda forever
wakanda forever
wakanda forever
wakanda forever
wakanda forever
wakanda forever
dedicate your hearts
dedicate your hearts
dedicate your hearts
dedicate your hearts
dedicate your hearts
dedicate your hearts
dedicate your hearts
dedicate your hearts
wakanda forever
wakanda forever
wakanda forever
wakanda forever
wakanda forever
wakanda forever
wakanda forever
wakanda forever
wakanda forever
wakanda forever
wakanda forever
wakanda forever
wakanda forever
wakanda forever
wakanda forever
wakanda forever
wakanda forever
wakanda forever
wakanda forever
wakanda forever
dedicate your hearts
dedicate your hearts
w

In [None]:
cap.release()
cv2.destroyAllWindows()