In [None]:
import os
import json
import numpy as np
import cv2
import mediapipe as mp
from mediapipe.tasks import python
from mediapipe.tasks.python import vision
from mediapipe import solutions
from mediapipe.framework.formats import landmark_pb2
import numpy as np


def draw_landmarks_on_image(rgb_image, detection_result):
  pose_landmarks_list = detection_result.pose_landmarks
  annotated_image = np.copy(rgb_image)


  for idx in range(len(pose_landmarks_list)):
    pose_landmarks = pose_landmarks_list[idx]

  
    pose_landmarks_proto = landmark_pb2.NormalizedLandmarkList()
    pose_landmarks_proto.landmark.extend([
      landmark_pb2.NormalizedLandmark(x=landmark.x, y=landmark.y, z=landmark.z) for landmark in pose_landmarks
    ])

    solutions.drawing_utils.draw_landmarks(
      annotated_image,
      pose_landmarks_proto,
      solutions.pose.POSE_CONNECTIONS,
      solutions.drawing_styles.get_default_pose_landmarks_style())
  return annotated_image



keypoint_mapping = {
    'nose': 0,
    'left_eye_smplhf': 2,
    'right_eye_smplhf': 5,
    'left_shoulder': 11,
    'right_shoulder': 12,
    'left_elbow': 13,
    'right_elbow': 14,
    'left_wrist': 15,
    'right_wrist': 16,
    'left_hip': 23,
    'right_hip': 24,
    'left_knee': 25,
    'right_knee': 26,
    'left_ankle': 27,
    'right_ankle': 28
}


PoseLandmarker = mp.tasks.vision.PoseLandmarker
base_options = python.BaseOptions(model_asset_path='pose_landmarker_full.task')
options = vision.PoseLandmarkerOptions(
    base_options=base_options,
    running_mode=mp.tasks.vision.RunningMode.VIDEO)


root_folder = 'TestDataset/Full'


output_file = 'mediapipe_keypoint_distances.txt'
with open(output_file, 'w') as distance_file:

   
    for exercise_folder in os.listdir(root_folder):
        exercise_folder_path = os.path.join(root_folder, exercise_folder)

       
        if os.path.isdir(exercise_folder_path):
            print(f"Processing exercise folder: {exercise_folder}")

           
            data_folder_path = os.path.join(exercise_folder_path, 'data')
            if not os.path.exists(data_folder_path):
                continue  

           
            for video_file in os.listdir(data_folder_path):
                video_file_path = os.path.join(data_folder_path, video_file)

           
                if video_file.endswith('.mp4'):
                    print(f"Processing video: {video_file}")

                
                    json_file_path = os.path.splitext(video_file_path)[0] + '.json'
                    try:
                        with open(json_file_path, 'r') as json_file:
                            annotations = json.load(json_file)
                    except json.JSONDecodeError:
                        print(f"Error decoding JSON in file: {json_file_path}")
                        continue 
                    

                  
                    cap = cv2.VideoCapture(video_file_path)
                    frame_timestamp_ms = 0 
                    video_out_folder = os.path.join(exercise_folder_path, 'MediapipeVideos')
                    if not os.path.exists(video_out_folder):
                        os.makedirs(video_out_folder)
                                
                    video_out_path = os.path.join(video_out_folder, video_file)
                    fourcc = cv2.VideoWriter_fourcc(*'mp4v') 
                    fps = int(cap.get(cv2.CAP_PROP_FPS))
                    frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
                    frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
                    out = cv2.VideoWriter(video_out_path, fourcc, fps, (frame_width, frame_height))
                    frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

                    print(f"Total frames: {frame_count}")
                    
                   


                    with PoseLandmarker.create_from_options(options) as landmarker: 
                        while True:
                            ret, frame = cap.read()
                            if not ret:
                                break
                                
                                
                                
                         
                            original_width, original_height = frame.shape[1], frame.shape[0]

                           

                           
                            frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                         
                      
                            image = mp.Image(image_format=mp.ImageFormat.SRGB, data=frame_rgb.astype(np.uint8))

                          
                            frame_timestamp_ms = int(cap.get(cv2.CAP_PROP_POS_MSEC))
                            frame_number = int(cap.get(cv2.CAP_PROP_POS_FRAMES)) *3
                           
                            detection_result = landmarker.detect_for_video(image, frame_timestamp_ms)

                            if detection_result.pose_landmarks:
                                
                              
                               
                                landmarks = [landmark for landmark in detection_result.pose_landmarks[0]]
                               
                                frame_rgb = draw_landmarks_on_image(frame_rgb,detection_result)



                             

                                if frame_number < len(annotations["annotations"]):
                                    real_keypoints_frame_data = annotations["annotations"][frame_number]
                                else:
                                    print(f"No annotation found for frame {frame_number} in video {video_file}")
                                    out.write(cv2.cvtColor(frame_rgb, cv2.COLOR_RGB2BGR))
                                    
                                    continue  

                                if "armature_keypoints" in real_keypoints_frame_data:
                                    real_keypoints = real_keypoints_frame_data["armature_keypoints"]
                                else:
                                    print(f"No keypoints found for frame {frame_number} in video {video_file}")
                                    out.write(cv2.cvtColor(frame_rgb, cv2.COLOR_RGB2BGR))
                                
                                
                                    continue


                                
                               
                                print("Processing keypoints")
                                for keypoint_name in real_keypoints:
                                    if keypoint_name in keypoint_mapping:
                                        mediapipe_index = keypoint_mapping[keypoint_name]
                                        

                                     

                                      
                                        detected_x = landmarks[mediapipe_index].x *original_width
                                        detected_y = landmarks[mediapipe_index].y *original_height
                                       

                                    else:
                                        continue

                                    
                                    ground_truth_x = real_keypoints[keypoint_name]['x'] 
                                    ground_truth_y = real_keypoints[keypoint_name]['y'] 
                                    cv2.circle(frame_rgb, (int(ground_truth_x), int(ground_truth_y)), 3, (255, 0, 0), -1)
                                    
                                
                            
                                 
                                    euclidean_distance = ((detected_x  - ground_truth_x) ** 2 + 
                                (detected_y  - ground_truth_y) ** 2) ** 0.5
                                   

                                  
                                  
                                    normalization_factor = max(
                                    ((real_keypoints['left_shoulder']['x'] - real_keypoints['right_hip']['x'])**2 +
                                (real_keypoints['left_shoulder']['y'] - real_keypoints['right_hip']['y'])**2)**0.5,
                                    ((real_keypoints['right_shoulder']['x'] - real_keypoints['left_hip']['x'])**2 +
                                (real_keypoints['right_shoulder']['y'] - real_keypoints['left_hip']['y'])**2)**0.5
                                )
                                    
                                    print(f'Writing to file: Video: {video_file_path}, Frame: {frame_number}, Keypoint {keypoint_name}, Euclidean Distance = {euclidean_distance},Torso Diameter = {normalization_factor}')

                        
                                    distance_file.write(f'Video: {video_file_path}, Frame: {frame_number}, Keypoint {keypoint_name}, Euclidean Distance = {euclidean_distance},Torso Diameter = {normalization_factor}\n')
                                    
                                out.write(cv2.cvtColor(frame_rgb, cv2.COLOR_RGB2BGR))
                             
                            else:
                                    out.write(cv2.cvtColor(frame_rgb, cv2.COLOR_RGB2BGR))

                       
                        cap.release()
                        out.release()


distance_file.close()
    