In [26]:
# Load required tudatpy modules
import numpy as np
from matplotlib import pyplot as plt
from tudatpy.interface import spice
from tudatpy.astro import time_conversion, element_conversion
from tudatpy.math import interpolators
from tudatpy import numerical_simulation
from tudatpy.numerical_simulation import environment_setup, environment, propagation_setup, estimation, estimation_setup
from tudatpy.numerical_simulation.estimation_setup import observation
from tudatpy.numerical_simulation.environment import Tle
from datetime import datetime, timedelta
import matplotlib.dates as mdatesx  
from itertools import zip_longest
from tudatpy.util import result2array
import math

spice.load_standard_kernels()

In [27]:
def tle_epoch_to_datetime(epoch_str: str) -> datetime:
    """
    Convert a TLE epoch string (YYDDD.FFFFFFFF) to a Python datetime in UTC.
    
    Parameters:
    -----------
    epoch_str : str
        TLE epoch in the format 'YYDDD.FFFFFFFFF', e.g., '25147.20450743'
    
    Returns:
    --------
    datetime (UTC)
    """
    # Parse year (YY)
    yy = int(epoch_str[:2])
    # TLE years: 00-56 => 2000-2056, 57-99 => 1957-1999
    year = 2000 + yy if yy < 57 else 1900 + yy
    
    # Parse day-of-year (DDD)
    day_of_year = int(epoch_str[2:5])
    
    # Fractional part of the day
    frac_day = float("0." + epoch_str.split('.', 1)[1])
    
    # Compute the base date (January 1 of year) + day offset
    base_date = datetime(year, 1, 1) + timedelta(days=day_of_year - 1)
    # Add the fractional day as a timedelta
    full_date = base_date + timedelta(days=frac_day)

    # Round to the nearest second
    # Convert to seconds since UNIX epoch for rounding
    epoch_origin = datetime(1970, 1, 1)
    total_seconds = (full_date - epoch_origin).total_seconds()
    rounded_seconds = round(total_seconds)
    rounded_date = epoch_origin + timedelta(seconds=rounded_seconds)

    return rounded_date

In [28]:
def sat_prop(
        simulation_start_epoch,
        simulation_end_epoch,
        target,
        target_TLE,
        target_initial_state = None,
        ):
    """
    Propogates the target object

    Parameters:
    simulation_start_epoch: float
        Simulation start epoch, in seconds from J2000
    simulation_end_epoch: float
        Simulation end eopch, in sends from J000
    target: str
        Name of celestial body being observed
    target_TLE: tudatpy.kernel.numerical_simulation.environment.Tle object
        TLE of target object. Run TLE values through environment.Tle
    target_initial_state: numpy array
        Contains cartesian position and velocity of target's initial state

    Output:
    states_array: numpy array
        7 column array with time, position and velocities, in m and m/s
    """

    bodies_to_create = ['Sun','Earth','Moon']
    body_settings = environment_setup.get_default_body_settings(bodies_to_create,'Earth','J2000')
    body_settings.add_empty_settings(target)
    bodies_to_propagate = [target]
    central_bodies = ["Earth"]

    # CREATE ACCELERATION SETTINGS
    # Aerodynamic drag
    ref_area = np.pi * 0.5**2
    drag_coef = 2.0
    aero_coef_settings = environment_setup.aerodynamic_coefficients.constant(ref_area,[drag_coef,0.0,0.0])

    # Solar radiation pressure
    rad_coef = 1.2
    occulting_bodies_dict = dict()
    occulting_bodies_dict["Sun"] = ["Earth"]
    vehicle_target_settings = environment_setup.radiation_pressure.cannonball_radiation_target(
        ref_area, rad_coef, occulting_bodies_dict)
    
    body_settings.get(target).aerodynamic_coefficient_settings = aero_coef_settings
    body_settings.get(target).radiation_pressure_target_settings = vehicle_target_settings
    
    bodies = environment_setup.create_system_of_bodies(body_settings)
    bodies.get(target).mass = 10

    accelerations_settings_target = dict(
        Sun=[
            propagation_setup.acceleration.radiation_pressure(),
            propagation_setup.acceleration.point_mass_gravity()
        ],
        Earth=[
            propagation_setup.acceleration.spherical_harmonic_gravity(8, 8),
            propagation_setup.acceleration.aerodynamic()
        ],
        Moon=[
            propagation_setup.acceleration.point_mass_gravity()
        ],
    )

    # Create global accelerations settings dictionary.
    acceleration_settings = {target: accelerations_settings_target}

    # Create acceleration models.
    acceleration_models = propagation_setup.create_acceleration_models(
        bodies,
        acceleration_settings,
        bodies_to_propagate,
        central_bodies)
    
    # Get initial states
    if target_initial_state is None:
        target_ephemeris = environment.TleEphemeris("Earth", "J2000", target_TLE, False)
        initial_state = target_ephemeris.cartesian_state(simulation_start_epoch)
    else:
        initial_state = target_initial_state

    # Create termination settings
    termination_condition = propagation_setup.propagator.time_termination(simulation_end_epoch)

    # Create numerical integrator settings
    fixed_step_size = 60.0
    integrator_settings = propagation_setup.integrator.runge_kutta_fixed_step(
        fixed_step_size, coefficient_set=propagation_setup.integrator.CoefficientSets.rk_4
    )

    # Create propagation settings
    propagator_settings = propagation_setup.propagator.translational(
        central_bodies,
        acceleration_models,
        bodies_to_propagate,
        initial_state,
        simulation_start_epoch,
        integrator_settings,
        termination_condition,
    )

    # Create simulation object and propagate the dynamics
    dynamics_simulator = numerical_simulation.create_dynamics_simulator(
        bodies, propagator_settings
    )

    # Extract the resulting state and dependent variable history and convert it to an ndarray
    states = dynamics_simulator.propagation_results.state_history
    states_array = result2array(states)

    return states_array

