# Wildfire Cosimulation Game
*This notebook utilizes newer Python features like the walrus operator (`:=`) from 3.8 and `match`-`case` statements from 3.10. If you are not using an up-to-date version of Python, you will need to make changes to parts of this notebook to account for the absence of newly-supported syntax.*

## Necessary Imports & Definitions
The following package imports, variables, functions, and classes will be necessary to execute various parts of this notebook.

### Sedaro Variables
To use this notebook, you will need to update some of the string values below. Specifically, input your API key as the `API_KEY` variable and the target Wildfire scenario branch ID as the `SCENARIO_BRANCH_ID` variable. The `WILDFIRE_ID` string should not require changing, but if you run into problems you may need to check this value to verify that it matches the agent ID of Wildfire in the targeted scenario.

In [363]:
API_KEY = "API KEY HERE" # API key, change to match your API key
SCENARIO_BRANCH_ID = "BRANCH ID HERE" # Scenario branch ID, change to match your scenario branch
WILDFIRE_ID = "NT06aqHUT5djI1_JPAsck" # Wildfire ID, should not need changing
API_HOST = "https://api.sedaro.com"
WEB_HOST = "https://satellite.sedaro.com"

from sedaro import SedaroApiClient
sedaro = SedaroApiClient(host=API_HOST, api_key=API_KEY)
scenario = sedaro.scenario(SCENARIO_BRANCH_ID)
simulation = scenario.simulation

### Simple Conversion Functions
Sedaro outputs the simulation time in MJD dates and angles in radians. These functions will help to easily convert MJD dates to `datetime` objects and radians to degrees.

In [364]:
from datetime import datetime, timedelta
def mjd2dt(mjd: float) -> datetime:
    """Convert Modified Julian Date (MJD) float to datetime object."""
    return datetime(1858, 11, 17) + timedelta(days=mjd)

from math import pi
def rad2deg(rad: float) -> float:
    """Convert radians to degrees."""
    return rad * 180 / pi

### Rotation & Quaternion Functions
Below are functions that will allow us to calculate the rotation matrix of a quaternion and convert between the spacecraft body frame and the Local-Vertical/Local-Horizontal (LVLH) frame.

In [365]:
import numpy as np
def quaternion2RotMat(quaternion: np.ndarray) -> np.ndarray:
        if len(quaternion) != 4:
            raise ValueError("Bad shape")

        rotation = np.zeros((3, 3))

        rotation[0, 0] = 1 - 2 * (quaternion[1] * quaternion[1] + quaternion[2] * quaternion[2])
        rotation[1, 0] = 2 * (quaternion[0] * quaternion[1] + quaternion[3] * quaternion[2])
        rotation[2, 0] = 2 * (quaternion[0] * quaternion[2] - quaternion[3] * quaternion[1])

        rotation[0, 1] = 2 * (quaternion[0] * quaternion[1] - quaternion[3] * quaternion[2])
        rotation[1, 1] = 1 - 2 * (quaternion[0] * quaternion[0] + quaternion[2] * quaternion[2])
        rotation[2, 1] = 2 * (quaternion[1] * quaternion[2] + quaternion[3] * quaternion[0])

        rotation[0, 2] = 2 * (quaternion[0] * quaternion[2] + quaternion[3] * quaternion[1])
        rotation[1, 2] = 2 * (quaternion[1] * quaternion[2] - quaternion[3] * quaternion[0])
        rotation[2, 2] = 1 - 2 * (quaternion[0] * quaternion[0] + quaternion[1] * quaternion[1])

        return rotation

def body2Lvlh(attitude, lvlh_to_eci):
    body_to_eci = quaternion2RotMat(attitude) # body to ECI rotation matrix
    eci_to_lvlh = lvlh_to_eci.T # transpose for ECI to LVLH rotation matrix
    body_to_lvlh = eci_to_lvlh @ body_to_eci # body to LVLH rotation matrix
    return body_to_lvlh

### Keyboard Input
This notebook utilizes the `pynput` package to interpret keypresses and trigger actions depending on which keys are pressed. These functions define the behavior of the `on_press` and `on_release` callbacks for the keyboard listener. Finally, the `keyboard.Listener` object is initialized and is ready to be started later.

