# Object Tracking Fundamentals for Autonomous Driving

This notebook introduces the core concepts of object tracking in autonomous driving scenarios. We'll implement basic tracking algorithms and understand the challenges in maintaining object identities over time.

## Learning Objectives
- Understand the importance of object tracking in autonomous driving
- Implement a basic Kalman filter for single object tracking
- Explore data association challenges in multi-object tracking
- Evaluate tracking performance using standard metrics

In [None]:
# Import required libraries
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.patches as patches
from scipy.optimize import linear_sum_assignment
from scipy.spatial.distance import euclidean
import random
from dataclasses import dataclass
from typing import List, Tuple, Optional

# Set random seeds for reproducibility
np.random.seed(42)
random.seed(42)

print("Libraries imported successfully!")

## 1. Understanding Object Tracking

Object tracking maintains consistent identities of moving objects across frames. Key challenges include:

- **Occlusions**: Objects temporarily hidden behind others
- **False Positives**: Incorrect detections
- **Identity Switches**: Confusion between similar objects
- **Crowded Scenes**: Multiple objects in close proximity

In [None]:
@dataclass
class Detection:
    """Simple detection class"""
    x: float  # Center x coordinate
    y: float  # Center y coordinate
    width: float
    height: float
    confidence: float = 1.0
    
    @property
    def bbox(self):
        """Return bounding box as [x_min, y_min, x_max, y_max]"""
        return [
            self.x - self.width/2,
            self.y - self.height/2,
            self.x + self.width/2,
            self.y + self.height/2
        ]

@dataclass
class Track:
    """Track representation"""
    track_id: int
    state: np.ndarray  # [x, y, vx, vy]
    covariance: np.ndarray
    age: int = 0
    hits: int = 0
    time_since_update: int = 0
    
# Visualization helper
def visualize_detections_and_tracks(detections, tracks, frame_size=(800, 600), title=""):
    """Visualize detections and tracks"""
    fig, ax = plt.subplots(figsize=(10, 8))
    ax.set_xlim(0, frame_size[0])
    ax.set_ylim(0, frame_size[1])
    ax.invert_yaxis()  # Image coordinates
    
    # Draw detections
    for det in detections:
        bbox = det.bbox
        rect = patches.Rectangle((bbox[0], bbox[1]), 
                               bbox[2] - bbox[0], bbox[3] - bbox[1],
                               linewidth=2, edgecolor='red', facecolor='none',
                               label='Detection' if det == detections[0] else "")
        ax.add_patch(rect)
        ax.text(det.x, det.y - 20, f'Det {det.confidence:.2f}', 
                ha='center', va='bottom', color='red')
    
    # Draw tracks
    colors = ['blue', 'green', 'orange', 'purple', 'brown']
    for i, track in enumerate(tracks):
        color = colors[i % len(colors)]
        x, y = track.state[0], track.state[1]
        
        # Draw track center
        ax.plot(x, y, 'o', color=color, markersize=8, 
                label=f'Track {track.track_id}' if i == 0 else "")
        
        # Draw uncertainty ellipse
        eigenvals, eigenvecs = np.linalg.eigh(track.covariance[:2, :2])
        angle = np.degrees(np.arctan2(eigenvecs[1, 0], eigenvecs[0, 0]))
        width, height = 2 * np.sqrt(eigenvals)
        
        ellipse = patches.Ellipse((x, y), width*20, height*20, angle=angle,
                                alpha=0.3, color=color)
        ax.add_patch(ellipse)
        
        # Track ID
        ax.text(x, y + 20, f'T{track.track_id}', 
                ha='center', va='top', color=color, fontweight='bold')
    
    ax.set_title(title)
    ax.legend()
    ax.grid(True, alpha=0.3)
    plt.tight_layout()
    plt.show()

print("Data structures and visualization functions defined!")

## 2. Kalman Filter Implementation

The Kalman filter provides optimal state estimation for linear systems with Gaussian noise. It's the foundation of many tracking algorithms.

