In [None]:
#Gymansium for Xplane
#taken from https://www.gymlibrary.dev/content/environment_creation/
#create gymansium to run XPlane.
#start with XPlane running, use with XPPyhton.  Don't see any way to reload situation
#will need to init by ??
#reset is give new command or set position?
#get obs gets state = [x;u]=[heading, alt, airspeed; pitch, roll, throttle]
#step by?
#to reduce crashes limit commands, faster failure
#start with current state (heading/alt/airspeed + control settings pitch, roll, throttle)
#try to get to commanded state and stay there, reward based on how far from commanded state normalized
#probably make commands a small set of discrete (+/-10 deg hdg change and/or +/-500 alt, airspeed constant)
#try to reach from "random" current state
#need way to time steps--maybe allow input every T=6s sim time, give it 1 sim time minute (at 10x or so sim speed)
#make sure situation display settings are turned way down
#score reward every 6s, cumulative

from os import path
from typing import Optional

import numpy as np
import math
import pde
import random
from XPPython3 import xp

import gymnasium as gym
from gymnasium import spaces
from gymnasium.envs.classic_control import utils
from gymnasium.error import DependencyNotInstalled
##At the top of the code
import logging
logger = logging.getLogger('requests_throttler')
logger.addHandler(logging.NullHandler())
logger.propagate = False


#not sure where to put these

#diffusion equation is included in py-pde, so don't need seperate dynamics
#import dynamics
#from dynamics import MGM

class XPlaneEnv(gym.Env):
    #observation space is state and control spaces, normalized
    #space is 7-vector
    #also make variables to support XP python calls
    #also add menu selection slot for XP python
    def __init__(self, render_mode=None, size: int =7):
        #gymansium init:  make spaces
        obs_size=6
        self.observation_space =spaces.Box(low=-1, high=1, shape=(obs_size+2,), dtype=float) #add two for commands
        action_size=3
        self.action_space = spaces.Box(low=-1, high=1, shape=(action_size,), dtype=float) 
        #need to update action to normal distribution

        self.grid=[]
        self.stepper=[]
        #XP init:  make variable list
        self.StateDataRefDescriptions = ["sim/flightmodel/engine/ENGN_thro", "sim/joystick/yolk_pitch_ratio",
                                   "sim/joystick/yoke_roll_ratio","sim/joystick/FC_ptch",
                                   "sim/cockpit2/gauges/indicators/altitude_ft_pilot", "sim/flightmodel/position/true_psi",
                                   "sim/cockpit2/gauges/indicators/airspeed_kts_pilot"]
        self.ActionDataRefDescriptions = ["sim/flightmodel/engine/ENGN_thro", "sim/cockpit2/controls/total_pitch_ratio",
                                   "sim/cockpit2/controls/total_roll_ratio","sim/flightmodel/controls/rudd_def"]


        #XP call
        XPinit(self)
        
        
    def _get_obs(self):

        Pitch, Roll, Throttle, AltNorm, HdgNorm, SpeedNorm=XPobs(self)
        self.state=np.array([Pitch, Roll, Throttle, AltNorm, HdgNorm, SpeedNorm, self.command[0], self.command[1]])
        return self.state

    def reset(self, seed: Optional[int] = None, options=None):
        #reset to altitude 4000, airspeed 120, heading 180
        #only doing relative heading/altitude changes so shouldn't affect things much
        AltRef=xp.findDataRef("sim/cockpit2/gauges/indicators/altitude_ft_pilot")
        SpeedRef=xp.findDataRef("sim/cockpit2/gauges/indicators/airspeed_kts_pilot")
        HdgRef=xp.findDataRef("sim/flightmodel/position/true_psi")
        
        xp.setDataf(AltRef, 4000.)
        xp.setDataf(SpeedRef, 120.)
        xp.setDataf(HdgRef, 180.)

        #commands randmoly chosen from a matrix of possible commands 
        #+/- 45 deg heading, with fixed initial heading this is 225 or 135
        #+/- 500 ft elevation, with fixed initial altitude this is 4500 or 3500
        #combination of the two
        #commands is 2x8 array with heading change, elevation change as each row
        #commands = np.array([[225., 4000.], [135., 4000.], [180., 4500.], [180., 3500.], [225., 4500.], [135., 4500.], [225., 3500.], [135., 3500.]])
        #issue normalized commands
        commands = np.array([[45./180., 0.], [-45./180., 0.], [0., 500./3000], [0., -500./3000], [45./180, 500./3000], 
                             [-45./180, 500./3000], [45./180, -500./3000], [-45./180, -500./3000]])
        #choose a random command to send
        n=random.randint(0,7)
        self.command = commands[n,]

        observation = self._get_obs()
        
        return observation, self.state, self.command
        
    def step(self,action):
        #design of the training will need to be period of time (30s-1min sim time) where aagent tries to acheive command, then reset position and command
        #during each period T agent can continuously control pitch, roll, throttle (leave rudder/yaw out for now and just do uncoordinated)
        #if altitude change by more than 1500 feet or heading change by more than 90 degrees, or speed change by more than 50kts then fail
        #want to sample for reward and updating control input every X sim seconds (start with T/10)
        #what/how does step work then, is it every 6 seconds or an entire period?  Each step should be a period or a full contorl/reward interval
        #action to be a numpy 3-vector pitch commmand, roll command, throttle command
          
        #self.state.data[0,:]=action[0]
        state=self.state   
        XPaction(action)
        observation=self._get_obs()
        reward=np.sqrt(10.*(self.state.data[3]-self.command[1])**2+10.*(self.state.data[4]-self.command[0])**2+10.*(self.state.data[5]-0.)**2)
        done=False
        truncated=False
        if (self.state.data[3]>1 or self.state.data[3]<-1 or self.state.data[4]>1 or self.state.data[4]<-1 or 
            self.state.data[5]>1 or self.state.data[5]<-1 ):
            truncated= True
            reward=-5000  #if beyond these bounds then consider as departed controlled flight
        state=self.state
 
        return self.state, reward, done, truncated, {}
    
    
    def close(self):
        XPluginStop

