# Basketball Tracking Preprocessor
## Multi-Angle Player & Ball Tracking with TrackStudio

This notebook processes raw basketball videos from multiple backboard angles to generate:
- **Tracking-enhanced video** with player/ball overlays
- **Tracking data JSON** for downstream analysis
- **Stitched multi-angle view** for comprehensive court coverage

### Input:
- Raw videos from 2 backboard camera angles
- Videos should be synchronized (same game time)

### Output:
- Enhanced video with tracking overlays
- Tracking data JSON file
- Stored in `tracking_output/` directory

### Next Step:
Use `basketball_analysis_with_tracking.ipynb` to analyze the enhanced videos

## 1. Setup and Dependencies

In [1]:
# Install and import required packages
import subprocess
import sys
import os
import json
import time
import cv2
import numpy as np
from pathlib import Path
from typing import List, Dict, Tuple, Optional, Any
from dataclasses import dataclass, asdict
import logging
import tempfile
import requests
import subprocess
from datetime import datetime

# Setup logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

def install_package(package):
    subprocess.check_call([sys.executable, "-m", "pip", "install", package])

# Install required packages
required_packages = [
    "opencv-python", 
    "numpy",
    "requests",
    "ffmpeg-python"
]

print("Installing required packages...")
for package in required_packages:
    try:
        install_package(package)
        print(f"✅ {package}")
    except Exception as e:
        print(f"❌ {package}: {e}")

print("\n📦 Package installation completed!")

Installing required packages...
✅ opencv-python
✅ numpy
✅ requests
✅ ffmpeg-python

📦 Package installation completed!


## 2. Configuration and Data Models

In [None]:
# Configuration
class TrackingConfig:
    # Directories
    DATA_DIR = Path('data')
    OUTPUT_DIR = Path('tracking_output')
    TEMP_DIR = Path('temp_tracking')
    
    # Video Processing
    OUTPUT_FPS = 60
    MAX_VIDEO_DURATION = 15 * 60  # 15 minutes
    
    # Tracking Settings - REAL TRACKSTUDIO INTEGRATION
    MOCK_MODE = False  # ✅ ALWAYS False - Using real TrackStudio
    TRACKSTUDIO_URL = "http://localhost:8000"  # TrackStudio server endpoint
    
    def __post_init__(self):
        # Create directories
        for dir_path in [self.DATA_DIR, self.OUTPUT_DIR, self.TEMP_DIR]:
            dir_path.mkdir(exist_ok=True)

config = TrackingConfig()
config.__post_init__()

print("✅ Configuration initialized")
print(f"📁 Data directory: {config.DATA_DIR}")
print(f"📁 Output directory: {config.OUTPUT_DIR}")
print(f"🎯 TrackStudio URL: {config.TRACKSTUDIO_URL}")
print("🚀 REAL TRACKING ENABLED - Mock mode disabled")

In [3]:
# Data Models for Tracking
@dataclass
class BoundingBox:
    """Represents a bounding box with confidence"""
    x: float
    y: float
    width: float
    height: float
    confidence: float = 0.0

@dataclass
class TrackedObject:
    """Represents a tracked object (player or ball)"""
    id: int
    type: str  # 'player' or 'ball'
    bbox: BoundingBox
    team: Optional[str] = None  # For players: 'team_a', 'team_b'
    jersey_number: Optional[int] = None

@dataclass
class TrackingFrame:
    """Represents tracking data for a single frame"""
    frame_number: int
    timestamp: float  # seconds
    objects: List[TrackedObject]
    frame_width: int
    frame_height: int

@dataclass
class TrackingResult:
    """Complete tracking result for a video"""
    video_path: str
    output_video_path: str
    tracking_data_path: str
    processing_time: float
    total_frames: int
    tracking_frames: List[TrackingFrame]

print("✅ Data models defined")

✅ Data models defined


## 3. Video Processing Pipeline

In [4]:
import ffmpeg

