In [None]:
!pip install ultralytics

Collecting ultralytics
  Downloading ultralytics-8.3.156-py3-none-any.whl.metadata (37 kB)
Collecting ultralytics-thop>=2.0.0 (from ultralytics)
  Downloading ultralytics_thop-2.0.14-py3-none-any.whl.metadata (9.4 kB)
Collecting nvidia-cuda-nvrtc-cu12==12.4.127 (from torch>=1.8.0->ultralytics)
  Downloading nvidia_cuda_nvrtc_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-runtime-cu12==12.4.127 (from torch>=1.8.0->ultralytics)
  Downloading nvidia_cuda_runtime_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-cupti-cu12==12.4.127 (from torch>=1.8.0->ultralytics)
  Downloading nvidia_cuda_cupti_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.6 kB)
Collecting nvidia-cudnn-cu12==9.1.0.70 (from torch>=1.8.0->ultralytics)
  Downloading nvidia_cudnn_cu12-9.1.0.70-py3-none-manylinux2014_x86_64.whl.metadata (1.6 kB)
Collecting nvidia-cublas-cu12==12.4.5.8 (from torch>=1.8.0->ultralytics)
  Downloading n

In [None]:
!pip install mediapipe



In [None]:
# import libraries
import mediapipe as mp
from mediapipe import solutions
from mediapipe.framework.formats import landmark_pb2
import numpy as np
from mediapipe.tasks import python
from mediapipe.tasks.python import vision
import cv2
import torch
import math
import ultralytics
from ultralytics import YOLO
import shutil
import os
import time

In [None]:
# global variables
model_path = 'pose_landmarker_full.task'

In [None]:
# convert input image to mediapipe.Image object
def create_mp_image(image_path):
    mp_image = mp.Image.create_from_file(image_path)
    return mp_image

def draw_landmarks_on_image(rgb_image, detection_result):
    if not detection_result.pose_landmarks:
        return rgb_image

    pose_landmarks_list = detection_result.pose_landmarks
    annotated_image = np.copy(rgb_image)

    # loop through detected poses
    for idx in range(len(pose_landmarks_list)):
        pose_landmarks = pose_landmarks_list[idx]

        # draw landmarks
        pose_landmarks_proto = landmark_pb2.NormalizedLandmarkList()
        pose_landmarks_proto.landmark.extend([
            landmark_pb2.NormalizedLandmark(x=landmark.x, y=landmark.y, z=landmark.z) for landmark in pose_landmarks
        ])

        solutions.drawing_utils.draw_landmarks(
            annotated_image,
            pose_landmarks_proto,
            solutions.pose.POSE_CONNECTIONS,
            solutions.drawing_styles.get_default_pose_landmarks_style()
        )
    return annotated_image

def calc_euclidean_distance(x1, y1, x2, y2):
    '''
        returns the euclidean distance between two points

        Args:
            x1 (float): x coordinate of first point
            y1 (float): y coordinate of first point
            x2 (float): x coordinate of second point
            y2 (float): y coordinate of second point

        Returns:
            float: euclidean distance between the two points
    '''
    return math.sqrt((x2 - x1) ** 2 + (y2 - y1) ** 2)

def get_video_properties(cap):
    '''
        returns the video properties

        Args:
            cap (cv2.VideoCapture): video capture object

        Returns:
            tuple: fps, width, height, total_frames
    '''
    original_fps = cap.get(cv2.CAP_PROP_FPS)
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    return original_fps, width, height, total_frames

def add_text_to_frame(display_text, annotated_frame, font=cv2.FONT_HERSHEY_SIMPLEX, font_scale=1.5, thickness=2, bgr=(218, 232, 0)):
    '''
        adds text to a frame

        Args:
            display_text (str): text to be displayed
            font (int): font type
            font_scale (float): font scale
            thickness (int): thickness of the text
            annotated_frame (numpy.ndarray): frame to add text to
            bgr (tuple): background color of the text
    '''
    (text_width, text_height), baseline = cv2.getTextSize(display_text, font, font_scale, thickness)
    line_height = text_height + baseline + 5
    y_offset = 50

    for i, line in enumerate(display_text.split('\n')):
        cv2.putText(annotated_frame, line,
                        org=(10, y_offset + i * line_height),
                        fontFace=font,
                        fontScale=font_scale,
                        color=bgr,
                        thickness=thickness)

