In [None]:
import cv2
import json
import numpy as np
import retro
import time
import optuna
import os

from gym import Env
from gym.spaces import MultiBinary, Box
from stable_baselines3 import PPO
from stable_baselines3.common.callbacks import BaseCallback
from stable_baselines3.common.evaluation import evaluate_policy
from stable_baselines3.common.monitor import Monitor
from stable_baselines3.common.vec_env import DummyVecEnv, VecFrameStack

Before specifying the game name in `config.json`, run the following command from the directory that contains the game ROM:

`python -m retro.import .`

From the names it reports, choose one as the game environment and set it as `game_env` in `config.json`.

In [None]:
# Read the notebook settings; the context manager closes the file handle
# as soon as the JSON has been parsed.
with open('config.json') as config_file:
    config_data = json.load(config_file)

game_env = config_data['game_env'] # name of the retro game passed to retro.make

In [None]:
# Output locations shared by the cells below
CHECKPOINT_DIR = './train/'  # checkpoints written by the training callback
OPT_DIR = './opt/'           # models saved by the hyperparameter trials
LOG_DIR = './logs/'          # Monitor and TensorBoard logs

# Create game class

- Setup the environment by specifying the observation space, action space and the game environment

- Perform preprocessing on the game environment:
    * convert the gym retro frame to grayscale
    * compute a frame delta
    * resize the frame for fewer pixels
    * filter the action parameter
    * use the change in score as the reward

In [None]:
class StreetFighter(Env):
    """Gym environment wrapping a retro game with preprocessed observations.

    Observations are 84x84 single-channel ``uint8`` images, actions are 12
    binary flags, and the reward of a step is the change in ``info['score']``
    since the previous step.
    """

    def __init__(self):
        """Set up the observation space, the action space and the emulator."""
        # ``super`` has to be called to get the proxy object; calling
        # ``super.__init__()`` on the bare type raises a TypeError.
        super().__init__()

        # specify observation space
        self.observation_space = Box(
            low=0, high=255, shape=(84, 84, 1), dtype=np.uint8
            )
        # specify action space
        self.action_space = MultiBinary(12)
        # startup an instance of the game
        self.game = retro.make(
            game=game_env, use_restricted_actions=retro.ACTIONS.FILTERED
            )

    def reset(self):
        """Restart the game and return the preprocessed first frame."""
        # running score, used by step() to turn the score into a per-step delta
        self.score = 0

        obs = self.game.reset() # return the first frame
        obs = self.preprocess(obs)
        # remembered so the first step() can compute a frame delta
        self.previous_frame = obs

        return obs

    def preprocess(self, observation):
        """Convert a raw frame to an 84x84x1 grayscale array.

        Args:
            observation: colour frame as returned by the emulator.

        Returns:
            The grayscale frame resized to 84x84 with a trailing channel axis.
        """
        gray = cv2.cvtColor(observation, cv2.COLOR_BGR2GRAY)
        # dsize is a (width, height) pair; the channel axis is added afterwards
        resize = cv2.resize(
            gray, (84, 84), interpolation=cv2.INTER_CUBIC
            )
        channels = np.reshape(resize, (84, 84, 1)) # add the channels value

        return channels

    def step(self, action):
        """Advance the game by one step.

        Args:
            action: button flags forwarded to the emulator.

        Returns:
            Tuple ``(frame_delta, reward, done, info)`` where ``frame_delta`` is
            the current preprocessed frame minus the previous one and
            ``reward`` is the increase in ``info['score']``.
        """
        # take a step; the emulator's own reward is replaced further down
        obs, reward, done, info = self.game.step(action)
        obs = self.preprocess(obs)

        # frame delta
        # NOTE(review): both frames are presumably uint8, so this subtraction
        # wraps modulo 256 instead of going negative - confirm that is intended.
        frame_delta = obs-self.previous_frame
        self.previous_frame = obs

        # reshape the reward: score gained since the previous step
        reward = info['score'] - self.score
        self.score = info['score']

        return frame_delta, reward, done, info

    def render(self, *args, **kwargs):
        """Render the game; any arguments are accepted and ignored."""
        self.game.render()

    def close(self):
        """Close the underlying emulator."""
        self.game.close()


## Run the game once

In [None]:
env = StreetFighter()

# draw the sample from the observation space (it was previously drawn from
# the action space by mistake)
observation_space = env.observation_space.sample() # sample observation
actions = env.action_space.sample() # sample action
obs = env.reset() # reset the game to starting state
done = False # set flag to false

In [None]:
for game in range(1):
    # Start every game from a fresh state; resetting inside the
    # `while not done` loop could never run.
    obs = env.reset()
    done = False
    while not done:
        env.render()
        # draw a new random action each step instead of replaying one fixed sample
        obs, reward, done, info = env.step(env.action_space.sample())
        time.sleep(0.01)
        if reward > 0:
            print(reward)

env.close()

# Hyper parameter tuning

* Use the Optuna package to tune the learning rate, number of steps, gamma, clip range and GAE lambda
* Run a training loop and return the mean reward

In [None]:
def optimize_ppo(trial):
    """Sample one set of PPO hyperparameters from an Optuna trial.

    Args:
        trial: Optuna trial that supplies the suggestions.

    Returns:
        dict with the keys ``n_steps``, ``gamma``, ``learning_rate``,
        ``clip_range`` and ``gae_lambda``, ready to be passed to ``PPO``.
    """
    return {
        # Only multiples of 64 are sampled so the value never needs to be
        # patched by hand before training (64 is PPO's default batch size).
        'n_steps': trial.suggest_int('n_steps', 2048, 8192, step=64),
        # suggest_float(..., log=True) replaces the deprecated suggest_loguniform
        'gamma': trial.suggest_float('gamma', 0.8, 0.9999, log=True),
        'learning_rate': trial.suggest_float(
            'learning_rate', 1e-5, 1e-4, log=True
            ),
        # suggest_float replaces the deprecated suggest_uniform
        'clip_range': trial.suggest_float('clip_range', 0.1, 0.4),
        'gae_lambda': trial.suggest_float('gae_lambda', 0.8, 0.99)
    }