class VideoProcessor:
    """Handles video processing operations"""
    
    def __init__(self, config: TrackingConfig):
        self.config = config
        self._check_ffmpeg()
    
    def _check_ffmpeg(self):
        """Check if FFmpeg is available"""
        try:
            subprocess.run(['ffmpeg', '-version'], capture_output=True, check=True)
            logger.info("FFmpeg is available")
        except (subprocess.CalledProcessError, FileNotFoundError):
            raise RuntimeError("FFmpeg not found. Please install FFmpeg.")
    
    def get_video_info(self, video_path: str) -> Dict:
        """Get video information using ffprobe"""
        try:
            probe = ffmpeg.probe(video_path)
            video_stream = next((stream for stream in probe['streams'] 
                               if stream['codec_type'] == 'video'), None)
            
            if video_stream is None:
                return {}
            
            info = {
                'duration': float(probe['format']['duration']),
                'fps': eval(video_stream['r_frame_rate']),
                'width': int(video_stream['width']),
                'height': int(video_stream['height']),
                'total_frames': int(video_stream.get('nb_frames', 0))
            }
            return info
        except Exception as e:
            logger.error(f"Error getting video info: {e}")
            return {}
    
    def stitch_videos_side_by_side(self, video1_path: str, video2_path: str, 
                                   output_path: str) -> bool:
        """Stitch two videos side by side"""
        try:
            input1 = ffmpeg.input(video1_path)
            input2 = ffmpeg.input(video2_path)
            
            # Side by side composition
            joined = ffmpeg.filter([input1, input2], 'hstack')
            
            output = ffmpeg.output(joined, output_path, 
                                 vcodec='libx264', acodec='aac', 
                                 r=self.config.OUTPUT_FPS)
            ffmpeg.run(output, overwrite_output=True, quiet=True)
            
            logger.info(f"Videos stitched successfully: {output_path}")
            return True
            
        except Exception as e:
            logger.error(f"Error stitching videos: {e}")
            return False

# Initialize processor
video_processor = VideoProcessor(config)
print("✅ Video Processor initialized")

2025-08-07 19:19:20,062 - INFO - FFmpeg is available


✅ Video Processor initialized


## 4. TrackStudio Integration

In [None]:
class TrackStudioIntegration:
    """Integration with TrackStudio tracking system"""
    
    def __init__(self, config: TrackingConfig):
        self.config = config
        self.server_url = config.TRACKSTUDIO_URL
    
    def process_video_with_tracking(self, video_path: str) -> List[TrackingFrame]:
        """Process video with TrackStudio tracking system"""
        logger.info(f"Processing video with TrackStudio: {video_path}")
        return self._process_with_trackstudio(video_path)
    
    def _process_with_trackstudio(self, video_path: str) -> List[TrackingFrame]:
        """Process video with actual TrackStudio system via API"""
        try:
            # Check if TrackStudio server is running
            health_response = requests.get(f"{self.server_url}/health", timeout=5)
            if health_response.status_code != 200:
                raise ConnectionError("TrackStudio server not responding")
            
            logger.info("TrackStudio server is healthy")
            
            # Send video for processing
            output_name = Path(video_path).stem
            request_data = {
                "video_path": str(video_path),
                "output_name": output_name
            }
            
            logger.info(f"Sending video to TrackStudio for processing: {output_name}")
            response = requests.post(
                f"{self.server_url}/api/track",
                json=request_data,
                timeout=300  # 5 minute timeout for processing
            )
            
            if response.status_code == 200:
                result = response.json()
                logger.info(f"TrackStudio processing successful: {result['status']}")
                
                # Download the tracking data
                tracking_data_filename = f"{output_name}_tracking_data.json"
                tracking_response = requests.get(
                    f"{self.server_url}/api/download/data/{tracking_data_filename}",
                    timeout=30
                )
                
                if tracking_response.status_code == 200:
                    tracking_json = tracking_response.json()
                    return self._parse_trackstudio_data(tracking_json)
                else:
                    logger.error(f"Failed to download tracking data: {tracking_response.status_code}")
                    return []
            else:
                logger.error(f"TrackStudio API error: {response.status_code} - {response.text}")
                return []
                
        except requests.exceptions.ConnectionError:
            logger.error("❌ Cannot connect to TrackStudio server!")
            logger.error(f"Make sure the server is running at: {self.server_url}")
            logger.error("Run: python trackstudio_server/basketball_tracking_server.py")
            raise ConnectionError("TrackStudio server not available")
        except requests.exceptions.Timeout:
            logger.error("TrackStudio processing timeout")
            return []
        except Exception as e:
            logger.error(f"TrackStudio processing error: {e}")
            return []
    
    def _parse_trackstudio_data(self, tracking_data: Dict) -> List[TrackingFrame]:
        """Parse TrackStudio tracking data into TrackingFrame objects"""
        try:
            tracking_frames = []
            
            for frame_data in tracking_data.get('tracking_frames', []):
                # Parse objects in frame
                objects = []
                
                for obj_data in frame_data.get('objects', []):
                    bbox_data = obj_data.get('bbox', {})
                    bbox = BoundingBox(
                        x=bbox_data.get('x', 0),
                        y=bbox_data.get('y', 0), 
                        width=bbox_data.get('width', 0),
                        height=bbox_data.get('height', 0),
                        confidence=bbox_data.get('confidence', 0.0)
                    )
                    
                    tracked_obj = TrackedObject(
                        id=obj_data.get('id', 0),
                        type=obj_data.get('type', 'unknown'),
                        bbox=bbox,
                        team=obj_data.get('team'),
                        jersey_number=obj_data.get('jersey_number')
                    )
                    objects.append(tracked_obj)
                
                tracking_frame = TrackingFrame(
                    frame_number=frame_data.get('frame_number', 0),
                    timestamp=frame_data.get('timestamp', 0.0),
                    objects=objects,
                    frame_width=frame_data.get('frame_width', 0),
                    frame_height=frame_data.get('frame_height', 0)
                )
                tracking_frames.append(tracking_frame)
            
            logger.info(f"Parsed {len(tracking_frames)} tracking frames from TrackStudio")
            return tracking_frames
            
        except Exception as e:
            logger.error(f"Error parsing TrackStudio data: {e}")
            return []

