In [53]:
import pickle
import numpy as np
import pandas as pd

from tqdm import tqdm
from dataclasses import dataclass
from scipy.linalg import block_diag

from dynamics import Dynamics, SatelliteDynamics
from utils import get_form_initial_conditions, save_simulation_data

In [40]:
@dataclass
class SimulationConfig:
    # Simulation parameters
    dt: float = 10.0  # Time step [s] (1, 5, 10, 20, 30, 60)
    K: int = (
        6
        * 395  # Simulation duration in timesteps (57000, 11400, 5700, 2850, 1900, 950)
    )
    H: int = 2  # Window size [timesteps]
    seed: int = 42  # Random seed for reproducibility

    # Network parameters
    N: int = 4  # Number of systems
    number_of_chiefs: int = 1  # Number of chiefs
    number_of_deputies: int = N - number_of_chiefs  # Number of deputies
    n_p: int = 3  # Position vector dimension
    n_v: int = 3  # Velocity vector dimension
    n_x: int = n_p + n_v  # State vector dimension
    n: int = N * n_x  # Global state vector dimension
    o_chief: int = 3  # Chief observation vector dimension
    o_deputy: int = 3  # Deputy observation vector dimension
    o: int = (
        number_of_chiefs * o_chief + number_of_deputies * o_deputy
    )  # Global observation vector dimension

    # Observation noise
    r_chief_pos: float = 1e-1  # [m]
    R_chief: np.ndarray = np.diag(np.concatenate([r_chief_pos * np.ones(o_chief)])) ** 2
    r_deputy_pos: float = 1e0  # [m]
    R_deputy: np.ndarray = (
        np.diag(np.concatenate([r_deputy_pos * np.ones(o_deputy)])) ** 2
    )
    R_deputies: np.ndarray = block_diag(R_deputy, R_deputy, R_deputy)
    R: np.ndarray = block_diag(R_chief, R_deputies)

    # Initial deviation noise
    # Warm-start parameters
    p_pos_initial: float = 1e-20  # [m]
    p_vel_initial: float = 1e-20  # [m / s]
    # Cold-start parameters
    # p_pos_initial: float = 1e2  # [m]
    # p_vel_initial: float = 1e0  # [m / s]
    P_0_spacecraft: np.ndarray = (
        np.diag(
            np.concatenate([p_pos_initial * np.ones(n_p), p_vel_initial * np.ones(n_v)])
        )
        ** 2
    )
    P_0: np.ndarray = block_diag(
        P_0_spacecraft, P_0_spacecraft, P_0_spacecraft, P_0_spacecraft
    )

    # Levenberg-Marquardt parameters
    lambda_0: float = 1.0
    epsilon: float = 1e-6
    max_iter: int = 100

    # Consensus parameters
    L: int = 1  # Number of consensus iterations
    gamma: float = N  # Consensus gain 1
    pi: float = 1 / N  # Consensus gain 2

    # Newton's method-based algorithms parameters
    grad_norm_order_mag: bool = True
    grad_norm_tol: float = 1e-6
    max_iterations: int = 20

    # Majorization-Minimization parameters
    mm_tol: float = 1e0
    mm_max_iter: int = 20

    # Post-processing parameters
    invalid_rmse: float = 1e2  # [m]
    K_RMSE: int = H  # Index from which the RMSE is calculated

    # Spacecraft parameters
    mass: float = 1.0  # Mass [kg]
    C_drag: float = 2.2  # Drag coefficient
    A_drag: float = 0.01  # Drag area [m^2]
    C_SRP: float = 1.2  # SRP coefficient
    A_SRP: float = 0.01  # SRP area [m^2]

    # Observation model
    @classmethod
    def h(cls, x_vec):
        p_vecs = [x_vec[i : i + cls.n_p] for i in range(0, cls.n, cls.n_x)]
        distances = [p_vecs[i] - p_vecs[j] for (i, j) in [(1, 0), (2, 0), (3, 0)]]
        return np.concatenate((p_vecs[0], np.array(distances).reshape(-1, 1)))

    @classmethod
    def Dh(cls, x_vec):
        first_order_der = np.zeros((cls.o, cls.n))

        first_order_der[: cls.n_p, : cls.n_p] = np.eye(cls.n_p)

        for k, (i, j) in enumerate([(1, 0), (2, 0), (3, 0)], start=1):
            first_order_der[
                cls.n_p * k : cls.n_p * (k + 1), i * cls.n_x : i * cls.n_x + cls.n_p
            ] = np.eye(cls.n_p)
            first_order_der[
                cls.n_p * k : cls.n_p * (k + 1), j * cls.n_x : j * cls.n_x + cls.n_p
            ] = -np.eye(cls.n_p)

        return first_order_der

    @classmethod
    def Hh(cls, x_vec):
        return np.zeros((cls.o * cls.n, cls.n))

