In [1]:
import gym
import numpy as np
import random
from collections import deque
import matplotlib.pyplot as plt

import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras.models import model_from_config
from keras import backend as K

# Record the TensorFlow and bundled Keras versions this notebook was run with.
print(tf.__version__)
print(tf.keras.__version__)

2.0.0
2.2.4-tf


Using TensorFlow backend.


In [2]:
class Memory:
    """Bounded FIFO replay buffer.

    Samples are stored as ``(s, a, r, s_, d)`` tuples; the buffer never
    inspects them. Once ``capacity`` samples are held, adding another one
    evicts the oldest.
    """

    def __init__(self, capacity):
        # deque(maxlen=...) drops the oldest entry automatically when full.
        self.memory = deque(maxlen=capacity)

    def add(self, sample):
        """Append one sample to the buffer."""
        self.memory.append(sample)

    def sample(self, n):
        """Return up to ``n`` stored samples drawn without replacement.

        If fewer than ``n`` samples are stored, all of them are returned
        (in random order).
        """
        available = len(self.memory)
        draw_count = n if n < available else available
        return random.sample(self.memory, draw_count)

    def isFull(self):
        """Return True once the buffer holds ``capacity`` samples."""
        return self.memory.maxlen <= len(self.memory)

    def update(self, idx, p):
        """Placeholder: accepts an index and a value and does nothing."""
        return None
    
def argmax(q_values):
    """Return the index of the largest entry, breaking ties at random.

    Args:
        q_values: indexable 1-D collection of comparable numbers.

    Returns:
        One index drawn uniformly (via ``np.random.choice``) from all
        positions that hold the maximum value.
    """
    best_value = float("-inf")
    candidates = []

    for index, value in enumerate(q_values):
        if value > best_value:
            # Strictly better: forget every earlier candidate.
            best_value, candidates = value, [index]
        elif value == best_value:
            candidates.append(index)

    return np.random.choice(candidates)

