In [1]:
## TODO: 

# Code to detect backsteps 
#    - Also need to figure out how that affects the balance/stability physics (from a coaching and movement perspective) and decide what to draw.

# Scale drawings based on output resolution

# Figure out how to make it more useful
#    - Make things more self-evident for end-users, so it doesn't require a coach's eye
#    - I need to think about how to turn my implicit knowledge into explicit knowledge and instruction

# Consider better implementation of CoG tracking?

# Add force vector display based on CoG
#    - Have a dead zone and then once the CoG moves outside dead zone, show a force arrow, colored to indicate speed/force?

# Look into models that can better handle holds, occlusions, & understanding of 3D space which will certainly be present in climbing footage
#    - MediaPipe misreads holds as body parts, particulary when dealing with occlusions
#    - Currently playing with YOLO_NAS. Handling multiple people in frame might be important for real-life footage (spotters, etc.)
#        - If MediaPipe grabs the spotter or a person walking through frame instead of the climber, it's useless

# Develop basic UI
#    - Basic File Select & Process UI
#    - There's probably existing video comparison software that would be better than aything I can do
#    - Allow side by side video playing with individual scrubbing
#    - Allow Slow-Mo and Frame-by-Frame movement
#    - Perhaps allow selection of two processed videos and have another tab for running the analysis

## Inputs and Outputs

In [2]:
input_video_path = '/path/to/.mp4'
output_video_path = '/path/to/.mp4'
desired_width = 'width'  # Set an integer value or 'width' for original width

# Refactor.

In [3]:
import cv2
import mediapipe as mp
import numpy as np

def initialize_video(input_path, desired_width):
    """
    Initializes video capture from a file or camera and calculates new dimensions.
    
    :param input_path: Path to the video file or camera index.
    :param desired_width: Desired width of the output frames.
    :return: Tuple of video capture object, new dimensions (width, height).
    """
    # Initialize video capture
    cap = cv2.VideoCapture(input_path)
    if not cap.isOpened():
        raise ValueError("Unable to open  source: {}".format(input_path))

    # Read the first frame to get original dimensions
    ret, frame = cap.read()
    if not ret:
        raise ValueError("Unable to read from video source: {}".format(input_path))

    # Original dimensions
    height, width = frame.shape[:2]

    if desired_width == 'width':
        # Use the original width
        new_width = width
        new_height = height
    else:
        # Calculate the new dimensions preserving aspect ratio
        aspect_ratio = width / height
        new_width = desired_width
        new_height = int(new_width / aspect_ratio)

    return cap, (new_width, new_height)


def initialize_pose():
    mp_pose = mp.solutions.pose
    pose = mp_pose.Pose(min_detection_confidence=0.5, min_tracking_confidence=0.5)
    return pose

def process_pose(frame, pose, new_dimensions):
    """
    Processes a frame with MediaPipe Pose to extract keypoints.

    :param frame: The video frame to process.
    :param pose: The MediaPipe Pose object.
    :param new_dimensions: New dimensions (width, height) to resize the frame.
    :return: Tuple of the processed frame and extracted landmarks.
    """
    # Resize the frame to new dimensions
    frame_resized = cv2.resize(frame, new_dimensions)

    # Convert the frame to RGB
    frame_rgb = cv2.cvtColor(frame_resized, cv2.COLOR_BGR2RGB)

    # Process the frame with MediaPipe Pose
    results = pose.process(frame_rgb)

    # Convert the frame back to BGR
    frame_bgr = cv2.cvtColor(frame_rgb, cv2.COLOR_RGB2BGR)

    # Extract landmarks if any are detected
    landmarks = results.pose_landmarks.landmark if results.pose_landmarks else None

    return frame_bgr, results, landmarks

def calculate_joint_angle(a, b, c):
    """
    Calculates the angle at joint 'b' given points a, b, c.

    :param a: The first keypoint (as a tuple or list).
    :param b: The middle keypoint (joint) where the angle is calculated.
    :param c: The third keypoint (as a tuple or list).
    :return: Calculated angle in degrees.
    """
    a = np.array(a) # First
    b = np.array(b) # Mid
    c = np.array(c) # End
    
    radians = np.arctan2(c[1]-b[1], c[0]-b[0]) - np.arctan2(a[1]-b[1], a[0]-b[0])
    angle = np.abs(radians*180.0/np.pi)
     
    if angle >180.0:
        angle = 360-angle
        
    return angle 


