## Extended Object Tracking Example (MTT-EOT)


#### Describe n targets as ground truths;
The target shape is described by the 'length' and 'width', semi-major and semi-minor axis of the ellipse, and the 'orientation'.

### General imports



In [1]:
import numpy as np
from datetime import datetime, timedelta
from scipy.stats import uniform, invwishart, multivariate_normal
from scipy.cluster.vq import kmeans

### Stone Soup components



In [2]:
from stonesoup.models.transition.linear import CombinedLinearGaussianTransitionModel, \
    ConstantVelocity
from stonesoup.types.groundtruth import GroundTruthPath, GroundTruthState
from stonesoup.types.state import GaussianState, State
from stonesoup.models.measurement.linear import LinearGaussian
from stonesoup.types.detection import Detection, Clutter

from extended_object_data_generation import get_5_object_targets, get_truths_for_targets

# Simulation setup
start_time = datetime.now().replace(microsecond=0)
np.random.seed(1908)  # fix the seed
num_steps = 65  # simulation steps
poisson_distribution_lambda = 1 # average number of detections per timestep, but range of noise created from a poisson distribution
max_points_from_target = 4  # max number of detections from a target
min_points_from_target = 1  # max number of clutter points
expected_velocity = 1  # expected velocity of the targets used in transition model
time_measurement_delta = 5 # time betweeen measurements in the plots when generated
default_tail_length = 0.2  # default tail length for the plot
scale_of_noise = 250  # scale of the noise for the detection

rng = np.random.default_rng()  # random number generator for number of detections

# Define the transition model
transition_model = CombinedLinearGaussianTransitionModel([ConstantVelocity(expected_velocity),
                                                          ConstantVelocity(expected_velocity)])

targets, metadatas = get_5_object_targets(start_time)
truths = get_truths_for_targets(targets, metadatas, num_steps, transition_model) # Get the set of truths for the targets

# number_k_cluster = len(targets) # number of clusters for k-means - using dbscan instead now, so not needed!!

#### 2. Collect mock measurements from the targets;

In [6]:
# Define the measurement model
measurement_model = LinearGaussian(ndim_state=4,
                                   mapping=(0, 2),
                                   noise_covar=np.diag([25, 25]))

# create a series of scans to collect the measurements and clutter
scans = []
for k in range(num_steps):
    measurement_set = set()

    # iterate for each case
    for truth in truths:

        # current state
        current = truth[k]

        # Identify how many detections to obtain
        sampling_points = rng.integers(low=min_points_from_target, high=max_points_from_target, size=1)

        # Centre of the distribution
        mean_centre = np.array([current.state_vector[0],
                                current.state_vector[2]])

        # covariance of the distribution
        covar = np.diag([current.metadata['length'], current.metadata['width']])

        # rotation matrix of the ellipse
        rotation = np.array([[np.cos(current.metadata['orientation']),
                              -np.sin(current.metadata['orientation'])],
                             [np.sin(current.metadata['orientation']),
                              np.cos(current.metadata['orientation'])]])

        rot_covar = np.dot(rotation, np.dot(covar, rotation.T))

        # use the elliptic covariance matrix
        covariance_matrix = invwishart.rvs(df=3, scale=rot_covar)

        # Sample points
        sample_point = np.atleast_2d(multivariate_normal.rvs(mean_centre,
                                                             covariance_matrix,
                                                             size=sampling_points))

        for ipoint in range(len(sample_point)):
            point = State(np.array([sample_point[ipoint, 0], current.state_vector[1],
                                    sample_point[ipoint, 1], current.state_vector[3]]))

            # Collect the measurement
            measurement = measurement_model.function(point, noise=True)

            # add the measurement on the measurement set
            measurement_set.add(Detection(state_vector=measurement,
                                          timestamp=current.timestamp,
                                          measurement_model=measurement_model))

        # Clutter detections
        truth_x = current.state_vector[0]
        truth_y = current.state_vector[2]

        for _ in range(np.random.poisson(poisson_distribution_lambda)):
            # Generate one sample from a uniform distribution ranging from -100 to 50
            # loc = -100 (starting point), scale = 150 (extends 150 units from the loc)
            x = uniform.rvs(loc=-100, scale=scale_of_noise)
            y = uniform.rvs(loc=-100, scale=scale_of_noise)
            measurement_set.add(Clutter(np.array([[truth_x + x], [truth_y + y]]),
                                        timestamp=current.timestamp,
                                        measurement_model=measurement_model))

    scans.append(measurement_set)

State(
    state_vector=StateVector([[ 2.99734249e+03],
                              [-1.00000000e-01],
                              [ 6.98297655e+02],
                              [-1.60000000e+00]]),
    timestamp=None)
[[2989.65951209]
 [ 695.00477812]]
State(
    state_vector=StateVector([[ 3.00563934e+03],
                              [-1.00000000e-01],
                              [ 6.95126898e+02],
                              [-1.60000000e+00]]),
    timestamp=None)
[[3002.87315773]
 [ 691.39357009]]
