In [1]:
from tqdm import tqdm

import gym
import numpy as np

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F

# if GPU is to be used
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# 1. Lunar Lander Environment

In [3]:
""" Environment Information
ref: https://github.com/openai/gym/blob/master/gym/envs/box2d/lunar_lander.py#L75

Action Space
    There are four discrete actions available: do nothing, fire left
    orientation engine, fire main engine, fire right orientation engine.
Observation Space
    The state is an 8-dimensional vector: the coordinates of the lander in `x` & `y`, its linear
    velocities in `x` & `y`, its angle, its angular velocity, and two booleans
    that represent whether each leg is in contact with the ground or not.
Rewards
    For each step, the reward:
    - is increased/decreased the closer/further the lander is to the landing pad.
    - is increased/decreased the slower/faster the lander is moving.
    - is decreased the more the lander is tilted (angle not horizontal).
    - is increased by 10 points for each leg that is in contact with the ground.
    - is decreased by 0.03 points each frame a side engine is firing.
    - is decreased by 0.3 points each frame the main engine is firing.
    The episode receive an additional reward of -100 or +100 points for crashing or landing safely respectively.
    An episode is considered a solution if it scores at least 200 points.
"""
env = gym.make("LunarLander-v2")
# env.reset()
# env.render()

Sample environment image

<img width=300 src="lunar_lander.png" />

In [14]:
print("observation_space: ", env.observation_space)
print("action_space: ", env.action_space)
state = env.reset()
print("sample obs: ", state)

observation_space:  Box([-inf -inf -inf -inf -inf -inf -inf -inf], [inf inf inf inf inf inf inf inf], (8,), float32)
action_space:  Discrete(4)
sample obs:  [-0.00464611  1.4173794  -0.47061905  0.2870732   0.00539051  0.10660225
  0.          0.        ]


In [7]:
# sample step
new_state, reward, done, info = env.step(1)
print("sample step: ", (new_state, reward, done, info))

sample step:  (array([ 2.9697418e-04,  1.4097712e+00,  9.2715714e-03, -3.8273171e-02,
        1.5079111e-03,  3.4357999e-02,  0.0000000e+00,  0.0000000e+00],
      dtype=float32), -1.728385070643468, False, {})


# 2. REINFORCE: Vanilla Policy Gradient

In [112]:
class VanillaNN(nn.Module):
    
    def __init__(self, n_observations, n_actions):
        super(VanillaNN, self).__init__()
        self.layer1 = nn.Linear(n_observations, 128)
        self.layer2 = nn.Linear(128, 128)
        self.layer3 = nn.Linear(128, n_actions)

    def forward(self, x):
        x = F.relu(self.layer1(x))
        x = F.relu(self.layer2(x))
        x = F.softmax(self.layer3(x), dim=-1)  # dim need to be -1 to prevent NaN results
        return x

In [113]:
sample_nn = VanillaNN(n_observations=8, n_actions=4)
pred = sample_nn(torch.tensor(state).float())
print(pred)

tensor([0.2580, 0.2447, 0.2471, 0.2503], grad_fn=<SoftmaxBackward0>)


In [135]:
class Reinforce:
    def __init__(self, env, gamma=0.99, learning_rate=3e-3):
        self.env = env
        self.gamma = gamma
        self.learning_rate = learning_rate

        # get number of actions and observations
        self._n_observations = self.env.observation_space.shape[0]
        self._n_actions = self.env.action_space.n

        # setup NN model
        self.policy_net = VanillaNN(self._n_observations, self._n_actions).to(device)

        # setup optimizer and loss function
        self.optimizer = optim.Adam(self.policy_net.parameters(), lr=self.learning_rate)
        self.criterion = self.loss_function
        
    def loss_function(self, prob_batch, expected_returns_batch):
        return - torch.sum(torch.log(prob_batch) * expected_returns_batch)

    def get_trajectory(self, max_steps=500, render=False):
        trajectory = list()
        done = False  # incase of early loop-termination (max_steps) before the environment terminated
        state = self.env.reset()

        for _ in range(max_steps):
            action_probs = self.policy_net(torch.from_numpy(state).float())
            selected_action = np.random.choice(np.array([0, 1, 2, 3]), p=action_probs.data.numpy())
            next_state, reward, done, _ = self.env.step(selected_action)
            trajectory.append((state, selected_action, reward))
            prev_state = state
            state = next_state
            
            if render:
                self.env.render()

            if done:
                break

        return trajectory, done

    def train(self, num_episodes=1000, max_steps=500):
        for episode in tqdm(range(num_episodes)):
            # get sample trajectory from the policy network
            trajectory, _ = self.get_trajectory(max_steps=max_steps)
            
            # prepare data for training
            states, actions, rewards = zip(*trajectory)
            states = torch.Tensor(states)
            actions = torch.Tensor(actions)
            
            # calculate expected discounted returns
            expected_returns_batch = list()
            for idx in range(len(rewards)):
                discounted_rewards = [self.gamma**i * reward for i, reward in enumerate(rewards[idx:])]
                expected_returns_batch.append(sum(discounted_rewards))
            
            # normalize and reformat expected returns
            expected_returns_batch = torch.FloatTensor(expected_returns_batch)
            expected_returns_batch /= expected_returns_batch.max()
            
            # calculate loss
            action_probs = self.policy_net(states)
            prob_batch = action_probs.gather(dim=1,index=actions.long().view(-1,1)).squeeze() 
            loss = self.criterion(prob_batch, expected_returns_batch)
            
            # optimize the model with backpropagation
            self.optimizer.zero_grad()
            loss.backward()
            self.optimizer.step()
            
    def visualize_policy(self, num_episodes=1, max_steps=500):
        for _ in range(num_episodes):
            trajectory, done = self.get_trajectory(max_steps=max_steps, render=True)
            
    def evaluate_policy(self, num_episodes=100, max_steps=500):
        win = 0
        for _ in range(num_episodes):
            trajectory, done = self.get_trajectory(max_steps=max_steps)
            if done and trajectory[-1][2] == 100:
                win += 1
        return win / num_episodes

In [136]:
# TODO: need to train more
env = gym.make("LunarLander-v2")
reinforce_agent = Reinforce(env)
reinforce_agent.train(num_episodes=int(5e6), max_steps=500)

100%|██████████| 100000/100000 [58:03<00:00, 28.71it/s] 


In [141]:
# test the trained agent
test_episodes = 100
win_rate = reinforce_agent.evaluate_policy(num_episodes=test_episodes)
print(f'agent win rate: {win_rate*100 :.2f}% from {test_episodes} test episodes')

agent win rate: 10.00% from 100 test episodes


In [138]:
# visualize the trained agent on a separate window
reinforce_agent.visualize_policy(num_episodes=10)