In [None]:
import os
import random
import gym
import pylab
import numpy as np
from collections import deque
from keras.models import Model, load_model
from keras.layers import Input, Dense
from keras.optimizers import Adam, RMSprop
from keras.callbacks import TensorBoard
import time

In [None]:
# Run identifier: the integer Unix timestamp makes every execution of this
# cell produce a distinct name, so each run logs into its own directory.
NAME = "DDQN_Cartpole-{}".format(
    int(time.time())
)

# TensorBoard callback writing under logs/<NAME>; it is handed to model.fit
# by the agent's replay step.
tensorboard = TensorBoard(
    log_dir='logs/{}'.format(NAME)
)

In [None]:
def RLModel(input_shape, action_space):
    """Build and compile the fully connected Q-network.

    The network is three ReLU hidden layers (512, 256, 64 units) followed by
    a linear output layer with one unit per action. It is compiled with a
    mean-squared-error loss and an RMSprop optimizer, and its summary is
    printed before it is returned.

    Args:
        input_shape: Shape tuple handed to `Input` (one observation, without
            the batch axis).
        action_space: Number of units in the linear output layer, i.e. the
            number of discrete actions.

    Returns:
        The compiled Keras `Model`.
    """
    X_i = Input(input_shape)

    # Functional API: the input shape comes from `X_i`, so the Dense layers
    # do not need their own `input_shape` argument.
    X = Dense(512, activation="relu", kernel_initializer='he_uniform')(X_i)
    X = Dense(256, activation="relu", kernel_initializer='he_uniform')(X)
    X = Dense(64, activation="relu", kernel_initializer='he_uniform')(X)
    # Linear head: raw (unbounded) value estimate per action.
    X = Dense(action_space, activation="linear", kernel_initializer='he_uniform')(X)

    # The name avoids spaces: newer TensorFlow-backed Keras versions reject
    # model names that are not valid scope names.
    model = Model(inputs=X_i, outputs=X, name='CartPole_DDQN_model')

    # No "accuracy" metric: the outputs are regression targets, so a
    # classification accuracy would be meaningless here.
    model.compile(
        loss="mean_squared_error",
        optimizer=RMSprop(lr=0.0025, rho=0.95, epsilon=0.01),
    )

    model.summary()
    return model

