In [15]:
import numpy as np
from collections import defaultdict

from scipy.stats import multivariate_normal as mvn, chi2
from scipy.spatial import KDTree

import matplotlib.pylab as plt
import matplotlib.transforms as transforms
import matplotlib.gridspec as gridspec

from matplotlib.patches import Ellipse
from matplotlib.lines import Line2D
from matplotlib.animation import FuncAnimation

import imageio.v2 as imageio

from IPython.display import display
from PIL import Image

import os


np.set_printoptions(precision=2)

In [16]:
dt = 1              # Time step
q = np.sqrt(.5)     # Process noise magnitude
r = 5.0             # Measurement noise std deviation

# System matrices

# State transition matrix
A = np.array([
                [1, 0, dt, 0],
                [0, 1, 0, dt],
                [0, 0, 1, 0],
                [0, 0, 0, 1]
])
    
# Observation matrix
H = np.array([[1, 0, 0, 0],
                [0, 1, 0, 0]])
    
# Process noise covariance
Q = q**2 * np.array([
                [dt**3/3, 0      , dt**2/2, 0      ],
                [0,       dt**3/3, 0,       dt**2/2],
                [dt**2/2, 0,       dt,      0      ],
                [0,       dt**2/2, 0,       dt     ]
])
    
    
# Measurement noise covariance
R = r**2 * np.eye(2)

In [17]:
def generate_trajectories(num_trajectories=5, num_points=100, init_area=(-100, -100, 100, 100), seed=3):
    """
    Generate multiple trajectories for a linear dynamical system with random initial positions.
    
    Parameters:
    -----------
    num_trajectories : int
        Number of trajectories to generate
    num_points : int
        Number of points in each trajectory
    init_area : tuple
        Area to randomly initialize positions (x_min, y_min, x_max, y_max)
    seed : int
        Random seed for reproducibility
    
    Returns:
    --------
    x_trajectories : list of arrays
        List of state trajectories, each of shape (num_points, 4) - exact location
    z_trajectories : list of arrays
        List of observation trajectories, each of shape (num_points, 2) - with noise
    """
    # Set random seed for reproducibility
    np.random.seed(seed)

    # Initialize lists to store trajectories
    x_trajectories = []
    z_trajectories = []
    
    # Parse initialization area
    x_min, y_min, x_max, y_max = init_area
    
    # Generate each trajectory
    for traj_idx in range(num_trajectories):
        # Initialize states and observations
        x = np.zeros((num_points, 4))
        
        # Generate random initial position within the specified area
        init_x_pos = np.random.uniform(x_min, x_max)
        init_y_pos = np.random.uniform(y_min, y_max)
        
        # Initial velocities - small random values
        init_x_vel = np.random.uniform(-1, 1)
        init_y_vel = np.random.uniform(-1, 1)
        
        # Set initial state [x_pos,y_pos, x_vel, y_vel]
        x[0] = [init_x_pos, init_y_pos, init_x_vel, init_y_vel]
            
        z = np.zeros((num_points, 2))
        z[0] = H @ x[0] + mvn.rvs(cov=R)
        
        # Generate trajectory
        for k in range(1, num_points):
            x[k] = A @ x[k-1] + mvn.rvs(cov=Q)  # Process noise
            z[k] = H @ x[k] + mvn.rvs(cov=R)    # Measurement noise
        
        x_trajectories.append(x)
        z_trajectories.append(z)
    
    return x_trajectories, z_trajectories

