In [1]:
import importlib.util
import os
import time

import numpy as np
import scipy.misc
import tensorflow as tf

from IPython.display import HTML
from tqdm import trange

#Import the vizdoom package as "vd". The compiled extension is loaded straight from vd_location
#when that file exists (it can't be installed normally on Windows); otherwise fall back to a
#regular import so the notebook also runs where vizdoom is installed as a package

vd_location = 'C:/Anaconda3/envs/doom/Lib/site-packages/vizdoom/vizdoom.pyd'

if os.path.isfile(vd_location):
    vizdoom = importlib.util.spec_from_file_location('vizdoom',
                                                     vd_location)

    #spec_from_file_location returns None when no loader recognises the file suffix

    if vizdoom is None or vizdoom.loader is None:
        raise ImportError('Unable to create an import spec for {}'.format(vd_location))

    vd = importlib.util.module_from_spec(vizdoom)
    vizdoom.loader.exec_module(vd)

else:
    import vizdoom as vd


In [2]:
#Configure the game: screen format, depth buffer, resolution and scenario file
#(the call order is kept as-is because load_config runs after the explicit setters)

game = vd.DoomGame()
game.set_screen_format(vd.ScreenFormat.BGR24)
game.set_depth_buffer_enabled(True)
game.set_screen_resolution(vd.ScreenResolution.RES_160X120)
game.load_config('deadly_corridor.cfg')

#Dimensions of the downsampled frames; one extra channel is reserved for the depth buffer,
#which is stacked onto the screen buffer before preprocessing

down_sample_ratio = 0.5
width, height = (int(side*down_sample_ratio)
                 for side in (game.get_screen_width(), game.get_screen_height()))
channels = 1 + game.get_screen_channels()

#Build one one-hot action vector per button available in the scenario

available_actions = game.get_available_buttons()
num_actions = len(available_actions)
actions = [list(one_hot) for one_hot in np.identity(num_actions)]

#Q-network learning parameters

frame_delay = 12                                    #Second argument of game.make_action while training
buffer_size = 50000                                 #Maximum number of experiences kept in the buffer
epochs = 80                                         #Number of training epochs
steps_per_epoch = 2000                              #Episodes played (and minibatch updates made) per epoch
discount_factor = 0.99                              #Weight applied to the next state's best Q-value
learning_rate = 0.001                               #Learning rate of the Adam optimizer
start_epsilon, end_epsilon = 1.0, 0.1               #Exploration rate at the start and end of annealing
batch_size = 100                                    #Experiences sampled per training update
load_model, save_model = False, True                #Restore a checkpoint first / save checkpoints
model_dir = './checkpoints/deadly_corridor.ckpt'    #Checkpoint path prefix used by the Saver


In [3]:
#Create a buffer object that holds a set of training experiences (state-action-reward tuples)

class Buffer():
    """Bounded first-in-first-out store of training experiences.

    Every experience is indexed as (state, action, reward, next_state, terminal).
    `buffer` is the underlying list, `length` mirrors its current length and
    `size` is the capacity passed to the constructor.
    """

    def __init__(self, size=1000):
        self.size = size
        self.buffer = []
        self.length = 0

    def add_experience(self, experience):
        """Append an experience, dropping the oldest entries first if the buffer would overflow."""
        overflow = (self.length + 1) - self.size

        #An overflow of exactly zero deletes nothing, so the buffer tops out at `size` entries
        if overflow >= 0:
            del self.buffer[:overflow]

        self.buffer.append(experience)
        self.length = len(self.buffer)

    def sample_buffer(self, sample_size):
        """Draw `sample_size` experiences uniformly at random, with replacement.

        Returns the tuple (s1, a, r, s2, terminal): the states and next states are
        concatenated along axis 0, the other fields are stacked into arrays and
        the terminal flags are cast to int32.
        """
        picks = np.random.randint(self.length, size=sample_size)
        chosen = [self.buffer[pick] for pick in picks]

        s1 = np.concatenate([exp[0] for exp in chosen], axis=0)
        a = np.array([exp[1] for exp in chosen])
        r = np.array([exp[2] for exp in chosen])
        s2 = np.concatenate([exp[3] for exp in chosen], axis=0)
        terminal = np.array([exp[4] for exp in chosen], dtype=np.int32)

        return s1, a, r, s2, terminal

#Downsample and normalize an image array representing the game state at a given time stamp