In [41]:
config = SimulationConfig()

In [42]:
class EKF:
    """
    This class implements the Extended Kalman Filter (EKF).
    """

    def __init__(self, Q, R):
        """
        Initialize the FCEKF.

        Parameters:
        Q (np.array): Process noise covariance.
        R (np.array): Measurement noise covariance.
        x_priori (np.array): State a priori estimate vector.
        P_priori (np.array): State covariance a priori estimate.
        SatelliteDynamics (class): Satellite dynamics model.
        """
        # Define noise covariances
        self.Q = Q  # Process noise covariance
        self.R = R  # Measurement noise covariance

        self.dynamic_model = SatelliteDynamics()

    def h_function_chief(self, x_vec):
        """
        Computes the measurement vector based on the current state vector.
        The measurement vector includes position components.

        Parameters:
        x_vec (np.array): The current state vector of the satellite (position [km] and velocity [km / s]).
        """
        return x_vec[0:3]

    def H_jacobian_chief(self):
        """
        Computes the Jacobian of the measurement function.
        """
        H = np.zeros((3, 6))
        H[0:3, 0:3] = np.eye(3)
        return H

    def apply(self, dt, x_est_old, P_est_old, y):
        """
        Applies the Extended Kalman Filter to the current state vector and state covariance estimate.

        Parameters:
        dt (float): Time step.
        x_est_old (np.array): The current state vector of the satellite (position [km] and velocity [km / s]).
        P_est_old (np.array): The current state covariance estimate.
        y (np.array): The measurement vector of the satellite (range [km]).

        Returns:
        x_est_new (np.array): The new state vector of the satellite (position [km] and velocity [km / s]).
        P_est_new (np.array): The new state covariance estimate.
        """
        # Prediction Step
        x_pred, F = self.dynamic_model.x_new_and_F(dt, x_est_old)
        P_pred = F @ P_est_old @ F.T + self.Q

        # Kalman Gain
        H = self.H_jacobian_chief()
        K = P_pred @ H.T @ np.linalg.inv(H @ P_pred @ H.T + self.R)

        # Update Step
        y_est = self.h_function_chief(x_pred)
        x_est_new = x_pred + K @ (y - y_est)
        P_est_new = P_pred - K @ H @ P_pred

        return x_est_new, P_est_new