In [18]:
def plot_trajectories_2d(x_trajectories, z_trajectories, figsize=(7, 7), x_dims=(0, 1), z_dims=(0, 1)):
    """
    Plot multiple trajectories in 2D space.
    
    Parameters:
    -----------
    x_trajectories : list of arrays
        List of state trajectories, each of shape (num_points, state_dim)
    z_trajectories : list of arrays
        List of observation trajectories, each of shape (num_points, obs_dim)
    figsize : tuple
        Figure size (width, height)
    x_dims : tuple
        Dimensions of the state to use for x and y coordinates in the plot
    z_dims : tuple
        Dimensions of the observations to use for x and y coordinates in the plot
    """
    plt.figure(figsize=figsize)
    
    colors = ['r', 'g', 'b', 'c', 'm', 'y', 'k', 'orange', 'purple', 'brown']
    
    for i, (x, z) in enumerate(zip(x_trajectories, z_trajectories)):
        color = colors[i % len(colors)]
        
        # Plot observations as dots
        plt.plot(z[:, z_dims[0]], z[:, z_dims[1]], '.', color=color)
        
        # Plot true trajectory as a line
        plt.plot(x[:, x_dims[0]], x[:, x_dims[1]], '-', color=color) 
    
    plt.axis('equal')
    plt.grid(True)
    plt.xlabel(f'x dimension')
    plt.ylabel(f'y dimension')
    plt.title('State trajectories and observations')
    
    # Create a custom legend that doesn't repeat for each trajectory
    custom_lines = [
        Line2D([0], [0], marker='.', color='k', linestyle='None', markersize=10),
        Line2D([0], [0], color='k', linestyle='-')
    ]
    plt.legend(custom_lines, ['Observations', 'True trajectories'])
    
    plt.tight_layout()
    plt.show()

In [None]:
x_trajs, z_trajs = generate_trajectories(num_trajectories=5, num_points=100, seed=9)

# Plot all trajectories in 2D space
plot_trajectories_2d(x_trajs, z_trajs)

In [20]:
def calculate_measurement_space(z_trajectories, padding=0.1):
    """
    Calculate the measurement space bounds given observation trajectories.
    Returns a 2x2 numpy array with format [[min_x, min_y], [max_x, max_y]]
    
    Parameters:
    -----------
    z_trajectories : list of arrays or single array
        List of observation trajectories, each of shape (num_points, obs_dim),
        or a single observation array of shape (num_points, obs_dim)
    padding : float
        Amount of padding to add around the bounds as a fraction of the range
    
    Returns:
    --------
    measurement_space : numpy.ndarray
        2x2 array with format [[min_x, min_y], [max_x, max_y]]
    """
    # Check if z_trajectories is a list or a single trajectory
    if isinstance(z_trajectories, list):
        # Combine all trajectories for finding bounds
        all_points_x = np.concatenate([z[:, 0] for z in z_trajectories])
        all_points_y = np.concatenate([z[:, 1] for z in z_trajectories])
    else:
        # Single trajectory
        all_points_x = z_trajectories[:, 0]
        all_points_y = z_trajectories[:, 1]
    
    # Calculate min and max for each dimension
    x_min, x_max = np.min(all_points_x), np.max(all_points_x)
    y_min, y_max = np.min(all_points_y), np.max(all_points_y)
    
    # Calculate ranges
    x_range = x_max - x_min
    y_range = y_max - y_min
    
    # Add padding
    x_min -= x_range * padding
    x_max += x_range * padding
    y_min -= y_range * padding
    y_max += y_range * padding
    
    # Create 2x2 array in the format [[min_x, min_y], [max_x, max_y]]
    measurement_space = np.array([[x_min, y_min], [x_max, y_max]])
    
    return measurement_space

