In [1]:
import gym
# TensorFlow ≥2.0 is required
import tensorflow as tf
from tensorflow import keras
# Compare the major version numerically: a plain string comparison is
# lexicographic and would wrongly reject a version such as "10.0".
assert int(tf.__version__.split(".")[0]) >= 2, "TensorFlow >= 2.0 is required"

# Reinforcement Learning

cartpole

As the agent observes the current state of the environment and chooses an action, the environment transitions to a new state and also returns a reward that indicates the consequences of the action. In this task, the reward is +1 for every timestep, and the episode terminates if the pole falls over too far or the cart moves more than 2.4 units away from the center. This means that better-performing scenarios will run for a longer duration, accumulating a larger return.

The CartPole task is designed so that the inputs to the agent are 4 real values representing the environment state (position, velocity, etc.). However, neural networks can solve the task purely by looking at the scene, so we'll use a patch of the screen centered on the cart as an input. Because of this, our results aren't directly comparable to the ones from the official leaderboard - our task is much harder. Unfortunately this does slow down the training, because we have to render all the frames.

Strictly speaking, we will present the state as the difference between the current screen patch and the previous one. This will allow the agent to take the velocity of the pole into account from one image.

In [2]:
# Create an environment using gym
env = gym.make("CartPole-v1")
# reset() starts a new episode and returns the initial observation
obs = env.reset()
obs

array([ 0.04726964, -0.04846026, -0.02008476,  0.04727328])

In [3]:
# Optional: start a virtual display when pyvirtualdisplay is installed
# (presumably so that rendering works on a machine without a screen — TODO confirm).
# If the package is missing, this step is skipped on purpose.
try:
    import pyvirtualdisplay
    display = pyvirtualdisplay.Display(visible=0, size=(1400, 900)).start()
except ImportError:
    pass

In [4]:
# Render the current state as an RGB image array and inspect its shape
img = env.render(mode="rgb_array")
img.shape

(800, 1200, 3)

In [6]:
# Plot the state of the environment
env.render()

True

In [5]:
# What is the space of actions the agent can choose from?
env.action_space

Discrete(2)

In [6]:
# Apply action 1 for one timestep; step() returns the new observation,
# the reward, a done flag and an info dict.
action = 1
obs, reward, done, info = env.step(action)
obs

array([ 0.04630044,  0.14694385, -0.0191393 , -0.25167829])

In [7]:
# Reward returned by the step above
reward

1.0

In [8]:
# Whether the episode ended with the step above
done

False

In [9]:
# Extra info dict returned by the step above
info

{}

## Create a Policy for CartPole

A simple hard-coded policy

Let's hard code a simple strategy: if the pole is tilting to the left, then push the cart to the left, and vice versa. Let's see if that works:

In [10]:
def basic_policy(obs):
    """Hard-coded policy: push the cart towards the side the pole leans to.

    Args:
        obs: Observation sequence whose element at index 2 is read as the
            pole angle.

    Returns:
        0 when the angle is negative, otherwise 1.
    """
    pole_angle = obs[2]
    if pole_angle < 0:
        return 0
    return 1

# Evaluate the hard-coded policy: play 500 episodes of at most 200 steps each
# and record the total reward collected in every episode.
env = gym.make("CartPole-v1")
totals = []

for episode in range(500):
    obs = env.reset()
    episode_rewards = 0
    for step in range(200):
        action = basic_policy(obs)
        obs, reward, done, info = env.step(action)
        episode_rewards += reward
        if done:
            # The environment signalled the end of the episode.
            break
    totals.append(episode_rewards)

In [11]:
import numpy as np
# Summary of the per-episode total rewards: mean, standard deviation, min, max
np.mean(totals), np.std(totals), np.min(totals), np.max(totals)

(42.0, 8.64175908018732, 24.0, 68.0)

In [12]:
# Record one seeded episode of the hard-coded policy, one rendered frame per step.
env.seed(42)

frames = []
obs = env.reset()

for step in range(200):
    # Capture the frame before acting so the first frame shows the initial state.
    img = env.render(mode="rgb_array")
    frames.append(img)

    action = basic_policy(obs)
    obs, reward, done, info = env.step(action)

    if not done:
        continue
    # Episode over: report the index of the last step played.
    print(step)
    print("DONE")
    break

54
DONE


In [13]:

import matplotlib as mpl
import matplotlib.pyplot as plt
import matplotlib.animation as animation
# Set matplotlib's `animation.html` rc parameter to 'jshtml'
# (presumably so animations display inline as interactive HTML — TODO confirm).
mpl.rc('animation', html='jshtml')

