In [1]:
import numpy as np
import pandas as pd
import plotly.express as px
import math

from ostk.physics.environment.objects.celestial_bodies import Earth
from ostk.physics.environment.gravitational import Earth as EarthGravitationalModel
from ostk.physics.environment.atmospheric import Earth as EarthAtmosphericModel
from ostk.physics.environment.magnetic import Earth as EarthMagneticModel
from ostk.physics.time import Instant, Duration, Scale
from ostk.physics.coordinate import Frame, Position, Velocity
from ostk.physics.units import Length, Angle

from ostk.astrodynamics.trajectory.state import NumericalSolver
from ostk.astrodynamics.trajectory import Propagator, State
from ostk.astrodynamics.trajectory.state import CoordinatesSubset, CoordinatesBroker
from ostk.astrodynamics.trajectory.state.coordinates_subset import (
    CartesianPosition,
    CartesianVelocity,
)
from ostk.astrodynamics.trajectory.orbit.models.kepler import COE
from ostk.astrodynamics.flight.system import Dynamics
from ostk.astrodynamics.flight.system.dynamics import (
    CentralBodyGravity,
    PositionDerivative,
)

In [19]:
cartesian_position = CartesianPosition.default()
cartesian_velocity = CartesianVelocity.default()
mass_subset = CoordinatesSubset.mass()

In [64]:
def central_difference(
    state_array: np.ndarray,
    target_epochs: list,
    get_state_coordinates,
    step_pct: float = 1e-3,
) -> np.ndarray:
    """
    Obtain the Jacobian via central differencing.

    Args:
        state_array (np.ndarray): Initial state array.
        target_epochs (list): Target epochs at the observation.
        get_state_coordinates (Callable): Function to get the states at the target epochs.
        step_pct (float): Step size as a percentage. Defaults to 1e-3.

    Returns:
        np.ndarray: Jacobian matrix
    """

    state_vector_dimension: int = len(state_array)

    A: np.ndarray = np.zeros(
        (state_vector_dimension, state_vector_dimension * len(target_epochs))
    )

    for i in range(state_vector_dimension):
        step_size: float = (state_array[i] * step_pct) or step_pct

        perturbed_state_array: np.ndarray = state_array.copy()

        # perturb state forward
        perturbed_state_array[i] = state_array[i] + step_size

        forward_steps: list = get_state_coordinates(perturbed_state_array, target_epochs)
        forward_step_array: np.ndarray = np.array(forward_steps)

        # perturb state backward
        perturbed_state_array[i] = state_array[i] - step_size

        backward_steps: list = get_state_coordinates(perturbed_state_array, target_epochs)
        backward_step_array: np.ndarray = np.array(backward_steps)

        # central difference
        A[:, i::state_vector_dimension] = (
            (forward_step_array - backward_step_array) / (2.0 * step_size)
        ).transpose()

    return A

In [73]:
class OE:
    def __init__(
        self,
        sma: float,
        ecc: float,
        inc: float,
        raan: float,
        aop: float,
        true_anomaly: float,
    ):
        self._state_array = np.array([sma, ecc, inc, raan, aop, true_anomaly])

    @classmethod
    def from_COE(cls, coe: COE):
        sma = float(coe.get_semi_major_axis().in_meters())
        ecc = float(coe.get_eccentricity())
        inc = float(coe.get_inclination().in_radians())
        raan = float(coe.get_raan().in_radians())
        aop = float(coe.get_aop().in_radians())
        true_anomaly = float(coe.get_true_anomaly().in_radians())

        return cls(sma, ecc, inc, raan, aop, true_anomaly)

    @classmethod
    def from_array(cls, state_array: np.ndarray):
        return cls(*state_array)

    @property
    def state_array(self) -> np.ndarray:
        return np.array(self._state_array)

    @property
    def sma(self) -> float:
        return self._state_array[0]

    @property
    def ecc(self) -> float:
        return self._state_array[1]

    @property
    def inc(self) -> float:
        return self._state_array[2]

    @property
    def raan(self) -> float:
        return self._state_array[3]

    @property
    def aop(self) -> float:
        return self._state_array[4]

    @property
    def true_anomaly(self) -> float:
        return self._state_array[5]

    @property
    def p(self) -> float:
        return self.sma * (1.0 - self.ecc**2)

    def h(self, mu: float) -> float:
        return (mu * self.p) ** 0.5

    @property
    def r(self) -> float:
        return (self.sma * (1 - self.ecc**2)) / (
            1 + self.ecc * math.cos(self.true_anomaly)
        )

