# CartPole (Reinforcement Learning)

In [31]:
import sys

# Colab setup
if 'google.colab' in sys.modules:
    %pip install -q -U gymnasium
    %pip install -q -U gymnasium[classic_control,box2d,atari,accept-rom-license]
    
from pathlib import Path
import numpy as np
import sklearn
import tensorflow as tf
from tensorflow import keras
import gymnasium as gym
import matplotlib as mpl
import matplotlib.pyplot as plt
import matplotlib.animation as animation
import seaborn as sns

# Make notebook output stable across runs
random_state = 1000
np.random.seed(random_state)
tf.random.set_seed(random_state)

# Plot settings
%matplotlib inline
sns.set()
mpl.rc('font', size=14)
mpl.rc('axes', labelsize=14, titlesize=14)
mpl.rc('legend', fontsize=14)
mpl.rc('xtick', labelsize=10)
mpl.rc('ytick', labelsize=10)
mpl.rc('animation', html='jshtml')

In [32]:
# Utility functions

def print_cart(obs, reward):
    print(f'Last reward: {reward: .1f}')
    print((f'Cart horizontal position: {obs[0]: .3f}\t'
           f'Cart velocity:         {obs[1]: .3f}\n'
           f'Pole angle:               {obs[2]: .3f}\t'
           f'Pole angular velocity: {obs[3]: .3f}\n'))


def plot_environment(env, figsize=(5, 4)):
    plt.figure(figsize=figsize)
    img = env.render()
    plt.imshow(img)
    plt.axis("off")
    return img

    
def update_scene(num, frames, patch):
    patch.set_data(frames[num])
    return patch


def plot_animation(frames, repeat=False, interval=100):
    fig = plt.figure()
    patch = plt.imshow(frames[0])
    plt.axis('off')
    anim = animation.FuncAnimation(fig, update_scene, fargs=(frames, patch),
                                   frames=len(frames), repeat=repeat, 
                                   interval=interval)
    plt.close()
    return anim    

## CartPole: A Simple Policy

In [33]:
env = gym.make('CartPole-v1', render_mode='rgb_array')
obs, info = env.reset(seed=random_state)

Try a simple policy. If the pole is tilting left, then push the cart to the left. If the pole is tilting right, then push the cart to the right.

At each step, we observe the state of the system:

- The cart's horizontal position
- The cart's velocity
- The angle of the pole in radians (where vertical is 0)
- The pole's angular velocity

In [34]:
def simple_policy(obs):
    angle = obs[2]
    
    # If the angle is negative, then the pole is leaning left
    # Action 0 corresponds to pushing the cart to the left
    if angle < 0:
        return 0
    else:
        return 1

In [45]:
# Run 1,000 episodes to see how the policy performs
num_episodes = 1000
max_steps_per_episode = 500

episode_rewards = []
for episode in range(num_episodes):
    obs, info = env.reset()
    
    cumulative_reward = 0
    for step in range(max_steps_per_episode):
        action = simple_policy(obs)
        obs, reward, done, truncated, info = env.step(action)
        cumulative_reward += reward
        if done:
            break
    
    episode_rewards.append(cumulative_reward)

In [46]:
# Calculate the mean and standard deviation of the per-episode rewards
print(f'Reward mean:   {np.mean(episode_rewards):.3f}')
print(f'Reward stdev:  {np.std(episode_rewards):.3f}')

# Calculate the min and max of the per-episode rewards
print(f'Reward min:    {np.min(episode_rewards):.3f}')
print(f'Reward max:    {np.max(episode_rewards):.3f}')

Reward mean:   42.242
Reward stdev:  9.096
Reward min:    24.000
Reward max:    72.000


In [48]:
# Visualize a single episode
frames = []
obs, info = env.reset(seed=random_state)
for step in range(max_steps_per_episode):
    img = env.render()
    frames.append(img)
    action = simple_policy(obs)

    obs, reward, done, truncated, info = env.step(action)
    if done:
        break

plot_animation(frames)

## CartPole: A Parameterized Policy

In [59]:
def sigmoid(x):
    return 1 / (1 + np.exp(-x))

In [60]:
def parm_policy(obs, theta=0.1):
    # Angle ranges from -0.2095 to 0.2095
    angle = obs[2]
    
    prob_left = 1 - sigmoid(angle / theta)

    if np.random.uniform() < prob_left:
        return 0
    else:
        return 1

### High Theta

In [61]:
# Use theta = 1.5
theta = 1.5

# Run 1,000 episodes to see how the policy performs
num_episodes = 1000
max_steps_per_episode = 500

episode_rewards = []
for episode in range(num_episodes):
    obs, info = env.reset()
    
    cumulative_reward = 0
    for step in range(max_steps_per_episode):
        action = parm_policy(obs, theta)
        obs, reward, done, truncated, info = env.step(action)
        cumulative_reward += reward
        if done:
            break
    
    episode_rewards.append(cumulative_reward)
    