def update_scene(num, frames, patch):
    """Show frame number `num` on the image artist.

    Args:
        num: Index of the frame to display.
        frames: Indexable collection of frames.
        patch: Object exposing `set_data`; it receives `frames[num]`.

    Returns:
        A one-element tuple containing `patch`.
    """
    current_frame = frames[num]
    patch.set_data(current_frame)
    return (patch,)

def plot_animation(frames, repeat=False, interval=40):
    """Build a matplotlib animation that plays `frames` in order.

    Args:
        frames: Sequence of images; the first one initialises the picture.
        repeat: Forwarded to `FuncAnimation` as `repeat`.
        interval: Forwarded to `FuncAnimation` as `interval`.

    Returns:
        The `animation.FuncAnimation` object.
    """
    figure = plt.figure()
    image_artist = plt.imshow(frames[0])
    plt.axis('off')
    animation_options = dict(
        fargs=(frames, image_artist),
        frames=len(frames),
        repeat=repeat,
        interval=interval,
    )
    anim = animation.FuncAnimation(figure, update_scene, **animation_options)
    # Close the current figure; only the returned animation is meant to be shown.
    plt.close()
    return anim

In [14]:
# Animate the frames recorded for the hard-coded policy
plot_animation(frames)

## Neural Network Policies
Let's create a neural network that will take observations as inputs, and output the action to take for each observation. To choose an action, the network will estimate a probability for each action, then we will select an action randomly according to the estimated probabilities. In the case of the Cart-Pole environment, there are just two possible actions (left or right), so we only need one output neuron: it will output the probability p of the action 0 (left), and of course the probability of action 1 (right) will be 1 - p.

In [25]:
# Start from a clean Keras state and fix the random seeds for reproducibility.
keras.backend.clear_session()
tf.random.set_seed(42)
np.random.seed(42)

n_inputs = 4 # == env.observation_space.shape[0]

# Policy network: one hidden ELU layer, then a single sigmoid unit that
# outputs the probability of action 0 (left).
model = keras.models.Sequential()
model.add(keras.layers.Dense(5, activation="elu", input_shape=[n_inputs]))
model.add(keras.layers.Dense(1, activation="sigmoid"))

In [32]:
# Sanity check: run the untrained network on one made-up 4-value observation
model.predict([[1,2,3,4]])

array([[0.5079533]], dtype=float32)

In [21]:
def render_policy_net(model, n_max_steps=200, seed=42):
    """Play one CartPole episode with `model` and collect the rendered frames.

    At each step the model's output is read as the probability of action 0
    (left), and an action is sampled from it using NumPy's global random
    generator, which is re-seeded with `seed`.

    Args:
        model: Policy network; `model.predict` is called on a (1, n_features)
            observation and must return a (1, 1) array.
        n_max_steps: Maximum number of steps to play.
        seed: Seed applied to both the environment and `np.random`.

    Returns:
        List of RGB frames, one per step played.
    """
    frames = []
    env = gym.make("CartPole-v1")
    try:
        env.seed(seed)
        np.random.seed(seed)
        obs = env.reset()
        for step in range(n_max_steps):
            frames.append(env.render(mode="rgb_array"))
            # predict() returns a (1, 1) array; take the scalar explicitly, since
            # calling int() on a non-0-d array is deprecated in recent NumPy.
            left_proba = model.predict(obs.reshape(1, -1))[0, 0]
            action = int(np.random.rand() > left_proba)
            obs, reward, done, info = env.step(action)
            if done:
                break
    finally:
        # Always release the environment, even if a step raised.
        env.close()
    return frames

In [23]:
# Play one episode with the current model and animate it
frames = render_policy_net(model)
plot_animation(frames)

In [96]:

def play_one_step(env, obs, model, loss_fn):
    """Sample an action from the policy, apply it, and return its gradients.

    The model's output is treated as the probability of going left. An action
    is sampled from it, and the loss is computed as if the sampled action were
    the correct one (target left-probability 1 for left, 0 for right).

    Args:
        env: Environment exposing `step(action)`.
        obs: Current observation; a batch axis is added before it is fed to
            the model.
        model: Policy network, called directly on the batched observation.
        loss_fn: Loss called as `loss_fn(y_target, left_proba)`.

    Returns:
        Tuple `(obs, reward, done, grads)`, where `grads` are the gradients of
        the loss with respect to `model.trainable_variables`.
    """
    batched_obs = obs[np.newaxis]
    with tf.GradientTape() as tape:
        left_proba = model(batched_obs)
        # True means "go right" (action 1); this happens with probability 1 - left_proba.
        go_right = tf.random.uniform([1, 1]) > left_proba
        y_target = tf.constant([[1.]]) - tf.cast(go_right, tf.float32)
        loss = tf.reduce_mean(loss_fn(y_target, left_proba))
    grads = tape.gradient(loss, model.trainable_variables)
    chosen_action = int(go_right[0, 0].numpy())
    next_obs, reward, done, _ = env.step(chosen_action)
    return next_obs, reward, done, grads

