In [None]:
#pip install tensorflow-cpu
#pip install gymnasium
#pip install stable-baselines3
#pip install tensorflow-probability

#for project
import gymnasium as gym
import matplotlib.pyplot as plt
import numpy as np
from gymnasium import Wrapper
import time

#stable-baselines3 (what isaac used, for consistency)
from stable_baselines3.common.monitor import Monitor
#from stable_baselines3.common.evaluation import evaluate_policy

#for gradient policy
import tensorflow as tf
import tensorflow_probability as tfp
import tensorflow.keras as keras
from tensorflow.keras.layers import Dense
from tensorflow.keras.optimizers import Adam

In [None]:
#RUN THIS ONLY IF YOU WANT TO USE COMBINED REWARDS
#the only change we made is to +100 for leg touching the ground (instead of the default +10)
class CustomRewardWrapper(Wrapper):
    """Gymnasium wrapper that adds a bonus to the reward while the lander's legs touch down.

    The wrapped environment's own reward is kept as-is; `leg_contact_bonus` is added
    on top of it, once per leg, on every step whose observation has that leg's
    contact flag set.

    NOTE(review): the bonus is applied on *every* step a leg is in contact, not once
    per touchdown -- confirm this is the intended shaping.

    Args:
        env: the environment to wrap.
        leg_contact_bonus: amount added per leg in contact (default 90, i.e. the
            value that was previously hard-coded).
    """
    def __init__(self, env, leg_contact_bonus=90):
        super().__init__(env)
        self.leg_contact_bonus = leg_contact_bonus

    def step(self, action):
        """Step the wrapped env and replace its reward with the custom one.

        Returns the usual Gymnasium 5-tuple
        (observation, reward, terminated, truncated, info).
        """
        # Gymnasium's step() order is (obs, reward, terminated, truncated, info);
        # the names below follow that order so the flags are not mislabeled.
        state, original_reward, terminated, truncated, info = self.env.step(action)
        reward = self.custom_reward(state, terminated or truncated, action, original_reward)
        return state, reward, terminated, truncated, info

    def custom_reward(self, state, is_terminal, action, original_reward):
        """Return `original_reward` plus the bonus for each leg currently in contact.

        `state` must unpack into exactly 8 values; the last two are read as the
        leg-contact flags. `is_terminal` and `action` are accepted for interface
        compatibility but are not used.
        """
        (_, _, _, _, _, _, leg_1_contact, leg_2_contact) = state

        reward = original_reward # maintain original rewards
        # Reward for leg contact
        if leg_1_contact:
            reward += self.leg_contact_bonus
        if leg_2_contact:
            reward += self.leg_contact_bonus

        return reward

def make_env():
    """Create a human-rendered LunarLander-v2 env wrapped in Monitor, then CustomRewardWrapper.

    NOTE: the Task 1 cell defines another `make_env` without these wrappers;
    whichever cell was executed last decides which version is in effect.
    """
    monitored_env = Monitor(gym.make("LunarLander-v2", render_mode="human"))
    return CustomRewardWrapper(monitored_env)

## Task 1: Install gym, and play Lunar Lander with random control

In [None]:
#render_mode='human' opens a window so we can watch the lander crash :)
def make_env():
    """Create a plain LunarLander-v2 environment that renders to a window.

    NOTE: this shadows the `make_env` from the combined-rewards cell if that
    cell was run earlier.
    """
    lander_env = gym.make("LunarLander-v2", render_mode="human")
    return lander_env

env = make_env()
#lets watch the lander crash 10 times :)
episodes = 10
try:
    for episode in range(episodes):
        step = 0
        #gymnasium's reset() returns (observation, info)
        state, _ = env.reset()
        done = False
        score = 0
        while not done:
            step+=1
            #no explicit env.render() call: with render_mode="human" the window
            #is refreshed by the environment itself on every step
            action = env.action_space.sample()
            #gymnasium's step() returns (obs, reward, terminated, truncated, info);
            #the episode is over when EITHER flag is set, otherwise we would keep
            #stepping an environment that has already hit its time limit
            n_state,reward,terminated,truncated,info = env.step(action)
            done = terminated or truncated
            score+=reward
            #print('Episode:{} Step:{} Score:{}'.format(episode+1, step, score))  # Print the episode, step, and current score
        print('Episode:{} score:{}'.format(episode+1,score))
