In [None]:
import gym
import matplotlib.pyplot as plt
%matplotlib inline
import numpy as np
import random

from keras.models import Sequential
from keras.layers import Dense, Activation

from JSAnimation.IPython_display import display_animation
from matplotlib import animation
from IPython import display

In [None]:
def display_frames_as_gif(frames):
    """
    Show a sequence of image frames as an inline animation with controls.

    frames: indexable collection of images accepted by ``plt.imshow``; the
            first frame initialises the plot, so it must not be empty.
    """
    # Draw the first frame once; later frames only swap the image data.
    image = plt.imshow(frames[0])
    plt.axis('off')

    def draw_frame(index):
        image.set_data(frames[index])

    # interval is the delay between frames passed to FuncAnimation (17 here).
    anim = animation.FuncAnimation(
        plt.gcf(), draw_frame, frames=len(frames), interval=17
    )
    player = display_animation(anim, default_mode='once')
    display.display(player)

In [None]:
"""
Environment Parameters
"""
# Width of the network's input layer (see build_dqn).
observation_space = 4
# Number of output units of the network, one Q-value per action (see build_dqn).
action_space = 2
# Identifier handed to gym.make.
env_name = 'CartPole-v0'
# Seed applied to the environment, `random` and `np.random`.
seed = 12

"""
Agent Parameters
"""
# Number of training episodes run by the training loops.
episodes = 1000
# Exploration rate used for the first training episode.
epsilon_init = 1.0
# Amount subtracted from epsilon at the end of every episode.
epsilon_decay = 0.002
# Lower bound that epsilon is clamped to after decaying.
epsilon_min = 0.1
# Discount factor applied to the bootstrapped next-state value.
gamma = 0.99

"""
Model Parameters
"""
# Number of transitions sampled from the replay buffer per training call.
batch_size = 128
# Units in each of the two hidden Dense layers.
hidden_dims = 16
# Activation applied after each hidden Dense layer.
activation = 'relu'
# Optimizer and loss names passed to model.compile.
optim = 'rmsprop'
loss_func = 'mse'

In [None]:
# Create the environment, then seed it and the two global RNGs.
# `random` drives the epsilon-greedy draw and replay-batch sampling below.
# NOTE(review): no seed is set for the Keras backend anywhere in this notebook,
# so network initialisation may still differ between runs - confirm.
cartpole_env = gym.make(env_name)
cartpole_env.seed(seed)
random.seed(seed)
np.random.seed(seed)

In [None]:
def build_dqn():
    """
    Build and compile a fresh Q-network.

    Architecture: two Dense layers of `hidden_dims` units, each followed by
    the configured activation, then a Dense output of `action_space` units
    with no activation. Compiled with the `optim` optimizer and `loss_func`
    loss from the parameter cell.

    Returns the compiled keras Sequential model.
    """
    layer_stack = [
        Dense(hidden_dims, input_shape=(observation_space,)),
        Activation(activation),
        Dense(hidden_dims),
        Activation(activation),
        Dense(action_space),
    ]
    network = Sequential(layer_stack)
    network.compile(optimizer=optim, loss=loss_func)
    return network

# Network used by the first (no target network) experiment.
simple_dqn = build_dqn()
# Replay buffer: a plain list of (state, action, reward, next_state, done) tuples.
simple_buffer = []

In [None]:
def train_dqn(model, buffer):
    """
    Fit `model` on one random batch of transitions from the replay buffer.

    model:  compiled Q-network exposing `predict` and `fit`.
    buffer: sequence of (state, action, reward, next_state, done) tuples,
            where state / next_state are arrays with a leading axis of 1.

    The regression target for the action that was taken is
    `reward + gamma * max_a Q(next_state, a)`, with the bootstrap term dropped
    for terminal transitions. Targets for the other actions are the network's
    own current predictions, so they contribute no error.

    Does nothing when the buffer holds fewer than `batch_size` transitions
    (random.sample would raise ValueError in that case).

    NOTE: a later cell redefines `train_dqn` with an extra `target` argument.
    """
    if len(buffer) < batch_size:
        return
    batch = random.sample(buffer, batch_size)
    states, actions, rewards, next_states, dones = zip(*batch)

    # Each stored state is (1, obs_dim); stack them into (batch, obs_dim).
    inputs = np.concatenate(states, axis=0)
    next_inputs = np.concatenate(next_states, axis=0)
    actions = np.asarray(actions)
    rewards = np.asarray(rewards, dtype=np.float64)
    # 1.0 where the episode continued, 0.0 where it ended.
    not_done = 1.0 - np.asarray(dones, dtype=np.float64)

    # Two batched forward passes instead of two predict calls per sample.
    labels = model.predict(inputs)
    best_next = np.max(model.predict(next_inputs), axis=1)
    rows = np.arange(len(batch))
    labels[rows, actions] = rewards + gamma * not_done * best_next

    model.fit(inputs, labels, verbose=0)

