In [1]:
import cv2
import numpy as np
import mediapipe as mp
import math
import argparse
from typing import List, Dict, Tuple
import os

class MediaPipeToBVH:
    """
    Convert a video into a BVH motion file using MediaPipe Pose.

    The pipeline is: per-frame MediaPipe world landmarks -> per-joint
    positions -> bone offsets (taken from the first frame) and approximate
    per-joint Euler rotations -> BVH text file.

    The instance owns a MediaPipe ``Pose`` object; call :meth:`close` (or use
    the converter as a context manager) to release it when done.
    """

    def __init__(self, fps: int = 30):
        """
        Initialize the MediaPipe to BVH converter.

        Args:
            fps: Frames per second for the output BVH file
        """
        self.mp_pose = mp.solutions.pose
        self.fps = fps

        # Define the BVH skeleton hierarchy (parent joint -> ordered children).
        # Dictionary order matters: it fixes the order joints are written in
        # the HIERARCHY section and the channel order of every MOTION line.
        self.joint_hierarchy = {
            "Hips": ["Spine", "LeftUpLeg", "RightUpLeg"],
            "Spine": ["Spine1"],
            "Spine1": ["Spine2"],
            "Spine2": ["Neck", "LeftShoulder", "RightShoulder"],
            "Neck": ["Head"],
            "Head": [],
            "LeftShoulder": ["LeftArm"],
            "LeftArm": ["LeftForeArm"],
            "LeftForeArm": ["LeftHand"],
            "LeftHand": [],
            "RightShoulder": ["RightArm"],
            "RightArm": ["RightForeArm"],
            "RightForeArm": ["RightHand"],
            "RightHand": [],
            "LeftUpLeg": ["LeftLeg"],
            "LeftLeg": ["LeftFoot"],
            "LeftFoot": ["LeftToe"],
            "LeftToe": [],
            "RightUpLeg": ["RightLeg"],
            "RightLeg": ["RightFoot"],
            "RightFoot": ["RightToe"],
            "RightToe": []
        }

        # MediaPipe landmark indices mapping to BVH joints. A joint mapped to
        # several indices is placed at the average of those landmarks.
        self.landmark_to_joint = {
            "Hips": [23, 24],  # Mid-point of left and right hip
            "Spine": [11],
            "Spine1": [12],
            "Spine2": [12],  # MediaPipe doesn't have enough spine points
            "Neck": [11],  # Approximation
            "Head": [0],
            "LeftShoulder": [11, 13],  # Using multiple landmarks for better positioning
            "LeftArm": [13],
            "LeftForeArm": [15],
            "LeftHand": [19],
            "RightShoulder": [12, 14],
            "RightArm": [14],
            "RightForeArm": [16],
            "RightHand": [20],
            "LeftUpLeg": [23],
            "LeftLeg": [25],
            "LeftFoot": [27],
            "LeftToe": [31],
            "RightUpLeg": [24],
            "RightLeg": [26],
            "RightFoot": [28],
            "RightToe": [32]
        }

        # Joint offsets (child joint -> (dx, dy, dz) from its parent); filled
        # in by calculate_joint_offsets().
        self.joint_offsets = {}

        # Initialize pose tracking
        self.pose = self.mp_pose.Pose(
            static_image_mode=False,
            model_complexity=2,
            enable_segmentation=False,
            min_detection_confidence=0.5,
            min_tracking_confidence=0.5
        )

    def close(self):
        """
        Release the underlying MediaPipe Pose object.

        Safe to call more than once. After closing, process_video() can no
        longer be used on this instance.
        """
        pose = getattr(self, "pose", None)
        if pose is not None:
            pose.close()
            self.pose = None

    def __enter__(self):
        """Allow ``with MediaPipeToBVH(...) as converter:`` usage."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Release the pose object on leaving the ``with`` block; never suppresses exceptions."""
        self.close()
        return False

    def process_video(self, video_path: str) -> List[Dict]:
        """
        Process video and extract pose landmarks.

        Frames with no detected pose repeat the previous frame's landmarks.
        If no pose is detected in the very first frame, a message is printed
        and an empty list is returned.

        Args:
            video_path: Path to the video file

        Returns:
            List with one entry per frame; each entry maps a landmark index
            to a dict with 'x', 'y', 'z' and 'visibility'.

        Raises:
            ValueError: If the video file cannot be opened.
        """
        cap = cv2.VideoCapture(video_path)
        if not cap.isOpened():
            cap.release()
            raise ValueError(f"Could not open video file {video_path}")

        frames = []
        # try/finally guarantees the capture handle is released on the early
        # return below and if MediaPipe/OpenCV raises mid-video.
        try:
            while True:
                success, image = cap.read()
                if not success:
                    break

                # Convert the BGR image to RGB
                image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

                # Process the image and get pose landmarks
                results = self.pose.process(image)

                if results.pose_world_landmarks:
                    # NOTE(review): MediaPipe world landmarks appear to use an
                    # image-style y-down axis while BVH viewers usually assume
                    # y-up -- confirm whether an axis flip is needed.
                    frames.append({
                        index: {
                            'x': landmark.x,
                            'y': landmark.y,
                            'z': landmark.z,
                            'visibility': landmark.visibility
                        }
                        for index, landmark in enumerate(results.pose_world_landmarks.landmark)
                    })
                elif frames:
                    # No pose in this frame: hold the previous pose (the same
                    # dict object is reused; it is never mutated afterwards).
                    frames.append(frames[-1])
                else:
                    print("No pose detected in initial frame. Check your video.")
                    return []
        finally:
            cap.release()

        return frames

    def _frame_joint_positions(self, landmarks: Dict) -> Dict:
        """Map one frame of landmarks to ``{joint_name: {'x', 'y', 'z'}}``."""
        axes = ('x', 'y', 'z')
        frame_positions = {}

        for joint_name, landmark_indices in self.landmark_to_joint.items():
            if len(landmark_indices) == 1:
                # Single landmark: copy its coordinates unchanged.
                source = landmarks[landmark_indices[0]]
                frame_positions[joint_name] = {axis: source[axis] for axis in axes}
            else:
                # Average of multiple landmarks
                count = len(landmark_indices)
                frame_positions[joint_name] = {
                    axis: sum(landmarks[idx][axis] for idx in landmark_indices) / count
                    for axis in axes
                }

        return frame_positions

    def calculate_joint_positions(self, frames: List[Dict]) -> List[Dict]:
        """
        Calculate BVH joint positions from MediaPipe landmarks.

        As a side effect, recomputes ``self.joint_offsets`` from the first
        frame (a T-pose first frame is preferred).

        Args:
            frames: List of pose landmarks for each frame

        Returns:
            List with one ``{joint_name: {'x', 'y', 'z'}}`` dict per frame.
            Empty if ``frames`` is empty, in which case the offsets are left
            untouched.
        """
        if not frames:
            # Nothing to convert; previously this raised IndexError.
            return []

        joint_positions = [self._frame_joint_positions(landmarks) for landmarks in frames]

        # Calculate joint offsets from the first frame (T-pose preferred)
        self.calculate_joint_offsets(joint_positions[0])

        return joint_positions

    def calculate_joint_offsets(self, first_frame: Dict):
        """
        Calculate joint offsets based on the first frame.

        Stores, for every child joint in the hierarchy, the vector from its
        parent to itself in ``self.joint_offsets``. The root ("Hips") gets a
        zero offset.

        Args:
            first_frame: Joint positions in the first frame
        """
        for joint_name, children in self.joint_hierarchy.items():
            parent_pos = first_frame[joint_name]

            for child in children:
                child_pos = first_frame[child]

                # Offset from parent to child
                self.joint_offsets[child] = (
                    child_pos['x'] - parent_pos['x'],
                    child_pos['y'] - parent_pos['y'],
                    child_pos['z'] - parent_pos['z'],
                )

        # Set Hips offset to 0
        self.joint_offsets["Hips"] = (0, 0, 0)

    def calculate_joint_rotations(self, joint_positions: List[Dict]) -> List[Dict]:
        """
        Calculate joint rotations from positions.

        Each joint with children is given an (x, y, z) rotation in degrees
        derived from the average unit direction towards its children. This
        is a simplified conversion: the z rotation is always 0. Joints with
        no children get (0, 0, 0).

        Args:
            joint_positions: Joint positions for each frame

        Returns:
            List with one ``{joint_name: (x_deg, y_deg, z_deg)}`` dict per frame.
        """
        joint_rotations = []

        for frame_positions in joint_positions:
            frame_rotations = {}

            for joint_name, children in self.joint_hierarchy.items():
                if not children:
                    # End sites don't have rotations
                    frame_rotations[joint_name] = (0, 0, 0)
                    continue

                parent_pos = frame_positions[joint_name]

                # Unit direction vectors from the parent to each child;
                # children that coincide with the parent are skipped.
                dir_vectors = []
                for child in children:
                    child_pos = frame_positions[child]
                    dir_x = child_pos['x'] - parent_pos['x']
                    dir_y = child_pos['y'] - parent_pos['y']
                    dir_z = child_pos['z'] - parent_pos['z']
                    mag = math.sqrt(dir_x**2 + dir_y**2 + dir_z**2)
                    if mag > 0:
                        dir_vectors.append((dir_x/mag, dir_y/mag, dir_z/mag))

                if not dir_vectors:
                    frame_rotations[joint_name] = (0, 0, 0)
                    continue

                # Average direction vectors
                avg_dir = [sum(v[i] for v in dir_vectors)/len(dir_vectors) for i in range(3)]

                # Convert direction to Euler angles (ZXY order)
                # This is a simplified conversion and might need refinement
                x_rot = math.atan2(-avg_dir[2], math.sqrt(avg_dir[0]**2 + avg_dir[1]**2))
                y_rot = math.atan2(avg_dir[0], avg_dir[1])
                z_rot = 0  # Simplified assumption

                frame_rotations[joint_name] = (
                    math.degrees(x_rot),
                    math.degrees(y_rot),
                    math.degrees(z_rot)
                )

            joint_rotations.append(frame_rotations)

        return joint_rotations

    def write_bvh_file(self, output_path: str, joint_positions: List[Dict], joint_rotations: List[Dict]):
        """
        Write BVH file from joint positions and rotations.

        Inputs are validated before the output file is opened, so a bad call
        does not leave a truncated file behind.

        Args:
            output_path: Path to save the BVH file
            joint_positions: Joint positions for each frame
            joint_rotations: Joint rotations for each frame

        Raises:
            ValueError: If ``self.fps`` is not positive, if the two frame
                lists differ in length, or if joint offsets have not been
                computed yet.
        """
        if self.fps <= 0:
            raise ValueError(f"fps must be positive, got {self.fps}")
        if len(joint_positions) != len(joint_rotations):
            raise ValueError(
                f"joint_positions has {len(joint_positions)} frames but "
                f"joint_rotations has {len(joint_rotations)}"
            )
        missing = [
            child
            for children in self.joint_hierarchy.values()
            for child in children
            if child not in self.joint_offsets
        ]
        if missing:
            raise ValueError(
                "Joint offsets have not been computed for: " + ", ".join(missing)
                + "; call calculate_joint_positions() first"
            )

        with open(output_path, 'w') as f:
            # Header and root joint (Hips): zero offset, 6 channels.
            f.write("HIERARCHY\n")
            f.write("ROOT Hips\n")
            f.write("{\n")
            f.write("\tOFFSET 0.000000 0.000000 0.000000\n")
            f.write("\tCHANNELS 6 Xposition Yposition Zposition Zrotation Xrotation Yrotation\n")

            # Write child joints recursively
            self._write_joint_hierarchy(f, "Hips", 1)

            # End of hierarchy
            f.write("}\n")

            # Write motion data
            f.write("MOTION\n")
            f.write(f"Frames: {len(joint_positions)}\n")
            f.write(f"Frame Time: {1.0/self.fps:.6f}\n")

            # One line per frame: root position, then every joint's rotation
            # in hierarchy (depth-first) order.
            for frame_positions, frame_rotations in zip(joint_positions, joint_rotations):
                hips_pos = frame_positions["Hips"]
                frame_data = [hips_pos['x'], hips_pos['y'], hips_pos['z']]

                self._add_joint_rotations(frame_data, frame_rotations, "Hips")

                f.write(" ".join(f"{val:.6f}" for val in frame_data) + "\n")

    def _write_joint_hierarchy(self, file, joint_name: str, indent_level: int):
        """
        Recursively write the children of a joint to the BVH file.

        Args:
            file: File object to write to
            joint_name: Joint whose children are written
            indent_level: Number of tabs to indent the children by
        """
        indent = "\t" * indent_level

        for child in self.joint_hierarchy[joint_name]:
            offset = self.joint_offsets[child]

            file.write(f"{indent}JOINT {child}\n")
            file.write(f"{indent}{{\n")
            file.write(f"{indent}\tOFFSET {offset[0]:.6f} {offset[1]:.6f} {offset[2]:.6f}\n")

            # Every joint, including leaves, carries three rotation channels;
            # _add_joint_rotations() emits values for all of them.
            file.write(f"{indent}\tCHANNELS 3 Zrotation Xrotation Yrotation\n")

            if self.joint_hierarchy[child]:
                # Recursively write child's children
                self._write_joint_hierarchy(file, child, indent_level + 1)
            else:
                # End site for leaf joints
                file.write(f"{indent}\tEnd Site\n")
                file.write(f"{indent}\t{{\n")
                file.write(f"{indent}\t\tOFFSET 0.00 0.00 0.00\n")
                file.write(f"{indent}\t}}\n")

            file.write(f"{indent}}}\n")

    def _add_joint_rotations(self, frame_data: List[float], rotations: Dict, joint_name: str):
        """
        Recursively add joint rotations to frame data in the correct order.

        Traverses the hierarchy depth-first in the same order as
        _write_joint_hierarchy(), so values line up with declared channels.

        Args:
            frame_data: List to append rotation data to
            rotations: Dictionary of rotations for the current frame
            joint_name: Current joint name
        """
        # Rotations are stored as (x, y, z) but channels are declared Z, X, Y.
        rot = rotations[joint_name]
        frame_data.extend([rot[2], rot[0], rot[1]])  # ZXY order

        # Add children rotations in order
        for child in self.joint_hierarchy[joint_name]:
            self._add_joint_rotations(frame_data, rotations, child)

    def convert_video_to_bvh(self, video_path: str, output_path: str):
        """
        Convert video to BVH file.

        Args:
            video_path: Path to input video file
            output_path: Path to save output BVH file

        Returns:
            True if a BVH file was written, False if no pose data could be
            extracted from the video.
        """
        print(f"Processing video: {video_path}")
        frames = self.process_video(video_path)

        if not frames:
            print("No pose data extracted. Check your video.")
            return False

        print(f"Extracted pose data from {len(frames)} frames")

        joint_positions = self.calculate_joint_positions(frames)
        joint_rotations = self.calculate_joint_rotations(joint_positions)

        print(f"Writing BVH file to: {output_path}")
        self.write_bvh_file(output_path, joint_positions, joint_rotations)

        print("Conversion complete")
        return True