In [3]:
class DQNAgent:
    """Double-DQN agent built on two Keras networks (online and target).

    Construct with the environment's observation/action specs, then call
    ``agent_init`` with a replay memory before using any other method.
    """

    def __init__(self,
                 observation_spec,
                 action_spec):
        """Store the specs and derive input/output sizes from them.

        Args:
            observation_spec: object exposing ``shape``; ``shape[0]`` is used
                as the network input size.
            action_spec: object exposing ``n``, the number of discrete actions.
        """
        self.observation_spec = observation_spec
        self.action_spec = action_spec
        self.num_observations = observation_spec.shape[0]
        self.num_actions = action_spec.n

    def agent_init(self, memory, agent_info=None):
        """Read hyper-parameters, attach the replay memory and build both networks.

        Args:
            memory: replay buffer exposing ``add(sample)`` and ``sample(n)``.
            agent_info: optional dict of overrides. Recognised keys:
                ``learning_rate`` (0.05), ``hidden_units`` (64),
                ``batch_size`` (32), ``gamma`` (1.0), ``epsilon`` (0.1),
                ``huber_loss`` (False), ``update_after`` (1).
        """
        # None instead of a shared mutable {} default; also accepts an explicit None.
        if agent_info is None:
            agent_info = {}

        self.learning_rate = agent_info.get("learning_rate", 0.05)
        self.hidden_units = agent_info.get("hidden_units", 64)
        self.batch_size = agent_info.get("batch_size", 32)
        self.gamma = agent_info.get("gamma", 1.0)
        self.epsilon = agent_info.get("epsilon", 0.1)
        self.huber_loss = agent_info.get("huber_loss", False)
        self.update_after = agent_info.get("update_after", 1)
        self.observation_range = np.zeros(self.num_observations)

        self.memory = memory
        # Two hidden layers of `hidden_units` followed by one linear output per action.
        self.model = self.build_model((self.num_observations,),
                                      [self.hidden_units, self.hidden_units, self.num_actions],
                                      'he_uniform',
                                      self.learning_rate)
        self.model_target = self._clone_model(self.model)
        self.update_target_model()
        self.steps = 0

    def build_model(self, input_shape, layer_spec, kernel_initializer, learning_rate):
        """Build and compile a fully connected network.

        Every layer in ``layer_spec`` is a Dense layer; all but the last use
        ReLU, the last is linear. The model is compiled with Adam. The loss is
        Huber when ``self.huber_loss`` is truthy and mean squared error
        otherwise (previously the ``huber_loss`` option was read but never
        applied to the model actually used).

        Args:
            input_shape: shape tuple passed to ``keras.Input``.
            layer_spec: sequence of layer widths, output layer last.
            kernel_initializer: initializer passed to every Dense layer.
            learning_rate: learning rate for the Adam optimizer.

        Returns:
            The compiled ``keras.Model``.
        """
        inputs = keras.Input(shape=input_shape)
        out = inputs
        last_index = len(layer_spec) - 1
        for ix, layer_size in enumerate(layer_spec):
            out = layers.Dense(layer_size,
                               activation='relu' if ix < last_index else 'linear',
                               kernel_initializer=kernel_initializer)(out)

        model = keras.Model(inputs=inputs, outputs=out)

        # getattr keeps build_model usable before agent_init has set the flag.
        if getattr(self, "huber_loss", False):
            loss = tf.keras.losses.Huber()
        else:
            loss = tf.keras.losses.MeanSquaredError()

        model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate),
                      loss=loss)
        return model

    def _build_model(self):
        """Alternative single-hidden-layer builder; not called by ``agent_init``.

        Compiles with Adam + MSE when ``huber_loss`` is False, otherwise with
        RMSprop + Huber.
        """
        inputs = keras.Input(shape = (self.num_observations,))

        # Shared layers
        x = layers.Dense(self.hidden_units,
                         activation='relu',
                         kernel_initializer='he_uniform')(inputs)

        q_out = layers.Dense(self.num_actions,
                             activation='linear',
                             kernel_initializer='he_uniform')(x)

        model = keras.Model(inputs=inputs, outputs=q_out)

        if (self.huber_loss == False):
            model.compile(optimizer=tf.keras.optimizers.Adam(self.learning_rate),
                          loss=tf.keras.losses.MeanSquaredError())
        else:
            model.compile(optimizer=tf.keras.optimizers.RMSprop(self.learning_rate),
                          loss=tf.keras.losses.Huber())

        return model

    def _clone_model(self, model):
        """Return a new model with the same architecture and weights as ``model``.

        The clone is rebuilt from the model's config, so it is not compiled.
        """
        config = {
            'class_name': model.__class__.__name__,
            'config': model.get_config(),
        }
        clone = model_from_config(config)
        clone.set_weights(model.get_weights())
        return clone

    @staticmethod
    def _as_batch(observation):
        """Reshape a single observation into a batch of one row."""
        # np.reshape also accepts plain lists/tuples, unlike `observation.shape[0]`.
        return np.reshape(observation, (1, -1))

    def remember(self, observation, action, reward, next_observation, done):
        """Store one transition ``(s, a, r, s_, d)`` in the replay memory."""
        self.memory.add((observation, action, reward, next_observation, done))

    def policy_action(self, observation):
        """Return the greedy action for ``observation`` (ties broken at random)."""
        q_values = self.model.predict(self._as_batch(observation))[0]
        return argmax(q_values)

    def train_policy_action(self, observation):
        """Return an epsilon-greedy action for ``observation``.

        Every action gets probability ``epsilon / num_actions``; the remaining
        ``1 - epsilon`` is split evenly among the actions with the highest
        predicted value.
        """
        q_values = self.model.predict(self._as_batch(observation))[0]

        greedy_actions = np.flatnonzero(q_values == np.amax(q_values))
        probs = np.full(self.num_actions, self.epsilon / self.num_actions)
        probs[greedy_actions] += (1 - self.epsilon) / greedy_actions.size

        return np.random.choice(self.num_actions, p=probs)

    def _get_targets(self, batch):
        """Build training inputs and regression targets for a batch of transitions.

        Targets start as the online network's own predictions, so only the
        entry of the action actually taken contributes to the loss. That entry
        becomes ``reward`` for terminal transitions, otherwise
        ``reward + gamma * Q_target(s_, argmax_a Q_online(s_, a))``.

        Args:
            batch: non-empty list of ``(s, a, r, s_, d)`` transitions.

        Returns:
            Tuple ``(observations, targets)`` of arrays with one row per transition.
        """
        no_state = np.zeros(self.num_observations)

        observations = np.array([entry[0] for entry in batch])
        # Truthiness (not `is True`) so numpy.bool_ terminal flags are recognised,
        # consistent with the check in the loop below.
        next_observations = np.array([(no_state if entry[4] else entry[3]) for entry in batch])

        targets = self.model.predict(observations)
        online_next = self.model.predict(next_observations)
        target_next = self.model_target.predict(next_observations)

        for index, entry in enumerate(batch):
            action = entry[1]
            reward = entry[2]
            done = entry[4]

            if done:
                targets[index][action] = reward
            else:
                # double DQN: the online net picks the action, the target net scores it
                best_next = argmax(online_next[index])
                targets[index][action] = reward + self.gamma * target_next[index][best_next]

        return observations, targets

    def replay(self):
        """Fit the online network on one sampled batch.

        Does nothing when the memory is empty. After each fit the step counter
        is incremented, and the target network is synchronised every
        ``update_after`` steps (every step with the default of 1).
        """
        batch = self.memory.sample(self.batch_size)
        if not batch:
            # Nothing to learn from yet; predicting on an empty batch would fail.
            return

        observations, targets = self._get_targets(batch)

        self.model.fit(observations, targets, epochs=1, verbose=0)

        self.steps += 1

        if (self.steps % self.update_after == 0):
            self.update_target_model()

    def update_target_model(self):
        """Copy the online network's weights into the target network."""
        self.model_target.set_weights(self.model.get_weights())
                

