Setting up Doom Environment

In [102]:
#pip install vizdoom and clone the repo into a new folder
#!cd github & git clone https://github.com/Farama-Foundation/ViZDoom.git

#import vizdoom to setup game environment
from vizdoom import *

import random
import time
import numpy as np
from matplotlib import pyplot as plt

#pip install and import gym environment
from gym import Env
from gym.spaces import Discrete, Box
import cv2

In [142]:
class VizDoomGym(Env):
    """Gym wrapper around a ViZDoom game running the "basic" scenario.

    Observations are single-channel 100x160 uint8 frames (see ``grayscale``).
    The action space is ``Discrete(3)``; each action index is sent to the game
    as a one-hot button vector.
    """

    def __init__(self, render=False, config_path="github/VizDoom/scenarios/basic.cfg"):
        """Create the game, load its config and start it.

        Args:
            render: when truthy the game window is shown while the game runs,
                otherwise it is hidden.
            config_path: scenario config file passed to ``DoomGame.load_config``.
                Defaults to the basic scenario inside the cloned repo folder.
        """
        # Initialise the gym Env base class
        super().__init__()

        self.game = DoomGame()
        self.game.load_config(config_path)

        # Window visibility has to be chosen before the game is initialised
        self.game.set_window_visible(bool(render))
        self.game.init()

        # low/high are the pixel value bounds of the grayscale frame
        self.observation_space = Box(low=0, high=255, shape=(100, 160, 1), dtype=np.uint8)
        self.action_space = Discrete(3)

    def step(self, action):
        """Apply one action and return ``(state, reward, done, info)``.

        Args:
            action: index into the discrete action space.

        Returns:
            state: grayscale frame, or an all-zero frame of the observation
                shape/dtype when the game has no current state.
            reward: value returned by ``make_action``.
            done: result of ``is_episode_finished``.
            info: ``{"info": ammo}`` where ammo is the first game variable
                (0 when the game has no current state).
        """
        # Each row of the identity matrix is the one-hot button vector for one action
        actions = np.identity(self.action_space.n, dtype=np.uint8)

        # Second argument is the frame skip: the action is held for 4 tics
        # before the reward is returned
        reward = self.game.make_action(actions[action], 4)

        # Fetch the state once and reuse it for both the frame and the game variables
        game_state = self.game.get_state()
        if game_state:
            state = self.grayscale(game_state.screen_buffer)
            ammo = game_state.game_variables[0]
        else:
            # No state available: return a blank frame that still matches the
            # declared observation space (shape and uint8 dtype)
            state = np.zeros(self.observation_space.shape, dtype=self.observation_space.dtype)
            ammo = 0

        info = {"info": ammo}
        done = self.game.is_episode_finished()

        return state, reward, done, info

    def render(self, mode="human"):
        """No-op: the window is controlled by the ``render`` constructor flag."""
        pass

    def reset(self):
        """Start a new episode and return its first grayscale frame."""
        self.game.new_episode()
        state = self.game.get_state().screen_buffer
        return self.grayscale(state)

    def grayscale(self, observation):
        """Convert a channels-first colour frame to a (100, 160, 1) grayscale frame.

        The frame is converted to grayscale and scaled down to make training faster.
        """
        # Move the channel axis last, which is the layout cv2.cvtColor works on.
        # NOTE(review): COLOR_BGR2GRAY assumes BGR channel order - confirm against
        # the screen format configured by the scenario file.
        gray = cv2.cvtColor(np.moveaxis(observation, 0, -1), cv2.COLOR_BGR2GRAY)
        # cv2.resize takes (width, height)
        resize = cv2.resize(gray, (160, 100), interpolation=cv2.INTER_CUBIC)
        state = np.reshape(resize, (100, 160, 1))
        return state

    def close(self):
        """Close the underlying game."""
        self.game.close()

  and should_run_async(code)


In [143]:
import os
from stable_baselines3 import PPO
from stable_baselines3.common.callbacks import BaseCallback
from stable_baselines3.common import env_checker
from stable_baselines3.common.evaluation import evaluate_policy

Setting up callback for training