In [None]:
#XP functions to be in python plugin folder
from XPPython3 import xp
class PythonInterface:
    def XPinit(gym):
        gym.StateDataRef = []
        for Item in range(gym.obs_size):
            gym.InputDataRef.append(xp.findDataRef(gym.StateDataRefDescriptions[Item]))
    
        gym.ActionDataRef = []
        for Item in range(gym.action_size):
            gym.ActionDataRef.append(xp.findDataRef(gym.ActionDataRefDescriptions[Item]))
      
        #XP init:  make item list
        Item = xp.appendMenuItem(xp.findPluginsMenu(), "Python - Position 1", 0)
        gym.PositionMenuHandlerCB = self.PositionMenuHandler
        gym.Id = xp.createMenu("Position1", xp.findPluginsMenu(), Item, self.PositionMenuHandlerCB, 0)
        xp.appendMenuItem(self.Id, "Position1", 1)
    
        # Flag to tell us if the widget is being displayed.
        gym.MenuItem1 = 0
        return gym
    
    def XPobs(self):
        Pitch=[]
        Roll = []
        #Yaw = []
        Throttle =[]
        Alt = []
        Hdg = []
        Speed = []
        count = xp.getDatavf(self.InputDataRef[0], Throttle, 0, 1)
    
        # #Get pitch, roll, yaw
    
        Pitch = xp.getDataf(self.InputDataRef[1])#, self.Pitch, 0, 1) #normalized
        Roll = xp.getDataf(self.InputDataRef[2])#, self.Roll, 0, 1)
        #Yaw = xp.getDataf(self.InputDataRef[3])#, self.Yaw, 0, 1)
    
        # #Get altitude, heading, airspeed
    
        Alt = xp.getDataf(self.InputDataRef[4])#, self.Alt, 0, 1)
        Hdg = xp.getDataf(self.InputDataRef[5])#, self.Hdg, 0, 1)
        Speed = xp.getDataf(self.InputDataRef[6])#, self.Speed, 0, 1)
        #convert to degrees and normalize to +/- 1 with guessed limits 
        SpeedNorm= (Speed-120.)/(170-70) #if speed outside these limits, then fail
        AltNorm= (Alt-4000)/(5500-2500) #if alt outside limits, fail
        HdgNorm = (Hdg-180)/(270-90) # hdg limis 90-270
        #all other already normalized
        return(Pitch, Roll, Throttle, SpeedNorm, AltNorm, HdgNorm)
    
    def XPreset():
        xp.registerFlightLoopCallback(self.InputOutputLoopCB, 3.0, 0)
        AltRef=xp.findDataRef("sim/cockpit2/gauges/indicators/altitude_ft_pilot")
        SpeedRef=xp.findDataRef("sim/cockpit2/gauges/indicators/airspeed_kts_pilot")
        HdgRef=xp.findDataRef("sim/flightmodel/position/true_psi")
        
        xp.setDataf(AltRef, 4000.)
        xp.setDataf(SpeedRef, 120.)
        xp.setDataf(HdgRef, 180.)
    
            #send command
        ThrottleCmd=xp.findDataRef("sim/flightmodel/engine/ENGN_thro")
        PitchCmd=xp.findDataRef("sim/cockpit2/controls/total_pitch_ratio")
        RollCmd=xp.findDataRef( "sim/cockpit2/controls/total_roll_ratio")
        
        xp.setDataf(ThrottleCmd, action[0])
        xp.setDataf(PitchCmd, action[1])
        xp.setDataf(RollCmd, action[2])
       

    def XPaction(action):
        #send command
        ThrottleCmd=xp.findDataRef("sim/flightmodel/engine/ENGN_thro")
        PitchCmd=xp.findDataRef("sim/cockpit2/controls/total_pitch_ratio")
        RollCmd=xp.findDataRef( "sim/cockpit2/controls/total_roll_ratio")
        
        xp.setDataf(ThrottleCmd, action[0])
        xp.setDataf(PitchCmd, action[1])
        xp.setDataf(RollCmd, action[2])

        
        #wait X seconds here, then get observation and reward
        xp.registerFlightLoopCallback(MyCallback, 3.0, 0)
        xp.unregisterFlightLoopCallback(MyCallback, 0)

    def MyCallback(lastCall, elapsedTime, counter, refCon):
    
       #xp.log(f"{elapsedTime}, {counter}")
    
       return 1.0
    
    def PositionMenuHandler(self, inMenuRef, inItemRef):
        # If menu selected create our widget dialog
        if inItemRef == 1:
            if self.MenuItem1 == 0:
                self.CreatePosition(300, 600, 300, 550)
                self.MenuItem1 = 1
            else:
                if not xp.isWidgetVisible(self.PositionWidget):
                    xp.showWidget(self.PositionWidget)
    def CreatePosition(self, x, y, w, h):
        FloatValue = []
        for Item in range(self.MAX_ITEMS):
            FloatValue.append(xp.getDataf(self.PositionDataRef[Item]))
    
        # X, Y, Z, Lat, Lon, Alt
        DoubleValue = [0.0, 0.0, 0.0]
        DoubleValue[0], DoubleValue[1], DoubleValue[2] = xp.localToWorld(FloatValue[0], FloatValue[1], FloatValue[2])
        DoubleValue[2] *= 3.28
    
        x2 = x + w
        y2 = y - h
        PositionText = []
    
        # Create the Main Widget window
        self.PositionWidget = xp.createWidget(x, y, x2, y2, 1, "Python - Position Example 1 by Sandy Barbour", 1,
                                              0, xp.WidgetClass_MainWindow)
    
        # Add Close Box decorations to the Main Widget
        xp.setWidgetProperty(self.PositionWidget, xp.Property_MainWindowHasCloseBoxes, 1)

    def XPluginStop(self):
        if self.MenuItem1 == 1:
            xp.destroyWidget(self.PositionWidget, 1)
            self.MenuItem1 = 0

        xp.destroyMenu(self.Id)