finally:
    #always release the render window, even if the loop is interrupted
    env.close()

## Task 2: Implementing Policy Gradient

In [None]:
#Policy Gradient seeks to approximate agent's policy (probability distribution over the action space given state)

#Policy network: maps a state to a probability distribution over actions
class PolicyGradient(keras.Model):
    """Two hidden ReLU layers followed by a softmax output layer.

    Args:
        num_actions: number of units in the softmax output layer.
        fc1num: number of units in the first hidden layer.
        fc2num: number of units in the second hidden layer.
    """
    def __init__(self, num_actions, fc1num=256, fc2num=256):
        super().__init__()
        #keep the layer sizes around as attributes
        self.fc1num, self.fc2num, self.num_actions = fc1num, fc2num, num_actions
        #hidden layers (fc1, fc2) and output layer (fco)
        self.fc1 = Dense(fc1num, activation='relu')
        self.fc2 = Dense(fc2num, activation='relu')
        self.fco = Dense(num_actions, activation='softmax')

    def call(self, state):
        """Forward pass; returns the policy pi (softmax probabilities) for `state`."""
        hidden = self.fc2(self.fc1(state))
        return self.fco(hidden)
    
#Agent functions
class Agent:
    """Monte Carlo policy-gradient agent.

    Samples actions from a PolicyGradient network, stores the transitions of the
    current episode, and performs one gradient update per call to `learn()`.

    Args:
        lrate: learning rate passed to the Adam optimizer.
        gamma: discount factor applied to future rewards.
        num_actions: size of the action space (output units of the network).
        fc1num: units in the network's first hidden layer.
        fc2num: units in the network's second hidden layer.
    """
    def __init__(self, lrate=0.001, gamma=0.99, num_actions=4, fc1num=256, fc2num=256):
        #lrate is the learning rate
        self.lr=lrate
        #gamma is the discount factor (from 0 to 1)
        self.gamma=gamma
        #4 default actions (nothing, left, down, right)
        self.num_actions=num_actions
        #3 memory arrays (cleared after every call to learn())
        self.state_memory=[]
        self.action_memory=[]
        self.reward_memory=[]
        #the policy gradient network
        self.policy=PolicyGradient(num_actions=num_actions, fc1num=fc1num, fc2num=fc2num)
        self.policy.compile(optimizer=Adam(learning_rate=self.lr))

    def choose_action(self,observation):
        """Sample one action from the policy's distribution for `observation`."""
        #wrap in a list to add a batch dimension of 1
        state=tf.convert_to_tensor([observation],dtype=tf.float32)
        #sample random action based on the probability distribution
        probs=self.policy(state)
        action_probs=tfp.distributions.Categorical(probs=probs)
        action=action_probs.sample()
        return action.numpy()[0]

    def store_transition(self, observation,action,reward):
        """Append one (observation, action, reward) step to the episode memory."""
        self.state_memory.append(observation)
        self.action_memory.append(action)
        self.reward_memory.append(reward)

    def _discounted_returns(self):
        """Return G, where G[t] = sum_k gamma^(k-t) * reward[k] for k >= t.

        Computed with a single backward pass (G[t] = r[t] + gamma*G[t+1]) instead
        of a nested loop, and always in float64 so integer rewards are not
        truncated.
        """
        rewards=np.asarray(self.reward_memory,dtype=np.float64)
        G=np.zeros_like(rewards)
        running=0.0
        for t in range(len(rewards)-1,-1,-1):
            running=rewards[t]+self.gamma*running
            G[t]=running
        return G

    def learn(self):
        """Run one policy-gradient update from the stored episode, then clear memory.

        Does nothing if no transitions have been stored.
        """
        #nothing to learn from: avoid feeding empty tensors to the network
        if not self.reward_memory:
            return

        #G is the discounted sum of future rewards at each time step
        G=self._discounted_returns()

        #normalizing rewards to promote stability (for nn)
        #std = np.std(G) if np.std(G)>0 else 1
        #G = (G-np.mean(G))/std

        #stack the whole episode so the network is evaluated once, not per step
        states=tf.convert_to_tensor(np.asarray(self.state_memory,dtype=np.float32))
        actions=tf.convert_to_tensor(np.asarray(self.action_memory,dtype=np.int32))
        returns=tf.convert_to_tensor(G.astype(np.float32))

        #calculate gradients wrt params of deep nn
        with tf.GradientTape() as tape:
            probs=self.policy(states)
            #clipping because we are taking the log (avoid log0)
            probs=tf.clip_by_value(probs, clip_value_min=1e-8, clip_value_max=1-1e-8)
            action_probs=tfp.distributions.Categorical(probs=probs)
            log_probs=action_probs.log_prob(actions)
            #loss = -sum_t G[t] * log pi(a_t | s_t)
            loss=-tf.reduce_sum(returns*log_probs)

        gradient=tape.gradient(loss,self.policy.trainable_variables)
        self.policy.optimizer.apply_gradients(zip(gradient,self.policy.trainable_variables))

        #reset memory (monte carlo method)
        self.state_memory = []
        self.action_memory = []
        self.reward_memory = []