In [29]:
def simulate_observation(az_true, el_true, meas_noise_std=[0.05,0.05]):
    # In degrees

    az_obs = az_true + np.random.normal(0,meas_noise_std[0])
    el_obs = el_true + np.random.normal(0,meas_noise_std[1])
    return np.array([az_obs,el_obs])

In [30]:
def ekf_update(mu, cov, z, h_func, H_func, R):
    """
    Perform an Extended Kalman Filter (EKF) measurement update step.
    
    Parameters
    ----------
    mu : ndarray of shape (6,)
        Current state estimate (mean), e.g., [x, y, z, vx, vy, vz] in inertial frame.
        
    cov : ndarray of shape (6, 6)
        Current state covariance matrix.
        
    z : ndarray of shape (2,)
        Measurement vector: [azimuth, elevation] in radians.
        
    h_func : callable
        Function that maps the current state to the predicted measurement:
        z_pred = h_func(mu)
        Should return ndarray of shape (2,).
        
    H_func : callable
        Function that computes the Jacobian of h_func at the current state.
        Should return ndarray of shape (2, 6).
        
    R : ndarray of shape (2, 2)
        Measurement noise covariance matrix.
    
    Returns
    -------
    mu_post : ndarray of shape (6,)
        Updated state estimate (posterior mean).
    
    cov_post : ndarray of shape (6, 6)
        Updated state covariance matrix (posterior covariance).
    """

    # Predict measurement from current state
    z_pred = h_func(mu)

    # Innovation (residual) between actual and predicted measurement
    y = z - z_pred

    # Compute observation matrix (Jacobian)
    H = H_func(mu)

    # Innovation covariance
    S = H @ cov @ H.T + R

    # Kalman gain
    K = cov @ H.T @ np.linalg.inv(S)

    # Updated state estimate (posterior mean)
    mu_post = mu + K @ y

    # Updated covariance
    I = np.eye(len(mu))
    cov_post = (I - K @ H) @ cov

    return mu_post, cov_post

In [None]:
targets = ["GSAT0101 (GALILEO-PFM)","STARLINK-1008","ISS (ZARYA)"]