In [None]:
class KalmanFilter:
    """2D Kalman filter for object tracking"""
    
    def __init__(self, dt=1.0):
        self.dt = dt
        
        # State vector: [x, y, vx, vy]
        self.x = np.zeros(4)
        
        # State transition matrix (constant velocity model)
        self.F = np.array([
            [1, 0, dt, 0],
            [0, 1, 0, dt],
            [0, 0, 1, 0],
            [0, 0, 0, 1]
        ])
        
        # Observation matrix (we observe position only)
        self.H = np.array([
            [1, 0, 0, 0],
            [0, 1, 0, 0]
        ])
        
        # Process noise covariance
        self.Q = np.array([
            [dt**4/4, 0, dt**3/2, 0],
            [0, dt**4/4, 0, dt**3/2],
            [dt**3/2, 0, dt**2, 0],
            [0, dt**3/2, 0, dt**2]
        ]) * 0.1  # Process noise strength
        
        # Observation noise covariance
        self.R = np.eye(2) * 10.0  # Measurement noise
        
        # Initial covariance
        self.P = np.eye(4) * 1000.0
    
    def predict(self):
        """Prediction step"""
        # Predict state
        self.x = self.F @ self.x
        
        # Predict covariance
        self.P = self.F @ self.P @ self.F.T + self.Q
        
        return self.x.copy()
    
    def update(self, measurement):
        """Update step with measurement [x, y]"""
        # Innovation
        y = measurement - self.H @ self.x
        
        # Innovation covariance
        S = self.H @ self.P @ self.H.T + self.R
        
        # Kalman gain
        K = self.P @ self.H.T @ np.linalg.inv(S)
        
        # Update state
        self.x = self.x + K @ y
        
        # Update covariance
        I = np.eye(4)
        self.P = (I - K @ self.H) @ self.P
        
        return self.x.copy()
    
    def get_state(self):
        """Get current state"""
        return self.x.copy()
    
    def get_covariance(self):
        """Get current covariance"""
        return self.P.copy()

# Test Kalman filter with synthetic data
def generate_trajectory(n_points=50, noise_std=5.0):
    """Generate a curved trajectory with noise"""
    t = np.linspace(0, 4*np.pi, n_points)
    
    # Circular trajectory
    x_true = 400 + 150 * np.cos(t)
    y_true = 300 + 100 * np.sin(t)
    
    # Add noise to measurements
    x_measured = x_true + np.random.normal(0, noise_std, n_points)
    y_measured = y_true + np.random.normal(0, noise_std, n_points)
    
    return np.column_stack([x_true, y_true]), np.column_stack([x_measured, y_measured])

# Generate test data
true_trajectory, noisy_measurements = generate_trajectory()

# Run Kalman filter
kf = KalmanFilter(dt=1.0)

# Initialize with first measurement
kf.x[:2] = noisy_measurements[0]
kf.x[2:] = 0  # Initial velocity

estimated_trajectory = []
predictions = []

for i, measurement in enumerate(noisy_measurements):
    # Predict
    prediction = kf.predict()
    predictions.append(prediction[:2].copy())
    
    # Update with measurement
    estimate = kf.update(measurement)
    estimated_trajectory.append(estimate[:2].copy())

estimated_trajectory = np.array(estimated_trajectory)
predictions = np.array(predictions)

# Visualize results
plt.figure(figsize=(12, 8))
plt.plot(true_trajectory[:, 0], true_trajectory[:, 1], 'g-', linewidth=3, 
         label='True Trajectory', alpha=0.8)
plt.scatter(noisy_measurements[:, 0], noisy_measurements[:, 1], 
           c='red', alpha=0.6, s=30, label='Noisy Measurements')
plt.plot(estimated_trajectory[:, 0], estimated_trajectory[:, 1], 'b-', linewidth=2, 
         label='Kalman Filter Estimate')
plt.plot(predictions[:, 0], predictions[:, 1], 'orange', linestyle='--', 
         alpha=0.7, label='Predictions')

plt.xlabel('X Position')
plt.ylabel('Y Position')
plt.title('Kalman Filter Tracking Performance')
plt.legend()
plt.grid(True, alpha=0.3)
plt.axis('equal')
plt.tight_layout()
plt.show()

# Calculate tracking error
tracking_error = np.linalg.norm(estimated_trajectory - true_trajectory, axis=1)
print(f"Mean tracking error: {np.mean(tracking_error):.2f} pixels")
print(f"Max tracking error: {np.max(tracking_error):.2f} pixels")

## 3. Multi-Object Tracking with Data Association

Multi-object tracking requires solving the data association problem: which detection corresponds to which track?

