# Constelation optimization performance analysis

In [None]:
import ephemerista

ephemerista.init(eop_path="../../tests/resources/finals2000A.all.csv", spk_path="../../tests/resources/de440s.bsp")

In [None]:
import random

import deap.base
import deap.creator
import deap.tools
import geojson_pydantic
import numpy as np
from deap import algorithms, tools

from ephemerista.analysis.coverage import Coverage
from ephemerista.constellation.design import Constellation, WalkerStar
from ephemerista.scenarios import Scenario
from ephemerista.time import Time, TimeDelta

%load_ext line_profiler

In [None]:
# These are lower values than the ones used in the optimization notebook, so
# that the tests run faster.

TIME = Time.from_iso("TDB", "2016-05-30T12:00:00")
START_TIME = Time.from_iso("TDB", "2016-05-30T12:00:00")
END_TIME = START_TIME + TimeDelta.from_hours(4)
PENALTY = 1e3
DISCRETIZATION_RESOLUTION = 15

LOWER_BOUND_SATS = 5
UPPER_BOUND_SATS = 15
LOWER_BOUND_PLANES = 1
UPPER_BOUND_PLANES = 15

POPULATION_SIZE = 20
GENERATIONS = 10

HALL_OF_FAME_LEN = 20

with open("../single_aoi.geojson") as f:
    AOI = geojson_pydantic.FeatureCollection.model_validate_json(f.read())

global_hof_result: deap.tools.HallOfFame | list = []
global_evaluate_constellation_cache = {}

In [None]:
%%prun


def evaluate_constellation(
    individual,
    time: Time = TIME,
    start_time: Time = START_TIME,
    end_time: Time = END_TIME,
    aoi: geojson_pydantic.FeatureCollection = AOI,
    discretization_resolution: int = DISCRETIZATION_RESOLUTION,
    penalty: float = PENALTY,
    evaluate_constellation_cache: dict = global_evaluate_constellation_cache,
) -> tuple:
    print(f"Evaluating individual: {individual}")  # noqa: T201
    nsats, nplanes = individual
    if (nsats, nplanes) in evaluate_constellation_cache:
        print(f"Using cached result for {nsats} satellites and {nplanes} planes")  # noqa: T201
        individual.fitness.values = evaluate_constellation_cache[(nsats, nplanes)][0]
        mean_cov = evaluate_constellation_cache[(nsats, nplanes)][1]
        return nsats, mean_cov

    constellation = Constellation(
        model=WalkerStar(
            time=time,
            nsats=nsats,
            nplanes=nplanes,
            semi_major_axis=7000,
            inclination=45,
            eccentricity=0.0,
            periapsis_argument=90,
        )
    )

    scenario = Scenario(
        name="Coverage analysis with constellation",
        start_time=start_time,
        end_time=end_time,
        areas_of_interest=aoi,
        time_step=600,
        constellations=[constellation],
        discretization_resolution=discretization_resolution,
    )

    cov = Coverage(scenario=scenario)
    results = cov.analyze()
    mean_cov = np.array(results.coverage_percent).mean()
    individual.fitness.values = (-nsats * penalty, mean_cov)
    print(f"Evaluated individual: {individual} with fitness {individual.fitness}")  # noqa: T201
    evaluate_constellation_cache[(nsats, nplanes)] = (individual.fitness.values, mean_cov, results)
    return nsats, mean_cov


