# Multi-Object Tracking using YOLO11 + DeepSort

This notebook demonstrates multi-object tracking for person detection using YOLO11 and DeepSort.

**Objectives:**
- Track multiple persons across video frames with unique IDs
- Maintain track identity even during occlusions
- Visualize tracking trajectories
- Analyze track persistence and lifetime

**Models:** 
- Detection: YOLO11s (small variant - optimized for CPU)
- Tracking: DeepSort (Deep Learning + Hungarian Algorithm)

## 1. Setup & Installation

In [None]:
# Install required packages (run once)
# !pip install ultralytics opencv-python matplotlib pandas tqdm deep-sort-realtime

In [1]:
# Import libraries
import os
import cv2
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from matplotlib import cm
from pathlib import Path
from tqdm import tqdm
import json
from collections import defaultdict

from ultralytics import YOLO
from deep_sort_realtime.deepsort_tracker import DeepSort

# Set up paths
PROJECT_ROOT = Path("../")
DATA_DIR = PROJECT_ROOT / "data"
OUTPUT_DIR = PROJECT_ROOT / "outputs" / "tracking"
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)

print("Libraries imported successfully!")

Libraries imported successfully!


## 2. Load Models

In [2]:
# Load YOLO11s for detection
yolo_model = YOLO("yolo11s.pt")
print(f"YOLO Model: {yolo_model.model_name}")

# Initialize DeepSort tracker
tracker = DeepSort(
    max_age=30,              # Maximum frames to keep track alive without detections
    n_init=3,                # Number of consecutive detections before track is confirmed
    nms_max_overlap=1.0,     # NMS threshold
    max_cosine_distance=0.3, # Maximum cosine distance for feature matching
    nn_budget=None,          # Maximum size of feature gallery
    embedder="mobilenet",    # Feature extractor for ReID
    embedder_gpu=False       # Use CPU for embeddings
)

print(f"DeepSort tracker initialized")
print(f"  Max age: 30 frames")
print(f"  Init frames: 3")
print(f"  Feature extractor: MobileNet (CPU)")

YOLO Model: yolo11s.pt


  import pkg_resources


DeepSort tracker initialized
  Max age: 30 frames
  Init frames: 3
  Feature extractor: MobileNet (CPU)


## 3. Helper Functions

In [None]:
def get_color_for_id(track_id, max_id=100):
    """
    Generate consistent color for each track ID.
    
    Args:
        track_id: Unique track identifier
        max_id: Maximum expected ID for color normalization
    
    Returns:
        color: BGR color tuple
    """
    # Use colormap for consistent colors
    colormap = cm.get_cmap('hsv')
    normalized_id = (track_id % max_id) / max_id
    color_rgb = colormap(normalized_id)[:3]
    color_bgr = tuple(int(c * 255) for c in reversed(color_rgb))
    return color_bgr


def draw_tracks(frame, tracks):
    """
    Draw bounding boxes and IDs for tracks on frame.
    
    Args:
        frame: Input frame
        tracks: List of track objects from DeepSort
    
    Returns:
        annotated_frame: Frame with track visualizations
    """
    annotated = frame.copy()
    
    for track in tracks:
        if not track.is_confirmed():
            continue
        
        # Convert track_id to int to avoid string formatting issues
        track_id = int(track.track_id)
        ltrb = track.to_ltrb()  # [left, top, right, bottom]
        
        x1, y1, x2, y2 = map(int, ltrb)
        
        # Get color for this ID
        color = get_color_for_id(track_id)
        
        # Draw bounding box
        cv2.rectangle(annotated, (x1, y1), (x2, y2), color, 2)
        
        # Draw ID label
        label = f"ID: {track_id}"
        label_size, _ = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.6, 2)
        label_y = max(y1 - 10, label_size[1] + 10)
        
        # Background for label
        cv2.rectangle(annotated, 
                     (x1, label_y - label_size[1] - 5), 
                     (x1 + label_size[0] + 5, label_y + 5), 
                     color, -1)
        
        # Label text
        cv2.putText(annotated, label, (x1 + 2, label_y), 
                   cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 2)
    
    return annotated


def draw_trajectories(frame, trajectories, max_history=30):
    """
    Draw trajectory paths for tracks.
    
    Args:
        frame: Input frame
        trajectories: Dictionary of track_id -> list of (x, y) centers
        max_history: Maximum number of points to show in trajectory
    
    Returns:
        annotated_frame: Frame with trajectory visualization
    """
    annotated = frame.copy()
    
    for track_id, points in trajectories.items():
        if len(points) < 2:
            continue
        
        # Get recent points
        recent_points = points[-max_history:]
        color = get_color_for_id(track_id)
        
        # Draw line segments
        for i in range(len(recent_points) - 1):
            pt1 = tuple(map(int, recent_points[i]))
            pt2 = tuple(map(int, recent_points[i + 1]))
            
            # Fade out older points
            alpha = (i + 1) / len(recent_points)
            thickness = max(1, int(3 * alpha))
            
            cv2.line(annotated, pt1, pt2, color, thickness)
        
        # Draw current position as circle
        if recent_points:
            current = tuple(map(int, recent_points[-1]))
            cv2.circle(annotated, current, 4, color, -1)
    
    return annotated