# Calculate the mean and standard deviation of the per-episode rewards
print(f'Reward mean:   {np.mean(episode_rewards):.3f}')
print(f'Reward stdev:  {np.std(episode_rewards):.3f}')

# Calculate the min and max of the per-episode rewards
print(f'Reward min:    {np.min(episode_rewards):.3f}')
print(f'Reward max:    {np.max(episode_rewards):.3f}')

Reward mean:   22.950
Reward stdev:  13.117
Reward min:    8.000
Reward max:    99.000


### Low Theta

In [62]:
# Use theta = 0.05
theta = 0.05

# Run 1,000 episodes to see how the policy performs
num_episodes = 1000
max_steps_per_episode = 500

episode_rewards = []
for episode in range(num_episodes):
    obs, info = env.reset()
    
    cumulative_reward = 0
    for step in range(max_steps_per_episode):
        action = parm_policy(obs, theta)
        obs, reward, done, truncated, info = env.step(action)
        cumulative_reward += reward
        if done:
            break
    
    episode_rewards.append(cumulative_reward)
    
# Calculate the mean and standard deviation of the per-episode rewards
print(f'Reward mean:   {np.mean(episode_rewards):.3f}')
print(f'Reward stdev:  {np.std(episode_rewards):.3f}')

# Calculate the min and max of the per-episode rewards
print(f'Reward min:    {np.min(episode_rewards):.3f}')
print(f'Reward max:    {np.max(episode_rewards):.3f}')

Reward mean:   49.412
Reward stdev:  27.059
Reward min:    10.000
Reward max:    232.000


### Best Theta

In [63]:
best_theta = 0.05

# Visualize a single episode
frames = []
obs, info = env.reset(seed=random_state)
for step in range(max_steps_per_episode):
    img = env.render()
    frames.append(img)
    action = parm_policy(obs, best_theta)

    obs, reward, done, truncated, info = env.step(action)
    if done:
        break

plot_animation(frames)

## CartPole: A Neural Network Learns the Simple Policy

In [64]:
keras.backend.clear_session()

# Create the neural network
n_inputs = 4  # Inputs correspond to the 4 components of the observations
model = keras.models.Sequential([
    keras.layers.Dense(5, activation='elu', input_shape=[n_inputs]),
    keras.layers.Dense(1, activation='sigmoid')
]) 

In [65]:
# Function to visualize network policy
def render_policy_net(model, max_steps=max_steps_per_episode,
                      seed=random_state):
    frames = []

    env = gym.make('CartPole-v1', render_mode='rgb_array')

    np.random.seed(seed)
    obs, info = env.reset(seed=seed)
    
    for step in range(max_steps):
        frames.append(env.render())

        left_proba = model.predict(obs.reshape(1, -1), verbose=0)
        action = int(np.random.rand() > left_proba)
        
        obs, _, done, _, _ = env.step(action)
        if done:
            break
            
    env.close()
    return frames

In [66]:
# Visualize the randomly initialized network policy
frames = render_policy_net(model)
plot_animation(frames)

In [67]:
# Run 50 environments for 10,000 iterations in parallel
# Have the neural network learn the "push left if leaning left" policy

n_environments = 50
n_iterations = 10_000
output_interval = 1_000

envs = [gym.make('CartPole-v1', render_mode='rgb_array')
        for _ in range(n_environments)]

np.random.seed(random_state)
observations = [env.reset(seed=random_state)[0] for env in envs]
rewards = np.zeros(n_environments)

optimizer = keras.optimizers.RMSprop()
loss_fn = keras.losses.binary_crossentropy

for iteration in range(n_iterations):    
    # if angle < 0, we want proba(left) = 1
    target_probas = np.array([
        [1.] if obs[2] < 0 else [0.] for obs in observations
    ])

    # Watch these calculations so we can easily access the gradients later
    with tf.GradientTape() as tape:
        left_probas = model(np.array(observations))
        loss = tf.reduce_mean(loss_fn(target_probas, left_probas))
    
    if iteration % output_interval == 0:
        print(f'Iteration: {iteration}\tLoss: {loss.numpy():.3f}')
    
    grads = tape.gradient(loss, model.trainable_variables)
    optimizer.apply_gradients(zip(grads, model.trainable_variables))

    actions = (np.random.rand(n_environments, 1) > left_probas.numpy())
    actions = actions.astype(np.int32)
    
    for env_index, env in enumerate(envs):
        obs, reward, done, _, _ = env.step(actions[env_index][0])
        if done:
            env.reset()
        else:
            observations[env_index] = obs

for env in envs:
    env.close()  

