In [1]:
%load_ext autoreload
%autoreload 2

import matplotlib.pyplot as plt

def format_plt():
    """
    Formats matplotlib to appear use Latex + Seaborn theme.
    """
    global plt
    plt.rcParams.update(
        {
            "text.usetex": True,
            "font.family": "serif",
            "font.serif": ["Computer Modern Roman"],  # or another LaTeX font
            "axes.grid": False,
            "text.color": "black",
        }
    )
    plt.rcParams["figure.figsize"] = [5, 5]
    plt.style.use("seaborn-v0_8-bright")

format_plt()

In [2]:
# from stonesoup.models.transition.linear import (
#     ConstantVelocity,
#     CombinedLinearGaussianTransitionModel
#     )
# from stonesoup.predictor.kalman import ExtendedKalmanPredictor
# from stonesoup.updater.kalman import ExtendedKalmanUpdater
# from stonesoup.models.measurement.nonlinear import (
#     CartesianToElevationBearingRange
#     )
# from stonesoup.types.array import CovarianceMatrix


# transition_model = CombinedLinearGaussianTransitionModel(
#     [ConstantVelocity(1.0),
#      ConstantVelocity(1.0),
#      ConstantVelocity(1.0)])

# # Model coords = elev, bearing, range. Angles in radians
# meas_covar = np.diag([np.radians(np.sqrt(10.0))**2,
#                       np.radians(0.6)**2,
#                       3.14**2])

# meas_covar_trk = CovarianceMatrix(1.0*meas_covar)
# meas_model = CartesianToElevationBearingRange(
#             ndim_state=6,
#             mapping=np.array([0, 2, 4]),
#             noise_covar=meas_covar_trk)
# predictor = ExtendedKalmanPredictor(transition_model)
# updater = ExtendedKalmanUpdater(measurement_model=meas_model)

In [15]:
from mystonesoup.drivers import AlphaStableDriver
from mystonesoup.base import CombinedLinearNonGaussianTransitionExtModel
from mystonesoup.models import Langevin
from stonesoup.types.array import CovarianceMatrix
from stonesoup.models.measurement.nonlinear import (
    CartesianToElevationBearingRange
    )
import numpy as np

seed = 4

c = 10
noise_case = 2


mu_W = 0.
sigma_W = 0.00005
alpha = 0.6
theta = -0.7
as_driver_z = AlphaStableDriver(c=c, alpha=alpha, mu_W=mu_W, sigma_W=sigma_W, noise_case=noise_case, seed=seed)
langevin_z = Langevin(theta=theta, ng_driver=as_driver_z)


mu_W = 0.
sigma_W = 0.00005
alpha = 1.2
theta = -0.1
as_driver_x = AlphaStableDriver(c=c, alpha=alpha, mu_W=mu_W, sigma_W=sigma_W, noise_case=noise_case, seed=seed)
langevin_x = Langevin(theta=theta, ng_driver=as_driver_x)

langevin_y = Langevin(theta=theta, ng_driver=as_driver_x)
transition_model = CombinedLinearNonGaussianTransitionExtModel([langevin_x, langevin_z, langevin_y])

# Model coords = elev, bearing, range. Angles in radians
meas_covar = np.diag([np.radians(np.sqrt(10.0))**2,
                      np.radians(0.6)**2,
                      3.14**2])

meas_covar_trk = CovarianceMatrix(1.0*meas_covar)
meas_model = CartesianToElevationBearingRange(
            ndim_state=6,
            mapping=np.array([0, 2, 4]),
            noise_covar=meas_covar_trk)

from mystonesoup.filters import RBParticlePredictor
predictor = RBParticlePredictor(transition_model=transition_model)

from stonesoup.resampler.particle import SystematicResampler
resampler = SystematicResampler()

from mystonesoup.filters import RBParticleUpdater
updater = RBParticleUpdater(meas_model, resampler)

In [16]:
from stonesoup.reader.generic import CSVGroundTruthReader

ground_truth_reader = CSVGroundTruthReader(
    path='dataset/UAV_Rot.csv',
    state_vector_fields=['longitude', 'Vx m/s', 'latitude', 'Vy m/s', 'altitude (m)'],
    time_field='time',
    path_id_field='groupNb',
)

from stonesoup.feeder.geo import LLAtoENUConverter
sensor_location = [-30.948, 50.297311666, 0]  # Radar position [long, lat, alt]
ground_truth_reader = LLAtoENUConverter(ground_truth_reader, sensor_location, [0, 2, 4])