class CCEKF:
    """
    This class implements the Consider Covariance Extended Kalman Filter (FCEKF).
    """

    def __init__(self, Q, R):
        """
        Initialize the FCEKF.

        Parameters:
        Q (np.array): Process noise covariance.
        R (np.array): Measurement noise covariance.
        x_priori (np.array): State a priori estimate vector.
        P_priori (np.array): State covariance a priori estimate.
        SatelliteDynamics (class): Satellite dynamics model.
        """
        # Define noise covariances
        self.Q = Q  # Process noise covariance
        self.R = R  # Measurement noise covariance

        self.dynamic_model = SatelliteDynamics()

    def h_function_deputy(self, x_vec, c_vec):
        """
        Computes the measurement vector based on the current state vector.

        Parameters:
        x_vec (np.array): The current state vector of the satellite (position [km] and velocity [km / s]).

        Returns:
        y (np.array): The measurement vector of the satellite (range [km]).
        """
        r_chief = c_vec[:3]
        r_deputy1 = x_vec[:3]
        r_deputy2 = x_vec[6:9]
        r_deputy3 = x_vec[12:15]

        range_deputy1_chief = r_deputy1 - r_chief
        range_deputy2_chief = r_deputy2 - r_chief
        range_deputy3_chief = r_deputy3 - r_chief

        return np.concatenate((range_deputy1_chief, range_deputy2_chief, range_deputy3_chief))

    def H_x(self, x_vec, c_vec):
        H = np.zeros((9, 18))
        H[:3, :3] = np.eye(3)
        H[3:6, 6:9] = np.eye(3)
        H[6:9, 12:15] = np.eye(3)
        return H

    def H_c(self, x_vec, c_vec):
        H = np.zeros((9, 6))
        H[:3, :3] = -np.eye(3)
        H[3:6, 0:3] = -np.eye(3)
        H[6:9, 0:3] = -np.eye(3)
        return H

    def apply(
        self, dt, x_est_old, P_xx_est_old, P_xc_est_old, y, c_est_new, P_cc_est_new
    ):
        # Prediction Step
        x_pred, F = self.dynamic_model.x_new_and_F(dt, x_est_old)
        P_xx_pred = F @ P_xx_est_old @ F.T + self.Q
        P_xc_pred = F @ P_xc_est_old
        P_cx_pred = P_xc_pred.T

        # Kalman Gain
        H_x = self.H_x(x_pred, c_est_new)
        H_c = self.H_c(x_pred, c_est_new)
        K = (P_xx_pred @ H_x.T + P_xc_pred @ H_c.T) @ np.linalg.inv(
            H_x @ P_xx_pred @ H_x.T
            + H_x @ P_xc_pred @ H_c.T
            + H_c @ P_cx_pred @ H_x.T
            + H_c @ P_cc_est_new @ H_c.T
            + self.R
        )

        # Update Step
        y_est = self.h_function_deputy(x_pred, c_est_new)
        x_est_new = x_pred + K @ (y - y_est)
        P_xx_est_new = P_xx_pred - K @ H_x @ P_xx_pred - K @ H_c @ P_cx_pred
        P_xc_est_new = P_xc_pred - K @ H_x @ P_xc_pred - K @ H_c @ P_cc_est_new

        return x_est_new, P_xx_est_new, P_xc_est_new


In [43]:
formation = 1

# Initial conditions and true state vectors (from Tudat)
X_initial = get_form_initial_conditions(formation)
with open(f"../data/tudatpy_form{formation}_ts_{int(config.dt)}.pkl", "rb") as file:
    X_true = pickle.load(file)
    
# Process noise covariance matrix estimation
dynamics_propagator = Dynamics()
K_true = X_true.shape[2]
X_dynamics_propagator = np.zeros((config.n, 1, K_true))
X_dynamics_propagator[:, :, 0] = X_initial
X_tudat_propagator = X_true.transpose(2, 0, 1).reshape(K_true, config.n)
for k, X_k in enumerate(X_tudat_propagator[:-1, :]):
    X_dynamics_propagator[:, :, k + 1] = dynamics_propagator.f(
        config.dt, X_k.reshape(config.n, 1)
    )
X_dynamics_propagator = X_dynamics_propagator.transpose(2, 0, 1).reshape(
    K_true, config.n
)
diff = X_tudat_propagator - X_dynamics_propagator
Q = np.cov(diff.T)
Q_chief = Q[: config.n_x, : config.n_x]
Q_deputies = Q[config.n_x :, config.n_x :]
# if args.algorithm == "lm" or args.algorithm == "hcmci":
#     Q = np.diag(np.diag(Q))
print("Estimated process noise covariance matrix :\n", pd.DataFrame(Q))
X_true = X_true[:, :, : config.K] 

# Set the random seed for reproducibility
if config.seed is not None:
    np.random.seed(config.seed)

In [51]:
M = 100

class Args:
    monte_carlo_sims = M
    algorithm = "ccekf"
    formation = formation
    
args = Args()

