In [2]:
import cv2
import numpy as np
import os
from matplotlib import pyplot as plt
import time
import mediapipe as mp
import tensorflow as tf
import math

from sklearn.model_selection import train_test_split
from sklearn.metrics import multilabel_confusion_matrix, accuracy_score, classification_report
from tensorflow.keras.utils import to_categorical

import tensorflow as tf
from tensorflow.keras import backend as K
from tensorflow.keras.callbacks import TensorBoard, EarlyStopping, ReduceLROnPlateau, ModelCheckpoint

from tensorflow.keras.models import Sequential, Model

from tensorflow.keras.layers import (LSTM, Dense, Concatenate, Attention, Dropout, Softmax,
                                     Input, Flatten, Activation, Bidirectional, Permute, multiply, 
                                     ConvLSTM2D, MaxPooling3D, TimeDistributed, Conv2D, MaxPooling2D)

from scipy import stats

# disable some of the tf/keras training warnings 
os.environ['TF_CPP_MIN_LOG_LEVEL'] = "3"
tf.get_logger().setLevel("ERROR")
tf.autograph.set_verbosity(1)

# suppress untraced functions warning
import absl.logging
absl.logging.set_verbosity(absl.logging.ERROR)

In [3]:
#from mediapipe import solutions as mp_pose?????????????????????????????????????????????????


In [4]:
# Pre-trained pose estimation model from Google Mediapipe
mp_pose = mp.solutions.pose
# to detect and locate human body keypoints

# Supported Mediapipe visualization tools for overlaying detected keypoints
mp_drawing = mp.solutions.drawing_utils

In [5]:
def mediapipe_detection(image, model): #Detects human pose estimation keypoints from webcam footage

    image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB) # COLOR CONVERSION BGR 2 RGB
    image.flags.writeable = False                  # Image is no longer writeable, read only
    results = model.process(image)                 # Make prediction
    image.flags.writeable = True                   # Image is now writeable 
    #print("Frame shape:", frame.shape)
    image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR) # COLOR COVERSION RGB 2 BGR
    return image, results

In [6]:
def draw_landmarks(image, results): #Draws keypoints and landmarks detected by the human pose estimation model

    mp_drawing.draw_landmarks(image, results.pose_landmarks, mp_pose.POSE_CONNECTIONS,
                                mp_drawing.DrawingSpec(color=(245,117,66), thickness=2, circle_radius=2), 
                                mp_drawing.DrawingSpec(color=(245,66,230), thickness=2, circle_radius=2) 
                                 )

In [7]:
cap = cv2.VideoCapture("frame_rate.mp4") # camera object
HEIGHT = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) # webcam video frame height
WIDTH = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) # webcam video frame width
FPS = int(cap.get(cv2.CAP_PROP_FPS)) # webcam video fram rate 

# Set and test mediapipe model using webcam
with mp_pose.Pose(min_detection_confidence=0.5, min_tracking_confidence=0.5) as pose:
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            print("Video ended.")
            break
        # Make detection
        image, results = mediapipe_detection(frame, pose)
        
        # Extract landmarks
        try:
            landmarks = results.pose_landmarks.landmark
        except:
            pass
        
        # Render detections
        draw_landmarks(image, results)               
        
        # Display frame on screen
        cv2.imshow('OpenCV Feed', image)
        
        # Exit / break out logic
        if cv2.waitKey(10) & 0xFF == ord('q'):
            break

    cap.release()
    cv2.destroyAllWindows()



Video ended.


In [8]:
# Recollect and organize keypoints from the test
pose = []
for res in results.pose_landmarks.landmark:
    test = np.array([res.x, res.y, res.z, res.visibility])
    pose.append(test)

In [9]:
# 33 landmarks with 4 values (x, y, z, visibility)
num_landmarks = len(landmarks)
num_values = len(test)
num_input_values = num_landmarks*num_values

