In [1]:
import numpy as np
import pandas as pd
import cv2
import mediapipe as mp
from IPython.display import clear_output
import time
import os
import math


pd.set_option('display.max_columns', None)


2024-02-13 16:59:34.111363: I tensorflow/core/platform/cpu_feature_guard.cc:182] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


# Initial Research

MediaPipe has a z-axis output, which apparently estimates the pose in a 2m3 box 
where the origin is the center of the hip. As usual, the point of the algo is 
to determine the _relative_ position of the key points, not their absolute
position or distance. MP does provide this as `world_coordinates`.


# Mediapipe

Simple framework:
1. Capture the frame from the video feed.
2. Process the frame using the pose detection model to find keypoints.
3. Draw the keypoints directly onto the frame.
4. Display the frame with the drawn keypoints. 

Below is some code using MediaPipe to get the key metrics out of the pose estimation.
**MODEL COMPLEXITY:** The intended model (lite, full, heavy) can be adjusted by changing the `model_complexity` kwarg when initializing the `pose` variable.

In [2]:
'''
DEPRICATED, USING OLD VERSION WITHOUT VISIBILITY
'''

import cv2
import mediapipe as mp

# Handles into the legacy MediaPipe "solutions" API: the pose module itself and
# the drawing helper used to overlay the detected skeleton on a frame.
mp_drawing = mp.solutions.drawing_utils
mp_pose = mp.solutions.pose

# Pose estimator configured for a video stream (static_image_mode=False).
# model_complexity picks the model variant (lite / full / heavy).
pose = mp_pose.Pose(
    static_image_mode=False,
    model_complexity=2,
    min_detection_confidence=0.5,
    min_tracking_confidence=0.5,
)

def sigmoid(x):
    """Return the logistic sigmoid 1 / (1 + e^-x) of a scalar.

    Args:
        x: A real number.

    Returns:
        A float in the closed interval [0.0, 1.0].

    The naive formula calls ``math.exp(-x)``, which raises ``OverflowError``
    for large negative ``x``. Each branch below only ever exponentiates a
    non-positive number, so the call can underflow to 0.0 but never overflow.
    """
    if x >= 0:
        return 1 / (1 + math.exp(-x))
    # For negative x use the algebraically equivalent e^x / (1 + e^x).
    exp_x = math.exp(x)
    return exp_x / (1 + exp_x)

# Start capturing video from the webcam.
cap = cv2.VideoCapture(0)

# The loop runs until the stream ends or the user presses "q". Everything is
# wrapped in try/finally so that the camera, the OpenCV windows and the
# MediaPipe graph are released even if a frame raises or the kernel is
# interrupted; otherwise the webcam stays locked until the kernel restarts.
try:
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break

        # MediaPipe expects RGB, OpenCV delivers BGR.
        frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

        # Process the frame with MediaPipe Pose.
        results = pose.process(frame_rgb)

        # Draw the pose annotations on the (BGR) frame that gets displayed.
        mp_drawing.draw_landmarks(frame, results.pose_landmarks, mp_pose.POSE_CONNECTIONS)

        # Display the frame.
        cv2.imshow('Mediapipe Feed', frame)

        if cv2.waitKey(10) & 0xFF == ord('q'):
            break
finally:
    # Release the webcam, free the pose graph and destroy all OpenCV windows.
    cap.release()
    pose.close()
    cv2.destroyAllWindows()
    # Give the GUI event loop one tick so the window actually closes.
    cv2.waitKey(1)


I0000 00:00:1707809089.052878       1 gl_context.cc:344] GL version: 2.1 (2.1 INTEL-22.1.29), renderer: Intel(R) Iris(TM) Plus Graphics 655
INFO: Created TensorFlow Lite XNNPACK delegate for CPU.


-1

Another implementation example, with some readouts. This is also mediapipe.

In [3]:
'''
DEPRICATED, USING OLD VERSION WITHOUT VISIBILITY
'''

# Initialize stuff from mediapipe
mp_drawing = mp.solutions.drawing_utils
mp_pose = mp.solutions.pose