# Initialize TrackStudio integration
trackstudio = TrackStudioIntegration(config)
print("✅ TrackStudio Integration initialized")
print("🎯 Mode: Live TrackStudio Server")
print(f"🔗 Server URL: {config.TRACKSTUDIO_URL}")

## 5. Tracking Visualization

In [6]:
class TrackingVisualizer:
    """Creates visual overlays for tracking data"""
    
    def __init__(self, config: TrackingConfig):
        self.config = config
        self.colors = {
            'team_a': (0, 255, 0),     # Green
            'team_b': (0, 0, 255),     # Blue
            'ball': (0, 255, 255),     # Yellow
            'default': (255, 255, 255) # White
        }
    
    def create_tracking_video(self, video_path: str, tracking_frames: List[TrackingFrame], 
                             output_path: str) -> bool:
        """Create video with tracking overlays"""
        try:
            cap = cv2.VideoCapture(video_path)
            fps = int(cap.get(cv2.CAP_PROP_FPS))
            width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
            height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
            
            fourcc = cv2.VideoWriter_fourcc(*'mp4v')
            out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))
            
            frame_num = 0
            tracking_dict = {tf.frame_number: tf for tf in tracking_frames}
            
            logger.info(f"Creating tracking video: {output_path}")
            
            while True:
                ret, frame = cap.read()
                if not ret:
                    break
                
                # Add tracking overlays if available for this frame
                if frame_num in tracking_dict:
                    frame = self._add_tracking_overlays(frame, tracking_dict[frame_num])
                
                out.write(frame)
                frame_num += 1
                
                if frame_num % 300 == 0:  # Log progress every 10 seconds at 30fps
                    logger.info(f"Processed {frame_num} frames")
            
            cap.release()
            out.release()
            
            logger.info(f"Tracking video created successfully: {output_path}")
            return True
            
        except Exception as e:
            logger.error(f"Error creating tracking video: {e}")
            return False
    
    def _add_tracking_overlays(self, frame: np.ndarray, tracking_frame: TrackingFrame) -> np.ndarray:
        """Add tracking overlays to a single frame"""
        overlay = frame.copy()
        
        for obj in tracking_frame.objects:
            # Get color based on object type/team
            if obj.type == 'ball':
                color = self.colors['ball']
            elif obj.type == 'player' and obj.team:
                color = self.colors.get(obj.team, self.colors['default'])
            else:
                color = self.colors['default']
            
            # Draw bounding box
            bbox = obj.bbox
            x1, y1 = int(bbox.x), int(bbox.y)
            x2, y2 = int(bbox.x + bbox.width), int(bbox.y + bbox.height)
            
            cv2.rectangle(overlay, (x1, y1), (x2, y2), color, 2)
            
            # Add object label
            if obj.type == 'ball':
                label = f"Ball {obj.bbox.confidence:.2f}"
            else:
                jersey = f"#{obj.jersey_number}" if obj.jersey_number else f"ID:{obj.id}"
                label = f"{jersey} {obj.bbox.confidence:.2f}"
            
            # Draw label background
            label_size = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.5, 1)[0]
            cv2.rectangle(overlay, (x1, y1 - label_size[1] - 5), 
                         (x1 + label_size[0], y1), color, -1)
            
            # Draw label text
            cv2.putText(overlay, label, (x1, y1 - 5), 
                       cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 0), 1)
        
        # Add frame info
        frame_info = f"Frame: {tracking_frame.frame_number} | Time: {tracking_frame.timestamp:.1f}s"
        cv2.putText(overlay, frame_info, (10, 30), 
                   cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2)
        
        return overlay