In [257]:
class QLaw:
    def __init__(self, control_weights: list[float], m: int = 3, n: int = 4, r: int = 2):
        self._control_weights: list[float] = control_weights
        self._m: int = m
        self._n: int = n
        self._r: int = r
        self._b: float = 0.01
        self._mu: float = 398600.49*1e9

    def _get_orbital_elements_maximal_change(
        self, orbital_elements: OE, thrust_acceleration: float
    ) -> list[float]:
        sma_xx: float = (
            2
            * thrust_acceleration
            * (
                orbital_elements.sma**3
                * (1.0 + orbital_elements.ecc)
                / (self._mu * (1.0 - orbital_elements.ecc))
            )
            ** 0.5
        )
        ecc_xx: float = (
            2 * orbital_elements.p * thrust_acceleration / orbital_elements.h(self._mu)
        )
        inc_xx: float = (orbital_elements.p * thrust_acceleration) / (
            orbital_elements.h(self._mu)
            * (1.0 - orbital_elements.ecc**2 * math.sin(orbital_elements.aop) ** 2)
            ** 0.5
            - orbital_elements.ecc * abs(math.cos(orbital_elements.aop))
        )
        raan_xx: float = (orbital_elements.p * thrust_acceleration) / (
            orbital_elements.h(self._mu)
            * math.sin(orbital_elements.inc)
            * (
                (1.0 - orbital_elements.ecc**2 * math.cos(orbital_elements.aop) ** 2)
                - orbital_elements.ecc * abs(math.sin(orbital_elements.aop))
            )
        )

        alpha: float = (1 - orbital_elements.ecc**2) / (2 * orbital_elements.ecc**3)
        beta: float = (0.25 * (2 * alpha) ** 2 + 1 / 27) ** 0.5

        cos_theta_xx: float = (
            (alpha + beta) ** (1.0 / 3.0)
            - (-alpha + beta) ** (1.0 / 3.0)
            - 1 / orbital_elements.ecc
        )
        r_xx: float = orbital_elements.p / (1.0 + orbital_elements.ecc * cos_theta_xx)

        w_xx_i: float = (
            thrust_acceleration / (orbital_elements.ecc * orbital_elements.h(self._mu))
        ) * (
            orbital_elements.p**2 * cos_theta_xx**2
            + (orbital_elements.p + r_xx) ** 2 * (1.0 - cos_theta_xx**2)
        ) ** 0.5
        w_xx_o: float = raan_xx * abs(math.cos(orbital_elements.inc))
        aop_xx: float = (w_xx_i + self._b * w_xx_o) / (1 + self._b)

        return [
            sma_xx,
            ecc_xx,
            inc_xx,
            raan_xx,
            aop_xx,
        ]

    @staticmethod
    def _get_d_orbital_elements_dot_d_f(
        orbital_elements: OE, mu: float
    ) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
        sma_dot_f_theta: float = (
            (2 * orbital_elements.sma**2 / orbital_elements.h(mu))
            * orbital_elements.p
            / orbital_elements.r
        )
        sma_dot_f_r: float = (
            2 * orbital_elements.sma**2 / orbital_elements.h(mu)
        ) * orbital_elements.ecc * math.sin(orbital_elements.true_anomaly)
        sma_dot_f_h: float = 0.0

        ecc_dot_f_theta: float = (
            (orbital_elements.p + orbital_elements.r)
            * math.cos(orbital_elements.true_anomaly)
            + (orbital_elements.r * orbital_elements.ecc)
        ) / (orbital_elements.h(mu))
        ecc_dot_f_r: float = (orbital_elements.p / orbital_elements.h(mu)) * math.sin(
            orbital_elements.true_anomaly
        )
        ecc_dot_f_h: float = 0.0

        inc_dot_f_theta: float = 0.0
        inc_dot_f_r: float = 0.0
        inc_dot_f_h: float = (orbital_elements.r / orbital_elements.h(mu)) * math.cos(
            orbital_elements.true_anomaly + orbital_elements.aop
        )

        omega_dot_f_theta: float = 0.0
        omega_dot_f_r: float = 0.0
        omega_dot_f_h: float = (
            orbital_elements.r
            * math.sin(orbital_elements.true_anomaly + orbital_elements.aop)
        ) / (orbital_elements.h(mu) * math.sin(orbital_elements.inc))

        aop_dot_f_theta: float = (
            (orbital_elements.p + orbital_elements.r)
            * math.sin(orbital_elements.true_anomaly)
            / (orbital_elements.ecc * orbital_elements.h(mu))
        )
        aop_dot_f_r: float = (
            -orbital_elements.p
            * math.cos(orbital_elements.true_anomaly)
            / (orbital_elements.ecc * orbital_elements.h(mu))
        )
        aop_dot_f_h: float = (
            -orbital_elements.r
            * math.sin(orbital_elements.true_anomaly + orbital_elements.aop)
            * math.cos(orbital_elements.inc)
        ) / (
            orbital_elements.ecc * orbital_elements.h(mu)**2 * math.sin(orbital_elements.inc)
        )

        oe_dot_f_theta: np.ndarray = np.array(
            [
                sma_dot_f_theta,
                ecc_dot_f_theta,
                inc_dot_f_theta,
                omega_dot_f_theta,
                aop_dot_f_theta,
            ]
        )
        oe_dot_f_r: np.ndarray = np.array(
            [sma_dot_f_r, ecc_dot_f_r, inc_dot_f_r, omega_dot_f_r, aop_dot_f_r]
        )
        oe_dot_f_h: np.ndarray = np.array(
            [sma_dot_f_h, ecc_dot_f_h, inc_dot_f_h, omega_dot_f_h, aop_dot_f_h]
        )

        return (oe_dot_f_theta, oe_dot_f_r, oe_dot_f_h)

    def _get_Q(
        self,
        orbital_elements: OE,
        target_orbital_elements: OE,
        thrust_acceleration: float,
    ) -> float:
        deltas: list[float] = [
            orbital_elements.sma - target_orbital_elements.sma,
            orbital_elements.ecc - target_orbital_elements.ecc,
            orbital_elements.inc - target_orbital_elements.inc,
            math.acos(math.cos(orbital_elements.raan - target_orbital_elements.raan)),
            math.acos(math.cos(orbital_elements.aop - target_orbital_elements.aop)),
        ]

        W_oe: list[float] = self._control_weights

        S_oe: list[float] = [
            (1.0 + (deltas[0] / (self._m * target_orbital_elements.sma)) ** self._n)
            ** 1.0
            / self._r,
            1.0,
            1.0,
            1.0,
            1.0,
        ]

        Q: float = 0.0
        for weight, S, delta, oe_xx in zip(
            W_oe,
            S_oe,
            deltas,
            self._get_orbital_elements_maximal_change(
                orbital_elements, thrust_acceleration
            ),
        ):
            Q += weight * S * (delta / oe_xx) ** 2
        return Q

    def _get_u(
        self,
        orbital_elements: OE,
        target_orbital_elements: OE,
        thrust_acceleration: float,
    ) -> np.ndarray:
        oe_dot_f_theta, oe_dot_f_r, oe_dot_f_h = self._get_d_orbital_elements_dot_d_f(
            orbital_elements, self._mu
        )
        dq: np.ndarray = central_difference(
            orbital_elements.state_array,
            [0],
            lambda state_array, epoch: self._get_Q(
                OE.from_array(state_array), target_orbital_elements, thrust_acceleration
            ),
        )

        D1: np.ndarray = np.sum(np.dot(dq[:5, :5], oe_dot_f_theta))
        D2: np.ndarray = np.sum(np.dot(dq[:5, :5], oe_dot_f_r))
        D3: np.ndarray = np.sum(np.dot(dq[:5, :5], oe_dot_f_h))

        return -np.array([D1, D2, D3]) / (D1**2 + D2**2 + D3**2) ** 0.5

    def get_acceleration(
        self, position: Position, velocity: Velocity, target_coe: COE, thrust_acceleration: float
    ) -> np.ndarray:
        coe = COE.cartesian(
            (position, velocity), EarthGravitationalModel.EGM2008.gravitational_parameter
        )

        acceleration_thetarh = self.get_acceleration_from_oe(OE.from_COE(coe), OE.from_COE(target_coe), thrust_acceleration)

        # convert to GCRF
        return np.dot(thetarh_to_gcrf, acceleration_thetarh)

    def get_acceleration_from_oe(self, oe: OE, target_oe: OE, thrust_acceleration: float) -> np.ndarray:

        u: np.ndarray = self._get_u(oe, target_oe, thrust_acceleration)

        # get the rotation matrix from Theta r h to GCRF
        thetarh_to_gcrf: np.ndaray = self._thetarh_to_gcrf(
            position.get_coordinates(), velocity.get_coordinates()
        )

        # calculate acceleration in Theta r h
        return u * thrust_acceleration

    @staticmethod
    def _thetarh_to_gcrf(
        position_coordinates: np.ndarray, velocity_coordinates: np.ndarray
    ) -> np.ndarray:
        r_axis: np.ndarray = position_coordinates / np.linalg.norm(position_coordinates)

        h_axis: np.ndarrah = np.cross(position_coordinates, velocity_coordinates)
        h_axis = h_axis / np.linalg.norm(h_axis)

        theta_axis: np.ndarray = np.cross(h_axis, r_axis)

        return np.array([theta_axis, r_axis, h_axis]).transpose()