In [None]:
class SimpleTracker:
    """Simple multi-object tracker"""
    
    def __init__(self, max_disappeared=5, max_distance=50):
        self.max_disappeared = max_disappeared
        self.max_distance = max_distance
        self.next_track_id = 0
        self.tracks = {}
        
    def create_track(self, detection):
        """Create new track from detection"""
        kf = KalmanFilter()
        kf.x[:2] = [detection.x, detection.y]
        kf.x[2:] = 0  # Initial velocity
        
        track = Track(
            track_id=self.next_track_id,
            state=kf.x.copy(),
            covariance=kf.P.copy()
        )
        
        self.tracks[self.next_track_id] = {
            'track': track,
            'kalman': kf
        }
        
        self.next_track_id += 1
        return track
    
    def calculate_cost_matrix(self, detections):
        """Calculate cost matrix for Hungarian assignment"""
        if not self.tracks or not detections:
            return np.array([]), [], []
        
        track_ids = list(self.tracks.keys())
        cost_matrix = np.zeros((len(detections), len(track_ids)))
        
        for i, detection in enumerate(detections):
            for j, track_id in enumerate(track_ids):
                track_state = self.tracks[track_id]['track'].state
                distance = euclidean([detection.x, detection.y], track_state[:2])
                
                if distance > self.max_distance:
                    cost_matrix[i, j] = 1e6  # Very high cost
                else:
                    cost_matrix[i, j] = distance
        
        return cost_matrix, track_ids, detections
    
    def update(self, detections):
        """Update tracker with new detections"""
        # Predict all existing tracks
        for track_data in self.tracks.values():
            kf = track_data['kalman']
            predicted_state = kf.predict()
            track_data['track'].state = predicted_state
            track_data['track'].covariance = kf.P.copy()
            track_data['track'].time_since_update += 1
            track_data['track'].age += 1
        
        # Data association using Hungarian algorithm
        cost_matrix, track_ids, det_list = self.calculate_cost_matrix(detections)
        
        matched_det_indices = set()
        matched_track_ids = set()
        
        if cost_matrix.size > 0:
            det_indices, track_indices = linear_sum_assignment(cost_matrix)
            
            for det_idx, track_idx in zip(det_indices, track_indices):
                if cost_matrix[det_idx, track_idx] < self.max_distance:
                    # Valid match
                    track_id = track_ids[track_idx]
                    detection = det_list[det_idx]
                    
                    # Update track with detection
                    kf = self.tracks[track_id]['kalman']
                    updated_state = kf.update([detection.x, detection.y])
                    
                    self.tracks[track_id]['track'].state = updated_state
                    self.tracks[track_id]['track'].covariance = kf.P.copy()
                    self.tracks[track_id]['track'].time_since_update = 0
                    self.tracks[track_id]['track'].hits += 1
                    
                    matched_det_indices.add(det_idx)
                    matched_track_ids.add(track_id)
        
        # Create new tracks for unmatched detections
        for i, detection in enumerate(detections):
            if i not in matched_det_indices:
                self.create_track(detection)
        
        # Remove tracks that haven't been updated for too long
        tracks_to_remove = []
        for track_id, track_data in self.tracks.items():
            if track_data['track'].time_since_update > self.max_disappeared:
                tracks_to_remove.append(track_id)
        
        for track_id in tracks_to_remove:
            del self.tracks[track_id]
        
        return [track_data['track'] for track_data in self.tracks.values()]

# Generate synthetic multi-object scenario
def generate_multi_object_scene(n_frames=30):
    """Generate multiple objects moving in different patterns"""
    scenarios = []
    
    for frame in range(n_frames):
        detections = []
        
        # Object 1: Moving right
        if frame < 25:  # Disappears near the end
            x1 = 100 + frame * 15
            y1 = 200 + np.random.normal(0, 5)
            detections.append(Detection(x1, y1, 40, 60))
        
        # Object 2: Moving diagonally
        x2 = 600 - frame * 10
        y2 = 150 + frame * 8
        detections.append(Detection(x2, y2, 35, 50))
        
        # Object 3: Circular motion (appears later)
        if frame > 10:
            t = (frame - 10) * 0.3
            x3 = 400 + 80 * np.cos(t)
            y3 = 300 + 60 * np.sin(t)
            detections.append(Detection(x3, y3, 30, 45))
        
        # Add some noise to all detections
        for det in detections:
            det.x += np.random.normal(0, 3)
            det.y += np.random.normal(0, 3)
        
        # Occasionally miss a detection (simulate false negatives)
        if np.random.random() < 0.1:
            if detections:
                detections.pop(np.random.randint(len(detections)))
        
        scenarios.append(detections)
    
    return scenarios

# Run multi-object tracking demo
tracker = SimpleTracker(max_disappeared=3, max_distance=60)
scenarios = generate_multi_object_scene()

print(f"Generated {len(scenarios)} frames with multi-object scenarios")

