In [38]:
from datetime import datetime, timedelta
from time import time
import numpy as np
from matplotlib import pyplot as plt
import pandas as pd

from tudatpy import constants, numerical_simulation
from tudatpy import util as tudat_util
from tudatpy.astro import time_conversion, element_conversion
from tudatpy.interface import spice
from tudatpy.numerical_simulation import propagation_setup, environment_setup, estimation_setup, estimation
from tudatpy.util import result2array
from tudatpy.math import interpolators

In [39]:
spice.load_standard_kernels()
np.random.seed(0)

In [40]:
def import_data_MPC():
    widths = [15, 5, 3, 2, 7, 3, 3, 6, 1, 3, 3, 5, 21, 3, 5]
    data_path = 'chang_e_3_data/2013-070B_clean_uncertainty.txt'
    observation_array_raw = np.genfromtxt(data_path, delimiter=widths, dtype=str)
    year = observation_array_raw[:, 1].astype(int)
    month = observation_array_raw[:, 2].astype(int)
    day = observation_array_raw[:, 3].astype(int)
    fraction_of_day = observation_array_raw[:, 4].astype(float)
    sign_column = observation_array_raw[:, 8]
    obs_code = observation_array_raw[:, 13]
    uncertainty = observation_array_raw[:, 14].astype(float) * (2 * np.pi) / (360 * 3600)
    TDB_epoch = []
    sign_decl = []
    time_scale_converter = time_conversion.default_time_scale_converter()
    for i in range(len(observation_array_raw)):
        # Calculate the time based on the fraction of the day
        seconds_in_day = 24 * 60 * 60
        time_delta = timedelta(seconds=fraction_of_day[i] * seconds_in_day)

        # Create a datetime object and convert from UTC to TDB
        datetime_object = datetime(year[i], month[i], day[i]) + time_delta
        tudat_datetime_UTC = time_conversion.datetime_to_tudat(datetime_object).epoch()
        tudat_datetime_TDB = time_scale_converter.convert_time(
            input_scale=time_conversion.utc_scale,
            output_scale=time_conversion.tdb_scale,
            input_value=tudat_datetime_UTC)
        TDB_epoch.append(tudat_datetime_TDB)

        # Save and convert the sign for each declination entry
        if sign_column[i] == '-':
            sign_decl.append('-1')
        else:
            sign_decl.append('1')

    # Convert RA / Decl from its current format to Radians, adjust declination sign corresponding to sign_check
    sign_column_array = np.array(list(map(float, sign_decl)))
    ra_hour, ra_min, ra_sec = np.array(list(map(float, observation_array_raw[:, 5]))), \
        np.array(list(map(float, observation_array_raw[:, 6]))), \
        np.array(list(map(float, observation_array_raw[:, 7])))

    decl_deg, decl_min, decl_sec = np.array(list(map(float, observation_array_raw[:, 9]))), \
        np.array(list(map(float, observation_array_raw[:, 10]))), \
        np.array(list(map(float, observation_array_raw[:, 11])))

    ra_rad = ra_hour * (np.pi / 12.0) + ra_min * (np.pi / (12.0 * 60.0)) + ra_sec * (np.pi / (12.0 * 3600.0))
    for i in range(len(ra_rad)):
        if ra_rad[i] > np.pi:
            ra_rad[i] = ra_rad[i] - 2.0 * np.pi
        elif ra_rad[i] < -np.pi:
            ra_rad[i] = ra_rad[i] + 2.0 * np.pi
    decl_rad = sign_column_array * (
            decl_deg * (2.0 * np.pi / 360.0) + decl_min * (2 * np.pi / (360.0 * 60.0)) + decl_sec * (
            2 * np.pi / (360.0 * 3600.0)))

    # Save observation array
    observation_array_temp = np.vstack([list(map(float, TDB_epoch)), ra_rad, decl_rad, uncertainty, obs_code]).T

    # Calculate and save list of ground stations + their locations according to the MPC website
    ground_stations_unfiltered = np.loadtxt(r'chang_e_3_data/MPC_ground_stations.txt',
                                            dtype=str, delimiter=None, skiprows=1, usecols=(0, 1, 2, 3, 4))

    # Filter out rows which do not contain actual ground stations
    filter_rows = np.any(np.isin(ground_stations_unfiltered, ['Geocentric', 'Hipparcos']), axis=1)
    ground_stations_raw = ground_stations_unfiltered[~filter_rows, :]

    # Transform
    code = ground_stations_raw[:, 0]
    name = ground_stations_raw[:, 4]
    longitude = np.array(list(map(float, ground_stations_raw[:, 1]))) * (2 * np.pi / 360)
    latitude = np.arctan(np.array(list(map(float, ground_stations_raw[:, 3]))) /
                         np.array(list(map(float, ground_stations_raw[:, 2]))))

    radius_Earth = 6371008.366666666
    x_obs = np.array(list(map(float, ground_stations_raw[:, 2]))) * radius_Earth * np.cos(
        np.array(list(map(float, ground_stations_raw[:, 1]))) * (2 * np.pi / 360))
    y_obs = np.array(list(map(float, ground_stations_raw[:, 2]))) * radius_Earth * np.sin(
        np.array(list(map(float, ground_stations_raw[:, 1]))) * (2 * np.pi / 360))
    z_obs = np.array(list(map(float, ground_stations_raw[:, 3]))) * radius_Earth
    distance_to_center = (np.array(list(map(float, ground_stations_raw[:, 2]))) / np.cos(latitude)) * radius_Earth
    ground_stations_MPC = np.vstack([code, name, distance_to_center, latitude, longitude, x_obs, y_obs, z_obs]).T

    # Merge ground station location to observation array and save in right format/data type
    observation_df_temp = pd.DataFrame(observation_array_temp,
                                       columns=['epoch', 'ra', 'decl', 'uncertainty', 'obs_code'])
    ground_stations_df = pd.DataFrame(ground_stations_MPC,
                                      columns=['obs_code', 'name', 'distance_to_center', 'latitude', 'longitude', 'X',
                                               'Y', 'Z'])
    observation_df_merged = pd.merge(observation_df_temp, ground_stations_df, on='obs_code', how='inner')
    observation_array_str = np.array(observation_df_merged)  # Convert to array (still in dtype = string)
    observation_array = np.vstack([np.array(list(map(float, observation_array_str[:, 0]))),
                                   np.array(list(map(float, observation_array_str[:, 1]))),
                                   np.array(list(map(float, observation_array_str[:, 2]))),
                                   observation_array_str[:, 4],
                                   observation_array_str[:, 5],
                                   np.array(list(map(float, observation_array_str[:, 6]))),
                                   np.array(list(map(float, observation_array_str[:, 7]))),
                                   np.array(list(map(float, observation_array_str[:, 8]))),
                                   np.array(list(map(float, observation_array_str[:, 9]))),
                                   np.array(list(map(float, observation_array_str[:, 10]))),
                                   np.array(list(map(float, observation_array_str[:, 11]))),
                                   np.array(list(map(float, observation_array_str[:, 3])))
                                   ]).T

    print("Amount of observations in total dataset:", len(observation_array))
    observation_df = pd.DataFrame(observation_array)
    observation_dict = observation_df.set_index(0).T.to_dict('list')  # Convert to dictionary
    return observation_array, observation_dict, ground_stations_MPC