## 4. Video Tracking

In [None]:
def track_video(video_path, output_path, conf_threshold=0.25, show_trajectories=True):
    """
    Track persons in a video file.
    
    Args:
        video_path: Path to input video
        output_path: Path to save output video
        conf_threshold: Detection confidence threshold
        show_trajectories: Whether to draw trajectory paths
    
    Returns:
        tracking_data: Dictionary with tracking statistics
    """
    video_path = Path(video_path)
    output_path = Path(output_path)
    
    # Open video
    cap = cv2.VideoCapture(str(video_path))
    if not cap.isOpened():
        raise ValueError(f"Could not open video: {video_path}")
    
    # Get video properties
    fps = int(cap.get(cv2.CAP_PROP_FPS))
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    
    print(f"Video: {video_path.name}")
    print(f"  Resolution: {width}x{height}")
    print(f"  FPS: {fps}")
    print(f"  Total frames: {total_frames}")
    
    # Setup video writer
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    out = cv2.VideoWriter(str(output_path), fourcc, fps, (width, height))
    
    # Tracking data structures
    trajectories = defaultdict(list)  # track_id -> [(x, y), ...]
    track_lifetimes = {}  # track_id -> (first_frame, last_frame)
    frame_counts = []  # persons per frame
    
    # Reset tracker for new video
    tracker_local = DeepSort(
        max_age=30,
        n_init=3,
        nms_max_overlap=1.0,
        max_cosine_distance=0.3,
        nn_budget=None,
        embedder="mobilenet",
        embedder_gpu=False
    )
    
    for frame_idx in tqdm(range(total_frames), desc="Tracking frames"):
        ret, frame = cap.read()
        if not ret:
            break
        
        # Run YOLO detection
        results = yolo_model.predict(
            source=frame,
            classes=[0],  # Person only
            conf=conf_threshold,
            verbose=False
        )
        
        # Extract detections for DeepSort
        detections = []
        if results[0].boxes is not None:
            for box in results[0].boxes:
                bbox = box.xyxy[0].cpu().numpy()  # [x1, y1, x2, y2]
                conf = float(box.conf[0])
                
                # Convert to [left, top, width, height] for DeepSort
                x1, y1, x2, y2 = bbox
                w, h = x2 - x1, y2 - y1
                detection = ([x1, y1, w, h], conf, 'person')
                detections.append(detection)
        
        # Update tracker
        tracks = tracker_local.update_tracks(detections, frame=frame)
        
        # Count active tracks
        active_tracks = [t for t in tracks if t.is_confirmed()]
        frame_counts.append(len(active_tracks))
        
        # Update trajectories and lifetimes
        for track in active_tracks:
            # Convert track_id to int to avoid string formatting issues
            track_id = int(track.track_id)
            
            # Get center point
            ltrb = track.to_ltrb()
            cx = (ltrb[0] + ltrb[2]) / 2
            cy = (ltrb[1] + ltrb[3]) / 2
            trajectories[track_id].append((cx, cy))
            
            # Update lifetime
            if track_id not in track_lifetimes:
                track_lifetimes[track_id] = [frame_idx, frame_idx]
            else:
                track_lifetimes[track_id][1] = frame_idx
        
        # Draw tracks
        annotated = draw_tracks(frame, tracks)
        
        # Draw trajectories if enabled
        if show_trajectories:
            annotated = draw_trajectories(annotated, trajectories)
        
        # Add frame info
        info_text = f"Frame: {frame_idx+1}/{total_frames} | Tracks: {len(active_tracks)}"
        cv2.putText(annotated, info_text, (10, 30), 
                   cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)
        
        # Write frame
        out.write(annotated)
    
    cap.release()
    out.release()
    
    print(f"\nTracking complete!")
    print(f"Output saved to: {output_path}")
    
    return {
        'frame_counts': frame_counts,
        'trajectories': dict(trajectories),
        'track_lifetimes': track_lifetimes,
        'total_tracks': len(track_lifetimes),
        'total_frames': total_frames
    }

## 5. Process Video

In [5]:
# Check for available videos
video_dir = DATA_DIR / "videos"
video_dir.mkdir(parents=True, exist_ok=True)

video_extensions = [".mp4", ".avi", ".mov", ".mkv"]
videos = [f for f in video_dir.iterdir() if f.suffix.lower() in video_extensions]

if videos:
    print(f"Found {len(videos)} video(s):")
    for v in videos:
        print(f"  - {v.name}")
else:
    print("No videos found in data/videos/")
    print("Please add a video file to test tracking.")
    print("\nYou can download a sample video using:")
    print('  !wget -O ../data/videos/sample.mp4 "YOUR_VIDEO_URL"')

Found 1 video(s):
  - Bangkok.mp4