In [None]:
class DQNAgent:
    """Double DQN agent for a gym environment with a discrete action space.

    Two networks built by `RLModel` are kept: `self.model` selects actions
    and is the one being fitted, while `self.target_model` supplies the value
    of the selected next-state action when training targets are computed.
    The target network is overwritten with the online weights at the end of
    every training episode.
    """

    def __init__(self, env_name):
        """Create the environment, hyper-parameters, replay memory and networks.

        Args:
            env_name: Environment id passed to `gym.make`.
        """
        self.env_name = env_name
        self.env = gym.make(env_name)
        self.env.seed(0)
        # Overrides the step-limit attribute on the env object; the training
        # loop compares the step counter against this value.
        # NOTE(review): presumably honoured by gym's time-limit wrapper - confirm.
        self.env._max_episode_steps = 4000
        self.state_size = self.env.observation_space.shape[0]
        self.action_size = self.env.action_space.n

        self.EPISODES = 150
        # Bounded replay memory: the oldest transitions are dropped first.
        self.memory = deque(maxlen=2000)

        self.gamma = 0.95           # discount factor used in the bootstrap target
        self.epsilon = 1.0          # current exploration rate
        self.epsilon_min = 0.01     # decay stops once epsilon is at or below this
        self.epsilon_decay = 0.999  # multiplicative decay per stored transition
        self.batch_size = 32
        self.train_start = 1000     # transitions required before any fitting

        # TAU, Save_Path and Model_name are not read anywhere inside this class.
        self.TAU = 0.1
        self.Save_Path = 'Models'
        self.scores, self.episodes, self.average = [], [], []

        self.Model_name = os.path.join(self.Save_Path, "DDQN_" + self.env_name + ".h5")

        self.model = RLModel(input_shape=(self.state_size,), action_space=self.action_size)
        self.target_model = RLModel(input_shape=(self.state_size,), action_space=self.action_size)

    def update_target_model(self):
        """Copy the online network's weights into the target network."""
        self.target_model.set_weights(self.model.get_weights())

    def memory_fun(self, state, action, reward, next_state, done):
        """Store one transition and decay epsilon once enough data is collected.

        Epsilon is multiplied by `epsilon_decay` on every call made while the
        memory holds more than `train_start` transitions and epsilon is still
        above `epsilon_min`.
        """
        self.memory.append((state, action, reward, next_state, done))
        if len(self.memory) > self.train_start and self.epsilon > self.epsilon_min:
            self.epsilon *= self.epsilon_decay

    def act(self, state):
        """Pick an action epsilon-greedily.

        With probability `epsilon` a uniformly random action index is
        returned; otherwise the argmax of the online network's prediction
        for `state`.
        """
        if np.random.random() <= self.epsilon:
            return random.randrange(self.action_size)
        return np.argmax(self.model.predict(state))

    def replay_buffer(self):
        """Fit the online network on one minibatch drawn from replay memory.

        Does nothing until the memory holds at least `train_start`
        transitions. Targets follow the Double DQN scheme: the online network
        chooses the best next action and the target network provides that
        action's value.
        """
        if len(self.memory) < self.train_start:
            return

        # Never ask for more samples than are stored (random.sample would
        # raise), and size every array from the actual sample count.
        sample_size = min(len(self.memory), self.batch_size)
        if sample_size == 0:
            return
        minibatch = random.sample(self.memory, sample_size)

        state = np.zeros((sample_size, self.state_size))
        next_state = np.zeros((sample_size, self.state_size))
        action, reward, done = [], [], []

        for row, (s, a, r, s_next, d) in enumerate(minibatch):
            state[row] = s
            action.append(a)
            reward.append(r)
            next_state[row] = s_next
            done.append(d)

        # Start from the current predictions so only the entry of the action
        # actually taken is changed below.
        target = self.model.predict(state)
        target_next = self.model.predict(next_state)
        target_val = self.target_model.predict(next_state)

        for row in range(sample_size):
            if done[row]:
                # Terminal transition: no bootstrapping.
                target[row][action[row]] = reward[row]
            else:
                # Online net selects the action, target net evaluates it.
                best_next = np.argmax(target_next[row])
                target[row][action[row]] = reward[row] + self.gamma * target_val[row][best_next]

        self.model.fit(state, target, batch_size=self.batch_size, verbose=0, callbacks=[tensorboard])

    def loadModel(self, name):
        """Replace the online network with the model stored at `name`."""
        self.model = load_model(name)

    def saveModel(self, name):
        """Save the online network to `name`."""
        self.model.save(name)

    # Executed once, when the class body runs: opens the figure that the
    # PlotModel calls below draw on.
    pylab.figure(figsize=(18, 9))

    def PlotModel(self, score, episode):
        """Record an episode result, redraw the score curves and save the plot.

        Args:
            score: Score of the finished episode (callers pass the step count).
            episode: Index of the finished episode.

        Returns:
            The running average score as a string cut to five characters.
        """
        self.scores.append(score)
        self.episodes.append(episode)
        self.average.append(sum(self.scores) / len(self.scores))
        pylab.plot(self.episodes, self.average, 'r')
        pylab.plot(self.episodes, self.scores, 'b')
        pylab.ylabel('Score', fontsize=18)
        # The x values are episode indices, so label the axis accordingly.
        pylab.xlabel('Episode', fontsize=18)
        try:
            pylab.savefig("DDQN_Cartpole.png")
        except OSError:
            # Best effort: failing to write the image must not stop the run.
            pass

        return str(self.average[-1])[:5]

    def trainModel(self):
        """Train for `EPISODES` episodes, then close the environment.

        After each environment step the transition is stored and one replay
        update is attempted. When an episode ends, the target network is
        synchronised and the score is plotted and printed. An episode that
        reaches the step limit also saves the model to "cartpole-ddqn.h5";
        training then carries on with the remaining episodes.
        """
        try:
            for e in range(self.EPISODES):
                state = self.env.reset()
                state = np.reshape(state, [1, self.state_size])
                done = False
                i = 0
                while not done:
                    # self.env.render()
                    action = self.act(state)
                    next_state, reward, done, _ = self.env.step(action)
                    next_state = np.reshape(next_state, [1, self.state_size])
                    # Penalise an episode that ends before its last allowed
                    # step; otherwise keep the environment's reward.
                    if done and i != self.env._max_episode_steps - 1:
                        reward = -100
                    self.memory_fun(state, action, reward, next_state, done)
                    state = next_state
                    i += 1
                    if done:
                        self.update_target_model()
                        average = self.PlotModel(i, e)
                        print("episode: {}/{}, score: {}, e: {:.2}, average: {}".format(e, self.EPISODES, i, self.epsilon, average))
                        if i == self.env._max_episode_steps:
                            self.saveModel("cartpole-ddqn.h5")
                            # Skip the replay update for this final step.
                            break
                    self.replay_buffer()
        finally:
            # Close once, after the last episode or on error/interrupt,
            # instead of closing between episodes and resetting a closed env.
            self.env.close()

    def testModel(self):
        """Load "cartpole-ddqn.h5" and run `EPISODES` greedy episodes.

        Actions are always the argmax of the loaded network's prediction;
        every finished episode is plotted and printed.
        """
        self.loadModel("cartpole-ddqn.h5")
        for e in range(self.EPISODES):
            state = self.env.reset()
            state = np.reshape(state, [1, self.state_size])
            done = False
            i = 0
            while not done:
                # self.env.render()
                action = np.argmax(self.model.predict(state))
                next_state, reward, done, _ = self.env.step(action)
                state = np.reshape(next_state, [1, self.state_size])
                i += 1
                if done:
                    average = self.PlotModel(i, e)
                    print("episode: {}/{}, score: {}, e: {:.2}, average: {}".format(e, self.EPISODES, i, self.epsilon, average))

In [None]:
if __name__ == "__main__":
    # Environment id handed to the agent (and through it to gym.make).
    env_name = 'CartPole-v1'

    agent = DQNAgent(env_name=env_name)

    # Train from scratch; to evaluate a previously saved model instead,
    # call agent.testModel().
    agent.trainModel()
    #agent.testModel()