In [61]:
def get_state_FindOrb():
    cartesian_state_FindOrb = np.loadtxt(r'chang_e_3_data/FindOrb_2013_070B_Bill.txt',
                                         dtype=float, delimiter=None, skiprows=1)
    first_epoch = time_conversion.datetime_to_tudat(datetime(2013, 12, 8, 0, 0, 0)).epoch()
    steps = 40110
    step_size = 8640

    epoch_array = np.arange(first_epoch, first_epoch + steps * step_size, step_size)

    # Convert time from UTC to TDB
    time_scale_converter = time_conversion.default_time_scale_converter()
    for i in range(len(epoch_array)):
        epoch_array[i] = time_scale_converter.convert_time(
            input_scale=time_conversion.utc_scale,
            output_scale=time_conversion.tdb_scale,
            input_value=epoch_array[i])

    cartesian_state_FindOrb[:, 0] = epoch_array
    cartesian_state_FindOrb[:, 1:4] = cartesian_state_FindOrb[:, 1:4] * 1000
    cartesian_state_FindOrb[:, 4:7] = cartesian_state_FindOrb[:, 4:7] * 1000
    cartesian_state_FindOrb_df = pd.DataFrame(cartesian_state_FindOrb)
    cartesian_state_FindOrb_dict = cartesian_state_FindOrb_df.set_index(0).T.to_dict('list')  # Convert to dictionary
    return cartesian_state_FindOrb, cartesian_state_FindOrb_dict