def preprocess(image, down_sample_ratio=1):
    """Downsample an image array, scale it by 1/255 and add a leading batch axis.

    Parameters:
        image: array whose first two axes are resized.
            NOTE(review): assumed to be a uint8 (height, width[, channels]) frame - confirm.
        down_sample_ratio: resize argument; 1 (the default) skips resizing. It follows the
            scipy.misc.imresize `size` conventions: a float is a fraction of the current
            size, an int is a percentage and a tuple is the target (height, width).

    Returns:
        A float32 array with one extra axis of length 1 in front.

    scipy.misc.imresize was removed in SciPy 1.3. It is still used when available so existing
    environments behave as before; otherwise scipy.ndimage.zoom with linear interpolation is
    used. The fallback does not rescale the value range, so it expects values already in 0-255,
    and its interpolated pixel values may differ slightly from imresize's.
    """
    if down_sample_ratio != 1:
        legacy_resize = getattr(getattr(scipy, 'misc', None), 'imresize', None)

        if legacy_resize is not None:
            image = legacy_resize(image, down_sample_ratio)

        else:
            from scipy import ndimage

            image = np.asarray(image)
            rows, cols = image.shape[:2]

            #Mirror imresize: scaled sizes are truncated rather than rounded
            if isinstance(down_sample_ratio, (int, np.integer)):
                new_rows = int(rows*down_sample_ratio/100.0)
                new_cols = int(cols*down_sample_ratio/100.0)
            elif isinstance(down_sample_ratio, (float, np.floating)):
                new_rows = int(rows*down_sample_ratio)
                new_cols = int(cols*down_sample_ratio)
            else:
                new_rows, new_cols = int(down_sample_ratio[0]), int(down_sample_ratio[1])

            new_rows, new_cols = max(new_rows, 1), max(new_cols, 1)

            #Only the two spatial axes are resized; any trailing (channel) axes keep their size
            zoom_factors = (new_rows/rows, new_cols/cols) + (1.0,)*(image.ndim - 2)
            image = ndimage.zoom(image, zoom_factors, order=1)

    #astype returns a new array, so the in-place division never touches the caller's data
    image = image.astype(np.float32)
    image /= 255.0
    image = np.expand_dims(image, axis=0)

    return image

#Test the agent using a currently training or previously trained model

def test_agent(model, num_episodes, load_model, training=True, session=None, model_dir=None):
    """Play `num_episodes` greedy episodes with `model` and report the rewards.

    Parameters:
        model: object exposing choose_action(session, state) that returns an array of action indices.
        num_episodes: number of episodes to play.
        load_model: when true, open a new session and restore the weights from a checkpoint;
            otherwise the existing `session` is used.
        training: when true the game is assumed to be initialized already by the training
            process and is left running afterwards; when false the game is initialized
            here and closed at the end.
        session: existing session, required when `load_model` is false.
        model_dir: checkpoint path prefix, required when `load_model` is true. If no checkpoint
            exists at exactly that prefix (e.g. it was saved with a global_step suffix), the
            latest checkpoint recorded in the same directory is restored instead.

    Returns:
        The tuple ('Average Test Reward:', mean_reward).

    Raises:
        ValueError: if the session or checkpoint path needed for the chosen mode is missing.
    """
    if load_model:
        if model_dir is None:
            raise ValueError('model_dir is required when load_model is True')

        sess = tf.Session()

    else:
        #Require an existing session if a pretrained model isn't provided
        if session is None:
            raise ValueError('session is required when load_model is False')

        sess = session

    game_started = False
    episode_rewards = list()

    try:
        if load_model:
            checkpoint = model_dir

            #Checkpoints saved with global_step are named '<model_dir>-<step>'
            if not tf.train.checkpoint_exists(checkpoint):
                checkpoint = tf.train.latest_checkpoint(os.path.dirname(model_dir)) or model_dir

            print('Loading model from', checkpoint)
            tf.train.Saver().restore(sess, checkpoint)

        game.set_sound_enabled(True)

        #Avoid reinitializing the game if this was already done by the training process
        if not training:
            game.init()
            game_started = True

        for i in range(num_episodes):
            game.new_episode()

            while not game.is_episode_finished():
                state = game.get_state()
                buffer = np.concatenate((state.screen_buffer,
                                         np.expand_dims(state.depth_buffer,
                                                        axis=2)),
                                        axis=2)
                state1 = preprocess(buffer, down_sample_ratio)
                action = model.choose_action(sess, state1)[0]
                game.make_action(actions[action])

                #Add a delay between each time step so that the episodes occur at normal speed
                time.sleep(0.02)

            episode_rewards.append(game.get_total_reward())
            print('Test Episode {} Reward: {}'.format(i + 1, game.get_total_reward()))
            time.sleep(1)

    finally:
        #Only close what this function opened so that the training process can continue
        if game_started:
            game.close()

        if load_model:
            sess.close()

    return ('Average Test Reward:', np.mean(episode_rewards))


In [None]:
#Create a Q-network to estimate values and choose actions for a given state

