# Setup StreetFighter

In [None]:
import retro
import time
import matplotlib.pyplot as plt
from tqdm import tqdm
from itertools import cycle

In [None]:
# List every game available to this retro installation.
retro.data.list_games()

In [None]:
# To import the ROMs, run `python -m retro.import .` in a terminal from inside the ROM folder.

In [None]:
# Close the currently open environment before creating another one:
# environments cannot run in parallel, and trying to do so causes an error.
env.close() 

In [None]:
# Draw a random sample from the observation space to inspect what an observation looks like.
env.observation_space.sample()


In [None]:
# Draw a random sample from the action space (a MultiBinary vector) to see the available actions.
env.action_space.sample()

In [None]:
# Play the game with random actions to check that the environment works.

# how many games to play
for game in range(1):
    # Start every game from the initial state with a fresh `done` flag.
    # (Resetting inside the `while not done` loop could never trigger, and a
    # stale `done = True` would make every game after the first a no-op.)
    obs = env.reset()
    done = False

    # until dead/finished: render the frame, then take one random action
    while not done:
        env.render()
        obs, reward, done, info = env.step(env.action_space.sample())


# Setup Environment

In [None]:
from gym import Env

# import for shape changer
from gym.spaces import MultiBinary, Box

# import numpy for frame delta
import numpy as np

# import cv for gray-scale
import cv2

from matplotlib import pyplot as plt