In [84]:
# Smoke test: run play_one_step once on a freshly seeded environment
seed=42
env = gym.make("CartPole-v1")
env.seed(seed)
obs = env.reset()

# grads holds the gradients with respect to the model's trainable variables
obs, reward, done, grads = play_one_step(env, obs, model, keras.losses.binary_crossentropy)

[<tf.Tensor: shape=(4, 5), dtype=float32, numpy=
array([[-2.2972822e-03,  1.9323071e-04, -5.2426341e-03,  3.0851464e-03,
        -2.8777502e-03],
       [-2.8587106e-04,  2.4045399e-05, -6.5238716e-04,  3.8391197e-04,
        -3.5810380e-04],
       [ 7.6804007e-03, -6.4601959e-04,  1.7527465e-02, -1.0314433e-02,
         9.6210539e-03],
       [-3.2955222e-04,  2.7719540e-05, -7.5207197e-04,  4.4257377e-04,
        -4.1282212e-04]], dtype=float32)>, <tf.Tensor: shape=(5,), dtype=float32, numpy=
array([ 0.18253168, -0.01535324,  0.41655606, -0.24513184,  0.22865306],
      dtype=float32)>, <tf.Tensor: shape=(5, 1), dtype=float32, numpy=
array([[-0.00885278],
       [ 0.00856712],
       [-0.01134038],
       [ 0.00214442],
       [ 0.00806441]], dtype=float32)>, <tf.Tensor: shape=(1,), dtype=float32, numpy=array([0.49459037], dtype=float32)>]


In [90]:
def _play_single_episode(env, n_max_steps, model, loss_fn):
    """Play one episode; return its per-step rewards and per-step gradients."""
    rewards, grads_per_step = [], []
    obs = env.reset()
    for _ in range(n_max_steps):
        obs, reward, done, grads = play_one_step(env, obs, model, loss_fn)
        rewards.append(reward)
        grads_per_step.append(grads)
        if done:
            break
    return rewards, grads_per_step


def play_multiple_episodes(env, n_episodes, n_max_steps, model, loss_fn):
    """Play several episodes and gather rewards and gradients for every step.

    Args:
        env: Environment, reset at the start of every episode.
        n_episodes: Number of episodes to play.
        n_max_steps: Maximum number of steps per episode.
        model: Policy network passed through to `play_one_step`.
        loss_fn: Loss function passed through to `play_one_step`.

    Returns:
        Tuple `(all_rewards, all_grads)`: two lists with one entry per episode,
        each entry being a list with one item per step played.
    """
    episodes = [_play_single_episode(env, n_max_steps, model, loss_fn)
                for _ in range(n_episodes)]
    all_rewards = [episode_rewards for episode_rewards, _ in episodes]
    all_grads = [episode_grads for _, episode_grads in episodes]
    return all_rewards, all_grads

In [91]:
def discount_rewards(rewards, discount_rate):
    """Compute the discounted sum of future rewards for every step.

    Element `i` of the result equals
    `rewards[i] + discount_rate * rewards[i+1] + discount_rate**2 * rewards[i+2] + ...`.

    Args:
        rewards: Sequence of per-step rewards (ints or floats). Not modified.
        discount_rate: Factor applied to each later step's return.

    Returns:
        1-D float64 NumPy array with the same length as `rewards`.
    """
    # Force a float array: with integer rewards np.array() would produce an
    # int array, and the in-place updates below would silently truncate.
    discounted = np.array(rewards, dtype=np.float64)
    # Walk backwards so each step can reuse the already-discounted next step.
    for step in range(len(rewards) - 2, -1, -1):
        discounted[step] += discounted[step + 1] * discount_rate
    return discounted

def discount_and_normalize_rewards(all_rewards, discount_rate):
    """Discount each episode's rewards, then standardise them across all episodes.

    The mean and standard deviation are computed over the discounted rewards of
    all episodes together, so results are comparable between episodes.

    Args:
        all_rewards: List of per-episode reward sequences.
        discount_rate: Discount factor passed to `discount_rewards`.

    Returns:
        List of NumPy arrays, one per episode, holding the normalised
        discounted rewards. Returns an empty list when there are no episodes,
        and all-zero arrays when every discounted reward is identical.
    """
    all_discounted_rewards = [discount_rewards(rewards, discount_rate)
                              for rewards in all_rewards]
    if not all_discounted_rewards:
        # np.concatenate raises on an empty list; there is nothing to normalise.
        return []
    flat_rewards = np.concatenate(all_discounted_rewards)
    reward_mean = flat_rewards.mean()
    reward_std = flat_rewards.std()
    if reward_std == 0:
        # All returns are equal, so no step is better or worse than average.
        # Return zeros instead of dividing by zero, which would give NaN/inf.
        return [np.zeros_like(discounted_rewards, dtype=np.float64)
                for discounted_rewards in all_discounted_rewards]
    return [(discounted_rewards - reward_mean) / reward_std
            for discounted_rewards in all_discounted_rewards]

