In [2]:
import subprocess

def cut_video_ffmpeg(input_video_path, output_video_path, start_time, duration):
    """
    Cut video using FFmpeg (fastest method)
    
    Args:
        input_video_path: Path to input video
        output_video_path: Path to output video
        start_time: Start time in seconds (e.g., 30.5) or "MM:SS" format (e.g., "01:30")
        duration: Duration in seconds (e.g., 20) or "MM:SS" format (e.g., "00:20")
    """
    try:
        # Convert to string format if needed
        if isinstance(start_time, (int, float)):
            start_str = str(start_time)
        else:
            start_str = start_time
            
        if isinstance(duration, (int, float)):
            duration_str = str(duration)
        else:
            duration_str = duration
            
        cmd = [
            'ffmpeg',
            '-y',  # Overwrite output file
            '-i', input_video_path,
            '-ss', start_str,  # Start time
            '-t', duration_str,  # Duration
            '-c', 'copy',  # Copy streams without re-encoding (fastest)
            output_video_path
        ]
        
        print(f"Cutting video from {start_str}s for {duration_str}s...")
        subprocess.run(cmd, check=True)
        print(f"Video cut successfully: {output_video_path}")
        
    except Exception as e:
        print(f"Error cutting video: {e}")
        return False
    return True

In [None]:
import cv2
import mediapipe as mp
import numpy as np
import json
import subprocess
# import os
# from mediapipe.framework.formats import landmark_pb2
import statistics
# import pandas as pd
from datetime import datetime

# ============================================================================
# LANDMARK DETECTION VARIABILITY TESTING FOR THE HANDS
# ============================================================================

def test_landmark_variability_hands(video_path, num_iterations=5, output_report_path="variability_report_hands.json"):
    """
    Test variability of landmark detection by running the same video multiple times
    
    Args:
        video_path: Path to the video file
        num_iterations: Number of times to run landmark detection
        output_report_path: Path to save the variability report
    """
    print(f"Testing landmark detection variability with {num_iterations} iterations...")
    
    # Initialize MediaPipe
    mp_hands = mp.solutions.hands
    # mp_drawing = mp.solutions.drawing_utils
    
    # Store results from all iterations
    all_iterations_data = []
    
    for iteration in range(num_iterations):
        print(f"\n--- Iteration {iteration + 1}/{num_iterations} ---")
        
        # Initialize video capture
        cap = cv2.VideoCapture(video_path)
        if not cap.isOpened():
            print(f"Error opening video: {video_path}")
            return
        
        # Get video properties
        # fps = cap.get(cv2.CAP_PROP_FPS)
        frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        # height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        
        # Calculate right half coordinates
        right_half_width = width // 2
        start_x = width - right_half_width
        
        # Store landmarks for this iteration
        iteration_data = {
            "iteration": iteration + 1,
            "frames": []
        }
        
        with mp_hands.Hands(
            static_image_mode=False,
            max_num_hands=2,
            min_detection_confidence=0.5,
            min_tracking_confidence=0.5
        ) as hands:
            
            frame_idx = 0
            while cap.isOpened():
                ret, frame_bgr = cap.read()
                if not ret:
                    break
                
                # Process only right half
                right_half_frame = frame_bgr[:, start_x:width]
                image = cv2.cvtColor(right_half_frame, cv2.COLOR_BGR2RGB)
                image.flags.writeable = False
                
                # Detect hands
                results = hands.process(image)
                
                # Store frame data
                frame_data = {
                    "frame": frame_idx,
                    "hands": []
                }
                
                if results.multi_hand_landmarks and results.multi_handedness:
                    for hand_landmarks, handedness in zip(results.multi_hand_landmarks, results.multi_handedness):
                        hand_label = handedness.classification[0].label
                        
                        # Extract landmarks
                        landmarks = []
                        for lm in hand_landmarks.landmark:
                            landmarks.append({
                                "x": lm.x,
                                "y": lm.y,
                                "z": lm.z
                            })
                        
                        frame_data["hands"].append({
                            "label": hand_label,
                            "landmarks": landmarks
                        })
                
                iteration_data["frames"].append(frame_data)
                frame_idx += 1
                
                # Progress update
                if frame_idx % 50 == 0:
                    print(f"Processed {frame_idx}/{frame_count} frames")
        
        cap.release()
        all_iterations_data.append(iteration_data)
        print(f"Iteration {iteration + 1} completed: {frame_idx} frames processed")
    
    # Analyze variability
    variability_stats = analyze_landmark_variability(all_iterations_data)
    
    # Save complete report
    report = {
        "metadata": {
            "video_path": video_path,
            "num_iterations": num_iterations,
            "timestamp": datetime.now().isoformat()
        },
        "raw_data": all_iterations_data,
        "variability_analysis": variability_stats
    }
    
    with open(output_report_path, 'w') as f:
        json.dump(report, f, indent=2)
    
    # print(f"\nVariability test completed!")
    print(f"Report saved to: {output_report_path}")
    print("\nVariability Summary:")
    print(f"Average detection rate: {variability_stats['detection_rate_stats']['mean']:.2f}%")
    print(f"Detection consistency (std): {variability_stats['detection_rate_stats']['std']:.2f}%")
    
    return report

