# Multi-Target Tracking: Data Association and Tracking

This notebook covers multi-target tracking techniques using the Tracker Component Library:

1. **Data Association Problem** - Matching measurements to tracks
2. **Global Nearest Neighbor (GNN)** - Simple greedy association
3. **Joint Probabilistic Data Association (JPDA)** - Soft association with probabilities
4. **Multiple Hypothesis Tracking (MHT)** - Deferred decision tracking
5. **Track Management** - Initiation, confirmation, and deletion

## Prerequisites

```bash
pip install nrl-tracker plotly numpy scipy
```

In [None]:
import numpy as np
import plotly.graph_objects as go
from plotly.subplots import make_subplots
from scipy.stats import chi2

from pytcl.dynamic_estimation import kf_predict, kf_update
from pytcl.assignment_algorithms import (
    hungarian_assignment,
    jpda, jpda_update,
    compute_likelihood_matrix, jpda_probabilities,
)
from pytcl.performance_evaluation import ospa

np.random.seed(42)

# Plotly dark theme template
dark_template = go.layout.Template()
dark_template.layout = go.Layout(
    paper_bgcolor='#0d1117',
    plot_bgcolor='#0d1117',
    font=dict(color='#e6edf3'),
    xaxis=dict(gridcolor='#30363d', zerolinecolor='#30363d'),
    yaxis=dict(gridcolor='#30363d', zerolinecolor='#30363d'),
)

## 1. The Data Association Problem

In multi-target tracking, we must determine which measurements correspond to which tracks. This is challenging due to:

- **Clutter**: False alarms that don't correspond to any target
- **Missed detections**: Targets that aren't detected
- **Close targets**: Ambiguous associations when targets are near each other

### Scenario Setup

In [None]:
# Simulation parameters
dt = 1.0  # Time step
n_steps = 50
n_targets = 3

# Detection parameters
P_detection = 0.9  # Probability of detection
lambda_clutter = 2  # Average number of clutter points per scan
surveillance_area = [[-50, 50], [-50, 50]]  # x and y bounds

# Motion model: constant velocity
F = np.array([
    [1, dt, 0, 0],
    [0, 1, 0, 0],
    [0, 0, 1, dt],
    [0, 0, 0, 1]
])

q = 0.1  # Process noise
Q = q * np.array([
    [dt**3/3, dt**2/2, 0, 0],
    [dt**2/2, dt, 0, 0],
    [0, 0, dt**3/3, dt**2/2],
    [0, 0, dt**2/2, dt]
])

# Measurement model: position only
H = np.array([
    [1, 0, 0, 0],
    [0, 0, 1, 0]
])

R = np.eye(2) * 2.0  # Measurement noise covariance

print(f"Simulating {n_targets} targets over {n_steps} time steps")
print(f"P_d = {P_detection}, Î»_clutter = {lambda_clutter}")

In [None]:
# Generate target trajectories
initial_states = [
    np.array([-40, 1.5, -30, 0.5]),   # Target 1: moving right-up
    np.array([30, -1.0, -20, 1.0]),   # Target 2: moving left-up
    np.array([0, 0.2, 40, -1.2]),     # Target 3: moving right-down
]

true_tracks = [[] for _ in range(n_targets)]
states = [s.copy() for s in initial_states]

for k in range(n_steps):
    for i in range(n_targets):
        true_tracks[i].append(states[i].copy())
        states[i] = F @ states[i] + np.random.multivariate_normal(np.zeros(4), Q)

true_tracks = [np.array(t) for t in true_tracks]

# Visualize true trajectories
colors = ['#00d4ff', '#ff4757', '#00ff88']

fig = go.Figure()

for i, track in enumerate(true_tracks):
    # Trajectory line
    fig.add_trace(
        go.Scatter(x=track[:, 0], y=track[:, 2], mode='lines',
                   name=f'Target {i+1}', line=dict(color=colors[i], width=2))
    )
    # Start marker
    fig.add_trace(
        go.Scatter(x=[track[0, 0]], y=[track[0, 2]], mode='markers',
                   marker=dict(color=colors[i], size=12, symbol='circle'),
                   name=f'Start {i+1}', showlegend=False)
    )
    # End marker
    fig.add_trace(
        go.Scatter(x=[track[-1, 0]], y=[track[-1, 2]], mode='markers',
                   marker=dict(color=colors[i], size=12, symbol='square'),
                   name=f'End {i+1}', showlegend=False)
    )

fig.update_layout(
    template=dark_template,
    title='True Target Trajectories',
    xaxis_title='X position',
    yaxis_title='Y position',
    height=500,
    xaxis=dict(range=surveillance_area[0], scaleanchor='y', scaleratio=1),
    yaxis=dict(range=surveillance_area[1]),
)
fig.show()

