In [13]:
import cv2
import mediapipe as mp
import numpy as np
import pandas as pd
from mediapipe.tasks.python import vision

from mediapipe import solutions
from mediapipe.framework.formats import landmark_pb2

import pickle

#### Import models

In [34]:
landmarker_path = '../models/pose_landmarker_heavy.task'

with open('../models/downdog_model.pkl', 'rb') as file:
    downdog_model = pickle.load(file)

with open('../models/updog_model.pkl', 'rb') as file:
    updog_model = pickle.load(file)

with open('../models/cobra_model.pkl', 'rb') as file:
    cobra_model = pickle.load(file)

with open('../models/asana_model.pkl', 'rb') as file:
    asana_model = pickle.load(file)

#### Create the task

In [15]:
BaseOptions = mp.tasks.BaseOptions
PoseLandmarker = mp.tasks.vision.PoseLandmarker
PoseLandmarkerOptions = mp.tasks.vision.PoseLandmarkerOptions
PoseLandmarkerResult = mp.tasks.vision.PoseLandmarkerResult
VisionRunningMode = mp.tasks.vision.RunningMode


options = PoseLandmarkerOptions(
    base_options=BaseOptions(model_asset_path=landmarker_path),
    output_segmentation_masks=True,
    running_mode=VisionRunningMode.VIDEO)

#### Function to draw landmarks

In [16]:
def draw_landmarks_on_image(rgb_image, detection_result):
  pose_landmarks_list = detection_result.pose_landmarks
  annotated_image = np.copy(rgb_image)

  # Loop through the detected poses to visualize.
  for idx in range(len(pose_landmarks_list)):
    pose_landmarks = pose_landmarks_list[idx]

    # Draw the pose landmarks.
    pose_landmarks_proto = landmark_pb2.NormalizedLandmarkList()
    pose_landmarks_proto.landmark.extend([
      landmark_pb2.NormalizedLandmark(x=landmark.x, y=landmark.y, z=landmark.z) for landmark in pose_landmarks
    ])
    solutions.drawing_utils.draw_landmarks(
      annotated_image,
      pose_landmarks_proto,
      solutions.pose.POSE_CONNECTIONS,
      solutions.drawing_styles.get_default_pose_landmarks_style())
  return annotated_image

#### Get and process webcam feed

In [33]:
cap = cv2.VideoCapture('../data/vids/1b.mp4')
#cap = cv2.VideoCapture(0)


prev_timestamp = 0

# Initiate pose model
with PoseLandmarker.create_from_options(options) as detector:
    
    while cap.isOpened():
        ret, frame = cap.read()

        # Recolor image
        image = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        image.flags.writeable = False # optimization

        mp_image = mp.Image(image_format=mp.ImageFormat.SRGB, data=image)

        frame_timestamp_ms = prev_timestamp + 200
        prev_timestamp = frame_timestamp_ms
        
        # Make detections
        results = detector.detect_for_video(mp_image, frame_timestamp_ms)

        # Recolor image back
        image.flags.writeable = True
        image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)
       
        # Draw pose landmarks
        annotated_image = draw_landmarks_on_image(image, results)
        annotated_image = cv2.cvtColor(annotated_image, cv2.COLOR_BGR2RGB)
        

        # Export coordinates
        try:
            # Extract Pose landmarks
            data = []
            for pose_landmarks in results.pose_landmarks:
                landmarks_data = []
                for pose_landmark in pose_landmarks:
                    landmarks_data.append({
                        'x': pose_landmark.x,
                        'y': pose_landmark.y,
                        'z': pose_landmark.z,
                        'visibility': pose_landmark.visibility,
                        'presence': pose_landmark.presence
                    })
            data.extend(landmarks_data)
            
            pose = pd.DataFrame(data).drop(columns=['visibility', 'presence']).values.flatten()
            
            # Make Detections
            X = pd.DataFrame([pose])

            asana = asana_model.predict(X)
            asana_proba = asana_model.predict_proba(X)
            confidence_threshold = 0.75

            output = ''
            correctness = ''
            if np.max(asana_proba) >= confidence_threshold:
                if asana[0] == 'downdog':
                    correctness = downdog_model.predict(X)
                elif asana[0] == 'updog':
                    correctness == updog_model.predict(X)
                elif asana[0] == 'cobra':
                    correctness == cobra_model.predict(X)
                output = str(asana[0]) + ': ' + str(correctness[0])
                
            #correctness_proba = downdog_model.predict_proba(X)

            coords = tuple([100, 100])
            cv2.putText(annotated_image, output.upper(), coords, cv2.FONT_HERSHEY_SIMPLEX, 1, (0,255,0), 2, cv2.LINE_AA)
            
        
        except:
            pass
                        
        cv2.imshow('',cv2.cvtColor(annotated_image, cv2.COLOR_RGB2BGR))
        #cv2.waitKey(0)

        if cv2.waitKey(10) & 0xFF == ord('q'):
            break

cap.release()
cv2.destroyAllWindows()

I0000 00:00:1718020734.818657  260957 gl_context_egl.cc:85] Successfully initialized EGL. Major : 1 Minor: 5
I0000 00:00:1718020734.821076  262890 gl_context.cc:357] GL version: 3.2 (OpenGL ES 3.2 Mesa 24.1.1-arch1.1), renderer: Mesa Intel(R) HD Graphics 620 (KBL GT2)


##### In case the window was not closed

In [27]:
cap.release()
cv2.destroyAllWindows()