def analyze_landmark_variability(all_iterations_data):
    """
    Analyze variability statistics from multiple iterations
    """
    # print("\nAnalyzing landmark variability...")
    
    # Calculate detection rates for each iteration
    detection_rates = []
    total_frames = len(all_iterations_data[0]["frames"]) if all_iterations_data else 0
    
    for iteration_data in all_iterations_data:
        frames_with_hands = sum(1 for frame in iteration_data["frames"] if frame["hands"])
        detection_rate = (frames_with_hands / total_frames) * 100 if total_frames > 0 else 0
        detection_rates.append(detection_rate)
    
    # Calculate statistics
    stats = {
        "detection_rate_stats": {
            "mean": np.round(statistics.mean(detection_rates), 2) if detection_rates else 0,
            "std": np.round(statistics.stdev(detection_rates), 2) if len(detection_rates) > 1 else 0,
            "min": np.round(min(detection_rates), 2) if detection_rates else 0,
            "max": np.round(max(detection_rates),2) if detection_rates else 0,
            "rates_per_iteration": detection_rates
        },
        "landmark_position_variability": calculate_landmark_position_variability(all_iterations_data)
    }
    
    return stats

def calculate_landmark_position_variability(all_iterations_data):
    """
    Calculate variability in landmark positions across iterations
    """
    if not all_iterations_data:
        return {}
    
    # Group landmarks by frame and landmark point
    frame_landmark_positions = {}
    
    for iteration_data in all_iterations_data:
        for frame_data in iteration_data["frames"]:
            frame_idx = frame_data["frame"]
            
            if frame_idx not in frame_landmark_positions:
                frame_landmark_positions[frame_idx] = {}
            
            for hand_data in frame_data["hands"]:
                hand_key = f"{hand_data['label']}_hand"
                
                if hand_key not in frame_landmark_positions[frame_idx]:
                    frame_landmark_positions[frame_idx][hand_key] = {i: [] for i in range(21)}  # 21 landmarks per hand
                
                for lm_idx, landmark in enumerate(hand_data["landmarks"]):
                    frame_landmark_positions[frame_idx][hand_key][lm_idx].append([
                        landmark["x"], landmark["y"], landmark["z"]
                    ])
    
    # Calculate variability statistics
    variability_stats = {}
    
    # NEW: Create lists to hold the std for each coordinate separately
    all_stds_x = []
    all_stds_y = []
    all_stds_z = []
    
    all_stds = []
    for frame_idx, frame_data in frame_landmark_positions.items():
        for hand_key, landmarks in frame_data.items():
            for lm_idx, positions in landmarks.items():
                if len(positions) > 1:  # Need at least 2 positions to calculate std
                    positions_array = np.array(positions)
                    std_per_coord = np.std(positions_array, axis=0)
                    all_stds_x.append(std_per_coord[0]) # std for x
                    all_stds_y.append(std_per_coord[1]) # std for y
                    all_stds_z.append(std_per_coord[2]) # std for z
                    # avg_std = np.mean(std_per_coord)
                    # all_stds.append(avg_std)
    
    if all_stds_x: # If we have data
        # Calculate and store the average and max std for each coordinate
        variability_stats["average_std"] = {
            "x": np.round(statistics.mean(all_stds_x), 4),
            "y": np.round(statistics.mean(all_stds_y), 4),
            "z": np.round(statistics.mean(all_stds_z), 4)
        }
        variability_stats["max_std"] = {
            "x": np.round(max(all_stds_x), 4),
            "y": np.round(max(all_stds_y), 4),
            "z": np.round(max(all_stds_z), 4)
        }

    return variability_stats


In [None]:
if __name__ == "__main__":
    # Example video path
    input_video = "TV-20241029-2025-3400.webxxl.h264.mp4"
    
    print("=== VIDEO PROCESSING SUITE ===\n")
    
    # Step 1: Cut video to 20 seconds
    print("Step 1: Cutting video...")
    short_video = f"short_video_{10}sec.mp4"
    cut_video_ffmpeg(input_video, short_video, start_time=44, duration=10)    

    variability_report_hands = test_landmark_variability_hands(
        short_video, 
        num_iterations=3,  # Adjust as needed
        output_report_path="landmark_variability_report.json"
    )
       
    print("\n=== ALL PROCESSING COMPLETED ===")

=== VIDEO PROCESSING SUITE ===

Step 1: Cutting video...
Cutting video from 44s for 10s...


Video cut successfully: short_video_10sec.mp4
Testing landmark detection variability with 3 iterations...

--- Iteration 1/3 ---
Processed 50/202 frames
Processed 100/202 frames
Processed 150/202 frames
Processed 200/202 frames
Iteration 1 completed: 202 frames processed

--- Iteration 2/3 ---
Processed 50/202 frames
Processed 100/202 frames
Processed 150/202 frames
Processed 200/202 frames
Iteration 2 completed: 202 frames processed

--- Iteration 3/3 ---
Processed 50/202 frames
Processed 100/202 frames
Processed 150/202 frames
Processed 200/202 frames
Iteration 3 completed: 202 frames processed
Report saved to: landmark_variability_report.json

Variability Summary:
Average detection rate: 100.00%
Detection consistency (std): 0.00%
