# 1: Install Necessary Libraries and Open TensorBoard Logs

In [None]:
# Installing libraries namely tensorflow, opencv, mediapipe, sklearn and matplotlib

!pip install tensorflow==2.4.1 tensorflow-gpu==2.4.1 opencv-python mediapipe sklearn matplotlib

In [None]:
# Load the TensorBoard notebook extension and open it on the Logs/ directory,
# which is where the training cell's TensorBoard callback writes its logs

%load_ext tensorboard
%tensorboard --logdir Logs/

# 2: Importing libraries and defining MediaPipe related functions

In [1]:
# Importing libraries

import cv2
import numpy as np
import os
from matplotlib import pyplot as plt
import time
import mediapipe as mp

In [2]:
# Short aliases for the MediaPipe holistic solution (used to create the detector)
# and for MediaPipe's drawing utilities (used to draw the detected landmarks)

mp_holistic = mp.solutions.holistic
mp_drawing = mp.solutions.drawing_utils

In [3]:
# This function takes an image and a MediaPipe holistic model as input and detects the landmarks in the image

def mediapipe_detection(image, model):
    """Run ``model`` on a BGR frame.

    The frame is converted from BGR to RGB before it is handed to
    ``model.process``; it is flagged read-only while the model runs and made
    writeable again afterwards.

    Returns:
        A tuple ``(image, results)``: the frame converted back to BGR and
        whatever ``model.process`` returned.
    """
    rgb_frame = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

    rgb_frame.flags.writeable = False
    detections = model.process(rgb_frame)
    rgb_frame.flags.writeable = True

    return cv2.cvtColor(rgb_frame, cv2.COLOR_RGB2BGR), detections

In [4]:
# This function draws the landmarks detected by mediapipe_detection onto the image it gets as input

def draw_landmarks(image, results):
    """Draw the face, pose and hand landmarks of ``results`` onto ``image`` in place.

    Args:
        image: Image passed to ``mp_drawing.draw_landmarks`` as the drawing target.
        results: Object exposing ``face_landmarks``, ``pose_landmarks``,
            ``left_hand_landmarks`` and ``right_hand_landmarks``.

    Returns:
        None.
    """
    # One row per landmark group:
    # (landmarks, connections, (colour, thickness, circle_radius) for the points, same for the connection lines).
    # Colour channels must stay within 0-255 (the face line colour used to contain 256).
    layers = (
        (results.face_landmarks, mp_holistic.FACEMESH_CONTOURS,
         ((80, 110, 10), 1, 1), ((80, 255, 121), 1, 1)),
        (results.pose_landmarks, mp_holistic.POSE_CONNECTIONS,
         ((80, 22, 10), 2, 4), ((80, 44, 121), 2, 2)),
        (results.left_hand_landmarks, mp_holistic.HAND_CONNECTIONS,
         ((121, 22, 76), 2, 4), ((121, 44, 250), 2, 2)),
        (results.right_hand_landmarks, mp_holistic.HAND_CONNECTIONS,
         ((245, 117, 66), 2, 4), ((245, 66, 230), 2, 2)),
    )

    for landmarks, connections, point_style, line_style in layers:
        point_color, point_thickness, point_radius = point_style
        line_color, line_thickness, line_radius = line_style
        mp_drawing.draw_landmarks(
            image, landmarks, connections,
            mp_drawing.DrawingSpec(color=point_color, thickness=point_thickness, circle_radius=point_radius),
            mp_drawing.DrawingSpec(color=line_color, thickness=line_thickness, circle_radius=line_radius),
        )

In [5]:
# This is for testing MediaPipe: show the webcam feed with the detected landmarks until 'q' is pressed

cap = cv2.VideoCapture(0)
try:
    with mp_holistic.Holistic(min_detection_confidence = 0.5, min_tracking_confidence=0.5) as holistic:
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                # No frame was delivered (camera busy or unplugged): stop instead of passing None to OpenCV
                print("Could not read a frame from the camera")
                break

            image, results = mediapipe_detection(frame, holistic)
            draw_landmarks(image, results)
            cv2.imshow("Capture", image)

            if cv2.waitKey(10) & 0xFF == ord('q'):
                break
finally:
    # Always free the camera and close the window, even if an error occurred above
    cap.release()
    cv2.destroyAllWindows()