class Q_network():
    """Convolutional network that estimates one Q-value per action for a batch of states.

    The graph is two 'valid'-padded convolutions (32 filters 8x8 stride 4, then 64 filters
    4x4 stride 2), a 512-unit dense layer and a linear output layer. It is trained with Adam
    on the mean squared error between the predicted Q-values and the fed `Q_target`.

    Parameters:
        height, width, channels: dimensions of a single input state.
        learning_rate: learning rate of the Adam optimizer.
        n_actions: number of Q-values to output; defaults to the module-level `num_actions`.
    """

    def __init__(self, height, width, channels, learning_rate=0.001, n_actions=None):
        if n_actions is None:
            n_actions = num_actions

        self.s_t = tf.placeholder(tf.float32,
                                  shape=[None, height, width, channels],
                                  name='state')

        #Kept for compatibility; none of the ops built in this class read this placeholder
        self.a_t = tf.placeholder(tf.int32,
                                  shape=[None],
                                  name='action')
        self.Q_target = tf.placeholder(tf.float32,
                                       shape=[None, n_actions],
                                       name='Q_target')

        self.input_layer = tf.reshape(self.s_t,
                                      [-1, height, width, channels],
                                      name='input_layer')
        self.conv1 = tf.layers.conv2d(inputs=self.input_layer,
                                      filters=32,
                                      kernel_size=[8, 8],
                                      strides=[4, 4],
                                      padding='valid',
                                      activation=tf.nn.relu,
                                      name='conv1_layer')
        self.conv2 = tf.layers.conv2d(inputs=self.conv1,
                                      filters=64,
                                      kernel_size=[4, 4],
                                      strides=[2, 2],
                                      padding='valid',
                                      activation=tf.nn.relu,
                                      name='conv2_layer')

        #Derive the flattened size from conv2's static shape instead of hard-coding 6*8*64,
        #which is only correct for 60x80 inputs
        flat_size = int(np.prod(self.conv2.get_shape().as_list()[1:]))
        self.flatten = tf.reshape(self.conv2,
                                  [-1, flat_size],
                                  name='flatten')
        self.dense = tf.layers.dense(inputs=self.flatten,
                                     units=512,
                                     activation=tf.nn.relu,
                                     name='dense1_layer')
        self.Q_values = tf.layers.dense(inputs=self.dense,
                                        units=n_actions,
                                        activation=None,
                                        name='output_layer')

        self.best_action = tf.argmax(self.Q_values, 1)

        #Keyword arguments make explicit which tensor is the target and which is the prediction
        self.loss = tf.losses.mean_squared_error(labels=self.Q_target,
                                                 predictions=self.Q_values)
        self.adam = tf.train.AdamOptimizer(learning_rate=learning_rate,
                                           name='adam')
        self.train = self.adam.minimize(self.loss)

    def calculate_loss(self, session, s, q):
        """Run one optimizer step on states `s` with targets `q` and return the loss of that batch."""
        L, _ = session.run([self.loss, self.train],
                           feed_dict={self.s_t: s,
                                      self.Q_target: q})

        return L

    def get_Q_values(self, session, s):
        """Return the array of Q-values predicted for the states `s`."""
        Q = session.run(self.Q_values,
                        feed_dict={self.s_t: s})

        return Q

    def choose_action(self, session, s):
        """Return, for each state in `s`, the index of the action with the highest Q-value."""
        a = session.run(self.best_action,
                        feed_dict={self.s_t: s})

        return a


In [None]:
#For each time step, collect the following data:
#The current game state
#The action that was taken
#The reward obtained from the chosen action
#The next game state (store the first game state if the previous action ends the episode)
#A variable indicating whether the episode is over yet

#Build the network, the experience buffer and the session used for training

tf.reset_default_graph()
DQN = Q_network(learning_rate=learning_rate,
                height=height,
                width=width,
                channels=channels)
exp_buffer = Buffer(size=buffer_size)
session = tf.Session()
saver = tf.train.Saver()

if load_model:
    checkpoint = model_dir

    #Checkpoints saved with global_step are named '<model_dir>-<step>', so fall back to the
    #latest checkpoint recorded in the same directory when the bare prefix doesn't exist

    if not tf.train.checkpoint_exists(checkpoint):
        checkpoint = tf.train.latest_checkpoint(os.path.dirname(model_dir)) or model_dir

    print('Loading model from', checkpoint)
    saver.restore(session, checkpoint)

else:
    session.run(tf.global_variables_initializer())

#Make sure the checkpoint directory exists before the first save

if save_model:
    os.makedirs(os.path.dirname(model_dir) or '.', exist_ok=True)