station_name ='REY30' 
geodetic_position = {'REY30': [250,-29.0464, 115.3456]}    # alt, lat, lon (m,deg,deg)

# Select the global frame and orientation 
global_frame_origin = 'Earth'
global_frame_orientation = 'J2000'

# Initializing TLE from celestrack
tle_ISS_line1 = "1 25544U 98067A   25147.20450743  .00010885  00000+0  20003-3 0  9993"
tle_ISS_line2 = "2 25544  51.6384  51.0343 0002206 149.5344 343.0812 15.49769564511908"

  
tle_galileo_line1 = "1 37846U 11060A   25144.52916540 -.00000075  00000+0  00000+0 0  9995"
tle_galileo_line2 = "2 37846  57.0978 352.2961 0003612  39.4719 320.5962  1.70475644 84499"

         
tle_starlink_line1 = "1 44714U 19074B   25145.95841286  .00000108  00000+0  26155-4 0  9992"
tle_starlink_line2 = "2 44714  53.0564  38.2925 0002036  81.0993 279.0226 15.06391243305386"

tle_line1 = ["1 37846U 11060A   25144.52916540 -.00000075  00000+0  00000+0 0  9995","1 44714U 19074B   25145.95841286  .00000108  00000+0  26155-4 0  9992","1 25544U 98067A   25147.20450743  .00010885  00000+0  20003-3 0  9993"]
tle_line2 = ["2 37846  57.0978 352.2961 0003612  39.4719 320.5962  1.70475644 84499","2 44714  53.0564  38.2925 0002036  81.0993 279.0226 15.06391243305386","2 25544  51.6384  51.0343 0002206 149.5344 343.0812 15.49769564511908"]

tles = dict()
for idx in range (0,len(targets)):
    tles[targets[idx]] = Tle(tle_line1[idx],tle_line2[idx])


# Defining time window of simulation from TLE epoch to Tle epoch + 1 day
tle_epoch = tle_ISS_line1[18:32]
start_epoch = tle_epoch_to_datetime(tle_epoch)
end_epoch = start_epoch + timedelta(days=1)

time_format = '%Y-%m-%d %H:%M:%S'

# start_epoch_string = start_epoch.strftime(time_format)
# end_epoch_string = end_epoch.strftime(time_format)

# Time window corresponding to Heavens Above data
start_epoch_string = "2025-05-27 00:00:00"
end_epoch_string = "2025-05-27 03:00:00"

time_step = '1m'

bodies_to_create = ["Earth"]

# Get start and end time in UTS, Julian Day and seconds since J2000
jd_start_epoch = time_conversion.calendar_date_to_julian_day(datetime.strptime(start_epoch_string, time_format))
jd_end_epoch = time_conversion.calendar_date_to_julian_day(datetime.strptime(end_epoch_string, time_format))
n_day_buffer = 0.1

utc_simulation_start_epoch = time_conversion.julian_day_to_calendar_date(jd_start_epoch - n_day_buffer)
utc_simulation_end_epoch = time_conversion.julian_day_to_calendar_date(jd_end_epoch + n_day_buffer)

sim_seconds_start_epoch = time_conversion.julian_day_to_seconds_since_epoch(jd_start_epoch - n_day_buffer)
sim_seconds_end_epoch = time_conversion.julian_day_to_seconds_since_epoch(jd_end_epoch + n_day_buffer)

actual_seconds_start_epoch  = time_conversion.julian_day_to_seconds_since_epoch(jd_start_epoch)
actual_seconds_end_epoch  = time_conversion.julian_day_to_seconds_since_epoch(jd_end_epoch)

actual_time_array = np.linspace(actual_seconds_start_epoch, actual_seconds_end_epoch, math.floor((actual_seconds_end_epoch-actual_seconds_start_epoch)/60))

# Creating Earth settings
body_settings = environment_setup.get_default_body_settings_time_limited(bodies_to_create, sim_seconds_start_epoch, sim_seconds_end_epoch, global_frame_origin, global_frame_orientation)
for target in targets:
    body_settings.add_empty_settings(target)

   