In [6]:
# Process video if available
if videos:
    video_path = videos[0]
    output_path = OUTPUT_DIR / f"{video_path.stem}_tracked.mp4"
    
    tracking_data = track_video(
        video_path=video_path,
        output_path=output_path,
        conf_threshold=0.25,
        show_trajectories=True
    )
else:
    print("Skipping video processing - no videos available")

Video: Bangkok.mp4
  Resolution: 1280x720
  FPS: 25
  Total frames: 650


  colormap = cm.get_cmap('hsv')
Tracking frames:   0%|          | 2/650 [00:03<17:48,  1.65s/it]


TypeError: not all arguments converted during string formatting

## 6. Tracking Statistics & Visualization

In [None]:
if videos and 'tracking_data' in locals():
    # Plot track count over time
    frames = list(range(len(tracking_data['frame_counts'])))
    counts = tracking_data['frame_counts']
    
    plt.figure(figsize=(14, 5))
    
    # Subplot 1: Track count over time
    plt.subplot(1, 2, 1)
    plt.plot(frames, counts, 'b-', linewidth=1, alpha=0.7)
    plt.fill_between(frames, counts, alpha=0.3)
    plt.xlabel('Frame')
    plt.ylabel('Number of Active Tracks')
    plt.title('Active Tracks Over Time')
    plt.grid(True, alpha=0.3)
    
    # Subplot 2: Track lifetime distribution
    plt.subplot(1, 2, 2)
    lifetimes = [(end - start + 1) for start, end in tracking_data['track_lifetimes'].values()]
    plt.hist(lifetimes, bins=20, edgecolor='black', alpha=0.7)
    plt.xlabel('Track Lifetime (frames)')
    plt.ylabel('Number of Tracks')
    plt.title('Track Lifetime Distribution')
    plt.axvline(np.mean(lifetimes), color='r', linestyle='--', 
                label=f'Mean: {np.mean(lifetimes):.1f} frames')
    plt.legend()
    plt.grid(True, alpha=0.3)
    
    plt.tight_layout()
    plt.savefig(OUTPUT_DIR / "tracking_statistics.png", dpi=150, bbox_inches='tight')
    plt.show()
    
    # Print statistics
    print(f"\n=== Tracking Statistics ===")
    print(f"Total unique tracks: {tracking_data['total_tracks']}")
    print(f"Total frames processed: {tracking_data['total_frames']}")
    print(f"\nTrack count per frame:")
    print(f"  Min: {min(counts)}")
    print(f"  Max: {max(counts)}")
    print(f"  Average: {np.mean(counts):.2f}")
    print(f"\nTrack lifetimes:")
    print(f"  Min: {min(lifetimes)} frames")
    print(f"  Max: {max(lifetimes)} frames")
    print(f"  Average: {np.mean(lifetimes):.1f} frames")
    print(f"  Median: {np.median(lifetimes):.1f} frames")
else:
    print("No tracking data to visualize")

## 7. Export Tracking Data

In [None]:
if videos and 'tracking_data' in locals():
    # Export track lifetimes to CSV
    lifetime_data = []
    for track_id, (start, end) in tracking_data['track_lifetimes'].items():
        lifetime_data.append({
            'track_id': track_id,
            'first_frame': start,
            'last_frame': end,
            'lifetime_frames': end - start + 1
        })
    
    df = pd.DataFrame(lifetime_data)
    csv_path = OUTPUT_DIR / "track_lifetimes.csv"
    df.to_csv(csv_path, index=False)
    print(f"Track lifetimes saved to: {csv_path}")
    print("\nPreview:")
    print(df.head(10))
    
    # Export frame counts to JSON
    json_data = {
        'total_tracks': tracking_data['total_tracks'],
        'total_frames': tracking_data['total_frames'],
        'frame_counts': tracking_data['frame_counts']
    }
    json_path = OUTPUT_DIR / "tracking_summary.json"
    with open(json_path, 'w') as f:
        json.dump(json_data, f, indent=2)
    print(f"\nTracking summary saved to: {json_path}")
else:
    print("No tracking data to export")

## Summary

This notebook demonstrated:
1. **Model Setup**: Loading YOLO11s for detection and DeepSort for tracking
2. **Video Tracking**: Tracking multiple persons with unique IDs across frames
3. **Trajectory Visualization**: Drawing path history for each tracked person
4. **Track Statistics**: Analyzing track lifetimes and active track counts
5. **Data Export**: Saving tracking results to CSV and JSON formats

**Key Concepts:**
- **Track ID Persistence**: Each person maintains a unique ID across frames
- **Occlusion Handling**: Tracker can re-identify persons after brief occlusions
- **Feature Matching**: Uses MobileNet features for person re-identification
- **Hungarian Algorithm**: Optimally matches detections to existing tracks

**Next Steps:**
- Experiment with different `max_age` values for better occlusion handling
- Try different confidence thresholds to balance precision/recall
- Analyze trajectory patterns for behavior recognition