def observe_state():
    """Return the current screen and depth buffers stacked along the channel axis and preprocessed."""
    state = game.get_state()
    stacked = np.concatenate((state.screen_buffer,
                              np.expand_dims(state.depth_buffer, axis=2)),
                             axis=2)

    return preprocess(stacked, down_sample_ratio)


game.set_sound_enabled(False)
game.init()
t = 0

#Epoch boundaries of the three training phases

explore_end = 0.3*epochs
anneal_end = 0.9*epochs

#Accumulate experiences in the buffer using an epsilon-greedy strategy with three training phases

for epoch in range(epochs):
    epoch_rewards = list()

    #Explore the environment by choosing random actions with 100% probability for the first phase of training

    if epoch < explore_end:
        epsilon = 1.0

    #Increase the probability of greedily choosing an action by a constant amount at each epoch in the
    #second phase. The schedule is anchored to the phase boundaries so that epsilon moves linearly
    #from start_epsilon to end_epsilon without a jump when the phase begins

    elif epoch < anneal_end:
        progress = (epoch + 1 - explore_end)/(anneal_end - explore_end)
        epsilon = start_epsilon - progress*(start_epsilon - end_epsilon)

    #Select a random action with end_epsilon probability in the final phase of training

    else:
        epsilon = end_epsilon

    for step in trange(steps_per_epoch, leave=True):
        game.new_episode()

        while not game.is_episode_finished():
            state1 = observe_state()

            if epsilon >= 1.0 or np.random.uniform(0, 1) <= epsilon:
                action = np.random.randint(num_actions)

            else:
                action = DQN.choose_action(session, state1)[0]

            reward = game.make_action(actions[action], frame_delay)
            done = game.is_episode_finished()

            #There is no next state once the episode is over; reuse the current one as a
            #placeholder (its Q-values are masked out by the terminal flag during training)

            state2 = state1 if done else observe_state()

            #Add the experience obtained from each time step to the buffer

            t += 1
            exp_buffer.add_experience((state1, action, reward, state2, done))

        #Sample a minibatch from the buffer if there are enough experiences in the buffer

        if exp_buffer.length > batch_size:
            s1, a, r, s2, terminal = exp_buffer.sample_buffer(batch_size)

            #Train the Q-network by using the minibatch to update the action-value function Q

            Q2 = np.max(DQN.get_Q_values(session, s2), axis=1)
            target_Q = DQN.get_Q_values(session, s1)
            target_Q[np.arange(batch_size), a] = r + discount_factor*(1 - terminal)*Q2
            DQN.calculate_loss(session, s1, target_Q)

        epoch_rewards.append(game.get_total_reward())

    print('Epoch {} Mean Reward: {}'.format(epoch + 1, np.mean(epoch_rewards)))

    #Test the agent's performance for 10 episodes and save the model every 10 epochs

    if (epoch + 1) % 10 == 0:
        print('Epoch {} test:'.format(epoch + 1))
        print(*test_agent(DQN, num_episodes=10, training=True,
                          load_model=False, session=session, model_dir=model_dir))

        if save_model:
            print('Epoch {} Model saved to {}'.format(epoch + 1, model_dir))
            saver.save(session, model_dir, global_step=epoch + 1)

print('{} time steps experienced during training'.format(t))
game.close()
    

100%|██████████| 2000/2000 [04:44<00:00,  7.02it/s]


Epoch 1 Mean Reward: 129.76524337005614


100%|██████████| 2000/2000 [04:42<00:00,  7.08it/s]


Epoch 2 Mean Reward: 129.25157358551024


100%|██████████| 2000/2000 [05:19<00:00,  6.26it/s]


Epoch 3 Mean Reward: 132.53577142333984


100%|██████████| 2000/2000 [04:48<00:00,  6.93it/s]


Epoch 4 Mean Reward: 135.52395477294922


100%|██████████| 2000/2000 [03:03<00:00, 10.89it/s]


Epoch 5 Mean Reward: 130.9761109237671


100%|██████████| 2000/2000 [03:02<00:00, 10.95it/s]


Epoch 6 Mean Reward: 132.0569292526245


100%|██████████| 2000/2000 [03:15<00:00, 10.21it/s]


Epoch 7 Mean Reward: 132.71173384857178


100%|██████████| 2000/2000 [03:13<00:00, 10.32it/s]


Epoch 8 Mean Reward: 129.40692603302003


 60%|█████▉    | 1190/2000 [01:45<01:11, 11.31it/s]

In [None]:
#Evaluate the trained model for 20 episodes with a purely greedy policy, restoring the weights
#from model_dir in a fresh session

test_agent(model=DQN,
           num_episodes=20,
           load_model=True,
           training=False,
           model_dir=model_dir)
