In [1]:
import sys
import os
import numpy as np
import sklearn
import tensorflow as tf
from tensorflow import keras
import matplotlib as mpl
import matplotlib.pyplot as plt
import matplotlib.animation as animation
import gym

In [2]:
# Choose the 'Acrobot-v1' environment
environ = gym.make('Acrobot-v1')

In [3]:
environ.seed(1)
observe = environ.reset()
print(observe) # Represents the 2-d positions of joint-nodes and angular velocities 

[ 0.99811082  0.06143937  0.99999579  0.00290001 -0.06177637 -0.06262504]


In [4]:
environ.action_space # 0,1 or 2

Discrete(3)

In [5]:
action = 0 # any isolated action from environ.reset() has a reward of -1
observe, reward, done, info = environ.step(action)
reward

-1.0

In [6]:
# A neural net that will take as input a state, and output an approximate Q-value for each possible action
# It is a guess that the number of hidden layers should match the number of possible actions.
keras.backend.clear_session()
tf.random.set_seed(1)
np.random.seed(1)

in_shape = [environ.observation_space.shape[0]]
n_out = environ.action_space.n

model = keras.models.Sequential([
    keras.layers.Dense(32, activation="relu", input_shape=in_shape),
    keras.layers.Dense(32, activation="relu"),
    keras.layers.Dense(32, activation="relu"),
    keras.layers.Dense(n_out, activation="softmax")
])

2021-08-06 20:22:38.996430: I tensorflow/core/platform/cpu_feature_guard.cc:142] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.


In [7]:
#Sometimes the best way to behave is with a little randomness. Here's our way of introducing that policy.
def epsilon_greedy(state, epsilon=0):
    if np.random.rand() < epsilon:
        return np.random.randint(n_out)
    else:
        Q_values = model.predict(state[np.newaxis])
        return np.argmax(Q_values[0])

In [8]:
# Here we create a linked list, or a deque, which will store the experience of our agent. We will perform gradient
# descent on a sample of a fixed size from this deque of experiences. Geron calls this a 'replay buffer'
from collections import deque
rep_buff = deque(maxlen=2000)

In [9]:
# A function that samples from the deque of experiences. The sample size is 'batch_size'
def sample_experiences(batch_size):
    indices = np.random.randint(len(rep_buff), size=batch_size)
    batch = [rep_buff[k] for k in indices]
    states, actions, rewards, next_states, dones = [
        np.array([experience[j] for experience in batch])
        for j in range(5)]
    return states, actions, rewards, next_states, dones

In [10]:
# A function that allows the agent to peform one step, using our epsilon_greedy function from states to actions.
def one_step(environ, state, epsilon):
    action = epsilon_greedy(state, epsilon)
    next_state, reward, done, info = environ.step(action)
    rep_buff.append((state, action, reward, next_state, done))
    return next_state, reward, done, info

In [11]:
# We set the sample size of experiences at 32. I chose a discount_rate of .99 (the future is important for the acrobot!)
# Following Geron's notebook, I chose a learning_rate of 1/100 for the Adam optimizer, which is 
# a kind of stochastic gradient descent. The training_step function is also borrowed from Geron's notebook.
batch_size = 32
discount_rate = 0.99
optimizer = keras.optimizers.Adam(learning_rate=1e-2)
loss_fn = keras.losses.mean_squared_error

def training_step(batch_size):
    states, actions, rewards, next_states, dones = sample_experiences(batch_size)
    max_next_Q_values = np.max(model.predict(next_states), axis=1)
    target_Q_values = (rewards + (1 - dones) * discount_rate * max_next_Q_values)
    target_Q_values = target_Q_values.reshape(-1, 1)
    mask = tf.one_hot(actions, n_out)
    with tf.GradientTape() as tape:
        all_Q_values = model(states)
        Q_values = tf.reduce_sum(all_Q_values * mask, axis=1, keepdims=True)
        loss = tf.reduce_mean(loss_fn(target_Q_values, Q_values))
    grads = tape.gradient(loss, model.trainable_variables)
    optimizer.apply_gradients(zip(grads, model.trainable_variables))

In [12]:
# Making sure we have a handle on the psuedo-randomness
environ.seed(1)
np.random.seed(1)
tf.random.set_seed(1)

In [13]:
# Training phase. epsilon will start near 1 then about half-way through the episodes be around .01.
# I wanted each episode to have enough steps (N_steps) such that the agent had sufficient time to 'learn'.
N_steps = 300 
n_steps = N_steps # initialize n_steps
for episode in range(500):
    observe = environ.reset()
    for step in range(N_steps):
        epsilon = max(1 - episode /300, 0.01)
        observe, reward, done, info = one_step(environ, observe, epsilon)
        if done:
            break
        environ.render()
    print("\rEpisode: {}, Steps: {}, epsilon: {:.3f}".format(episode, step + 1, epsilon), end="")
    if step < n_steps: 
        best_weights = model.get_weights() 
        n_steps = step
        best_epsilon = epsilon
    if done: 
        print(" It won!")
    if episode > 50:
        training_step(batch_size)

model.set_weights(best_weights)

2021-08-06 20:22:39.665 Python[10033:2045704] ApplePersistenceIgnoreState: Existing state will not be touched. New state will be written to (null)


Episode: 2, Steps: 300, epsilon: 0.993

2021-08-06 20:22:57.769274: I tensorflow/compiler/mlir/mlir_graph_optimization_pass.cc:176] None of the MLIR Optimization Passes are enabled (registered 2)


Episode: 85, Steps: 249, epsilon: 0.717 It won!
Episode: 86, Steps: 216, epsilon: 0.713 It won!
Episode: 88, Steps: 199, epsilon: 0.707 It won!
Episode: 89, Steps: 272, epsilon: 0.703 It won!
Episode: 90, Steps: 212, epsilon: 0.700 It won!
Episode: 91, Steps: 175, epsilon: 0.697 It won!
Episode: 92, Steps: 228, epsilon: 0.693 It won!
Episode: 117, Steps: 191, epsilon: 0.610 It won!
Episode: 499, Steps: 300, epsilon: 0.010

In [40]:
observe = environ.reset()
environ.render()

True

In [41]:
# Let's test the agent. Take 1. See 'Test_Run_1.gif' file in repository or READ_ME
for step in range(400):
    observe, reward, done, info = one_step(environ, observe, 0)
    if done:
        break
    environ.render()
print(step) 


111


In [42]:
#Take 2. See 2nd .gif file in repository
observe = environ.reset()
for step in range(400):
    observe, reward, done, info = one_step(environ, observe, 0)
    if done:
        break
    environ.render()
print(step) 


82


In [43]:
#Take 3
observe = environ.reset()
for step in range(400):
    observe, reward, done, info = one_step(environ, observe, 0)
    if done:
        break
    environ.render()
print(step) 


96


In [44]:
#Take 4
observe = environ.reset()
for step in range(400):
    observe, reward, done, info = one_step(environ, observe, 0)
    if done:
        break
    environ.render()
print(step) 


83


In [45]:
#Take 5
observe = environ.reset()
for step in range(400):
    observe, reward, done, info = one_step(environ, observe, 0)
    if done:
        break
    environ.render()
print(step) 


101