In [None]:
# Generate measurements with missed detections and clutter
all_measurements = []
detection_flags = []  # Track which targets were detected

for k in range(n_steps):
    scan_measurements = []
    scan_detections = []
    
    # Target-originated measurements
    for i, track in enumerate(true_tracks):
        detected = np.random.random() < P_detection
        scan_detections.append(detected)
        
        if detected:
            true_pos = H @ track[k]
            z = true_pos + np.random.multivariate_normal(np.zeros(2), R)
            scan_measurements.append(z)
    
    # Clutter
    n_clutter = np.random.poisson(lambda_clutter)
    for _ in range(n_clutter):
        x_clut = np.random.uniform(*surveillance_area[0])
        y_clut = np.random.uniform(*surveillance_area[1])
        scan_measurements.append(np.array([x_clut, y_clut]))
    
    # Shuffle to remove ordering information
    if scan_measurements:
        perm = np.random.permutation(len(scan_measurements))
        scan_measurements = [scan_measurements[j] for j in perm]
    
    all_measurements.append(scan_measurements)
    detection_flags.append(scan_detections)

# Statistics
total_detections = sum(sum(d) for d in detection_flags)
total_clutter = sum(len(m) for m in all_measurements) - total_detections
print(f"Total detections: {total_detections} / {n_steps * n_targets} ({100*total_detections/(n_steps*n_targets):.1f}%)")
print(f"Total clutter: {total_clutter}")
print(f"Average measurements per scan: {np.mean([len(m) for m in all_measurements]):.1f}")

## 2. Global Nearest Neighbor (GNN)

The simplest association method: assign each track to its nearest measurement using the Hungarian algorithm. Drawbacks:
- Hard decisions can lead to track swaps
- Doesn't handle ambiguity well

In [None]:
class Track:
    """Simple track class for demonstration."""
    def __init__(self, x0, P0, track_id):
        self.x = x0.copy()
        self.P = P0.copy()
        self.id = track_id
        self.history = [x0.copy()]
        self.missed_count = 0
        
    def predict(self, F, Q):
        pred = kf_predict(self.x, self.P, F, Q)
        self.x, self.P = pred.x, pred.P
        
    def update(self, z, H, R):
        upd = kf_update(self.x, self.P, z, H, R)
        self.x, self.P = upd.x, upd.P
        self.history.append(self.x.copy())
        self.missed_count = 0
        
    def coast(self):
        self.history.append(self.x.copy())
        self.missed_count += 1

def mahalanobis_distance(track, z, H, R):
    """Compute Mahalanobis distance from track to measurement."""
    z_pred = H @ track.x
    S = H @ track.P @ H.T + R
    residual = z - z_pred
    d2 = residual @ np.linalg.solve(S, residual)
    return d2

def gnn_associate(tracks, measurements, H, R, gate_threshold=9.21):  # chi2(2, 0.99)
    """Global Nearest Neighbor association."""
    n_tracks = len(tracks)
    n_meas = len(measurements)
    
    if n_tracks == 0 or n_meas == 0:
        return {}, list(range(n_meas))
    
    # Build cost matrix (Mahalanobis distances)
    cost_matrix = np.full((n_tracks, n_meas), np.inf)
    for i, track in enumerate(tracks):
        for j, z in enumerate(measurements):
            d2 = mahalanobis_distance(track, z, H, R)
            if d2 < gate_threshold:
                cost_matrix[i, j] = d2
    
    # Hungarian algorithm
    row_ind, col_ind, cost = hungarian_assignment(cost_matrix)
    
    # Build associations
    associations = {}
    assigned_meas = set()
    for i, j in zip(row_ind, col_ind):
        if cost_matrix[i, j] < gate_threshold:
            associations[i] = j
            assigned_meas.add(j)
    
    unassigned = [j for j in range(n_meas) if j not in assigned_meas]
    return associations, unassigned

In [None]:
# Run GNN tracker
# Initialize with first measurements (simplified - assume first 3 are targets)
P0 = np.diag([R[0,0], 1.0, R[1,1], 1.0])  # Initial covariance

gnn_tracks = []
for i, state in enumerate(initial_states):
    # Initialize near true state (cheating slightly for demo)
    x0 = state + np.random.multivariate_normal(np.zeros(4), P0 * 0.1)
    gnn_tracks.append(Track(x0, P0.copy(), i))

# Process all scans
for k, measurements in enumerate(all_measurements):
    # Predict
    for track in gnn_tracks:
        track.predict(F, Q)
    
    # Associate
    associations, unassigned = gnn_associate(gnn_tracks, measurements, H, R)
    
    # Update associated tracks
    for track_idx, meas_idx in associations.items():
        gnn_tracks[track_idx].update(measurements[meas_idx], H, R)
    
    # Coast unassociated tracks
    for i, track in enumerate(gnn_tracks):
        if i not in associations:
            track.coast()