X_est_all = []
for m in tqdm(range(M), desc="MC runs", leave=True):
    chief_ekf = EKF(Q_chief, config.R_chief)
    deputy1_ccekf = CCEKF(Q_deputies, config.R_deputies)
    deputy2_ccekf = CCEKF(Q_deputies, config.R_deputies)
    deputy3_ccekf = CCEKF(Q_deputies, config.R_deputies)

    # Generate observations
    Y = np.zeros((config.o, 1, config.K))
    for k in range(config.K):
        Y[:, :, k] = config.h(X_true[:, :, k]) + np.random.multivariate_normal(np.zeros(config.o), config.R).reshape((config.o, 1))

    # Initial state vector and state covariance estimate
    X_est = np.zeros_like(X_true)
    X_est_chief = np.zeros((config.n_x, 1, config.K))
    X_est_deputy1 = np.zeros((config.number_of_deputies * config.n_x, 1, config.K))
    X_est_deputy2 = np.zeros((config.number_of_deputies * config.n_x, 1, config.K))
    X_est_deputy3 = np.zeros((config.number_of_deputies * config.n_x, 1, config.K))
    initial_dev = np.random.multivariate_normal(np.zeros(config.n), config.P_0).reshape((config.n, 1))
    X_est[:, :, 0] = X_initial + initial_dev
    X_est_chief[:, :, 0] = X_initial[:6] + initial_dev[:6]
    X_est_deputy1[:, :, 0] = X_initial[6:] + initial_dev[6:]
    X_est_deputy2[:, :, 0] = X_initial[6:] + initial_dev[6:]
    X_est_deputy3[:, :, 0] = X_initial[6:] + initial_dev[6:]
    P_chief = np.diag(initial_dev[:6].reshape(-1) ** 2)
    P_deputy1 = np.diag(initial_dev[6:].reshape(-1) ** 2)
    P_deputy2 = np.diag(initial_dev[6:].reshape(-1) ** 2)
    P_deputy3 = np.diag(initial_dev[6:].reshape(-1) ** 2)
    P_deputy1_chief = np.zeros((config.number_of_deputies * config.n_x, config.n_x))
    P_deputy2_chief = np.zeros((config.number_of_deputies * config.n_x, config.n_x))
    P_deputy3_chief = np.zeros((config.number_of_deputies * config.n_x, config.n_x))
    for t in range(1, config.K):
        X_est_chief[:, :, t], P_chief = chief_ekf.apply(config.dt, X_est_chief[:, :, t - 1], P_chief, Y[:3, :, t])
        X_est_deputy1[:, :, t], P_deputy1, P_deputy1_chief = deputy1_ccekf.apply(config.dt, X_est_deputy1[:, :, t - 1], P_deputy1, P_deputy1_chief, Y[3:, :, t], X_est_chief[:, :, t], P_chief)
        X_est_deputy2[:, :, t], P_deputy2, P_deputy2_chief = deputy2_ccekf.apply(config.dt, X_est_deputy2[:, :, t - 1], P_deputy2, P_deputy2_chief, Y[3:, :, t], X_est_chief[:, :, t], P_chief)
        X_est_deputy3[:, :, t], P_deputy3, P_deputy3_chief = deputy3_ccekf.apply(config.dt, X_est_deputy3[:, :, t - 1], P_deputy3, P_deputy3_chief, Y[3:, :, t], X_est_chief[:, :, t], P_chief)
        X_est[:, :, t] = np.concatenate((X_est_chief[:, :, t], X_est_deputy1[:6, :, t], X_est_deputy2[6:12, :, t], X_est_deputy3[12:18, :, t]), axis=0)
        # print(f"Position estimation error at time {t}: {np.linalg.norm(X_est[:6, :, t] - X_true[:6, :, t])} m, {np.linalg.norm(X_est[6:12, :, t] - X_true[6:12, :, t])} m, {np.linalg.norm(X_est[12:18, :, t] - X_true[12:18, :, t])} m, {np.linalg.norm(X_est[18:24, :, t] - X_true[18:24, :, t])} m")
    X_est_all.append(X_est)

MC runs:   0%|          | 0/100 [00:00<?, ?it/s]

MC runs:   0%|          | 0/100 [00:02<?, ?it/s]


KeyboardInterrupt: 

In [54]:
save_simulation_data(args, X_true, X_est_all)

IndexError: list index out of range