def get_classification(pose_detector, det_model, frame, image_path):

  # get mp image
  mp_image = create_mp_image(image_path)

  det_results = det_model(frame, verbose=False)
  pose_result = pose_detector.detect(mp_image)

  light_coordinates = [] # stores tuples of ((x_mid, y_mid, width, height))
  light_detected = False
  l_mid_x, l_mid_y = 0, 0 # midpoint of light bounding box
  annotated_frame = frame
  total_mid_x, total_mid_y = 0, 0

  if det_results and len(det_results) > 0:
    det_result = det_results[0]
    if det_result.boxes is not None and len(det_result.boxes) > 0:
        annotated_frame = det_result.plot() # plot detection results on frame

        # calculate average midpoint of all light detections
        class_indices = det_result.boxes.cls.int().cpu().numpy() if det_result.boxes.cls is not None else []
        for i, class_idx in enumerate(class_indices):
          class_name = det_model.names[class_idx]

          if class_name == 'light':
            light_coordinates.append(det_result.boxes.xyxyn[i].cpu().numpy())
            light_detected = True

  if light_detected:
    for l_x1, l_y1, l_x2, l_y2 in light_coordinates:
      curr_x_mid = (l_x1 + l_x2) / 2
      curr_y_mid = (l_y1 + l_y2) / 2
      total_mid_x += curr_x_mid
      total_mid_y += curr_y_mid
    l_mid_x, l_mid_y = total_mid_x / len(light_coordinates), total_mid_y / len(light_coordinates)

    if pose_result.pose_landmarks is None or len(pose_result.pose_landmarks) == 0:
      display_text = 'light detected but no pose landmarks found'
      class_name = 'unknown'
      return annotated_frame, display_text, class_name

    pose_landmarks = pose_result.pose_landmarks[0]  # Get first detected pose

        # Check if we have enough landmarks (MediaPipe pose has 33 landmarks, indices 0-32)
    if len(pose_landmarks) > 12:  # Need at least landmarks up to index 12
      # get left shoulder, right shoulder, left eye and right eye landmarks
      left_shoulder = pose_landmarks[11]
      right_shoulder = pose_landmarks[12]
      left_eye = pose_landmarks[2]
      right_eye = pose_landmarks[5]

      # check if landmarks are detected
      left_eye_present = False if left_eye.visibility < 0.3 else True
      right_eye_present = False if right_eye.visibility < 0.3 else True
      left_shoulder_present = False if left_shoulder.visibility < 0.3 else True
      right_shoulder_present = False if right_shoulder.visibility < 0.3 else True

      if left_shoulder_present and right_shoulder_present:
        # measure distance between light and shoulders
        left_shoulder_dist = calc_euclidean_distance(l_mid_x, l_mid_y, left_shoulder.x, left_shoulder.y)
        right_shoulder_dist = calc_euclidean_distance(l_mid_x, l_mid_y, right_shoulder.x, right_shoulder.y)

        if left_shoulder_dist < right_shoulder_dist:
          class_name = 'left'
        else:
          class_name = 'right'

        if left_eye_present and right_eye_present:
          class_name += '_AP'
        else:
          class_name += '_outlet'


        display_text = (
          f'Class: {class_name}\n'
          f'Left Shoulder Distance: {left_shoulder_dist:.2f} p\n'
          f'Right Shoulder Distance: {right_shoulder_dist:.2f} p\n'
          f'Left Eye Present: {left_eye_present}\n'
          f'Right Eye Present: {right_eye_present}'
        )

      else:
        display_text = 'Pose detected but no shoulders visible'
        class_name = 'unknown'
    else:
      display_text = 'Pose detected but insufficient landmarks'
      class_name = 'unknown'
  else:
    display_text = 'No light detected'
    class_name = 'unknown'

  return annotated_frame, display_text, class_name
def image_pose_light_classify(pose_model_path, det_model_path, img_path, output_path):
    '''
        real time classification using pose estimation and x-ray light detection. saves images in input_path, and saves to output_path

        Args:
            pose_model_path (str): path to the pose estimation model
            det_model_path (str): path to the x-ray light detection model
            images_path (str): path to the images directory
            output_path (str): path to the output video file
            display_frame (bool): whether to display the frame or not

        Returns:
            None
    '''

    # load models

    base_options = python.BaseOptions(model_asset_path=pose_model_path)
    options = vision.PoseLandmarkerOptions(base_options=base_options, output_segmentation_masks=True)
    pose_model = vision.PoseLandmarker.create_from_options(options)

    det_model = YOLO(det_model_path)

    # load image
    frame = cv2.imread(img_path)

    annotated_frame, display_text, class_name = get_classification(pose_model, det_model, frame, img_path)

    add_text_to_frame(font_scale=0.5, display_text=display_text, annotated_frame=annotated_frame)

    # save image
    cv2.imwrite(output_path, annotated_frame)

    return class_name

