## Setup the StreetFighter environment

In [1]:
%pip install --upgrade pip
%pip install gym==0.21.0
%pip install gym-retro
%pip install retrowrapper
%pip install opencv-python
%pip install matplotlib
%pip install torch 
%pip install stable-baselines3[extra]
%pip install stable-baselines3
%pip install optuna
%pip install tensorboard

Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.
zsh:1: no matches found: stable-baselines3[extra]
Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.
Collecting tensorboard
  Downloading tensorboard-2.11.0-py3-none-any.whl (6.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m6.0/6.0 MB[0m [31m16.2 MB/s[0m eta [36m0:00:00[0m00:01[0m00:01[0m
[?25hCollecting google-auth-oauthlib<0.5,>=0.4.1
  Using cached google

In [2]:
# Run this if you need to reset the runtime because we aren't using retrowrapper yet
# env.close()

In [3]:
# import retro for retro games (Street Fighter)
import retro
# import retrowrapper
# import time to slow down the game
import time
import os 

# After downloading the ROM for Street Fighter, we used this command in the roms folder to connect it with our gym retro environment (python -m retro.import .)
# import the ROM for Street Fighter
gamename = "StreetFighterIISpecialChampionEdition-Genesis"
# env = retrowrapper.RetroWrapper(gamename)
env = retro.make(game='StreetFighterIISpecialChampionEdition-Genesis')

### Figure out the observation and action space of the environment

In [4]:
env.observation_space

Box([[[0 0 0]
  [0 0 0]
  [0 0 0]
  ...
  [0 0 0]
  [0 0 0]
  [0 0 0]]

 [[0 0 0]
  [0 0 0]
  [0 0 0]
  ...
  [0 0 0]
  [0 0 0]
  [0 0 0]]

 [[0 0 0]
  [0 0 0]
  [0 0 0]
  ...
  [0 0 0]
  [0 0 0]
  [0 0 0]]

 ...

 [[0 0 0]
  [0 0 0]
  [0 0 0]
  ...
  [0 0 0]
  [0 0 0]
  [0 0 0]]

 [[0 0 0]
  [0 0 0]
  [0 0 0]
  ...
  [0 0 0]
  [0 0 0]
  [0 0 0]]

 [[0 0 0]
  [0 0 0]
  [0 0 0]
  ...
  [0 0 0]
  [0 0 0]
  [0 0 0]]], [[[255 255 255]
  [255 255 255]
  [255 255 255]
  ...
  [255 255 255]
  [255 255 255]
  [255 255 255]]

 [[255 255 255]
  [255 255 255]
  [255 255 255]
  ...
  [255 255 255]
  [255 255 255]
  [255 255 255]]

 [[255 255 255]
  [255 255 255]
  [255 255 255]
  ...
  [255 255 255]
  [255 255 255]
  [255 255 255]]

 ...

 [[255 255 255]
  [255 255 255]
  [255 255 255]
  ...
  [255 255 255]
  [255 255 255]
  [255 255 255]]

 [[255 255 255]
  [255 255 255]
  [255 255 255]
  ...
  [255 255 255]
  [255 255 255]
  [255 255 255]]

 [[255 255 255]
  [255 255 255]
  [255 255 255]
  ...
 

This most likely tells us that each observation is an image of height 200, width of 256, and 3 channels of RGB

In [5]:
env.action_space
env.action_space.sample()

array([0, 1, 0, 1, 1, 1, 0, 0, 1, 1, 1, 0], dtype=int8)

This means that we have a one-hot-encoded vector of length 12 to represent our action space. This means that we have 2^12 possible actions!

# Preprocess the environment

### Agenda:
- Shrink the images so we have less pixels
- Calculate the frame delta (to understand movement and change within the game)
- Filter the action 
- Set the reward function to the score of the game

In [7]:
# import the environment base class
from gym import Env

# import opencv to process the image
import cv2
# import numpy to work calculate the frame delta
import numpy as np
# import the space shapes for our environment
from gym.spaces import MultiBinary, Box
# import matplotlib to plot the image
from matplotlib import pyplot as plt

In [8]:
# Create custom environment
class StreetFighter(Env):
    def __init__(self):
        super().__init__()
        self.observation_space = Box(low=0, high=255, shape=(84, 84, 1), dtype=np.uint8)
        self.action_space = MultiBinary(12)
        # startup an instance of the game
        self.game = retro.make(game='StreetFighterIISpecialChampionEdition-Genesis', use_restricted_actions=retro.Actions.FILTERED)
    
    def step(self, action):
        # take a step (using the base environment)
        obs, reward, done, info = self.game.step(action)
        # preprocess the observation
        obs = self.preprocess(obs)

        # calculate the frame delta
        frame_delta = obs - self.previous_frame
        self.previous_frame = obs

        # calculate the score delta and reshape the reward function based on the score in the environment
        reward = info['score'] - self.score
        self.score = info['score']

        return frame_delta, reward, done, info

    def reset(self):
        obs = self.game.reset()
        # preprocess the image
        obs = self.preprocess(obs)
        # initialize the previous_frame value with the first frame
        self.previous_frame = obs
        # create a default value for the score delta
        self.score = 0
        return obs
    
    def preprocess(self, observation):
        # Grayscaling 
        gray = cv2.cvtColor(observation, cv2.COLOR_BGR2GRAY)
        # resize the image
        resize = cv2.resize(gray, (84, 84), interpolation=cv2.INTER_CUBIC)
        # add the channels value 
        channels = np.reshape(resize, (84, 84, 1))

        return channels
        
    def render(self, *args, **kwargs):
        self.game.render()

    def close(self):
        self.game.close()

In [9]:
# # Setup a game loop to see what the game looks like (testing)
# obs = env.reset()
# done = False
# # we are choosing to only play one game
# for game in range(1):
#     while not done:
#         if done:
#             obs = env.reset()
#         env.render()
#         action = env.action_space.sample()
#         obs, reward, done, info = env.step(action)
#         if reward > 0:
#             print(reward)

## Tune hyperparameters with Optuna

In [10]:
import optuna 
from stable_baselines3 import PPO
# useful for evaluting the current policy during our hyperparameter tuning
from stable_baselines3.common.evaluation import evaluate_policy
# import Monitor for logging
from stable_baselines3.common.monitor import Monitor
# import DummyVecEnv for vectorizing our environment and frame stacking
from stable_baselines3.common.vec_env import DummyVecEnv, VecFrameStack

  from .autonotebook import tqdm as notebook_tqdm


In [11]:
LOG_DIR = "./logs/"
OPT_DIR = "./opt/"

In [13]:
# Function to return test hyperparameters
def optimize_ppo(trial):
    return {
        "n_steps": trial.suggest_int("n_steps", 2048, 8192),
        "gamma": trial.suggest_loguniform("gamma", 0.8, 0.9999),
        "learning_rate": trial.suggest_loguniform("learning_rate", 1e-5, 1e-4),
        "clip_range": trial.suggest_uniform("clip_range", 0.1, 0.4),
        "gae_lambda": trial.suggest_uniform("gae_lambda", 0.8, 0.99),
    }

In [14]:
# Setup the training loop and return the mean reward
total_steps = 100000
def train_ppo(trial):
    try:
        # setup the hyperparameters
        hyperparams = optimize_ppo(trial)
        # setup the environment
        env = StreetFighter()
        # setup the monitor (this is important since we are vectorizing the environment, because this allows us 
        # to get the mean episode reward and mean episode length)
        env = Monitor(env, LOG_DIR)
        # setup the vectorized environment
        env = DummyVecEnv([lambda: env])
        # setup the frame stacking
        env = VecFrameStack(env, n_stack=4, channels_order='last')
        # setup the model
        model = PPO("CnnPolicy", env, verbose=0, tensorboard_log=LOG_DIR, **hyperparams)
        # train the model
        model.learn(total_timesteps=total_steps)
        # evaluate the model
        mean_reward, _ = evaluate_policy(model, env, n_eval_episodes=5)
        # close the environment
        env.close()

        # save the best model
        SAVE_PATH = os.path.join(OPT_DIR, "trial_{}_best_model".format(trial.number))
        model.save(SAVE_PATH)

        return mean_reward

    except Exception as e:
        return -1000

In [15]:
# NOTE that since we used a positive reward function, we are maximizing the reward
study = optuna.create_study(direction="maximize")
study.optimize(train_ppo, n_trials=100, n_jobs=1)

[32m[I 2022-12-25 21:08:35,941][0m A new study created in memory with name: no-name-a6459919-e494-469f-8592-9d50ade99ee9[0m
  "gamma": trial.suggest_loguniform("gamma", 0.8, 0.9999),
  "learning_rate": trial.suggest_loguniform("learning_rate", 1e-5, 1e-4),
  "clip_range": trial.suggest_uniform("clip_range", 0.1, 0.4),
  "gae_lambda": trial.suggest_uniform("gae_lambda", 0.8, 0.99),
We recommend using a `batch_size` that is a factor of `n_steps * n_envs`.
Info: (n_steps=7489 and n_envs=1)
[32m[I 2022-12-25 21:12:36,216][0m Trial 0 finished with value: 19200.0 and parameters: {'n_steps': 7489, 'gamma': 0.9774495372176722, 'learning_rate': 7.004741008281202e-05, 'clip_range': 0.12141611344965272, 'gae_lambda': 0.8172720382244215}. Best is trial 0 with value: 19200.0.[0m


In [17]:
best_model = PPO.load(os.path.join(OPT_DIR, "trial_{}_best_model".format(study.best_trial.number)))

# Setup Callback

In [None]:
from stable_baselines3.common.callbacks import BaseCallback

In [None]:
class TrainAndLoggingCallback(BaseCallback):

    def __init__(self, check_freq, save_path, verbose=1):
        super(TrainAndLoggingCallback, self).__init__(verbose)
        self.check_freq = check_freq
        self.save_path = save_path

    def _init_callback(self):
        if self.save_path is not None:
            os.makedirs(self.save_path, exist_ok=True)

    def _on_step(self):
        if self.n_calls % self.check_freq == 0:
            model_path = os.path.join(self.save_path, 'best_model_{}'.format(self.n_calls))
            self.model.save(model_path)

        return True

In [None]:
CHECKPOINT_DIR = "./train/"
callback = TrainAndLoggingCallback(check_freq=10000, save_path=CHECKPOINT_DIR)

# Train Model

In [None]:
# Recreate the environment
env = StreetFighter()
# setup the monitor (this is important since we are vectorizing the environment, because this allows us
# to get the mean episode reward and mean episode length)
env = Monitor(env, LOG_DIR)
# setup the vectorized environment
env = DummyVecEnv([lambda: env])
# setup the frame stacking
env = VecFrameStack(env, n_stack=4, channels_order='last')