## Import Libraries :

In [None]:
import numpy as np
from movenet import Movenet
from data import BodyPart
import cv2


import tensorflow as tf
from tensorflow import keras
from tensorflow.keras.models import load_model

## Data path :

In [None]:
ROOT = 'C:/Users/admin/miniconda_codes/POSE/'
movenet_model_path = ROOT + 'models/movenet_thunder'
pose_yoga_model_path = ROOT + 'models/my_model'

Yoga_pose_imgs_path =  ROOT + 'Pose_imgs/'
Yoga_imgs_path =  ROOT + 'imgs/'
Yoga_videos_path = ROOT + 'Videos/'

## Load Models :

In [None]:
movenet = Movenet(movenet_model_path)
model = load_model(pose_yoga_model_path)

In [None]:
model.summary()

Model: "model"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 input_1 (InputLayer)        [(None, 34)]              0         
                                                                 
 dense (Dense)               (None, 128)               4480      
                                                                 
 dropout (Dropout)           (None, 128)               0         
                                                                 
 dense_1 (Dense)             (None, 64)                8256      
                                                                 
 dropout_1 (Dropout)         (None, 64)                0         
                                                                 
 dense_2 (Dense)             (None, 8)                 520       
                                                                 
Total params: 13,256
Trainable params: 13,256
Non-trainable p

## Variables :

In [None]:
pose_classes = ['chair', 'cobra', 'dog', 
                'no_pose', 'shoudler_stand', 'print',
                'tree', 'warrior']

colors = [(255,255,255), (0,0,255)]

detection_threshold=0.1

EDGES = {
    (0, 1): 'm',
    (0, 2): 'c',
    (1, 3): 'm',
    (2, 4): 'c',
    (0, 5): 'm',
    (0, 6): 'c',
    (5, 7): 'm',
    (7, 9): 'm',
    (6, 8): 'c',
    (8, 10): 'c',
    (5, 6): 'y',
    (5, 11): 'm',
    (6, 12): 'c',
    (11, 12): 'y',
    (11, 13): 'm',
    (13, 15): 'm',
    (12, 14): 'c',
    (14, 16): 'c'
}

## Movenet_functions :

In [None]:
def img_to_tensor(im):
    im = tf.convert_to_tensor(im, dtype=tf.uint8)
    return im

In [None]:
def detect(input_tensor, inference_count=3):
    movenet.detect(input_tensor.numpy(), reset_crop_region=True)
    
    for _ in range(inference_count - 1):
        detection = movenet.detect(input_tensor.numpy(), 
                                reset_crop_region=False)
    
    return detection

In [None]:
def draw_connection(frame, keypoints, edges, color, confidence_threshold = 0.1, thickness = 1):
    
    for edge, _ in edges.items():
        p1, p2 = edge
        y1, x1, c1 = keypoints[p1]
        y2, x2, c2 = keypoints[p2]
        
        if (c1 > confidence_threshold) & (c2 > confidence_threshold):      
            cv2.line(frame, (int(y1), int(x1)), (int(y2), int(x2)), color, thickness)

In [None]:
def draw_keypoints(frame, keypoints, color, confidence_threshold = 0.1, thickness = 3, fill_inside = 1):
  
    for kp in keypoints:
        ky, kx, kp_conf = kp       
        if kp_conf > confidence_threshold:
            cv2.circle(frame, (int(ky), int(kx)), thickness, color, fill_inside)


## Test movenet with imgs :

In [None]:
img_path = Yoga_imgs_path +'2.jpg'
img = cv2.imread(img_path, cv2.IMREAD_COLOR)
cv2.imshow('Input', img)
cv2.waitKey(0)
cv2.destroyAllWindows()

In [None]:
input_img = img_to_tensor(img)
keypoints_with_scores = detect(input_img)

min_landmark_score = min([keypoint.score for keypoint in keypoints_with_scores.keypoints])
should_keep_image = min_landmark_score >= detection_threshold
if not should_keep_image:
    self._message.append('Skipped' + image_path + 'Keypoints score are below than threshold')
pose_landmarks = [[int(keypoint.coordinate.x), int(keypoint.coordinate.y), keypoint.score] for keypoint in keypoints_with_scores.keypoints]
draw_keypoints(img, pose_landmarks,  colors[0])
draw_connection(img, pose_landmarks, EDGES, colors[1]) 
cv2.imshow('Output', img)
cv2.waitKey(0)
cv2.destroyAllWindows()