In [144]:
#callback that saves a model checkpoint every check_freq steps; training also writes tensorboard logs - to view them go into PPO_n, run tensorboard --logdir=. and open the localhost link
class TrainAndLoggingCallback(BaseCallback):
    """Callback that saves the model to ``save_path`` every ``check_freq`` calls."""

    def __init__(self, check_freq, save_path, verbose=1):
        """Store the checkpoint settings.

        Args:
            check_freq: save a checkpoint whenever ``n_calls`` is a multiple of this.
            save_path: directory for checkpoints; ``None`` disables saving.
            verbose: verbosity level forwarded to ``BaseCallback``.
        """
        super(TrainAndLoggingCallback, self).__init__(verbose)
        self.check_freq = check_freq
        self.save_path = save_path

    def _init_callback(self):
        """Create the checkpoint directory if a save path was given."""
        if self.save_path is not None:
            os.makedirs(self.save_path, exist_ok=True)

    def _on_step(self):
        """Save a checkpoint on every ``check_freq``-th call; always returns True."""
        # Same None guard as _init_callback, so a callback built without a
        # save path skips saving instead of failing in os.path.join
        if self.save_path is not None and self.n_calls % self.check_freq == 0:
            model_path = os.path.join(self.save_path, 'best_model_{}'.format(self.n_calls))
            self.model.save(model_path)

        return True
    
# Output locations: TensorBoard logs and saved model checkpoints
LOG_DIR = './logs/log_basic'
CHECKPOINT_DIR = './train/train_basic'

# Every 10k callback steps during training, save a copy of the RL agent's model
callback = TrainAndLoggingCallback(
    save_path=CHECKPOINT_DIR,
    check_freq=10000,
)

Proximal Policy Optimization model

In [None]:
# Training environment: game window stays hidden
env = VizDoomGym()
# 'CnnPolicy' selects a convolutional network policy, since observations are image frames
model = PPO(
    'CnnPolicy',
    env,
    learning_rate=0.0001,
    n_steps=2048,
    tensorboard_log=LOG_DIR,
    verbose=1,
)
model.learn(callback=callback, total_timesteps=100000)

Using cpu device
Wrapping the env with a `Monitor` wrapper
Wrapping the env in a DummyVecEnv.
Wrapping the env in a VecTransposeImage.
Logging to ./logs/log_basic\PPO_2
---------------------------------
| rollout/           |          |
|    ep_len_mean     | 36       |
|    ep_rew_mean     | -108     |
| time/              |          |
|    fps             | 54       |
|    iterations      | 1        |
|    time_elapsed    | 37       |
|    total_timesteps | 2048     |
---------------------------------
-----------------------------------------
| rollout/                |             |
|    ep_len_mean          | 34.4        |
|    ep_rew_mean          | -97.7       |
| time/                   |             |
|    fps                  | 36          |
|    iterations           | 2           |
|    time_elapsed         | 110         |
|    total_timesteps      | 4096        |
| train/                  |             |
|    approx_kl            | 0.007562112 |
|    clip_fraction        | 0

In [None]:
# Replace the in-memory model with the checkpoint saved at call 60000
model = PPO.load(
    './train/train_basic/best_model_60000'
)

In [None]:

# Environment with the game window visible, for watching the agent play
env = VizDoomGym(
    render=True,
)

In [None]:
# Evaluate the mean reward over 100 episodes.
# The second return value is bound to a real name rather than `_`, because
# assigning `_` in a notebook shadows IPython's last-output variable.
mean_reward, std_reward = evaluate_policy(model, env, n_eval_episodes=100)
mean_reward

In [None]:
# Get a fresh first frame so this cell does not rely on `obs` left over from another cell
obs = env.reset()
model.predict(obs)

In [None]:
# Let the model play 100 episodes and report the total reward of each one
for episode in range(100):
    obs = env.reset()
    done = False
    total_reward = 0
    while not done:
        # Second return value is unused here; kept off `_` so IPython's
        # last-output variable is not shadowed
        action, _states = model.predict(obs)
        obs, reward, done, info = env.step(action)
        # time.sleep(0.20)  # uncomment to slow playback down
        total_reward += reward
    # Arguments follow the placeholder order: episode index first, then its reward
    print('Total Reward for episode {} is {}'.format(episode, total_reward))
    # Pause between episodes
    time.sleep(2)