In [1]:
from collections import deque
from functools import partial
from matplotlib.animation import FuncAnimation
from tensorflow import keras
from tensorflow.keras import layers
from tqdm import tqdm

import matplotlib as mpl
import matplotlib.pyplot as plt
import numpy as np
import tensorflow as tf
import gym

mpl.rc("animation", html="jshtml")
%matplotlib inline

In [2]:
env = gym.make("CartPole-v1")

In [3]:
default_layer = partial(
    layers.Dense, kernel_initializer="he_normal", activation="elu"
)

model = keras.Sequential([
    default_layer(32, input_shape=env.observation_space.shape),
    default_layer(32),
    default_layer(16),
    default_layer(8),
    default_layer(4),
    default_layer(env.action_space.n)
])

In [4]:
batch_size = 32
discount_factor = 0.98
optimizer = keras.optimizers.Nadam(learning_rate=0.05)
mse_loss = keras.losses.MeanSquaredError()

replay_buffer = deque(maxlen=4096)

In [5]:
def epsilon_decay_schedule(episode):
    if episode < 50:
        return 1
    else:
        return np.reciprocal(np.log2(2 + episode - 50))

def epsilon_greedy_policy(state, epsilon=0):
    if np.random.rand() < epsilon:
        return np.random.randint(env.action_space.n)
    else:
        q_values = model.predict(state[np.newaxis], verbose=0)
        return np.argmax(q_values)

In [6]:
def get_sample_experiences(batch_size):
    indices = np.random.randint(len(replay_buffer), size=batch_size)
    samples = [replay_buffer[index] for index in indices]
    states, actions, rewards, next_states, dones = [
        np.array([experience[field_index] for experience in samples])
        for field_index in range(5)
    ]
    return states, actions, rewards, next_states, dones

In [7]:
def play_one_step(state, epsilon):
    action = epsilon_greedy_policy(state, epsilon)
    next_state, reward, done, info = env.step(action)
    replay_buffer.append((state, action, reward, next_state, done))
    return next_state, reward, done, info

In [8]:
def train_one_step(model, batch_size):
    states, actions, rewards, next_states, dones = get_sample_experiences(batch_size)
    next_q_values = model.predict(next_states, verbose=0)
    next_state_values = next_q_values.max(axis=-1)
    q_values = rewards + (1 - dones) * discount_factor * next_state_values
    q_values = q_values.reshape(-1, 1)
    mask = tf.one_hot(actions, env.action_space.n)
    
    with tf.GradientTape() as tape:
        predicted_q_values = model(states)
        predicted_q_values = tf.reduce_sum(
            predicted_q_values * mask, axis=1, keepdims=True
        )
        loss = tf.reduce_mean(mse_loss(q_values, predicted_q_values))
    
    gradients = tape.gradient(loss, model.trainable_variables)
    optimizer.apply_gradients(zip(gradients, model.trainable_variables))

In [9]:
n_episodes = 2000
best_score = 0

for episode in tqdm(range(n_episodes)):
    state = env.reset()
    for step in range(1, env.spec.max_episode_steps + 1):
        epsilon = epsilon_decay_schedule(episode)
        state, reward, done, info = play_one_step(state, epsilon)
        if done:
            break
    
    if step >= best_score:
        best_weights = model.get_weights()
        best_score = step
    
    if episode > 100:
        train_one_step(model, batch_size)

100%|███████████████████████████████████████| 2000/2000 [37:46<00:00,  1.13s/it]


In [10]:
print(f"Best Score: {best_score}")

Best Score: 500


In [11]:
def evaluate_policy(env, policy, n_episodes=100):
    min_steps = env.spec.max_episode_steps
    max_steps = 0
    total_steps = 0
    
    for _ in tqdm(range(n_episodes)):
        state = env.reset()
        for step in range(1, env.spec.max_episode_steps + 1):
            action = epsilon_greedy_policy(state)
            state, reward, done, info = env.step(action)
            if done:
                break
    
        total_steps += step
        if step < min_steps:
            min_steps = step
        if step > max_steps:
            max_steps = step
    
    return min_steps, max_steps, total_steps / n_episodes

In [12]:
model.set_weights(best_weights)
min_steps, max_steps, avg_steps = evaluate_policy(env, epsilon_greedy_policy)

100%|█████████████████████████████████████████| 100/100 [23:08<00:00, 13.88s/it]


In [13]:
print(f"The minimum number of steps in an episode: {min_steps}")
print(f"The maximum number of steps in an episode: {max_steps}")
print(f"The average number of steps in an episode: {avg_steps}")

The minimum number of steps in an episode: 500
The maximum number of steps in an episode: 500
The average number of steps in an episode: 500.0


In [14]:
def plot_one_episode(steps, interval=50, repeat=False):
    def update_frame(index):
        frame.set_data(steps[index])
        return frame,
        
    fig = plt.figure()
    plt.axis("off")
    
    frame = plt.imshow(steps[0])
    animation = FuncAnimation(
        fig, update_frame, frames=len(steps), interval=interval, repeat=repeat
    )
    plt.close()
    return animation

In [15]:
def play_one_episode(env, policy):
    steps = []
    state = env.reset()
    for step in range(env.spec.max_episode_steps):
        steps.append(env.render(mode="rgb_array"))
        action = epsilon_greedy_policy(state)
        state, reward, done, info = env.step(action)
        if done:
            break
    return steps

In [None]:
steps = play_one_episode(env, epsilon_greedy_policy)
plot_one_episode(steps)