# In [1]:
import os
import random
import gym
import pylab
import numpy as np
from collections import deque
from keras.models import Model, load_model
from keras.layers import Input, Dense, Lambda, Add, Conv2D, Flatten
from keras.optimizers import Adam, RMSprop
from keras import backend as K
import cv2

# In [None]:
def OurModel(input_shape, action_space, dueling):
    """Build and compile the convolutional Q-network.

    Three channels-first convolutions are followed by three dense layers and
    a Q-value head with one output per action.

    Args:
        input_shape: Shape of one input sample (without the batch axis); the
            convolutions are configured as ``channels_first``.
        action_space: Number of discrete actions, i.e. the width of the
            output layer.
        dueling: When true, the head is split into a scalar state-value
            stream and a mean-centred advantage stream that are summed;
            otherwise a single linear dense layer is used.

    Returns:
        The compiled Keras ``Model`` (MSE loss, RMSprop optimizer).
    """
    X_inputs = Input(input_shape)
    X = Conv2D(64, 5, strides=(3, 3), activation='relu', padding='valid',
               data_format='channels_first')(X_inputs)
    X = Conv2D(64, 4, strides=(2, 2), activation='relu', padding='valid',
               data_format='channels_first')(X)
    X = Conv2D(64, 3, strides=(1, 1), activation='relu', padding='valid',
               data_format='channels_first')(X)
    X = Flatten()(X)
    X = Dense(512, activation='relu', kernel_initializer='he_uniform')(X)
    X = Dense(256, activation='relu', kernel_initializer='he_uniform')(X)
    X = Dense(64, activation='relu', kernel_initializer='he_uniform')(X)

    if dueling:
        state_value = Dense(1, kernel_initializer='he_uniform')(X)
        # The lambda yields one value per sample, so the declared shape is
        # (1,); Add() broadcasts it across the action axis.
        state_value = Lambda(lambda s: K.expand_dims(s[:, 0], -1),
                             output_shape=(1,))(state_value)

        action_advantage = Dense(action_space, kernel_initializer='he_uniform')(X)
        # Centre the advantages per sample (axis=1). Averaging over every
        # axis would mix unrelated samples of the batch into each Q-value.
        action_advantage = Lambda(
            lambda a: a - K.mean(a, axis=1, keepdims=True),
            output_shape=(action_space,))(action_advantage)
        X = Add()([state_value, action_advantage])
    else:
        X = Dense(action_space, activation='linear', kernel_initializer='he_uniform')(X)

    model = Model(inputs=X_inputs, outputs=X)
    # 'mean_squared_error' is the loss identifier Keras recognises; the
    # space-separated spelling is rejected at compile time.
    model.compile(loss='mean_squared_error',
                  optimizer=RMSprop(learning_rate=.00025, rho=.95, epsilon=.01),
                  metrics=['accuracy'])
    model.summary()
    return model


# In [None]:
class DQNAgent:
    def __init__(self, env_name):
        """Create the environment, the hyper-parameters and both Q-networks.

        Args:
            env_name: Environment id handed to ``gym.make``.
        """
        self.env_name = env_name
        self.env = gym.make(env_name)
        self.env.seed(0)
        self.env._max_episode_steps = 4000
        # state_size is defined further down from the stacked-image
        # dimensions, so the observation space is not consulted here.
        self.action_size = self.env.action_space.n
        self.EPISODES = 1000
        memory_size = 10000
        # Bounded replay buffer: the oldest transitions are dropped first.
        self.memory = deque(maxlen=memory_size)

        self.gamma = .95          # discount factor used in replay()
        self.epsilon = 1.         # current exploration probability
        self.epsilon_min = .01    # lower bound for epsilon
        self.epsilon_decay = .0005

        self.batch_size = 32
        self.ddqn = True
        # Must be spelled ``Soft_Update``: update_target_model() reads the
        # flag under exactly this name.
        self.Soft_Update = False
        self.dueling = True
        self.epsilon_greedy = False
        self.USE_PER = True

        self.TAU = .1             # blend factor for the soft target update

        self.ROWS = 160
        self.COLS = 240
        self.REM_STEP = 4

        # Network input is (REM_STEP, ROWS, COLS); OurModel treats the first
        # axis as channels (channels_first).
        self.image_memory = np.zeros((self.REM_STEP, self.ROWS, self.COLS))
        self.state_size = (self.REM_STEP, self.ROWS, self.COLS)
        self.model = OurModel(input_shape=self.state_size, action_space=self.action_size, dueling=self.dueling)
        self.target_model = OurModel(input_shape=self.state_size, action_space=self.action_size, dueling=self.dueling)

    def update_target_model(self):
        if not self.Soft_Update and self.ddqn:
            self.target_model.set_weights(self.model.get_weights())
            return
        if self.Soft_Update and self.ddqn:
            q_model_theta = self.model.get_weights()
            target_model_theta = self.target_model.get_weights()
            counter = 0
            for q_weight, target_weight in zip(q_model_theta, target_model_theta):
                target_weight = target_weight * (1 - self.TAU) + q_weight + self.TAU
                target_model_theta[counter] = target_weight
                self.target_model.set_weights(target_model_theta)
    def remember(self, state, action, reward, next_state, done):
        experience = state, action, reward, next_state, done
        self.memory.append(experience)

    def act(self, state, decay_step):
        if self.epsilon_greedy:
            explore_probability = self.epsilon_min + (self.epsilon - self.epsilon_min) * np.exp(-self.epsilon_decay * decay_step)
        else:
            if(self.epsilon > self. epsilon_min:
                self.epsilon *= (1 - self.epsilon_decay)
            explore_probability = self.espilon
        if explore_probability > np.random.rand():
            return random.randrange(self.action_size), explore_probability
        else:
            return np.argmax(self.model.predict(state)), explore_probability

    def replay(self):
        minibatch = random.sample(self.memory, min(len(self.memory), self.batch_size))
        state = np.zeros(self.batch_size,) + self.state_size
        next_state = np.zeros(self.batch_size,) + self.state_size
        actions, reward, done = [], [], []

        for i in range(len(minibatch)):
            state[i] = minibatch[i][0]
            actions.append(minibatch[i][1])
            reward.append(minibatch[i][2])
            next_state[i] = minibatch[i][3]
            done.append(minibatch[i][4])

        target = self.model.predict(state)
        target_old = np.array(target)
        target_next = self.model.predict(next_state)
        target_value = self.target_model.predict(next_state)

        for i in range(len(minibatch)):
            if done[i]:
                target[i][actions[i]] = reward[i]
            else:
                if self.ddqn:
                    a = np.argmax(target_next[i])
                    target[i][actions[i]] = reward[i] + self.gamma * (target_value[i][a])
                else:
                    target[i][actions[i]] = reward[i] + self.gamma * (np.amax(target_next[i]))

    def reset(self):
        return self.env.reset()

    def step(self):
        next_state, reward, done, info, _ = self.env.step(action)
        return next_state, reward, done, info

    def run(self):
        decay_step = 0
        for e in range(self.EPISODES):
            state = self.env.reset()
            done = False
            i = 0
            while not done:
                decay_step += 1
                action, explore_probability = self.act(state, decay_step)
                next_state, reward, done, _ = self.step(action)
                if not done or i == self.env._max_episode_steps - 1:
                    reward = reward
                else:
                    reward = -100
                self.remember(state)