In [62]:
def state_filter(state, start, end, excluded_times=None):
    # Filter state to be within simulation window
    state_filtered = state[(state[:, 0] > start) & (state[:, 0] < end)]
    if excluded_times is not None:
        state_filtered = state_filtered[~np.isin(state_filtered[:, 0], excluded_times)]
    state_ordered = state_filtered[np.argsort(state_filtered[:, 0])]
    state_ordered_df = pd.DataFrame(state_ordered)
    state_ordered_dict = state_ordered_df.set_index(0).T.to_dict('list')  # Convert to dictionary
    return state_ordered, state_ordered_dict

In [63]:
def get_environment_settings(bodies_to_create,
                             global_frame_origin,
                             global_frame_orientation,
                             reference_area_radiation,
                             radiation_pressure_coefficient,
                             occulting_bodies,
                             observation_estimation,
                             cartesian_state_findorb_dict):
    # Create body settings and system of planetary bodies
    body_settings = environment_setup.get_default_body_settings(
        bodies_to_create,
        global_frame_origin,
        global_frame_orientation)

    # Create empty body for object
    body_settings.add_empty_settings("Chang'e 3")

    # Define radiation pressure interface for current object
    body_settings.get("Chang'e 3").radiation_pressure_target_settings = \
        environment_setup.radiation_pressure.cannonball_radiation_target(reference_area_radiation,
                                                                         radiation_pressure_coefficient,
                                                                         {'Sun': occulting_bodies})

    body_settings.get("Chang'e 3").ephemeris_settings = environment_setup.ephemeris.tabulated(
        body_state_history=cartesian_state_findorb_dict,
        frame_origin=global_frame_origin,
        frame_orientation=global_frame_orientation)

    return body_settings

In [64]:
def get_bodies(body_settings, object_mass, reference_area, drag_coefficient,
               observation_array):
    bodies = environment_setup.create_system_of_bodies(body_settings)

    for i in range(len(observation_array)):
        station_code = observation_array[i, 3]

        if station_code not in bodies.get("Earth").ground_station_list:
            # Create ground station settings for all ground station combinations
            ground_station_settings = environment_setup.ground_station.basic_station(
                station_code,
                [observation_array[i, 8], observation_array[i, 9], observation_array[i, 10]],
                element_conversion.cartesian_position_type)

            # Add the ground station to the environment
            environment_setup.add_ground_station(
                bodies.get_body("Earth"),
                ground_station_settings)

    # Create object to be propagated
    bodies.get("Chang'e 3").mass = object_mass

    # Create aerodynamic interface of object
    aero_coefficient_settings = environment_setup.aerodynamic_coefficients.constant(
        reference_area, [drag_coefficient, 0, 0])
    environment_setup.add_aerodynamic_coefficient_interface(
        bodies, "Chang'e 3", aero_coefficient_settings)

    return bodies

In [65]:
def get_initial_state(cartesian_state_dict, simulation_start_epoch):
    interpolation_lower_limit = list(cartesian_state_dict.keys())[3]
    interpolation_upper_limit = list(cartesian_state_dict.keys())[-3]

    if interpolation_lower_limit < simulation_start_epoch < interpolation_upper_limit:
        # Create interpolator settings
        interpolator_settings = interpolators.lagrange_interpolation(
            8, boundary_interpolation=interpolators.use_boundary_value)

        interpolator = interpolators.create_one_dimensional_vector_interpolator(
            cartesian_state_dict, interpolator_settings)

        initial_state = interpolator.interpolate(simulation_start_epoch)
    else:
        print('Simulation start epoch is outside of epoch range')

    return initial_state

