# Orbit Determination using Unscented Kalman Filter

This tutorial demonstrates how to implement an Unscented Kalman Filter (UKF) for orbit determination. 

Soon filters and methods will be implemented directly in OSTk and be usable in a couple of lines of code only ! This notebook uses GNSS Data.

## Setup

In [None]:
import os
import datetime
from dateutil import parser

import numpy as np
import pandas as pd

import plotly.graph_objs as go
from plotly.subplots import make_subplots

from scipy.linalg import sqrtm

from ostk.mathematics.objects import RealInterval

from ostk.physics.units import Length
from ostk.physics.units import Angle
from ostk.physics.units import Time
from ostk.physics.units import Derived
from ostk.physics.time import Scale
from ostk.physics.time import Instant
from ostk.physics.time import Duration
from ostk.physics.time import Interval
from ostk.physics.time import DateTime
from ostk.physics.coordinate import Position
from ostk.physics.coordinate import Velocity
from ostk.physics.coordinate.spherical import LLA
from ostk.physics.coordinate.spherical import AER
from ostk.physics.coordinate import Frame
from ostk.physics import Environment
from ostk.physics.environment.objects.celestial_bodies import Earth

from ostk.astrodynamics import Trajectory
from ostk.astrodynamics.trajectory import Orbit
from ostk.astrodynamics.trajectory.orbit.models import Kepler
from ostk.astrodynamics.trajectory.orbit.models.kepler import COE
from ostk.astrodynamics.trajectory.orbit.models import SGP4
from ostk.astrodynamics.trajectory.orbit.models.sgp4 import TLE
from ostk.astrodynamics import Access
from ostk.astrodynamics.access import Generator as AccessGenerator

In [None]:
def convert_state (instant, state):
    
    '''
    Using ECI (GCRF) reference frame.
    '''
    
    return [
        repr(instant),
        *state.get_position().in_frame(Frame.GCRF(), instant).get_coordinates().transpose().tolist(),
        *state.get_velocity().in_frame(state.get_position(), Frame.GCRF(), instant).get_coordinates().transpose().tolist(),
    ]

def generate_orbit_df (orbit, instants, step):

    orbit_data = [
        convert_state(instant, orbit.get_state_at(instant))
        for instant in instants
    ]

    return pd.DataFrame(
        orbit_data,
        columns =
            [
                'Time (ISO-YMD)',
                'x_ECI (m)', 'y_ECI (m)', 'z_ECI (m)', 'vx_ECI (m/sec)', 'vy_ECI (m/sec)', 'vz_ECI (m/sec)'
            ]
    )

In [None]:
def w (dim, cov):
    return np.random.multivariate_normal(np.zeros(dim), cov)

def g (C_t, X_t):
    return np.dot(C_t, X_t)

In [None]:
def UT (mu, Sigma, lda = 2):

    '''
    Unscented Transform.
    '''

    n = len(mu)

    xtab = np.zeros((2 * n + 1, n))
    wtab = np.ones((2 * n + 1, 1)) / 2 / (lda + n)

    xtab[0] = mu
    wtab[0] = lda / (lda + n)

    S = sqrtm((lda + n) * Sigma)

    for i in range(1, n + 1):

        xtab[i] = mu + S[:, i - 1]
        xtab[i + n] = mu - S[:, i - 1] 

    assert np.sum(wtab) == 1

    return xtab, wtab


def UTi (xtab, wtab):

    '''
    Inverse Unscented Transform.
    '''

    mu = np.sum(xtab * wtab, axis = 0)
    xh = xtab - mu
    Sigma = np.dot(xh.T, xh * wtab)

    return mu, Sigma