In [None]:
def run_episode(env, model, buffer, epsilon=0.0,
                training=False, render=False):
    """
    Play one episode and optionally train on it and/or render it.

    env:      gym-style environment (reset / step / render / action_space).
    model:    Q-network used for greedy action selection.
    buffer:   replay list; transitions are appended only when `training`.
    epsilon:  probability of a random action (only applied when `training`).
    training: store transitions and call train_dqn once after the episode.
    render:   collect rgb frames and display them as an animation at the end.

    Returns (total_reward, epsilon) where epsilon has been reduced by
    `epsilon_decay` and clamped to at least `epsilon_min`.

    NOTE: a later cell redefines `run_episode` with an extra `target` argument.
    """
    frames = []
    total_reward = 0
    state = np.expand_dims(env.reset(), 0)
    done = False
    while not done:
        if render:
            frames.append(env.render(mode='rgb_array'))

        # Epsilon-greedy: explore only while training.
        explore = training and random.random() < epsilon
        if explore:
            action = env.action_space.sample()
        else:
            action = np.argmax(model.predict(state))

        observation, reward, done, _ = env.step(action)
        next_state = np.expand_dims(observation, 0)
        total_reward += reward
        if training:
            buffer.append((state, action, reward, next_state, done))
        state = next_state

    if training:
        train_dqn(model, buffer)

    epsilon = max(epsilon - epsilon_decay, epsilon_min)

    if render:
        # Include the final frame before showing the animation.
        frames.append(env.render(mode='rgb_array'))
        display_frames_as_gif(frames)
    return total_reward, epsilon

In [None]:
# Render one greedy episode (epsilon defaults to 0.0, training off) with the
# network as built above; the returned (total_reward, epsilon) tuple is the
# cell's displayed value.
run_episode(cartpole_env, simple_dqn, simple_buffer, render=True)

In [None]:
def warmup_buffer(env, buffer, n_episodes=50):
    """
    Pre-fill the replay buffer with transitions from randomly sampled actions.

    env:        gym-style environment (reset / step / action_space.sample).
    buffer:     list that (state, action, reward, next_state, done) tuples
                are appended to; modified in place.
    n_episodes: number of complete episodes to play (default 50).

    States are stored with a leading axis of size 1 (np.expand_dims(..., 0)),
    the same layout run_episode stores.
    """
    for _ in range(n_episodes):
        state = np.expand_dims(env.reset(), 0)
        done = False
        while not done:
            action = env.action_space.sample()
            next_state, reward, done, _ = env.step(action)
            next_state = np.expand_dims(next_state, 0)
            buffer.append((state, action, reward, next_state, done))
            state = next_state

In [None]:
# Seed the replay buffer with random-policy transitions before training.
warmup_buffer(cartpole_env, simple_buffer)

eps, sum_reward = epsilon_init, 0
for episode in range(1, episodes + 1):
    r, eps = run_episode(
        cartpole_env, simple_dqn, simple_buffer,
        epsilon=eps, training=True, render=False,
    )
    sum_reward += r
    if episode % 10 != 0:
        continue
    # Report the mean total reward over the last ten episodes, then reset.
    start, avg = episode - 9, sum_reward / 10
    sum_reward = 0
    print("Average total reward for episode " +
          f"{start} through {episode}: {avg}")

In [None]:
# Render one greedy episode with simple_dqn after the training loop above;
# the (total_reward, epsilon) tuple is the cell's displayed value.
run_episode(cartpole_env, simple_dqn, simple_buffer, render=True)

In [None]:
"""
Target Network Parameters
"""
# Weight kept on the target network's own weights in update_target; the
# remaining (1 - target_decay) comes from the online network.
target_decay = 0.99

In [None]:
# Online network: selects actions and is the one fitted by train_dqn.
better_dqn = build_dqn()
# Target network: supplies the bootstrapped next-state values.
target_net = build_dqn()
# build_dqn() initialises every network independently. Start the target as an
# exact copy of the online network so early targets do not come from an
# unrelated random network.
target_net.set_weights(better_dqn.get_weights())

# Fresh replay buffer; this rebinds the name used by the first experiment.
simple_buffer = []

In [None]:
def update_target(model, target):
    """
    Move the target network's weights a small step toward the online model.

    Every weight array of `target` becomes
    `target_decay * target_weight + (1 - target_decay) * model_weight`,
    and the result is written back with `target.set_weights`.
    """
    online_weights = model.get_weights()
    blended = [
        target_decay * old + (1 - target_decay) * new
        for old, new in zip(target.get_weights(), online_weights)
    ]
    target.set_weights(blended)