# create custom environment
class StreetFighter(Env):
    """Gym environment around the Street Fighter II retro game with a shaped reward.

    Observations are 84x84 single-channel uint8 frames and actions are a
    12-element MultiBinary vector. ``step`` replaces the emulator's own
    reward with one derived from the health values found in ``info``:
    damage dealt minus damage taken, plus a bonus (or penalty) for
    uninterrupted hit streaks and a round-end term based on the health
    that is left over.
    """

    # Health value both fighters report at the start of a round.
    MAX_HEALTH = 176
    # Reward added (or subtracted) per hit of an uninterrupted streak.
    STREAK_BONUS = 5

    def __init__(self):
        super().__init__()
        # reduce the observation space to a small single-channel frame
        self.observation_space = Box(low=0, high=255, shape=(84, 84, 1), dtype=np.uint8)
        self.action_space = MultiBinary(12)
        self.game = retro.make(
            game="StreetFighterIISpecialChampionEdition-Genesis",
            # only does actions that are within the game's buttons
            use_restricted_actions=retro.Actions.FILTERED,
            # initial state is ryu vs ryu
            state="ryuvsryu",
        )
        # Make sure the reward bookkeeping attributes exist before the first reset().
        self._reset_round_state()

    def _reset_round_state(self):
        """Put the health trackers and streak counters back to round-start values."""
        self.enemy_health = self.MAX_HEALTH
        self.health = self.MAX_HEALTH
        self.left_over_enemy_health = 0
        self.left_over_health = 0
        self.consecutive_attacks = 0
        self.enemy_consecutive_attacks = 0

    def reset(self):
        """Reset the emulator and the reward bookkeeping.

        Returns:
            The first frame, preprocessed to shape (84, 84, 1).
        """
        obs = self.preprocess(self.game.reset())
        self._reset_round_state()
        return obs

    def step(self, action):
        """Advance the game by one step and compute the shaped reward.

        The reward depends on ``info["health"]`` and ``info["enemy_health"]``
        (health starts at 176; on death it goes to -1, then 0, then back to 176):

        * both at full health: bookkeeping is reset, reward is 0;
        * both above 0 and at least one damaged: damage dealt minus damage
          taken, plus a streak bonus or penalty;
        * enemy health below 0: own remaining health plus the final damage
          dealt, plus a streak bonus;
        * own health below 0: the negative of the enemy's remaining health
          plus the final damage taken, plus a streak penalty;
        * anything else: 0.

        Args:
            action: action passed unchanged to the underlying game.

        Returns:
            Tuple ``(obs, reward, done, info)`` where ``obs`` is the
            preprocessed frame and ``done``/``info`` come from the game.
        """
        # The emulator's own reward is deliberately discarded so that every
        # branch below reports the shaped reward on one consistent scale.
        obs, _, done, info = self.game.step(action)
        obs = self.preprocess(obs)

        enemy_health = info["enemy_health"]
        health = info["health"]
        full = self.MAX_HEALTH

        if enemy_health == full and health == full:
            # A round has (re)started: forget everything about the previous one.
            self._reset_round_state()
            reward = 0
        elif (enemy_health > 0 and health > 0) and (
            enemy_health < full or health < full
        ):
            reward = self._exchange_reward(enemy_health, health)
        elif enemy_health < 0:
            reward = self._round_won_reward(enemy_health, health)
        elif health < 0:
            reward = self._round_lost_reward(enemy_health, health)
        else:
            reward = 0

        # Remember the current health so the next step can measure the change.
        self.enemy_health = enemy_health
        self.health = health

        return obs, reward, done, info

    def _exchange_reward(self, enemy_health, health):
        """Reward for a step in which both fighters are still alive."""
        net_damage = (self.enemy_health - enemy_health) - (self.health - health)

        if net_damage > 0:
            # We came out ahead: extend our streak and break the enemy's.
            self.consecutive_attacks += 1
            self.enemy_consecutive_attacks = 0
            if self.consecutive_attacks > 1:
                return net_damage + self.STREAK_BONUS * self.consecutive_attacks
        elif net_damage < 0:
            # The enemy came out ahead: extend their streak and break ours.
            self.consecutive_attacks = 0
            self.enemy_consecutive_attacks += 1
            if self.enemy_consecutive_attacks > 1:
                return net_damage - self.STREAK_BONUS * self.enemy_consecutive_attacks

        # First hit of a streak, or no net change (streaks are left untouched).
        return net_damage

    def _round_won_reward(self, enemy_health, health):
        """Reward for a step in which the enemy's health is below 0."""
        # Only health not already rewarded on an earlier knockout frame counts.
        reward = (health - self.left_over_health) + (self.enemy_health - enemy_health)
        streak = self.consecutive_attacks + 1
        if streak > 1:
            reward += self.STREAK_BONUS * streak

        self.left_over_health = health
        self.consecutive_attacks = 0
        self.enemy_consecutive_attacks = 0
        return reward

    def _round_lost_reward(self, enemy_health, health):
        """Penalty (negative reward) for a step in which our health is below 0."""
        # Only enemy health not already penalised on an earlier knockout frame counts.
        penalty = (enemy_health - self.left_over_enemy_health) + (self.health - health)
        streak = self.enemy_consecutive_attacks + 1
        if streak > 1:
            penalty += self.STREAK_BONUS * streak

        self.left_over_enemy_health = enemy_health
        self.consecutive_attacks = 0
        self.enemy_consecutive_attacks = 0
        return -penalty

    def preprocess(self, observation):
        """Convert a colour frame to an 84x84 grayscale frame with one channel.

        Args:
            observation: colour frame as returned by the game.

        Returns:
            Array of shape (84, 84, 1).
        """
        # NOTE(review): this flag assumes BGR channel order; if the emulator
        # delivers RGB frames, COLOR_RGB2GRAY would be the matching flag -
        # confirm. Left unchanged so observations stay identical to the ones
        # existing checkpoints were trained on.
        gray = cv2.cvtColor(observation, cv2.COLOR_BGR2GRAY)
        # shrink to the size declared in observation_space
        resized = cv2.resize(gray, (84, 84), interpolation=cv2.INTER_CUBIC)
        # add the trailing channel axis
        return np.reshape(resized, (84, 84, 1))

    def render(self, *arg, **kwargs):
        """Render the underlying game, forwarding any arguments and its result."""
        return self.game.render(*arg, **kwargs)

    def close(self):
        """Close the underlying game."""
        self.game.close()

In [None]:
# Close the environment that is currently open before creating the custom one.
env.close()

In [None]:
# Instantiate the custom StreetFighter environment.
env = StreetFighter()


# Hyperparameter Tuning

In [None]:
# Import optuna for HPO optimization frame
import optuna
# Import PPO for algos
from stable_baselines3 import PPO, A2C
# Evaluate Policy
from stable_baselines3.common.evaluation import evaluate_policy
# Import the sb3 monitor for logging to access rewards
from stable_baselines3.common.monitor import Monitor
# import wrappers
from stable_baselines3.common.vec_env import DummyVecEnv, VecFrameStack
import os


In [None]:
# save model and reload without retraining
LOG_DIR = './logs/'
OPT_DIR = './opt/'


