In [1]:
import gym
import random
import numpy as np

import os
import tensorflow as tf

from collections import deque

from matplotlib import pyplot as plt
%matplotlib inline
print ("gym version:[%s]"%(gym.__version__))
print ("TF:[%s]"%(tf.__version__))

seed = 0
np.random.seed(seed)
random.seed(seed)

gym version:[0.21.0]
TF:[2.7.0]


In [2]:
import cv2
from matplotlib import animation
from IPython.display import display, HTML

def display_animation(anim):
    plt.close(anim._fig)
    return HTML(anim.to_jshtml())
def display_frames_as_gif(frames):
    patch = plt.imshow(frames[0])
    plt.axis('off')
    def animate(i):
        patch.set_data(frames[i])
    anim = animation.FuncAnimation(
        plt.gcf(),animate,frames=len(frames),interval=30)
    display(display_animation(anim))

# DEEP Q LEARNING AGENT

In [3]:
class DQNAgent:
    def __init__(self, obs_dim, n_action, seed=0,
                 discount_factor = 0.995, epsilon_decay = 0.999, epsilon_min = 0.01,
                 learning_rate = 1e-3, # Step size for Adam
                 batch_size = 64, 
                 memory_size = 2000, hidden_unit_size = 64):
        
        self.seed = seed 
        
        # Environment Information
        self.obs_dim = obs_dim
        self.n_action = n_action
        self.discount_factor = discount_factor
        
        # Epsilon Greedy Policy
        self.epsilon = 1.0
        self.epsilon_decay = epsilon_decay
        self.epsilon_min = epsilon_min
        
        # Network Hyperparameters
        self.hidden_unit_size = hidden_unit_size
        self.learning_rate = learning_rate
        self.batch_size = batch_size
        self.train_start = 1000

        # Experience Replay
        self.memory = deque(maxlen=memory_size)
        
        # Define Computational Graph in TF\
        self.build_model()
    
    def build_model(self): # Build networks
        hid1_size = self.hidden_unit_size
        hid2_size = self.hidden_unit_size
        
        q_prediction = tf.keras.Sequential()
        q_prediction.add(tf.keras.Input(shape=(self.obs_dim,)))
        q_prediction.add(tf.keras.layers.Dense(hid1_size,activation='relu',kernel_initializer='he_uniform'))
        q_prediction.add(tf.keras.layers.Dense(hid2_size,activation='relu',kernel_initializer='he_uniform'))
        q_prediction.add(tf.keras.layers.Dense(self.n_action, activation='linear'))
        q_prediction.build()
        
        q_target = tf.keras.Sequential()
        q_target.add(tf.keras.Input(shape=(self.obs_dim,)))
        q_target.add(tf.keras.layers.Dense(hid1_size,activation='relu',kernel_initializer='he_uniform'))
        q_target.add(tf.keras.layers.Dense(hid2_size,activation='relu',kernel_initializer='he_uniform'))
        q_target.add(tf.keras.layers.Dense(self.n_action, activation='linear'))
        q_target.build()
        
        q_target.set_weights(q_prediction.get_weights())
        
        self.q_prediction = q_prediction
        self.q_target = q_target
        self.optimizer = tf.keras.optimizers.Adam(learning_rate=self.learning_rate,beta_1=0.9,beta_2=0.999,epsilon=1e-07)
        
    def update_target(self): # Update parameters
        self.q_target.set_weights(self.q_prediction.get_weights())
                
    def update_policy(self):
        if self.epsilon > self.epsilon_min: # Update epsilon
            self.epsilon *= self.epsilon_decay
        
    def get_prediction(self, obs): # Get Q value from prediction network
        q_value=self.q_prediction(obs)
        return q_value

    def get_action(self, obs): # Epsilon Greedy policy
        if np.random.rand() <= self.epsilon:
            return random.randrange(self.n_action)
        else:
            q_value = self.get_prediction(obs)
            return np.argmax(q_value[0])

    def add_experience(self, obs, action, reward, next_obs, done): # Add experience to memory
        self.memory.append((obs, action, reward, next_obs, done))

    def train_model(self):
        loss = np.nan
        n_entries = len(self.memory)
            
        if n_entries > self.train_start: # Start training when the number of experience is greater than batch size
            
            # Randomly sample batch
            mini_batch = random.sample(self.memory, self.batch_size)
            
            observations = np.zeros((self.batch_size, self.obs_dim))
            next_observations = np.zeros((self.batch_size, self.obs_dim))
            actions = np.zeros((self.batch_size, ))
            rewards = np.zeros((self.batch_size, ))
            dones = np.zeros((self.batch_size, ))

            for i in range(self.batch_size):
                observations[i] = mini_batch[i][0]
                actions[i] = mini_batch[i][1]
                rewards[i] = mini_batch[i][2]
                next_observations[i] = mini_batch[i][3]
                dones[i] = mini_batch[i][4]
            
            with tf.GradientTape() as tape:
                next_q = self.q_target(next_observations)
                max_next_q = tf.reduce_max(next_q, axis=1)
                td_target = rewards + self.discount_factor*max_next_q*(1.0-dones) # R + gamma*max(Q)
                q_est = tf.reduce_sum(
                    self.q_prediction(observations) * tf.one_hot(actions,self.n_action,1.0,0.0),
                    axis=1)
                loss = tf.keras.losses.mse(tf.stop_gradient(td_target),q_est)
                
            gradients = tape.gradient(loss, self.q_prediction.trainable_weights)
            self.optimizer.apply_gradients(
                zip(gradients,self.q_prediction.trainable_variables))
        return loss

