In [4]:
import cv2
import mediapipe as mp
import numpy as np

In [2]:
def get_test_frame():
    video_path = '../input/video/video1.mov'
    cap = cv2.VideoCapture(video_path)
    try:
        if not cap.isOpened():
            print("Error: Could not open video.")
            exit()
        
        success, first_frame = cap.read()
        first_frame = cv2.rotate(frame, cv2.ROTATE_90_CLOCKWISE)
        if not success:
            print("Error: Could not read frame.")
            exit()
    except Exception as e:
        print(e)
    finally:
        cap.release()
    return first_frame

In [5]:
def detect_ball(frame, param2=40):
    
    gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
    gray = cv2.medianBlur(gray, 5)
    rows = gray.shape[0]
    #print(f"Using param2 value: {param2}")
    circles = cv2.HoughCircles(gray, cv2.HOUGH_GRADIENT, 1, rows / 8,
                               param1=100, param2=param2,
                               minRadius=1, maxRadius=30)
     
    
    if circles is not None:
        circles = np.uint16(np.around(circles))

        for i in circles[0, :]:
            center = (i[0], i[1])
            # circle center
            cv2.circle(frame, center, 1, (0, 100, 100), 3)
            # circle outline
            radius = i[2]
            cv2.circle(frame, center, radius, (255, 0, 255), 3)
    
 
        if len(circles) > 1:
            print("Error: multiple circles detected")
            exit()
    #cv2.imshow("detected circles", frame)
    #cv2.waitKey(0)
    x, y, r = circles[0, 0]  # Get center x, y and radius of first circle
    
    scaled_r = r * 1.2 #bounding box is best with slightly larger than exact circle diameter
    
    # Convert to bounding box format (x, y, width, height) where x,y is top-left corner (bounding box needs corner not center)
    bbox = (int(x-scaled_r), int(y-scaled_r), int(2.1*scaled_r), int(2.1*scaled_r))
    
 
    return bbox

In [6]:
def failed_track_check(position_history, current_position, stopped_frames_count=0):
    import math
    
    # Configuration parameters
    motion_threshold = 1
    stopped_frames_threshold = 10
    long_term_frames = 30
    long_term_threshold = 5  # Minimum movement required over multiple frames
    
    # Add detailed movement logging
    if position_history:
        prev_position = position_history[-1]
        dx = current_position[0] - prev_position[0]
        dy = current_position[1] - prev_position[1]
        distance = math.sqrt(dx**2 + dy**2)
        
        # Log actual values for debugging
        #print(f"Frame movement: dx={dx}, dy={dy}, distance={distance:.2f}")
        
        # Check long-term movement if we have enough history
        if len(position_history) >= long_term_frames:
            old_position = position_history[-long_term_frames]
            long_dx = current_position[0] - old_position[0]
            long_dy = current_position[1] - old_position[1]
            long_distance = math.sqrt(long_dx**2 + long_dy**2)
            #print(f"{long_term_frames}-frame movement: {long_distance:.2f} pixels")
            
            # If significant movement over multiple frames, reset counter
            if long_distance > long_term_threshold:
                stopped_frames_count = 0
                #print(f"Significant movement over {long_term_frames} frames: {long_distance:.2f} pixels")
                
            # Otherwise, check frame-to-frame movement
            elif distance < motion_threshold:
                stopped_frames_count += 1
                #print(f"MOTION BELOW THRESHOLD: {stopped_frames_count}/{stopped_frames_threshold}")
            else:
                stopped_frames_count = 0
                #print(f"Motion detected: {distance:.2f} pixels")
        
        # If not enough history, just check frame-to-frame
        elif distance < motion_threshold:
            stopped_frames_count += 1
            #print(f"MOTION BELOW THRESHOLD: {stopped_frames_count}/{stopped_frames_threshold}")
        else:
            stopped_frames_count = 0
            #print(f"Motion detected: {distance:.2f} pixels")
    
    # Update position history
    position_history.append(current_position)
    if len(position_history) > 100:
        position_history.pop(0)
    
    # Check if we need to re-detect
    need_to_redetect = stopped_frames_count >= stopped_frames_threshold
    if need_to_redetect:
        print("Motion stopped for too long - triggering re-detection")
    
    # Return tracking status, updated history, and counter
    return not need_to_redetect, position_history, stopped_frames_count