# Process a few key frames
key_frames = [0, 10, 20, 29]
for frame_idx in key_frames:
    detections = scenarios[frame_idx]
    tracks = tracker.update(detections)
    
    print(f"\nFrame {frame_idx}:")
    print(f"  Detections: {len(detections)}")
    print(f"  Active tracks: {len(tracks)}")
    for track in tracks:
        print(f"    Track {track.track_id}: age={track.age}, hits={track.hits}")
    
    visualize_detections_and_tracks(
        detections, tracks, 
        title=f"Frame {frame_idx} - Multi-Object Tracking"
    )

## 4. Tracking Performance Metrics

Let's implement standard tracking evaluation metrics:

In [None]:
def calculate_tracking_metrics(ground_truth_tracks, predicted_tracks, distance_threshold=50):
    """Calculate basic tracking performance metrics"""
    
    # Simplified metrics for demonstration
    total_gt_objects = len(ground_truth_tracks)
    total_pred_objects = len(predicted_tracks)
    
    # Count matches based on distance threshold
    matches = 0
    for gt_track in ground_truth_tracks:
        gt_pos = gt_track.state[:2]
        
        for pred_track in predicted_tracks:
            pred_pos = pred_track.state[:2]
            distance = euclidean(gt_pos, pred_pos)
            
            if distance <= distance_threshold:
                matches += 1
                break  # Each GT track can only match once
    
    # Basic metrics
    precision = matches / total_pred_objects if total_pred_objects > 0 else 0
    recall = matches / total_gt_objects if total_gt_objects > 0 else 0
    f1_score = 2 * precision * recall / (precision + recall) if (precision + recall) > 0 else 0
    
    return {
        'precision': precision,
        'recall': recall,
        'f1_score': f1_score,
        'total_gt': total_gt_objects,
        'total_pred': total_pred_objects,
        'matches': matches
    }

def plot_tracking_performance(metrics_over_time):
    """Plot tracking performance over time"""
    frames = list(range(len(metrics_over_time)))
    precisions = [m['precision'] for m in metrics_over_time]
    recalls = [m['recall'] for m in metrics_over_time]
    f1_scores = [m['f1_score'] for m in metrics_over_time]
    
    plt.figure(figsize=(12, 6))
    
    plt.subplot(1, 2, 1)
    plt.plot(frames, precisions, 'b-', label='Precision', linewidth=2)
    plt.plot(frames, recalls, 'r-', label='Recall', linewidth=2)
    plt.plot(frames, f1_scores, 'g-', label='F1-Score', linewidth=2)
    plt.xlabel('Frame')
    plt.ylabel('Score')
    plt.title('Tracking Performance Metrics')
    plt.legend()
    plt.grid(True, alpha=0.3)
    plt.ylim(0, 1.1)
    
    plt.subplot(1, 2, 2)
    gt_counts = [m['total_gt'] for m in metrics_over_time]
    pred_counts = [m['total_pred'] for m in metrics_over_time]
    matches = [m['matches'] for m in metrics_over_time]
    
    plt.plot(frames, gt_counts, 'b-', label='Ground Truth Objects', linewidth=2)
    plt.plot(frames, pred_counts, 'r-', label='Predicted Objects', linewidth=2)
    plt.plot(frames, matches, 'g-', label='Correct Matches', linewidth=2)
    plt.xlabel('Frame')
    plt.ylabel('Count')
    plt.title('Object Counts Over Time')
    plt.legend()
    plt.grid(True, alpha=0.3)
    
    plt.tight_layout()
    plt.show()

# Create ground truth for evaluation (simplified)
def create_ground_truth(frame_idx):
    """Create simplified ground truth tracks for evaluation"""
    gt_tracks = []
    
    # GT Object 1
    if frame_idx < 25:
        x1 = 100 + frame_idx * 15
        y1 = 200
        gt_tracks.append(Track(track_id=1, state=np.array([x1, y1, 15, 0]), 
                              covariance=np.eye(4)))
    
    # GT Object 2
    x2 = 600 - frame_idx * 10
    y2 = 150 + frame_idx * 8
    gt_tracks.append(Track(track_id=2, state=np.array([x2, y2, -10, 8]), 
                          covariance=np.eye(4)))
    
    # GT Object 3
    if frame_idx > 10:
        t = (frame_idx - 10) * 0.3
        x3 = 400 + 80 * np.cos(t)
        y3 = 300 + 60 * np.sin(t)
        gt_tracks.append(Track(track_id=3, state=np.array([x3, y3, 0, 0]), 
                              covariance=np.eye(4)))
    
    return gt_tracks

# Reset tracker and evaluate performance
tracker = SimpleTracker(max_disappeared=3, max_distance=60)
metrics_over_time = []

for frame_idx, detections in enumerate(scenarios):
    predicted_tracks = tracker.update(detections)
    ground_truth_tracks = create_ground_truth(frame_idx)
    
    metrics = calculate_tracking_metrics(ground_truth_tracks, predicted_tracks)
    metrics_over_time.append(metrics)

