In [1]:
import gym
import tensorflow as tf
from tensorflow import keras
import numpy as np

In [2]:
# Create the CartPole-v1 environment and keep the value returned by reset()
# as the current observation.
env = gym.make("CartPole-v1")
obs = env.reset()

In [None]:
# Render the environment with mode='rgb_array' and display the shape of the result.
img = env.render(mode='rgb_array')
img.shape

In [None]:
# NOTE(review): `env` is used again by later cells (env.reset / env.step) after
# this close() — confirm the installed gym version allows reuse after closing.
env.close()

In [None]:
# Display the environment's action space as the cell output.
env.action_space

In [None]:
# Policy network: a small dense model that maps one observation to the
# probability of going left.
n_inputs = 4  # env.observation_space.shape[0]: horizontal_pos, velocity, pole_angle, angular_velocity
model = keras.models.Sequential()
model.add(keras.layers.Dense(5, activation="elu", input_shape=[n_inputs]))  # hidden layer
model.add(keras.layers.Dense(1, activation="sigmoid"))  # outputs the proba of going left


In [None]:
# hardcoded policy

def basic_policy(obs):
    """Hardcoded policy driven only by the sign of ``obs[2]``.

    Args:
        obs: Indexable observation; only element 2 is read (named ``angle``
            in the original code).

    Returns:
        0 when ``obs[2]`` is negative, otherwise 1.
    """
    pole_angle = obs[2]
    if pole_angle < 0:
        return 0
    return 1

# Evaluate the hardcoded policy: play 500 episodes and record each episode's
# summed reward in `totals`.
totals = []
for episode in range(500): # playing the game 500 times
    episode_rewards = 0
    obs = env.reset() # start a new episode
    for step in range(200):  # at most 200 steps per episode
        action = basic_policy(obs)
        obs, reward, done, info = env.step(action)
        episode_rewards += reward
        if done:  # the environment reported the episode as finished
            break
    totals.append(episode_rewards)
# policy network
# `tf` and `keras` are already imported in the notebook's first cell, and the
# policy network (`n_inputs`, `model`) is already built earlier in the notebook
# with this exact architecture (Dense(5, elu) -> Dense(1, sigmoid)).
# Re-importing and rebuilding it here only shadowed the earlier objects, so
# neither is repeated.

# action 0 is left
# action 1 is right

In [10]:
# one step
def play_one_step(env, obs, model, loss_fn):
    """Play a single environment step and compute the gradients for it.

    The model output is treated as the probability of going left (action 0).
    An action is sampled from it, the loss is computed as if the sampled
    action were the correct one, and the gradients of that loss with respect
    to the model's trainable variables are returned with the step result.

    Args:
        env: Environment whose ``step(action)`` returns
            ``(obs, reward, done, info)``.
        obs: Current observation; a leading batch axis is added before it is
            passed to the model.
        model: Callable model exposing ``trainable_variables``.
        loss_fn: Loss called as ``loss_fn(y_target, left_proba)``.

    Returns:
        Tuple ``(obs, reward, done, grads)`` where ``obs``, ``reward`` and
        ``done`` come from ``env.step`` and ``grads`` is the list of gradients.
    """
    batched_obs = obs[np.newaxis]
    with tf.GradientTape() as tape:
        left_proba = model(batched_obs)  # model proba of going left
        # True (action 1, right) with probability 1 - left_proba,
        # False (action 0, left) with probability left_proba.
        went_right = tf.random.uniform([1, 1]) > left_proba
        # Target probability of going left: 1 if left was sampled, else 0.
        y_target = tf.constant([[1.]]) - tf.cast(went_right, tf.float32)
        loss = tf.reduce_mean(loss_fn(y_target, left_proba))
    grads = tape.gradient(loss, model.trainable_variables)
    chosen_action = int(went_right[0, 0].numpy())
    next_obs, reward, done, _info = env.step(chosen_action)
    return next_obs, reward, done, grads



In [11]:
# play multiple episodes, returning all the rewards and gradients for each episode

def play_multiple_episodes(env, n_episodes, n_max_steps, model, loss_fn):
    """Play several episodes, collecting per-step rewards and gradients.

    Args:
        env: Environment; ``reset()`` is called at the start of every episode.
        n_episodes: Number of episodes to play.
        n_max_steps: Maximum number of steps per episode; an episode stops
            earlier when the environment reports ``done``.
        model: Policy model forwarded to ``play_one_step``.
        loss_fn: Loss function forwarded to ``play_one_step``.

    Returns:
        Tuple ``(all_rewards, all_grads)``: two lists with one entry per
        episode, each entry being the list of per-step rewards (respectively
        per-step gradients) of that episode.
    """

    def _run_episode():
        # Play one episode and return its per-step rewards and gradients.
        step_rewards, step_grads = [], []
        state = env.reset()
        for _ in range(n_max_steps):
            state, reward, done, grads = play_one_step(env, state, model, loss_fn)
            step_rewards.append(reward)
            step_grads.append(grads)
            if done:
                break
        return step_rewards, step_grads

    all_rewards, all_grads = [], []
    for _ in range(n_episodes):
        episode_rewards, episode_grads = _run_episode()
        all_rewards.append(episode_rewards)
        all_grads.append(episode_grads)
    return all_rewards, all_grads