In [7]:
def draw_pose():
    
    mp_pose = mp.solutions.pose
    mp_drawing = mp.solutions.drawing_utils
    pose = mp_pose.Pose(min_detection_confidence=0.5,
                        min_tracking_confidence=0.90,
                        model_complexity=2,)
    
    tracker = cv2.TrackerCSRT_create()
    
    video_path = '../input/video/right_profile_1.mov'
    cap = cv2.VideoCapture(video_path)

    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    rotated_width = height
    rotated_height = width
    
    fourcc = cv2.VideoWriter_fourcc(*'XVID')
    # Get original video properties
    fps = cap.get(cv2.CAP_PROP_FPS)
    frame_rate = int(cap.get(cv2.CAP_PROP_FPS))
    
    # Create VideoWriter object
    output_path = '../output/video/pose_analysis_video1.avi'
    out = cv2.VideoWriter(output_path, fourcc, 30, (rotated_width, rotated_height))
    
    frame_ratio = frame_rate/30
    frame_idx = 0
    bbox = ()
    position_history = []
    stopped_frames_count = 0

    pass_count = 0 #number of frames where detection failed and logic passed
    
    try:  
        while cap.isOpened():
            # Read a frame from the video
            success, frame = cap.read()
            
            if not success:
                print("Can't read video frame. Exiting...")
                break
                
            if frame_idx == 0:
                bbox = detect_ball(frame)
                tracker.init(frame, bbox)  # Initialize only once
            else:
                success, bbox = tracker.update(frame)  # Update the tracker
            
                if success:
                    p1 = (int(bbox[0]), int(bbox[1]))
                    p2 = (int(bbox[0] + bbox[2]), int(bbox[1] + bbox[3]))
                    cv2.rectangle(frame, p1, p2, (255,0,0), 2, 1)
                    
                    center_x = int(bbox[0] + bbox[2]/2)
                    center_y = int(bbox[1] + bbox[3]/2)
                    current_position = (center_x, center_y)
                    success, updated_position_history, stopped_frames_count = failed_track_check(position_history, current_position, stopped_frames_count)
                    position_history = updated_position_history
                    
                    
                if not success:
                    # Tracking failure, reset tracking
                    print(f"resetting tracking (pass_count: {pass_count})")
                    try:
                        # can change sensitivity if needed based on pass count
                        param2_value = 40 if pass_count > 0 else 40
                        #print(f"Attempting detection with param2={param2_value}, pass_count={pass_count}")
                        
                        new_bbox = detect_ball(frame, param2=param2_value)
                        
                        if new_bbox is not None:
                            # Ball found
                            bbox = new_bbox
                            tracker = cv2.TrackerCSRT_create()
                            success = tracker.init(frame, bbox)
                            position_history = []
                            stopped_frames_count = 0
                            pass_count = 0  # Reset pass count on successful detection
                            #print(f"New ball detected with param2={param2_value} and tracking initialized")
                        else:
                            # Detection failed, increment pass count
                            pass_count += 1
                            print(f"Detection failed, pass_count now: {pass_count}")
                    except Exception as e:
                        print(f"Error during ball detection: {e}")
                        pass_count += 1
                        print(f"Detection exception, pass_count now: {pass_count}")

            if frame_idx % frame_ratio < 1.0:

                frame = cv2.rotate(frame, cv2.ROTATE_90_CLOCKWISE)
                
                # Convert BGR image to RGB
                frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        
                # Process the image to find pose landmarks
                results = pose.process(frame_rgb)
                
                # Draw pose landmarks on the frame
                if results.pose_landmarks:
                    mp_drawing.draw_landmarks(
                        frame, 
                        results.pose_landmarks, 
                        mp_pose.POSE_CONNECTIONS
                    )
                out.write(frame)
                
                # Display the frame
                cv2.imshow('MediaPipe Pose Detection', frame)
                
            frame_idx +=1
            # Exit when 'q' is pressed
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break
    
    # Release resources
    except Exception as e:
        print(f"An error occurred: {e}")
    
    finally:
        # Release resources (this will run even if there's an error)
        
        print("Cleaning up resources...")
        cap.release()
        out.release()
        pose.close()
        cv2.destroyAllWindows()
        
        # Force close any remaining windows
        for i in range(5):  # Try multiple times
            cv2.waitKey(1)
        
        print("Done.")

In [8]:
draw_pose()

I0000 00:00:1744761192.484610 8881179 gl_context.cc:369] GL version: 2.1 (2.1 Metal - 89.3), renderer: Apple M4 Pro
W0000 00:00:1744761192.531197 8882151 inference_feedback_manager.cc:114] Feedback manager requires a model with a single signature inference. Disabling support for feedback tensors.
W0000 00:00:1744761192.556355 8882151 inference_feedback_manager.cc:114] Feedback manager requires a model with a single signature inference. Disabling support for feedback tensors.
W0000 00:00:1744761192.648218 8882155 landmark_projection_calculator.cc:186] Using NORM_RECT without IMAGE_DIMENSIONS is only supported for the square ROI. Provide IMAGE_DIMENSIONS or use PROJECTION_MATRIX.
2025-04-15 19:53:13.082 Python[86527:8881179] +[IMKClient subclass]: chose IMKClient_Modern
2025-04-15 19:53:13.082 Python[86527:8881179] +[IMKInputSession subclass]: chose IMKInputSession_Modern


Can't read video frame. Exiting...
Cleaning up resources...
Done.