In [17]:
from stonesoup.platform.base import FixedPlatform
from stonesoup.sensor.radar.radar import RadarElevationBearingRange
from stonesoup.simulator.platform import PlatformDetectionSimulator
from stonesoup.types.state import State

sensor = RadarElevationBearingRange(
    [0, 2, 4],
    meas_covar,
    6,
)
platform = FixedPlatform(
    State([0, 0, 0, 0, 0, 0]),  # Sensor at reference point, zero velocity
    [0, 2, 4],
    sensors=[sensor]
)

# Create the detector and initialize it.
detector = PlatformDetectionSimulator(ground_truth_reader, [platform])

In [18]:
from mystonesoup.filters import RBParticleStateUpdate, RBParticleState
from stonesoup.initiator.simple import SimpleMeasurementInitiator
from stonesoup.types.track import Track
from stonesoup.types.hypothesis import SingleHypothesis
from scipy.stats import multivariate_normal
from stonesoup.types.numeric import Probability  # Similar to a float type
from stonesoup.types.array import StateVectors

class Initiator(SimpleMeasurementInitiator):
    def initiate(self, detections, time, **kwargs):
        MAX_DEV = 400.
        tracks = set()
        measurement_model = self.measurement_model
        for detection in detections:
            
            state_vector = measurement_model.inverse_function(
                            detection)
            print(state_vector)
            model_covar = measurement_model.covar()

            el_az_range = np.sqrt(np.diag(model_covar)) #elev, az, range

            std_pos = detection.state_vector[2, 0]*el_az_range[1]
            stdx = np.abs(std_pos*np.sin(el_az_range[1]))
            stdy = np.abs(std_pos*np.cos(el_az_range[1]))
            stdz = np.abs(detection.state_vector[2, 0]*el_az_range[0])
            if stdx > MAX_DEV:
                print('Warning - X Deviation exceeds limit!!')
            if stdy > MAX_DEV:
                print('Warning - Y Deviation exceeds limit!!')
            if stdz > MAX_DEV:
                print('Warning - Z Deviation exceeds limit!!')
            C0 = np.diag(np.array([stdx, 30.0, stdy, 30.0, stdz, 30.0])**2)

            tracks.add(Track([RBParticleStateUpdate(
                state_vector,
                C0,
                SingleHypothesis(None, detection),
                timestamp=detection.timestamp)
            ]))
        return tracks

number_particles=2

# Sample from the prior Gaussian distribution
states = multivariate_normal.rvs(np.array([0, 0, 0, 0, 0, 0]), size=number_particles)
covars = np.stack([np.diag([0, 30.0, 0, 30.0, 0, 30.0])**2 for i in range(number_particles)], axis=2)

# Create prior particle state.
prior = RBParticleState(
    state_vector=StateVectors(states.T),
    covariance=covars,
    weight=np.array([Probability(1/number_particles)]*number_particles))

initiator = Initiator(prior, meas_model)

In [19]:
class MyDeleter:
    def delete_tracks(self, tracks):
        return set()

deleter = MyDeleter()


In [20]:
from stonesoup.measures import Euclidean
from stonesoup.dataassociator.neighbour import NearestNeighbour
from stonesoup.hypothesiser.distance import DistanceHypothesiser
from stonesoup.tracker.simple import SingleTargetTracker
meas = Euclidean()
hypothesiser = DistanceHypothesiser(predictor, updater, meas)
associator = NearestNeighbour(hypothesiser)


tracker = SingleTargetTracker(initiator,
                              deleter,
                              detector,
                              associator,
                              updater)


In [21]:
est_X=[]
est_Y=[]
meas_X=[]
meas_Y=[]
true_X = []
true_Y = []
for time, tracks in tracker:
    for ground_truth in ground_truth_reader.groundtruth_paths:
        true_X.append(ground_truth.state_vector[0])
        true_Y.append(ground_truth.state_vector[2])

    # Because this is a single target tracker, I know there is only 1 track.
    for track in tracks:

        #Get the corresponding measurement
        detection = track.states[-1].hypothesis.measurement
        # Convert measurement into xy
        xyz = meas_model.inverse_function(detection)
        meas_X.append(xyz[0])
        meas_Y.append(xyz[2])

        vec = track.states[-1].state_vector
        est_X.append(vec[0])
        est_Y.append(vec[2])


[[4.05801886e+03]
 [0.00000000e+00]
 [1.30084232e+00]
 [0.00000000e+00]
 [7.34995153e+02]
 [0.00000000e+00]]


ValueError: could not broadcast input array from shape (6,6) into shape (6,)