# Initialize visualizer
visualizer = TrackingVisualizer(config)
print("✅ Tracking Visualizer initialized")

✅ Tracking Visualizer initialized


## 6. Main Processing Pipeline

In [9]:
class BasketballTrackingPipeline:
    """Main pipeline for basketball video tracking preprocessing"""
    
    def __init__(self, config: TrackingConfig):
        self.config = config
        self.video_processor = video_processor
        self.trackstudio = trackstudio
        self.visualizer = visualizer
    
    def process_single_video(self, video_path: str, output_name: str = None) -> TrackingResult:
        """Process a single video with tracking"""
        start_time = time.time()
        
        if not output_name:
            output_name = Path(video_path).stem
        
        logger.info(f"Processing single video: {video_path}")
        
        try:
            # Step 1: Get video info
            video_info = self.video_processor.get_video_info(video_path)
            if not video_info:
                raise ValueError(f"Could not get video info for: {video_path}")
            
            logger.info(f"Video info: {video_info['duration']:.1f}s, {video_info['fps']:.1f}fps")
            
            # Step 2: Process with tracking
            logger.info("Step 2: Processing with tracking system...")
            tracking_frames = self.trackstudio.process_video_with_tracking(video_path)
            
            if not tracking_frames:
                raise ValueError("No tracking data generated")
            
            # Step 3: Create tracking-enhanced video
            output_video_path = self.config.OUTPUT_DIR / f"{output_name}_tracking_enhanced.mp4"
            logger.info("Step 3: Creating tracking-enhanced video...")
            
            success = self.visualizer.create_tracking_video(
                video_path, tracking_frames, str(output_video_path)
            )
            
            if not success:
                raise ValueError("Failed to create tracking video")
            
            # Step 4: Export tracking data as JSON
            tracking_data_path = self.config.OUTPUT_DIR / f"{output_name}_tracking_data.json"
            logger.info("Step 4: Exporting tracking data...")
            
            self._export_tracking_data(tracking_frames, tracking_data_path)
            
            processing_time = time.time() - start_time
            
            result = TrackingResult(
                video_path=video_path,
                output_video_path=str(output_video_path),
                tracking_data_path=str(tracking_data_path),
                processing_time=processing_time,
                total_frames=len(tracking_frames),
                tracking_frames=tracking_frames
            )
            
            logger.info(f"Single video processing completed in {processing_time:.2f}s")
            return result
            
        except Exception as e:
            logger.error(f"Error processing single video: {e}")
            raise
    
    def process_multi_angle_videos(self, video1_path: str, video2_path: str, 
                                   output_name: str = None) -> TrackingResult:
        """Process two videos from different angles with stitching"""
        start_time = time.time()
        
        if not output_name:
            output_name = f"multi_angle_{Path(video1_path).stem}_{Path(video2_path).stem}"
        
        logger.info(f"Processing multi-angle videos: {video1_path}, {video2_path}")
        
        try:
            # Step 1: Stitch videos side by side
            stitched_video_path = self.config.TEMP_DIR / f"{output_name}_stitched.mp4"
            logger.info("Step 1: Stitching videos side by side...")
            
            success = self.video_processor.stitch_videos_side_by_side(
                video1_path, video2_path, str(stitched_video_path)
            )
            
            if not success:
                raise ValueError("Failed to stitch videos")
            
            # Step 2: Process stitched video with tracking
            logger.info("Step 2: Processing stitched video with tracking...")
            result = self.process_single_video(str(stitched_video_path), output_name)
            
            # Clean up temporary stitched file
            try:
                stitched_video_path.unlink()
            except:
                pass
            
            processing_time = time.time() - start_time
            result.processing_time = processing_time
            
            logger.info(f"Multi-angle processing completed in {processing_time:.2f}s")
            return result
            
        except Exception as e:
            logger.error(f"Error processing multi-angle videos: {e}")
            raise
    
    def _export_tracking_data(self, tracking_frames: List[TrackingFrame], output_path: Path):
        """Export tracking data to JSON file"""
        try:
            # Convert tracking frames to serializable format
            tracking_data = {
                'export_timestamp': datetime.now().isoformat(),
                'total_frames': len(tracking_frames),
                'tracking_frames': [asdict(frame) for frame in tracking_frames]
            }
            
            with open(output_path, 'w') as f:
                json.dump(tracking_data, f, indent=2)
            
            logger.info(f"Tracking data exported: {output_path}")
            
        except Exception as e:
            logger.error(f"Error exporting tracking data: {e}")
    
    def print_result_summary(self, result: TrackingResult):
        """Print processing result summary"""
        print("\n" + "=" * 60)
        print("🏀 BASKETBALL TRACKING PREPROCESSING COMPLETE")
        print("=" * 60)
        
        print(f"\n📹 Input Video: {Path(result.video_path).name}")
        print(f"⏱️  Processing Time: {result.processing_time:.2f} seconds")
        print(f"🎯 Tracking Frames: {result.total_frames}")
        
        print("\n📁 OUTPUT FILES:")
        print(f"  🎥 Enhanced Video: {result.output_video_path}")
        print(f"  📊 Tracking Data: {result.tracking_data_path}")
        
        # Count objects
        if result.tracking_frames:
            sample_frame = result.tracking_frames[0]
            players = [obj for obj in sample_frame.objects if obj.type == 'player']
            balls = [obj for obj in sample_frame.objects if obj.type == 'ball']
            
            print(f"\n🎯 TRACKING INFO:")
            print(f"  👥 Players detected: {len(players)}")
            print(f"  🏀 Ball tracking: {'✅' if balls else '❌'}")
        
        print("\n📝 NEXT STEP:")
        print("  Run basketball_analysis_with_tracking.ipynb to analyze the enhanced video")