def UKF (X, Y, mu0, Sigma0, Q, R):

    '''
    Unscented Kalman Filter.
    '''

    T, n = X.shape

    mu = np.zeros((T, n))
    Sigma = np.zeros((T, n, n))

    mu[0, :] = mu0
    Sigma[0, :, :] = Sigma0
    
    for t in range(1, T):

        # Predict

        # Compute sigma-points and weights
        xtab, wtab = UT(mu[t - 1, :], Sigma[t - 1, :, :])

        for i in range(0, len(xtab)):
            xtab[i] = next_state(ref_orbit, instants, t-1)
        
        # Predict mean and covariance
        mu[t, :], Sigma[t, :, :] = UTi(xtab, wtab)
        Sigma[t,:,:] += Q
        
        # Update

        # Recompute sigma-points with predictions
        xtab, wtab = UT(mu[t, :], Sigma[t, :, :])
        
        # Sigma-point measurements
        ytab = np.zeros((2 * n + 1, n))

        for i in range(0, len(xtab)):
            ytab[i] = g(np.eye(n), xtab[i])
        
        # Expected measurement
        yh = np.sum(ytab * wtab, axis = 0)
        
        # Empirical covariances
        Sigma_y = np.dot((ytab - yh).T, wtab * (ytab - yh))
        Sigma_y += R
        Sigma_xy = np.dot((xtab - mu[t, :]).T, wtab * (ytab - yh))
        
        # Update
        K = np.dot(Sigma_xy, np.linalg.inv(Sigma_y))
        mu[t, :] += np.dot(K, (Y[t, :] - yh))
        Sigma[t, :, :] -= np.dot(K, Sigma_xy.T)

    return mu, Sigma

In [None]:
def next_state (orbit, instants, t):
    
    next_state = orbit.get_state_at(instants[t + 1])

    return [
        *next_state.get_position().in_frame(Frame.GCRF(), instants[t + 1]).get_coordinates().transpose().tolist(),
        *next_state.get_velocity().in_frame(next_state.get_position(), Frame.GCRF(), instants[t + 1]).get_coordinates().transpose().tolist()
    ]

---

In [None]:
step = Duration.seconds(1.0)

---

Bootstrap environment:

In [None]:
earth = Environment.default().access_celestial_object_with_name('Earth')

## Reference Trajectory

In [None]:
tle = TLE(
    '1 99994U          21181.85503887  .00000000  00000-0  00000-0 0 00000',
    '2 99994 097.5068 310.6402 0011081 218.2042 145.7731 15.12723100000014'
)

ref_orbit = Orbit(SGP4(tle), earth)

In [None]:
tle.get_epoch()

---

## Configuration

Instants:

In [None]:
orbit_file = f'data/gps_2021070209270000Z_202107020930000Z.csv'

all_measured_profile_df = pd.read_csv(os.path.join(os.getcwd(), orbit_file))[
    [
        'Timestamp',
        'vel_eci_x', 'vel_eci_y', 'vel_eci_z', 'pos_eci_x', 'pos_eci_y', 'pos_eci_z'
    ]
]

Trim data to use for training only (using a series of continuous data at the moment)

In [None]:
start = 0
measured_profile_df = all_measured_profile_df

In [None]:
measured_profile_df

Generate a time grid:

In [None]:
instants = [
#     Instant.date_time(datetime.datetime.strptime(measured_profile_df['Timestamp'][index + start], "%Y-%m-%d %H:%M:%S"), Scale.UTC)
    Instant.date_time(parser.parse(measured_profile_df['Timestamp'][index + start]).replace(tzinfo = datetime.timezone.utc), Scale.UTC)
    for (index, _) in enumerate(measured_profile_df['Timestamp'])
]

In [None]:
start_instant = instants[0]
end_instant = instants[-1]

In [None]:
ref_orbit_df = generate_orbit_df(ref_orbit, instants, step)

In [None]:
# pd.set_option("display.max_rows", None, "display.max_columns", None)
ref_orbit_df

In [None]:
measurement_instants = instants

---

In [None]:
# Covariance of process noise
Q = np.diag([1e0, 1e0, 1e0, 1e-3, 1e-3, 1e-3])

# Covariance of measurement noise
R = np.diag([1e3, 1e3, 1e3, 1e-1, 1e-1, 1e-1])

mu0 = np.array([
    *ref_orbit.get_state_at(start_instant).get_position().in_frame(Frame.GCRF(), start_instant).get_coordinates().transpose().tolist(),
    *ref_orbit.get_state_at(start_instant).get_velocity().in_frame(ref_orbit.get_state_at(start_instant).get_position(), Frame.GCRF(), start_instant).get_coordinates().transpose().tolist()
]) \
+ np.random.multivariate_normal(np.zeros(6), R)

Sigma0 = np.diag([1e0, 1e0, 1e0, 1e-3, 1e-3, 1e-3])