In [None]:
#gym test
env = gym.make('XPlaneROM-v0')
obs, state, command = env.reset(seed=random.randint(0,10000))
print(command, type(command))
action=np.array([0., 0.,])
trunc = False
while not trunc:
    s_next, r, done, trunc, info = env.step(action)
    action=action+np.array([0.05,0.05])
    print(s_next)
print(s_next, r, action, s_next.data)

In [26]:
#PPO in pytorch as https://www.datacamp.com/tutorial/proximal-policy-optimization
#this is for cart pole which has discrete action and 1d action space
#may need to try https://github.com/XinJingHao/PPO-Continuous-Pytorch/blob/main/main.py or other for continuous
#or something else for multiple action space dimension
# will also need to make sure variable formats match gymnaisum

import torch
from torch import nn

class BackboneNetwork(nn.Module):
    def __init__(self, in_features, hidden_dimensions, out_features, dropout):
        super().__init__()
        self.layer1 = nn.Linear(in_features, hidden_dimensions)
        self.layer2 = nn.Linear(hidden_dimensions, hidden_dimensions)
        self.layer3 = nn.Linear(hidden_dimensions, out_features)
        self.dropout = nn.Dropout(dropout)
    def forward(self, x):
        x = self.layer1(x)
        x = f.relu(x)
        x = self.dropout(x)
        x = self.layer2(x)
        x = f.relu(x)
        x = self.dropout(x)
        x = self.layer3(x)
        return x