### Default run

In [None]:
#hyperparams
lr0 = 0.0003
gamma0 = 0.99
episodes = 5000
fcdim = 256
#wall-clock budget per episode, in seconds; an episode exceeding it is cut short
time_threshold = 40

agent=Agent(lrate=lr0,gamma=gamma0,num_actions=4, fc1num=fcdim, fc2num=fcdim)
env=gym.make("LunarLander-v2")
scores=[]
avgscores=[]
try:
    for episode in range(episodes):
        #monotonic clock: elapsed time is unaffected by system clock adjustments
        start_time = time.monotonic()
        #gymnasium's reset() returns (observation, info)
        state, _ = env.reset()
        done = False
        score = 0
        while not done:
            action = agent.choose_action(state)
            #gymnasium's step() returns (obs, reward, terminated, truncated, info);
            #stop on EITHER flag so we never keep stepping past the env's time limit
            n_state,reward,terminated,truncated,info = env.step(action)
            done = terminated or truncated
            if time.monotonic()-start_time>=time_threshold:
                #episode took too long: end it and penalise the last step
                done=True
                reward-=50
            agent.store_transition(observation=state,action=action,reward=reward)
            state=n_state
            score+=reward
        scores.append(score)
        #one policy update per finished episode
        agent.learn()
        #moving average over the last (up to) 50 episodes
        ave_score = np.mean(scores[-50:])
        avgscores.append(ave_score)
        print('Episode:{} score:{} avgscore:{}'.format(episode+1,score,ave_score))
finally:
    #release the environment even if training is interrupted
    env.close()

In [None]:
#x-axes: one point per recorded episode
score_axis = range(len(scores))
avg_axis = range(len(avgscores))

#raw per-episode scores
plt.plot(score_axis, scores, label='Scores', alpha=0.6)

#smoothed scores (moving average over the last 50 episodes), drawn in red
#so the trend stands out from the raw curve
plt.plot(avg_axis, avgscores, color='red', label='Averaged Scores', alpha=0.6)

plt.title('Scores per Episode (Default Rewards, Default Hyperparams)')
plt.xlabel('Episodes')
plt.ylabel('Reward')
plt.legend()
plt.show()

#report the best episode (1-based) and its score
best_score = max(scores)
best_episode = scores.index(best_score)+1
print("Highest score is at %d episodes with a score of %.5f." %(best_episode, best_score))

In [None]:
#Save the policy
#PolicyGradient is a subclassed keras.Model; TF2 Keras does not support saving
#such a model whole to HDF5 (model.save("*.h5")), so persist the weights instead.
agent.policy.save_weights("policygradient_1.weights.h5")

#Load the weights back into a fresh agent
#(the network must be called once so its variables exist before loading;
# 8 is the length of the LunarLander state vector)
#restored = Agent(lrate=lr0, gamma=gamma0, num_actions=4, fc1num=fcdim, fc2num=fcdim)
#restored.policy(tf.zeros((1, 8)))
#restored.policy.load_weights("policygradient_1.weights.h5")