In [None]:
def optimize_ppo(trial):
    """Sample one set of PPO hyperparameters from an Optuna trial.

    Args:
        trial: Optuna trial used to draw the values.

    Returns:
        dict of keyword arguments for the PPO constructor:
            n_steps: number of frames collected per update (multiple of 64)
            gamma: discount rate for calculating returns
            learning_rate: learning coefficient for the optimizer
            clip_range: clipping amount for the advantage calculation
            gae_lambda: advantage smoothing parameter
    """
    return {
        # A rollout of 0 steps is invalid, and PPO wants a multiple of 64,
        # so sample 64, 128, 192 or 256 directly instead of any int from 0.
        'n_steps': trial.suggest_int('n_steps', 64, 256, step=64),
        'gamma': trial.suggest_loguniform('gamma', 0.8, 0.9999),
        'learning_rate': trial.suggest_loguniform('learning_rate', 1e-8, 1e-6),
        'clip_range': trial.suggest_uniform('clip_range', 0.1, 0.3),
        'gae_lambda': trial.suggest_uniform('gae_lambda', 0.8, .99)
    }


In [None]:
def optimize_a2c(trial):
    """Sample one set of A2C hyperparameters from an Optuna trial.

    Args:
        trial: Optuna trial used to draw the values.

    Returns:
        dict of keyword arguments for the A2C constructor:
            n_steps: number of frames collected per update
            gamma: discount rate for calculating returns
            learning_rate: learning coefficient for the optimizer
            gae_lambda: advantage smoothing parameter
            ent_coef, vf_coef, max_grad_norm, rms_prop_eps: sampled from
                ranges whose bounds are equal, i.e. pinned to one value
    """
    return {
        # A rollout of 0 steps is invalid, so the range starts at 1.
        "n_steps": trial.suggest_int("n_steps", 1, 128),
        "gamma": trial.suggest_loguniform("gamma", 0.8, 0.9999),
        "learning_rate": trial.suggest_loguniform("learning_rate", 1e-5, 1e-3),
        "gae_lambda": trial.suggest_uniform("gae_lambda", 0.8, 0.99),
        "ent_coef": trial.suggest_uniform("ent_coef", 0, 0),
        "vf_coef": trial.suggest_uniform("vf_coef", 0.5, 0.5),
        "max_grad_norm": trial.suggest_uniform("max_grad_norm", 0.5, 0.5),
        "rms_prop_eps": trial.suggest_uniform("rms_prop_eps", 1e-05, 1e-05),
    }

In [None]:
def optimize_agent(trial):
    """Optuna objective: train an A2C agent with sampled hyperparameters and score it.

    The trained model is saved to OPT_DIR as ``trial_<number>_best_model``.

    Args:
        trial: Optuna trial that supplies the hyperparameters and the trial number.

    Returns:
        Mean reward over 10 evaluation episodes.
    """
    # change model_params based on PPO or A2C
    model_params = optimize_a2c(trial)

    env = StreetFighter()
    try:
        monitored = Monitor(env, LOG_DIR)
        env = VecFrameStack(DummyVecEnv([lambda: monitored]), 4, channels_order='last')
        model = A2C('CnnPolicy', env, tensorboard_log=LOG_DIR, verbose=0, **model_params)
        model.learn(total_timesteps=300000)
        mean_reward, _ = evaluate_policy(model, env, n_eval_episodes=10)
    finally:
        # Always release the environment, even when training or evaluation
        # fails; environments cannot run in parallel, so a leaked one would
        # make every following trial fail as well.
        env.close()

    # Make sure the output directory exists before writing into it.
    os.makedirs(OPT_DIR, exist_ok=True)
    SAVE_PATH = os.path.join(OPT_DIR, 'trial_{}_best_model'.format(trial.number))
    model.save(SAVE_PATH)
    return mean_reward


In [None]:
# Create a study that maximises the value returned by optimize_agent and run 20 trials.
study = optuna.create_study(direction='maximize')
study.optimize(optimize_agent, n_trials=20)

In [None]:
# Show the hyperparameters of the best trial.
study.best_params

In [None]:
# Start from the best hyperparameters found by the study.
# For PPO, n_steps must be a multiple of 64; the two commented-out lines below
# round it down to a multiple of 64 when enabled.
import math
model_params = study.best_params
#n_steps_var = math.trunc(model_params["n_steps"]/64)
#model_params["n_steps"]=n_steps_var*64