# DEFINE ENVIRONMENT AND AGENT

In [4]:
env = gym.make('CartPole-v1')
obs_space = env.observation_space
print('Observation space')
print(type(obs_space))
print(obs_space.shape)
print("Dimension:{}".format(obs_space.shape[0]))
print("High: {}".format(obs_space.high))
print("Low: {}".format(obs_space.low))
print()

act_space = env.action_space
print('Action space')
print(type(act_space))
print("Total {} actions".format(act_space.n))
print()

env.seed(seed)
max_t = env.spec.max_episode_steps
agent = DQNAgent(env.observation_space.high.shape[0],env.action_space.n)

Observation space
<class 'gym.spaces.box.Box'>
(4,)
Dimension:4
High: [4.8000002e+00 3.4028235e+38 4.1887903e-01 3.4028235e+38]
Low: [-4.8000002e+00 -3.4028235e+38 -4.1887903e-01 -3.4028235e+38]

Action space
<class 'gym.spaces.discrete.Discrete'>
Total 2 actions



# TRAIN

In [5]:
avg_return_list = deque(maxlen=10)
avg_loss_list = deque(maxlen=10)
nepisodes = 500
for i in range(nepisodes):
    obs = env.reset()
    done = False
    total_reward = 0
    total_loss = 0
    for t in range(max_t):
        # Get transition
        action = agent.get_action(obs.reshape(1,-1))
        next_obs, reward, done, info = env.step(action)
        
        # Add experience
        agent.add_experience(obs,action,reward,next_obs,done)
        
        # Online update perdiction network parameter
        loss = agent.train_model()
        agent.update_policy()
                
        obs = next_obs
        total_reward += reward
        total_loss += loss
        
        if done:
            break
    
    # Update target network parameter
    agent.update_target()
    avg_return_list.append(total_reward)
    avg_loss_list.append(total_loss)
    
    if (np.mean(avg_return_list) > 490): # Threshold return to success cartpole
        print('[{}/{}] loss : {:.3f}, return : {:.3f}, eps : {:.3f}'.format(i,nepisodes, np.mean(avg_loss_list), np.mean(avg_return_list), agent.epsilon))
        print('The problem is solved with {} episodes'.format(i))
        break
    
    if i > 99 and (i%50)==0:
        print('[{}/{}] loss : {:.3f}, return : {:.3f}, eps : {:.3f}'.format(i,nepisodes, np.mean(avg_loss_list), np.mean(avg_return_list), agent.epsilon))

[100/500] loss : 3653.745, return : 33.400, eps : 0.184
[150/500] loss : 280.133, return : 150.000, eps : 0.010
[200/500] loss : 12.430, return : 148.100, eps : 0.010
[250/500] loss : 10488.397, return : 290.900, eps : 0.010
[300/500] loss : 21269.902, return : 452.600, eps : 0.010
[350/500] loss : 804.609, return : 207.100, eps : 0.010
[361/500] loss : 6110.642, return : 500.000, eps : 0.010
The problem is solved with 361 episodes


# TEST

In [None]:
env = gym.make('CartPole-v1')
obs = env.reset()
total_reward = 0
frames = []
for t in range(10000):
    # Render into buffer. 
    frames.append(env.render(mode = 'rgb_array'))
    action = agent.get_action(obs.reshape(1,-1))
    obs, reward, done, info = env.step(action)
    total_reward += reward
    if done:
        break
env.close()
print('Total Reward : %.2f'%total_reward)
display_frames_as_gif(frames)

Total Reward : 500.00