def calculate_belly_button(hips, shoulders):
    """
    Calculate the 'belly_button' keypoint.

    :param hips: Coordinates of the center of the hips (tuple or list).
    :param shoulders: Coordinates of the center of the shoulders (tuple or list).
    :return: Coordinates of the 'belly_button' keypoint.
    """
    # Convert to numpy arrays for vector operations
    hips = np.array(hips)
    shoulders = np.array(shoulders)

    # Calculate belly button position
    belly_button = hips + 0.25 * (shoulders - hips)
    return belly_button.tolist()  # Convert back to list for consistency

def update_cog_trajectory(processed_frame, belly_button, cog_positions, trail_length=150, trail_fade=True):
    """
    Update and draw the trajectory of the Center of Gravity (CoG) using the belly_button point.

    :param processed_frame: The frame to draw on.
    :param belly_button: Current position of the CoG (belly button).
    :param cog_positions: List storing the positions of the CoG.
    :param trail_length: The number of past positions to include in the trail.
    :param trail_fade: Whether to fade the trail as it gets older.
    :return: Updated frame with CoG and trajectory.
    """
    # Append the current CoG position
    cog_positions.append(belly_button)

    # Limit the length of the trail
    if len(cog_positions) > trail_length:
        cog_positions.pop(0)

    # Draw the CoG trajectory
    for i in range(1, len(cog_positions)):
        opacity = 1 if not trail_fade else i / len(cog_positions)
        color = (255, 0, 0, int(255 * opacity))  # Blue with variable opacity
        start_point = (int(cog_positions[i-1][0]), int(cog_positions[i-1][1]))
        end_point = (int(cog_positions[i][0]), int(cog_positions[i][1]))
        cv2.line(processed_frame, start_point, end_point, color[:3], thickness=2)

    # Draw the current CoG position
    cv2.circle(processed_frame, (int(belly_button[0]), int(belly_button[1])), radius=5, color=(0, 255, 0), thickness=-1)  # Green

    return processed_frame

# Initialize an empty list outside of your processing loop to store CoG positions
cog_positions = []


def draw_triangle_with_color(image, belly_button, left_ankle, right_ankle, opacity=0.25):
    """
    Draws a triangle using the belly button and both ankles with specified opacity. 
    The color of the triangle is green if the belly button is horizontally within the ankles, otherwise red.

    :param image: The image to draw on.
    :param belly_button: Coordinates of the belly button.
    :param left_ankle: Coordinates of the left ankle.
    :param right_ankle: Coordinates of the right ankle.
    :param opacity: Opacity of the triangle.
    :return: Image with the triangle drawn.
    """
    # Convert points to numpy arrays for easier manipulation
    belly_button = np.array(belly_button, dtype=np.int32)
    left_ankle = np.array(left_ankle, dtype=np.int32)
    right_ankle = np.array(right_ankle, dtype=np.int32)

    # Check if belly button is horizontally within the ankles
    if left_ankle[0] <= belly_button[0] <= right_ankle[0] or right_ankle[0] <= belly_button[0] <= left_ankle[0]:
        color = (255, 0, 0)  # Blue
    else:
        color = (0, 0, 255)  # Red

    # Create an overlay for the triangle
    overlay = image.copy()
    points = np.array([left_ankle, right_ankle, belly_button])
    cv2.fillPoly(overlay, [points], color)

    # Blend the overlay with the original image
    cv2.addWeighted(overlay, opacity, image, 1 - opacity, 0, image)

    return image

def draw_angle(image, angle, position, offset=(0, 0)):
    """
    Draws the specified angle on the image at the given position.

    :param image: The image to draw on.
    :param angle: The calculated angle to draw.
    :param position: The position (x, y) to place the angle text.
    :param offset: Offset (x, y) to adjust the position of the text.
    :return: Image with angle drawn.
    """
    font = cv2.FONT_HERSHEY_SIMPLEX
    font_scale = 0.5
    thickness = 2
    color = (255, 255, 255)  # White

    # Apply offset to position
    text_position = (int(position[0] + offset[0]), int(position[1] + offset[1]))

    # Draw angle text
    cv2.putText(image, f"{int(angle)}", text_position, font, font_scale, color, thickness, cv2.LINE_AA)

    return image