# Initialize main pipeline
pipeline = BasketballTrackingPipeline(config)
print("✅ Basketball Tracking Pipeline initialized")

✅ Basketball Tracking Pipeline initialized


## 7. Processing Interface

In [10]:
# Main processing interface
def process_basketball_videos():
    """Main function to process basketball videos"""
    
    # Find available videos in data directory
    data_dir = Path(config.DATA_DIR)
    video_extensions = ['.mp4', '.avi', '.mov', '.mkv']
    available_videos = []
    
    for ext in video_extensions:
        available_videos.extend(list(data_dir.glob(f"*{ext}")))
    
    if not available_videos:
        print("❌ No video files found in data/ directory")
        print("Please add basketball videos to the data/ folder")
        return
    
    print(f"📹 Found {len(available_videos)} video file(s):")
    for i, video in enumerate(available_videos, 1):
        print(f"  {i}. {video.name}")
    
    try:
        if len(available_videos) >= 2:
            print("\n🎯 Processing multi-angle videos (using first 2 videos)")
            video1 = str(available_videos[0])
            video2 = str(available_videos[1])
            
            result = pipeline.process_multi_angle_videos(video1, video2)
            pipeline.print_result_summary(result)
            
        else:
            print("\n🎯 Processing single video")
            video_path = str(available_videos[0])
            
            result = pipeline.process_single_video(video_path)
            pipeline.print_result_summary(result)
    
    except Exception as e:
        print(f"❌ Processing error: {e}")
        logger.error(f"Processing failed: {e}")