In [66]:
def get_termination_condition(simulation_start_epoch,
                              simulation_end_epoch):
    # Create termination settings based on end of simulation and whether it crashes into Earth or Moon surface
    f_time_termination_condition = propagation_setup.propagator.time_termination(simulation_end_epoch)

    distance_to_earth_variable = propagation_setup.dependent_variable.relative_distance("Chang'e 3",
                                                                                        "Earth")
    earth_termination_condition = propagation_setup.propagator.dependent_variable_termination(
        dependent_variable_settings=distance_to_earth_variable,
        limit_value=6378000.0,
        use_as_lower_limit=True)

    distance_to_moon_variable = propagation_setup.dependent_variable.relative_distance("Chang'e 3", "Moon")
    moon_termination_condition = propagation_setup.propagator.dependent_variable_termination(
        dependent_variable_settings=distance_to_moon_variable,
        limit_value=1737400.0,
        use_as_lower_limit=True)

    # Store all termination setting objects in a list
    f_termination_settings_list = [f_time_termination_condition, earth_termination_condition,
                                   moon_termination_condition]

    # Create hybrid termination settings
    f_termination_condition = propagation_setup.propagator.hybrid_termination(f_termination_settings_list,
                                                                              fulfill_single_condition=True)
    
    b_time_termination_condition = propagation_setup.propagator.time_termination(simulation_start_epoch)
    b_termination_settings_list = [b_time_termination_condition, earth_termination_condition,
                                   moon_termination_condition]
    b_termination_condition = propagation_setup.propagator.hybrid_termination(b_termination_settings_list,
                                                                              fulfill_single_condition=True)
    termination_condition = propagation_setup.propagator.non_sequential_termination(f_termination_condition,
                                                                                    b_termination_condition)

    return termination_condition

In [67]:
def get_propagator_settings(bodies,
                            initial_state,
                            simulation_start_epoch,
                            integrator_settings,
                            termination_condition,
                            dependent_variables_to_save,
                            current_propagator):
    # Define bodies that are propagated and their central bodies of propagation
    bodies_to_propagate = ["Chang'e 3"]
    central_bodies = ['Earth']

    # Define Benchmark acceleration settings
    accelerations_settings_object = {
        'Sun': [propagation_setup.acceleration.point_mass_gravity()],
        'Earth': [propagation_setup.acceleration.spherical_harmonic_gravity(5, 5)],
        'Moon': [propagation_setup.acceleration.point_mass_gravity()],
        'Jupiter': [propagation_setup.acceleration.point_mass_gravity()]}

    accelerations_settings_object['Sun'].append(propagation_setup.acceleration.radiation_pressure())

    # Create global accelerations dictionary
    acceleration_settings = {"Chang'e 3": accelerations_settings_object}
    acceleration_models = propagation_setup.create_acceleration_models(
        bodies,
        acceleration_settings,
        bodies_to_propagate,
        central_bodies)

    # POSSIBILITY TO APPLY INITIAL STATE UNCERTAINTY HERE

    # Create propagation settings for the benchmark
    propagator_settings = propagation_setup.propagator.translational(
        central_bodies,
        acceleration_models,
        bodies_to_propagate,
        initial_state,
        simulation_start_epoch,
        integrator_settings,
        termination_condition,
        current_propagator,
        output_variables=dependent_variables_to_save
    )

    # Update object ephemeris to allow for observation simulation
    propagator_settings.processing_settings.set_integrated_result = True

    return propagator_settings