In [366]:
from pynput import keyboard
def on_press(key):
    global RWs, thruster
    if isinstance(key, keyboard.KeyCode):
        match key.char:
            case 'w': # pitch down
                RWs['Y'].forward()
            case 's': # pitch up
                RWs['Y'].reverse()
            case 'a': # yaw left
                RWs['Z'].reverse()
            case 'd': # yaw right
                RWs['Z'].forward()
            case 'q': # roll left
                RWs['X'].reverse()
            case 'e': # roll right
                RWs['X'].forward()
            case n if int(n) in range(1, 10): # set op mode
                sat.setOpMode(int(n))
    elif isinstance(key, keyboard.Key):
        if key == keyboard.Key.space: # fire thruster
            thruster.burn()
        elif key == keyboard.Key.esc:
            print('Stopping the keyboard listener.')
            return False # stop listener

def on_release(key):
    global RWs, thruster
    if isinstance(key, keyboard.KeyCode):
        match key.char:
            case 'w' | 's':
                RWs['Y'].stop()
            case 'a' | 'd':
                RWs['Z'].stop()
            case 'q' | 'e':
                RWs['X'].stop()
    elif isinstance(key, keyboard.Key):
        if key == keyboard.Key.space:
            thruster.stop()

listener = keyboard.Listener(on_press=on_press, on_release=on_release)

### Game Classes
These classes will help logically organize, calculate, and recall information during the game loop. They are separate from classes within the `sedaro` Python package; they are purely for game logic.

In [367]:
from time import sleep
class Game:
    def __init__(self, min_loop_time = 0.2):
        self.minLoopTime = min_loop_time # minimum game loop time [seconds]
        self.prevTime = None
        self.time = None
        self.loopStart = None
        self.loopEnd = None
        self.loopTime = None
    
    def startLoop(self):
        self.loopStart = datetime.now()
    
    def endLoop(self):
        self.loopEnd = datetime.now()
        self.loopTime = (self.loopEnd - self.loopStart).total_seconds()
    
    def wait(self):
        if self.loopTime < self.minLoopTime:
            sleep(self.minLoopTime - self.loopTime) # sleep for the remainder of the minimum game loop time
    
    def update(self):
        if self.prevTime < self.time:
            self.prevTime = self.time
            return True
        else:
            return False

class Satellite:
    def __init__(self, opModes):
        self.opModes = opModes
        self.activeOpMode = opModes[0]
        self.stateOfCharge = None
        self.powerLoading = 0
        self.solarUtilization = None
        self.apogee = None
        self.perigee = None
        self.attitude = None
        self.lvlh2Eci = None
        self.yaw = None
        self.pitch = None
    
    def calcYawPitch(self, thruster):
        """Calculate the yaw and pitch angles of the thrust vector in the ram frame"""
        vector = -thruster.exhaustVector # thrust vector in body frame
        vector = body2Lvlh(self.attitude, self.lvlh2Eci) @ vector # vector in LVLH frame
        self.yaw = -np.arctan2(vector[2], vector[1]) # yaw angle from ram in LVLH frame
        self.pitch = np.arcsin(vector[0]) # pitch angle from ram in LVLH frame
    
    def setOpMode(self, priority: int):
        """Set the active op mode based on priority"""
        priority = min(priority, len(self.opModes)) # limit priority input to number of op modes
        self.activeOpMode = self.opModes[priority - 1] # set active op mode

class RW:
    def __init__(self, ratedTorque: float, ratedMomentum: float):
        self.ratedTorque = ratedTorque
        self.torque = 0
        self.ratedMomentum = ratedMomentum
        self.momentum = 0
    
    def forward(self):
        self.torque = self.ratedTorque
    
    def reverse(self):
        self.torque = -self.ratedTorque
    
    def stop(self):
        self.torque = 0

class Thruster:
    def __init__(self, maxThrust: float, exhaustVector, capacity: float):
        self.maxThrust = maxThrust
        self.thrust = 0
        self.exhaustVector = exhaustVector
        self.capacity = capacity
        self.fuel = capacity
    
    def burn(self):
        self.thrust = self.maxThrust
    
    def stop(self):
        self.thrust = 0