# 3: Extracting keypoints from images

In [14]:
# Extract the keypoints from the landmarks detected in an image

def extract_keypoints(results):
    """Flatten the pose and hand landmarks of ``results`` into one 1-D array.

    Layout of the returned array: pose (x, y, z, visibility per landmark),
    then left hand and right hand (x, y, z per landmark). A group that is
    missing from ``results`` is replaced by zeros: 33*4 values for the pose
    and 21*3 values for each hand.

    Face landmarks are intentionally not included because they are not
    necessary for sign language translation.
    """
    def flatten(group, fields, missing_size):
        # Zeros keep the vector length constant when nothing was detected
        if not group:
            return np.zeros(missing_size)
        rows = [[getattr(point, field) for field in fields] for point in group.landmark]
        return np.array(rows).flatten()

    xyz = ('x', 'y', 'z')
    pose = flatten(results.pose_landmarks, xyz + ('visibility',), 33*4)
    lh = flatten(results.left_hand_landmarks, xyz, 21*3)
    rh = flatten(results.right_hand_landmarks, xyz, 21*3)
    return np.concatenate([pose, lh, rh])

# 4: Creating directory to store the extracted keypoints

In [18]:
# Settings for the data collection
# NOTE: This cell only names the data folder, it does not create it

# Number of videos we want for each action
no_sequences = 50

# Number of frames in each video
# NOTE: Once initialized do not change it, or it will be hard to train the model on an uneven number of frames for
# different videos
sequence_length = 20

# Folder in which the extracted keypoints are stored
DATA_PATH = 'MP_Data'

# Actions for which data is collected
# NOTE: To collect data for a single action use e.g.
# actions = np.array(['goodbye'])
# To collect data for multiple actions list all of them:
actions = np.array(['a', 'hello'])

In [19]:
# Find the highest sequence (video) number already stored for each action
# NOTE: This cell only inspects what is on disk; it does not create any folder

folderdict = {}
for action in actions:
    try:
        entries = os.listdir(os.path.join(DATA_PATH, action))
    except FileNotFoundError:
        # Nothing has been collected for this action yet
        entries = []

    # Only purely numeric names are sequence folders. Skipping everything else (e.g. hidden files) prevents a
    # stray entry from resetting the counter, which would make new recordings overwrite existing ones.
    stored = [int(entry) for entry in entries if entry.isdecimal()]

    # -1 means "no data", so that numbering of new sequences starts at 0
    folderdict[action] = max(stored, default=-1)

# 5: Get keypoints from the video camera

In [21]:
# Start the video camera and record `no_sequences` new videos of `sequence_length` frames for every action
# NOTE: Pressing 'q' (or losing the camera) stops the whole collection. The sequence that was being recorded at that
# moment is incomplete on disk and should be deleted before the data is loaded.

cap = cv2.VideoCapture(0)
stop_collection = False
try:
    with mp_holistic.Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5) as holistic:
        for action in actions:
            first_sequence = folderdict[action] + 1
            for sequence in range(first_sequence, first_sequence + no_sequences):
                # np.save does not create missing folders, so make the folder of this sequence first
                sequence_dir = os.path.join(DATA_PATH, action, str(sequence))
                os.makedirs(sequence_dir, exist_ok=True)

                for frame_num in range(sequence_length):
                    ret, frame = cap.read()
                    if not ret:
                        # No frame was delivered: stop instead of passing None to OpenCV
                        print("Could not read a frame from the camera; stopping collection")
                        stop_collection = True
                        break

                    # Make detections
                    image, results = mediapipe_detection(frame, holistic)

                    # Draw landmarks
                    draw_landmarks(image, results)

                    # Show which video of which action is being collected
                    if frame_num == 0:
                        cv2.putText(image, 'STARTING COLLECTION', (120,200),
                                   cv2.FONT_HERSHEY_SIMPLEX, 1, (0,255, 0), 4, cv2.LINE_AA)
                    cv2.putText(image, 'Collecting frames for {} Video Number {}'.format(action, sequence), (15,12),
                               cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 255), 1, cv2.LINE_AA)
                    cv2.imshow('OpenCV Feed', image)
                    if frame_num == 0:
                        # Short pause at the start of every video
                        cv2.waitKey(100)

                    # Export keypoints
                    keypoints = extract_keypoints(results)
                    npy_path = os.path.join(sequence_dir, str(frame_num))
                    np.save(npy_path, keypoints)

                    # Press 'q' from keyboard to exit video camera
                    if cv2.waitKey(10) & 0xFF == ord('q'):
                        stop_collection = True
                        break

                # Propagate the stop request out of all three loops
                if stop_collection:
                    break
            if stop_collection:
                break