def optimize_constellation(
    pop_size=20,
    generations=50,
    hall_of_fame_len=1,
    time: Time = TIME,
    start_time: Time = START_TIME,
    end_time: Time = END_TIME,
    aoi: geojson_pydantic.FeatureCollection = AOI,
    penalty: float = PENALTY,
    lower_bound_sats: int = LOWER_BOUND_SATS,
    upper_bound_sats: int = UPPER_BOUND_SATS,
    lower_bound_planes: int = LOWER_BOUND_PLANES,
    upper_bound_planes: int = UPPER_BOUND_PLANES,
) -> deap.tools.HallOfFame:
    # We use this function and not define evaluate_constellation here so we can profile the function
    def evaluate(individual) -> tuple:
        return evaluate_constellation(individual, time, start_time, end_time, aoi, penalty)

    # Minimize number of satellites while maximizing coverage while favoring the latter
    deap.creator.create("FitnessMulti", deap.base.Fitness, weights=(-0.5, 1.0))
    deap.creator.create("Individual", list, fitness=deap.creator.FitnessMulti)  # type: ignore

    def init_individual():
        """Randomly generate an individual ensuring integer values."""
        num_sats = random.randint(lower_bound_sats, upper_bound_sats)  # noqa: S311
        num_planes = random.randint(lower_bound_planes, upper_bound_sats)  # noqa: S311

        num_sats = max(num_planes, (num_sats // num_planes) * num_planes)

        indi = deap.creator.Individual([num_sats, num_planes])  # type: ignore
        print(f"Initialized individual: {indi} with fitness {indi.fitness.values}")  # noqa: T201
        return indi

    def mutate(individual, mu=0, sigma=5, indpb=0.2):
        print(f"Mutating individual {individual} with fitness {individual.fitness.values}")  # noqa: T201
        num_sats, num_planes = individual

        # Apply Gaussian mutation
        if random.random() < indpb:  # noqa: S311
            num_sats += int(random.gauss(mu, sigma))
        if random.random() < indpb:  # noqa: S311
            num_planes += int(random.gauss(mu, sigma))

        # Enforce bounds and ensure integers
        num_sats = max(lower_bound_sats, min(upper_bound_sats, round(num_sats)))
        num_planes = max(lower_bound_planes, min(upper_bound_planes, round(num_planes)))

        num_sats = max(num_planes, (num_sats // num_planes) * num_planes)

        individual[:] = [num_sats, num_planes]
        print(f"Mutated individual: {individual} with fitness {individual.fitness.values}")  # noqa: T201
        return (individual,)

    def crossover(ind1, ind2, indpb=0.5):
        print(f"Crossover between {ind1} with fitness {ind1.fitness} and {ind2} with fitness {ind2.fitness}")  # noqa: T201
        for i in range(len(ind1)):
            if random.random() < indpb:  # Each gene has probability `indpb` of swapping  # noqa: S311
                ind1[i], ind2[i] = ind2[i], ind1[i]  # Swap values

        # Ensure num_sats remains divisible by num_planes
        for ind in [ind1, ind2]:
            num_sats, num_planes = ind
            if num_planes > 0:
                num_sats = max(num_planes, (num_sats // num_planes) * num_planes)
            ind[:] = [num_sats, num_planes]  # Assign values back
            del ind.fitness.values  # Invalidate fitness to force re-evaluation

        print(  # noqa: T201
            "Resulting individuals after crossover: "
            f"{ind1} with fitness {ind1.fitness} and {ind2} with fitness {ind2.fitness}"
        )
        return ind1, ind2

    toolbox = deap.base.Toolbox()

    toolbox.register("individual", init_individual)
    toolbox.register("population", deap.tools.initRepeat, list, toolbox.individual)  # type: ignore
    toolbox.register("evaluate", evaluate)
    toolbox.register("mate", crossover, indpb=0.5)  # Blend crossover
    toolbox.register("mutate", mutate, mu=0, sigma=5, indpb=0.2)  # Gaussian mutation
    toolbox.register("select", deap.tools.selTournament, tournsize=2)

    stats = tools.Statistics(key=lambda ind: ind.fitness.values)
    stats.register("avg", np.mean, axis=0)
    stats.register("std", np.std, axis=0)
    stats.register("min", np.min, axis=0)
    stats.register("max", np.max, axis=0)

    hof = deap.tools.HallOfFame(hall_of_fame_len)
    pop = toolbox.population(n=pop_size)  # type: ignore
    print(f"Initial population {[(p, p.fitness.values) for p in pop]}")  # noqa: T201

    pop = algorithms.eaSimple(
        pop, toolbox, cxpb=0.5, mutpb=0.1, ngen=generations, stats=stats, halloffame=hof, verbose=True
    )
    print(f"Hall of Fame: {hof}")  # noqa: T201

    best_sats, best_planes = hof[0]
    best_cost = evaluate_constellation(hof[0])[0]

    print(f"Optimal Constellation: {best_sats} satellites, {best_planes} planes, Cost: {best_cost}")  # noqa: T201

    # Store the global Hall of Fame result, so we can access it later even if profiling
    global global_hof_result  # noqa
    global_hof_result = hof

    return hof


optimize_constellation(pop_size=POPULATION_SIZE, generations=GENERATIONS, hall_of_fame_len=HALL_OF_FAME_LEN)

In [None]:
functions_to_profile = "-f optimize_constellation -f evaluate_constellation -f Coverage.analyze"
function_call = (
    "optimize_constellation(pop_size=POPULATION_SIZE, ",
    "generations=GENERATIONS, hall_of_fame_len=HALL_OF_FAME_LEN)",
)

%lprun -u 1 {functions_to_profile} {function_call}

In [None]:
# Plot the results
import matplotlib.pyplot as plt


def plot_coverage(
    nsats,
    nplanes,
    time: Time = TIME,
    start_time: Time = START_TIME,
    end_time: Time = END_TIME,
    discretization_resolution: int = DISCRETIZATION_RESOLUTION,
    aoi: geojson_pydantic.FeatureCollection = AOI,
):
    constellation = Constellation(
        model=WalkerStar(
            time=time,
            nsats=nsats,
            nplanes=nplanes,
            semi_major_axis=7000,
            inclination=45,
            eccentricity=0.0,
            periapsis_argument=90,
        )
    )

    scenario = Scenario(
        name="Coverage analysis with constellation",
        start_time=start_time,
        end_time=end_time,
        areas_of_interest=aoi,
        discretization_resolution=discretization_resolution,
        time_step=600,
        constellations=[constellation],
    )

    cov = Coverage(scenario=scenario)
    results = cov.analyze()
    print(f"Coverage results for {nsats} satellites and {nplanes} planes: {results.coverage_percent}")  # noqa: T201
    results.plot_mpl(legend=True, cmap="viridis", plot_land=True)
    plt.show()

In [None]:
for constellation in global_hof_result:
    nsats, nplanes = constellation
    print(f"Plotting coverage for {nsats} satellites and {nplanes} planes")  # noqa: T201
    plot_coverage(nsats=nsats, nplanes=nplanes)