# Sit and Reach Fitness Test Analysis System

This notebook provides a comprehensive system for analyzing "Sit and Reach" fitness test videos using MediaPipe Pose estimation.

## Features:
- **Pose Detection**: Uses MediaPipe Pose for robust human keypoint detection
- **Angle Measurement**: Measures the hip angle between the torso (hip-to-shoulder) and thigh (hip-to-knee) lines on every frame
- **Cycle Counting**: Detects and counts proper reach attempts
- **Threshold Validation**: Validates reaches against configurable thresholds
- **Visualization**: Annotates video frames with landmarks and measurements
- **Summary Statistics**: Reports total, proper, and improper reach counts, the overall and average peak angle, and the success rate

## Requirements:
- Python 3.8+ (required by the NumPy 1.24 release used here)
- MediaPipe
- OpenCV
- NumPy
- Matplotlib
- Pandas (for data analysis)
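
A quick way to check that the packages listed above are importable before running the notebook is sketched below. The helper name `missing_modules` is hypothetical, and the list uses import names rather than package names (OpenCV is imported as `cv2`).

```python
from importlib.util import find_spec

# Import names for the packages listed above (OpenCV is imported as cv2)
REQUIRED_MODULES = ["mediapipe", "cv2", "numpy", "matplotlib", "pandas"]


def missing_modules(names=REQUIRED_MODULES):
    """Return the import names that cannot be found in this environment."""
    return [name for name in names if find_spec(name) is None]


missing = missing_modules()
if missing:
    print("Missing packages (import names):", ", ".join(missing))
else:
    print("All required packages are importable.")
```

This only checks that each module can be located; it does not verify versions.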


In [17]:
# Import necessary libraries
import cv2
import mediapipe as mp
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
from typing import List, Tuple, Dict, Optional
import math
from collections import deque
import os
import json
from datetime import datetime

# Set up matplotlib for better plots
plt.style.use('default')
%matplotlib inline

# Initialize MediaPipe
mp_pose = mp.solutions.pose
mp_drawing = mp.solutions.drawing_utils
mp_drawing_styles = mp.solutions.drawing_styles

print("✅ All libraries imported successfully!")
print(f"OpenCV version: {cv2.__version__}")
print(f"MediaPipe version: {mp.__version__}")
print(f"NumPy version: {np.__version__}")


✅ All libraries imported successfully!
OpenCV version: 4.11.0
MediaPipe version: 0.10.9
NumPy version: 1.24.3


## Configuration and Constants

Define key parameters for the analysis including pose detection settings, angle thresholds, and visualization options.


In [18]:
# Configuration parameters
class SitReachConfig:
    """Configuration class for Sit and Reach analysis"""
    
    # MediaPipe Pose settings
    POSE_CONFIDENCE = 0.7
    POSE_TRACKING_CONFIDENCE = 0.5
    
    # Angle thresholds (in degrees)
    MIN_REACH_ANGLE = 45.0  # Minimum angle to consider as a reach attempt
    PROPER_REACH_THRESHOLD = 60.0  # Angle threshold for "proper" reach
    MAX_REASONABLE_ANGLE = 120.0  # Maximum reasonable angle (for filtering outliers)
    
    # Cycle detection parameters
    SMOOTHING_WINDOW = 5  # Frames to smooth angle measurements
    MIN_CYCLE_DURATION = 15  # Minimum frames between reach cycles
    ANGLE_CHANGE_THRESHOLD = 10.0  # Minimum angle change to detect movement
    
    # Visualization settings
    LANDMARK_COLOR = (0, 255, 0)  # Green
    CONNECTION_COLOR = (255, 0, 0)  # Blue (OpenCV color tuples are BGR)
    ANGLE_TEXT_COLOR = (255, 255, 255)  # White
    PROPER_REACH_COLOR = (0, 255, 0)  # Green
    IMPROPER_REACH_COLOR = (0, 0, 255)  # Red
    
    # Output settings
    SAVE_ANNOTATED_VIDEO = True
    SAVE_ANALYSIS_PLOTS = True
    EXPORT_DATA_CSV = True

config = SitReachConfig()
print("✅ Configuration loaded successfully!")


✅ Configuration loaded successfully!
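
`SMOOTHING_WINDOW` and `MIN_CYCLE_DURATION` are expressed in frames, so their real-world duration depends on the video frame rate. The sketch below shows the conversion in both directions; the helper names are hypothetical and the frame rates are only example values.

```python
def frames_to_seconds(frames: int, fps: float) -> float:
    """Convert a frame count to seconds at the given frame rate."""
    if fps <= 0:
        raise ValueError("fps must be positive")
    return frames / fps


def seconds_to_frames(seconds: float, fps: float) -> int:
    """Convert a duration to a whole number of frames (at least 1)."""
    if fps <= 0:
        raise ValueError("fps must be positive")
    return max(1, round(seconds * fps))


MIN_CYCLE_DURATION = 15  # frames, same value as in the configuration above

for fps in (30.0, 60.0):
    gap = frames_to_seconds(MIN_CYCLE_DURATION, fps)
    print(f"{MIN_CYCLE_DURATION} frames at {fps:.0f} FPS = {gap:.2f} s between reaches")

# Frame count needed to keep a 0.5 s gap on a 60 FPS recording
print(seconds_to_frames(0.5, 60.0))
```

If videos with different frame rates are analyzed, deriving the frame counts from a duration in this way keeps the detector's timing comparable across recordings.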


## Core Functions

### 1. Pose Extraction and Processing


In [19]:
class PoseExtractor:
    """
    Handles pose extraction and landmark processing using MediaPipe Pose
    """
    
    def __init__(self, confidence=0.7, tracking_confidence=0.5):
        """
        Initialize the pose extractor
        
        Args:
            confidence: Minimum confidence for pose detection
            tracking_confidence: Minimum confidence for pose tracking
        """
        self.pose = mp_pose.Pose(
            static_image_mode=False,
            model_complexity=2,  # Higher complexity for better accuracy
            enable_segmentation=False,
            min_detection_confidence=confidence,
            min_tracking_confidence=tracking_confidence
        )
        
    def extract_pose(self, frame: np.ndarray) -> Optional[Dict]:
        """
        Extract pose landmarks from a single frame
        
        Args:
            frame: Input video frame (BGR format)
            
        Returns:
            Dictionary containing landmarks and metadata, or None if no pose detected
        """
        # Convert BGR to RGB for MediaPipe
        rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        
        # Process the frame
        results = self.pose.process(rgb_frame)
        
        if results.pose_landmarks is None:
            return None
            
        # Extract landmark coordinates
        landmarks = {}
        for idx, landmark in enumerate(results.pose_landmarks.landmark):
            landmarks[idx] = {
                'x': landmark.x,
                'y': landmark.y,
                'z': landmark.z,
                'visibility': landmark.visibility
            }
        
        return {
            'landmarks': landmarks,
            'pose_landmarks': results.pose_landmarks,
            'frame_shape': frame.shape
        }
    
    def get_landmark_coords(self, pose_data: Dict, landmark_idx: int, frame_shape: Tuple) -> Optional[Tuple[int, int]]:
        """
        Convert normalized landmark coordinates to pixel coordinates
        
        Args:
            pose_data: Pose data from extract_pose()
            landmark_idx: MediaPipe landmark index
            frame_shape: Shape of the frame (height, width, channels)
            
        Returns:
            Tuple of (x, y) pixel coordinates, or None if landmark not visible enough
        """
        if pose_data is None or landmark_idx not in pose_data['landmarks']:
            return None
            
        landmark = pose_data['landmarks'][landmark_idx]
        
        # Check visibility threshold
        if landmark['visibility'] < 0.5:
            return None
            
        # Convert to pixel coordinates
        h, w = frame_shape[:2]
        x = int(landmark['x'] * w)
        y = int(landmark['y'] * h)
        
        return (x, y)
    
    def close(self):
        """Clean up resources"""
        self.pose.close()

# Instantiate the pose extractor with the configured confidence thresholds
pose_extractor = PoseExtractor(
    confidence=config.POSE_CONFIDENCE,
    tracking_confidence=config.POSE_TRACKING_CONFIDENCE
)
print("✅ Pose extractor initialized successfully!")


✅ Pose extractor initialized successfully!
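
MediaPipe returns landmark positions normalized to the frame size, which is why `get_landmark_coords` multiplies by the frame width and height. The standalone sketch below mirrors that conversion, including the 0.5 visibility cutoff; the function name `to_pixel_coords` and the sample values are hypothetical.

```python
from typing import Optional, Tuple


def to_pixel_coords(x_norm: float, y_norm: float, visibility: float,
                    frame_width: int, frame_height: int,
                    min_visibility: float = 0.5) -> Optional[Tuple[int, int]]:
    """Scale normalized landmark coordinates to pixels, or return None
    when the landmark's visibility score is below the cutoff."""
    if visibility < min_visibility:
        return None
    return (int(x_norm * frame_width), int(y_norm * frame_height))


# A confident landmark in the middle of the top half of a 640x480 frame
print(to_pixel_coords(0.5, 0.25, 0.9, 640, 480))

# The same position with a low visibility score is rejected
print(to_pixel_coords(0.5, 0.25, 0.3, 640, 480))
```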


### 2. Angle Calculation Functions


In [20]:
class AngleCalculator:
    """
    Handles angle calculations for the sit and reach test
    """
    
    @staticmethod
    def calculate_angle(point1: Tuple[float, float], 
                       point2: Tuple[float, float], 
                       point3: Tuple[float, float]) -> float:
        """
        Calculate angle between three points (point2 is the vertex)
        
        Args:
            point1: First point coordinates (x, y)
            point2: Vertex point coordinates (x, y)
            point3: Third point coordinates (x, y)
            
        Returns:
            Angle in degrees
        """
        # Create vectors
        vector1 = np.array([point1[0] - point2[0], point1[1] - point2[1]])
        vector2 = np.array([point3[0] - point2[0], point3[1] - point2[1]])
        
        # Calculate angle using dot product
        dot_product = np.dot(vector1, vector2)
        norms = np.linalg.norm(vector1) * np.linalg.norm(vector2)
        
        # Avoid division by zero
        if norms == 0:
            return 0.0
            
        # Calculate angle in radians then convert to degrees
        cos_angle = np.clip(dot_product / norms, -1.0, 1.0)
        angle_rad = np.arccos(cos_angle)
        angle_deg = np.degrees(angle_rad)
        
        return angle_deg
    
    @staticmethod
    def calculate_waist_angle(pose_data: Dict, frame_shape: Tuple, 
                             pose_extractor: PoseExtractor) -> Optional[float]:
        """
        Calculate the waist/torso angle for sit and reach analysis
        
        The angle is measured between:
        - Hip to shoulder line (torso)
        - Hip to knee line (thigh)
        
        Args:
            pose_data: Pose landmarks data
            frame_shape: Frame dimensions
            pose_extractor: PoseExtractor instance for coordinate conversion
            
        Returns:
            Waist angle in degrees, or None if landmarks not available
        """
        if pose_data is None:
            return None
            
        # MediaPipe landmark indices
        # Using left side landmarks (can be adjusted based on video orientation)
        LEFT_SHOULDER = 11
        LEFT_HIP = 23
        LEFT_KNEE = 25
        
        # Get landmark coordinates
        shoulder = pose_extractor.get_landmark_coords(pose_data, LEFT_SHOULDER, frame_shape)
        hip = pose_extractor.get_landmark_coords(pose_data, LEFT_HIP, frame_shape)
        knee = pose_extractor.get_landmark_coords(pose_data, LEFT_KNEE, frame_shape)
        
        # Check if all landmarks are available
        if not all([shoulder, hip, knee]):
            # Try right side landmarks as backup
            RIGHT_SHOULDER = 12
            RIGHT_HIP = 24
            RIGHT_KNEE = 26
            
            shoulder = pose_extractor.get_landmark_coords(pose_data, RIGHT_SHOULDER, frame_shape)
            hip = pose_extractor.get_landmark_coords(pose_data, RIGHT_HIP, frame_shape)
            knee = pose_extractor.get_landmark_coords(pose_data, RIGHT_KNEE, frame_shape)
            
            if not all([shoulder, hip, knee]):
                return None
        
        # Calculate angle with hip as vertex
        angle = AngleCalculator.calculate_angle(shoulder, hip, knee)
        
        # Fold obtuse angles into the 0-90° range: a raw angle of (180 - x)
        # is reported as x, so the returned value never exceeds 90°
        if angle > 90:
            angle = 180 - angle
            
        return angle
    
    @staticmethod
    def smooth_angles(angles: List[Optional[float]], window_size: int = 5) -> List[Optional[float]]:
        """
        Apply smoothing to angle measurements to reduce noise
        
        Args:
            angles: List of angle measurements
            window_size: Size of the smoothing window
            
        Returns:
            Smoothed angle measurements
        """
        if len(angles) < window_size:
            return angles
            
        smoothed = []
        for i in range(len(angles)):
            start_idx = max(0, i - window_size // 2)
            end_idx = min(len(angles), i + window_size // 2 + 1)
            window_angles = [a for a in angles[start_idx:end_idx] if a is not None]
            
            if window_angles:
                smoothed.append(np.mean(window_angles))
            else:
                smoothed.append(angles[i])
                
        return smoothed
    
    @staticmethod
    def is_valid_angle(angle: Optional[float]) -> bool:
        """
        Check if an angle measurement is valid
        
        Args:
            angle: Angle measurement to validate
            
        Returns:
            True if angle is valid, False otherwise
        """
        if angle is None:
            return False
        return 0 <= angle <= config.MAX_REASONABLE_ANGLE

# Test angle calculator
angle_calc = AngleCalculator()
print("✅ Angle calculator initialized successfully!")

# Test angle calculation with sample points
test_angle = angle_calc.calculate_angle((0, 0), (1, 0), (1, 1))
print(f"Test angle calculation: {test_angle:.1f}° (expected: 90.0°)")


✅ Angle calculator initialized successfully!
Test angle calculation: 90.0° (expected: 90.0°)
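
To make the geometry concrete, the sketch below recomputes the shoulder-hip-knee angle with only the standard library and then applies the same fold at 90° that `calculate_waist_angle` uses. The pixel coordinates are made-up values, and the function names are hypothetical.

```python
import math


def angle_at_vertex(p1, vertex, p3):
    """Angle in degrees between vertex->p1 and vertex->p3 (dot-product form)."""
    v1 = (p1[0] - vertex[0], p1[1] - vertex[1])
    v2 = (p3[0] - vertex[0], p3[1] - vertex[1])
    norm = math.hypot(*v1) * math.hypot(*v2)
    if norm == 0:
        return 0.0  # Degenerate case: one of the points coincides with the vertex
    cos_a = (v1[0] * v2[0] + v1[1] * v2[1]) / norm
    return math.degrees(math.acos(max(-1.0, min(1.0, cos_a))))


def fold_to_acute(angle):
    """Map angles above 90 degrees to 180 - angle."""
    return 180.0 - angle if angle > 90 else angle


# Hypothetical pixel coordinates (image y grows downward)
hip, knee = (100, 200), (200, 200)
shoulders = {
    "upright": (100, 100),          # torso vertical
    "leaning forward": (170, 130),  # torso tilted toward the knee
    "leaning back": (30, 130),      # torso tilted away from the knee
}

for label, shoulder in shoulders.items():
    raw = angle_at_vertex(shoulder, hip, knee)
    print(f"{label:>16}: raw {raw:6.1f}°, folded {fold_to_acute(raw):5.1f}°")
```

In this sketch the forward lean (raw 45°) and the backward lean (raw 135°) fold to the same value, so the folded angle alone does not distinguish the two directions.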


### 3. Cycle Detection and Reach Counting


In [21]:
class CycleDetector:
    """
    Detects reach cycles and counts proper reaches in the sit and reach test
    """
    
    def __init__(self, min_cycle_duration: int = 15, angle_change_threshold: float = 10.0):
        """
        Initialize cycle detector
        
        Args:
            min_cycle_duration: Minimum frames between reach cycles
            angle_change_threshold: Minimum angle change to detect movement
        """
        self.min_cycle_duration = min_cycle_duration
        self.angle_change_threshold = angle_change_threshold
        self.reset()
    
    def reset(self):
        """Reset detector state"""
        self.angle_history = deque(maxlen=100)  # Keep last 100 angle measurements
        self.reach_cycles = []
        self.current_cycle = None
        self.last_cycle_end = -self.min_cycle_duration
        
    def detect_peaks_and_valleys(self, angles: List[float]) -> Tuple[List[int], List[int]]:
        """
        Detect peaks (maximum reach) and valleys (minimum reach) in angle data
        
        Args:
            angles: List of angle measurements
            
        Returns:
            Tuple of (peak_indices, valley_indices)
        """
        if len(angles) < 3:
            return [], []
            
        peaks = []
        valleys = []
        
        for i in range(1, len(angles) - 1):
            if angles[i] is None:
                continue
                
            prev_angle = angles[i-1] if angles[i-1] is not None else angles[i]
            next_angle = angles[i+1] if angles[i+1] is not None else angles[i]
            
            # Peak detection (local maximum)
            if angles[i] > prev_angle and angles[i] > next_angle:
                if angles[i] >= config.MIN_REACH_ANGLE:
                    peaks.append(i)
            
            # Valley detection (local minimum)
            elif angles[i] < prev_angle and angles[i] < next_angle:
                valleys.append(i)
                
        return peaks, valleys
    
    def process_frame_angle(self, angle: Optional[float], frame_idx: int) -> Dict:
        """
        Process a single frame's angle measurement for cycle detection
        
        Args:
            angle: Current frame's waist angle
            frame_idx: Current frame index
            
        Returns:
            Dictionary with cycle detection results for this frame
        """
        result = {
            'frame_idx': frame_idx,
            'angle': angle,
            'new_reach_detected': False,
            'reach_completed': False,
            'is_proper_reach': False,
            'cycle_info': None
        }
        
        if angle is None or not AngleCalculator.is_valid_angle(angle):
            return result
            
        self.angle_history.append((frame_idx, angle))
        
        # Need at least a few frames to detect cycles
        if len(self.angle_history) < 5:
            return result
            
        # Get recent angles for analysis
        recent_angles = [a[1] for a in list(self.angle_history)[-10:]]
        recent_frames = [a[0] for a in list(self.angle_history)[-10:]]
        
        # Detect if we're starting a new reach (angle increasing significantly)
        if self.current_cycle is None:
            if len(recent_angles) >= 3:
                angle_trend = recent_angles[-1] - recent_angles[-3]
                if (angle_trend > self.angle_change_threshold and 
                    frame_idx - self.last_cycle_end >= self.min_cycle_duration and
                    angle >= config.MIN_REACH_ANGLE):
                    
                    # Start new cycle
                    self.current_cycle = {
                        'start_frame': frame_idx,
                        'start_angle': angle,
                        'max_angle': angle,
                        'max_angle_frame': frame_idx,
                        'angles': [(frame_idx, angle)],
                        'is_ascending': True
                    }
                    result['new_reach_detected'] = True
        
        # Update current cycle if active
        elif self.current_cycle is not None:
            self.current_cycle['angles'].append((frame_idx, angle))
            
            # Update maximum angle
            if angle > self.current_cycle['max_angle']:
                self.current_cycle['max_angle'] = angle
                self.current_cycle['max_angle_frame'] = frame_idx
                self.current_cycle['is_ascending'] = True
            
            # Check if we're descending (reach completed)
            elif (self.current_cycle['is_ascending'] and 
                  len(recent_angles) >= 3 and
                  angle < self.current_cycle['max_angle'] - self.angle_change_threshold):
                
                # Complete the cycle
                self.current_cycle['end_frame'] = frame_idx
                self.current_cycle['end_angle'] = angle
                self.current_cycle['duration'] = frame_idx - self.current_cycle['start_frame']
                
                # Validate the reach
                is_proper = self.current_cycle['max_angle'] >= config.PROPER_REACH_THRESHOLD
                self.current_cycle['is_proper_reach'] = is_proper
                
                # Add to completed cycles
                self.reach_cycles.append(self.current_cycle.copy())
                
                result['reach_completed'] = True
                result['is_proper_reach'] = is_proper
                result['cycle_info'] = self.current_cycle.copy()
                
                # Reset for next cycle
                self.last_cycle_end = frame_idx
                self.current_cycle = None
                
        return result
    
    def analyze_complete_sequence(self, angles: List[float]) -> Dict:
        """
        Analyze a complete sequence of angles to detect all cycles
        
        Args:
            angles: Complete list of angle measurements
            
        Returns:
            Dictionary with comprehensive cycle analysis
        """
        self.reset()
        
        # Process each frame
        frame_results = []
        for frame_idx, angle in enumerate(angles):
            result = self.process_frame_angle(angle, frame_idx)
            frame_results.append(result)
        
        # Complete any ongoing cycle at the end
        if self.current_cycle is not None:
            self.current_cycle['end_frame'] = len(angles) - 1
            self.current_cycle['end_angle'] = angles[-1] if angles[-1] is not None else 0
            self.current_cycle['duration'] = self.current_cycle['end_frame'] - self.current_cycle['start_frame']
            self.current_cycle['is_proper_reach'] = self.current_cycle['max_angle'] >= config.PROPER_REACH_THRESHOLD
            self.reach_cycles.append(self.current_cycle)
        
        # Calculate summary statistics
        total_reaches = len(self.reach_cycles)
        proper_reaches = sum(1 for cycle in self.reach_cycles if cycle['is_proper_reach'])
        
        max_angles = [cycle['max_angle'] for cycle in self.reach_cycles if cycle['max_angle'] is not None]
        overall_max_angle = max(max_angles) if max_angles else 0
        average_max_angle = np.mean(max_angles) if max_angles else 0
        
        return {
            'total_reaches': total_reaches,
            'proper_reaches': proper_reaches,
            'improper_reaches': total_reaches - proper_reaches,
            'overall_max_angle': overall_max_angle,
            'average_max_angle': average_max_angle,
            'reach_cycles': self.reach_cycles,
            'frame_results': frame_results,
            'success_rate': (proper_reaches / total_reaches * 100) if total_reaches > 0 else 0
        }

# Test cycle detector
cycle_detector = CycleDetector(
    min_cycle_duration=config.MIN_CYCLE_DURATION,
    angle_change_threshold=config.ANGLE_CHANGE_THRESHOLD
)
print("✅ Cycle detector initialized successfully!")

# Test with sample angle data
test_angles = [30, 35, 45, 60, 75, 70, 50, 35, 40, 55, 80, 65, 45, 30]
test_analysis = cycle_detector.analyze_complete_sequence(test_angles)
print(f"Test analysis - Total reaches: {test_analysis['total_reaches']}, Proper reaches: {test_analysis['proper_reaches']}")


✅ Cycle detector initialized successfully!
Test analysis - Total reaches: 1, Proper reaches: 1
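
The sample sequence has two visible peaks (75° and 80°), yet only one reach is counted. The simplified sketch below restates the start and end rules of `process_frame_angle` as a standalone function to show why: the second rise begins fewer than `MIN_CYCLE_DURATION` frames after the first reach ends. It assumes every angle is valid, omits the end-of-sequence handling, and uses hypothetical names.

```python
def count_reaches(angles, min_reach=45.0, proper=60.0, change=10.0,
                  min_gap=15, warmup=5):
    """Simplified restatement of the reach start/end rules (all angles valid)."""
    cycles = []
    current_max = None   # Peak angle of the reach in progress, or None
    last_end = -min_gap  # Lets the first reach start immediately
    for i, angle in enumerate(angles):
        if i + 1 < warmup:
            continue  # Wait until enough frames have been seen
        if current_max is None:
            rising = angle - angles[i - 2] > change
            if rising and i - last_end >= min_gap and angle >= min_reach:
                current_max = angle  # A reach starts
        elif angle > current_max:
            current_max = angle      # Still rising: update the peak
        elif angle < current_max - change:
            # Dropped far enough below the peak: the reach is complete
            cycles.append({"peak": current_max, "end": i,
                           "proper": current_max >= proper})
            last_end = i
            current_max = None
    return cycles


test_angles = [30, 35, 45, 60, 75, 70, 50, 35, 40, 55, 80, 65, 45, 30]

print(len(count_reaches(test_angles)))                          # default gap of 15 frames
print([c["peak"] for c in count_reaches(test_angles, min_gap=3)])  # shorter gap
```

With the default gap the second rise is ignored; with a 3-frame gap both peaks are recorded. Short test sequences therefore need either a smaller gap or more frames between reaches.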


### 4. Visualization and Video Annotation


In [22]:
class VideoAnnotator:
    """
    Handles video annotation and visualization for pose analysis
    """
    
    def __init__(self, config: SitReachConfig):
        self.config = config
        
    def draw_pose_landmarks(self, frame: np.ndarray, pose_data: Dict) -> np.ndarray:
        """
        Draw pose landmarks on the frame
        
        Args:
            frame: Input frame
            pose_data: Pose landmarks data
            
        Returns:
            Annotated frame
        """
        if pose_data is None or 'pose_landmarks' not in pose_data:
            return frame
            
        annotated_frame = frame.copy()
        
        # Draw pose landmarks
        mp_drawing.draw_landmarks(
            annotated_frame,
            pose_data['pose_landmarks'],
            mp_pose.POSE_CONNECTIONS,
            landmark_drawing_spec=mp_drawing_styles.get_default_pose_landmarks_style()
        )
        
        return annotated_frame
    
    def draw_angle_measurement(self, frame: np.ndarray, pose_data: Dict, 
                             angle: Optional[float], pose_extractor: PoseExtractor) -> np.ndarray:
        """
        Draw angle measurement and key points on the frame
        
        Args:
            frame: Input frame
            pose_data: Pose landmarks data
            angle: Calculated angle
            pose_extractor: PoseExtractor instance
            
        Returns:
            Annotated frame
        """
        if pose_data is None or angle is None:
            return frame
            
        annotated_frame = frame.copy()
        frame_shape = frame.shape
        
        # Get key landmark positions
        LEFT_SHOULDER = 11
        LEFT_HIP = 23
        LEFT_KNEE = 25
        
        shoulder = pose_extractor.get_landmark_coords(pose_data, LEFT_SHOULDER, frame_shape)
        hip = pose_extractor.get_landmark_coords(pose_data, LEFT_HIP, frame_shape)
        knee = pose_extractor.get_landmark_coords(pose_data, LEFT_KNEE, frame_shape)
        
        # If left side not available, try right side
        if not all([shoulder, hip, knee]):
            RIGHT_SHOULDER = 12
            RIGHT_HIP = 24
            RIGHT_KNEE = 26
            
            shoulder = pose_extractor.get_landmark_coords(pose_data, RIGHT_SHOULDER, frame_shape)
            hip = pose_extractor.get_landmark_coords(pose_data, RIGHT_HIP, frame_shape)
            knee = pose_extractor.get_landmark_coords(pose_data, RIGHT_KNEE, frame_shape)
        
        if all([shoulder, hip, knee]):
            # Draw angle lines
            cv2.line(annotated_frame, shoulder, hip, self.config.CONNECTION_COLOR, 3)
            cv2.line(annotated_frame, hip, knee, self.config.CONNECTION_COLOR, 3)
            
            # Draw key points
            cv2.circle(annotated_frame, shoulder, 8, self.config.LANDMARK_COLOR, -1)
            cv2.circle(annotated_frame, hip, 10, (0, 255, 255), -1)  # Yellow for hip (vertex)
            cv2.circle(annotated_frame, knee, 8, self.config.LANDMARK_COLOR, -1)
            
            # Draw angle text
            # OpenCV's Hershey fonts cannot render the degree sign, so spell out the unit
            angle_text = f"Angle: {angle:.1f} deg"
            text_position = (hip[0] + 20, hip[1] - 20)
            
            # Choose color based on angle threshold
            text_color = self.config.PROPER_REACH_COLOR if angle >= self.config.PROPER_REACH_THRESHOLD else self.config.IMPROPER_REACH_COLOR
            
            cv2.putText(annotated_frame, angle_text, text_position, 
                       cv2.FONT_HERSHEY_SIMPLEX, 0.8, text_color, 2)
        
        return annotated_frame
    
    def draw_reach_status(self, frame: np.ndarray, cycle_result: Dict, 
                         total_reaches: int, proper_reaches: int) -> np.ndarray:
        """
        Draw reach status and counters on the frame
        
        Args:
            frame: Input frame
            cycle_result: Current frame's cycle detection result
            total_reaches: Total reach count
            proper_reaches: Proper reach count
            
        Returns:
            Annotated frame
        """
        annotated_frame = frame.copy()
        h, w = frame.shape[:2]
        
        # Status panel background
        panel_height = 120
        cv2.rectangle(annotated_frame, (10, 10), (400, panel_height), (0, 0, 0), -1)
        cv2.rectangle(annotated_frame, (10, 10), (400, panel_height), (255, 255, 255), 2)
        
        # Status text
        y_offset = 35
        font = cv2.FONT_HERSHEY_SIMPLEX
        font_scale = 0.6
        
        # Total reaches
        cv2.putText(annotated_frame, f"Total Reaches: {total_reaches}", 
                   (20, y_offset), font, font_scale, (255, 255, 255), 2)
        
        # Proper reaches
        y_offset += 25
        cv2.putText(annotated_frame, f"Proper Reaches: {proper_reaches}", 
                   (20, y_offset), font, font_scale, self.config.PROPER_REACH_COLOR, 2)
        
        # Success rate
        y_offset += 25
        success_rate = (proper_reaches / total_reaches * 100) if total_reaches > 0 else 0
        cv2.putText(annotated_frame, f"Success Rate: {success_rate:.1f}%", 
                   (20, y_offset), font, font_scale, (255, 255, 0), 2)
        
        # Current status
        y_offset += 25
        if cycle_result['new_reach_detected']:
            status_text = "NEW REACH STARTED"
            status_color = (0, 255, 255)  # Yellow (BGR)
        elif cycle_result['reach_completed']:
            if cycle_result['is_proper_reach']:
                status_text = "PROPER REACH COMPLETED!"
                status_color = self.config.PROPER_REACH_COLOR
            else:
                status_text = "Reach completed (below threshold)"
                status_color = self.config.IMPROPER_REACH_COLOR
        else:
            status_text = "Monitoring..."
            status_color = (255, 255, 255)
        
        cv2.putText(annotated_frame, status_text, (20, y_offset), 
                   font, font_scale, status_color, 2)
        
        return annotated_frame
    
    def create_angle_plot(self, angles: List[float], reach_cycles: List[Dict], 
                         fps: float = 30.0) -> plt.Figure:
        """
        Create a plot showing angle measurements over time
        
        Args:
            angles: List of angle measurements
            reach_cycles: List of detected reach cycles
            fps: Video frame rate for time conversion
            
        Returns:
            Matplotlib figure
        """
        fig, ax = plt.subplots(figsize=(12, 6))
        
        # Convert frame indices to time
        time_points = [i / fps for i in range(len(angles))]
        valid_angles = [a if a is not None else np.nan for a in angles]
        
        # Plot angle measurements
        ax.plot(time_points, valid_angles, 'b-', linewidth=2, label='Waist Angle')
        
        # Mark reach cycles
        for i, cycle in enumerate(reach_cycles):
            start_time = cycle['start_frame'] / fps
            end_time = cycle['end_frame'] / fps
            max_time = cycle['max_angle_frame'] / fps
            
            color = 'green' if cycle['is_proper_reach'] else 'red'
            alpha = 0.3
            
            # Highlight reach period
            ax.axvspan(start_time, end_time, alpha=alpha, color=color)
            
            # Mark maximum angle
            ax.plot(max_time, cycle['max_angle'], 'o', color=color, markersize=8)
            ax.annotate(f"{cycle['max_angle']:.1f}°", 
                       (max_time, cycle['max_angle']), 
                       xytext=(5, 5), textcoords='offset points',
                       fontsize=9, color=color)
        
        # Add threshold lines
        ax.axhline(y=self.config.PROPER_REACH_THRESHOLD, color='green', 
                  linestyle='--', alpha=0.7, label=f'Proper Reach Threshold ({self.config.PROPER_REACH_THRESHOLD}°)')
        ax.axhline(y=self.config.MIN_REACH_ANGLE, color='orange', 
                  linestyle='--', alpha=0.7, label=f'Min Reach Angle ({self.config.MIN_REACH_ANGLE}°)')
        
        ax.set_xlabel('Time (seconds)')
        ax.set_ylabel('Waist Angle (degrees)')
        ax.set_title('Sit and Reach Analysis - Angle Measurements Over Time')
        ax.legend()
        ax.grid(True, alpha=0.3)
        
        plt.tight_layout()
        return fig

# Initialize video annotator
video_annotator = VideoAnnotator(config)
print("✅ Video annotator initialized successfully!")


✅ Video annotator initialized successfully!
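
Frames read with OpenCV are in BGR channel order, and drawing calls such as `cv2.line` and `cv2.putText` interpret color tuples the same way. The small helper below (a hypothetical name, not part of the notebook) converts familiar RGB values to the order OpenCV expects, which can help when picking annotation colors.

```python
from typing import Tuple

Color = Tuple[int, int, int]


def rgb_to_bgr(rgb: Color) -> Color:
    """Reverse the channel order of an (R, G, B) tuple to get (B, G, R)."""
    r, g, b = rgb
    return (b, g, r)


NAMED_RGB = {
    "red": (255, 0, 0),
    "green": (0, 255, 0),
    "yellow": (255, 255, 0),
    "cyan": (0, 255, 255),
}

for name, rgb in NAMED_RGB.items():
    print(f"{name:>6}: RGB {rgb} -> BGR {rgb_to_bgr(rgb)}")
```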


### 5. Main Video Processing Pipeline


In [23]:
class SitReachAnalyzer:
    """
    Main class that orchestrates the complete sit and reach analysis pipeline
    """
    
    def __init__(self, config: SitReachConfig):
        self.config = config
        self.pose_extractor = PoseExtractor(
            confidence=config.POSE_CONFIDENCE,
            tracking_confidence=config.POSE_TRACKING_CONFIDENCE
        )
        self.angle_calculator = AngleCalculator()
        self.cycle_detector = CycleDetector(
            min_cycle_duration=config.MIN_CYCLE_DURATION,
            angle_change_threshold=config.ANGLE_CHANGE_THRESHOLD
        )
        self.video_annotator = VideoAnnotator(config)
        
    def analyze_video(self, video_path: str, output_dir: str = "output") -> Dict:
        """
        Analyze a complete video file for sit and reach performance
        
        Args:
            video_path: Path to the input video file
            output_dir: Directory to save output files
            
        Returns:
            Dictionary containing comprehensive analysis results
        """
        # Create output directory
        os.makedirs(output_dir, exist_ok=True)
        
        # Open video file
        cap = cv2.VideoCapture(video_path)
        if not cap.isOpened():
            raise ValueError(f"Could not open video file: {video_path}")
        
        # Get video properties
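        fps = cap.get(cv2.CAP_PROP_FPS)
        if not fps or fps <= 0:
            # Some files report 0 FPS; fall back to an assumed 30 FPS so the
            # duration printout and the video writer setup do not fail
            fps = 30.0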
        frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        
        print(f"📹 Processing video: {video_path}")
        print(f"   📊 Properties: {width}x{height}, {fps:.1f} FPS, {frame_count} frames")
        print(f"   ⏱️ Duration: {frame_count/fps:.1f} seconds")
        
        # Prepare output video writer if enabled
        output_video_path = None
        video_writer = None
        if self.config.SAVE_ANNOTATED_VIDEO:
            output_video_path = os.path.join(output_dir, f"annotated_{os.path.basename(video_path)}")
            fourcc = cv2.VideoWriter_fourcc(*'mp4v')
            video_writer = cv2.VideoWriter(output_video_path, fourcc, fps, (width, height))
        
        # Initialize tracking variables
        all_angles = []
        all_pose_data = []
        frame_results = []
        total_reaches = 0
        proper_reaches = 0
        error_frames = []
        
        # Process each frame
        frame_idx = 0
        progress_interval = max(1, frame_count // 20)  # Update progress 20 times
        
        while True:
            ret, frame = cap.read()
            if not ret:
                break
                
            try:
                # Extract pose
                pose_data = self.pose_extractor.extract_pose(frame)
                all_pose_data.append(pose_data)
                
                # Calculate angle
                angle = self.angle_calculator.calculate_waist_angle(
                    pose_data, frame.shape, self.pose_extractor
                )
                all_angles.append(angle)
                
                # Detect cycles
                cycle_result = self.cycle_detector.process_frame_angle(angle, frame_idx)
                
                # Update counters
                if cycle_result['reach_completed']:
                    total_reaches += 1
                    if cycle_result['is_proper_reach']:
                        proper_reaches += 1
                
                # Create annotated frame
                if video_writer is not None:
                    annotated_frame = frame.copy()
                    
                    # Draw pose landmarks
                    annotated_frame = self.video_annotator.draw_pose_landmarks(
                        annotated_frame, pose_data
                    )
                    
                    # Draw angle measurement
                    annotated_frame = self.video_annotator.draw_angle_measurement(
                        annotated_frame, pose_data, angle, self.pose_extractor
                    )
                    
                    # Draw status information
                    annotated_frame = self.video_annotator.draw_reach_status(
                        annotated_frame, cycle_result, total_reaches, proper_reaches
                    )
                    
                    video_writer.write(annotated_frame)
                
                # Store frame result
                frame_results.append({
                    'frame_idx': frame_idx,
                    'angle': angle,
                    'pose_detected': pose_data is not None,
                    'cycle_result': cycle_result
                })
                
            except Exception as e:
                print(f"⚠️ Error processing frame {frame_idx}: {str(e)}")
                error_frames.append(frame_idx)
                all_angles.append(None)
                all_pose_data.append(None)
                frame_results.append({
                    'frame_idx': frame_idx,
                    'angle': None,
                    'pose_detected': False,
                    'cycle_result': {'frame_idx': frame_idx, 'angle': None, 
                                   'new_reach_detected': False, 'reach_completed': False,
                                   'is_proper_reach': False, 'cycle_info': None}
                })
            
            frame_idx += 1
            
            # Progress update
            if frame_idx % progress_interval == 0:
                progress = (frame_idx / frame_count) * 100 if frame_count > 0 else 0.0
                print(f"   🔄 Progress: {progress:.1f}% ({frame_idx}/{frame_count} frames)")
        
        # Clean up
        cap.release()
        if video_writer is not None:
            video_writer.release()
        
        # Perform final analysis
        print("📈 Performing final analysis...")
        
        # Smooth angles
        smoothed_angles = self.angle_calculator.smooth_angles(
            all_angles, self.config.SMOOTHING_WINDOW
        )
        
        # Complete cycle analysis
        final_analysis = self.cycle_detector.analyze_complete_sequence(smoothed_angles)
        
        # Calculate additional statistics
        valid_angles = [a for a in smoothed_angles if a is not None]
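        # Guard against an empty video (no frames read) to avoid dividing by zero
        pose_detection_rate = (
            sum(1 for p in all_pose_data if p is not None) / len(all_pose_data) * 100
            if all_pose_data else 0.0
        )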
        
        # Compile results
        results = {
            'video_info': {
                'path': video_path,
                'fps': fps,
                'frame_count': frame_count,
                'duration_seconds': frame_count / fps,
                'resolution': (width, height)
            },
            'analysis_results': final_analysis,
            'statistics': {
                'pose_detection_rate': pose_detection_rate,
                'total_valid_angles': len(valid_angles),
                'error_frames': len(error_frames),
                'error_rate': len(error_frames) / frame_count * 100 if frame_count > 0 else 0.0,
                'average_angle': np.mean(valid_angles) if valid_angles else 0,
                'angle_std': np.std(valid_angles) if valid_angles else 0,
                'min_angle': min(valid_angles) if valid_angles else 0,
                'max_angle': max(valid_angles) if valid_angles else 0
            },
            'data': {
                'angles': all_angles,
                'smoothed_angles': smoothed_angles,
                'frame_results': frame_results,
                'error_frames': error_frames
            },
            'output_files': {
                'annotated_video': output_video_path,
                'output_directory': output_dir
            }
        }
        
        # Save analysis data
        if self.config.EXPORT_DATA_CSV:
            self._save_data_csv(results, output_dir)
        
        # Generate and save plots
        if self.config.SAVE_ANALYSIS_PLOTS:
            self._save_analysis_plots(results, output_dir)
        
        print("✅ Analysis completed successfully!")
        return results
    
    def _save_data_csv(self, results: Dict, output_dir: str):
        """Save analysis data to CSV files"""
        # Frame-by-frame data
        frame_data = []
        for i, (angle, smoothed_angle, frame_result) in enumerate(zip(
            results['data']['angles'],
            results['data']['smoothed_angles'],
            results['data']['frame_results']
        )):
            frame_data.append({
                'frame': i,
                'time_seconds': i / results['video_info']['fps'],
                'raw_angle': angle,
                'smoothed_angle': smoothed_angle,
                'pose_detected': frame_result['pose_detected'],
                'new_reach_detected': frame_result['cycle_result']['new_reach_detected'],
                'reach_completed': frame_result['cycle_result']['reach_completed'],
                'is_proper_reach': frame_result['cycle_result']['is_proper_reach']
            })
        
        frame_df = pd.DataFrame(frame_data)
        frame_csv_path = os.path.join(output_dir, 'frame_analysis.csv')
        frame_df.to_csv(frame_csv_path, index=False)
        
        # Reach cycles data
        if results['analysis_results']['reach_cycles']:
            cycles_data = []
            for i, cycle in enumerate(results['analysis_results']['reach_cycles']):
                cycles_data.append({
                    'cycle_number': i + 1,
                    'start_frame': cycle['start_frame'],
                    'end_frame': cycle['end_frame'],
                    'max_angle_frame': cycle['max_angle_frame'],
                    'start_time': cycle['start_frame'] / results['video_info']['fps'],
                    'end_time': cycle['end_frame'] / results['video_info']['fps'],
                    'duration_seconds': cycle['duration'] / results['video_info']['fps'],
                    'max_angle': cycle['max_angle'],
                    'is_proper_reach': cycle['is_proper_reach']
                })
            
            cycles_df = pd.DataFrame(cycles_data)
            cycles_csv_path = os.path.join(output_dir, 'reach_cycles.csv')
            cycles_df.to_csv(cycles_csv_path, index=False)
        
        print(f"📄 Data exported to CSV files in {output_dir}")
    
    def _save_analysis_plots(self, results: Dict, output_dir: str):
        """Generate and save analysis plots"""
        # Angle over time plot
        fig = self.video_annotator.create_angle_plot(
            results['data']['smoothed_angles'],
            results['analysis_results']['reach_cycles'],
            results['video_info']['fps']
        )
        
        plot_path = os.path.join(output_dir, 'angle_analysis.png')
        fig.savefig(plot_path, dpi=300, bbox_inches='tight')
        plt.close(fig)
        
        # Summary statistics plot
        self._create_summary_plot(results, output_dir)
        
        print(f"📊 Analysis plots saved to {output_dir}")
    
    def _create_summary_plot(self, results: Dict, output_dir: str):
        """Create summary statistics visualization"""
        fig, ((ax1, ax2), (ax3, ax4)) = plt.subplots(2, 2, figsize=(15, 10))
        
        analysis = results['analysis_results']
        stats = results['statistics']
        
        # Reach success pie chart
        if analysis['total_reaches'] > 0:
            labels = ['Proper Reaches', 'Improper Reaches']
            sizes = [analysis['proper_reaches'], analysis['improper_reaches']]
            colors = ['#2ecc71', '#e74c3c']
            ax1.pie(sizes, labels=labels, colors=colors, autopct='%1.1f%%', startangle=90)
            ax1.set_title(f'Reach Success Rate\n(Total: {analysis["total_reaches"]} reaches)')
        else:
            ax1.text(0.5, 0.5, 'No reaches detected', ha='center', va='center', transform=ax1.transAxes)
            ax1.set_title('Reach Success Rate')
        
        # Angle distribution histogram
        valid_angles = [a for a in results['data']['smoothed_angles'] if a is not None]
        if valid_angles:
            ax2.hist(valid_angles, bins=20, alpha=0.7, color='skyblue', edgecolor='black')
            # Use the analyzer's own config so custom thresholds are plotted correctly
            ax2.axvline(self.config.PROPER_REACH_THRESHOLD, color='green', linestyle='--', 
                       label=f'Proper Reach Threshold ({self.config.PROPER_REACH_THRESHOLD}°)')
            ax2.axvline(self.config.MIN_REACH_ANGLE, color='orange', linestyle='--', 
                       label=f'Min Reach Angle ({self.config.MIN_REACH_ANGLE}°)')
            ax2.set_xlabel('Angle (degrees)')
            ax2.set_ylabel('Frequency')
            ax2.set_title('Angle Distribution')
            ax2.legend()
        
        # Performance metrics bar chart
        metrics = ['Pose Detection Rate', 'Success Rate', 'Error Rate']
        values = [stats['pose_detection_rate'], analysis['success_rate'], stats['error_rate']]
        colors = ['#3498db', '#2ecc71', '#e74c3c']
        
        bars = ax3.bar(metrics, values, color=colors, alpha=0.7)
        ax3.set_ylabel('Percentage (%)')
        ax3.set_title('Performance Metrics')
        ax3.set_ylim(0, 100)
        
        # Add value labels on bars
        for bar, value in zip(bars, values):
            height = bar.get_height()
            ax3.text(bar.get_x() + bar.get_width()/2., height + 1,
                    f'{value:.1f}%', ha='center', va='bottom')
        
        # Reach cycles timeline
        if analysis['reach_cycles']:
            y_pos = 0
            for i, cycle in enumerate(analysis['reach_cycles']):
                start_time = cycle['start_frame'] / results['video_info']['fps']
                duration = cycle['duration'] / results['video_info']['fps']
                color = '#2ecc71' if cycle['is_proper_reach'] else '#e74c3c'
                
                ax4.barh(y_pos, duration, left=start_time, height=0.6, 
                        color=color, alpha=0.7, label='Proper' if cycle['is_proper_reach'] else 'Improper')
                
                # Add angle label
                ax4.text(start_time + duration/2, y_pos, f"{cycle['max_angle']:.0f}°", 
                        ha='center', va='center', fontsize=8, fontweight='bold')
                
                y_pos += 1
            
            ax4.set_xlabel('Time (seconds)')
            ax4.set_ylabel('Reach Cycle')
            ax4.set_title('Reach Cycles Timeline')
            ax4.set_ylim(-0.5, len(analysis['reach_cycles']) - 0.5)
            
            # Create legend
            handles = [plt.Rectangle((0,0),1,1, color='#2ecc71', alpha=0.7, label='Proper Reach'),
                      plt.Rectangle((0,0),1,1, color='#e74c3c', alpha=0.7, label='Improper Reach')]
            ax4.legend(handles=handles)
        else:
            ax4.text(0.5, 0.5, 'No reach cycles detected', ha='center', va='center', transform=ax4.transAxes)
            ax4.set_title('Reach Cycles Timeline')
        
        plt.tight_layout()
        summary_plot_path = os.path.join(output_dir, 'summary_analysis.png')
        fig.savefig(summary_plot_path, dpi=300, bbox_inches='tight')
        plt.close(fig)
    
    def print_analysis_summary(self, results: Dict):
        """Print a formatted summary of the analysis results"""
        print("\n" + "="*60)
        print("📋 SIT AND REACH ANALYSIS SUMMARY")
        print("="*60)
        
        # Video info
        video_info = results['video_info']
        print(f"🎥 Video: {os.path.basename(video_info['path'])}")
        print(f"   Duration: {video_info['duration_seconds']:.1f} seconds")
        print(f"   Resolution: {video_info['resolution'][0]}x{video_info['resolution'][1]}")
        print(f"   Frame Rate: {video_info['fps']:.1f} FPS")
        
        # Analysis results
        analysis = results['analysis_results']
        print(f"\n🎯 PERFORMANCE RESULTS:")
        print(f"   Total Reach Attempts: {analysis['total_reaches']}")
        print(f"   Proper Reaches: {analysis['proper_reaches']}")
        print(f"   Improper Reaches: {analysis['improper_reaches']}")
        print(f"   Success Rate: {analysis['success_rate']:.1f}%")
        print(f"   Maximum Angle Achieved: {analysis['overall_max_angle']:.1f}°")
        print(f"   Average Maximum Angle: {analysis['average_max_angle']:.1f}°")
        
        # Technical stats
        stats = results['statistics']
        print(f"\n🔧 TECHNICAL STATISTICS:")
        print(f"   Pose Detection Rate: {stats['pose_detection_rate']:.1f}%")
        print(f"   Error Rate: {stats['error_rate']:.1f}%")
        print(f"   Valid Angle Measurements: {stats['total_valid_angles']}")
        print(f"   Average Angle: {stats['average_angle']:.1f}°")
        print(f"   Angle Range: {stats['min_angle']:.1f}° - {stats['max_angle']:.1f}°")
        
        # Output files
        output_files = results['output_files']
        print(f"\n📁 OUTPUT FILES:")
        print(f"   Output Directory: {output_files['output_directory']}")
        if output_files['annotated_video']:
            print(f"   Annotated Video: {os.path.basename(output_files['annotated_video'])}")
        
        print("="*60)
    
    def cleanup(self):
        """Clean up resources"""
        self.pose_extractor.close()

# Initialize the main analyzer
analyzer = SitReachAnalyzer(config)
print("✅ Sit and Reach Analyzer initialized successfully!")


✅ Sit and Reach Analyzer initialized successfully!


## Usage Examples and Demo

### How to Use This System

1. **Basic Usage**: Analyze a video file
2. **Custom Configuration**: Adjust parameters for different scenarios
3. **Batch Processing**: Process multiple videos
4. **Results Interpretation**: Understanding the output


In [24]:
# Example 1: Basic Video Analysis
def analyze_sit_reach_video(video_path: str, output_dir: str = "sit_reach_output"):
    """
    Simple function to analyze a sit and reach video
    
    Args:
        video_path: Path to the video file
        output_dir: Directory to save results
        
    Returns:
        Analysis results dictionary
    """
    analyzer = None
    try:
        # Initialize analyzer
        analyzer = SitReachAnalyzer(config)
        
        # Analyze the video
        results = analyzer.analyze_video(video_path, output_dir)
        
        # Print summary
        analyzer.print_analysis_summary(results)
        
        return results
        
    except Exception as e:
        print(f"❌ Error analyzing video: {str(e)}")
        return None
    
    finally:
        # Clean up even if the analysis failed partway through
        if analyzer is not None:
            analyzer.cleanup()

# Example usage (uncomment and modify the path to use)
# video_path = "path/to/your/sit_and_reach_video.mp4"
# results = analyze_sit_reach_video(video_path)

print("✅ Basic analysis function ready!")


✅ Basic analysis function ready!


In [25]:
# Example 2: Custom Configuration
def create_custom_config(proper_reach_threshold: float = 70.0, 
                        min_reach_angle: float = 40.0,
                        pose_confidence: float = 0.8) -> SitReachConfig:
    """
    Create a custom configuration for specific requirements
    
    Args:
        proper_reach_threshold: Minimum angle for proper reach (degrees)
        min_reach_angle: Minimum angle to consider as reach attempt (degrees)
        pose_confidence: Confidence threshold for pose detection
        
    Returns:
        Custom SitReachConfig object
    """
    custom_config = SitReachConfig()
    custom_config.PROPER_REACH_THRESHOLD = proper_reach_threshold
    custom_config.MIN_REACH_ANGLE = min_reach_angle
    custom_config.POSE_CONFIDENCE = pose_confidence
    
    print(f"🔧 Custom configuration created:")
    print(f"   Proper Reach Threshold: {proper_reach_threshold}°")
    print(f"   Min Reach Angle: {min_reach_angle}°")
    print(f"   Pose Confidence: {pose_confidence}")
    
    return custom_config

def analyze_with_custom_config(video_path: str, custom_config: SitReachConfig):
    """
    Analyze video with custom configuration
    """
    analyzer = SitReachAnalyzer(custom_config)
    try:
        results = analyzer.analyze_video(video_path, "custom_output")
        analyzer.print_analysis_summary(results)
        return results
    finally:
        # Release pose-extractor resources even if the analysis raises
        analyzer.cleanup()

# Example: Create stricter requirements
# strict_config = create_custom_config(proper_reach_threshold=80.0, min_reach_angle=50.0)
# results = analyze_with_custom_config("video.mp4", strict_config)

print("✅ Custom configuration functions ready!")


✅ Custom configuration functions ready!


In [26]:
# Example 3: Batch Processing Multiple Videos
def batch_analyze_videos(video_paths: List[str], output_base_dir: str = "batch_output") -> List[Dict]:
    """
    Analyze multiple videos in batch
    
    Args:
        video_paths: List of video file paths
        output_base_dir: Base directory for all outputs
        
    Returns:
        List of analysis results for each video
    """
    all_results = []
    
    for i, video_path in enumerate(video_paths):
        print(f"\n🎬 Processing video {i+1}/{len(video_paths)}: {os.path.basename(video_path)}")
        
        # Create individual output directory
        video_name = os.path.splitext(os.path.basename(video_path))[0]
        output_dir = os.path.join(output_base_dir, video_name)
        
        analyzer = None
        try:
            # Analyze video
            analyzer = SitReachAnalyzer(config)
            results = analyzer.analyze_video(video_path, output_dir)
            
            all_results.append(results)
            
        except Exception as e:
            print(f"❌ Error processing {video_path}: {str(e)}")
            all_results.append(None)
        
        finally:
            # Release pose-extractor resources even when a video fails
            if analyzer is not None:
                analyzer.cleanup()
    
    # Create batch summary
    create_batch_summary(all_results, output_base_dir)
    
    return all_results

def create_batch_summary(all_results: List[Dict], output_dir: str):
    """Create a summary report for batch processing"""
    valid_results = [r for r in all_results if r is not None]
    
    if not valid_results:
        print("❌ No valid results to summarize")
        return
    
    # Compile batch statistics
    batch_stats = {
        'total_videos': len(all_results),
        'successful_analyses': len(valid_results),
        'total_reaches': sum(r['analysis_results']['total_reaches'] for r in valid_results),
        'total_proper_reaches': sum(r['analysis_results']['proper_reaches'] for r in valid_results),
        'average_success_rate': np.mean([r['analysis_results']['success_rate'] for r in valid_results]),
        'max_angle_overall': max(r['analysis_results']['overall_max_angle'] for r in valid_results),
        'average_pose_detection': np.mean([r['statistics']['pose_detection_rate'] for r in valid_results])
    }
    
    # Print batch summary
    print(f"\n{'='*60}")
    print("📊 BATCH ANALYSIS SUMMARY")
    print(f"{'='*60}")
    print(f"Videos Processed: {batch_stats['successful_analyses']}/{batch_stats['total_videos']}")
    print(f"Total Reach Attempts: {batch_stats['total_reaches']}")
    print(f"Total Proper Reaches: {batch_stats['total_proper_reaches']}")
    print(f"Average Success Rate: {batch_stats['average_success_rate']:.1f}%")
    print(f"Maximum Angle Achieved: {batch_stats['max_angle_overall']:.1f}°")
    print(f"Average Pose Detection Rate: {batch_stats['average_pose_detection']:.1f}%")
    print(f"{'='*60}")
    
    # Save batch summary to file (create the base directory if it does not exist yet)
    os.makedirs(output_dir, exist_ok=True)
    summary_path = os.path.join(output_dir, "batch_summary.json")
    with open(summary_path, 'w') as f:
        json.dump(batch_stats, f, indent=2)
    
    print(f"📄 Batch summary saved to: {summary_path}")

# Example usage:
# video_list = ["video1.mp4", "video2.mp4", "video3.mp4"]
# batch_results = batch_analyze_videos(video_list)

print("✅ Batch processing functions ready!")


✅ Batch processing functions ready!


### Quick Start Guide

**To use this system with your own video:**

1. **Place your video file** in an accessible location
2. **Update the video path** in the example below
3. **Run the analysis** and review results

**Required video format:**
- Common formats: MP4, AVI, MOV
- Clear view of the person performing sit and reach
- Person should be visible from the side for best angle measurement
- Good lighting and minimal background clutter recommended
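
The file-level requirements above can be checked before starting a long analysis. The snippet below is a minimal sketch, not part of the analyzer: `check_video_path` and `SUPPORTED_EXTENSIONS` are hypothetical names, and the extension set is an assumption based on the "Common formats" bullet. Camera angle and lighting cannot be verified this way and still need a visual check.

```python
import os

# Assumption: only the formats named in the list above are treated as expected
SUPPORTED_EXTENSIONS = {".mp4", ".avi", ".mov"}


def check_video_path(video_path: str) -> list:
    """Return a list of problems found with video_path (empty list means OK)."""
    problems = []

    # Compare the extension case-insensitively
    ext = os.path.splitext(video_path)[1].lower()
    if ext not in SUPPORTED_EXTENSIONS:
        problems.append(f"Unexpected extension '{ext}'")

    # The file must exist before it can be opened for analysis
    if not os.path.isfile(video_path):
        problems.append(f"File not found: {video_path}")

    return problems
```

An empty return value means the path looks usable; otherwise each entry describes one issue to fix before calling `analyze_sit_reach_video`.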


In [27]:
# # 🚀 READY TO USE - UNCOMMENT AND MODIFY THE PATH BELOW

# # Step 1: Set your video path
# video_path = "C:/Users/Yash/Documents/GitHub/KhelSetu/sit_up.mp4"  # forward slashes avoid backslash-escape errors

# # Step 2: Run the analysis
# print("🎬 Starting Sit and Reach Analysis...")
# results = analyze_sit_reach_video(video_path, "my_analysis_output")

# # Step 3: Review results
# if results:
#     print("\n✅ Analysis completed! Check the output folder for:")
#     print("   📹 Annotated video with pose landmarks and measurements")
#     print("   📊 Analysis plots and charts")  
#     print("   📄 CSV files with detailed data")
#     print("   📈 Summary statistics and performance metrics")
# else:
#     print("❌ Analysis failed. Please check your video path and format.")

# # Alternative: Test cycle detection with generated angle data (no video file required)
# def test_with_sample_data():
#     """
#     Test the system with generated sample data
#     This is useful for testing without a video file
#     """
#     print("🧪 Testing system with sample data...")
    
#     # Generate sample angle data that simulates sit and reach movements
#     sample_angles = []
#     for i in range(300):  # 10 seconds at 30 FPS
#         base_angle = 30
#         if 50 <= i <= 80:  # First reach
#             reach_angle = base_angle + 30 * np.sin((i - 50) * np.pi / 30)
#         elif 120 <= i <= 150:  # Second reach  
#             reach_angle = base_angle + 45 * np.sin((i - 120) * np.pi / 30)
#         elif 200 <= i <= 230:  # Third reach
#             reach_angle = base_angle + 35 * np.sin((i - 200) * np.pi / 30)
#         else:
#             reach_angle = base_angle + np.random.normal(0, 2)  # Small random variation
        
#         sample_angles.append(max(0, reach_angle))
    
#     # Test cycle detection
#     cycle_detector_test = CycleDetector()
#     test_analysis = cycle_detector_test.analyze_complete_sequence(sample_angles)
    
#     print(f"📊 Sample Data Analysis Results:")
#     print(f"   Total reaches detected: {test_analysis['total_reaches']}")
#     print(f"   Proper reaches: {test_analysis['proper_reaches']}")
#     print(f"   Success rate: {test_analysis['success_rate']:.1f}%")
#     print(f"   Max angle: {test_analysis['overall_max_angle']:.1f}°")
    
#     # Create a simple plot
#     fig, ax = plt.subplots(figsize=(12, 6))
#     ax.plot(sample_angles, 'b-', linewidth=2, label='Simulated Waist Angle')
#     ax.axhline(y=config.PROPER_REACH_THRESHOLD, color='green', linestyle='--', 
#                label=f'Proper Reach Threshold ({config.PROPER_REACH_THRESHOLD}°)')
#     ax.set_xlabel('Frame')
#     ax.set_ylabel('Angle (degrees)')
#     ax.set_title('Sample Sit and Reach Data')
#     ax.legend()
#     ax.grid(True, alpha=0.3)
#     plt.tight_layout()
#     plt.show()
    
#     return test_analysis

# # Uncomment to test with sample data
# # test_results = test_with_sample_data()

# print("🎯 System is ready! Uncomment the example above to start analyzing your videos.")


## Installation Requirements

To run this notebook, you'll need to install the following packages:

```bash
pip install opencv-python mediapipe numpy matplotlib pandas
```

**Optional extras:**
```bash
pip install opencv-contrib-python  # Extra OpenCV modules; install instead of opencv-python, not alongside it
pip install scikit-learn           # For advanced analysis (optional)
```
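
To confirm the environment before running the notebook, a quick check along these lines may help. It is a sketch using only the standard library; `find_missing_modules` and `REQUIRED_MODULES` are hypothetical names, and the list holds import names (for example `cv2` for `opencv-python`), not pip package names.

```python
import importlib.util

# Import names for the required packages listed above
REQUIRED_MODULES = ["cv2", "mediapipe", "numpy", "matplotlib", "pandas"]


def find_missing_modules(module_names):
    """Return the module names that cannot be located in the current environment."""
    return [name for name in module_names if importlib.util.find_spec(name) is None]


missing = find_missing_modules(REQUIRED_MODULES)
if missing:
    print(f"Missing modules: {', '.join(missing)}")
else:
    print("All required modules are available.")
```

`find_spec` only locates a module without importing it, so the check is fast and has no side effects.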

## System Features Summary

✅ **Pose Detection**: MediaPipe Pose with high accuracy settings  
✅ **Angle Measurement**: Precise waist/torso angle calculation  
✅ **Cycle Detection**: Automatic reach attempt counting  
✅ **Threshold Validation**: Configurable proper reach criteria  
✅ **Video Annotation**: Per-frame pose landmarks and measurements  
✅ **Comprehensive Analysis**: Detailed statistics and insights  
✅ **Multiple Output Formats**: CSV data, plots, annotated video  
✅ **Batch Processing**: Handle multiple videos efficiently  
✅ **Error Handling**: Robust error detection and reporting  
✅ **Customizable**: Adjustable parameters for different scenarios

## Output Files Generated

1. **`annotated_<input filename>`** - Copy of the input video with pose landmarks and measurements (written only when `SAVE_ANNOTATED_VIDEO` is enabled)
2. **`angle_analysis.png`** - Time series plot of angle measurements  
3. **`summary_analysis.png`** - Comprehensive statistics dashboard
4. **`frame_analysis.csv`** - Frame-by-frame detailed data
5. **`reach_cycles.csv`** - Individual reach cycle information
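
As one way to work with the exported data, the sketch below reads `frame_analysis.csv` with the standard `csv` module and reports a few summary values. The column names (`pose_detected`, `smoothed_angle`) match those written by `_save_data_csv`; the function name `summarize_frame_csv` is hypothetical, and the code assumes missing angles appear as empty cells and booleans as `True`/`False`, which is the default pandas CSV output.

```python
import csv


def summarize_frame_csv(csv_path: str) -> dict:
    """Summarize a frame_analysis.csv file: frame count, detection rate, peak angle."""
    total = 0
    detected = 0
    peak_angle = None

    with open(csv_path, newline="", encoding="utf-8") as f:
        for row in csv.DictReader(f):
            total += 1
            if row["pose_detected"] == "True":
                detected += 1
            # Assumption: frames without a valid angle have an empty cell
            if row["smoothed_angle"]:
                angle = float(row["smoothed_angle"])
                if peak_angle is None or angle > peak_angle:
                    peak_angle = angle

    return {
        "frames": total,
        "pose_detection_rate": detected / total * 100 if total else 0.0,
        "peak_smoothed_angle": peak_angle,
    }
```

For example, `summarize_frame_csv("sit_reach_output/frame_analysis.csv")` would summarize the default output directory used by `analyze_sit_reach_video`.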

---

**🎯 This system is ready for production use in fitness assessment applications!**


## 🔧 Fix for Windows File Paths

**Important:** When using Windows file paths in Python, you need to handle backslashes properly. Here are three ways to fix the path issue:
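
For reference, the usual options can be compared side by side. This is an illustrative sketch, assuming the three approaches meant here are forward slashes, raw strings, and doubled backslashes; the path reuses the `sit_up.mp4` example from the Quick Start cell. A plain `"C:\Users\..."` literal fails because `\U` starts a Unicode escape sequence.

```python
from pathlib import PureWindowsPath

# Option 1: forward slashes (accepted by Windows, no escaping needed)
forward_path = "C:/Users/Yash/Documents/GitHub/KhelSetu/sit_up.mp4"

# Option 2: raw string, so backslashes are taken literally
raw_path = r"C:\Users\Yash\Documents\GitHub\KhelSetu\sit_up.mp4"

# Option 3: doubled backslashes in a normal string
escaped_path = "C:\\Users\\Yash\\Documents\\GitHub\\KhelSetu\\sit_up.mp4"

# The raw and escaped spellings produce the identical string
same_string = raw_path == escaped_path

# PureWindowsPath treats "/" and "\" as equivalent separators
same_location = PureWindowsPath(forward_path) == PureWindowsPath(raw_path)

print(same_string, same_location)
```

`PureWindowsPath` is used only so the comparison behaves the same on any operating system; in the notebook itself any of the three string forms can be passed directly to `analyze_sit_reach_video`.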


In [31]:

# Method 1: Use forward slashes (recommended - works on all platforms)
video_path = "C:/Users/Yash/Documents/GitHub/KhelSetu/Sit_and_Reach_Test_Animation_Generated.mp4"

import os

print(f"✅ Video path set to: {video_path}")
print(f"📁 File exists: {os.path.exists(video_path)}")

# Now run the analysis
print("🎬 Starting Sit and Reach Analysis...")
results = analyze_sit_reach_video(video_path, "sit_reach_analysis_output2")

# Review results
if results:
    print("\n✅ Analysis completed! Check the output folder for:")
    print("   📹 Annotated video with pose landmarks and measurements")
    print("   📊 Analysis plots and charts")  
    print("   📄 CSV files with detailed data")
    print("   📈 Summary statistics and performance metrics")
else:
    print("❌ Analysis failed. Please check your video path and format.")

✅ Video path set to: C:/Users/Yash/Documents/GitHub/KhelSetu/Sit_and_Reach_Test_Animation_Generated.mp4
📁 File exists: True
🎬 Starting Sit and Reach Analysis...
📹 Processing video: C:/Users/Yash/Documents/GitHub/KhelSetu/Sit_and_Reach_Test_Animation_Generated.mp4
   📊 Properties: 1280x720, 24.0 FPS, 187 frames
   ⏱️ Duration: 7.8 seconds
   🔄 Progress: 4.8% (9/187 frames)
   🔄 Progress: 9.6% (18/187 frames)
   🔄 Progress: 14.4% (27/187 frames)
   🔄 Progress: 19.3% (36/187 frames)
   🔄 Progress: 24.1% (45/187 frames)
   🔄 Progress: 28.9% (54/187 frames)
   🔄 Progress: 33.7% (63/187 frames)
   🔄 Progress: 38.5% (72/187 frames)
   🔄 Progress: 43.3% (81/187 frames)
   🔄 Progress: 48.1% (90/187 frames)
   🔄 Progress: 52.9% (99/187 frames)
   🔄 Progress: 57.8% (108/187 frames)
   🔄 Progress: 62.6% (117/187 frames)
   🔄 Progress: 67.4% (126/187 frames)
   🔄 Progress: 72.2% (135/187 frames)
   🔄 Progress: 77.0% (144/187 frames)
   🔄 Progress: 81.8% (153/187 frames)
   🔄 Progress: 86.6% (162/18