finally:
    # Always free the camera and close the window, even if an error occurred above
    cap.release()
    cv2.destroyAllWindows()

# 6: Loading data and preparing it for training and testing

In [23]:
# Importing important libraries

from sklearn.model_selection import train_test_split
from tensorflow.keras.utils import to_categorical

In [24]:
# Assign a numeric label to every action
# '[]' is the first class (label 0), followed by the letters 'a' to 'z' (labels 1 to 26)

DATA_PATH = 'MP_Data'
actions = np.array(['[]'] + list('abcdefghijklmnopqrstuvwxyz'))
label_map = dict(zip(actions, range(len(actions))))
print(label_map)

In [None]:
# Check how much data is stored for each action:
# folderdict[action] is the highest sequence number found on disk, or -1 when there is none

folderdict = {}
for action in actions:
    try:
        entries = os.listdir(os.path.join(DATA_PATH, action))
    except FileNotFoundError:
        # No data folder exists for this action
        entries = []

    # Only purely numeric names are sequence folders; anything else (e.g. hidden files) is ignored instead of
    # making the whole action look empty
    stored = [int(entry) for entry in entries if entry.isdecimal()]
    folderdict[action] = max(stored, default=-1)

In [None]:
# Load Data: read every stored sequence frame by frame and remember the label of its action
# NOTE: This cell might take a long time

load_started = time.time()
sequences, labels = [], []
for action in actions:
    for sequence in range(folderdict[action] + 1):
        sequence_dir = os.path.join(DATA_PATH, action, str(sequence))
        # One window = the keypoints of all frames of one video
        window = [
            np.load(os.path.join(sequence_dir, "{}.npy".format(frame_num)))
            for frame_num in range(sequence_length)
        ]
        print(f"{action}->{sequence}")
        sequences.append(window)
        labels.append(label_map[action])

# Elapsed loading time in seconds
print(time.time() - load_started)

In [30]:
# Describing data

# Build the feature array once, directly as float32. An object array is slow and memory hungry,
# and would hide ragged input until much later.
X = np.array(sequences, dtype=np.float32)
print(f"Number of data: {X.shape}")
print(f"Number of labels: {np.array(labels).shape}")
print(f"Shape of data: {X.shape}")

# Fix the number of classes so that y always has one column per action, even if no data was loaded
# for the actions with the highest labels. Otherwise y would not match the model's output layer.
y = to_categorical(labels, num_classes=len(actions)).astype(int)
print(f"Shape of labels: {y.shape}")

(16200, 20, 258)

In [36]:
# Splitting data for training, testing and validation (60% / 20% / 20% of all data)

# A fixed seed makes the split reproducible. Without it, re-running the notebook (for example to evaluate a model
# that was loaded from disk) produces a different split, so samples the model was trained on can end up in the
# validation or test set and inflate the reported accuracy.
SPLIT_SEED = 42

# First hold out 20% for testing, then take 25% of the remaining 80% (= 20% of all data) for validation
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size = 0.2, random_state = SPLIT_SEED)

X_train, X_val, y_train, y_val = train_test_split(X_train, y_train, test_size = 0.25, random_state = SPLIT_SEED)

# Make sure the features are float32 arrays
X_train = np.asarray(X_train).astype("float32")

X_test = np.asarray(X_test).astype("float32")

X_val = np.asarray(X_val).astype("float32")

In [None]:
# Shape of training, testing and validation data

for split_name, split_data in (("training", X_train), ("testing", X_test), ("validation", X_val)):
    print(f"Shape of {split_name} data: {split_data.shape}")

# 7: Training Model

In [40]:
# Importing libraries to build a neural network

from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense
from tensorflow.keras.callbacks import TensorBoard

In [41]:
# TensorBoard callback that writes the training logs to the "Logs" folder