State(
    state_vector=StateVector([[ 2.99604287e+03],
                              [-1.00000000e-01],
                              [ 7.01524763e+02],
                              [-1.60000000e+00]]),
    timestamp=None)
[[2988.94363092]
 [ 710.47696872]]
State(
    state_vector=StateVector([[ 4.66847827e-01],
                              [ 5.00000000e-02],
                              [-9.00368668e+02],
                              [ 9.00000000e-01]]),
    timestamp=N

#### Visualise the tracks and the detections



In [7]:
from stonesoup.plotter import AnimatedPlotterly

timesteps = [start_time + timedelta(seconds=time_measurement_delta*k) for k in range(num_steps)]
plotter = AnimatedPlotterly(timesteps, tail_length=0.2)
plotter.plot_ground_truths(truths, [0, 2])
plotter.plot_measurements(scans, [0, 2])
# print(scans)
plotter.fig

In [5]:
timesteps = [start_time + timedelta(seconds=time_measurement_delta*k) for k in range(num_steps)]
plotter = AnimatedPlotterly(timesteps, tail_length=default_tail_length)
plotter.plot_ground_truths(truths, [0, 2])
plotter.plot_measurements(scans, [0, 2])
plotter.fig

#### 3. Prepare tracking algorithm, run cluster:
Use ExtendedKalman filter for prediction
Use K-means cluster method where k=<# objects tracked above>
We could dynamically identify the number of objects though.

In [6]:
# Load the tracking components
from stonesoup.predictor.kalman import ExtendedKalmanPredictor
from stonesoup.updater.kalman import ExtendedKalmanUpdater

# Initiator and deleter
from stonesoup.deleter.time import UpdateTimeStepsDeleter
from stonesoup.initiator.simple import MultiMeasurementInitiator

# data associator
from stonesoup.dataassociator.neighbour import GlobalNearestNeighbour
from stonesoup.hypothesiser.distance import DistanceHypothesiser
from stonesoup.measures import Mahalanobis

# Tracker

from stonesoup.tracker.simple import MultiTargetTracker
# Prepare the predictor and updater
predictor = ExtendedKalmanPredictor(transition_model)
updater = ExtendedKalmanUpdater(measurement_model)

# Instantiate the deleter
deleter = UpdateTimeStepsDeleter(5)

# Hypothesiser
hypothesiser = DistanceHypothesiser(
    predictor=predictor,
    updater=updater,
    measure=Mahalanobis(),
    missed_distance=10)

# Data associator
data_associator = GlobalNearestNeighbour(hypothesiser)

# Initiator
initiator = MultiMeasurementInitiator(
    prior_state=GaussianState(np.array([0, 0, 0, 0]),
                              np.diag([10, 0.5, 10, 0.5]) ** 2,
                              timestamp=start_time),
    measurement_model=None,
    deleter=deleter,
    data_associator=data_associator,
    updater=updater,
    min_points=7)

#### Cluster the detections
Before running the tracker we employ a simple clustering method to identify neighbouring
detections to the same target and identify a detection centroid, which will be used by the
tracker.

In [7]:
from sklearn.cluster import DBSCAN
from scipy.spatial import ConvexHull
import matplotlib.pyplot as plt
from matplotlib.patches import Ellipse

def plot_ellipse(position, covariance, ax=None, **kwargs):
    """Plot an ellipse with a given position and covariance"""
    ax = ax or plt.gca()
    
    # Convert covariance to principal axes
    if covariance.shape == (2, 2):
        U, s, Vt = np.linalg.svd(covariance)
        angle = np.degrees(np.arctan2(U[1, 0], U[0, 0]))
        width, height = 2 * np.sqrt(s)
    else:
        angle = 0
        width, height = 2 * np.sqrt(covariance)
    
    # Draw the Ellipse
    for nsig in range(1, 4):
        ax.add_patch(Ellipse(position, nsig * width, nsig * height, angle, **kwargs))

# Create a list of detections and timestamps
centroid_detections = []
timestamps = []

for iscan in range(len(scans)):  # loop over the scans
    detections = []
    detection_set = set()
    # loop over the items of the scan, both detection and clutter
    for item in scans[iscan]:
        detections.append(np.array([item.state_vector[0], item.state_vector[1]]))
        
    detections = np.array(detections)
    
    # Find clusters in the data using DBSCAN
    # eps and min_samples need to be chosen based on your specific data distribution
    clustering = DBSCAN(eps=100, min_samples=2).fit(detections)
    labels = clustering.labels_
    
    # Analyze cluster results
    unique_labels = set(labels)
    n_clusters = len(unique_labels) - (1 if -1 in labels else 0)
    # print(f"Scan {iscan+1}:")
    # print(f"Number of clusters: {n_clusters}")
    # print("Number of detections per cluster:")
    # for label in unique_labels:
    #     if label == -1:
    #         print(f"Outliers: {list(labels).count(label)}")
    #     else:
    #         print(f"Cluster {label}: {list(labels).count(label)}")
    
    # Gather centroids of clusters (average of points in each cluster)
    unique_labels = set(labels)
    centroids = np.array([detections[labels == label].mean(axis=0) for label in unique_labels if label != -1])
    print(centroids)
    
    # # Find clusters in the data
    # centroids, _ = kmeans(detections, number_k_cluster)

    # iterate over the centroids to create a new set of detections
    for idet in range(len(centroids)):
        detection_set.add(Detection(state_vector=[[centroids[idet][0]], [centroids[idet][1]]],
                                    timestamp=item.timestamp,
                                    measurement_model=item.measurement_model))

    # # Now a new set of scans with the centroid detections
    centroid_detections.append(detection_set)
    timestamps.append(item.timestamp)
    
    # # Visualize the clusters for this scan
    # plt.figure(figsize=(8, 6))
    # ax = plt.gca()
    # colors = plt.cm.rainbow(np.linspace(0, 1, len(set(labels))))
    
    # for k, col in zip(set(labels), colors):
    #     class_member_mask = (labels == k)
    #     xy = detections[class_member_mask]
    #     plt.plot(xy[:, 0], xy[:, 1], 'o', markerfacecolor=col, markeredgecolor='k', markersize=5, label=f'Cluster {k}')
        
    #     if k != -1:  # Draw Ellipse for actual clusters, not outliers
    #         # Fit an ellipse to the cluster
    #         if len(xy) > 2:  # Need at least three points to fit an ellipse
    #             cov = np.cov(xy, rowvar=False)
    #             plot_ellipse(xy.mean(axis=0), cov, ax=ax, edgecolor=col, lw=2, facecolor='none')

    # plt.title(f'Cluster Visualization for Scan {iscan+1}')
    # plt.xlabel('X coordinate')
    # plt.ylabel('Y coordinate')
    # plt.legend()
    # plt.grid(True)
    # plt.show()
    
     # Optional: Visualize the clusters for this scan
    # Visualize the clusters for this scan
    # plt.figure(figsize=(8, 6))
    # colors = plt.cm.rainbow(np.linspace(0, 1, len(unique_labels)))
    # for k, col in zip(unique_labels, colors):
    #     if k == -1:
    #         # Black used for noise.
    #         col = 'k'

    #     class_member_mask = (labels == k)
    #     xy = detections[class_member_mask]
    #     plt.plot(xy[:, 0], xy[:, 1], 'o', markerfacecolor=col, markeredgecolor='k', markersize=6, label=f'Cluster {k}')

    #     if k != -1:  # Draw Convex Hull for actual clusters, not outliers
    #         if len(xy) > 2:  # ConvexHull needs at least three points to compute
    #             hull = ConvexHull(xy)
    #             for simplex in hull.simplices:
    #                 plt.plot(xy[simplex, 0], xy[simplex, 1], col)

    # plt.title(f'Cluster Visualization for Scan {iscan+1}')
    # plt.xlabel('X coordinate')
    # plt.ylabel('Y coordinate')
    # plt.legend()
    # plt.grid(True)
    # plt.show()

# prepare the tracker
tracker = MultiTargetTracker(
    initiator=initiator,
    deleter=deleter,
    data_associator=data_associator,
    updater=updater,
    detector=zip(timestamps, centroid_detections))

[[2996.80070822  700.97329009]
 [-445.40554575  201.13128684]]
[]
[[3024.09947911  665.90766874]
 [-407.72144207  145.796233  ]]
[[3029.57192581  658.59358447]
 [-472.75601965  151.42920804]]
[[3036.57734056  640.13356106]
 [-481.25488034  110.09011954]]
[[-489.92532174   78.72964702]
 [3079.96630009  644.50912592]]
[[-486.96122042   30.73655631]
 [3042.1936759   630.8290697 ]]
[[3035.84114543  638.87722152]]
[[-444.6990799   -35.2655156 ]
 [3067.4905579   578.63021774]]
[[3023.31523658  537.79937437]
 [-410.24942476  -80.1517806 ]]
[[3060.63103241  568.84394614]
 [-361.66191817 -123.32617186]]
[[-305.83043826 -147.34993365]
 [3054.50813579  584.7151922 ]
 [3179.63266078  538.65158738]]
[[-222.43209579 -222.56037445]
 [3019.72982367  543.63709672]]
[[2976.30787073  518.318904  ]
 [-212.24631233 -247.17785188]]
[[-158.76672984 -326.1032299 ]
 [3042.82255459  498.19018121]]
[[-142.31298134 -384.54679676]
 [2845.96222478  519.10432255]]
[[2768.64414833  410.30012538]
 [ -53.11018492 -461.

#### 4. Run the tracker, visualise the results.

In [8]:
tracks = set()
for time, current_tracks in tracker:
    tracks.update(current_tracks)

plotter.plot_measurements(centroid_detections, [0, 2], marker=dict(color='red'),
                          measurements_label='Cluster centroids')
plotter.plot_tracks(tracks, [0, 2])
plotter.fig