# def main():
#     parser = argparse.ArgumentParser(description='Convert video to BVH using MediaPipe.')
#     parser.add_argument('--input', type=str, required=True, help='Input video file')
#     parser.add_argument('--output', type=str, help='Output BVH file')
#     parser.add_argument('--fps', type=int, default=30, help='Frames per second for BVH')
    
#     args = parser.parse_args()
    
#     # If output is not specified, use input filename with .bvh extension
#     if not args.output:
#         base_name = os.path.splitext(os.path.basename(args.input))[0]
#         args.output = f"{base_name}.bvh"
    
#     converter = MediaPipeToBVH(fps=args.fps)
#     converter.convert_video_to_bvh(args.input, args.output)


# if __name__ == "__main__":
#     main()

2025-02-26 16:51:28.343813: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:477] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
E0000 00:00:1740617488.362117   26700 cuda_dnn.cc:8310] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
E0000 00:00:1740617488.368392   26700 cuda_blas.cc:1418] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
2025-02-26 16:51:28.386993: I tensorflow/core/platform/cpu_feature_guard.cc:210] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


In [2]:
# Base name (without extension) of the input clip; "<filename>.mp4" is read.
filename = "fight2"

# Build a 30 fps converter and run the full video -> BVH pipeline.
# The call's return value (True on success) is the cell's displayed output.
converter = MediaPipeToBVH(fps=30)
converter.convert_video_to_bvh(
    video_path=f"{filename}.mp4",
    output_path="test.bvh",
)

I0000 00:00:1740617492.163329   26700 gl_context_egl.cc:85] Successfully initialized EGL. Major : 1 Minor: 5
I0000 00:00:1740617492.166407   26792 gl_context.cc:369] GL version: 3.2 (OpenGL ES 3.2 Mesa 23.2.1-1ubuntu3.1~22.04.3), renderer: Mesa Intel(R) UHD Graphics (CML GT2)
INFO: Created TensorFlow Lite XNNPACK delegate for CPU.
W0000 00:00:1740617492.233940   26772 inference_feedback_manager.cc:114] Feedback manager requires a model with a single signature inference. Disabling support for feedback tensors.
W0000 00:00:1740617492.315656   26771 inference_feedback_manager.cc:114] Feedback manager requires a model with a single signature inference. Disabling support for feedback tensors.


Processing video: fight2.mp4


W0000 00:00:1740617492.387900   26771 landmark_projection_calculator.cc:186] Using NORM_RECT without IMAGE_DIMENSIONS is only supported for the square ROI. Provide IMAGE_DIMENSIONS or use PROJECTION_MATRIX.


Extracted pose data from 43 frames
Writing BVH file to: test.bvh
Conversion complete


True