log_dir = "Logs"
tb_callback = TensorBoard(log_dir=log_dir)

In [42]:
# Building a model and initializing number of neurons in each layer

# Number of values per frame produced by extract_keypoints:
# 33 pose landmarks * 4 values + 2 hands * 21 landmarks * 3 values = 258
num_features = 33 * 4 + 2 * 21 * 3

# The input shape follows sequence_length instead of repeating the literal 20, so the model stays
# consistent with the number of frames per video used when loading the data
model = Sequential([
    LSTM(64, return_sequences=True, activation='relu', input_shape=(sequence_length, num_features)),
    LSTM(128, return_sequences=True, activation='relu'),
    LSTM(64, return_sequences=False, activation='relu'),
    Dense(64, activation='relu'),
    Dense(32, activation='relu'),
    # One output per action; softmax turns the outputs into class probabilities
    Dense(actions.shape[0], activation='softmax'),
])

In [None]:
# Fitting model for training

model.compile(optimizer='Adam', loss='categorical_crossentropy', metrics=['categorical_accuracy'])

# Pass the validation split so that loss and accuracy on unseen data are reported after every epoch
# (and logged to TensorBoard); this makes overfitting visible during training
model.fit(X_train, y_train, epochs=100, validation_data=(X_val, y_val), callbacks=[tb_callback])

In [None]:
# Print a summary of the model

model.summary()

# 8: Checking results

In [None]:
# Checking all mismatched labels on the validation set

# Predicted class probabilities for every validation sample
res_val = model.predict(X_val)

print("Actual label : Predicted label")
for probabilities, one_hot in zip(res_val, y_val):
    predicted = actions[np.argmax(probabilities)]
    actual = actions[np.argmax(one_hot)]
    if predicted != actual:
        # Same column order as the header: actual first, predicted second
        print(f"{actual} : {predicted}")

In [None]:
# All values in labels: print the action name of every validation sample

for one_hot in y_val:
    print(actions[np.argmax(one_hot)], end =" ")

# 9: Saving the model and loading it

In [54]:
# Saving the model to the file 'action.h5'
# NOTE: This calls model.save (not model.save_weights), so the file is later read back with load_model

model.save('action.h5')

In [45]:
# Delete the model from memory (the variable `model` no longer exists afterwards)
# NOTE: Do not do it unless you have a better model or are unsatisfied with the current model

del model

In [55]:
# Load the saved model from 'action.h5'

# Use the loader from tensorflow.keras, the same Keras implementation the model was built and saved with;
# mixing it with the standalone `keras` package can fail or load an incompatible model
from tensorflow.keras.models import load_model

# load_model restores the architecture together with the weights, so a separate load_weights call is not needed
model = load_model('action.h5')

# 10: Checking accuracy over Validation set

In [49]:
# Importing libraries

from sklearn.metrics import multilabel_confusion_matrix, accuracy_score

In [50]:
# Predict the labels of the validation set and create one confusion matrix per class

val_probabilities = model.predict(X_val)

# Turn one-hot labels / predicted probabilities into class indices
ytrue = np.argmax(y_val, axis=1).tolist()
yhat = np.argmax(val_probabilities, axis=1).tolist()

multilabel_confusion_matrix(ytrue, yhat)

In [57]:
# Accuracy of the model over the validation set
# (ytrue and yhat are the class indices computed in the prediction cell)

accuracy_score(ytrue, yhat)

0.9879629629629629

In [None]:
# Classification report

from sklearn.metrics import classification_report

# One display name per class, in label order; "_" stands for the first class ('[]')
# A separate name is used so the `labels` list built while loading the data is not overwritten
class_names = list("_abcdefghijklmnopqrstuvwxyz")

# Passing `labels` explicitly keeps the rows aligned with class_names even when a class
# does not occur in ytrue / yhat
print(classification_report(ytrue, yhat, labels=list(range(len(class_names))), target_names=class_names))

In [None]:
# Importing libraries

from sklearn.metrics import confusion_matrix
import seaborn as sns
import matplotlib.pyplot as plt

# Plotting Confusion Matrix

matrix_confusion = confusion_matrix(ytrue, yhat)