def color_for_angle(angle, is_shoulder=False):
    if is_shoulder:
        # For shoulder joints, consider close to 0 as at rest
        if angle < 30:  # You can adjust this threshold
            return (0, 255, 0)  # Green
        elif 30 <= angle <= 60:
            return (0, 255, 255) # Yellow
        else:
            return (0, 0, 255)  # Red
    else:
        # For other joints
        if 150 <= angle <= 180:
            return (0, 255, 0)  # Green
        elif 100 <= angle < 150:
            return (0, 255, 255)  # Yellow
        else:
            return (0, 0, 255)  # Red


def draw_skeleton(processed_frame, landmarks):
    def draw_bone(point1, point2, color):
        cv2.line(processed_frame, point1, point2, color, 2)

    # Convert landmark positions to pixels
    pixel_landmarks = {landmark: (int(landmarks[landmark.value].x * processed_frame.shape[1]),
                                  int(landmarks[landmark.value].y * processed_frame.shape[0]))
                       for landmark in mp.solutions.pose.PoseLandmark}

    # Define skeleton structure and joints for angle calculations
    skeleton = [
        # Arms
        (mp.solutions.pose.PoseLandmark.LEFT_SHOULDER, mp.solutions.pose.PoseLandmark.LEFT_ELBOW),
        (mp.solutions.pose.PoseLandmark.LEFT_ELBOW, mp.solutions.pose.PoseLandmark.LEFT_WRIST),
        (mp.solutions.pose.PoseLandmark.RIGHT_SHOULDER, mp.solutions.pose.PoseLandmark.RIGHT_ELBOW),
        (mp.solutions.pose.PoseLandmark.RIGHT_ELBOW, mp.solutions.pose.PoseLandmark.RIGHT_WRIST),
        # Legs
        (mp.solutions.pose.PoseLandmark.LEFT_HIP, mp.solutions.pose.PoseLandmark.LEFT_KNEE),
        (mp.solutions.pose.PoseLandmark.LEFT_KNEE, mp.solutions.pose.PoseLandmark.LEFT_ANKLE),
        (mp.solutions.pose.PoseLandmark.RIGHT_HIP, mp.solutions.pose.PoseLandmark.RIGHT_KNEE),
        (mp.solutions.pose.PoseLandmark.RIGHT_KNEE, mp.solutions.pose.PoseLandmark.RIGHT_ANKLE),
        # Torso and Hip-Shoulder Connections
        (mp.solutions.pose.PoseLandmark.LEFT_SHOULDER, mp.solutions.pose.PoseLandmark.RIGHT_SHOULDER),
        (mp.solutions.pose.PoseLandmark.LEFT_HIP, mp.solutions.pose.PoseLandmark.RIGHT_HIP),
        (mp.solutions.pose.PoseLandmark.LEFT_SHOULDER, mp.solutions.pose.PoseLandmark.LEFT_HIP),
        (mp.solutions.pose.PoseLandmark.RIGHT_SHOULDER, mp.solutions.pose.PoseLandmark.RIGHT_HIP),
    ]

    # Joints for angle calculations
    joints = [
        (mp.solutions.pose.PoseLandmark.LEFT_SHOULDER, mp.solutions.pose.PoseLandmark.LEFT_HIP, mp.solutions.pose.PoseLandmark.LEFT_ELBOW),
        (mp.solutions.pose.PoseLandmark.RIGHT_SHOULDER, mp.solutions.pose.PoseLandmark.RIGHT_HIP, mp.solutions.pose.PoseLandmark.RIGHT_ELBOW),
        (mp.solutions.pose.PoseLandmark.LEFT_HIP, mp.solutions.pose.PoseLandmark.LEFT_SHOULDER, mp.solutions.pose.PoseLandmark.LEFT_KNEE),
        (mp.solutions.pose.PoseLandmark.RIGHT_HIP, mp.solutions.pose.PoseLandmark.RIGHT_SHOULDER, mp.solutions.pose.PoseLandmark.RIGHT_KNEE),
        (mp.solutions.pose.PoseLandmark.LEFT_ELBOW, mp.solutions.pose.PoseLandmark.LEFT_SHOULDER, mp.solutions.pose.PoseLandmark.LEFT_WRIST),
        (mp.solutions.pose.PoseLandmark.RIGHT_ELBOW, mp.solutions.pose.PoseLandmark.RIGHT_SHOULDER, mp.solutions.pose.PoseLandmark.RIGHT_WRIST),
        (mp.solutions.pose.PoseLandmark.LEFT_KNEE, mp.solutions.pose.PoseLandmark.LEFT_HIP, mp.solutions.pose.PoseLandmark.LEFT_ANKLE),
        (mp.solutions.pose.PoseLandmark.RIGHT_KNEE, mp.solutions.pose.PoseLandmark.RIGHT_HIP, mp.solutions.pose.PoseLandmark.RIGHT_ANKLE),
    ]

    # Calculate angles for joints
    joint_angles = {}
    for joint in joints:
        if all(pixel_landmarks.get(j) for j in joint):
            joint_angles[joint[0]] = calculate_joint_angle(pixel_landmarks[joint[1]], pixel_landmarks[joint[0]], pixel_landmarks[joint[2]])

    # Draw each bone with color based on joint angles
    for limb in skeleton:
        if all(pixel_landmarks.get(point) for point in limb):
            point1, point2 = pixel_landmarks[limb[0]], pixel_landmarks[limb[1]]
            # Determine if this bone is part of a joint we have an angle for
            angle = joint_angles.get(limb[0]) or joint_angles.get(limb[1])
            is_shoulder = limb[0] in [mp.solutions.pose.PoseLandmark.LEFT_SHOULDER, mp.solutions.pose.PoseLandmark.RIGHT_SHOULDER]
            color = color_for_angle(angle, is_shoulder) if angle is not None else (255, 255, 255)  # Default color
            draw_bone(point1, point2, color)

    return processed_frame