# The four landmarks that make up the torso.
TORSO_LANDMARKS = (
    mp_pose.PoseLandmark.RIGHT_SHOULDER,
    mp_pose.PoseLandmark.LEFT_SHOULDER,
    mp_pose.PoseLandmark.RIGHT_HIP,
    mp_pose.PoseLandmark.LEFT_HIP,
)
# The torso counts as "seen" when every torso landmark has |z| below this value
# ("Z values close to zero") and visibility above the minimum.
# TODO: the Z threshold still needs tweaking.
TORSO_MAX_ABS_Z = 0.5
TORSO_MIN_VISIBILITY = 0.9

cap = cv2.VideoCapture(0)

# try/finally guarantees the camera and windows are released even when a frame
# raises or the kernel is interrupted.
try:
    with mp_pose.Pose(min_detection_confidence=0.5, min_tracking_confidence=0.5) as pose:
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                # No frame delivered (camera unplugged / stream ended): stop
                # instead of handing None to cvtColor.
                break

            # Recolor image; marking it read-only lets MediaPipe avoid a copy.
            image = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            image.flags.writeable = False

            # Make detection
            results = pose.process(image)

            # pose_landmarks is None when nobody is detected in the frame.
            if results.pose_landmarks is not None:
                landmarks = results.pose_landmarks.landmark
                print(f"R_Sh:\n{landmarks[mp_pose.PoseLandmark.RIGHT_SHOULDER.value]}")
                print(f"L_Sh:\n{landmarks[mp_pose.PoseLandmark.LEFT_SHOULDER.value]}")

                # Save values if torso Z values close to zero
                torso = [landmarks[lmk.value] for lmk in TORSO_LANDMARKS]
                torso_is_flat = max(abs(point.z) for point in torso) < TORSO_MAX_ABS_Z
                torso_is_visible = min(point.visibility for point in torso) > TORSO_MIN_VISIBILITY
                if torso_is_flat and torso_is_visible:
                    print("Can See Torso")

                    # TODO Write Framework here to save dataframe of correct torso values.

            # Recolor image back to BGR
            image.flags.writeable = True
            image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)

            # Show detections
            mp_drawing.draw_landmarks(image, results.pose_landmarks, mp_pose.POSE_CONNECTIONS,
                                      mp_drawing.DrawingSpec(color=(245,117,66), thickness=2, circle_radius=2),
                                      mp_drawing.DrawingSpec(color=(245,66,230), thickness=2, circle_radius=2),)

            cv2.imshow("Mediapipe feed", image)

            if cv2.waitKey(10) & 0xFF == ord("q"):
                break

            # Keep only the latest readout in the cell output.
            clear_output(wait=True)
finally:
    cap.release()
    cv2.destroyAllWindows()
    # Give the GUI event loop one tick so the window actually closes.
    cv2.waitKey(1)


R_Sh:
x: 0.29915327
y: 0.7627407
z: -0.4048484
visibility: 0.9970653

L_Sh:
x: 0.78462565
y: 0.75210494
z: -0.31445843
visibility: 0.99767625



## Experimenting with asynchronous processing of frames

Another method to run Mediapipe, as described here:
https://developers.google.com/mediapipe/solutions/vision/pose_landmarker/python#live-stream

Results and performance of each models (lite, full, heavy) are summarized here: https://storage.googleapis.com/mediapipe-assets/Model%20Card%20BlazePose%20GHUM%203D.pdf

**Issues with asynchronous processing:** Looks like asynchronous readouts are not suitable if we are doing a video frame overlay. Keep synchronous processing such that overlay is still present with the live feed. The main code difference is using a timestamp in ms and also calling `detect_async()` instead of `detect()`.

**From the oracle**
> If you're running things synchronously, especially for real-time or live feed scenarios with frameworks like MediaPipe, you typically process each frame one at a time in a loop. In such cases, you don't necessarily need to use a callback function. Instead, you can directly process each frame as you capture it, analyze it with the pose detection model, and immediately draw the keypoints or landmarks onto the frame before displaying it. This approach ensures minimal delay between capturing a frame, processing it, and displaying the results, making it suitable for real-time applications.

## YOLO/Ultralytics

YOLO v8 has a pose estimation model, but it does not include many key points (only 17).
Also, YOLO is more known for object detection, hence may not be the best for this application.

Table this model for now, can return to it if we have reason.

# Extracting poses from still images for NN model

Below is code which takes in a directory, and can output a df including all KP and extracted data.

