In [1]:
%load_ext autoreload
%autoreload 2

In [2]:
import os
import sys
sys.path.append(os.path.realpath('..'))

In [3]:
import textwrap

import numpy as np
import pandas as pd
from scipy.stats import chi2

import plotly.express as ex
import plotly.graph_objects as go

In [4]:
from tracking_v2.sensor import GeometricSensor
from tracking_v2.target import SingleTurnTarget, ConstantVelocityTarget
from tracking_v2.motion import ConstantVelocityModel
from tracking_v2.np import as_column
from tracking.util import to_df

In [5]:
class SensorMeasurement:
    def __init__(self, orientation, z, R):
        self.orientation = np.asarray(orientation)
        self.z           = np.asarray(z)
        self.R           = np.asarray(R)


class GeometricInformationSensor:
    """Omni-directional sensor located in (0,0,0), pointing East. Produces a rotation
    matrix into the measurement frame (frame centered on the ray), a detection
    (range, 0, 0) and the inverse of the measurement noise matrix, R^-1"""
    
    def __init__(self, R, seed=0):
        self.R = np.array(R)
        self.R_inv = np.linalg.inv(R)
        self.rng = np.random.default_rng(seed=seed)
    
    def generate_measurement(self, t: float, target):
        if isinstance(target, list) or isinstance(target, np.ndarray):
            position = np.asarray(target)
        else:
            position = target.true_state(t)
            
        measurement = self.rng.multivariate_normal(position.squeeze()[:3], self.R, size=1)
        return SensorMeasurement(None, measurement, self.R)


class RadarInformationSensor:
    """Omni-directional sensor located in (0,0,0), pointing East. Produces a rotation
    matrix into the measurement frame (frame centered on the ray), a detection
    (range, 0, 0) and the inverse of the measurement noise matrix, R^-1"""
    
    def __init__(self, R, seed=0):
        self.R = np.array(R)
        self.R_inv = np.linalg.inv(R)
        self.rng = np.random.default_rng(seed=seed)
    
    def generate_measurement(self, t: float, target):
        if isinstance(target, list) or isinstance(target, np.ndarray):
            position = np.asarray(target)
        else:
            position = target.true_state(t)
            
        measurement = self.rng.multivariate_normal(position.squeeze()[:3], self.R, size=1)
        
        x, y, z = measurement.squeeze()
        xy = np.sqrt(x*x + y*y)
        xyz = np.linalg.norm(measurement)

        sin_alpha = y/xy
        cos_alpha = np.sqrt(1 - sin_alpha*sin_alpha)
        
        sin_beta = z/xyz
        cos_beta = np.sqrt(1 - sin_beta*sin_beta)
        
        Ry = np.array([[ cos_beta, 0, sin_beta],
                       [ 0,        1, 0       ],
                       [-sin_beta, 0, cos_beta]])
        Rz = np.array([[cos_alpha, -sin_alpha, 0],
                       [sin_alpha,  cos_alpha, 0],
                       [0,          0,         1]])
        
        return SensorMeasurement(Ry @ Rz, (xyz, 0, 0), self.R)

In [33]:
class InformationFilter:
    def __init__(self, H, motion_model):
        self.state_dim = 6
        
        self.y_hat = as_column(np.zeros(6)) # P^-1 @ x_hat
        self.Y_hat = np.eye(6)   # P^-1
        
        self.H     = np.atleast_2d(H)
        self.motion_model = motion_model

        assert motion_model.state_dim == self.state_dim
        assert motion_model.state_dim == self.H.shape[1]

    def initialize(self, x, P):
        P_inv = np.linalg.inv(P)
        r, c = P_inv.shape
        self.Y_hat[:r, :c] = P_inv

        x = np.asarray(x).squeeze()
        x = np.pad(x, (0, self.state_dim - len(x)), constant_values=[0])
        self.y_hat = self.Y_hat @ as_column(x)

    def predict(self, dt):
        # https://en.wikipedia.org/wiki/Kalman_filter#Information_filter
        F = self.motion_model.F(dt)
        F_inv = np.linalg.inv(F)

        Q = self.motion_model.Q(dt)
        Q_inv = np.linalg.inv(Q)

        M = F_inv.T @ self.Y_hat @ F_inv

        self.Y_hat = np.linalg.inv(np.eye(self.state_dim) + M @ Q) @ M
        self.y_hat = np.linalg.inv(np.eye(self.state_dim) + M @ Q) @ F_inv.T @ self.y_hat

    def update(self, orientation, z, R):
        if orientation is not None:
            raise Exception("orientation not supported")

        R_inv = np.linalg.inv(R)
        I = self.H.T @ R_inv @ self.H
        i = self.H.T @ R_inv @ as_column(z)

        self.Y_hat += I
        self.y_hat += i

        if orientation is not None:
            raise Exception("orientation not supported")

    @property
    def P_hat(self):
        return np.linalg.inv(self.Y_hat)

    @property
    def x_hat(self):
        return self.P_hat @ self.y_hat

In [34]:
#target = SingleTurnTarget(30, 1, [-1000, -1000, 100])

target = ConstantVelocityTarget()
sensor = GeometricInformationSensor(np.diag([10, 10, 10]))

In [35]:
inf_flt = InformationFilter([[1, 0, 0, 0, 0, 0],
                             [0, 1, 0, 0, 0, 0],
                             [0, 0, 1, 0, 0, 0]],
                            ConstantVelocityModel())

T = 1
t = 0

m = sensor.generate_measurement(t, target)
inf_flt.initialize(m.z, m.R)

In [58]:
t += T

inf_flt.predict(T)
print(inf_flt.x_hat.T)

m = sensor.generate_measurement(t, target)
inf_flt.update(None, m.z, m.R)

print(inf_flt.x_hat.T)

[[ 6.93009937e+02  2.42531283e+00  1.23282970e+00  3.11254418e+01
   9.75616868e-01 -2.28686472e-02]]
[[6.87536702e+02 1.18521101e+00 1.74250834e+00 2.90053185e+01
  4.95248475e-01 1.74561520e-01]]