# Run the processing
if __name__ == "__main__":
    process_basketball_videos()
else:
    print("\n" + "=" * 50)
    print("🏀 BASKETBALL TRACKING PREPROCESSOR READY")
    print("=" * 50)
    
    print("\n💡 TO PROCESS VIDEOS:")
    print("1. Add your basketball videos to the data/ folder")
    print("2. For best results, use 2 videos from different backboard angles")
    print("3. Videos should be synchronized (same game time)")
    print("4. Supported formats: .mp4, .avi, .mov, .mkv")
    print("5. Run: process_basketball_videos()")
    
    data_dir = Path(config.DATA_DIR)
    video_files = list(data_dir.glob("*.mp4")) + list(data_dir.glob("*.avi"))
    
    if video_files:
        print(f"\n📹 Found {len(video_files)} video file(s) in data/ folder")
        print("Ready to process! Run the cell above to start.")
    else:
        print("\n📁 data/ folder is empty")
        print("Please add basketball videos to get started")

2025-08-07 19:21:05,676 - INFO - Processing multi-angle videos: data/sample60s_video-2.mp4, data/sample60s_video-1.mp4
2025-08-07 19:21:05,677 - INFO - Step 1: Stitching videos side by side...


📹 Found 2 video file(s):
  1. sample60s_video-2.mp4
  2. sample60s_video-1.mp4

🎯 Processing multi-angle videos (using first 2 videos)


2025-08-07 19:22:03,103 - INFO - Videos stitched successfully: temp_tracking/multi_angle_sample60s_video-2_sample60s_video-1_stitched.mp4
2025-08-07 19:22:03,109 - INFO - Step 2: Processing stitched video with tracking...
2025-08-07 19:22:03,110 - INFO - Processing single video: temp_tracking/multi_angle_sample60s_video-2_sample60s_video-1_stitched.mp4
2025-08-07 19:22:03,394 - INFO - Video info: 60.5s, 60.0fps
2025-08-07 19:22:03,395 - INFO - Step 2: Processing with tracking system...
2025-08-07 19:22:03,395 - INFO - Using mock tracking system
2025-08-07 19:22:03,716 - INFO - Generated 726 tracking frames
2025-08-07 19:22:03,717 - INFO - Step 3: Creating tracking-enhanced video...
2025-08-07 19:22:03,780 - INFO - Creating tracking video: tracking_output/multi_angle_sample60s_video-2_sample60s_video-1_tracking_enhanced.mp4
2025-08-07 19:22:09,293 - INFO - Processed 300 frames
2025-08-07 19:22:14,566 - INFO - Processed 600 frames
2025-08-07 19:22:19,641 - INFO - Processed 900 frames
202


🏀 BASKETBALL TRACKING PREPROCESSING COMPLETE

📹 Input Video: multi_angle_sample60s_video-2_sample60s_video-1_stitched.mp4
⏱️  Processing Time: 120.49 seconds
🎯 Tracking Frames: 726

📁 OUTPUT FILES:
  🎥 Enhanced Video: tracking_output/multi_angle_sample60s_video-2_sample60s_video-1_tracking_enhanced.mp4
  📊 Tracking Data: tracking_output/multi_angle_sample60s_video-2_sample60s_video-1_tracking_data.json

🎯 TRACKING INFO:
  👥 Players detected: 3
  🏀 Ball tracking: ✅

📝 NEXT STEP:
  Run basketball_analysis_with_tracking.ipynb to analyze the enhanced video
