In [None]:
import cv2
import mediapipe as mp
import numpy as np
import pandas as pd
import tqdm
import math
import os
import time

class JointAngleEstimation():
    def __init__(self, input_mp4: str, output_csv: str):
        self.input_mp4 = input_mp4
        self.output_csv = output_csv

    def calculate_angle(self, p1, p2, p3):
        """Calculate angle between three points."""
        v1 = np.array([p1.x, p1.y]) - np.array([p2.x, p2.y])
        v2 = np.array([p3.x, p3.y]) - np.array([p2.x, p2.y])
        dot_product = np.dot(v1, v2)
        magnitude_v1 = np.linalg.norm(v1)
        magnitude_v2 = np.linalg.norm(v2)
        angle_rad = math.acos(dot_product / (magnitude_v1 * magnitude_v2))
        angle_deg = math.degrees(angle_rad)
        return angle_deg

    def process_frame(self, frame_num, frame, hands, mp_drawing, draw_landmarks=False):
        image = frame.copy()
        image_rgb = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        results = hands.process(image_rgb)

        frame_angles = {}
        if results.multi_hand_landmarks:
            for hand_landmarks in results.multi_hand_landmarks:
                if draw_landmarks:
                    mp_drawing.draw_landmarks(image, hand_landmarks, mp.solutions.hands.HAND_CONNECTIONS)
                    for lm in hand_landmarks.landmark:
                        h, w, _ = image.shape
                        cx, cy = int(lm.x * w), int(lm.y * h)
                        cv2.circle(image, (cx, cy), radius=5, color=(0, 255, 0), thickness=-1)

                landmarks = hand_landmarks.landmark
                angles = {
                    'Thumb_CMC': self.calculate_angle(landmarks[2], landmarks[4], landmarks[3]),
                    'Index_MCP': self.calculate_angle(landmarks[5], landmarks[6], landmarks[7]),
                    'Middle_MCP': self.calculate_angle(landmarks[9], landmarks[10], landmarks[11]),
                    'Ring_MCP': self.calculate_angle(landmarks[13], landmarks[14], landmarks[15]),
                    'Pinky_MCP': self.calculate_angle(landmarks[17], landmarks[18], landmarks[19]),
                }
                frame_angles.update(angles)

        return {'Frame': frame_num, **frame_angles}, image


    def run(self, output_video: str = None) -> None:
        start_time = time.time()

        cap = cv2.VideoCapture(self.input_mp4)
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        fps = cap.get(cv2.CAP_PROP_FPS)
        frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

        out = None
        if output_video:
            fourcc = cv2.VideoWriter_fourcc(*'mp4v')
            out = cv2.VideoWriter(output_video, fourcc, fps, (frame_width, frame_height))

        results = []
        mp_hands = mp.solutions.hands
        mp_drawing = mp.solutions.drawing_utils

        with mp_hands.Hands(static_image_mode=True, max_num_hands=1) as hands:
            frame_num = 0
            with tqdm.tqdm(total=total_frames, desc="Processing Video", dynamic_ncols=True) as pbar:
                while True:
                    success, frame = cap.read()
                    if not success:
                        break

                    frame_data, processed_frame = self.process_frame(frame_num, frame, hands, mp_drawing, draw_landmarks=bool(out))
                    results.append(frame_data)

                    if out:
                        out.write(processed_frame)

                    frame_num += 1
                    pbar.update(1)

        cap.release()
        if out:
            out.release()
            print(f"Processed video saved to {output_video}")

        df = pd.DataFrame(results)
        df.to_csv(self.output_csv, index=False)
        print(f"Angles saved to {self.output_csv}")

        print(f"Total time taken: {time.time() - start_time:.2f} seconds")



# Example usage:
input_m4 = 'April07_2025_trial1_int5s.mp4'
name, _ = os.path.splitext(input_m4)

ja = JointAngleEstimation(input_m4, f"{name}_angles.csv")

# Only calculate angles and save to CSV:
# ja.run()

# Calculate angles, save to CSV, and save processed video:
ja.run(output_video=f"{name}_processed.mp4")

I0000 00:00:1744066294.182666 3959996 gl_context.cc:369] GL version: 2.1 (2.1 Metal - 89.3), renderer: Apple M1 Pro
INFO: Created TensorFlow Lite XNNPACK delegate for CPU.
Processing Video:   0%|          | 0/22641 [00:00<?, ?it/s]W0000 00:00:1744066294.207872 3960329 inference_feedback_manager.cc:114] Feedback manager requires a model with a single signature inference. Disabling support for feedback tensors.
W0000 00:00:1744066294.215161 3960328 inference_feedback_manager.cc:114] Feedback manager requires a model with a single signature inference. Disabling support for feedback tensors.
W0000 00:00:1744066294.278496 3960327 landmark_projection_calculator.cc:186] Using NORM_RECT without IMAGE_DIMENSIONS is only supported for the square ROI. Provide IMAGE_DIMENSIONS or use PROJECTION_MATRIX.
Processing Video: 100%|██████████| 22641/22641 [13:46<00:00, 27.40it/s]


Angles saved to April07_2025_trial1_int5s_angles.csv
Total time taken: 826.41 seconds