In [21]:
class Tracker:
    """
    IPDA tracker 
    
    Attributes:
        id (int): Unique value for each tracker 
        state (np.ndarray): State vector representing the target (position and velocity)
        covariance (np.ndarray): - 4x4 state covariance matrix (uncertainty)
        lambda_density (float): Clutter intensity in validation region
        PD (float): Probability of Detection
        PG (float): Probability of Gating
        P_s (float): Probability of target survival 
        r (float): Probability that target exists
        active (bool): True if tracker is currently active
        threshold_termination (float): Minimum existence probability threshold below which tracker is deleted 
        threshold_confirmation (float): Existence probability threshold to confirm a tentative tracker.
        age (int): Number of time steps the tracker has been alive
    
    """
    next_id = 0
    
   
 
    def __init__(self, state, covariance, lambda_density, PD, PG, r_0, P_s):
        Tracker.next_id += 1
        self.id = Tracker.next_id
    
        self.state = state 
        self.covariance = covariance

        self.lambda_density = lambda_density
        
        self.PD = PD
        self.PG = PG
        
        self.P_s = P_s
        self.r = r_0
                 
        self.active = False
        self.threshold_termination = 0.2
        self.threshold_confirmation = 0.9
  
        self.age = 1
        

    def update(self, trackers_measurements, v_k, l_k):
        """ Kalman update for IPDA filter """
        
        if len(trackers_measurements) != 0:
            
            ll_k = (1-self.PD) + sum(self.PD * l_k/self.lambda_density)
            self.r = (self.r * ll_k)/((1-self.r) + (self.r * ll_k))
            
            betas = [self.lambda_density * (1 - self.PG * self.PD)]
            for j in range(len(trackers_measurements)):  
                betas.append(self.PD * l_k[j])
            betas = np.array(betas)
            
            coef = sum(betas)
            betas = betas/coef
        
            
            s_k = self.innovation_covariance()

            vk = np.array(betas[1:]) @ np.array(v_k)

            k_k = self.covariance @ H.T @ np.linalg.inv(s_k)  # Kalman gain

            x_post = self.state +  k_k @  vk 

            self.state = x_post

            p_c = self.covariance - k_k @ s_k @ k_k.T


            p_spread_z = np.zeros((2, 2))  

            for beta, v in zip(betas[1:], v_k):
                p_spread_z += beta * (v @ v.T)


            p_spread_z -= (vk @ vk.T)

            p_tilde = k_k @ p_spread_z @ k_k.T

            p_post = np.array(betas[0]) * self.covariance + (1 - betas[0]) * p_c + p_tilde      

            self.covariance = p_post
            
    def innovation_covariance(self):
        return H @ self.covariance @ H.T + R
        
    def predicted_measurement(self):
        return H @ self.state
        
    def predict(self):
        """ Prediction of next state """
        self.age += 1
        
        self.state = A @ self.state
        self.covariance = A @ self.covariance @ A.T + Q
        self.r = self.r * self.P_s
        


    def should_delete(self):
        if self.r <= self.threshold_termination:
            return True
        if self.r >= self.threshold_confirmation:
            self.active = True
        return False
        

In [22]:
def ellipsoidal_gating(z, z_hat, s_k):
    try:
        s_k_inv = np.linalg.inv(s_k)
    except np.linalg.LinAlgError:
        #Use Moore-Penrose pseudo-inverse matrix if s_k is singular (not regular) 
        s_k_inv = np.linalg.pinv(s_k)
    
    return (z-z_hat)@ s_k_inv @ (z-z_hat).T
def gamma(pg, dim_z):
    return chi2.ppf(pg, df=dim_z)
def compute_likelihood(z_i, z_pred, s):
    return mvn.pdf(z_i, mean=z_pred, cov=s)