# Create the figure and its axes explicitly and draw everything on that one axes
# (previously `fig` was never defined and the matrix was drawn twice)
fig, ax = plt.subplots(figsize=(9, 9))
sns.heatmap(matrix_confusion, annot=True, linewidths=0.5, cmap='Blues_r', fmt='d', cbar=True, ax=ax)
ax.set_xlabel('Predicted', size=15)
ax.set_ylabel('True', size=15)
ax.set_title("Confusion Matrix", size=15)
plt.show()

# 11: Checking accuracy over Test set

In [63]:
# Predict the labels of the test set

test_probabilities = model.predict(X_test)

# Turn one-hot labels / predicted probabilities into class indices
ytrue = np.argmax(y_test, axis=1).tolist()
yhat = np.argmax(test_probabilities, axis=1).tolist()

In [None]:
# Accuracy of the model over the test set
# (ytrue and yhat are the class indices computed in the prediction cell)

accuracy_score(ytrue, yhat)

In [None]:
# Classification Report

from sklearn.metrics import classification_report

# One display name per class, in label order; "_" stands for the first class ('[]')
# A separate name is used so the `labels` list built while loading the data is not overwritten
class_names = list("_abcdefghijklmnopqrstuvwxyz")

# Passing `labels` explicitly keeps the rows aligned with class_names even when a class
# does not occur in ytrue / yhat
print(classification_report(ytrue, yhat, labels=list(range(len(class_names))), target_names=class_names))

In [None]:
# Plotting Confusion Matrix

matrix_confusion = confusion_matrix(ytrue, yhat)

# Create the figure and its axes explicitly and draw everything on that one axes
# (previously `fig` was never defined and the matrix was drawn twice)
fig, ax = plt.subplots(figsize=(9, 9))
sns.heatmap(matrix_confusion, annot=True, linewidths=0.5, cmap='Blues_r', fmt='d', cbar=True, ax=ax)
ax.set_xlabel('Predicted', size=15)
ax.set_ylabel('True', size=15)
ax.set_title("Confusion Matrix", size=15)
plt.show()

# 12: Real time predictions


In [69]:
# Importing library

from scipy import stats

In [None]:
# Prediction in real time

window_size = 20     # frames per prediction; must match the sequence length the model was trained on
stable_count = 10    # how many of the most recent predictions must agree before a sign is accepted
threshold = 0.5      # minimum probability of the predicted class
blank_label = '[]'   # class that is shown as an empty string

sequence = []
sentence = []
predictions = []

cap = cv2.VideoCapture(0)
try:
    with mp_holistic.Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5) as holistic:
        while cap.isOpened():

            ret, frame = cap.read()
            if not ret:
                # No frame was delivered: stop instead of passing None to OpenCV
                print("Could not read a frame from the camera")
                break

            image, results = mediapipe_detection(frame, holistic)
            draw_landmarks(image, results)

            # Keep only the most recent window of keypoints
            keypoints = extract_keypoints(results)
            sequence.append(keypoints)
            sequence = sequence[-window_size:]

            if len(sequence) == window_size:
                res = model.predict(np.expand_dims(sequence, axis=0))[0]
                best = int(np.argmax(res))

                # Map the blank class to "" for display WITHOUT modifying the global `actions` array
                label = "" if actions[best] == blank_label else str(actions[best])

                # Print predicted action
                print(label, end=" ")

                # Only the most recent predictions are needed for the stability check
                predictions.append(best)
                predictions = predictions[-stable_count:]

                # Accept the sign only if all recent predictions agree and the model is confident enough.
                # (np.unique(...)[0] would only compare against the smallest recent label.)
                if len(set(predictions)) == 1 and res[best] > threshold:
                    # Do not repeat the sign that was added last
                    if not sentence or label != sentence[-1]:
                        sentence.append(label)

                if len(sentence) > 10:
                    sentence = sentence[-10:]

            cv2.rectangle(image, (0,0), (640, 40), (245, 117, 16), -1)
            cv2.putText(image, ''.join(sentence), (3,30),
                           cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2, cv2.LINE_AA)

            cv2.imshow('OpenCV Feed', image)

            # Press 'q' to exit
            if cv2.waitKey(10) & 0xFF == ord('q'):
                break
finally:
    # Always free the camera and close the window, even if an error occurred above
    cap.release()
    cv2.destroyAllWindows()