Iteration: 0	Loss: 0.700
Iteration: 1000	Loss: 0.485
Iteration: 2000	Loss: 0.346
Iteration: 3000	Loss: 0.170
Iteration: 4000	Loss: 0.179
Iteration: 5000	Loss: 0.114
Iteration: 6000	Loss: 0.061
Iteration: 7000	Loss: 0.046
Iteration: 8000	Loss: 0.026
Iteration: 9000	Loss: 0.069


In [68]:
frames = render_policy_net(model)
plot_animation(frames)

In [69]:
# Evaluate rewards over 100 episodes
episode_rewards = []
for episode in range(100):
    obs, _ = env.reset()
    
    cumulative_reward = 0
    for step in range(max_steps_per_episode):
        left_proba = model.predict(obs.reshape(1, -1), verbose=0)
        action = int(np.random.rand() > left_proba)
        obs, reward, done, _, _ = env.step(action)
        cumulative_reward += reward
        if done:
            break
    
    episode_rewards.append(cumulative_reward)
    
# Calculate the mean and standard deviation of the per-episode rewards
np.mean(episode_rewards), np.std(episode_rewards)    

(41.28, 9.055473482927328)

In [70]:
# Calculate the min and max of the per-episode rewards
np.min(episode_rewards), np.max(episode_rewards)   

(25.0, 66.0)

In [72]:
# Show prediction for an example state
obs = [-1, 1, -0.2, 0.2]
model.predict(np.array(obs).reshape(1, -1))



array([[0.9999998]], dtype=float32)

## CartPole: Policy Gradients, REINFORCE

In [21]:
def play_one_step(env, obs, model, loss_fn):
    with tf.GradientTape() as tape:
        left_proba = model(obs[np.newaxis])
        action = (tf.random.uniform([1, 1]) > left_proba)
        y_target = tf.constant([[1.]]) - tf.cast(action, tf.float32)
        loss = tf.reduce_mean(loss_fn(y_target, left_proba))
    grads = tape.gradient(loss, model.trainable_variables)
    obs, reward, done, truncated, info = env.step(int(action[0, 0].numpy()))
    return obs, reward, done, grads

In [22]:
def play_multiple_episodes(env, n_episodes, n_max_steps, model, loss_fn):
    all_rewards = []
    all_grads = []
    for episode in range(n_episodes):
        current_rewards = []
        current_grads = []
        obs, info = env.reset(seed=random_state)
        for step in range(n_max_steps):
            obs, reward, done, grads = play_one_step(env, obs, model, loss_fn)
            current_rewards.append(reward)
            current_grads.append(grads)
            if done:
                break
        all_rewards.append(current_rewards)
        all_grads.append(current_grads)
    return all_rewards, all_grads

In [23]:
def discount_rewards(rewards, discount_rate):
    discounted = np.array(rewards)
    for step in range(len(rewards) - 2, -1, -1):
        discounted[step] += discounted[step + 1] * discount_rate
    return discounted

In [24]:
def discount_and_normalize_rewards(all_rewards, discount_rate):
    all_discounted_rewards = [discount_rewards(rewards, discount_rate)
                              for rewards in all_rewards]
    flat_rewards = np.concatenate(all_discounted_rewards)
    reward_mean = flat_rewards.mean()
    reward_std = flat_rewards.std()
    return [(discounted_rewards - reward_mean) / reward_std
            for discounted_rewards in all_discounted_rewards]

In [25]:
n_iterations = 300
n_episodes_per_update = 10
n_max_steps = 500
discount_rate = 0.95

In [26]:
optimizer = keras.optimizers.Adam(learning_rate=0.01)
loss_fn = keras.losses.binary_crossentropy

In [27]:
keras.backend.clear_session()
np.random.seed(random_state)
tf.random.set_seed(random_state)

model = keras.models.Sequential([
    keras.layers.Dense(5, activation='elu', input_shape=[4]),
    keras.layers.Dense(1, activation='sigmoid'),
])

In [28]:
env = gym.make('CartPole-v1', render_mode='rgb_array')

for iteration in range(n_iterations):
    all_rewards, all_grads = play_multiple_episodes(
        env, n_episodes_per_update, n_max_steps, model, loss_fn)
    total_rewards = sum(map(sum, all_rewards))                     
    print('\rIteration: {}, mean rewards: {:.1f}'.format(          
        iteration, total_rewards / n_episodes_per_update), end='')
    all_final_rewards = discount_and_normalize_rewards(all_rewards,
                                                       discount_rate)
    all_mean_grads = []
    for var_index in range(len(model.trainable_variables)):
        mean_grads = tf.reduce_mean(
            [final_reward * all_grads[episode_index][step][var_index]
             for episode_index, final_rewards in enumerate(all_final_rewards)
                 for step, final_reward in enumerate(final_rewards)], axis=0)
        all_mean_grads.append(mean_grads)
    optimizer.apply_gradients(zip(all_mean_grads, model.trainable_variables))

env.close()

Iteration: 299, mean rewards: 500.0

In [29]:
frames = render_policy_net(model)
plot_animation(frames)