In [1]:
class TrackerManager:
    """
    Manages IPDA trackers
    
    Attributes:
    trackers (list): Active trackers
    track_history (list): Saves trackers info for further inspection (id, state, covariance, active)
    PG (float): Probability of Gating
    PD (float): Probability of Detection
    lambda_density (float): Clutter intensity in validation region
    P_s (float): Probability of target survival 
    r_0 (float): Initial probability that target exists
    distance_threshold (int): Distance value for tracker merging purposes     
    """
    def __init__(self, lambda_density, P_s, r_0):
        self.trackers = []
        self.track_history = []
        self.PG = 0.98
        self.PD = 0.95
        self.lambda_density = lambda_density
        self.P_s = P_s
        self.r_0 = r_0
        self.distance_threshold = 2

    def associate_and_update(self, measurements):
        """Association measurements to trackers with Mahalanobis distance and PDA update"""
              
        assigned_measurement_indices = set()

        for t in self.trackers:
            v_k = []
            l_k = []
            
            assigned_measurements = []
            
            # Pick the closest measurements to tracker and assign them  
            t.predict()
            z_k_hat = t.predicted_measurement()
            s_k = t.innovation_covariance()
            
            gamma_threshold = gamma(self.PG, 2)
            
            for j, z_j in enumerate(measurements):
                if ellipsoidal_gating(z_j, z_k_hat, s_k) < gamma_threshold:
                
                    v_k.append(np.array(z_j) - np.array(z_k_hat))
                    l_k.append(compute_likelihood(z_j, z_k_hat, s_k)) 
                    
                    assigned_measurements.append(z_j)
                    assigned_measurement_indices.add(j)
       
            l_k = np.array(l_k)
            v_k = np.array(v_k)
            
            if assigned_measurements:
                t.update(assigned_measurements, v_k, l_k)
            
        # Create new tracks for unassigned measurements

        unassigned_measurements = [m for i, m in enumerate(measurements) if i not in assigned_measurement_indices]

        for m in unassigned_measurements:
            self.trackers.append(Tracker(
                state=np.array([m[0], m[1], 0, 0]), 
                covariance=np.eye(4), 
                lambda_density=self.lambda_density, 
                r_0 = self.r_0, 
                P_s = self.P_s, 
                PD = self.PD, 
                PG = self.PG
            ))
        
    def remove_redundant_trackers(self):
        """
        Remove trackers that are close to each other, keeping the older ones.
        Uses KD-tree for efficient spatial search.
        """
        if len(self.trackers) <= 1:
            return
        
        # Extract positions (first two elements of state) for all trackers
        positions = np.array([t.state[:2] for t in self.trackers])
        
        # Build KD-tree for efficient nearest neighbor search
        tree = KDTree(positions)
        
        # Find pairs of trackers that are close to each other
        pairs = tree.query_pairs(self.distance_threshold)
        
        # Group trackers that should be merged
        groups = defaultdict(set)
        for i, j in pairs:
            groups[i].add(j)
            groups[j].add(i)
            
        # Add singleton trackers (those without close neighbors)
        for i in range(len(self.trackers)):
            if i not in groups:
                groups[i] = set()
        
        # Process groups to determine which trackers to keep
        trackers_to_remove = set()
        
        for idx, neighbors in groups.items():
            if idx in trackers_to_remove:
                continue
                
            # For this tracker and all its neighbors, keep the oldest one
            all_related = neighbors.union({idx})
            
            # Find the oldest tracker in the group (highest age)
            oldest_idx = idx
            for neighbor_idx in neighbors:
                if self.trackers[neighbor_idx].age > self.trackers[oldest_idx].age:
                    oldest_idx = neighbor_idx
            
            # Mark all except the oldest for removal
            for tracker_idx in all_related:
                if tracker_idx != oldest_idx:
                    trackers_to_remove.add(tracker_idx)
        
        # Remove the marked trackers (in reverse order to maintain indices)
        for idx in sorted(trackers_to_remove, reverse=True):
            del self.trackers[idx]

    def remove_tracks(self):
        
        """
        Removes redundant trackers and logs a snapshot of current tracker states.
         
        """
        snapshot = {"time": len(self.track_history), "tracks": []}
        
        self.trackers = [t for t in self.trackers if not t.should_delete()]
        self.remove_redundant_trackers()
        
        for t in self.trackers:
            snapshot["tracks"].append({
                "id": t.id, 
                "state": t.state.copy(), 
                "cov": t.covariance.copy(),
                "active": t.active
            })
        self.track_history.append(snapshot)
            