In [68]:
def perform_estimation(observation_array,
                       bodies,
                       propagator_settings,
                       ephemeris_satellite_states,
                       estimate_cr=False):
    # Functions to perform estimation of current model configuration on defined dataset
    
    # Define Link Ends
    observation_set_list = []
    ground_station_list = []
    for ground_station in np.unique(observation_array[:, 3]):
        ground_station_list.append(ground_station)
        link_ends = dict()
        link_ends[
            estimation_setup.observation.receiver] = estimation_setup.observation.body_reference_point_link_end_id(
            "Earth", ground_station)
        link_ends[estimation_setup.observation.transmitter] = estimation_setup.observation. \
            body_origin_link_end_id("Chang'e 3")
        link_definition = estimation_setup.observation.LinkDefinition(link_ends)

        # Define observation time and angular position, remove outer data to allow for proper estimation near the edge
        observation_times = observation_array[observation_array[:, 3] == ground_station, 0]
        observation_angles = observation_array[observation_array[:, 3] == ground_station, 1:3]

        observation_set = estimation.single_observation_set(
            observable_type=estimation_setup.observation.angular_position_type,
            link_definition=link_definition,
            observations=observation_angles,
            observation_times=observation_times,
            reference_link_end=estimation_setup.observation.receiver)
        observation_set_list.append(observation_set)

        print('Checking ephemerides...')
        ephemeris_satellite_states = estimation.ObservationCollection(observation_set_list)

        # set create angular_position settings for each link in the list.
        position_observation_settings = list()
        link_list = list(
            ephemeris_satellite_states.get_link_definitions_for_observables(
                observable_type=estimation_setup.observation.angular_position_type
            )
        )

        # Define absolute bias
        bias_settings = estimation_setup.observation.absolute_bias(np.zeros(2))

        for link in link_list:
            # add optional bias settings here
            position_observation_settings.append(estimation_setup.observation.angular_position(link,
                                                                                               bias_settings=bias_settings))

    ##########################################################################################
    # Define parameters to be estimated
    ##########################################################################################
    # Define initial state to be estimated
    parameters_to_estimate_settings = estimation_setup.parameter.initial_states(propagator_settings, bodies)

    # Add other parameters to estimation (empirical accelerations can become full_empirical_acceleration_terms in the
    # future) In case of parameter estimation, define which parameters are estimated per model

    # Estimate model parameters
    if estimate_cr:
        parameters_to_estimate_settings.append(
            estimation_setup.parameter.radiation_pressure_coefficient("Chang'e 3"))

    # Finalize parameter vector
    parameters_to_estimate = estimation_setup.create_parameter_set(parameters_to_estimate_settings, bodies,
                                                                   propagator_settings)
    original_parameter_vector = parameters_to_estimate.parameter_vector

    ##########################################################################################
    # Carry out estimation
    ##########################################################################################

    print('Running propagation...')
    with tudat_util.redirect_std():
        estimator = numerical_simulation.Estimator(bodies, parameters_to_estimate,
                                                   position_observation_settings, propagator_settings,
                                                   integrate_on_creation=True)

    # Create input object for the estimation
    maximum_pod_iterations = 9
    minimum_improvement = 0.00001 * 10 ** (-6)

    # Number of ground station - RA/Decl combinations

    cr_std = 0.1

    # Define A priori covariance matrix
    if not estimate_cr:
        var_vector = np.array([np.inf, np.inf, np.inf, np.inf, np.inf, np.inf]) ** 2
    else:
        var_vector = np.array([np.inf, np.inf, np.inf, np.inf, np.inf, np.inf, cr_std]) ** 2
        
    apriori_cov_matrix = np.linalg.inv(np.diag(var_vector))

    count_list = []
    count = 1
    for gs_times in ephemeris_satellite_states.get_observation_times():
        delta_t = np.diff(gs_times)
        for delta, i in enumerate(delta_t):
            if delta > 1e4:
                count_list.append(count)
                count = 1
            else:
                count += 1
        count_list.append(count)
        count = 1

    weight_value = 1 / ((1.5 * 2 * np.pi) / (360 * 3600)) ** 2
    weight_vector = np.zeros(2 * len(observation_array))
    idx = 0
    for c in count_list:
        weight = weight_value / np.sqrt(c)
        weight_vector[idx:idx + 2 * c] = weight
        idx += 2 * c

    ephemeris_satellite_states.set_tabulated_weights(weight_vector)

    # Create estimation input
    estimation_input = estimation.EstimationInput(observations_and_times=ephemeris_satellite_states,
                                                  inverse_apriori_covariance=apriori_cov_matrix,
                                                  convergence_checker=estimation.estimation_convergence_checker(
                                                      maximum_iterations=maximum_pod_iterations,
                                                      minimum_residual_change=minimum_improvement))

    # Set methodological options
    estimation_input.define_estimation_settings(save_state_history_per_iteration=True,
                                                reintegrate_variational_equations=True)

    # Perform the estimation
    print('Performing the estimation...')
    print(f'Original initial states: {original_parameter_vector}')

    estimation_output = estimator.perform_estimation(estimation_input)

    initial_states_updated = parameters_to_estimate.parameter_vector
    
    print('Done with the estimation...')
    print(f'Updated initial states: {initial_states_updated}')
    print('Formal errors estimation: ', estimation_output.formal_errors)

    return estimation_output, ephemeris_satellite_states, initial_states_updated, ground_station_list, estimator