In [10]:
# This is an example of what we would use as an input into our AI models
#pose = np.array([[res.x, res.y, res.z, res.visibility] for res in results.pose_landmarks.landmark]).flatten() if results.pose_landmarks else np.zeros(33*4)

In [11]:
def extract_keypoints(results): #Processes and organizes the keypoints detected from the pose estimation model to be used as inputs for the exercise decoder models
    
    pose = np.array([[res.x, res.y, res.z, res.visibility] for res in results.pose_landmarks.landmark]).flatten() if results.pose_landmarks else np.zeros(33*4)
    return pose

In [12]:
# Path for exported data, numpy arrays
DATA_PATH = os.path.join(os. getcwd(),'data') 
print(DATA_PATH)

# make directory if it does not exist yet
if not os.path.exists(DATA_PATH):
    os.makedirs(DATA_PATH)

# Actions/exercises that we try to detect
actions = np.array(['squats','pushups'])
num_classes = len(actions)

# How many videos worth of data
no_sequences = 40

# Videos are going to be this many frames in length
sequence_length = np.int16(FPS*1.46)

# Folder start
# Change this to collect more data and not lose previously collected data
start_folder = 101

c:\Users\wangd\Ismail\NMIMS\SemVI\Applications of Machine Learning\Project\real\data


In [13]:
# Build folder paths
for action in actions:     
    for sequence in range(start_folder,no_sequences+start_folder):
        try: 
            os.makedirs(os.path.join(DATA_PATH, action, str(sequence)))  
        except:
            pass

In [14]:
# Colors associated with each exercise (e.g., curls are denoted by blue, squats are denoted by orange, etc.)
colors = [(245,117,16),(117,245,16)]

In [156]:
# Collect Training Data

cap = cv2.VideoCapture("CV_training_data.mov")
# Set mediapipe model  
with mp_pose.Pose(min_detection_confidence=0.5, min_tracking_confidence=0.5) as pose:
    # Loop through actions
    for idx, action in enumerate(actions):
        # Loop through sequences (i.e., videos)
        for sequence in range(start_folder, start_folder+no_sequences): # if you wanna do it folders then we need to make changes over here!!!!!!!!!!!!!!!!!!!!!!!!!!!
            # Loop through video length (i.e, sequence length)
            for frame_num in range(sequence_length):
                # Read feed
                ret, frame = cap.read()
                
                # Make detection
                image, results = mediapipe_detection(frame, pose)

                # Extract landmarks
                try:
                    landmarks = results.pose_landmarks.landmark
                except:
                    pass
                
                # Render detections
                draw_landmarks(image, results) 

                # Apply visualization logic
                if frame_num == 0: # If first frame in sequence, print that you're starting a new data collection and wait 500 ms
                    cv2.putText(image, 'STARTING COLLECTION', (120,200), 
                            cv2.FONT_HERSHEY_SIMPLEX, 0.75, (0,255, 0), 4, cv2.LINE_AA)
                    
                    cv2.putText(image, 'Collecting {} Video # {}'.format(action, sequence), (15,30), 
                            cv2.FONT_HERSHEY_SIMPLEX, 0.75, (255, 255, 255), 8, cv2.LINE_AA)
                    cv2.putText(image, 'Collecting {} Video # {}'.format(action, sequence), (15,30), 
                            cv2.FONT_HERSHEY_SIMPLEX, 0.75, colors[idx], 4, cv2.LINE_AA)
                    
                    # Show to screen
                    cv2.imshow('OpenCV Feed', image)
                    cv2.waitKey(500)
                else: 
                    cv2.putText(image, 'Collecting {} Video # {}'.format(action, sequence), (15,30), 
                            cv2.FONT_HERSHEY_SIMPLEX, 0.75, (255, 255, 255), 8, cv2.LINE_AA)
                    cv2.putText(image, 'Collecting {} Video # {}'.format(action, sequence), (15,30), 
                            cv2.FONT_HERSHEY_SIMPLEX, 0.75, colors[idx], 4, cv2.LINE_AA)
                    
                    # Show to screen
                    cv2.imshow('OpenCV Feed', image)

                # Export keypoints (sequence + pose landmarks)
                keypoints = extract_keypoints(results)
                npy_path = os.path.join(DATA_PATH, action, str(sequence), str(frame_num))
                np.save(npy_path, keypoints)

                # Break gracefully
                if cv2.waitKey(10) & 0xFF == ord('q'):
                    break
                    
    cap.release()
    cv2.destroyAllWindows()