ref_orbit_states = np.array([
    [
        *ref_orbit.get_state_at(instant).get_position().get_coordinates().transpose().tolist(),
        *ref_orbit.get_state_at(instant).get_velocity().get_coordinates().transpose().tolist()
    ]
    for instant in instants
]) \
+ np.array(
    [
        np.random.multivariate_normal(np.zeros(6), Q)
        for instant in instants
    ]
) # process noise

measurements = np.array([
    [
        measured_profile_df['pos_eci_x'][index + start],
        measured_profile_df['pos_eci_y'][index + start],
        measured_profile_df['pos_eci_z'][index + start],
        measured_profile_df['vel_eci_x'][index + start],
        measured_profile_df['vel_eci_y'][index + start],
        measured_profile_df['vel_eci_z'][index + start],
    ]
    for (index, _) in enumerate(instants)
]) \
+ np.array(
    [
        np.random.multivariate_normal(np.zeros(6), R)
        for instant in instants
    ]
) # measurement noise

mu_UKF, _ = UKF(
    X = ref_orbit_states,
    Y = measurements,
    mu0 = mu0,
    Sigma0 = Sigma0,
    Q = Q,
    R = R
)

---

In [None]:
dx_ECI_norm = np.linalg.norm(
    ref_orbit_df[['x_ECI (m)', 'y_ECI (m)', 'z_ECI (m)']].to_numpy()
    - mu_UKF[:, 0:3],
    axis = 1
)

dv_ECI_norm = np.linalg.norm(
    ref_orbit_df[['vx_ECI (m/sec)', 'vy_ECI (m/sec)', 'vz_ECI (m/sec)']].to_numpy()
    - mu_UKF[:, 3:6],
    axis = 1
)

print(f'Min. |Δx_ECI|: {min(dx_ECI_norm):.3f} [m]')
print(f'Avg. |Δx_ECI|: {np.mean(dx_ECI_norm):.3f} [m]')
print(f'Max. |Δx_ECI|: {max(dx_ECI_norm):.3f} [m]')
print('\n')
print(f'Min. |Δv_ECI|: {min(dv_ECI_norm):.3f} [m/s]')
print(f'Avg. |Δv_ECI|: {np.mean(dv_ECI_norm):.3f} [m/s]')
print(f'Max. |Δv_ECI|: {max(dv_ECI_norm):.3f} [m/s]')

def mean(x):
    return np.mean(x) * np.ones(np.shape(x))

data = [
    dx_ECI_norm,
    dv_ECI_norm
]

figure = go.FigureWidget(make_subplots(rows = len(data), cols = 1))

for (i, d) in enumerate(data):

    figure.append_trace(
        go.Scatter(
            y = d,
            mode = 'lines+markers',
            marker_size = 3,
            marker_symbol = 'cross',
            line_width = 1,
        ),
        row = i + 1,
        col = 1
    )

    figure.append_trace(
        go.Scatter(
            y = mean(d),
            mode = 'lines',
            line = dict(
                color = 'rgb(0, 0, 0)',
                width = 1
            ),
        ),
        row = i + 1,
        col = 1
    )

figure.update_xaxes(title_text = 'Time points', row = len(data), col = 1)
figure.update_yaxes(title_text = '|Δx_ECI| (m)', row = 1, col = 1)
figure.update_yaxes(title_text = '|Δv_ECI| (m/s)', row = 2, col = 1)

figure.update_layout(showlegend = False)

figure.show()

In [None]:
# analysis_instant = instants[-1]
analysis_instant = instants[int(len(instants) / 2)]

In [None]:
mu_UKF[instants.index(analysis_instant), 0:3]

In [None]:
mu_UKF[instants.index(analysis_instant), 3:6]

---

Estimated state at analysis instant:

In [None]:
analysis_instant

In [None]:
Position.meters(
    mu_UKF[instants.index(analysis_instant), 0:3],
    Frame.GCRF()
)

In [None]:
Velocity.meters_per_second(
    mu_UKF[instants.index(analysis_instant), 3:6],
    Frame.GCRF()
)

---

Reference state at analysis instant:

In [None]:
ref_orbit.get_state_at(analysis_instant).in_frame(Frame.GCRF())

---