In [1]:
# Set up the OpenAI Gym CartPole environment
import gym
env = gym.make("CartPole-v0")

# reset() is called before playing and returns the initial observation;
#  it is printed so its layout (an array of four floats) can be inspected
obs = env.reset()
print(obs)
# env.render()  (rendering call left disabled)

[-0.045353   -0.00858524  0.04259759 -0.0142772 ]


In [2]:
# Ask the environment which actions are possible (i.e. what its action space is).
# The bare attribute is the last expression of the cell, so its repr is shown as the cell output.
env.action_space

Discrete(2)

In [3]:
# Discrete(2) means the possible actions are the integers 0 and 1 which, in this case, represent
#  accelerating left (0) or right (1)

# Since the pole is leaning toward the right, let's accelerate the cart toward the right
#  by using the step() method, which executes the given action
action = 1
# step() is unpacked into the new observation, the reward for this step, a flag telling
#  whether the episode is over, and an info object (an empty dict in the output shown here)
obs, reward, done, info = env.step(action)
print(obs, reward, done, info)
# env.render()

[-0.0455247   0.18590074  0.04231205 -0.29322163] 1.0 False {}


In [4]:
# Let's hardcode a simple policy that will
#  - Accelerate left when the pole is leaning toward the left and
#  - Accelerate right when the pole is leaning toward the right

def basic_policy(obs):
    """Pick an action from the pole angle alone.

    Args:
        obs: indexable observation whose element at index 2 is the pole angle.

    Returns:
        0 (accelerate left) when the angle is negative, otherwise 1
        (accelerate right).
    """
    pole_angle = obs[2]
    if pole_angle < 0:
        return 0
    return 1

# Evaluate the hard-coded policy: play 500 episodes of at most 200 steps each
#  and record the total reward collected in every episode
totals = []
for episode in range(500):
    episode_rewards = 0
    obs = env.reset()
    for step in range(200):
        action = basic_policy(obs)
        obs, reward, done, info = env.step(action)
        episode_rewards += reward
        # Stop as soon as the environment reports that the episode is over
        if done:
            break
    totals.append(episode_rewards)
    

import numpy as np
# Summary of the per-episode totals: mean, standard deviation, minimum and maximum
np.mean(totals), np.std(totals), np.min(totals), np.max(totals)

(41.886, 8.368811385137079, 24.0, 68.0)

In [5]:
# Tensorflow time!!!
import tensorflow as tf
from tensorflow import keras

# 1. Describe the policy network: n_inputs observation values -> 5 ELU hidden units -> 1 sigmoid output
n_inputs = 4  # same value as env.observation_space.shape[0]

# Build the network layer by layer; the single sigmoid output is later read as a probability
model = keras.models.Sequential()
model.add(keras.layers.Dense(5, activation="elu", input_shape=[n_inputs]))
model.add(keras.layers.Dense(1, activation="sigmoid"))

In [6]:
# Use Keras to implement a common class of policy gradient (PG) algorithms called REINFORCE algorithms

# function that will play one step of the pole balancing game
def play_one_step(env, obs, model, loss_fn):
    """Play a single step of the game with the model and record the loss gradients.

    The model is called on the observation (with a leading axis added) and its
    output is treated as the probability of going left. An action is sampled
    from that probability, and the loss is computed as if the sampled action
    had been the right one to take. The gradients are returned untouched so the
    caller can weight them later, once it knows how good the action was.

    Args:
        env: environment whose ``step(action)`` returns ``(obs, reward, done, info)``.
        obs: current observation; it is indexed with ``np.newaxis`` before being
            given to the model.
        model: callable model exposing ``trainable_variables``.
        loss_fn: callable invoked as ``loss_fn(y_target, y_pred)``.

    Returns:
        Tuple ``(obs, reward, done, grads)``: the observation, reward and done flag
        returned by ``env.step``, and the gradients of the loss with respect to
        ``model.trainable_variables``.
    """
    with tf.GradientTape() as tape:
        # Probability of going left, as predicted for this one observation
        p_left = model(obs[np.newaxis])
        # True (go right) when the uniform sample exceeds p_left, i.e. with probability 1 - p_left
        go_right = tf.random.uniform([1, 1]) > p_left
        # Target probability of "left": 1 when left was sampled, 0 when right was sampled
        ones = tf.constant([[1.]])
        target_left = ones - tf.cast(go_right, tf.float32)
        per_sample_loss = loss_fn(target_left, p_left)
        loss = tf.reduce_mean(per_sample_loss)
    # Gradients of the loss with regard to every trainable variable of the model
    gradients = tape.gradient(loss, model.trainable_variables)
    # Execute the sampled action: 0 (left) or 1 (right)
    chosen_action = int(go_right[0, 0].numpy())
    next_obs, reward, done, _info = env.step(chosen_action)
    return next_obs, reward, done, gradients