In [258]:
class ThrusterDynamics(Dynamics):
    def __init__(self, isp: float, thrust: float, target_COE: COE, guidance_law):
        self._isp: float = isp
        self._thrust: float = thrust
        self._target_coe: COE = target_COE
        self._guidance_law = guidance_law

        super().__init__("thruster")

    @property
    def mass_flow_rate(self) -> float:
        return self._thrust / (self._isp * 9.805)

    def is_defined(self) -> bool:
        return self._isp is not None and self._thrust is not None

    def get_read_coordinates_subsets(self):
        return [
            CartesianPosition.default(),
            CartesianVelocity.default(),
            CoordinatesSubset.mass(),
        ]

    def get_write_coordinates_subsets(self):
        return [CartesianVelocity.default(), CoordinatesSubset.mass()]

    def compute_contribution(self, _, x, frame):
        acceleration = self._guidance_law.get_acceleration(
            Position.meters(x[:3], frame),
            Velocity.meters_per_second(x[3:6], frame),
            self._target_coe,
            self._thrust / x[6],
        )

        return np.array(
            [acceleration[0], acceleration[1], acceleration[2], -self.mass_flow_rate]
        )

In [259]:
coe_subset = CoordinatesSubset("COE", 6)

class VariationalDynamics(Dynamics):
    def __init__(self, isp: float, thrust: float, target_COE: COE, guidance_law):
        self._isp: float = isp
        self._thrust: float = thrust
        self._target_coe: COE = target_COE
        self._guidance_law = guidance_law

        super().__init__("thruster")

    @property
    def mass_flow_rate(self) -> float:
        return self._thrust / (self._isp * 9.80665)

    def is_defined(self) -> bool:
        return self._isp is not None and self._thrust is not None

    def get_read_coordinates_subsets(self):
        return [
            coe_subset,
            CoordinatesSubset.mass(),
        ]

    def get_write_coordinates_subsets(self):
        return [coe_subset, CoordinatesSubset.mass()]

    def compute_contribution(self, _, x, frame):
        mu = 398600.49*1e9
        oe = OE(*x[:-1])

        f_theta, f_r, f_h = self._guidance_law.get_acceleration_from_oe(
            oe,
            OE.from_COE(self._target_coe),
            self._thrust / x[6],
        )

        sma_dot = (2.0 * oe.sma ** 2)/(oe.h(mu)) * (oe.ecc * math.sin(oe.true_anomaly) * f_r + (oe.p * f_theta)/oe.r)
        ecc_dot = (oe.p * math.sin(oe.true_anomaly) * f_r + ((oe.p + oe.r) * math.cos(oe.true_anomaly) + (oe.r * oe.ecc)) * f_theta)/oe.h(mu)
        inc_dot = oe.r * math.cos(oe.true_anomaly + oe.aop) * f_h/oe.h(mu)
        raan_dot = oe.r * math.sin(oe.true_anomaly + oe.aop) * f_h/(oe.h(mu) * math.sin(oe.inc))
        aop_dot = (-oe.p * math.cos(oe.true_anomaly) * f_r + (oe.p + oe.r) * math.sin(oe.true_anomaly) * f_theta)/(oe.ecc * oe.h(mu)) - (oe.r*math.sin(oe.true_anomaly + oe.aop) * math.cos(oe.inc) * f_h)/(oe.h(mu) * math.sin(oe.inc))
        true_anomaly_dot = oe.h(mu)/(oe.r ** 2) + (oe.p * math.cos(oe.true_anomaly) * f_r - (oe.p + oe.r) * math.sin(oe.true_anomaly) * f_theta)/(oe.ecc * oe.h(mu))

        return np.array(
            [sma_dot, ecc_dot, inc_dot, raan_dot, aop_dot, true_anomaly_dot, -self.mass_flow_rate]
        )