# Add Earth Shape settings
eq_rad = 6378*1e3 # in meters
flat = 1/298
body_settings.get('Earth').shape_settings = environment_setup.shape.oblate_spherical(
    equatorial_radius = eq_rad,
    flattening = flat,
)

# Add Earth Rotation model
body_settings.get('Earth').rotation_model_settings = environment_setup.rotation_model.gcrs_to_itrs(
    environment_setup.rotation_model.iau_2006, global_frame_orientation,
    interpolators.interpolator_generation_settings_float(interpolators.cubic_spline_interpolation(),
                                                            sim_seconds_start_epoch, sim_seconds_end_epoch, 60),
    interpolators.interpolator_generation_settings_float(interpolators.cubic_spline_interpolation(),
                                                            sim_seconds_start_epoch, sim_seconds_end_epoch, 60),
    interpolators.interpolator_generation_settings_float(interpolators.cubic_spline_interpolation(),
                                                            sim_seconds_start_epoch, sim_seconds_end_epoch, 60))

# DEFINE ACCELERATIONS ACTING ON TARGET
accelerations_settings_target = dict(
    Earth=[
        propagation_setup.acceleration.point_mass_gravity(),
    ]
)


# Get propogated satellite coordinates from tudat propogator
states = dict()
for target in targets:
    states[target] = sat_prop(sim_seconds_start_epoch, sim_seconds_end_epoch-60, target, tles[target])

# 1) Build a dict-of-dicts of ephemeris samples:
targets_states_dicts = {
    target: {
        float(row[0]): row[1:].reshape((6,1))
        for row in state
    }
    for target, state in states.items()
}

# 2) Assign each one into your body_settings:
for target, state in targets_states_dicts.items():
    body_settings.get(target).ephemeris_settings = environment_setup.ephemeris.tabulated(state,global_frame_origin,global_frame_orientation)



# Set up ground station settings
gs_alt_m = geodetic_position[station_name][0]
gs_lat_deg = geodetic_position[station_name][1]
gs_lon_deg = geodetic_position[station_name][2]

gs_settings = []
gs_settings.append(environment_setup.ground_station.basic_station(
    station_name,
    [gs_alt_m,np.deg2rad(gs_lat_deg),np.deg2rad(gs_lon_deg)],
    element_conversion.geodetic_position_type))

body_settings.get('Earth').ground_station_settings = gs_settings

bodies = environment_setup.create_system_of_bodies(body_settings)

for target,state in targets_states_dicts.items():
    link_ends = {
        observation.receiver: observation.body_reference_point_link_end_id('Earth', target)
        }

    angles_range = estimation.compute_target_angles_and_range(
        bodies = bodies,
        station_id = ('Earth', station_name),
        target_body = target,
        observation_times = actual_time_array,
        is_station_transmitting = False  # Station is the receiver
    )

    elevations = []
    azimuths = []
    times_filtered = []

    for t in sorted(angles_range.keys()):
        elevation_rad, azimuth_rad, range_m = angles_range[t]
        
        elevations.append(np.rad2deg(elevation_rad))
        azimuths.append(np.rad2deg(azimuth_rad) % 360)  # normalize to [0, 360)
        times_filtered.append(t)

    times_filtered_utc = []
    for time_change in times_filtered:
        time_change_jd = time_conversion.seconds_since_epoch_to_julian_day(time_change)
        time_change_utc = time_conversion.julian_day_to_calendar_date(time_change_jd)
        times_filtered_utc.append(time_change_utc)

    plt.figure(figsize=(8, 4))
    plt.scatter(times_filtered_utc,azimuths,marker='.',label='Azimuth')
    plt.scatter(times_filtered_utc,elevations,marker='.',label='Elevation')
    plt.title('Observation of '+target)
    plt.xlabel('Time in UTC')
    plt.ylabel('Degrees')
    plt.grid(True)
    plt.legend()
    plt.xticks(rotation=45)
    plt.tight_layout()

    plt.show()




In [None]:

# 2) Observation function h(x): state→[az,el]
def make_h_func(bodies, station_id, target_body, obs_time):
    """
    Returns h_func(state_vector) -> [az, el] in radians,
    by temporarily injecting `state_vector` at `obs_time` and 
    calling Tudat's compute_target_angles_and_range.
    """
    def h_func(x):
        # overwrite target_body ephemeris with this one state
        from tudatpy.numerical_simulation.environment import Ephemeris
        bodies.get(target_body).ephemeris = environment_setup.ephemeris.constant(
            x.reshape((6,1)), obs_time, global_frame_origin, global_frame_orientation
        )
        # compute angles
        out = estimation.compute_target_angles_and_range(
            bodies, station_id, target_body, [obs_time], is_station_transmitting=False
        )
        az, el, _ = list(out.values())[0]
        return np.array([az, el])
    return h_func


In [None]:

# 3) Numerical Jacobian H(x) via central differences
def make_H_func(h_func, epsilon=1e-3):
    """
    Returns H_func(x): a 2×6 Jacobian of h_func at x.
    """
    def H_func(x):
        H = np.zeros((2,6))
        for i in range(6):
            dx = np.zeros_like(x); dx[i] = epsilon
            z_plus  = h_func(x + dx)
            z_minus = h_func(x - dx)
            H[:,i] = (z_plus - z_minus)/(2*epsilon)
        return H
    return H_func


In [None]:

# 4) Initialize beliefs for each satellite (mean,covariance)
# sat_beliefs = {}
# measurement noise covariance R  (radians^2)
R = np.diag([np.deg2rad(0.05)**2, np.deg2rad(0.05)**2])

est_state = {}
est_cov = {}

for target in targets:
    # get initial mean from TLE ephemeris at start time
    tle_ephem = environment.TleEphemeris("Earth","J2000", tles[target], False)
    mu0 = tle_ephem.cartesian_state(actual_seconds_start_epoch).flatten()
    # 1 km σ in pos, 1 m/s σ in vel
    cov0 = np.diag([1e6,1e6,1e6,1,1,1])
    est_state[target][0] = mu0
    est_cov[target][0] = cov0 
    # sat_beliefs[target] = {"mean": mu0, "cov": cov0}

# 5) Greedy sensor tasking over each observation time
station_id = ('Earth', station_name)
for idx,t in enumerate(actual_time_array):
    # # find which sats are visible
    # visible = []
    # for target in targets:
    #     az, el, _ = estimation.compute_target_angles_and_range(
    #         bodies, station_id, target, [t], False
    #     )[t]
    #     if el > 0:
    #         visible.append(target)
    # if not visible:
    #     continue

    # # evaluate expected KL if we were to observe each visible sat
    # best_kl = -np.inf
    # best_sat = None
    # best_post = None

    for target in targets:
        state_prior = est_state[target][idx]
        cov_prior = est_cov[target][idx]
        
        # build observation model at (target, t)
        h  = make_h_func(bodies, station_id, target, t)
        Hf = make_H_func(h)

        # simulate one noisy measurement from 'true' angles
        true_az, true_el, _ = estimation.compute_target_angles_and_range(
            bodies, station_id, target, [t], False
        )[t]
        z_meas = simulate_observation(np.rad2deg(true_az), np.rad2deg(true_el))
        z_meas = np.deg2rad(z_meas)  # convert back to radians

        # EKF update to get posterior
        state_post, cov_post = ekf_update(state_prior, cov_prior, z_meas, h, Hf, R)

        est_state[target][idx+1] = state_post
        est_cov[target][idx+1] = cov_post

        # # compute KL(prior || posterior)
        # kl = kl_divergence_gaussians(mu, cov, mu_post, cov_post)
        # if kl > best_kl:
        #     best_kl = kl
        #     best_sat = target
        #     best_post = (mu_post, cov_post)

    # # Task the best satellite and update its belief
    # sat_beliefs[best_sat]["mean"], sat_beliefs[best_sat]["cov"] = best_post

    # print(f"t={time_conversion.julian_day_to_calendar_date(time_conversion.seconds_since_epoch_to_julian_day(t))}  -> observing {best_sat}, KL={best_kl:.2f}")