In [None]:
# Hand-written hyperparameters; this assignment replaces any value model_params held before.
# NOTE(review): learning_rate is far below the 1e-5..1e-3 range searched in
# optimize_a2c - confirm this value is intended.
model_params={
    "n_steps": 97,
    "gamma": 0.9123708231325064,
    "learning_rate": 1.7586069563249184e-09,
    "gae_lambda": 0.8103898505539483,
    "ent_coef": 0.0,
    "vf_coef": 0.5,
    "max_grad_norm": 0.5,
    "rms_prop_eps": 1e-05,
}

# Setup Callback

In [None]:
from stable_baselines3.common.callbacks import BaseCallback

In [None]:
class TrainAndLoggingCallback(BaseCallback):
    """Training callback that saves the model every ``check_freq`` calls.

    Args:
        check_freq: number of callback calls between two saved checkpoints.
        save_path: directory that receives the checkpoints; when None,
            no directory is created and nothing is saved.
        verbose: verbosity level passed on to BaseCallback.
    """

    def __init__(self, check_freq, save_path, verbose=1):
        super().__init__(verbose)
        self.check_freq = check_freq
        self.save_path = save_path

    def _init_callback(self):
        """Create the checkpoint directory if a save path was given."""
        if self.save_path is not None:
            os.makedirs(self.save_path, exist_ok=True)

    def _on_step(self):
        """Save a checkpoint on every ``check_freq``-th call; always keep training."""
        # Mirror the None check of _init_callback: without a save path there
        # is nowhere to write, and os.path.join(None, ...) would raise.
        if self.save_path is not None and self.n_calls % self.check_freq == 0:
            model_path = os.path.join(self.save_path, 'best_model_{}'.format(self.n_calls))
            self.model.save(model_path)

        return True



In [None]:
# Directory handed to the training callback as its save path.
CHECKPOINT_DIR = "./train/"

In [None]:
# Save a checkpoint into CHECKPOINT_DIR every 10000 callback calls.
callback = TrainAndLoggingCallback(check_freq=10000, save_path = CHECKPOINT_DIR)

In [None]:
# Close the environment that is currently open before building the training environment.
env.close()

# Training the Model

In [None]:
# Build the training environment from the inside out:
# the custom game, wrapped in a Monitor that logs to LOG_DIR ...
env = Monitor(StreetFighter(), LOG_DIR)
# ... then vectorised and stacked 4 frames deep along the last (channel) axis.
env = VecFrameStack(DummyVecEnv([lambda: env]), 4, channels_order='last')

In [None]:
# Create the A2C agent with a CNN policy and the chosen hyperparameters; TensorBoard logs go to LOG_DIR.
model = A2C('CnnPolicy', env, tensorboard_log=LOG_DIR, verbose=1, **model_params)


In [None]:
# `load` is a classmethod that builds and returns a NEW model; calling it on
# `model` and discarding the result leaves `model` untouched. Load the saved
# weights into the existing model in place instead.
model.set_parameters(os.path.join(OPT_DIR, "trial_11_best_model.zip"))

In [None]:
# `load` is a classmethod that builds and returns a NEW model; calling it on
# `model` and discarding the result leaves `model` untouched. Load the
# checkpoint's weights into the existing model in place instead.
model.set_parameters('./train/best_model_9450000.zip')

In [None]:
# Train for 10,000,000 timesteps; the callback saves periodic checkpoints.
model.learn(total_timesteps=10000000, callback=callback)

In [None]:
# open tensorboard from the logs folder in terminal
# tensorboard --logdir=.

# Test the Model

In [None]:
# Load a saved checkpoint as the model to test.
model = A2C.load('./train/best_model_3000000.zip')

In [None]:
# Evaluate the loaded model over a single episode; the second return value is discarded.
mean_reward, _ = evaluate_policy(model, env, n_eval_episodes=1)

In [None]:
# Close the environment.
# NOTE(review): the playback cell that follows calls env.reset(), so `env`
# has to be re-created before running it - confirm the intended cell order.
env.close()

In [None]:
# Watch the trained model play.

# how many games to play
for game in range(1):
    # Start every game from the initial state with a fresh `done` flag.
    # (Resetting inside the `while not done` loop could never trigger, and a
    # stale `done = True` would make every game after the first a no-op.)
    obs = env.reset()
    done = False

    # until dead/finished: render the frame, then let the model pick the action
    while not done:
        env.render()
        action, _ = model.predict(obs)
        obs, reward, done, info = env.step(action)
        # short pause so the rendered game is watchable
        time.sleep(0.01)