In [92]:
# Quick check of discount_rewards on a small hand-made reward sequence
discount_rewards([10, 0, -50], discount_rate=0.8)

array([-22, -40, -50])

In [93]:
# Quick check of the normalisation on two hand-made episodes of different lengths
discount_and_normalize_rewards([[10, 0, -50], [10, 20]], discount_rate=0.8)

[array([-0.28435071, -0.86597718, -1.18910299]),
 array([1.26665318, 1.0727777 ])]

In [94]:
# Training hyperparameters
# Number of training iterations (one optimizer update per iteration)
n_iterations = 150
# Episodes played to estimate the gradients for each update
n_episodes_per_update = 10
# Upper bound on the number of steps in one episode
n_max_steps = 200
# Factor by which each later step's return is discounted
discount_rate = 0.95

In [87]:
# `lr` is a deprecated alias that newer Keras releases reject;
# `learning_rate` is the supported argument name.
optimizer = keras.optimizers.Adam(learning_rate=0.01)
# The policy has a single sigmoid output, so binary cross-entropy is used.
loss_fn = keras.losses.binary_crossentropy

In [88]:
# Rebuild the policy network from scratch, with fixed seeds, before training.
keras.backend.clear_session()
np.random.seed(42)
tf.random.set_seed(42)

# Same architecture as before: 4 inputs -> 5 ELU units -> 1 sigmoid unit.
model = keras.models.Sequential()
model.add(keras.layers.Dense(5, activation="elu", input_shape=[4]))
model.add(keras.layers.Dense(1, activation="sigmoid"))

In [95]:
# Policy-gradient training: each iteration plays a batch of episodes, weights
# every step's gradients by its normalised discounted reward, averages them
# per variable, and applies a single optimizer update.
env = gym.make("CartPole-v1")
env.seed(42);

for iteration in range(n_iterations):
    all_rewards, all_grads = play_multiple_episodes(
        env, n_episodes_per_update, n_max_steps, model, loss_fn)

    # Progress report, overwritten in place thanks to the leading "\r".
    total_rewards = sum(map(sum, all_rewards))
    print("\rIteration: {}, mean rewards: {:.1f}".format(
        iteration, total_rewards / n_episodes_per_update), end="")

    all_final_rewards = discount_and_normalize_rewards(all_rewards,
                                                       discount_rate)

    # Pair every step's normalised reward with that step's gradients,
    # in episode order and then step order.
    weighted_steps = [
        (final_reward, step_grads)
        for final_rewards, episode_grads in zip(all_final_rewards, all_grads)
        for final_reward, step_grads in zip(final_rewards, episode_grads)
    ]

    # For each trainable variable, average the reward-weighted gradients over all steps.
    all_mean_grads = [
        tf.reduce_mean(
            [final_reward * step_grads[var_index]
             for final_reward, step_grads in weighted_steps],
            axis=0)
        for var_index in range(len(model.trainable_variables))
    ]
    optimizer.apply_gradients(zip(all_mean_grads, model.trainable_variables))

env.close()

ype=float32, numpy=
array([[-0.01823048,  0.13752963,  0.23638341, -0.235705  ,  0.26203477],
       [ 0.0113997 , -0.08599859, -0.1478128 ,  0.14738859, -0.16385284],
       [-0.00274172,  0.02068334,  0.03555014, -0.03544811,  0.03940789],
       [-0.00079723,  0.00601427,  0.01033722, -0.01030756,  0.01145898]],
      dtype=float32)>, <tf.Tensor: shape=(5,), dtype=float32, numpy=
array([-0.03304602,  0.24929707,  0.42848724, -0.4272575 ,  0.47498494],
      dtype=float32)>, <tf.Tensor: shape=(5, 1), dtype=float32, numpy=
array([[-0.06800482],
       [ 0.04278411],
       [-0.07247763],
       [ 0.08879156],
       [ 0.00859776]], dtype=float32)>, <tf.Tensor: shape=(1,), dtype=float32, numpy=array([0.3685539], dtype=float32)>]
[<tf.Tensor: shape=(4, 5), dtype=float32, numpy=
array([[-0.0308934 ,  0.23358868,  0.49781734, -0.3939007 ,  0.4450558 ],
       [ 0.0085701 , -0.06479957, -0.13809893,  0.10927154, -0.12346242],
       [-0.00473232,  0.03578162,  0.07625674, -0.06033856,  0.0

In [36]:
# Play one episode with the model after the training loop above and animate it
frames = render_policy_net(model)
plot_animation(frames)