In [None]:
def run_episode(env, model, target, buffer, epsilon=0.0,
                training=False, render=False, max_buffer_size=10000):
    """
    Play one episode with a target network; optionally train and/or render.

    env:             gym-style environment (reset / step / render /
                     action_space).
    model:           online Q-network used for greedy action selection.
    target:          target network passed to train_dqn / update_target.
    buffer:          replay list; transitions are appended only when
                     `training`, and the oldest entries are dropped so the
                     list never exceeds `max_buffer_size`.
    epsilon:         probability of a random action (only when `training`).
    training:        store transitions, then run one train_dqn call and one
                     update_target call after the episode ends.
    render:          collect rgb frames and show them as an animation.
    max_buffer_size: maximum number of transitions kept in `buffer`
                     (default 10000, the previously hard-coded limit).

    Returns (total_reward, epsilon) where epsilon has been reduced by
    `epsilon_decay` and clamped to at least `epsilon_min`.
    """
    total_reward = 0
    done = False
    state = np.expand_dims(env.reset(), 0)
    frames = []
    while not done:
        if render:
            frames.append(env.render(mode='rgb_array'))
        # Epsilon-greedy: explore only while training.
        if training and random.random() < epsilon:
            action = env.action_space.sample()
        else:
            q_values = model.predict(state)
            action = np.argmax(q_values)
        next_state, reward, done, _ = env.step(action)
        next_state = np.expand_dims(next_state, 0)
        total_reward += reward
        if training:
            buffer.append((state, action, reward, next_state, done))
            # Drop the oldest transitions; this also trims a buffer that was
            # already over the limit when it was passed in.
            overflow = len(buffer) - max_buffer_size
            if overflow > 0:
                del buffer[:overflow]
        state = next_state
    if training:
        train_dqn(model, target, buffer)
        update_target(model, target)
    epsilon -= epsilon_decay
    epsilon = max(epsilon, epsilon_min)
    if render:
        # Include the final frame before showing the animation.
        frames.append(env.render(mode='rgb_array'))
        display_frames_as_gif(frames)
    return total_reward, epsilon

In [None]:
def train_dqn(model, target, buffer):
    """
    Fit `model` on one random replay batch, bootstrapping from `target`.

    model:  online Q-network; provides the current predictions and is fitted.
    target: target network; provides max_a Q(next_state, a).
    buffer: sequence of (state, action, reward, next_state, done) tuples,
            where state / next_state are arrays with a leading axis of 1.

    The regression target for the action that was taken is
    `reward + gamma * max_a Q_target(next_state, a)`, with the bootstrap term
    dropped for terminal transitions. Targets for the other actions are the
    online network's own predictions, so they contribute no error.

    Does nothing when the buffer holds fewer than `batch_size` transitions
    (random.sample would raise ValueError in that case).
    """
    if len(buffer) < batch_size:
        return
    batch = random.sample(buffer, batch_size)
    states, actions, rewards, next_states, dones = zip(*batch)

    # Each stored state is (1, obs_dim); stack them into (batch, obs_dim).
    inputs = np.concatenate(states, axis=0)
    next_inputs = np.concatenate(next_states, axis=0)
    actions = np.asarray(actions)
    rewards = np.asarray(rewards, dtype=np.float64)
    # 1.0 where the episode continued, 0.0 where it ended.
    not_done = 1.0 - np.asarray(dones, dtype=np.float64)

    # Two batched forward passes instead of two predict calls per sample.
    labels = model.predict(inputs)
    best_next = np.max(target.predict(next_inputs), axis=1)
    rows = np.arange(len(batch))
    labels[rows, actions] = rewards + gamma * not_done * best_next

    model.fit(inputs, labels, batch_size=batch_size, verbose=0)

In [None]:
# Seed the replay buffer with random-policy transitions before training.
warmup_buffer(cartpole_env, simple_buffer)

eps, sum_reward = epsilon_init, 0
for episode in range(1, episodes + 1):
    r, eps = run_episode(
        cartpole_env, better_dqn, target_net, simple_buffer,
        epsilon=eps, training=True, render=False,
    )
    sum_reward += r
    if episode % 10 != 0:
        continue
    # Report the mean total reward over the last ten episodes, then reset.
    start, avg = episode - 9, sum_reward / 10
    sum_reward = 0
    print("Average total reward for episode " +
          f"{start} through {episode}: {avg}")

In [None]:
# Render one greedy episode with better_dqn after the training loop above;
# the (total_reward, epsilon) tuple is the cell's displayed value.
run_episode(cartpole_env, better_dqn, target_net, simple_buffer, render=True)

In [None]:
# Show the first array returned by get_weights()
# (presumably the first Dense layer's kernel - confirm).
better_dqn.get_weights()[0]

In [None]:
# Number of transitions currently held in the replay buffer.
len(simple_buffer)