def process_video(cap, out, pose, new_dimensions):
    # Initialize drawing utility
    mp_drawing = mp.solutions.drawing_utils

    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break

        # Process the frame to get pose results and landmarks
        processed_frame, results, landmarks = process_pose(frame, pose, new_dimensions)

        if landmarks:
            # Extract keypoints
            left_shoulder = [landmarks[mp.solutions.pose.PoseLandmark.LEFT_SHOULDER.value].x * new_dimensions[0],
                             landmarks[mp.solutions.pose.PoseLandmark.LEFT_SHOULDER.value].y * new_dimensions[1]]
            right_shoulder = [landmarks[mp.solutions.pose.PoseLandmark.RIGHT_SHOULDER.value].x * new_dimensions[0],
                              landmarks[mp.solutions.pose.PoseLandmark.RIGHT_SHOULDER.value].y * new_dimensions[1]]
            left_elbow = [landmarks[mp.solutions.pose.PoseLandmark.LEFT_ELBOW.value].x * new_dimensions[0],
                          landmarks[mp.solutions.pose.PoseLandmark.LEFT_ELBOW.value].y * new_dimensions[1]]
            right_elbow = [landmarks[mp.solutions.pose.PoseLandmark.RIGHT_ELBOW.value].x * new_dimensions[0],
                           landmarks[mp.solutions.pose.PoseLandmark.RIGHT_ELBOW.value].y * new_dimensions[1]]
            left_wrist = [landmarks[mp.solutions.pose.PoseLandmark.LEFT_WRIST.value].x * new_dimensions[0],
                          landmarks[mp.solutions.pose.PoseLandmark.LEFT_WRIST.value].y * new_dimensions[1]]
            right_wrist = [landmarks[mp.solutions.pose.PoseLandmark.RIGHT_WRIST.value].x * new_dimensions[0],
                           landmarks[mp.solutions.pose.PoseLandmark.RIGHT_WRIST.value].y * new_dimensions[1]]
            left_hip = [landmarks[mp.solutions.pose.PoseLandmark.LEFT_HIP.value].x * new_dimensions[0],
                        landmarks[mp.solutions.pose.PoseLandmark.LEFT_HIP.value].y * new_dimensions[1]]
            right_hip = [landmarks[mp.solutions.pose.PoseLandmark.RIGHT_HIP.value].x * new_dimensions[0],
                         landmarks[mp.solutions.pose.PoseLandmark.RIGHT_HIP.value].y * new_dimensions[1]]
            left_ankle = [landmarks[mp.solutions.pose.PoseLandmark.LEFT_ANKLE.value].x * new_dimensions[0],
                          landmarks[mp.solutions.pose.PoseLandmark.LEFT_ANKLE.value].y * new_dimensions[1]]
            right_ankle = [landmarks[mp.solutions.pose.PoseLandmark.RIGHT_ANKLE.value].x * new_dimensions[0],
                           landmarks[mp.solutions.pose.PoseLandmark.RIGHT_ANKLE.value].y * new_dimensions[1]]
            left_knee = [landmarks[mp.solutions.pose.PoseLandmark.LEFT_KNEE.value].x * new_dimensions[0],
                         landmarks[mp.solutions.pose.PoseLandmark.LEFT_KNEE.value].y * new_dimensions[1]]
            right_knee = [landmarks[mp.solutions.pose.PoseLandmark.RIGHT_KNEE.value].x * new_dimensions[0],
                          landmarks[mp.solutions.pose.PoseLandmark.RIGHT_KNEE.value].y * new_dimensions[1]]

            
            # Calculate additional angles
            left_knee_angle = calculate_joint_angle(left_hip, left_knee, left_ankle)
            right_knee_angle = calculate_joint_angle(right_hip, right_knee, right_ankle)
            left_hip_angle = calculate_joint_angle(left_shoulder, left_hip, left_knee)
            right_hip_angle = calculate_joint_angle(right_shoulder, right_hip, right_knee)
            left_shoulder_angle = calculate_joint_angle(left_elbow, left_shoulder, left_hip)
            right_shoulder_angle = calculate_joint_angle(right_elbow, right_shoulder, right_hip)
            left_elbow_angle = calculate_joint_angle(left_shoulder, left_elbow, left_wrist)
            right_elbow_angle = calculate_joint_angle(right_shoulder, right_elbow, right_wrist)
        
            # Draw additional angles
            processed_frame = draw_angle(processed_frame, left_knee_angle, left_knee)
            processed_frame = draw_angle(processed_frame, right_knee_angle, right_knee)
            processed_frame = draw_angle(processed_frame, left_hip_angle, left_hip)
            processed_frame = draw_angle(processed_frame, right_hip_angle, right_hip)
            processed_frame = draw_angle(processed_frame, left_shoulder_angle, left_shoulder)
            processed_frame = draw_angle(processed_frame, right_shoulder_angle, right_shoulder)
            processed_frame = draw_angle(processed_frame, left_elbow_angle, left_elbow)
            processed_frame = draw_angle(processed_frame, right_elbow_angle, right_elbow)
        
            # Calculate belly button position
            hips_center = [(left_hip[0] + right_hip[0]) / 2, (left_hip[1] + right_hip[1]) / 2]
            shoulders_center = [(left_shoulder[0] + right_shoulder[0]) / 2, (left_shoulder[1] + right_shoulder[1]) / 2]
            belly_button = calculate_belly_button(hips_center, shoulders_center)
            processed_frame = update_cog_trajectory(processed_frame, belly_button, cog_positions)

            # Draw a white line going straight down from the belly button
            line_start = (int(belly_button[0]), int(belly_button[1]))
            line_end = (int(belly_button[0]), int(belly_button[1]) + 200)
            cv2.line(processed_frame, line_start, line_end, (255, 255, 255), 2)

            # Draw triangle with color based on belly button position
            processed_frame = draw_triangle_with_color(processed_frame, belly_button, left_ankle, right_ankle)
            #processed_frame = draw_triangle_with_color(processed_frame, belly_button, left_wrist, right_wrist)

            processed_frame = draw_skeleton(processed_frame, landmarks)
        # Write the processed frame to the output file
        out.write(processed_frame)

    # Release the video capture and writer objects
    cap.release()
    out.release()


## Attempt to Auto-detect Frame Rate - Needs more testing

In [4]:
def main():

    # Initialize video capture and new dimensions
    cap, new_dimensions = initialize_video(input_video_path, desired_width)

    # Get input video frame rate
    input_frame_rate = cap.get(cv2.CAP_PROP_FPS)

    # Initialize MediaPipe pose
    pose = initialize_pose()

    # Define the codec and create VideoWriter object with input frame rate
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    out = cv2.VideoWriter(output_video_path, fourcc, input_frame_rate, new_dimensions)

    try:
        # Process the video
        process_video(cap, out, pose, new_dimensions)
    finally:
        # Clean up. Release video capture and writer
        cap.release()
        out.release()
        cv2.destroyAllWindows()

if __name__ == "__main__":
    main()


I0000 00:00:1708961200.702224   60329 gl_context_egl.cc:85] Successfully initialized EGL. Major : 1 Minor: 5
I0000 00:00:1708961200.706664   60378 gl_context.cc:344] GL version: 3.2 (OpenGL ES 3.2 Mesa 22.3.6), renderer: Mesa Intel(R) HD Graphics 5500 (BDW GT2)
INFO: Created TensorFlow Lite XNNPACK delegate for CPU.