class ActorCritic(nn.Module):
    def __init__(self, actor, critic):
        super().__init__()
        self.actor = actor
        self.critic = critic
    def forward(self, state):
        action_pred = self.actor(state)
        value_pred = self.critic(state)
        return action_pred, value_pred

def create_agent(hidden_dimensions, dropout):
    INPUT_FEATURES = env_train.observation_space.shape[0]
    HIDDEN_DIMENSIONS = hidden_dimensions
    ACTOR_OUTPUT_FEATURES = env_train.action_space.n
    CRITIC_OUTPUT_FEATURES = 1
    DROPOUT = dropout
    actor = BackboneNetwork(
            INPUT_FEATURES, HIDDEN_DIMENSIONS, ACTOR_OUTPUT_FEATURES, DROPOUT)
    critic = BackboneNetwork(
            INPUT_FEATURES, HIDDEN_DIMENSIONS, CRITIC_OUTPUT_FEATURES, DROPOUT)
    agent = ActorCritic(actor, critic)
    return agent

def calculate_returns(rewards, discount_factor):
    returns = []
    cumulative_reward = 0
    for r in reversed(rewards):
        cumulative_reward = r + cumulative_reward * discount_factor
        returns.insert(0, cumulative_reward)
    returns = torch.tensor(returns)
    # normalize the return
    returns = (returns - returns.mean()) / returns.std()
    return returns

def calculate_advantages(returns, values):
    advantages = returns - values
    # Normalize the advantage
    advantages = (advantages - advantages.mean()) / advantages.std()
    return advantages

def calculate_surrogate_loss(
        actions_log_probability_old,
        actions_log_probability_new,
        epsilon,
        advantages):
    advantages = advantages.detach()
    policy_ratio = (
            actions_log_probability_new - actions_log_probability_old
            ).exp()
    surrogate_loss_1 = policy_ratio * advantages
    surrogate_loss_2 = torch.clamp(
            policy_ratio, min=1.0-epsilon, max=1.0+epsilon
            ) * advantages
    surrogate_loss = torch.min(surrogate_loss_1, surrogate_loss_2)
    return surrogate_loss