In [7]:
# Another function that will rely on play_one_step() to play multiple episodes,
#  returning all the rewards and gradients for each episode and each step
def play_multiple_episodes(env, n_episodes, n_max_steps, model, loss_fn):
    """Play several episodes and collect the per-step rewards and gradients.

    Each episode starts with ``env.reset()`` and is played with
    ``play_one_step`` until the environment reports it is done or
    ``n_max_steps`` steps have been taken.

    Args:
        env: environment passed through to ``play_one_step``.
        n_episodes: number of episodes to play.
        n_max_steps: maximum number of steps per episode.
        model: model passed through to ``play_one_step``.
        loss_fn: loss function passed through to ``play_one_step``.

    Returns:
        Tuple ``(all_rewards, all_grads)``: two lists with one entry per episode,
        each entry being the list of rewards (respectively gradients) for every
        step of that episode.
    """
    rewards_by_episode, grads_by_episode = [], []
    for _ in range(n_episodes):
        episode_rewards, episode_grads = [], []
        obs = env.reset()
        for _ in range(n_max_steps):
            obs, step_reward, finished, step_grads = play_one_step(env, obs, model, loss_fn)
            episode_rewards.append(step_reward)
            episode_grads.append(step_grads)
            if finished:
                break
        rewards_by_episode.append(episode_rewards)
        grads_by_episode.append(episode_grads)
    return rewards_by_episode, grads_by_episode

In [8]:
# Compute the sum of future discounted rewards at each step
def discount_rewards(rewards, discount_factor):
    """Compute the sum of future discounted rewards at each step of one episode.

    The return at a step is that step's reward plus ``discount_factor`` times
    the return at the following step; the last step's return is its own reward.

    Args:
        rewards: sequence of per-step rewards (ints or floats) for one episode.
        discount_factor: multiplier applied to the following step's return.

    Returns:
        A new float NumPy array with the same length as ``rewards``. The input
        sequence is not modified.
    """
    # Always work on a float copy: with integer rewards an integer array would
    # silently drop the fractional part of every discounted term on assignment.
    discounted = np.array(rewards, dtype=np.float64)
    # Walk backwards from the second-to-last step so each step can reuse the
    # already-discounted return of the step after it.
    for step in range(len(rewards) - 2, -1, -1):
        discounted[step] += discounted[step + 1] * discount_factor
    return discounted

# Normalize all the discounted rewards (returns) across many episodes by subtracting the mean and dividing by the standard deviation
def discount_and_normalize_rewards(all_rewards, discount_factor):
    """Discount the rewards of every episode, then normalize them all together.

    Each episode is passed through ``discount_rewards``. The mean and standard
    deviation are computed over the steps of all episodes combined, and every
    discounted value is shifted by that mean and divided by that deviation.

    Args:
        all_rewards: list of episodes, each a sequence of per-step rewards.
        discount_factor: discount factor forwarded to ``discount_rewards``.

    Returns:
        A list with one NumPy array per episode holding the normalized
        discounted rewards. When every discounted value is identical (standard
        deviation of zero) the arrays contain zeros rather than NaN.
    """
    all_discounted_rewards = [discount_rewards(rewards, discount_factor) for rewards in all_rewards]
    flat_rewards = np.concatenate(all_discounted_rewards)
    reward_mean = flat_rewards.mean()
    reward_std = flat_rewards.std()
    # A zero deviation (e.g. a single one-step episode) would divide by zero and
    # produce NaN values; dividing by 1 instead leaves the centered values at zero.
    if reward_std == 0:
        reward_std = 1.0
    return [(discounted_rewards - reward_mean) / reward_std for discounted_rewards in all_discounted_rewards]

In [9]:
# Try out the discount rewards functionality on a three-step episode:
#  each step's result is its own reward plus 0.8 times the result of the following step
discount_rewards([10, 0, -50], discount_factor=0.8)

array([-22, -40, -50])

In [10]:
# Discount and normalize two episodes of different lengths; the mean and standard
#  deviation are computed over the steps of both episodes combined
discount_and_normalize_rewards([[10, 0, -50], [10, 20]], discount_factor=0.8)

[array([-0.28435071, -0.86597718, -1.18910299]),
 array([1.26665318, 1.0727777 ])]

In [11]:
# Almost ready to run the algorithm!

# Define hyperparameters
n_iterations = 150            # number of training iterations (one policy update each)
n_episodes_per_update = 10    # episodes played before every update
n_max_steps = 200             # upper bound on the number of steps per episode
discount_factor = 0.95        # multiplier applied to the following step's return when discounting

# Need an optimizer and loss function
# `learning_rate` is the supported keyword; the older `lr` alias is deprecated in
#  TF 2.x Keras and rejected by newer Keras releases
optimizer = keras.optimizers.Adam(learning_rate=0.01)
# Binary cross-entropy pairs with the model's single sigmoid output
loss_fn = keras.losses.binary_crossentropy

# We are now ready to build and run the training loop!
for iteration in range(n_iterations):
    # Play the game (in this case) 10 times and return all rewards and gradients for every episode and step
    all_rewards, all_grads = play_multiple_episodes(env, n_episodes_per_update, n_max_steps, model, loss_fn)
    # Compute each action's normalized advantage. This provides a measure of how good or bad each action was, in hindsight
    all_final_rewards = discount_and_normalize_rewards(all_rewards, discount_factor)
    # For every trainable variable, average its gradient over all steps of all episodes,
    #  after weighting each step's gradient by that step's normalized advantage
    all_mean_grads = []
    for var_index in range(len(model.trainable_variables)):
        # all_grads[episode_index][step][var_index] is the gradient recorded at that step for variable var_index
        mean_grads = tf.reduce_mean([final_reward * all_grads[episode_index][step][var_index] 
                                         for episode_index, final_rewards in enumerate(all_final_rewards)
                                            for step, final_reward in enumerate(final_rewards)], axis=0)
        all_mean_grads.append(mean_grads)
    # Apply the mean gradients using the optimizer: the model's trainable variables will be tweaked
    #  and hopefully the policy will be a bit better
    optimizer.apply_gradients(zip(all_mean_grads, model.trainable_variables))