print(f"GNN tracking complete. {len(gnn_tracks)} tracks maintained.")

In [None]:
# Visualize GNN results
fig = go.Figure()

# True trajectories (semi-transparent)
for i, track in enumerate(true_tracks):
    fig.add_trace(
        go.Scatter(x=track[:, 0], y=track[:, 2], mode='lines',
                   name=f'True {i+1}', line=dict(color=colors[i], width=2),
                   opacity=0.5)
    )

# GNN estimates (dashed)
for i, track in enumerate(gnn_tracks):
    history = np.array(track.history)
    fig.add_trace(
        go.Scatter(x=history[:, 0], y=history[:, 2], mode='lines',
                   name=f'GNN Track {i+1}', line=dict(color=colors[i], width=2, dash='dash'))
    )

# Measurements from last scan
last_meas = all_measurements[-1]
fig.add_trace(
    go.Scatter(x=[z[0] for z in last_meas], y=[z[1] for z in last_meas], mode='markers',
               name='Last scan', marker=dict(color='gray', size=6, opacity=0.5))
)

fig.update_layout(
    template=dark_template,
    title='GNN Multi-Target Tracking',
    xaxis_title='X position',
    yaxis_title='Y position',
    height=550,
    xaxis=dict(scaleanchor='y', scaleratio=1),
)
fig.show()

## 3. Joint Probabilistic Data Association (JPDA)

JPDA computes the probability of each measurement-to-track association and uses a weighted combination for the update:

$$\hat{x}_{k|k} = \hat{x}_{k|k-1} + K \sum_{j=0}^{m} \beta_j (z_j - H\hat{x}_{k|k-1})$$

where $\beta_j$ is the probability that measurement $j$ originated from the track.

In [None]:
# Run JPDA tracker
jpda_tracks = []
for i, state in enumerate(initial_states):
    x0 = state + np.random.multivariate_normal(np.zeros(4), P0 * 0.1)
    jpda_tracks.append(Track(x0, P0.copy(), i))

# Clutter density
area = (surveillance_area[0][1] - surveillance_area[0][0]) * \
       (surveillance_area[1][1] - surveillance_area[1][0])
clutter_density = lambda_clutter / area

for k, measurements in enumerate(all_measurements):
    # Predict
    for track in jpda_tracks:
        track.predict(F, Q)
    
    if len(measurements) == 0:
        for track in jpda_tracks:
            track.coast()
        continue
    
    # Get track states and covariances
    states = [t.x for t in jpda_tracks]
    covs = [t.P for t in jpda_tracks]
    meas_array = np.array(measurements)
    
    # Compute likelihood matrix and association probabilities
    L, gated = compute_likelihood_matrix(states, covs, meas_array, H, R)
    beta = jpda_probabilities(L, gated, P_detection, clutter_density)
    
    # JPDA update for each track
    for i, track in enumerate(jpda_tracks):
        # Weighted innovation
        z_pred = H @ track.x
        S = H @ track.P @ H.T + R
        K = track.P @ H.T @ np.linalg.inv(S)
        
        # Weighted sum of innovations
        innovation = np.zeros(2)
        for j in range(len(measurements)):
            innovation += beta[i, j] * (measurements[j] - z_pred)
        
        # Update state
        track.x = track.x + K @ innovation
        
        # Update covariance (spread of the means + innovation uncertainty)
        P_c = (1 - sum(beta[i, :-1])) * track.P  # Coast term
        P_u = (np.eye(4) - K @ H) @ track.P  # Update term
        track.P = beta[i, -1] * track.P + (1 - beta[i, -1]) * P_u
        
        track.history.append(track.x.copy())
        
print(f"JPDA tracking complete. {len(jpda_tracks)} tracks maintained.")

In [None]:
# Compare GNN vs JPDA
fig = make_subplots(rows=1, cols=2, subplot_titles=('GNN Tracking', 'JPDA Tracking'),
                    horizontal_spacing=0.1)

tracker_data = [('GNN', gnn_tracks), ('JPDA', jpda_tracks)]

for col, (tracker_name, tracks) in enumerate(tracker_data, 1):
    # True trajectories
    for i, track in enumerate(true_tracks):
        fig.add_trace(
            go.Scatter(x=track[:, 0], y=track[:, 2], mode='lines',
                       name=f'True {i+1}' if col == 1 else None,
                       line=dict(color=colors[i], width=2), opacity=0.4,
                       showlegend=(col == 1)),
            row=1, col=col
        )
    
    # Tracker estimates
    for i, track in enumerate(tracks):
        history = np.array(track.history)
        fig.add_trace(
            go.Scatter(x=history[:, 0], y=history[:, 2], mode='lines',
                       name=f'{tracker_name} {i+1}' if col == 1 else None,
                       line=dict(color=colors[i], width=2, dash='dash'),
                       showlegend=(col == 1)),
            row=1, col=col
        )