In [6]:
def discount_rewards(rewards, gamma):
    """Compute the discounted sum of future rewards for every step.

    ``result[t] = rewards[t] + gamma * result[t + 1]``, evaluated from the
    last step backwards.

    Args:
        rewards: Sequence of per-step rewards for one episode (may be empty).
        gamma: Discount factor applied to each following step.

    Returns:
        A new NumPy array of the same length as ``rewards``. Integer or
        boolean input is promoted to float, so discounted values are never
        truncated. Floating-point input keeps its dtype.
    """
    discounted = np.array(rewards)  # copy: the caller's sequence is not modified
    if not np.issubdtype(discounted.dtype, np.floating):
        # Assigning a float into an integer array silently drops the
        # fractional part, which corrupted results for integer rewards.
        discounted = discounted.astype(float)
    for step in range(len(rewards) - 2, -1, -1):
        discounted[step] += discounted[step + 1] * gamma
    return discounted

In [7]:
def discount_and_normalize_rewards(all_rewards, gamma):
    """Discount every episode's rewards, then normalize across all episodes.

    Each episode is discounted with ``discount_rewards``. The mean and
    standard deviation are computed over all steps of all episodes
    together, and every discounted reward is centred and scaled with them.

    Args:
        all_rewards: List of episodes, each a sequence of per-step rewards.
        gamma: Discount factor forwarded to ``discount_rewards``.

    Returns:
        List with one array per episode holding the normalized discounted
        rewards. If the standard deviation is exactly zero, the values are
        only centred, so the result stays finite instead of becoming NaN/inf.
    """
    all_discounted_rewards = [discount_rewards(rewards, gamma)
                              for rewards in all_rewards]
    flat_rewards = np.concatenate(all_discounted_rewards)
    reward_mean = flat_rewards.mean()
    reward_std = flat_rewards.std()
    if reward_std == 0:
        # A zero spread would divide by zero below and turn every
        # normalized reward (and every gradient weighted by it) into NaN.
        reward_std = 1.0
    return [(discounted_rewards - reward_mean) / reward_std
            for discounted_rewards in all_discounted_rewards]

In [8]:
# hyperparams

n_iterations = 150  # number of training iterations (one optimizer update each)
n_episodes_per_update = 10  # how many games to play before each update
n_max_steps = 200  # maximum number of steps per game
gamma = 0.95  # discount factor for future rewards

# `learning_rate` is the supported argument name; `lr` is a deprecated alias.
optimizer = keras.optimizers.Adam(learning_rate=0.01)
loss_fn = keras.losses.binary_crossentropy


In [12]:
for iteration in range(n_iterations):
    # Collect a batch of episodes played with the current policy.
    all_rewards, all_grads = play_multiple_episodes(
        env, n_episodes_per_update, n_max_steps, model, loss_fn
    )

    # Report the average undiscounted reward per episode.
    total_rewards = sum(sum(episode_rewards) for episode_rewards in all_rewards)
    mean_rewards = total_rewards / n_episodes_per_update
    print(f"Iteration: {iteration}, mean rewards: {mean_rewards:.2f}")

    # Normalized discounted rewards: how good or bad each action was.
    all_final_rewards = discount_and_normalize_rewards(all_rewards, gamma)

    # For every trainable variable, average the per-step gradients weighted
    # by the normalized reward of the step that produced them.
    all_mean_grads = []
    for var_index in range(len(model.trainable_variables)):
        mean_grads = tf.reduce_mean(
            [
                final_reward * step_grads[var_index]
                for episode_grads, final_rewards in zip(all_grads, all_final_rewards)
                for step_grads, final_reward in zip(episode_grads, final_rewards)
            ],
            axis=0,
        )
        all_mean_grads.append(mean_grads)
    optimizer.apply_gradients(zip(all_mean_grads, model.trainable_variables))

Iteration: 0, mean rewards: 77.30
Iteration: 1, mean rewards: 90.10
Iteration: 2, mean rewards: 97.20
Iteration: 3, mean rewards: 61.80
Iteration: 4, mean rewards: 79.70
Iteration: 5, mean rewards: 88.10
Iteration: 6, mean rewards: 105.80
Iteration: 7, mean rewards: 112.20
Iteration: 8, mean rewards: 106.50
Iteration: 9, mean rewards: 141.50
Iteration: 10, mean rewards: 126.20
Iteration: 11, mean rewards: 111.80
Iteration: 12, mean rewards: 142.60
Iteration: 13, mean rewards: 147.20
Iteration: 14, mean rewards: 173.80
Iteration: 15, mean rewards: 142.80
Iteration: 16, mean rewards: 132.30
Iteration: 17, mean rewards: 152.90
Iteration: 18, mean rewards: 140.20
Iteration: 19, mean rewards: 150.30
Iteration: 20, mean rewards: 131.50
Iteration: 21, mean rewards: 148.50
Iteration: 22, mean rewards: 192.40
Iteration: 23, mean rewards: 171.80
Iteration: 24, mean rewards: 150.00
Iteration: 25, mean rewards: 164.80
Iteration: 26, mean rewards: 197.40
Iteration: 27, mean rewards: 175.80
Iteratio

KeyboardInterrupt: 