In [1]:
import gym
import tensorflow as tf
import numpy as np
import matplotlib.pyplot as plt

In [2]:
# --- optimizer hyperparameters for the policy net ---
policy_lr = 1e-2        # learning rate for the policy net
policy_lr_decay = 1e-3  # decay factor for the policy net learning rate

# --- environment ---
env = gym.envs.make('CartPole-v1')

# discrete action ids [0, 1, ..., n-1] of the environment
action_space = [*range(env.action_space.n)]

In [3]:
# Policy network pi(a | s; theta): one hidden ReLU layer followed by a softmax
# over the environment's discrete actions.
policy_network = tf.keras.models.Sequential([
    tf.keras.layers.Dense(16, activation='relu'),
    tf.keras.layers.Dense(env.action_space.n, activation='softmax')
])

# Build the model up front so its weights exist before they are collected.
# The input width is read from the environment (instead of a hard-coded 4) so
# it stays consistent with `env`, just like the output width above.
policy_network.build([None, env.observation_space.shape[0]])

In [4]:
# discount factor used when accumulating returns
gamma = 0.9

# parameters of the policy net; gradients are taken with respect to these
theta = policy_network.trainable_variables

In [5]:
# Plain SGD with a step size of policy_lr / (1 + policy_lr_decay * step), where
# `step` counts optimizer updates. This is the same rule the `lr=` / `decay=`
# keyword pair produced, but expressed through the `learning_rate` argument and
# an explicit schedule: `lr` is a deprecated alias and `decay` is not accepted
# by newer Keras optimizers.
policy_optimizer = tf.keras.optimizers.SGD(
    learning_rate=tf.keras.optimizers.schedules.InverseTimeDecay(
        initial_learning_rate=policy_lr,
        decay_steps=1,
        decay_rate=policy_lr_decay,
    )
)


In [6]:
def generate_episode_PG(env, policy_net, train=True, render=True):
    """Roll out one episode by sampling actions from the policy network.

    Args:
        env: environment object. This function calls `reset()` (expected to
            return the initial state), `step(action)` (expected to return a
            4-tuple `(new_state, reward, done, info)`), optionally `render()`,
            and reads `action_space.n` for the number of discrete actions.
        policy_net: callable that takes a `[1 x state size]` batch and returns
            an object whose `.numpy()` yields an array with the action
            probabilities in row 0.
        train: accepted for interface compatibility; not used by this function.
        render: if true, `env.render()` is called after every step.

    Returns:
        A list with one `(t, state, action, reward)` tuple per time step, where
        `state` is the state in which `action` was chosen.
    """
    # Take the action count from the environment that was passed in rather
    # than from a notebook-level global, so the function is self-contained
    # and works with whatever `env` the caller supplies.
    n_actions = env.action_space.n

    state = env.reset()
    done = False
    t = 0
    episode = []
    while not done:
        # unsqueeze state vector to [1 x state size]
        state_ = state[None, ...]

        # get the policy distribution for state
        action_distribution = policy_net(state_)

        # sample an action id from {0, ..., n_actions - 1} as per the policy
        # distribution
        action = np.random.choice(n_actions, p=action_distribution.numpy()[0])

        # act on the environment
        new_state, reward, done, __md = env.step(action)

        # append the experience tuple to the episode array; `state` is still
        # the state the action was taken in, not the successor
        episode.append((t, state, action, reward))
        if render:
            env.render()
        state = new_state
        t += 1
    return episode

In [7]:
# number of episodes the training loop iterates over
NUM_EPISODES = 2_000


In [8]:
# Episode lengths and their exponentially smoothed average. Both lists start
# with a 0 entry so the smoothing recurrence below has a previous value.
rewards, average_rewards = [0], [0]

for episode_number in range(NUM_EPISODES):

    # roll out one episode with the current policy, without rendering
    episode = generate_episode_PG(env, policy_network, render=False)

    # discounted return, accumulated while walking the episode backwards
    G = 0

    for i, state, action, reward in reversed(episode):
        # G_t = reward_t + gamma * G_{t+1}; plain Python arithmetic, so it
        # does not need to be recorded on the gradient tape
        G = reward + gamma * G

        with tf.GradientTape() as tape:
            # pi(s, a): probability the policy assigns to the action taken
            action_probability = policy_network(state[None, ...])[0, action]

            # log(pi(s, a))
            log_action_probability = tf.math.log(action_probability)

        # grad(log(pi(s, a))) with respect to theta
        grads = tape.gradient(log_action_probability, theta)

        # scale by the return; the sign is flipped because the optimizer
        # minimizes, and we want to ascend the policy gradient
        policy_gradients = [-G * g for g in grads]

        # one parameter update per time step
        policy_optimizer.apply_gradients(zip(policy_gradients, theta))

    # record the length of this episode (used as its total reward);
    # plotted raw, this series is noisy
    rewards.append(len(episode))

    # exponentially smoothed episode length across episodes
    average_rewards.append(0.9 * average_rewards[-1] + 0.1 * len(episode))

    # report progress every 50 episodes
    if (episode_number + 1) % 50 == 0:
        print(f'Episode: {episode_number+1}, average lifetime:{round(average_rewards[-1])}.')

env.close()



To change all layers to have dtype float64 by default, call `tf.keras.backend.set_floatx('float64')`. To change just this layer, pass dtype='float64' to the layer constructor. If you are the author of this layer, you can disable autocasting by passing autocast=False to the base Layer constructor.

Episode: 50, average lifetime:10.
Episode: 100, average lifetime:10.
Episode: 150, average lifetime:24.
Episode: 200, average lifetime:21.
Episode: 250, average lifetime:18.
Episode: 300, average lifetime:27.
Episode: 350, average lifetime:53.
Episode: 400, average lifetime:36.
Episode: 450, average lifetime:84.
Episode: 500, average lifetime:50.
Episode: 550, average lifetime:36.
Episode: 600, average lifetime:61.
Episode: 650, average lifetime:165.
Episode: 700, average lifetime:125.
Episode: 750, average lifetime:250.
Episode: 800, average lifetime:198.
Episode: 850, average lifetime:120.
Episode: 900, average lifetime:69.
Episode: 950, average lifetime:78.
Episode: 1000, average lifetim