In [157]:
label_map = {label:num for num, label in enumerate(actions)}

In [158]:
# Load and organize recorded training data
sequences, labels = [], []
for action in actions:
    for sequence in np.array(os.listdir(os.path.join(DATA_PATH, action))).astype(int):
        window = []
        for frame_num in range(sequence_length):
            # LSTM input data
            res = np.load(os.path.join(DATA_PATH, action, str(sequence), "{}.npy".format(frame_num)))
            window.append(res)

        sequences.append(window)
        labels.append(label_map[action])

In [159]:
# Make sure first dimensions of arrays match
X = np.array(sequences)
y = to_categorical(labels).astype(int)
print(X.shape, y.shape)

(80, 43, 132) (80, 2)


In [160]:
X[0][0]

array([ 0.47933993,  0.10833035, -0.39967483,  0.99928313,  0.49214929,
        0.0993852 , -0.35145369,  0.99886787,  0.49845818,  0.10010359,
       -0.351455  ,  0.99871147,  0.50524592,  0.10087913, -0.35140765,
        0.99873763,  0.46640211,  0.09895936, -0.34727737,  0.99883384,
        0.45693085,  0.0994277 , -0.3472417 ,  0.99862313,  0.44859716,
        0.10015267, -0.3473191 ,  0.99862492,  0.51851034,  0.11229318,
       -0.11008525,  0.99905771,  0.43664345,  0.114052  , -0.08318076,
        0.99877173,  0.49465042,  0.12781209, -0.3101452 ,  0.99922991,
        0.46261483,  0.12911376, -0.30309829,  0.99892968,  0.5937503 ,
        0.21132728, -0.13626924,  0.99863178,  0.37099621,  0.21480617,
       -0.07910075,  0.99776411,  0.59841847,  0.21669582, -0.61081773,
        0.97556835,  0.32059982,  0.23250274, -0.57662928,  0.97238117,
        0.43517354,  0.19540048, -0.82896918,  0.88660878,  0.50025916,
        0.20485941, -0.90037018,  0.85298449,  0.39543945,  0.19

In [161]:
# Split into training, validation, and testing datasets
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.10, random_state=20)
print(X_train.shape, y_train.shape)
X_train, X_val, y_train, y_val = train_test_split(X_train, y_train, test_size=15/90, random_state=2)

(72, 43, 132) (72, 2)


In [162]:
X_train

array([[[ 0.57709968,  0.18409464, -0.19047332, ...,  0.97334433,
         -0.10704067,  0.93577439],
        [ 0.57984221,  0.18647322, -0.21534778, ...,  0.9747197 ,
         -0.06548204,  0.93878496],
        [ 0.58254856,  0.18983205, -0.23846945, ...,  0.97495574,
         -0.11102952,  0.94199216],
        ...,
        [ 0.62587541,  0.42921075, -0.08826359, ...,  0.96821421,
         -0.31841052,  0.97769934],
        [ 0.62594324,  0.41896376, -0.05789725, ...,  0.96837646,
         -0.382431  ,  0.97750986],
        [ 0.62605   ,  0.40444192, -0.03222335, ...,  0.96901178,
         -0.26719826,  0.97560936]],

       [[ 0.80143583,  0.51748693, -0.40549079, ...,  0.55255985,
          0.55256027,  0.96152276],
        [ 0.80655473,  0.52955306, -0.40346554, ...,  0.55365574,
          0.5237239 ,  0.9607802 ],
        [ 0.80764812,  0.54712963, -0.3509554 , ...,  0.55302334,
          0.48337397,  0.96074015],
        ...,
        [ 0.81649047,  0.49713993, -0.32698241, ...,  

In [177]:
# Callbacks to be used during neural network training
es_callback = EarlyStopping(monitor='val_loss', min_delta=5e-4, patience=10, verbose=0, mode='min')
lr_callback = ReduceLROnPlateau(monitor='val_loss', factor=0.2, patience=5, min_lr=0.00001, verbose=0, mode='min')
chkpt_callback = ModelCheckpoint(filepath=os.path.join(DATA_PATH, 'best_model.keras'),  # Use '.keras' extension
                                monitor='val_loss', verbose=0, save_best_only=True,
                                save_weights_only=False, mode='min', save_freq=1)

# Optimizer
opt = tf.keras.optimizers.Adam(learning_rate=0.01)

# some hyperparamters
batch_size = 32
max_epochs = 500

In [178]:
# Set up Tensorboard logging and callbacks
NAME = f"ExerciseRecognition-LSTM-{int(time.time())}"
log_dir = os.path.join(os.getcwd(), 'logs', NAME,'')
tb_callback = TensorBoard(log_dir=log_dir)

callbacks = [tb_callback, es_callback, lr_callback, chkpt_callback]

lstm = Sequential()
lstm.add(LSTM(128, return_sequences=True, activation='relu', input_shape=(sequence_length, num_input_values)))
lstm.add(LSTM(256, return_sequences=True, activation='relu'))
lstm.add(LSTM(128, return_sequences=False, activation='relu'))
lstm.add(Dense(128, activation='relu'))
lstm.add(Dense(64, activation='relu'))
lstm.add(Dense(actions.shape[0], activation='softmax'))
print(lstm.summary())

None


In [179]:
lstm.compile(optimizer=opt, loss='categorical_crossentropy', metrics=['categorical_accuracy'])
lstm.fit(X_train, y_train, batch_size=batch_size, epochs=max_epochs, validation_data=(X_val, y_val), callbacks=callbacks)

Epoch 1/500
[1m2/2[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 60ms/step - categorical_accuracy: 0.5229 - loss: 746596.0625

  self._save_model(epoch=self._current_epoch, batch=batch, logs=logs)


[1m2/2[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m4s[0m 520ms/step - categorical_accuracy: 0.5097 - loss: 995461.1250 - val_categorical_accuracy: 0.4167 - val_loss: 22931450.0000 - learning_rate: 0.0100
Epoch 2/500
[1m2/2[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 112ms/step - categorical_accuracy: 0.5104 - loss: 41828512.0000 - val_categorical_accuracy: 0.4167 - val_loss: 400474.8438 - learning_rate: 0.0100
Epoch 3/500
[1m2/2[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 111ms/step - categorical_accuracy: 0.5431 - loss: 7933467.5000 - val_categorical_accuracy: 0.4167 - val_loss: 9700973.0000 - learning_rate: 0.0100
Epoch 4/500
[1m2/2[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 116ms/step - categorical_accuracy: 0.5014 - loss: 6470252.5000 - val_categorical_accuracy: 0.4167 - val_loss: 2333989.0000 - learning_rate: 0.0100
Epoch 5/500
[1m2/2[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 112ms/step - categorical_accuracy: 0.4778 - loss: 4079

<keras.src.callbacks.history.History at 0x18014553190>

In [180]:
models = {'LSTM': lstm}

In [181]:
for model_name, model in models.items():
    save_dir = os.path.join(os.getcwd(), f"{model_name}.h5")
    model.save(save_dir)

In [182]:
# Run model rebuild before doing this
for model_name, model in models.items():
    load_dir = os.path.join(os.getcwd(), f"{model_name}.h5")
    model.load_weights(load_dir)

In [183]:
for model in models.values():
    res = model.predict(X_test, verbose=0)

In [184]:
eval_results = {}
eval_results['confusion matrix'] = None
eval_results['accuracy'] = None

confusion_matrices = {}
classification_accuracies = {}

In [185]:
for model_name, model in models.items():
    yhat = model.predict(X_test, verbose=0)

    # Get list of classification predictions
    ytrue = np.argmax(y_test, axis=1).tolist()
    yhat = np.argmax(yhat, axis=1).tolist()

    # Confusion matrix
    confusion_matrices[model_name] = multilabel_confusion_matrix(ytrue, yhat)
    print(f"{model_name} confusion matrix: {os.linesep}{confusion_matrices[model_name]}")

# Collect results
eval_results['confusion matrix'] = confusion_matrices

LSTM confusion matrix: 
[[[0 3]
  [0 5]]

 [[5 0]
  [3 0]]]


In [186]:
for model_name, model in models.items():
    yhat = model.predict(X_test, verbose=0)

    # Get list of classification predictions
    ytrue = np.argmax(y_test, axis=1).tolist()
    yhat = np.argmax(yhat, axis=1).tolist()

    # Model accuracy
    classification_accuracies[model_name] = accuracy_score(ytrue, yhat)
    print(f"{model_name} classification accuracy = {round(classification_accuracies[model_name]*100,3)}%")

# Collect results
eval_results['accuracy'] = classification_accuracies

LSTM classification accuracy = 62.5%


In [173]:
def prob_viz(res, actions, input_frame, colors):
    """
    This function displays the model prediction probability distribution over the set of exercise classes
    as a horizontal bar graph

    """
    output_frame = input_frame.copy()
    for num, prob in enumerate(res):
        cv2.rectangle(output_frame, (0,60+num*40), (int(prob*100), 90+num*40), colors[num], -1)
        cv2.putText(output_frame, actions[num], (0, 85+num*40), cv2.FONT_HERSHEY_SIMPLEX, 1, (255,255,255), 2, cv2.LINE_AA)

    return output_frame

In [174]:
# 1. New detection variables
sequence = []
predictions = []
res = []
threshold = 0.5 # minimum confidence to classify as an action/exercise
current_action = ''

curl_stage = None
press_stage = None
squat_stage = None

# Camera object
cap = cv2.VideoCapture(0)

# Video writer object that saves a video of the real time test
fourcc = cv2.VideoWriter_fourcc('M','J','P','G') # video compression format
HEIGHT = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) # webcam video frame height
WIDTH = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) # webcam video frame width
FPS = int(cap.get(cv2.CAP_PROP_FPS)) # webcam video fram rate

video_name = os.path.join(os.getcwd(),f"{model_name}_real_time_test.avi")
out = cv2.VideoWriter(video_name, cv2.VideoWriter_fourcc(*"MJPG"), FPS, (WIDTH,HEIGHT))

# Set mediapipe model
with mp_pose.Pose(min_detection_confidence=0.5, min_tracking_confidence=0.5) as pose:
    while cap.isOpened():

        # Read feed
        ret, frame = cap.read()

        # Make detection
        image, results = mediapipe_detection(frame, pose)

        # Draw landmarks
        draw_landmarks(image, results)

        # 2. Prediction logic
        keypoints = extract_keypoints(results)
        sequence.append(keypoints)
        sequence = sequence[-sequence_length:]

        if len(sequence) == sequence_length:
            res = model.predict(np.expand_dims(sequence, axis=0), verbose=0)[0]
            predictions.append(np.argmax(res))
            current_action = actions[np.argmax(res)]
            confidence = np.max(res)

            # Viz probabilities
            image = prob_viz(res, actions, image, colors)
        cv2.imshow('OpenCV Feed', image)

        # Write to video file
        if ret == True:
            out.write(image)

        # Break gracefully
        if cv2.waitKey(10) & 0xFF == ord('q'):
            break
    cap.release()
    out.release()
    cv2.destroyAllWindows()