def pose_detect_batch_classify(input_dir, output_dir, pose_model_path, det_model_path,
                               classes=['left_AP', 'right_AP', 'left_outlet', "right_outlet"]):
    '''
        classify images in input_dir using pose estimation and x-ray light detection. saves images in output_dir
        and prints the per class accuracy

        Args:
            input_dir (str): path to the input directory
            output_dir (str): path to the output directory
            pose_model_path (str): path to the pose estimation model
            det_model_path (str): path to the x-ray light detection model

        Returns:
            None
    '''
    total_processed = 0
    total_correct = 0
    for class_name in classes:
        curr_input_path = os.path.join(input_dir, class_name)
        curr_output_path = os.path.join(output_dir, class_name)
        shutil.rmtree(curr_output_path, ignore_errors=True)
        os.makedirs(curr_output_path, exist_ok=True)

        print('Processing class:', class_name, 'from', input_dir)

        imgs = [img for img in os.listdir(curr_input_path) if img.endswith('.jpg')]

        total_images = len(imgs)
        correct_classified_count = 0
        processed_count = 0

        print(f'Total images: {total_images}')
        for img in imgs:
            img_path = os.path.join(curr_input_path, img)
            img_output_path = os.path.join(curr_output_path, img)
            predicted_class_name = image_pose_light_classify(pose_model_path=pose_model_path, det_model_path=det_model_path,
                                                img_path=img_path, output_path=img_output_path)

            if predicted_class_name == class_name:
                correct_classified_count += 1

            processed_count += 1

            if processed_count % 30 == 0:
                print(f'Processed {processed_count} of {total_images}')

        print(f'Correctly classified images: {correct_classified_count}')
        print(f'Class accuracy: {correct_classified_count / total_images}')

        total_processed += total_images
        total_correct += correct_classified_count

    print(f'Total processed: {total_processed}')
    print(f'Total correct: {total_correct}')
    print(f'Total Accuracy: {total_correct / total_processed}')

def video_pose_light_classify(pose_model_path, det_model_path, video_path, output_path, display_frame=False):
    '''
        real time classification using pose estimation and light detection

        Args:
            pose_model_path (str): path to the pose estimation model
            det_model_path (str): path to the light detection model
            video_path (str): path to the video file
            output_path (str): path to the output video file
            display_frame (bool): whether to display the frame or not

        Returns:
            None
    '''
    # load models
    base_options = python.BaseOptions(model_asset_path=pose_model_path)
    options = vision.PoseLandmarkerOptions(base_options=base_options, output_segmentation_masks=True)
    pose_model = vision.PoseLandmarker.create_from_options(options)

    det_model = YOLO(det_model_path)

    # open video capture
    cap = cv2.VideoCapture(video_path)

    # get video properties
    original_fps, width, height, total_frames = get_video_properties(cap)

    print(f"Video properties: fps={original_fps}, width={width}, height={height}, total frames={total_frames}")

    # initialize video writer
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    video_writer = cv2.VideoWriter(output_path, fourcc, original_fps, (width, height))

    if not video_writer.isOpened():
        print('Error opening video writer')
        cap.release()
        return

    frame_count = 0
    processed_count = 0
    start_time = time.time()

    # process video
    try:
        while cap.isOpened():
            success, frame = cap.read()

            if not success:
                print('Video frame is empty or video processing successfully completed')
                break

            frame_count += 1

            annotated_frame, display_text, class_name = get_classification_v2(pose_model, det_model, frame)

            # add text to frame
            add_text_to_frame(display_text=display_text, annotated_frame=annotated_frame)

            if annotated_frame.shape[:2] != (height, width):
                annotated_frame = cv2.resize(annotated_frame, (width, height))

            video_writer.write(annotated_frame) # save frame to output video
            processed_count += 1

            # print progress
            if frame_count % 30 == 0:
                print(f"Processed {processed_count} frames / {frame_count} total")

            # display frame in real time if requested
            if display_frame:
                final_frame = resize_with_aspect_ratio(annotated_frame, width=500, height=500)

                cv2.imshow('YOLO11 Real-time', final_frame)
                key = cv2.waitKey(1) & 0xFF

                if key == ord('q'):
                    print('Processing stopped by user')
                    break
                elif key == ord('p'):
                    print('Processing paused, press any key to resume')
                    cv2.waitKey(0)
                    print('Processing resumed')

    except Exception as e:
        e.with_traceback(e)
    finally:
        end_time = time.time()
        cap.release()
        video_writer.release()
        cv2.destroyAllWindows()
        print('Video processing completed')
        print(f'Input frames: {frame_count}')
        print(f'Processed frames: {processed_count}')
        print(f'Output saved to: {output_path}')
        print('Inference speed: ', ((end_time - start_time) * 10**3) / frame_count, 'ms/frame')