def calculate_losses(
        surrogate_loss, entropy, entropy_coefficient, returns, value_pred):
    entropy_bonus = entropy_coefficient * entropy
    policy_loss = -(surrogate_loss + entropy_bonus).sum()
    value_loss = f.smooth_l1_loss(returns, value_pred).sum()
    return policy_loss, value_loss

def init_training():
    states = []
    actions = []
    actions_log_probability = []
    values = []
    rewards = []
    done = False
    episode_reward = 0
    return states, actions, actions_log_probability, values, rewards, done, episode_reward

def forward_pass(env, agent, optimizer, discount_factor):
    states, actions, actions_log_probability, values, rewards, done, episode_reward = init_training()
    state = env.reset()
    agent.train()
    while not done:
        state = torch.FloatTensor(state).unsqueeze(0)
        states.append(state)
        action_pred, value_pred = agent(state)
        action_prob = f.softmax(action_pred, dim=-1)
        dist = distributions.Categorical(action_prob)
        action = dist.sample()
        log_prob_action = dist.log_prob(action)
        state, reward, done, _ = env.step(action.item())
        actions.append(action)
        actions_log_probability.append(log_prob_action)
        values.append(value_pred)
        rewards.append(reward)
        episode_reward += reward
    states = torch.cat(states)
    actions = torch.cat(actions)
    actions_log_probability = torch.cat(actions_log_probability)
    values = torch.cat(values).squeeze(-1)
    returns = calculate_returns(rewards, discount_factor)
    advantages = calculate_advantages(returns, values)
    return episode_reward, states, actions, actions_log_probability, advantages, returns

def update_policy(
        agent,
        states,
        actions,
        actions_log_probability_old,
        advantages,
        returns,
        optimizer,
        ppo_steps,
        epsilon,
        entropy_coefficient):
    BATCH_SIZE = 128
    total_policy_loss = 0
    total_value_loss = 0
    actions_log_probability_old = actions_log_probability_old.detach()
    actions = actions.detach()
    training_results_dataset = TensorDataset(
            states,
            actions,
            actions_log_probability_old,
            advantages,
            returns)
    batch_dataset = DataLoader(
            training_results_dataset,
            batch_size=BATCH_SIZE,
            shuffle=False)
    for _ in range(ppo_steps):
        for batch_idx, (states, actions, actions_log_probability_old, advantages, returns) in enumerate(batch_dataset):
            # get new log prob of actions for all input states
            action_pred, value_pred = agent(states)
            value_pred = value_pred.squeeze(-1)
            action_prob = f.softmax(action_pred, dim=-1)
            probability_distribution_new = distributions.Categorical(
                    action_prob)
            entropy = probability_distribution_new.entropy()
            # estimate new log probabilities using old actions
            actions_log_probability_new = probability_distribution_new.log_prob(actions)
            surrogate_loss = calculate_surrogate_loss(
                    actions_log_probability_old,
                    actions_log_probability_new,
                    epsilon,
                    advantages)
            policy_loss, value_loss = calculate_losses(
                    surrogate_loss,
                    entropy,
                    entropy_coefficient,
                    returns,
                    value_pred)
            optimizer.zero_grad()
            policy_loss.backward()
            value_loss.backward()
            optimizer.step()
            total_policy_loss += policy_loss.item()
            total_value_loss += value_loss.item()
    return total_policy_loss / ppo_steps, total_value_loss / ppo_steps

def evaluate(env, agent):
    agent.eval()
    rewards = []
    done = False
    episode_reward = 0
    state = env.reset()
    while not done:
        state = torch.FloatTensor(state).unsqueeze(0)
        with torch.no_grad():
            action_pred, _ = agent(state)
            action_prob = f.softmax(action_pred, dim=-1)
        action = torch.argmax(action_prob, dim=-1)
        state, reward, done, _ = env.step(action.item())
        episode_reward += reward
    return episode_reward