In [69]:
object_mass = 5000.0  # kg - LM-3B rocket
reference_area = 37.14  # m2 - LM-3B rocket
drag_coefficient = 1.2  # -  ?
reference_area_radiation = 37.14  # m2 - LM-3B rocket
radiation_pressure_coefficient = 1.2  # -  ?
estimate_cr = False

# Define which bodies to create, central body of propagation and reference frame
bodies_to_create = ["Sun", "Earth", "Moon", "Jupiter"]
occulting_bodies = ["Earth"]
global_frame_origin = "Earth"
global_frame_orientation = "J2000"

current_propagator = propagation_setup.propagator.cowell

In [75]:
observation_array_complete, observation_dict, ground_stations_MPC = import_data_MPC()

cartesian_state_FindOrb, cartesian_state_FindOrb_dict = get_state_FindOrb()

simulation_start_epoch = time_conversion.datetime_to_tudat(datetime(2014, 8, 5, 12, 0, 0)).epoch()
simulation_end_epoch = time_conversion.datetime_to_tudat(datetime(2014, 10, 23, 12, 0, 0)).epoch()
simulation_mid_epoch = (simulation_start_epoch + simulation_end_epoch) / 2

observation_array, observation_dict = state_filter(observation_array_complete, simulation_start_epoch,
                                                   simulation_end_epoch)


Amount of observations in total dataset: 881


In [76]:
body_settings = get_environment_settings(bodies_to_create, global_frame_origin,
                                         global_frame_orientation, reference_area_radiation,
                                         radiation_pressure_coefficient, occulting_bodies, True,
                                         cartesian_state_FindOrb_dict)

In [77]:
bodies = get_bodies(body_settings, object_mass, reference_area, drag_coefficient,
                    observation_array_complete)

In [78]:
integrator_settings = propagation_setup.integrator.runge_kutta_variable_step_size(initial_time_step=1.0,
                                                                                  coefficient_set=propagation_setup.integrator.CoefficientSets.rkf_78,
                                                                                  minimum_step_size=np.finfo(
                                                                                      float).eps,
                                                                                  maximum_step_size=10000,
                                                                                  relative_error_tolerance=1.0e-10,
                                                                                  absolute_error_tolerance=1.0e-10)

In [79]:
dependent_variables_to_save = []
initial_state = get_initial_state(cartesian_state_FindOrb_dict, simulation_mid_epoch)

In [80]:
termination_condition = get_termination_condition(simulation_start_epoch, simulation_end_epoch)

In [81]:
# Create propagation settings
propagator_settings = get_propagator_settings(bodies, initial_state,
                                              simulation_mid_epoch, integrator_settings,
                                              termination_condition, dependent_variables_to_save,
                                              current_propagator)

In [82]:
estimation_output, ephemeris_satellite_states, initial_states_updated, ground_station_list, estimator = \
    perform_estimation(observation_array,
                            bodies, propagator_settings, [], estimate_cr)

Checking ephemerides...
Checking ephemerides...
Checking ephemerides...
Checking ephemerides...
Running propagation...
Performing the estimation...
Original initial states: [ 4.17214415e+08 -8.37524741e+07  2.83922046e+07  4.85187461e+01
  2.94846328e+02 -2.07833756e+02]
Calculating residuals and partials 68
Done with the estimation...
Updated initial states: [ 4.13244999e+08 -5.68315338e+07  9.94603369e+06 -1.41564470e+02
  3.26875366e+02 -2.17104978e+02]
Formal errors estimation:  [3.40135689e+03 1.36024165e+03 1.25822993e+03 2.62266065e-02
 1.10799047e-02 7.07648498e-03]
Current residual: 0.0926884
Parameter update 3.79359e+06   2.5421e+07 -1.51576e+07     -100.101      26.4726    0.0737652
Calculating residuals and partials 68
Current residual: 0.0483896
Parameter update-3.9981e+06 1.01845e+06  -1.965e+06    -58.8473     3.09409    -3.74302
Calculating residuals and partials 68
Current residual: 0.0149033
Parameter update-3.27272e+06       426621 -1.18396e+06     -28.1255      2.52