In [3]:
'''
DEPRICATED, USING OLD VERSION WITHOUT VISIBILITY
'''

import cv2
import mediapipe as mp
import pandas as pd
import glob

# Initialize MediaPipe Pose
mp_pose = mp.solutions.pose

# MediaPipe Pose reports 33 landmarks, each with these four attributes.
NUM_LANDMARKS = 33
LANDMARK_ATTRIBUTES = ['x', 'y', 'z', 'visibility']

# Placeholder for collected data: one row per successfully loaded image.
data = []

# Specify the directory path containing images
directory_path = 'new_pose_data/Yoga-82/images/Warrior_I_Pose_or_Virabhadrasana_I_'

# Use glob to get all the image file paths; sorted so the row order is
# reproducible between runs (glob itself returns files in arbitrary order).
image_files = sorted(glob.glob(f'{directory_path}/*.jpg'))

# The context manager closes the pose graph once all images are processed.
with mp_pose.Pose(static_image_mode=True, model_complexity=1,
                  min_detection_confidence=0.5, min_tracking_confidence=0.5) as pose:
    for image_file in image_files:
        image = cv2.imread(image_file)
        # cv2.imread returns None instead of raising when a file cannot be read.
        if image is None:
            print(f"Failed to load image: {image_file}")
            continue

        image_rgb = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        results = pose.process(image_rgb)

        # Each row starts with the image file name.
        row = [image_file]

        if results.pose_landmarks:
            # Flatten all landmark data into the row, landmark by landmark.
            for landmark in results.pose_landmarks.landmark:
                row.extend([landmark.x, landmark.y, landmark.z, landmark.visibility])
        else:
            # If no landmarks are detected, keep the image but fill with None.
            row.extend([None] * NUM_LANDMARKS * len(LANDMARK_ATTRIBUTES))

        data.append(row)

# Define column names. Landmarks are numbered from 1 here for readability
# (kp1 .. kp33), in the same landmark-by-landmark order used for the rows.
columns = ['image_file']
for idx in range(1, NUM_LANDMARKS + 1):
    columns += [f'kp{idx}_{attr}' for attr in LANDMARK_ATTRIBUTES]

# Create DataFrame
df = pd.DataFrame(data, columns=columns)


I0000 00:00:1707809106.394315       1 gl_context.cc:344] GL version: 2.1 (2.1 INTEL-22.1.29), renderer: Intel(R) Iris(TM) Plus Graphics 655


`pose_landmarker_result.pose_landmarks` gives normalized coordinates. Midpoint is the point between the hips.
`pose_landmarker_result.pose_world_landmarks` gives the coordinates (not normalized)

In [2]:
import mediapipe as mp
from mediapipe.tasks import python
from mediapipe.tasks.python import vision
import glob

model_path = "model_creator/pose_landmarker_full.task"

BaseOptions = mp.tasks.BaseOptions
PoseLandmarker = mp.tasks.vision.PoseLandmarker
PoseLandmarkerOptions = mp.tasks.vision.PoseLandmarkerOptions
VisionRunningMode = mp.tasks.vision.RunningMode

options = PoseLandmarkerOptions(
    base_options=BaseOptions(model_asset_path=model_path),
    running_mode=VisionRunningMode.IMAGE)

# Attributes read from every landmark, and the resulting column layout.
# Columns are landmark-major: kp0_x, kp0_y, ..., kp0_presence, kp1_x, ...
# Each row below MUST be flattened in exactly the same order, otherwise the
# values end up under the wrong column labels.
attrs_to_get = ["x", "y", "z", "visibility", "presence"]
NUM_LANDMARKS = 33
columns = [f"kp{idx}_{attr}" for idx in range(NUM_LANDMARKS) for attr in attrs_to_get]

# Images where no pose was detected / images that raised while loading or detecting.
no_kp_files = []
fail_to_load_files = []

# Every sub-directory of selected_poses/ is one pose class (sorted for a
# reproducible row order).
pose_dirs = sorted(p for p in glob.glob("selected_poses/*") if os.path.isdir(p))
pose_names = [os.path.basename(d) for d in pose_dirs]

# One DataFrame per pose; concatenated once at the end.
pose_frames = []