## Test movenet with videos :

In [None]:
file = Yoga_videos_path + 'v1.mp4'
cap = cv2.VideoCapture(file)
while cap.isOpened():
    ret, frame = cap.read()
    
    # Detect pose keypoints:
    input_img = img_to_tensor(frame)
    keypoints_with_scores = detect(input_img, 2)
    
    pose_landmarks = [[int(keypoint.coordinate.x), int(keypoint.coordinate.y), keypoint.score] for keypoint in keypoints_with_scores.keypoints] 
    
    draw_keypoints(frame, pose_landmarks,  colors[0])
    draw_connection(frame, pose_landmarks, EDGES, colors[1])    
    
    
    cv2.imshow('Yoga_Poses', frame)
        
    if cv2.waitKey(10) & 0xFF==ord('q'):
        break
cap.release()
cv2.destroyAllWindows() 

## Model Functions :

In [None]:
def get_center_point(landmarks, left_bodypart, right_bodypart):
    """Calculates the center point of the two given landmarks."""
    left = tf.gather(landmarks, left_bodypart.value, axis=1)
    right = tf.gather(landmarks, right_bodypart.value, axis=1)
    center = left * 0.5 + right * 0.5
    return center


def get_pose_size(landmarks, torso_size_multiplier=2.5):
    """Calculates pose size.
    It is the maximum of two values:
    * Torso size multiplied by `torso_size_multiplier`
    * Maximum distance from pose center to any pose landmark
    """
    # Hips center
    hips_center = get_center_point(landmarks, BodyPart.LEFT_HIP, 
                                 BodyPart.RIGHT_HIP)

    # Shoulders center
    shoulders_center = get_center_point(landmarks, BodyPart.LEFT_SHOULDER,
                                      BodyPart.RIGHT_SHOULDER)

    # Torso size as the minimum body size
    torso_size = tf.linalg.norm(shoulders_center - hips_center)
    # Pose center
    pose_center_new = get_center_point(landmarks, BodyPart.LEFT_HIP, 
                                     BodyPart.RIGHT_HIP)
    pose_center_new = tf.expand_dims(pose_center_new, axis=1)
    # Broadcast the pose center to the same size as the landmark vector to
    # perform substraction
    pose_center_new = tf.broadcast_to(pose_center_new,
                                    [tf.size(landmarks) // (17*2), 17, 2])

    # Dist to pose center
    d = tf.gather(landmarks - pose_center_new, 0, axis=0,
                name="dist_to_pose_center")
    # Max dist to pose center
    max_dist = tf.reduce_max(tf.linalg.norm(d, axis=0))

    # Normalize scale
    pose_size = tf.maximum(torso_size * torso_size_multiplier, max_dist)
    return pose_size



def normalize_pose_landmarks(landmarks):
    """Normalizes the landmarks translation by moving the pose center to (0,0) and
    scaling it to a constant pose size.
  """
  # Move landmarks so that the pose center becomes (0,0)
    pose_center = get_center_point(landmarks, BodyPart.LEFT_HIP, 
                                 BodyPart.RIGHT_HIP)

    pose_center = tf.expand_dims(pose_center, axis=1)
    # Broadcast the pose center to the same size as the landmark vector to perform
    # substraction
    pose_center = tf.broadcast_to(pose_center, 
                                [tf.size(landmarks) // (17*2), 17, 2])
    landmarks = landmarks - pose_center

    # Scale the landmarks to a constant pose size
    pose_size = get_pose_size(landmarks)
    landmarks /= pose_size
    return landmarks


def landmarks_to_embedding(landmarks_and_scores):
    """Converts the input landmarks into a pose embedding."""
    # Reshape the flat input into a matrix with shape=(17, 3)
    reshaped_inputs = keras.layers.Reshape((17, 3))(landmarks_and_scores)

    # Normalize landmarks 2D
    landmarks = normalize_pose_landmarks(reshaped_inputs[:, :, :2])
    # Flatten the normalized landmark coordinates into a vector
    embedding = keras.layers.Flatten()(landmarks)
    return embedding


def preprocess_data(X_test):
    embedding = landmarks_to_embedding(tf.reshape(tf.convert_to_tensor(X_test), (1, 51)))
    processed_X_test = (tf.reshape(embedding, (34)))
    processed_X_test = tf.convert_to_tensor(processed_X_test)
    processed_X_test = tf.expand_dims(processed_X_test, axis=0)
    return processed_X_test


In [None]:
def overlap_a_b(a, b):
    alpha = 0.5
    beta = 0.5
    gamma = 0
    out_img = cv2.addWeighted(a, alpha, b, beta, gamma)
    
    return out_img

In [None]:
def process_model(frame, yoga_pose_img, class_id):
    yoga_x, yoga_y, _ = yoga_pose_img.shape
    # Detect pose keypoints:
    input_img = img_to_tensor(frame)
    keypoints_with_scores = detect(input_img)
    
    
    pose_landmarks = [[int(keypoint.coordinate.x), int(keypoint.coordinate.y), keypoint.score] for keypoint in keypoints_with_scores.keypoints]
    input_landmarks = np.array(pose_landmarks,dtype=np.float32).flatten()
    model_input = preprocess_data(input_landmarks)
    result = model.predict(model_input)
    
    overlap_imgs = overlap_a_b(frame[50:yoga_y+50,0:yoga_x], yoga_pose_img)
    frame[50:yoga_y+50,0:yoga_x] = overlap_imgs
    
    if (np.argmax(result)) == class_id:
        
        draw_keypoints(frame, pose_landmarks,  colors[0])
        draw_connection(frame, pose_landmarks, EDGES, colors[0])  
        
        

    
    else: 
        draw_keypoints(frame, pose_landmarks,  colors[1])
        draw_connection(frame, pose_landmarks, EDGES, colors[1]) 
        
    cv2.putText(frame, 
                ('Pose: {} with a {:.2f} %. '.format(pose_classes[np.argmax(result)], 100 * np.max(result))), 
                (5, 25), cv2.FONT_HERSHEY_SIMPLEX, .5, (0, 0, 255), 2)

## Model Test with imgs :


pose_classes = ['chair', 'cobra', 'dog', 
                'no_pose', 'shoudler_stand', 'print',
                'tree', 'warrior']

In [None]:
img_path = Yoga_imgs_path +'1.jpg'
img = cv2.imread(img_path, cv2.IMREAD_COLOR)
cv2.imshow('Input', img)
cv2.waitKey(0)
cv2.destroyAllWindows()

In [None]:
process_model(img, class_id=0) 
cv2.imshow('Output', img)
cv2.waitKey(0)
cv2.destroyAllWindows()

This image most likely belongs to tree with a 100.00 percent confidence.


## Model Test with videos :

In [None]:
Yoga_pose_path = Yoga_pose_imgs_path +'chair_3.jpg'
Yoga_pose = cv2.imread(Yoga_pose_path, cv2.IMREAD_COLOR)
Yoga_pose = cv2.resize(Yoga_pose, (120,120), interpolation = cv2.INTER_AREA)

file = Yoga_videos_path + 'vid4.mp4'
cap = cv2.VideoCapture(file)

video_width = int(cap.get(3))
video_height = int(cap.get(4))
FPS = int(cap.get(cv2.CAP_PROP_FPS))
print(FPS)
video_size = (int(video_width/2), int(video_height/2))


filename = (Yoga_videos_path + 'testing_vid.avi')
out_video = cv2.VideoWriter(filename, cv2.VideoWriter_fourcc(*'MJPG'), FPS, video_size)


while cap.isOpened():
    ret, frame = cap.read()
    
    try:
    
        frame = cv2.resize(frame, video_size, interpolation = cv2.INTER_AREA)

        process_model(frame, Yoga_pose, class_id=0)


        out_video.write(frame)
        cv2.imshow('Yoga_Poses', frame)

        if cv2.waitKey(10) & 0xFF==ord('q'):
            break
    except: pass
        
cap.release()
out_video.release()
cv2.destroyAllWindows() 

30


In [None]:
video_size

(1264, 824)

In [None]:
img_path = Yoga_pose_imgs_path +'chair.png'
test = cv2.imread(img_path, cv2.IMREAD_COLOR)
test = cv2.resize(test, (60,60), interpolation = cv2.INTER_AREA)
cv2.imshow('Input', test)
cv2.waitKey(0)
cv2.destroyAllWindows()

In [None]:
test.shape

(60, 60, 3)

In [None]:
cv2.imshow('overlap', img)
cv2.waitKey(0)
cv2.destroyAllWindows()