In [27]:
def plot_tracks_gif(history,lambda_density, z_trajs=None, filename="tracks.gif", dpi=100, interval=0.1, age_threshold=10, measurement_space=0, clutter = []):
    """
    Create a GIF animation of tracks with multiple observation trajectories.
    
    Parameters:
    -----------
    history : list
        List of track snapshots
    z_trajs : list of arrays or None
        List of observation trajectories, each of shape (num_points, obs_dim)
    filename : str
        Output GIF filename
    dpi : int
        Resolution of saved frames
    interval : float
        Time between frames in seconds
    age_threshold : int
        Minimum age for a track to display its covariance ellipse
    measurement_space : numpy.ndarray
        2x2 array defining the plot boundaries [[min_x, min_y], [max_x, max_y]]
    """
    frames = {}
    
    # Define a list of colors for different trajectories
    traj_colors = ['blue', 'purple', 'cyan', 'magenta', 'orange', 'brown', 'pink', 'gray', 'olive', 'teal']
    
    for step, snapshot in enumerate(history):
        fig, ax = plt.subplots(figsize=(8, 6))
        
        # Set fixed axes limits
        ax.set_xlim(measurement_space[0][0], measurement_space[1][0])
        ax.set_ylim(measurement_space[0][1], measurement_space[1][1])
        
        # Plot observation trajectories if provided (up to current step)
        if z_trajs is not None:
            for i, z in enumerate(z_trajs):
                if step < len(z):
                    # Use modulo to cycle through colors if more trajectories than colors
                    color = traj_colors[i % len(traj_colors)]
                    
                    # Get whole path 
                    obs_path = z[:100, :2]
                    
                    # Plot path and current position
                    ax.plot(obs_path[:, 0], obs_path[:, 1], color=color, linestyle='-', 
                           linewidth=1, alpha=0.6, label=f'Observation {i+1}' if i == 0 else f'_obs{i+1}')
                    
                    # Plot current position with a marker
                    ax.scatter(obs_path[-1, 0], obs_path[-1, 1], c=color, s=30, marker='o', alpha=0.8)
        for clutt in clutter[step]:
            plt.scatter(clutt[0], clutt[1], color='gray', s=10) 
        
        # Plot tracks
        for track in snapshot["tracks"]:
            track_id = track["id"]
            x, y = track["state"][0], track["state"][1]
            cov_matrix = np.array(track["cov"])[:2, :2]
            
            if track_id not in frames:
                frames[track_id] = []
            frames[track_id].append((x, y))
            
           
                
            if track["active"] == True:
                ax.scatter(x, y, s=10, color='red', alpha=0.6)
                track_path = np.array(frames[track_id])
                ax.plot(track_path[:, 0], track_path[:, 1], color='green', linewidth=1)
                
                # Add covariance ellipse
                try:
                    lambda_, v = np.linalg.eig(cov_matrix)
                    lambda_ = np.sqrt(lambda_)
                    
                    from matplotlib.patches import Ellipse
                    ellipse = Ellipse(xy=(x, y),
                                    width=lambda_[0] * 2, height=lambda_[1] * 2,
                                    angle=np.rad2deg(np.arccos(v[0, 0])),
                                    edgecolor='red', fc='none', lw=1)
                    ax.add_patch(ellipse)
                except np.linalg.LinAlgError:
                    # Skip ellipse if there's a numerical issue
                    pass
                
        ax.set_xlabel('X')
        ax.set_ylabel('Y')
        ax.set_title(f'Time step {step}, lambda={lambda_density:.2e}')
        ax.grid()
        
        # Create custom legend with one entry per category
        if z_trajs is not None:
            custom_lines = [
                Line2D([0], [0], color='blue', linestyle='-', alpha=0.6),
                Line2D([0], [0], color='green', linestyle='-'),
                Line2D([0], [0], marker='o', color='red', linestyle='None')
            ]
            custom_labels = ['True path', 'Tracks', 'Trackers position']
            ax.legend(custom_lines, custom_labels, loc='upper right')
        
        plt.tight_layout()
        frame_path = f"frame_{step}.png"
        plt.savefig(frame_path, dpi=dpi)
        plt.close(fig)
    
    # Create GIF

    images = [imageio.imread(f"frame_{i}.png") for i in range(len(history))]
    imageio.mimsave(filename, images, duration=interval)
    
   
    image = Image.open("frame_99.png")  
    display(image)
    
    # Clean up temporary files
    for i in range(len(history)):
        os.remove(f"frame_{i}.png")
    
    print(f"GIF saved as {filename}")

In [28]:
lambda_clutter = 50

measurement_space = calculate_measurement_space(z_trajs)

V = (measurement_space[1][0] - measurement_space[0][0]) * (measurement_space[1][1] - measurement_space[0][1])


lambda_density = lambda_clutter / V
P_s = 0.6
r_0 = 0.3

clutter_hist = []

manager = TrackerManager(lambda_density, P_s, r_0)

for i in range(100):
    
    z_i_traj = [z[i] for z in z_trajs]
        
    num_clutter = np.random.poisson(lambda_clutter)
    clutter = np.random.uniform(    low=measurement_space[0], 
                                    high=measurement_space[1], 
                                    size=(num_clutter, 2))    
        
    clutter_hist.append(clutter)    
        

    
    measurements = np.vstack([z_i_traj, clutter])

    manager.associate_and_update(measurements)
    manager.remove_tracks()



In [None]:
x_modified = [sub_arr[:, [0, 1]] for sub_arr in x_trajs]
plot_tracks_gif(manager.track_history, z_trajs=x_modified, measurement_space=measurement_space, clutter=clutter_hist, lambda_density=lambda_density, filename= f"IPDA_{lambda_density:.2e}_8.gif")