fig.update_layout(
    template=dark_template,
    height=450,
)
fig.update_xaxes(title_text='X position', row=1, col=1)
fig.update_xaxes(title_text='X position', row=1, col=2)
fig.update_yaxes(title_text='Y position', row=1, col=1)
fig.update_yaxes(title_text='Y position', row=1, col=2)

fig.show()

In [None]:
# Compute tracking errors
def compute_position_rmse(tracker_tracks, true_tracks):
    """Compute RMSE for each track."""
    rmse = []
    for i, (tracker, truth) in enumerate(zip(tracker_tracks, true_tracks)):
        history = np.array(tracker.history)
        # Align lengths
        min_len = min(len(history), len(truth))
        pos_error = np.sqrt((history[:min_len, 0] - truth[:min_len, 0])**2 + 
                            (history[:min_len, 2] - truth[:min_len, 2])**2)
        rmse.append(np.sqrt(np.mean(pos_error**2)))
    return rmse

gnn_rmse = compute_position_rmse(gnn_tracks, true_tracks)
jpda_rmse = compute_position_rmse(jpda_tracks, true_tracks)

print("Position RMSE (m):")
print(f"{'Track':<10} {'GNN':>10} {'JPDA':>10}")
print("-" * 32)
for i in range(n_targets):
    print(f"Target {i+1:<4} {gnn_rmse[i]:>10.3f} {jpda_rmse[i]:>10.3f}")
print("-" * 32)
print(f"{'Average':<10} {np.mean(gnn_rmse):>10.3f} {np.mean(jpda_rmse):>10.3f}")

## 4. Performance Evaluation with OSPA

The Optimal Subpattern Assignment (OSPA) metric provides a principled way to measure multi-target tracking performance:

$$d_{OSPA}(X, Y) = \left( \frac{1}{n} \left( \min_{\pi} \sum_{i=1}^m d^p(x_i, y_{\pi(i)}) + c^p(n-m) \right) \right)^{1/p}$$

In [None]:
# Compute OSPA over time
def compute_ospa_sequence(tracker_tracks, true_tracks, c=10, p=2):
    """Compute OSPA at each time step."""
    ospa_values = []
    
    # Get minimum length
    min_len = min(
        min(len(t.history) for t in tracker_tracks),
        min(len(t) for t in true_tracks)
    )
    
    for k in range(min_len):
        # Get positions at time k
        est_pos = np.array([[t.history[k][0], t.history[k][2]] for t in tracker_tracks])
        true_pos = np.array([[t[k, 0], t[k, 2]] for t in true_tracks])
        
        # Compute OSPA
        ospa_val = ospa(est_pos, true_pos, c=c, p=p)
        ospa_values.append(ospa_val)
    
    return np.array(ospa_values)

gnn_ospa = compute_ospa_sequence(gnn_tracks, true_tracks)
jpda_ospa = compute_ospa_sequence(jpda_tracks, true_tracks)

fig = go.Figure()

time = np.arange(len(gnn_ospa)) * dt
fig.add_trace(
    go.Scatter(x=time, y=gnn_ospa, mode='lines',
               name=f'GNN (mean: {np.mean(gnn_ospa):.2f})',
               line=dict(color='#00d4ff', width=2))
)
fig.add_trace(
    go.Scatter(x=time, y=jpda_ospa, mode='lines',
               name=f'JPDA (mean: {np.mean(jpda_ospa):.2f})',
               line=dict(color='#ff4757', width=2))
)

fig.update_layout(
    template=dark_template,
    title='OSPA Metric Over Time',
    xaxis_title='Time (s)',
    yaxis_title='OSPA distance (m)',
    height=400,
)
fig.show()

## Summary

| Method | Pros | Cons |
|--------|------|------|
| **GNN** | Simple, fast | Hard decisions, track swaps |
| **JPDA** | Soft associations, handles ambiguity | Higher complexity, maintains all associations |
| **MHT** | Optimal, deferred decisions | Exponential complexity, requires pruning |

## Exercises

1. Add track initiation and deletion logic to handle new targets
2. Implement a crossing scenario where targets pass close to each other
3. Compare performance with different clutter densities
4. Implement basic MHT with hypothesis pruning

## References

1. Bar-Shalom, Y., & Li, X. R. (1995). *Multitarget-Multisensor Tracking*.
2. Blackman, S. S., & Popoli, R. (1999). *Design and Analysis of Modern Tracking Systems*.
3. Schuhmacher, D., et al. (2008). A consistent metric for performance evaluation of multi-object filters. *IEEE TSP*.