def train_agent(env, agent, num_episodes, train_until):
    """Train ``agent`` on ``env`` with one replay step per environment step.

    Args:
        env: environment whose ``reset()`` returns an observation and whose
            ``step(action)`` returns ``(observation, reward, done, info)``.
        agent: object providing ``train_policy_action``, ``remember``,
            ``replay`` and an ``epsilon`` attribute.
        num_episodes: maximum number of episodes to run.
        train_until: stop early once the mean reward over the most recent
            (up to 10) episodes exceeds this value.

    Returns:
        Tuple ``(rewards, epsilons)``: per-episode total reward and the
        agent's epsilon recorded at the end of each episode.
    """
    reward_log = []
    epsilon_log = []
    progress_template = "\repisode: {}/{}.\trewards={}.\tepsilon={}\tMean={}."

    for episode_number in range(1, num_episodes + 1):
        # Fresh episode.
        state = env.reset()
        episode_return = 0
        finished = False

        while not finished:
            chosen = agent.train_policy_action(state)
            successor, step_reward, finished, _ = env.step(chosen)
            agent.remember(state, chosen, step_reward, successor, finished)
            state = successor
            episode_return += step_reward

            # Learn from the replay memory after every environment step.
            agent.replay()

        reward_log.append(episode_return)
        epsilon_log.append(agent.epsilon)

        # Moving average over the last 10 episodes (fewer while warming up).
        recent_mean = np.mean(reward_log[-10:])

        print(progress_template.format(episode_number,
                                       num_episodes,
                                       episode_return,
                                       round(agent.epsilon, 4),
                                       round(recent_mean, 2)), end="")
        if recent_mean > train_until:
            break

    return reward_log, epsilon_log
            
def run_episode(env, agent, render=True):
    """Play one episode with the agent's ``policy_action`` and return the total reward.

    Args:
        env: environment with ``reset()``, ``step(action)`` and ``render()``.
        agent: object providing ``policy_action(observation)``.
        render: frames are rendered only when this is exactly ``True``.

    Returns:
        Sum of the rewards collected until the environment reports done.
    """
    total_reward = 0
    state = env.reset()
    show_frames = render is True

    while True:
        if show_frames:
            env.render()
        state, step_reward, finished, _ = env.step(agent.policy_action(state))
        total_reward += step_reward
        if finished:
            break

    return total_reward