def plot_train_rewards(train_rewards, reward_threshold):
    plt.figure(figsize=(12, 8))
    plt.plot(train_rewards, label='Training Reward')
    plt.xlabel('Episode', fontsize=20)
    plt.ylabel('Training Reward', fontsize=20)
    plt.hlines(reward_threshold, 0, len(train_rewards), color='y')
    plt.legend(loc='lower right')
    plt.grid()
    plt.show()

def plot_test_rewards(test_rewards, reward_threshold):
    plt.figure(figsize=(12, 8))
    plt.plot(test_rewards, label='Testing Reward')
    plt.xlabel('Episode', fontsize=20)
    plt.ylabel('Testing Reward', fontsize=20)
    plt.hlines(reward_threshold, 0, len(test_rewards), color='y')
    plt.legend(loc='lower right')
    plt.grid()
    plt.show()

def plot_losses(policy_losses, value_losses):
    plt.figure(figsize=(12, 8))
    plt.plot(value_losses, label='Value Losses')
    plt.plot(policy_losses, label='Policy Losses')
    plt.xlabel('Episode', fontsize=20)
    plt.ylabel('Loss', fontsize=20)
    plt.legend(loc='lower right')
    plt.grid()
    plt.show()

def run_ppo():
    MAX_EPISODES = 500
    DISCOUNT_FACTOR = 0.99
    REWARD_THRESHOLD = 475
    PRINT_INTERVAL = 10
    PPO_STEPS = 8
    N_TRIALS = 100
    EPSILON = 0.2
    ENTROPY_COEFFICIENT = 0.01
    HIDDEN_DIMENSIONS = 64
    DROPOUT = 0.2
    LEARNING_RATE = 0.001
    train_rewards = []
    test_rewards = []
    policy_losses = []
    value_losses = []
    agent = create_agent(HIDDEN_DIMENSIONS, DROPOUT)
    optimizer = optim.Adam(agent.parameters(), lr=LEARNING_RATE)
    for episode in range(1, MAX_EPISODES+1):
        train_reward, states, actions, actions_log_probability, advantages, returns = forward_pass(
                env_train,
                agent,
                optimizer,
                DISCOUNT_FACTOR)
        policy_loss, value_loss = update_policy(
                agent,
                states,
                actions,
                actions_log_probability,
                advantages,
                returns,
                optimizer,
                PPO_STEPS,
                EPSILON,
                ENTROPY_COEFFICIENT)
        test_reward = evaluate(env_test, agent)
        policy_losses.append(policy_loss)
        value_losses.append(value_loss)
        train_rewards.append(train_reward)
        test_rewards.append(test_reward)
        mean_train_rewards = np.mean(train_rewards[-N_TRIALS:])
        mean_test_rewards = np.mean(test_rewards[-N_TRIALS:])
        mean_abs_policy_loss = np.mean(np.abs(policy_losses[-N_TRIALS:]))
        mean_abs_value_loss = np.mean(np.abs(value_losses[-N_TRIALS:]))
        if episode % PRINT_INTERVAL == 0:
            print(f'Episode: {episode:3} | \
                  Mean Train Rewards: {mean_train_rewards:3.1f} \
                  | Mean Test Rewards: {mean_test_rewards:3.1f} \
                  | Mean Abs Policy Loss: {mean_abs_policy_loss:2.2f} \
                  | Mean Abs Value Loss: {mean_abs_value_loss:2.2f}')
        if mean_test_rewards >= REWARD_THRESHOLD:
            print(f'Reached reward threshold in {episode} episodes')
            break
    plot_train_rewards(train_rewards, REWARD_THRESHOLD)
    plot_test_rewards(test_rewards, REWARD_THRESHOLD)
    plot_losses(policy_losses, value_losses)



In [None]:
run_ppo()