# A single landmarker is reused for all poses instead of being rebuilt for
# every pose folder.
with PoseLandmarker.create_from_options(options) as landmarker:
    for pose_name in pose_names:
        data = []
        filenames = []  # List to keep track of filenames
        directory_path = f'selected_poses/{pose_name}'

        # Only *.jpg files are picked up; other formats in the folder are ignored.
        image_files = sorted(glob.glob(f'{directory_path}/*.jpg'))

        for image_file in image_files:
            try:
                mp_image = mp.Image.create_from_file(image_file)
                pose_landmarker_result = landmarker.detect(mp_image)
            except Exception:
                # Record the failure and move on; Exception (not a bare except)
                # so that a kernel interrupt still stops the loop.
                fail_to_load_files.append(image_file)
                continue

            if len(pose_landmarker_result.pose_landmarks) == 0:
                no_kp_files.append(image_file)
                continue

            # Use the first detected pose and flatten it landmark-major to
            # match `columns`.
            landmarks = pose_landmarker_result.pose_landmarks[0]
            data.append([getattr(lmk, attr) for lmk in landmarks for attr in attrs_to_get])
            filenames.append(image_file)

        df_pose = pd.DataFrame(data, columns=columns,
                               index=pd.Index(filenames, name="filename"))
        df_pose["pose"] = pose_name

        # Skip poses with no usable image so the final concat has no empty frames.
        if not df_pose.empty:
            pose_frames.append(df_pose)

# pd.concat raises on an empty list, so fall back to an empty frame.
df_all = pd.concat(pose_frames, axis=0) if pose_frames else pd.DataFrame({})


I0000 00:00:1707811194.089459       1 gl_context.cc:344] GL version: 2.1 (2.1 INTEL-22.1.29), renderer: Intel(R) Iris(TM) Plus Graphics 655
INFO: Created TensorFlow Lite XNNPACK delegate for CPU.
I0000 00:00:1707811206.986461       1 gl_context.cc:344] GL version: 2.1 (2.1 INTEL-22.1.29), renderer: Intel(R) Iris(TM) Plus Graphics 655
I0000 00:00:1707811210.436236       1 gl_context.cc:344] GL version: 2.1 (2.1 INTEL-22.1.29), renderer: Intel(R) Iris(TM) Plus Graphics 655
I0000 00:00:1707811228.153658       1 gl_context.cc:344] GL version: 2.1 (2.1 INTEL-22.1.29), renderer: Intel(R) Iris(TM) Plus Graphics 655
I0000 00:00:1707811241.038104       1 gl_context.cc:344] GL version: 2.1 (2.1 INTEL-22.1.29), renderer: Intel(R) Iris(TM) Plus Graphics 655
I0000 00:00:1707811245.188779       1 gl_context.cc:344] GL version: 2.1 (2.1 INTEL-22.1.29), renderer: Intel(R) Iris(TM) Plus Graphics 655
I0000 00:00:1707811257.464644       1 gl_context.cc:344] GL version: 2.1 (2.1 INTEL-22.1.29), renderer: 

Confirming that the number of rows is correct.
Note that some of the files do not get read, so nums from below code are overestimates.

In [4]:
# Persist the combined landmark table. index=True writes the index (the image
# file path, set as `filename` in the extraction cell) as the first CSV column.
df_all.to_csv("pose_landmark_data_2_13.csv", index=True)

In [113]:
import os

# Root directory that holds one sub-folder per pose.
directory_path = 'selected_poses/'

# Keep only the entries of the root that are themselves directories.
folders = [
    entry
    for entry in os.listdir(directory_path)
    if os.path.isdir(os.path.join(directory_path, entry))
]

# Map each pose folder to the number of entries it contains. Every entry is
# counted, whatever its type or extension.
folder_item_counts = {
    folder: len(os.listdir(os.path.join(directory_path, folder)))
    for folder in folders
}

# Report one line per folder.
for folder, count in folder_item_counts.items():
    print(f"{folder}: {count} items")


downdog: 246 items
tree: 41 items
boat: 387 items
akarna: 78 items
warrior: 235 items
heron: 141 items
goddess: 112 items
plank: 112 items
revolved_triangle: 508 items
cobra: 656 items


In [1]:
# Display the combined landmark table. `df_all` only exists once the extraction
# cell has been run in the current kernel session; on a fresh kernel this cell
# raises NameError.
df_all

NameError: name 'df_all' is not defined