### User Interface
These functions define the format and logic for displaying information during the game loop. `bar` generates a colored capacity bar that will help to visualize various variables. `display` formats information from the game classes and displays a user interface in the [game loop](#game-loop) output cell.

In [368]:
def bar(num: float, minimum: float, maximum: float, start: float = 0, width: int = 101):
    bar_color = '\033[34m' # blue
    slide_color = '\033[36m' # cyan
    rail_color = '\033[0m' # reset

    num = max(minimum, min(num, maximum)) # keep num within bounds
    w = width - 2 # width adjusted for brackets
    str = [slide_color + '-'] * w # create list of slide characters
    i_start, i_num = [min(int((n - minimum) / (maximum - minimum) * w), w - 1) for n in [start, num]] # get indices of start and num
    for i in range(i_start, i_num + 1, 1) if i_num > i_start else range(i_start, i_num - 1, -1): # loop between start and num indices
        str[i] = bar_color + '\u2588' # insert bar
    str[i_start] = rail_color + '|' # insert start rail
    str.insert(0, rail_color + '[') # add left bracket
    str.append(rail_color + ']') # add right bracket
    return ''.join(str) # return joined list as string

from IPython.display import clear_output
def display(update: bool = False):
    update_color = '\033[92m' # green
    bold = '\033[1m' # bold
    reset = '\033[0m'
    info_string = "\n".join([
        f"{bold}Loop Time{reset}:           {game.loopTime:.4f} seconds",
        "",
        f"{bold}Simulation Time{reset}:     {update_color if update else ''}{mjd2dt(game.time)}{reset}",
        "",
        f"{bold}Operational Mode{reset}:    [{sat.activeOpMode.priority + 1}] {sat.activeOpMode.name}",
        "",
        f"{bold}---- Orbit ----{reset}",
        f"  Apogee:            {sat.apogee:9.4f} km",
        f"  Perigee:           {sat.perigee:9.4f} km",
        "",
        f"{bold}---- Attitude ----{reset}",
        f"  Yaw from Ram:      {bar(rad2deg(sat.yaw), -180, 180)} {rad2deg(sat.yaw):7.2f}\u00B0",
        f"  Pitch from Ram:    {bar(rad2deg(sat.pitch), -90, 90)} {rad2deg(sat.pitch):7.2f}\u00B0",
        "",
        f"{bold}---- Actuators ----{reset}",
        f"                          {bold}Actuation{reset}      |                                                {bold}Capacity{reset}",
        f"  RW X:              {bar(RWs['X'].torque, -RWs['X'].ratedTorque, RWs['X'].ratedTorque, 0, 9)} {RWs['X'].torque:5.2f} N\u22C5m | {bar(RWs['X'].momentum, -RWs['X'].ratedMomentum, RWs['X'].ratedMomentum)} {RWs['X'].momentum:5.2f} N\u22C5s",
        f"  RW Y:              {bar(RWs['Y'].torque, -RWs['Y'].ratedTorque, RWs['Y'].ratedTorque, 0, 9)} {RWs['Y'].torque:5.2f} N\u22C5m | {bar(RWs['Y'].momentum, -RWs['Y'].ratedMomentum, RWs['Y'].ratedMomentum)} {RWs['Y'].momentum:5.2f} N\u22C5s",
        f"  RW Z:              {bar(RWs['Z'].torque, -RWs['Z'].ratedTorque, RWs['Z'].ratedTorque, 0, 9)} {RWs['Z'].torque:5.2f} N\u22C5m | {bar(RWs['Z'].momentum, -RWs['Z'].ratedMomentum, RWs['Z'].ratedMomentum)} {RWs['Z'].momentum:5.2f} N\u22C5s",
        f"  Thruster:          {bar(thruster.thrust, 0, thruster.maxThrust, 0, 9)} {thruster.thrust:5.1f} N   | {bar(thruster.fuel, 0, thruster.capacity)} {thruster.fuel:5.2f} kg",
        "",
        f"{bold}---- Power ----{reset}",
        f"  Power Loading:     {sat.powerLoading:5.2f} W",
        f"  Battery Charge:    {bar(sat.stateOfCharge, 0, 1)} {sat.stateOfCharge:7.2%}",
        f"  Solar Utilization: {bar(sat.solarUtilization, 0, 1)} {sat.solarUtilization:7.2%}",
    ])
    clear_output() # clear output of Jupyter Notebook cell output
    print(info_string, flush=True)

## Scenario Setup

### External State Block

In [369]:
# Delete existing ExternalState blocks
if len(perRoundIDs := scenario.PerRoundExternalState.get_all_ids()) > 0:
    scenario.crud(delete=perRoundIDs)
if len(spontaneousIDs := scenario.SpontaneousExternalState.get_all_ids()) > 0:
    scenario.crud(delete=spontaneousIDs)

In [370]:
gnc_state_block = scenario.SpontaneousExternalState.create(**{
            'consumed':[
                "time",
                {"root": "attitude"},
                {"root": "lvlhAxes"},
                {"ReactionWheel": "momentum"},
                {"FuelTank": "wetMass"},
                {"Orbit": "radiusApogee"},
                {"Orbit": "radiusPerigee"}],
            'produced': [
                {"ReactionWheel": "commandedTorqueMagnitude"},
                {"Thruster": "thrust"},
                "timeStep"],
            'engineIndex': 0,
            'agents': [WILDFIRE_ID],
        }
    )
GNC_STATE_ID = gnc_state_block.id
cdh_state_block = scenario.SpontaneousExternalState.create(**{
            'consumed': [{"prev.root": "activeCommInterfaces"}],
            'produced': [{"root": "activeOpMode"}],
            'engineIndex': 1,
            'agents': [WILDFIRE_ID],
        }
    )
CDH_STATE_ID = cdh_state_block.id
power_state_block = scenario.SpontaneousExternalState.create(**{
            'consumed':[
                {"Battery": "soc"},
                {"PowerProcessor": "outputPowers"},
                {"SolarArray": "utilization"}],
            'engineIndex': 2,
            'agents': [WILDFIRE_ID],
        }
    )
POWER_STATE_ID = power_state_block.id

### Clock Configuration
By default, the scenario clock is configured to not limit the speed of the simulation. To make our game more interactive, we set the `ClockConfig` to run in real-time. This only needs to be done once as every subsequent simulation will now run in real-time until this setting is changed back to `realTime=False`.

In [371]:
clock_config = scenario.ClockConfig.get_first() # get clock config block
_ = clock_config.update(realTime=True, syncTime=False) # set real-time mode with syncTime off

### Game Class Initialization


#### Actuator Initialization
If you have changed the names (or orientations) of Wildfire's reaction wheels, you will need to update the `RW_names` dictionary in the block below as it uses these values to determine the maximum torque available to each reaction wheel.

In [372]:
agent_block = scenario.block(WILDFIRE_ID)
AGENT_TEMPLATE_ID = agent_block.templateRef # get referenced agent template branch ID
agent_template = sedaro.agent_template(AGENT_TEMPLATE_ID) # get agent template

## Reaction Wheels
RW_names = {
    'X': "RW-X",
    'Y': "RW-Y",
    'Z': "RW-Z"
} # reaction wheel names for their corresponding axes
RWs = {}
reaction_wheel_IDs = agent_template.data['index']['ReactionWheel'] # list of reaction wheel IDs in agent template
for rw_ID in reaction_wheel_IDs: # iterate through reaction wheels to get rated torques
    rw_block = agent_template.block(rw_ID) # reaction wheel block
    for axis, name in RW_names.items():
        if rw_block.name == name:
            RWs[axis] = RW(rw_block.ratedTorque, rw_block.ratedMomentum)
            break

## Thruster
thruster_ID = agent_template.data['index']['Thruster'][0] # thruster ID in agent template
thruster_block = agent_template.block(thruster_ID) # thruster block
thruster = Thruster(thruster_block.maxThrust,
                    np.array(thruster_block.orientation.unitVector), # thruster exhaust vector
                    sum(tank.capacity for tank in thruster_block.fuelReservoir.fuelTanks), # capacity of thruster's fuel tanks
                    ) # initialize thruster

#### Operational Mode Sorting

In [373]:
op_mode_blocks = agent_template.OperationalMode.get_all()
sorted_op_modes = sorted(op_mode_blocks, key=lambda x: x.priority)

#### Game & Satellite Initialization

In [374]:
game = Game()
game.prevTime = clock_config.startTime # initialize previous game loop time to start time
sat = Satellite(sorted_op_modes)

## Running the Game

### Starting Simulation & Listener
These blocks will start the simulation and the keyboard listener. Starting the simulation can take longer for more complicated scenarios, so you may need to give it some time (~30 seconds). After the simulation is started, the code block below will print a link that you can use to view the scenario results in a web browser.

In [None]:
simulation_handle = simulation.start() # start the simulation (this will take a few seconds)
print(f"View the scenario results at {WEB_HOST}/#/agent-analyze/{SCENARIO_BRANCH_ID}/custom/playback?agentId={WILDFIRE_ID}") # print link to scenario results

In [None]:
listener.start() # start non-blocking keyboard listener in another thread

### Game Loop

In [None]:
while listener.running and simulation.status()['status'] == 'RUNNING':
    game.startLoop() # start game loop timer
    sim_updated = False

    # Consume & produce simulation state
    gnc_state = simulation_handle.consume(agent_id = WILDFIRE_ID, external_state_id = GNC_STATE_ID) # query the simulation for the current state
    simulation_handle.produce(agent_id = WILDFIRE_ID,
                              external_state_id = GNC_STATE_ID,
                              values = (
                                  [RWs['X'].torque, RWs['Y'].torque, RWs['Z'].torque],
                                  [thruster.thrust],
                                  2 / 86400)) # Send state (reaction wheel torque and thruster thrust) to the simulation
    simulation_handle.consume(agent_id = WILDFIRE_ID, external_state_id = CDH_STATE_ID) # query the simulation for the current state
    simulation_handle.produce(agent_id = WILDFIRE_ID,
                              external_state_id = CDH_STATE_ID,
                              values = (sat.activeOpMode.id,))
    power_state = simulation_handle.consume(agent_id = WILDFIRE_ID, external_state_id = POWER_STATE_ID) # query the simulation for the current state
    

    # Parse simulation output
    (
        time, # simulation time [MJD]
        attitude,
        lvlhAxes,
        momentum, # reaction wheel momentum list
        wetMass, # tank fuel list
        apogee, # apogee list [km]
        perigee # perigee list [km]
    ) = gnc_state # unpack the GNC state tuple
    (
        stateOfCharge, # battery state of charge list
        powerLoading, # power processor output power list
        solarUtilization, # solar array utilization list
    ) = power_state # unpack the power state tuple

    # Update fields
    game.time = time # update game time
    sat.attitude = attitude # update attitude quaternion
    sat.lvlh2Eci = lvlhAxes # update LVLH axes
    sat.calcYawPitch(thruster) # calculate yaw and pitch angles
    for rw, m in zip(RWs.values(), momentum):
        rw.momentum = m # update reaction wheel momentum
    thruster.fuel = sum(wetMass) # update thruster fuel
    sat.apogee = apogee[0] # apogee [km]
    sat.perigee = perigee[0] # perigee [km]
    sat.stateOfCharge = stateOfCharge[0]
    sat.powerLoading = powerLoading[0]['total']
    totalUtilization = sum([(util if util == util else 0) for util in solarUtilization]) # sum utilization values, ignoring NaNs
    sat.solarUtilization = totalUtilization / len(solarUtilization)

    sim_updated = game.update() # detect if the simulation time has changed
    game.endLoop() # end game loop timer
    display(sim_updated) # update display output
    game.wait() # wait if necessary

### Stopping Simulation & Listener

In [378]:
listener.stop() # stop keyboard listener if it hasn't been stopped already

In [379]:
try:
    simulation_handle.terminate() # stop simulation if it hasn't stopped already
except:
    pass