def initialize_memory(env, policy, steps, memory):
    """Pre-fill ``memory`` with ``steps`` transitions collected by ``policy``.

    The environment is reset before the first step and again after every
    terminal step, so stored transitions form contiguous episodes.
    (The original condition was inverted: it reset before every non-terminal
    step and kept stepping an environment that had already reported done.)

    Args:
        env: environment whose ``reset()`` returns an observation and whose
            ``step(action)`` returns ``(observation, reward, done, info)``.
        policy: callable mapping an observation to an action.
        steps: number of transitions to collect; 0 leaves ``env`` untouched.
        memory: buffer exposing ``add(sample)``; receives
            ``(observation, action, reward, next_observation, done)`` tuples.
    """
    needs_reset = True  # no episode is running before the first step
    for _ in range(steps):
        if needs_reset:
            observation = env.reset()

        action = policy(observation)
        next_observation, reward, done, _ = env.step(action)
        memory.add((observation, action, reward, next_observation, done))
        observation = next_observation
        needs_reset = bool(done)

def make_random_policy(num_actions):
    """Return a policy that ignores its observation and picks an action uniformly.

    Args:
        num_actions: number of discrete actions; actions are ``0 .. num_actions - 1``.

    Returns:
        Callable ``random_policy(observation) -> action index``.
    """
    # Built once and captured by the closure.
    uniform_probs = np.ones(num_actions) / num_actions

    def random_policy(observation):
        return np.random.choice(num_actions, p=uniform_probs)

    return random_policy

def make_simple_mountain_car_policy():
    """Return a hand-written policy driven only by the sign of the velocity.

    The returned callable expects a two-element observation
    ``(position, velocity)`` and yields action 0 when the velocity is
    negative and action 2 otherwise (presumably "push left" / "push right"
    in MountainCar — confirm against the environment).
    """
    def policy(observation):
        _, velocity = observation  # position is not used
        if velocity < 0:
            return 0
        return 2

    return policy

In [4]:
# Experiment configuration: choose the environment, then derive the warm-up
# policy, the number of warm-up steps and the early-stopping threshold from it.
env_name = 'CartPole-v0'
env = gym.make(env_name)

batch_size = 32
initial_steps = batch_size

if (env_name == 'MountainCar-v0'):
    # Seed the replay memory with 5000 steps of the hand-written policy.
    initial_steps = 5000
    initial_policy = make_simple_mountain_car_policy()
    train_until = -130
elif (env_name == 'CartPole-v0'):
    # No warm-up: the memory fills during training.
    initial_steps = 0
    initial_policy = make_random_policy(env.action_space.n)
    train_until = 180
else:
    # Fail here with a clear message instead of a NameError further down,
    # where initial_policy / train_until would be undefined.
    raise ValueError("No training configuration for environment {!r}".format(env_name))

agent_info= {
    "batch_size":batch_size
}

memory = Memory(10000)
initialize_memory(env, initial_policy, initial_steps, memory)
agent = DQNAgent(env.observation_space,
                 env.action_space)
agent.agent_init(memory, agent_info)

In [5]:
# Train for at most 100 episodes; train_agent stops early once the mean reward
# of the most recent (up to 10) episodes exceeds train_until.
rewards, epsilons = train_agent(env, agent, 100, train_until)

episode: 79/100.	rewards=200.0.	epsilon=0.1	Mean=182.7.

In [None]:
# Training curves: per-episode reward on the left y-axis (red) and the agent's
# epsilon on the right y-axis (blue), both against the episode index.
def _plot_series(ax, label, values, color):
    """Label, draw and colour-code one series on the given axes."""
    ax.set_ylabel(label, color=color)
    ax.plot(values, color=color)
    ax.tick_params(axis='y', labelcolor=color)

fig, ax1 = plt.subplots()
ax1.set_xlabel('episodes')
_plot_series(ax1, 'reward', rewards, 'tab:red')

# Second axes shares the x-axis, so its x-label is already handled by ax1.
ax2 = ax1.twinx()
_plot_series(ax2, 'epsilon', epsilons, 'tab:blue')

# Without tight_layout the right y-label is slightly clipped.
fig.tight_layout()
plt.show()

In [None]:
# Play one episode with the greedy policy; render defaults to True, and the
# episode's total reward is the cell's value.
run_episode(env, agent)