# YOLOv11 Object Detection and Tracking Tutorial

This comprehensive notebook demonstrates how to build a complete object detection and tracking system using YOLOv11 with Label Studio for data annotation.

## Features Covered:
- 🎯 Custom dataset preparation with Label Studio
- 🤖 YOLO 11 model training and fine-tuning
- 📹 Real-time camera object detection
- 🔄 Multi-object tracking with unique IDs
- 💾 Export tracking results and annotated videos

## Prerequisites:
- Python 3.8+
- Webcam or video file for testing
- Label Studio account (optional for advanced features)

## 1. Install Required Dependencies

First, let's install all the necessary packages for YOLO 11, Label Studio, and object tracking.

In [None]:
# Install core packages for YOLO 11 and computer vision
!pip install ultralytics>=8.0.196
!pip install opencv-python>=4.8.0
!pip install torch torchvision

# Install Label Studio and SDK for dataset management
!pip install label-studio>=1.9.0
!pip install label-studio-sdk>=0.0.31

# Install tracking and data processing libraries
!pip install filterpy>=1.4.5
!pip install scipy>=1.10.0
!pip install numpy>=1.24.0
!pip install pandas>=2.0.0
!pip install matplotlib>=3.7.0

# Additional utilities
!pip install tqdm pyyaml requests

print("‚úÖ All packages installed successfully!")

^C


## 2. Import Libraries and Setup

Import all necessary libraries and set up the environment.

In [None]:
import cv2
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from pathlib import Path
import json
import time
from datetime import datetime
import yaml

# YOLO 11 and deep learning
from ultralytics import YOLO
import torch

# Label Studio SDK
from label_studio_sdk import Client

# Tracking libraries
from filterpy.kalman import KalmanFilter
from scipy.spatial.distance import cdist

# Utility libraries
from tqdm import tqdm
import warnings
warnings.filterwarnings('ignore')

# Set up matplotlib for notebook display
plt.rcParams['figure.figsize'] = (12, 8)
%matplotlib inline

print("‚úÖ Libraries imported successfully!")
print(f"PyTorch version: {torch.__version__}")
print(f"CUDA available: {torch.cuda.is_available()}")
print(f"OpenCV version: {cv2.__version__}")

# Create output directories
output_dirs = ['data', 'models', 'results', 'videos']
for dir_name in output_dirs:
    Path(dir_name).mkdir(exist_ok=True)
    
print("üìÅ Output directories created")

## 3. Setup Label Studio Connection

Configure the connection to Label Studio for dataset management and annotation.

In [None]:
import os

# Label Studio configuration
LABEL_STUDIO_URL = "http://localhost:8080"
# Prefer the LABEL_STUDIO_API_KEY environment variable so the secret does not
# have to be saved inside the notebook. Without it the token is "" (no token),
# exactly as before; you can still paste a token as the fallback value here.
LABEL_STUDIO_API_TOKEN = os.environ.get("LABEL_STUDIO_API_KEY", "")
PROJECT_NAME = "YOLOv11_Simple_Detection_Dataset"

class LabelStudioManager:
    """Small helper that owns the Label Studio SDK client for this notebook.

    Attributes:
        url: Label Studio base URL given to the constructor.
        api_token: API token given to the constructor (may be empty/None).
        client: SDK ``Client`` instance, or ``None`` when no token was given
            or building the client raised.
        project: project returned by the last successful ``create_project``
            call, otherwise ``None``.
    """

    def __init__(self, url, api_token=None):
        """Remember the settings and build an SDK client when a token is given.

        Failures while building the client are reported with ``print`` and
        leave ``client`` as ``None``; they are not re-raised.
        """
        self.url = url
        self.api_token = api_token
        self.client = None
        self.project = None

        if not api_token:
            print("‚ö†Ô∏è No API token provided. Some features will be limited.")
            return

        try:
            self.client = Client(url=url, api_key=api_token)
            print("‚úÖ Connected to Label Studio")
        except Exception as err:
            print(f"‚ùå Failed to connect to Label Studio: {err}")

    def create_project(self, project_name, label_config=None):
        """Create a Label Studio project and remember it on ``self.project``.

        Args:
            project_name: title of the new project.
            label_config: labeling-interface XML; when empty or ``None`` a
                three-class rectangle config (person, micro, singer) is used.

        Returns:
            The created project, or ``None`` when there is no client or the
            SDK call raised (the error is printed, not re-raised).
        """
        if not self.client:
            print("‚ùå No Label Studio client available")
            return None

        # Simple 3-class object detection label config used as the fallback.
        label_config = label_config or '''
            <View>
              <Image name="image" value="$image"/>
              <RectangleLabels name="label" toName="image">
                <Label value="person" background="red"/>
                <Label value="micro" background="blue"/>
                <Label value="singer" background="green"/>
              </RectangleLabels>
            </View>
            '''

        try:
            created = self.client.start_project(
                title=project_name,
                label_config=label_config,
                description="Simplified object detection dataset for person, microphone, and singer tracking",
            )
            self.project = created
            print(f"‚úÖ Created project: {project_name}")
            return created
        except Exception as err:
            print(f"‚ùå Failed to create project: {err}")
            return None

# Initialize the shared Label Studio manager from the settings above.
# With an empty API token the manager builds no SDK client, so
# create_project() will only print a message and return None.
ls_manager = LabelStudioManager(LABEL_STUDIO_URL, LABEL_STUDIO_API_TOKEN)

# If you don't have Label Studio running, you can skip this section
# and work with sample data.
print("üìù Label Studio setup complete!")

## 4. Load and Prepare Dataset from Label Studio

Fetch labeled data from Label Studio and convert annotations to YOLO format for training.

In [None]:
def convert_labelstudio_to_yolo(annotations, class_mapping):
    """Convert Label Studio rectangle annotations to YOLO label lines.

    Args:
        annotations: iterable of annotation dicts; each may carry a 'result'
            list whose items have a 'type' and a 'value' dict with
            'x', 'y', 'width', 'height' (percent of the image size) and
            'rectanglelabels' (list of label names).
        class_mapping: dict mapping label name -> integer class id.

    Returns:
        List of strings ``"<class_id> <x_center> <y_center> <width> <height>"``
        with all coordinates normalized to the 0-1 range.

    Results that are not rectangle labels, have no label, or use a label that
    is missing from ``class_mapping`` are skipped. Boxes reaching outside the
    image are clipped to it, and boxes with no area left are dropped. Any
    'rotation' stored with a box is not read.

    Raises:
        KeyError: if a rectangle result lacks one of the coordinate keys.
    """
    yolo_annotations = []

    for annotation in annotations:
        for result in annotation.get('result') or []:
            if result.get('type') != 'rectanglelabels':
                continue

            value = result.get('value') or {}
            labels = value.get('rectanglelabels') or []
            if not labels or labels[0] not in class_mapping:
                continue
            class_id = class_mapping[labels[0]]

            # Label Studio stores percentages of the image size; YOLO expects
            # fractions in the 0-1 range.
            left = value['x'] / 100.0
            top = value['y'] / 100.0
            width = value['width'] / 100.0
            height = value['height'] / 100.0

            # Clip only when needed so in-range boxes keep their exact values.
            right = left + width
            if left < 0.0 or right > 1.0:
                left, right = max(left, 0.0), min(right, 1.0)
                width = right - left
            bottom = top + height
            if top < 0.0 or bottom > 1.0:
                top, bottom = max(top, 0.0), min(bottom, 1.0)
                height = bottom - top

            if width <= 0.0 or height <= 0.0:
                continue

            # YOLO boxes are described by their center, not their corner.
            x_center = left + width / 2
            y_center = top + height / 2

            yolo_annotations.append(f"{class_id} {x_center} {y_center} {width} {height}")

    return yolo_annotations

def prepare_yolo_dataset(output_dir="data/yolo_dataset"):
    """Create an empty YOLO dataset skeleton and its ``dataset.yaml``.

    Creates ``images/<split>`` and ``labels/<split>`` directories for the
    train, val and test splits under ``output_dir`` and writes a
    ``dataset.yaml`` describing them.

    Args:
        output_dir: root directory of the dataset.

    Returns:
        Tuple ``(dataset_yaml_path, class_mapping)``: the path of the written
        YAML file as a string and the class-name -> class-id dict.
    """
    dataset_root = Path(output_dir)
    splits = ('train', 'val', 'test')

    # images/<split> and labels/<split> for every split.
    for split in splits:
        for kind in ('images', 'labels'):
            (dataset_root / kind / split).mkdir(parents=True, exist_ok=True)

    # Simplified class mapping for person, micro, and singer.
    class_mapping = {'person': 0, 'micro': 1, 'singer': 2}
    class_names = list(class_mapping.keys())

    dataset_config = {
        'path': str(dataset_root.absolute()),
        'train': 'images/train',
        'val': 'images/val',
        'test': 'images/test',
        'nc': len(class_mapping),
        'names': class_names,
    }

    yaml_path = dataset_root / 'dataset.yaml'
    with open(yaml_path, 'w') as yaml_file:
        yaml.dump(dataset_config, yaml_file)

    print(f"‚úÖ YOLO dataset structure created at: {dataset_root}")
    print(f"üìä Classes: {class_names}")

    return str(yaml_path), class_mapping

# Create the sample dataset structure (for demo purposes): empty
# images/labels directories plus dataset.yaml. class_mapping is the
# class-name -> class-id dict returned by prepare_yolo_dataset().
dataset_config_path, class_mapping = prepare_yolo_dataset()
print(f"Dataset config saved to: {dataset_config_path}")

# If you have Label Studio data, you can export it and convert it here
# (see convert_labelstudio_to_yolo). For now, we'll proceed with the
# pretrained YOLO model.

## 5. Configure YOLO 11 Model

Load the YOLO 11 model and configure it for our detection and tracking system.

In [None]:
# YOLO model configuration
MODEL_SIZE = "yolo11n"  # Options: yolo11n, yolo11s, yolo11m, yolo11l, yolo11x
CONFIDENCE_THRESHOLD = 0.25  # passed to the model as `conf`
IOU_THRESHOLD = 0.45  # passed to the model as `iou`


class YOLODetector:
    """Wrap an Ultralytics YOLO model and expose detections as plain dicts.

    Attributes:
        model: the loaded ``YOLO`` model.
        conf_thresh: confidence threshold forwarded to the model.
        iou_thresh: IoU threshold forwarded to the model.
        device: ``'cuda'`` when torch reports CUDA as available, else ``'cpu'``.
        class_names: the model's ``names`` lookup (class id -> class name).
    """

    def __init__(self, model_name=MODEL_SIZE, conf_thresh=CONFIDENCE_THRESHOLD, iou_thresh=IOU_THRESHOLD):
        """Load the model.

        Args:
            model_name: either a bare model name such as ``"yolo11n"`` (the
                ``.pt`` extension is appended) or a path that already has a
                file extension, e.g. a trained ``.../weights/best.pt``.
            conf_thresh: confidence threshold used by ``detect``.
            iou_thresh: IoU threshold used by ``detect``.
        """
        print(f"üîÑ Loading YOLO 11 model: {model_name}")
        self.model = YOLO(self._resolve_weights(model_name))
        self.conf_thresh = conf_thresh
        self.iou_thresh = iou_thresh

        # Check device
        self.device = 'cuda' if torch.cuda.is_available() else 'cpu'
        print(f"üñ•Ô∏è Using device: {self.device}")

        # Get class names
        self.class_names = self.model.names
        print(f"üìã Model classes: {len(self.class_names)} classes")

    @staticmethod
    def _resolve_weights(model_name):
        """Return the weights file for *model_name*, adding ``.pt`` only if it has no extension."""
        model_name = str(model_name)
        # A path such as ".../best.pt" must not become ".../best.pt.pt".
        return model_name if Path(model_name).suffix else f"{model_name}.pt"

    def detect(self, frame, verbose=False):
        """Run detection on a frame.

        Args:
            frame: image passed straight to the model.
            verbose: forwarded to the model call.

        Returns:
            List of dicts with keys 'bbox' (array ``[x1, y1, x2, y2]``),
            'confidence' (float), 'class_id' (int) and 'class_name'.
        """
        results = self.model(
            frame,
            conf=self.conf_thresh,
            iou=self.iou_thresh,
            verbose=verbose,
            device=self.device
        )

        detections = []
        for result in results:
            boxes = result.boxes
            if boxes is None or len(boxes) == 0:
                continue

            # Move each tensor to the host once per frame rather than per box.
            all_xyxy = boxes.xyxy.cpu().numpy()
            all_conf = boxes.conf.cpu().numpy()
            all_cls = boxes.cls.cpu().numpy()

            for xyxy, conf, cls in zip(all_xyxy, all_conf, all_cls):
                class_id = int(cls)
                detections.append({
                    'bbox': xyxy,
                    'confidence': float(conf),
                    'class_id': class_id,
                    'class_name': self.class_names[class_id]
                })

        return detections

    def draw_detections(self, frame, detections, draw_conf=True):
        """Draw detection boxes and labels on a copy of the frame.

        Args:
            frame: image to annotate; it is copied, not modified.
            detections: dicts as returned by ``detect``.
            draw_conf: append the confidence to each label when true.

        Returns:
            The annotated copy of ``frame``.
        """
        annotated_frame = frame.copy()
        font = cv2.FONT_HERSHEY_SIMPLEX
        color = (0, 255, 0)  # Green

        for detection in detections:
            x1, y1, x2, y2 = map(int, detection['bbox'])
            cv2.rectangle(annotated_frame, (x1, y1), (x2, y2), color, 2)

            label = f"{detection['class_name']}"
            if draw_conf:
                label += f": {detection['confidence']:.2f}"

            (text_w, text_h), _ = cv2.getTextSize(label, font, 0.6, 2)
            # Keep the label inside the image when the box touches the top edge.
            label_bottom = max(y1, text_h + 10)
            cv2.rectangle(annotated_frame, (x1, label_bottom - text_h - 10),
                          (x1 + text_w, label_bottom), color, -1)
            cv2.putText(annotated_frame, label, (x1, label_bottom - 5),
                        font, 0.6, (255, 255, 255), 2)

        return annotated_frame

# Initialize the shared YOLO detector with the defaults configured above
# (MODEL_SIZE weights, CONFIDENCE_THRESHOLD, IOU_THRESHOLD).
detector = YOLODetector()

print("‚úÖ YOLO 11 detector initialized!")
# class_names is read as a mapping here; only the first 10 names are shown.
print(f"Available classes: {list(detector.class_names.values())[:10]}...")  # Show first 10 classes

## 6. Train Custom YOLO 11 Model (Optional)

If you have custom labeled data, you can fine-tune the YOLO 11 model on your dataset.

In [None]:
# Training switch: the training branch at the end of this cell only runs
# when this flag is True.
TRAIN_CUSTOM_MODEL = False  # Set to True if you want to train on your data

def train_custom_yolo(dataset_config_path, epochs=100, batch_size=16, img_size=640,
                      base_weights="yolo11n.pt"):
    """Fine-tune a YOLO 11 model on a custom dataset.

    Args:
        dataset_config_path: path to the dataset YAML passed as ``data``.
        epochs: number of training epochs.
        batch_size: batch size passed as ``batch``.
        img_size: training image size passed as ``imgsz``.
        base_weights: pretrained weights to start from; defaults to the
            nano model, so existing callers behave as before.

    Returns:
        Path (as a string) of ``weights/best.pt`` inside the run directory
        reported by the training results. Runs are written to
        ``models/custom_yolo11`` and reuse that directory if it exists.
    """
    print("üèãÔ∏è Starting YOLO 11 training...")
    print(f"Dataset: {dataset_config_path}")
    print(f"Epochs: {epochs}, Batch size: {batch_size}, Image size: {img_size}")

    # Start from pretrained weights rather than from scratch.
    model = YOLO(base_weights)

    results = model.train(
        data=dataset_config_path,
        epochs=epochs,
        batch=batch_size,
        imgsz=img_size,
        project="models",
        name="custom_yolo11",
        exist_ok=True,
        verbose=True,
        save_period=10  # Save checkpoint every 10 epochs
    )

    best_model_path = results.save_dir / "weights" / "best.pt"
    print(f"‚úÖ Training completed! Best model: {best_model_path}")

    return str(best_model_path)

def validate_model(model_path, dataset_config_path):
    """Load a trained model, run validation and print the headline box metrics.

    Args:
        model_path: weights file to load.
        dataset_config_path: dataset YAML passed as ``data`` to ``val``.

    Returns:
        The results object returned by ``model.val``.
    """
    results = YOLO(model_path).val(data=dataset_config_path, verbose=True)

    # (printed label, attribute on results.box), in output order.
    reported_metrics = (
        ("mAP50", "map50"),
        ("mAP50-95", "map"),
        ("Precision", "mp"),
        ("Recall", "mr"),
    )

    print("üìä Validation Results:")
    for label, attribute in reported_metrics:
        print(f"{label}: {getattr(results.box, attribute):.4f}")

    return results

if not TRAIN_CUSTOM_MODEL:
    print("‚ÑπÔ∏è Skipping custom training - using pretrained YOLO 11 model")
    print("üí° Set TRAIN_CUSTOM_MODEL = True to train on your custom dataset")

else:
    # Train, validate, then swap the shared detector over to the new weights
    # (only meaningful if you have labeled data in the dataset).
    print("üéØ Training custom YOLO 11 model...")
    training_options = {
        'epochs': 50,  # Reduced for demo
        'batch_size': 8,
        'img_size': 640,
    }
    custom_model_path = train_custom_yolo(dataset_config_path, **training_options)

    validation_results = validate_model(custom_model_path, dataset_config_path)

    detector = YOLODetector(custom_model_path)
    print("‚úÖ Custom model loaded for detection!")

## 7. Initialize Camera and Tracking System

Set up camera capture and initialize the multi-object tracking system.

In [None]:
# Tracking configuration
MAX_TRACK_AGE = 30  # updates a track may go unmatched before it is dropped
MIN_TRACK_HITS = 3  # hit count (creation + matches) needed to report a track
# NOTE: this rebinds the IOU_THRESHOLD name that the detector cell uses for
# the model's `iou` setting. Each class captures its value as a default
# argument when it is defined, so cell execution order matters.
IOU_THRESHOLD = 0.3
PROXIMITY_THRESHOLD = 50  # pixels for person-micro proximity detection


class SimpleTracker:
    """Simple multi-object tracker with singer detection logic.

    Tracks are plain dicts with the keys 'id', 'bbox', 'confidence',
    'class_name', 'class_id', 'hits', 'age' and 'time_since_update'. Tracks
    whose latest detection was a singer additionally carry 'has_micro',
    'micro_distance' and 'original_class'.
    """

    # Keys that only exist on a track while its detection is a singer.
    _SINGER_KEYS = ('has_micro', 'micro_distance', 'original_class')

    def __init__(self, max_age=MAX_TRACK_AGE, min_hits=MIN_TRACK_HITS, iou_threshold=IOU_THRESHOLD):
        """Create an empty tracker.

        Args:
            max_age: a track is removed once it has gone more than this many
                updates without a match.
            min_hits: minimum hit count before a track is reported as active.
            iou_threshold: minimum IoU for a detection to match a track.
        """
        self.max_age = max_age
        self.min_hits = min_hits
        self.iou_threshold = iou_threshold

        self.tracks = []
        self.track_id_counter = 0

    def calculate_iou(self, box1, box2):
        """Return the Intersection over Union of two ``[x1, y1, x2, y2]`` boxes."""
        x1 = max(box1[0], box2[0])
        y1 = max(box1[1], box2[1])
        x2 = min(box1[2], box2[2])
        y2 = min(box1[3], box2[3])

        # Touching or disjoint boxes do not overlap.
        if x2 <= x1 or y2 <= y1:
            return 0.0

        intersection = (x2 - x1) * (y2 - y1)
        area1 = (box1[2] - box1[0]) * (box1[3] - box1[1])
        area2 = (box2[2] - box2[0]) * (box2[3] - box2[1])
        union = area1 + area2 - intersection

        return intersection / union if union > 0 else 0.0

    def calculate_distance(self, box1, box2):
        """Return the Euclidean distance between the centers of two boxes."""
        center1_x = (box1[0] + box1[2]) / 2
        center1_y = (box1[1] + box1[3]) / 2
        center2_x = (box2[0] + box2[2]) / 2
        center2_y = (box2[1] + box2[3]) / 2

        return ((center1_x - center2_x) ** 2 + (center1_y - center2_y) ** 2) ** 0.5

    def detect_singers(self, detections):
        """Relabel each person that has a microphone nearby as a singer.

        A person becomes a 'singer' (class id 2) when the closest unclaimed
        'micro' detection has its center less than ``PROXIMITY_THRESHOLD``
        pixels from the person's center; each microphone is claimed by at
        most one person. Input dicts are not modified: singers are copies.

        Returns:
            New list ordered as: detections of other classes, then every
            person (each singer directly followed by its microphone), then
            the microphones nobody claimed.
        """
        person_indices = []
        micro_indices = []
        modified_detections = []

        for i, detection in enumerate(detections):
            if detection['class_name'] == 'person':
                person_indices.append(i)
            elif detection['class_name'] == 'micro':
                micro_indices.append(i)
            else:
                modified_detections.append(detection)

        used_micros = set()
        for person_idx in person_indices:
            person_detection = detections[person_idx]

            # Unclaimed microphones that are close enough to this person.
            in_range = []
            for micro_idx in micro_indices:
                if micro_idx in used_micros:
                    continue
                distance = self.calculate_distance(
                    person_detection['bbox'], detections[micro_idx]['bbox'])
                if distance < PROXIMITY_THRESHOLD:
                    in_range.append((distance, micro_idx))

            if not in_range:
                modified_detections.append(person_detection)
                continue

            # min() keeps the first candidate on ties.
            min_distance, closest_micro_idx = min(in_range, key=lambda pair: pair[0])

            singer_detection = person_detection.copy()
            singer_detection['class_name'] = 'singer'
            singer_detection['class_id'] = 2  # Singer class ID
            singer_detection['original_class'] = 'person'
            singer_detection['has_micro'] = True
            singer_detection['micro_distance'] = min_distance

            modified_detections.append(singer_detection)
            used_micros.add(closest_micro_idx)

            # The microphone itself is still reported as its own detection.
            modified_detections.append(detections[closest_micro_idx])

        for micro_idx in micro_indices:
            if micro_idx not in used_micros:
                modified_detections.append(detections[micro_idx])

        return modified_detections

    def _sync_singer_info(self, track, detection):
        """Copy singer-only fields from *detection* to *track*, or clear them."""
        if 'has_micro' in detection:
            track['has_micro'] = detection['has_micro']
            track['micro_distance'] = detection.get('micro_distance', 0)
            track['original_class'] = detection.get('original_class', 'person')
        else:
            # The object is no longer a singer: drop stale microphone info so
            # it is not reported for a plain person.
            for key in self._SINGER_KEYS:
                track.pop(key, None)

    def update(self, detections):
        """Update tracks with the detections of one frame.

        Detections first go through ``detect_singers``. Each live track, in
        creation order, then takes the unmatched detection with the highest
        IoU above ``iou_threshold``; the remaining detections start new
        tracks, and tracks unmatched for more than ``max_age`` updates are
        removed.

        Args:
            detections: dicts with 'bbox', 'confidence', 'class_name' and
                'class_id'.

        Returns:
            The track dicts matched in this call whose hit count has reached
            ``min_hits``. These are the tracker's own dicts, so later updates
            change them in place.
        """
        detections = self.detect_singers(detections)

        for track in self.tracks:
            track['age'] += 1
            track['time_since_update'] += 1

        unmatched_detections = list(range(len(detections)))

        for track in self.tracks:
            if track['time_since_update'] > self.max_age:
                continue

            best_match = -1
            best_iou = 0
            for j in unmatched_detections:
                iou = self.calculate_iou(track['bbox'], detections[j]['bbox'])
                if iou > best_iou and iou > self.iou_threshold:
                    best_iou = iou
                    best_match = j

            if best_match == -1:
                continue

            matched = detections[best_match]
            track['bbox'] = matched['bbox']
            track['confidence'] = matched['confidence']
            track['class_name'] = matched['class_name']
            # Keep the id in step with the name (person <-> singer changes).
            track['class_id'] = matched['class_id']
            track['hits'] += 1
            track['time_since_update'] = 0
            self._sync_singer_info(track, matched)

            unmatched_detections.remove(best_match)

        for j in unmatched_detections:
            new_track = {
                'id': self.track_id_counter,
                'bbox': detections[j]['bbox'],
                'confidence': detections[j]['confidence'],
                'class_name': detections[j]['class_name'],
                'class_id': detections[j]['class_id'],
                'hits': 1,
                'age': 0,
                'time_since_update': 0
            }
            self._sync_singer_info(new_track, detections[j])

            self.tracks.append(new_track)
            self.track_id_counter += 1

        self.tracks = [track for track in self.tracks
                       if track['time_since_update'] <= self.max_age]

        return [track for track in self.tracks
                if track['hits'] >= self.min_hits and track['time_since_update'] == 0]

# Initialize the shared tracker with the default configuration above.
# This instance keeps its tracks and ID counter across calls, so every
# function that uses the global `tracker` shares the same state.
tracker = SimpleTracker()
print("‚úÖ Object tracker initialized with singer detection!")

## 8. Implement Object Detection and Tracking Functions

Create the core functions for object detection and tracking integration.

In [None]:
def draw_tracked_objects(frame, tracked_objects, show_trails=False):
    """Draw tracked objects with their IDs on a copy of the frame.

    Args:
        frame: image to annotate; it is copied, not modified.
        tracked_objects: track dicts with 'bbox', 'id', 'class_name' and
            'confidence'; singer tracks may also carry 'has_micro' and
            'micro_distance'.
        show_trails: accepted for interface compatibility; currently unused.

    Returns:
        The annotated copy of ``frame``.
    """
    annotated_frame = frame.copy()
    font = cv2.FONT_HERSHEY_SIMPLEX
    white = (255, 255, 255)

    # BGR colors for specific classes: person=red, micro=blue, singer=green
    class_colors = {'person': (0, 0, 255), 'micro': (255, 0, 0), 'singer': (0, 255, 0)}
    # Fallback colors for other classes, picked by track ID
    colors = [(255, 0, 0), (0, 255, 0), (0, 0, 255), (255, 255, 0),
              (255, 0, 255), (0, 255, 255), (128, 0, 128), (255, 165, 0)]

    for track in tracked_objects:
        track_id = track['id']
        class_name = track['class_name']
        color = class_colors.get(class_name, colors[track_id % len(colors)])

        x1, y1, x2, y2 = map(int, track['bbox'])
        cv2.rectangle(annotated_frame, (x1, y1), (x2, y2), color, 2)

        # Labels are ASCII only: the Hershey fonts used by cv2.putText cannot
        # render emoji and would draw question marks instead.
        label = f"ID:{track_id} {class_name}: {track['confidence']:.2f}"
        if class_name == 'singer' and track.get('has_micro', False):
            micro_dist = track.get('micro_distance', 0)
            label += f" (mic {micro_dist:.0f}px)"

        (text_w, text_h), _ = cv2.getTextSize(label, font, 0.5, 2)
        # Keep the label inside the image when the box touches the top edge.
        label_bottom = max(y1, text_h + 10)
        cv2.rectangle(annotated_frame, (x1, label_bottom - text_h - 10),
                      (x1 + text_w, label_bottom), color, -1)
        cv2.putText(annotated_frame, label, (x1, label_bottom - 5),
                    font, 0.5, white, 2)

        # Center point of the box
        center_x = int((x1 + x2) / 2)
        center_y = int((y1 + y2) / 2)
        cv2.circle(annotated_frame, (center_x, center_y), 4, color, -1)

        # Special indicator for singers, right-aligned inside the box
        if class_name == 'singer':
            (tag_w, _tag_h), _ = cv2.getTextSize("MIC", font, 0.6, 2)
            cv2.putText(annotated_frame, "MIC", (x2 - tag_w - 5, y1 + 15),
                        font, 0.6, (0, 255, 0), 2)

    return annotated_frame

def draw_info_panel(frame, detections_count, tracks_count, fps):
    """Overlay a translucent statistics panel in the top-left corner.

    Args:
        frame: image to annotate; it is copied, not modified.
        detections_count: number shown on the "Detections" line.
        tracks_count: number shown on the "Active Tracks" line.
        fps: value shown on the "FPS" line with one decimal.

    Returns:
        The annotated copy of ``frame``.
    """
    panel_top_left = (10, 10)
    panel_bottom_right = (10 + 300, 10 + 120)  # 300 px wide, 120 px high
    white = (255, 255, 255)

    annotated_frame = frame.copy()
    # Unpacking doubles as a check that the frame has at least two dimensions.
    _frame_height, _frame_width = frame.shape[:2]

    # Blend a filled black rectangle over the frame for the panel background.
    shaded = annotated_frame.copy()
    cv2.rectangle(shaded, panel_top_left, panel_bottom_right, (0, 0, 0), -1)
    cv2.addWeighted(shaded, 0.7, annotated_frame, 0.3, 0, annotated_frame)

    # Panel border
    cv2.rectangle(annotated_frame, panel_top_left, panel_bottom_right, white, 2)

    panel_lines = (
        f"FPS: {fps:.1f}",
        f"Detections: {detections_count}",
        f"Active Tracks: {tracks_count}",
        f"Time: {datetime.now().strftime('%H:%M:%S')}",
    )

    # First baseline at y=35, then one line every 25 px.
    for row, text in enumerate(panel_lines):
        cv2.putText(annotated_frame, text, (20, 35 + 25 * row),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.7, white, 2)

    return annotated_frame

def process_frame(frame, detector, tracker, draw_info=True):
    """Process a single frame through the detection and tracking pipeline.

    Args:
        frame: image to process.
        detector: object with a ``detect(frame)`` method returning detection
            dicts ('bbox', 'confidence', 'class_id', 'class_name').
        tracker: object with an ``update(detections)`` method returning the
            active tracks.
        draw_info: draw the statistics panel on the result when true.

    Returns:
        Tuple ``(annotated_frame, tracked_objects, filtered_detections, fps)``
        where ``fps`` is 1 / processing time of this call (0 if that time
        is 0).
    """
    start_time = time.time()

    detections = detector.detect(frame)

    # Keep only our 3 classes and normalize their names/ids. 'micro' and
    # 'singer' are the names used by the custom dataset, so a model trained
    # on it emits them directly; 'microphone'/'mic' are accepted as aliases.
    class_aliases = {
        'person': ('person', 0),
        'micro': ('micro', 1),
        'microphone': ('micro', 1),
        'mic': ('micro', 1),
        'singer': ('singer', 2),
    }
    filtered_detections = []
    for detection in detections:
        canonical = class_aliases.get(detection['class_name'])
        if canonical is None:
            continue
        class_name, class_id = canonical
        # Copy so the detector's own dicts are left untouched.
        filtered_detections.append(
            {**detection, 'class_name': class_name, 'class_id': class_id})

    tracked_objects = tracker.update(filtered_detections)

    annotated_frame = draw_tracked_objects(frame, tracked_objects)

    processing_time = time.time() - start_time
    fps = 1.0 / processing_time if processing_time > 0 else 0

    if draw_info:
        annotated_frame = draw_info_panel(
            annotated_frame,
            len(filtered_detections),
            len(tracked_objects),
            fps
        )

    return annotated_frame, tracked_objects, filtered_detections, fps

def test_detection_on_sample():
    """Demonstrate the singer logic on a blank image with simulated detections.

    Uses a private tracker with ``min_hits=1``: with the default minimum hit
    count a single update reports no active tracks, and running the demo on
    the shared ``tracker`` would leave demo tracks in it. Results are shown
    with matplotlib and summarized with ``print``; nothing is returned.
    """
    # Blank sample image with a caption
    test_image = np.zeros((480, 640, 3), dtype=np.uint8)
    cv2.putText(test_image, "Simplified Detection: Person + Micro = Singer",
                (50, 240), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (255, 255, 255), 2)

    # Fake detections. Proximity is measured between box centers: the first
    # person is centered at (150, 250) and the microphone at (160, 230),
    # about 22 px apart, well inside the 50 px proximity threshold.
    fake_detections = [
        {
            'bbox': np.array([100, 150, 200, 350]),  # Person
            'confidence': 0.85,
            'class_name': 'person',
            'class_id': 0
        },
        {
            'bbox': np.array([140, 210, 180, 250]),  # Microphone near person
            'confidence': 0.90,
            'class_name': 'micro',
            'class_id': 1
        },
        {
            'bbox': np.array([400, 100, 500, 300]),  # Another person
            'confidence': 0.75,
            'class_name': 'person',
            'class_id': 0
        }
    ]

    demo_tracker = SimpleTracker(min_hits=1)
    tracked_objects = demo_tracker.update(fake_detections)
    annotated_frame = draw_tracked_objects(test_image, tracked_objects)

    # Display original and processed frames side by side
    plt.figure(figsize=(12, 6))
    plt.subplot(1, 2, 1)
    plt.imshow(cv2.cvtColor(test_image, cv2.COLOR_BGR2RGB))
    plt.title("Original Frame")
    plt.axis('off')

    plt.subplot(1, 2, 2)
    plt.imshow(cv2.cvtColor(annotated_frame, cv2.COLOR_BGR2RGB))
    plt.title("Processed Frame (Singer Detection)")
    plt.axis('off')

    plt.tight_layout()
    plt.show()

    print(f"üìä Detection Results:")
    print(f"   Original detections: {len(fake_detections)}")
    print(f"   Active tracks: {len(tracked_objects)}")
    for track in tracked_objects:
        class_info = track['class_name']
        if track.get('has_micro'):
            class_info += f" (with microphone, distance: {track.get('micro_distance', 0):.0f}px)"
        print(f"   Track {track['id']}: {class_info}")

# Test the detection pipeline on simulated detections (no camera or model
# inference involved; the result is shown as a matplotlib figure).
print("üß™ Testing simplified detection pipeline...")
test_detection_on_sample()

## 9. Real-time Camera Processing

Run the complete detection and tracking system on live camera feed.

In [None]:
# Camera processing configuration
# NOTE(review): these flags are not read in the visible part of this cell;
# presumably they select and parameterize the demo further down - confirm.
RUN_CAMERA_DEMO = False  # Set to True to run camera demo
DEMO_DURATION = 30  # seconds
SAVE_VIDEO = True

# Simple camera manager class
class CameraManager:
    """Own a single ``cv2.VideoCapture`` and hand out frames from it.

    ``cap`` is the open capture, or ``None`` when no camera is open.
    """

    def __init__(self):
        self.cap = None

    def start_camera(self, camera_id=0):
        """Open the camera and return True on success, False otherwise.

        Any capture opened earlier is released first, and a capture that
        fails to open is released immediately, so no handle is leaked.
        """
        self.release()

        cap = cv2.VideoCapture(camera_id)
        if not cap.isOpened():
            cap.release()
            return False

        self.cap = cap
        return True

    def get_frame(self):
        """Return the next frame, or None if no camera is open or the read failed."""
        if self.cap is None:
            return None
        ret, frame = self.cap.read()
        return frame if ret else None

    def release(self):
        """Release the camera, if any; safe to call repeatedly."""
        if self.cap is not None:
            self.cap.release()
            # Forget the handle so later calls do not touch a released capture.
            self.cap = None

# Initialize the shared camera manager. No camera is opened here: the
# capture is only created when start_camera() is called.
camera_manager = CameraManager()

def run_camera_detection(duration=30, save_video=False, video_path="results/detection_output.mp4"):
    """Run object detection and tracking on the camera feed for a fixed time.

    Uses the module-level ``camera_manager``, ``detector`` and ``tracker``.

    Args:
        duration: maximum run time in seconds.
        save_video: write the annotated frames to ``video_path`` when true.
        video_path: output file; frames are resized to 640x480 and written
            at a nominal 20 FPS with the 'mp4v' codec.

    Returns:
        Tuple ``(tracking_data, fps_history)``: one record dict per active
        track per frame, and the per-frame FPS values. Both are empty lists
        when the camera cannot be started or an error occurs (the error is
        printed, not re-raised).

    The camera and the video writer are released on every exit path,
    including errors and a kernel interrupt.
    """
    output_size = (640, 480)
    # Bound before the try block so the cleanup can always refer to it.
    video_writer = None

    try:
        print("üìπ Starting camera...")
        if not camera_manager.start_camera():
            print("‚ùå Failed to start camera")
            return [], []

        if save_video:
            # Make sure the target directory exists before opening the writer.
            Path(video_path).parent.mkdir(parents=True, exist_ok=True)
            fourcc = cv2.VideoWriter_fourcc(*'mp4v')
            video_writer = cv2.VideoWriter(video_path, fourcc, 20.0, output_size)
            print(f"üíæ Saving video to: {video_path}")

        # Statistics
        frame_count = 0
        start_time = time.time()
        fps_history = []
        tracking_data = []

        print("üé¨ Starting detection and tracking...")
        # No window is shown, so there are no key presses to react to.
        print(f"Running for up to {duration} seconds (interrupt the kernel to stop early)")

        while time.time() - start_time < duration:
            frame = camera_manager.get_frame()
            if frame is None:
                print("‚ö†Ô∏è No frame received from camera")
                break

            annotated_frame, tracked_objects, detections, fps = process_frame(
                frame, detector, tracker
            )

            frame_count += 1
            fps_history.append(fps)

            # Store tracking data as plain Python numbers so the records can
            # be serialized (e.g. to JSON) without numpy-specific handling.
            for track in tracked_objects:
                x1, y1, x2, y2 = (float(v) for v in track['bbox'])
                tracking_data.append({
                    'frame': frame_count,
                    'timestamp': time.time(),
                    'track_id': track['id'],
                    'class_name': track['class_name'],
                    'confidence': track['confidence'],
                    'bbox': [x1, y1, x2, y2],
                    'center_x': (x1 + x2) / 2,
                    'center_y': (y1 + y2) / 2,
                    'has_micro': track.get('has_micro', False),
                    'micro_distance': track.get('micro_distance', 0)
                })

            # In Jupyter the frames are saved instead of displayed.
            if video_writer is not None:
                # Resize frame to match video writer dimensions
                video_writer.write(cv2.resize(annotated_frame, output_size))

            # Show progress every 30 frames
            if frame_count % 30 == 0:
                avg_fps = np.mean(fps_history[-30:])
                singers_count = len([t for t in tracked_objects if t['class_name'] == 'singer'])
                print(f"üìä Frame {frame_count}: {avg_fps:.1f} FPS, "
                      f"{len(tracked_objects)} tracks, {singers_count} singers detected")

        # Print final statistics
        total_time = time.time() - start_time
        avg_fps = np.mean(fps_history) if fps_history else 0

        print(f"\nüìà Final Statistics:")
        print(f"   Total frames: {frame_count}")
        print(f"   Duration: {total_time:.1f} seconds")
        print(f"   Average FPS: {avg_fps:.2f}")
        print(f"   Total tracking records: {len(tracking_data)}")

        return tracking_data, fps_history

    except Exception as e:
        print(f"‚ùå Error during camera processing: {e}")
        return [], []

    finally:
        # Runs on success, on errors and on KeyboardInterrupt alike.
        camera_manager.release()
        if video_writer is not None:
            video_writer.release()

def simulate_camera_demo():
    """Simulate a short camera session with scripted person/microphone detections.

    Builds 20 synthetic frames and feeds each frame's detections to the
    module-level ``tracker`` (defined elsewhere in this notebook) through
    ``tracker.update``. The scripted scene is:

    * person 1 moves right by 15 px per frame and is present in every frame;
    * a microphone sits next to person 1 during frames 8-15;
    * person 2 is stationary and appears from frame 5 onwards;
    * a standalone microphone appears from frame 10 onwards.

    Returns:
        tuple: ``(tracking_data, fps_list)``. ``tracking_data`` is a list with one
        dict per tracked object per frame; ``fps_list`` holds one simulated FPS
        value per frame, drawn uniformly from 20-30.
    """
    print("üé≠ Simulating camera demo with singer detection...")

    def _detection(bbox, confidence, class_name, class_id):
        # One detection in the dict layout handed to tracker.update().
        return {
            'bbox': np.array(bbox),
            'confidence': confidence,
            'class_name': class_name,
            'class_id': class_id
        }

    tracking_data = []
    fps_list = []

    for i in range(20):
        # Person 1 - moves across the screen, holds a microphone in the middle frames
        person1_x = 50 + i * 15
        fake_detections = [
            _detection([person1_x, 150, person1_x + 60, 300], 0.85, 'person', 0)
        ]

        # Microphone appears near person 1 in frames 8-15 (simulating singing)
        if 8 <= i <= 15:
            micro1_x = person1_x + 30
            fake_detections.append(
                _detection([micro1_x, 180, micro1_x + 20, 220], 0.90, 'micro', 1)
            )

        # Person 2 - stationary, no microphone, appears later in the sequence
        if i >= 5:
            fake_detections.append(_detection([400, 100, 460, 280], 0.75, 'person', 0))

        # Standalone microphone (not near anyone)
        if i >= 10:
            fake_detections.append(_detection([500, 350, 520, 390], 0.80, 'micro', 1))

        # Update tracker with fake detections
        tracked_objects = tracker.update(fake_detections)

        # Simulate FPS: 25 with up to +/-5 of uniform jitter
        fps_list.append(25.0 + np.random.uniform(-5, 5))

        # Store tracking data
        for track in tracked_objects:
            bbox = track['bbox']
            tracking_data.append({
                'frame': i,
                'track_id': track['id'],
                'class_name': track['class_name'],
                'confidence': track['confidence'],
                # Keep the box as a plain list, as the real camera loop does, so the
                # Label Studio export can build regions from simulated runs too.
                'bbox': np.asarray(bbox).tolist(),
                'center_x': (bbox[0] + bbox[2]) / 2,
                'center_y': (bbox[1] + bbox[3]) / 2,
                'has_micro': track.get('has_micro', False),
                'micro_distance': track.get('micro_distance', 0)
            })

    return tracking_data, fps_list

# Run camera demo or simulation.
# RUN_CAMERA_DEMO, DEMO_DURATION and SAVE_VIDEO are notebook-level settings
# defined in an earlier cell.
if RUN_CAMERA_DEMO:
    print("üöÄ Starting real camera demo...")
    print(f"‚è±Ô∏è Duration: {DEMO_DURATION} seconds")

    # Make sure results directory exists before the video writer needs it
    Path("results").mkdir(exist_ok=True)

    tracking_data, fps_history = run_camera_detection(
        duration=DEMO_DURATION,
        save_video=SAVE_VIDEO,
        video_path="results/detection_output.mp4"
    )
else:
    print("üîÑ Running simulation demo (set RUN_CAMERA_DEMO=True for real camera)")
    tracking_data, fps_history = simulate_camera_demo()

print(f"‚úÖ Demo completed! Collected {len(tracking_data)} tracking records")

# Show singer detection summary
if tracking_data:
    df = pd.DataFrame(tracking_data)
    if 'class_name' in df.columns:
        class_summary = df['class_name'].value_counts()
        print(f"\nüìä Object Detection Summary:")
        for class_name, count in class_summary.items():
            print(f"   {class_name}: {count} detections")

        # Singer-specific summary
        singer_data = df[df['class_name'] == 'singer']
        if len(singer_data) > 0:
            print(f"\nüé§ Singer Detection Details:")
            print(f"   Singer tracks: {singer_data['track_id'].nunique()}")
            print(f"   Total singer detections: {len(singer_data)}")
            if 'micro_distance' in singer_data.columns:
                # Records default micro_distance to 0 when the track carries no
                # distance, so only positive values are averaged.
                positive_distances = singer_data.loc[
                    singer_data['micro_distance'] > 0, 'micro_distance'
                ]
                if positive_distances.empty:
                    # The mean of an empty selection is NaN; report that explicitly
                    # instead of printing "nan pixels".
                    print("   Average microphone distance: n/a (no distances recorded)")
                else:
                    avg_distance = positive_distances.mean()
                    print(f"   Average microphone distance: {avg_distance:.1f} pixels")

## 10. Display Results with Bounding Boxes and Tracking IDs

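Analyze the detection and tracking results: print summary statistics and plot the class distribution, track durations, detections per frame, confidence scores, object trajectories, and FPS.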

In [None]:
# Visualization and analysis of results
def analyze_tracking_results(tracking_data):
    """Summarise tracking records on stdout and render a 2x2 overview figure.

    The figure panels are: class distribution (pie), track duration histogram,
    detections per frame (line) and confidence histogram.

    Args:
        tracking_data: List of per-detection record dicts. The 'track_id',
            'frame' and 'class_name' keys are read; 'confidence' is used
            when present.

    Returns:
        The records as a pandas DataFrame, or None when tracking_data is empty.
    """
    if not tracking_data:
        print("‚ö†Ô∏è No tracking data available")
        return

    def _placeholder(ax, message, title):
        # Shared fallback for a panel without data: centred note, then the title.
        ax.text(0.5, 0.5, message, ha='center', va='center')
        ax.set_title(title)

    records = pd.DataFrame(tracking_data)

    # --- Text report -----------------------------------------------------
    print("üìä Tracking Analysis:")
    print(f"   Total tracking records: {len(records)}")
    print(f"   Unique objects tracked: {records['track_id'].nunique()}")
    print(f"   Frames processed: {records['frame'].nunique()}")
    print(f"   Object classes detected: {records['class_name'].unique()}")

    per_class = records['class_name'].value_counts()
    print("\nüè∑Ô∏è Class Distribution:")
    for label, n_detections in per_class.items():
        print(f"   {label}: {n_detections} detections")

    # A track's duration is the inclusive span between its first and last frame.
    track_spans = records.groupby('track_id')['frame'].agg(['min', 'max', 'count'])
    track_spans['duration'] = track_spans['max'] - track_spans['min'] + 1
    durations = track_spans['duration']

    print("\n‚è±Ô∏è Track Duration Statistics:")
    print(f"   Average track duration: {durations.mean():.1f} frames")
    print(f"   Longest track: {durations.max()} frames")
    print(f"   Shortest track: {durations.min()} frames")

    # --- Figure ----------------------------------------------------------
    _, axes = plt.subplots(2, 2, figsize=(15, 10))
    (ax_classes, ax_durations), (ax_frames, ax_confidence) = axes

    # 1. Class distribution pie chart
    if len(per_class) == 0:
        _placeholder(ax_classes, 'No data available', 'Object Class Distribution')
    else:
        ax_classes.pie(per_class.values, labels=per_class.index, autopct='%1.1f%%')
        ax_classes.set_title('Object Class Distribution')

    # 2. Track duration histogram
    if len(track_spans) == 0:
        _placeholder(ax_durations, 'No data available', 'Track Duration Distribution')
    else:
        ax_durations.hist(durations, bins=10, edgecolor='black')
        ax_durations.set_title('Track Duration Distribution')
        ax_durations.set_xlabel('Duration (frames)')
        ax_durations.set_ylabel('Number of tracks')

    # 3. Detections per frame
    if len(records) == 0:
        _placeholder(ax_frames, 'No data available', 'Detections per Frame')
    else:
        per_frame = records.groupby('frame').size()
        ax_frames.plot(per_frame.index, per_frame.values)
        ax_frames.set_title('Detections per Frame')
        ax_frames.set_xlabel('Frame')
        ax_frames.set_ylabel('Number of detections')

    # 4. Confidence distribution
    if 'confidence' not in records.columns or len(records) == 0:
        _placeholder(ax_confidence, 'No confidence data', 'Confidence Score Distribution')
    else:
        ax_confidence.hist(records['confidence'], bins=20, edgecolor='black')
        ax_confidence.set_title('Confidence Score Distribution')
        ax_confidence.set_xlabel('Confidence')
        ax_confidence.set_ylabel('Frequency')

    plt.tight_layout()
    plt.show()

    return records

def create_tracking_trajectory_plot(tracking_data):
    """Plot the path of every tracked object in image coordinates.

    Each track with more than one record is drawn as a line through its
    ('center_x', 'center_y') points, with a circle at its first point and a
    square at its last. Tracks with a single record are not drawn.

    Args:
        tracking_data: List of per-detection record dicts. The 'track_id',
            'center_x' and 'center_y' keys are read.

    Returns:
        None. The figure is shown with ``plt.show()``; a message is printed
        and nothing is drawn when there is no data or no position columns.
    """
    if not tracking_data:
        print("‚ö†Ô∏è No tracking data for trajectory plot")
        return

    df = pd.DataFrame(tracking_data)

    if 'center_x' not in df.columns or 'center_y' not in df.columns:
        print("‚ö†Ô∏è No position data available for trajectory plot")
        return

    plt.figure(figsize=(12, 8))

    # One colour per track, sampled evenly from the tab10 colormap
    unique_tracks = df['track_id'].unique()
    colors = plt.cm.tab10(np.linspace(0, 1, len(unique_tracks)))

    for i, track_id in enumerate(unique_tracks):
        track_data = df[df['track_id'] == track_id]

        # A single point has no trajectory to draw
        if len(track_data) <= 1:
            continue

        xs = track_data['center_x']
        ys = track_data['center_y']

        plt.plot(xs, ys, color=colors[i], linewidth=2, alpha=0.7,
                 label=f'Track {track_id}')

        # Mark start (circle) and end (square) points
        plt.scatter(xs.iloc[0], ys.iloc[0],
                    color=colors[i], s=100, marker='o', edgecolor='black')
        plt.scatter(xs.iloc[-1], ys.iloc[-1],
                    color=colors[i], s=100, marker='s', edgecolor='black')

    plt.title('Object Tracking Trajectories')
    plt.xlabel('X Position (pixels)')
    plt.ylabel('Y Position (pixels)')
    plt.grid(True, alpha=0.3)
    plt.gca().invert_yaxis()  # Invert Y axis to match image coordinates

    # Empty proxy artists that explain the start/end marker shapes
    plt.scatter([], [], c='gray', s=100, marker='o',
                edgecolor='black', label='Start')
    plt.scatter([], [], c='gray', s=100, marker='s',
                edgecolor='black', label='End')

    # The legend only lists artists that already exist when it is built, so it
    # must be created after the Start/End proxies above.
    plt.legend(bbox_to_anchor=(1.05, 1), loc='upper left')

    plt.tight_layout()
    plt.show()

def plot_performance_metrics(fps_history):
    """Chart per-frame FPS samples and print their summary statistics.

    Draws two side-by-side panels: FPS over the sample index, and a histogram
    of the FPS values with a dashed red line at the mean. Then prints the
    mean, minimum, maximum and standard deviation.

    Args:
        fps_history: Sequence of FPS samples, one per processed frame.

    Returns:
        None. Prints a message and draws nothing when fps_history is empty.
    """
    if not fps_history:
        print("‚ö†Ô∏è No FPS data available")
        return

    _, (ax_timeline, ax_hist) = plt.subplots(1, 2, figsize=(12, 4))

    # Left panel: FPS over time
    ax_timeline.plot(fps_history)
    ax_timeline.set_title('Real-time Performance (FPS)')
    ax_timeline.set_xlabel('Frame')
    ax_timeline.set_ylabel('FPS')
    ax_timeline.grid(True, alpha=0.3)

    # Right panel: FPS distribution with the mean marked
    ax_hist.hist(fps_history, bins=20, edgecolor='black')
    ax_hist.set_title('FPS Distribution')
    ax_hist.set_xlabel('FPS')
    ax_hist.set_ylabel('Frequency')
    mean_fps = np.mean(fps_history)
    ax_hist.axvline(mean_fps, color='red', linestyle='--',
                    label=f'Mean: {mean_fps:.1f} FPS')
    ax_hist.legend()
    ax_hist.grid(True, alpha=0.3)

    plt.tight_layout()
    plt.show()

    summary_lines = (
        "üìà Performance Summary:",
        f"   Average FPS: {mean_fps:.2f}",
        f"   Min FPS: {np.min(fps_history):.2f}",
        f"   Max FPS: {np.max(fps_history):.2f}",
        f"   FPS Std Dev: {np.std(fps_history):.2f}",
    )
    for line in summary_lines:
        print(line)

# Analyze results
# Runs the three reporting helpers defined above on the demo output:
# tracking_data and fps_history come from the camera/simulation cell.
print("üìä Analyzing tracking results...")
# df_results is the records DataFrame, or None when there was no tracking data.
df_results = analyze_tracking_results(tracking_data)

print("\nüõ§Ô∏è Creating trajectory visualization...")
create_tracking_trajectory_plot(tracking_data)

print("\nüìà Performance analysis...")
plot_performance_metrics(fps_history)

## 11. Save Tracking Results

Export tracking data and results for further analysis and record keeping.

In [None]:
def save_tracking_results(tracking_data, fps_history, output_dir="results"):
    """Write tracking records, a text summary, FPS samples and a config snapshot.

    All files are written into ``output_dir`` and share one timestamp suffix.
    The CSV and summary are only written when ``tracking_data`` is non-empty,
    the FPS CSV only when ``fps_history`` is non-empty; the configuration
    snapshot is always written.

    Args:
        tracking_data: List of per-detection record dicts. The 'track_id',
            'frame' and 'class_name' keys are read; 'confidence' is used
            when present.
        fps_history: Sequence of FPS samples, one per processed frame.
        output_dir: Directory to write into; created (with parents) if missing.

    Returns:
        dict: Keys 'tracking_data_path', 'summary_path', 'fps_data_path' and
        'config_path'. Each value is a ``Path``, or None for a file that was
        not written.
    """
    output_path = Path(output_dir)
    # parents=True so a nested directory such as "runs/exp1/results" also works.
    output_path.mkdir(parents=True, exist_ok=True)

    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")

    csv_path = None
    summary_path = None
    fps_path = None

    # Save tracking data as CSV
    if tracking_data:
        df = pd.DataFrame(tracking_data)
        csv_path = output_path / f"tracking_data_{timestamp}.csv"
        df.to_csv(csv_path, index=False)
        print(f"üíæ Tracking data saved to: {csv_path}")

        # A track's duration is the inclusive span between its first and last frame.
        track_durations = df.groupby('track_id')['frame'].agg(['min', 'max', 'count'])
        track_durations['duration'] = track_durations['max'] - track_durations['min'] + 1
        durations = track_durations['duration']

        # str() each class so non-string labels (e.g. numeric ids) do not break join.
        classes = ', '.join(str(name) for name in df['class_name'].unique())

        summary_lines = [
            "YOLO 11 Object Detection and Tracking Results\n",
            f"Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n",
            "=" * 50 + "\n\n",
            "Dataset Statistics:\n",
            f"  Total tracking records: {len(df)}\n",
            f"  Unique objects tracked: {df['track_id'].nunique()}\n",
            f"  Frames processed: {df['frame'].nunique()}\n",
            f"  Classes detected: {classes}\n",
        ]

        if 'confidence' in df.columns:
            summary_lines += [
                "\nConfidence Statistics:\n",
                f"  Average confidence: {df['confidence'].mean():.3f}\n",
                f"  Min confidence: {df['confidence'].min():.3f}\n",
                f"  Max confidence: {df['confidence'].max():.3f}\n",
            ]

        summary_lines += [
            "\nTracking Performance:\n",
            f"  Average track duration: {durations.mean():.1f} frames\n",
            f"  Longest track: {durations.max()} frames\n",
            f"  Shortest track: {durations.min()} frames\n",
        ]

        # Save summary statistics
        summary_path = output_path / f"tracking_summary_{timestamp}.txt"
        with open(summary_path, 'w', encoding='utf-8') as f:
            f.writelines(summary_lines)

        print(f"üìã Summary report saved to: {summary_path}")

    # Save FPS data
    if fps_history:
        fps_df = pd.DataFrame({
            'frame': range(len(fps_history)),
            'fps': fps_history
        })
        fps_path = output_path / f"fps_data_{timestamp}.csv"
        fps_df.to_csv(fps_path, index=False)
        print(f"‚ö° FPS data saved to: {fps_path}")

    # Save configuration snapshot.
    # MODEL_SIZE, CONFIDENCE_THRESHOLD, IOU_THRESHOLD, MAX_TRACK_AGE and
    # MIN_TRACK_HITS are notebook-level settings defined in an earlier cell.
    config_snapshot = {
        'model': {
            'name': MODEL_SIZE,
            'confidence_threshold': CONFIDENCE_THRESHOLD,
            'iou_threshold': IOU_THRESHOLD
        },
        'tracking': {
            'max_age': MAX_TRACK_AGE,
            'min_hits': MIN_TRACK_HITS,
            'iou_threshold': IOU_THRESHOLD
        },
        'processing': {
            'timestamp': timestamp,
            'total_frames': len(fps_history) if fps_history else 0,
            # float() because json cannot serialize NumPy scalars such as float32.
            'avg_fps': float(np.mean(fps_history)) if fps_history else 0
        }
    }

    config_path = output_path / f"config_snapshot_{timestamp}.json"
    with open(config_path, 'w', encoding='utf-8') as f:
        json.dump(config_snapshot, f, indent=2)
    print(f"‚öôÔ∏è Configuration snapshot saved to: {config_path}")

    return {
        'tracking_data_path': csv_path,
        'summary_path': summary_path,
        'fps_data_path': fps_path,
        'config_path': config_path
    }

def export_to_label_studio_format(tracking_data, output_path="results/labelstudio_export.json",
                                  frame_width=640, frame_height=480):
    """Export tracking records as Label Studio pre-annotation tasks.

    One task is created per distinct 'frame' value, in order of first
    appearance. Every record in that frame that carries a four-element
    'bbox' (``[x1, y1, x2, y2]`` in pixels) becomes a ``rectanglelabels``
    region whose position and size are percentages of the frame size.
    Records without a usable 'bbox' are skipped, so a frame may end up with
    an empty result list.

    Args:
        tracking_data: List of per-detection record dicts. The 'frame' and
            'class_name' keys are read; 'bbox' and 'confidence' are used when
            present ('confidence' defaults to 1.0).
        output_path: Destination JSON file; parent directories are created.
        frame_width: Width in pixels of the frames the boxes refer to.
        frame_height: Height in pixels of the frames the boxes refer to.

    Returns:
        The output path as a string, or None when tracking_data is empty.

    Raises:
        ValueError: If frame_width or frame_height is not positive.
    """
    if not tracking_data:
        print("‚ö†Ô∏è No tracking data to export")
        return None

    if frame_width <= 0 or frame_height <= 0:
        raise ValueError("frame_width and frame_height must be positive")

    df = pd.DataFrame(tracking_data)

    label_studio_tasks = []

    # sort=False keeps frames in order of first appearance.
    for frame_num, frame_rows in df.groupby('frame', sort=False):
        regions = []

        for row in frame_rows.to_dict('records'):
            bbox = row.get('bbox')
            # Rows without a box hold NaN here (or have no 'bbox' key); skip them.
            if not isinstance(bbox, (list, tuple, np.ndarray)) or len(bbox) != 4:
                continue

            # Plain floats keep the JSON encoder happy for NumPy-typed boxes.
            x1, y1, x2, y2 = (float(v) for v in bbox)

            # Label Studio expects percentages of the image size
            regions.append({
                "from_name": "label",
                "to_name": "image",
                "type": "rectanglelabels",
                "value": {
                    "x": (x1 / frame_width) * 100,
                    "y": (y1 / frame_height) * 100,
                    "width": ((x2 - x1) / frame_width) * 100,
                    "height": ((y2 - y1) / frame_height) * 100,
                    "rectanglelabels": [row['class_name']]
                },
                "score": float(row.get('confidence', 1.0))
            })

        label_studio_tasks.append({
            "data": {
                # Placeholder image name; no image files are written here.
                "image": f"frame_{int(frame_num):06d}.jpg"
            },
            "predictions": [{
                "model_version": "yolo11",
                "result": regions
            }]
        })

    # Save to JSON file
    output_file = Path(output_path)
    output_file.parent.mkdir(parents=True, exist_ok=True)

    with open(output_file, 'w', encoding='utf-8') as f:
        json.dump(label_studio_tasks, f, indent=2)

    print(f"üì§ Label Studio export saved to: {output_file}")
    print(f"   Total tasks: {len(label_studio_tasks)}")

    return str(output_file)

# Persist everything produced by the demo, then list the files that were written.
print("üíæ Saving tracking results and analysis...")
Path("results").mkdir(exist_ok=True)

saved_files = save_tracking_results(tracking_data, fps_history)

print("\nüì§ Exporting to Label Studio format...")
labelstudio_export_path = export_to_label_studio_format(tracking_data)

print("\n‚úÖ All results saved successfully!")
print("\nüìÅ Generated Files:")
for key, path in saved_files.items():
    # Entries are None for files that were not written.
    if not path:
        continue
    print(f"   {key}: {path}")

if labelstudio_export_path:
    print(f"   labelstudio_export: {labelstudio_export_path}")

# Closing banner and suggested follow-ups, emitted as one block of text.
print("\n".join([
    "\nüéâ Tutorial completed successfully!",
    "\nüìù Next Steps:",
    "   1. Run with real camera data (set RUN_CAMERA_DEMO=True)",
    "   2. Train custom models with your labeled data",
    "   3. Integrate with Label Studio for continuous improvement",
    "   4. Optimize performance for your specific use case",
    "   5. Deploy as a service using the provided API code",
]))