In [260]:
earth = Earth.from_models(
    EarthGravitationalModel(EarthGravitationalModel.Type.EGM96),
    EarthMagneticModel(EarthMagneticModel.Type.Undefined),
    EarthAtmosphericModel(EarthAtmosphericModel.Type.Undefined),
)
central_body_gravity = CentralBodyGravity(earth)

In [261]:
coe = COE(Length.meters(7000.0e3), 0.01, Angle.degrees(0.05), Angle.degrees(0.0), Angle.degrees(0.0), Angle.degrees(0.0))
target_coe = COE(Length.meters(42000.0e3), 0.01, Angle.degrees(0.05), Angle.degrees(0.0), Angle.degrees(0.0), Angle.degrees(0.0))

q_law = QLaw([1.0, 1.0, 0.0, 0.0, 0.0])
# thruster_dynamics = ThrusterDynamics(3100.0, 1.0, target_coe, q_law)
variational_dynamics = VariationalDynamics(3100.0, 1.0, target_coe, q_law)

In [312]:
# propagator = Propagator(NumericalSolver.default(), [central_body_gravity, thruster_dynamics, PositionDerivative()])
propagator = Propagator(NumericalSolver(NumericalSolver.LogType.NoLog, NumericalSolver.StepperType.RungeKutta4, 60.0, 1e-12, 1e-12), [variational_dynamics])