In [None]:
image_pose_light_classify(pose_model_path='pose_landmarker_full.task', det_model_path='best.pt', img_path='/content/Left 2_well_2.jpg', output_path='/content/output_img.jpg')

'left_AP'

In [None]:
!unzip /content/input_videos.zip

Archive:  /content/input_videos.zip
   creating: input_videos/
  inflating: input_videos/seg_test_vid.mp4  
  inflating: input_videos/test_video.mp4  
  inflating: input_videos/test_video2.mp4  
  inflating: input_videos/test_video3.mp4  
  inflating: input_videos/test_video4.mp4  
  inflating: input_videos/test_video5.mp4  


In [None]:
for input_video in os.listdir('/content/input_videos'):
  input_video_path = os.path.join('/content/input_videos', input_video)
  output_video_path = os.path.join('/content/output_videos', input_video)
  video_pose_light_classify(pose_model_path='/content/pose_landmarker_full.task', det_model_path='/content/best.pt',
                              video_path=input_video_path, output_path=output_video_path)

Video properties: fps=30.0, width=1080, height=1920, total frames=333
Error opening video writer
Video properties: fps=0.0, width=0, height=0, total frames=0
Error opening video writer
Video properties: fps=30.0, width=1080, height=1920, total frames=337
Error opening video writer
Video properties: fps=30.21165555233401, width=1080, height=1920, total frames=521
Error opening video writer
Video properties: fps=30.011688030155746, width=1080, height=1920, total frames=1035
Error opening video writer


In [None]:

pose_detect_batch_classify(input_dir='/content/test/test', output_dir='/content/mediapipe_pose/test',
                           det_model_path='/content/best.pt', pose_model_path='/content/pose_landmarker_full.task')

Processing class: left_AP from /content/test/test
Total images: 45
Processed 30 of 45
Correctly classified images: 4
Class accuracy: 0.08888888888888889
Processing class: right_AP from /content/test/test
Total images: 30
Processed 30 of 30
Correctly classified images: 20
Class accuracy: 0.6666666666666666
Processing class: left_outlet from /content/test/test
Total images: 133
Processed 30 of 133
Processed 60 of 133
Processed 90 of 133
Processed 120 of 133
Correctly classified images: 0
Class accuracy: 0.0
Processing class: right_outlet from /content/test/test
Total images: 134
Processed 30 of 134
Processed 60 of 134
Processed 90 of 134
Processed 120 of 134
Correctly classified images: 0
Class accuracy: 0.0
Total processed: 342
Total correct: 24
Total Accuracy: 0.07017543859649122


In [None]:
!zip -r /content/mediapipe_pose.zip /content/mediapipe_pose

  adding: content/mediapipe_pose/ (stored 0%)
  adding: content/mediapipe_pose/test/ (stored 0%)
  adding: content/mediapipe_pose/test/left_outlet/ (stored 0%)
  adding: content/mediapipe_pose/test/left_outlet/Left2_well_24.jpg (deflated 0%)
  adding: content/mediapipe_pose/test/left_outlet/Left 15_well_14.jpg (deflated 0%)
  adding: content/mediapipe_pose/test/left_outlet/Left 15_int_8.jpg (deflated 0%)
  adding: content/mediapipe_pose/test/left_outlet/Left 15_well_20.jpg (deflated 0%)
  adding: content/mediapipe_pose/test/left_outlet/Left2_internally_int_10.jpg (deflated 0%)
  adding: content/mediapipe_pose/test/left_outlet/Left2_externally_ext_2.jpg (deflated 0%)
  adding: content/mediapipe_pose/test/left_outlet/Left 15_well_12.jpg (deflated 0%)
  adding: content/mediapipe_pose/test/left_outlet/Left 15_ext_4.jpg (deflated 0%)
  adding: content/mediapipe_pose/test/left_outlet/Left2_well_18.jpg (deflated 0%)
  adding: content/mediapipe_pose/test/left_outlet/Left 15_int_12.jpg (deflate