# training loop
def optimize_agent(trial):
    """Train and evaluate one PPO model for an Optuna trial.

    Args:
        trial: Optuna trial used to sample the hyperparameters and to name
            the saved model.

    Returns:
        Mean reward over 5 evaluation episodes, or -1000 when anything in the
        trial raises, so that the study can carry on with the next trial.
    """
    env = None
    try:
        model_params = optimize_ppo(trial)

        # create the environment
        env = StreetFighter()
        env = Monitor(env, LOG_DIR)
        # bind the monitored env to its own name so the factory below does
        # not depend on later rebinding of `env`
        monitored_env = env
        env = DummyVecEnv([lambda: monitored_env])
        env = VecFrameStack(env, 4, channels_order='last')

        # create the algo
        model = PPO(
            'CnnPolicy', env, tensorboard_log=LOG_DIR,
            verbose=0, **model_params
            )
        model.learn(total_timesteps=30000)

        # evaluate model
        mean_reward, _ = evaluate_policy(model, env, n_eval_episodes=5)

        # 'trial_' (not 'trail_') so the name matches the files loaded later
        save_path = os.path.join(OPT_DIR, 'trial_{}_best_model'.format(
            trial.number
            ))
        model.save(save_path)

        return mean_reward

    except Exception as exc:
        # Best effort: report the failure instead of hiding it, then give
        # the trial a very low score.
        print('Trial {} failed: {!r}'.format(trial.number, exc))
        return -1000

    finally:
        # Always release the emulator, even on failure, so a crashed trial
        # does not leak it into the following trials.
        if env is not None:
            env.close()

In [None]:
study = optuna.create_study(direction='maximize') # create the experiment
study.optimize(optimize_agent, n_trials=100, n_jobs=1)

# Load the model saved by the best-scoring trial rather than a fixed trial number
best_model_path = os.path.join(
    OPT_DIR, 'trial_{}_best_model.zip'.format(study.best_trial.number)
    )
model = PPO.load(best_model_path)

In [None]:
class TrainAndLoggingCallback(BaseCallback):
    """Callback that periodically saves the model being trained."""

    def __init__(self, check_freq, save_path, verbose=1):
        """Store the checkpoint settings.

        Args:
            check_freq: a checkpoint is written every ``check_freq`` calls.
            save_path: directory the checkpoints are written to.
            verbose: verbosity level forwarded to ``BaseCallback``.
        """
        super().__init__(verbose)
        self.save_path = save_path
        self.check_freq = check_freq

    def _init_callback(self):
        """Create the checkpoint directory when a save path was given."""
        if self.save_path is None:
            return
        os.makedirs(self.save_path, exist_ok=True)

    def _on_step(self):
        """Save a checkpoint on every ``check_freq``-th call; always returns True."""
        checkpoint_due = self.n_calls % self.check_freq == 0
        if checkpoint_due:
            checkpoint_name = 'best_model_{}'.format(self.n_calls)
            self.model.save(os.path.join(self.save_path, checkpoint_name))

        return True

In [None]:
# Checkpoint the model into CHECKPOINT_DIR every 10000 callback calls
callback = TrainAndLoggingCallback(
    save_path=CHECKPOINT_DIR,
    check_freq=10000,
)

# Training the model

In [None]:
# Game env -> Monitor logging -> single-env vector wrapper -> stack of 4 frames
monitored_env = Monitor(StreetFighter(), LOG_DIR)
env = VecFrameStack(
    DummyVecEnv([lambda: monitored_env]), 4, channels_order='last'
)

In [None]:
# copy so the adjustment below never touches the study's own record
model_params = dict(study.best_params)
# to remove errors, round the tuned no of steps down to a multiple of 64
model_params['n_steps'] = max(64, (model_params['n_steps'] // 64) * 64)
model = PPO(
    'CnnPolicy', env, tensorboard_log=LOG_DIR, verbose=0, **model_params
    )

# Warm-start from the best trial. `model.load(...)` is a classmethod that
# returns a new model, so its result was being thrown away; set_parameters
# copies the saved weights into this model instead.
best_model_path = os.path.join(
    OPT_DIR, 'trial_{}_best_model.zip'.format(study.best_trial.number)
    )
model.set_parameters(best_model_path)

# train the model on different number of timesteps for improved performance
model.learn(total_timesteps=100000, callback=callback)

In [None]:
# evaluate the model
# NOTE(review): the checkpoint name is hard-coded so this cell can run
# without the study in scope; point it at the model you want to evaluate.
model = PPO.load(os.path.join(OPT_DIR, 'trial_5_best_model.zip'))
mean_reward, std_reward = evaluate_policy(
    model, env, render=True, n_eval_episodes=5
    )
# report the result instead of silently discarding it
print('mean reward: {:.2f} +/- {:.2f}'.format(mean_reward, std_reward))

# Model Testing

In [None]:
for game in range(1):
    # Start every game from a fresh state; resetting inside the
    # `while not done` loop could never run.
    obs = env.reset()
    done = False
    while not done:
        env.render()
        # predict() returns a tuple; the action is its first element
        action = model.predict(obs)[0]
        obs, reward, done, info = env.step(action)
        time.sleep(0.01)
        print(reward)