In [313]:
# broker = CoordinatesBroker([cartesian_position, cartesian_velocity, mass_subset])
# position, velocity = coe.get_cartesian_state(EarthGravitationalModel.EGM2008.gravitational_parameter, Frame.GCRF())
# state = State(Instant.J2000(), np.array(position.get_coordinates().tolist() + velocity.get_coordinates().tolist() +  [300.0]), Frame.GCRF(), broker)

broker = CoordinatesBroker([coe_subset, mass_subset])
state = State(Instant.J2000(), np.array(OE.from_COE(coe).state_array.tolist() +  [300.0]), Frame.GCRF(), broker)

In [314]:
end_state = propagator.calculate_state_at(state, Instant.J2000() + Duration.days(15.0))

In [306]:
states = propagator.access_numerical_solver().get_observed_states()

In [307]:
states[-1].get_coordinates()

array([4.20742581e+07, 1.23918905e-02, 8.72664626e-04, 0.00000000e+00,
       1.62679879e+00, 5.80010159e+02, 2.57369283e+02])

In [308]:
q_law._get_Q(OE(*states[-1].get_coordinates()[:-1]), OE.from_COE(target_coe), 1.0/states[-1].get_coordinates()[-1])

1136046.17657664

In [309]:
len(states[::30])

361

In [310]:
Qs = []
for state in states:
    Qs.append(dict(timestamp=float((state.get_instant() - states[0].get_instant()).in_days()), q=q_law._get_Q(OE(*state.get_coordinates()[:-1]), OE.from_COE(target_coe), 1.0/state.get_coordinates()[-1])))

In [311]:
px.scatter(pd.DataFrame(Qs), x="timestamp", y="q")

In [272]:
coes = []
for state in states:
    tmp = COE.cartesian((state.get_position(), state.get_velocity()), EarthGravitationalModel.EGM2008.gravitational_parameter)
    coes.append(dict(sma=float(tmp.get_semi_major_axis().in_meters() - EarthGravitationalModel.EGM2008.equatorial_radius.in_meters()), timestamp=state.get_instant().get_date_time(Scale.UTC)))

RuntimeError: 

In [129]:
px.scatter(pd.DataFrame(coes), x="timestamp", y="sma")