# Plot performance
plot_tracking_performance(metrics_over_time)

# Print summary statistics
avg_precision = np.mean([m['precision'] for m in metrics_over_time])
avg_recall = np.mean([m['recall'] for m in metrics_over_time])
avg_f1 = np.mean([m['f1_score'] for m in metrics_over_time])

print(f"\nAverage Performance:")
print(f"  Precision: {avg_precision:.3f}")
print(f"  Recall: {avg_recall:.3f}")
print(f"  F1-Score: {avg_f1:.3f}")

## 5. Challenges and Advanced Concepts

Real-world tracking faces several challenges that require advanced techniques:

In [None]:
# Demonstrate tracking challenges

def create_challenging_scenario():
    """Create scenarios with common tracking challenges"""
    challenges = {
        'occlusion': 'Objects temporarily hidden behind others',
        'false_positives': 'Incorrect detections that create ghost tracks',
        'crowded_scene': 'Multiple objects in close proximity',
        'appearance_change': 'Objects changing appearance over time'
    }
    
    print("Common Tracking Challenges:")
    for challenge, description in challenges.items():
        print(f"  {challenge.title().replace('_', ' ')}: {description}")
    
    # Demonstrate occlusion scenario
    print("\nOcclusion Scenario Example:")
    
    fig, ax = plt.subplots(figsize=(12, 6))
    
    # Timeline of occlusion
    frames = np.arange(0, 20)
    object1_visible = np.ones(20)
    object2_visible = np.ones(20)
    
    # Object 1 gets occluded between frames 8-12
    object1_visible[8:13] = 0
    
    ax.fill_between(frames, 0, object1_visible, alpha=0.7, label='Object 1 Visible')
    ax.fill_between(frames, 1.1, 1.1 + object2_visible, alpha=0.7, label='Object 2 Visible')
    
    # Mark occlusion period
    ax.axvspan(8, 12, alpha=0.3, color='red', label='Occlusion Period')
    
    ax.set_xlabel('Frame Number')
    ax.set_ylabel('Object Visibility')
    ax.set_title('Object Occlusion Timeline')
    ax.legend()
    ax.grid(True, alpha=0.3)
    ax.set_ylim(-0.1, 2.3)
    
    plt.tight_layout()
    plt.show()
    
    return challenges

def advanced_tracking_concepts():
    """Explain advanced tracking concepts"""
    concepts = {
        'Deep SORT': 'Uses deep learning features for better data association',
        'Multi-Hypothesis Tracking': 'Maintains multiple track hypotheses',
        'Joint Detection and Tracking': 'End-to-end learning of detection and tracking',
        'Appearance Features': 'Use visual appearance for re-identification',
        'Motion Prediction': 'Predict future positions for better association'
    }
    
    print("\nAdvanced Tracking Concepts:")
    for concept, description in concepts.items():
        print(f"  {concept}: {description}")
    
    return concepts

# Show challenges and solutions
challenges = create_challenging_scenario()
concepts = advanced_tracking_concepts()

print("\nKey Takeaways:")
print("1. Kalman filters provide optimal state estimation for linear systems")
print("2. Data association is crucial for multi-object tracking")
print("3. Real-world challenges require sophisticated solutions")
print("4. Performance evaluation helps optimize tracking parameters")
print("5. Modern approaches combine deep learning with classical methods")

## 6. Exercises and Next Steps

Continue your learning with these exercises:

### Exercise 1: Extended Kalman Filter
- Implement EKF for non-linear motion models
- Compare performance with linear Kalman filter
- Test with curved trajectories

### Exercise 2: SORT Algorithm
- Implement the complete SORT algorithm
- Add track lifecycle management
- Optimize association thresholds

### Exercise 3: Multi-Sensor Tracking
- Combine camera and LiDAR detections
- Handle different coordinate systems
- Implement sensor fusion strategies

### Exercise 4: Performance Analysis
- Implement MOTA and MOTP metrics
- Analyze tracking in different scenarios
- Compare different tracking algorithms

In [None]:
# Summary and next steps
print("Object Tracking Fundamentals - Complete!")
print("\nTopics Covered:")
print("✓ Kalman filter implementation")
print("✓ Multi-object tracking with data association")
print("✓ Hungarian algorithm for optimal assignment")
print("✓ Tracking performance metrics")
print("✓ Common challenges and solutions")
print("\nNext: Explore SORT and DeepSORT implementations")
print("Continue to: 02_kalman_filter_tracking.ipynb")