In [None]:
# Install gym-retro together with pinned packaging tools and a pinned gym release.
# NOTE(review): presumably pip 23.2, setuptools 65.5.0 and wheel<0.40.0 are pinned so that
# the legacy gym==0.21.0 package still installs — confirm before changing any of these pins.
%pip install pip==23.2
%pip install gym-retro
%pip install setuptools==65.5.0 "wheel<0.40.0"
%pip install gym==0.21.0

In [None]:
# Import retro to play Street Fighter using a ROM
import retro
# Import time to slow down game
import time

In [None]:
# Register the game ROMs stored in ./ROMs with gym-retro.
# Shell equivalent, run from the folder that holds the ROMs: python -m retro.import .
!python -m retro.import ./ROMs

# Setup Environment
## What we are going to do! FUNNN

- Observation Preprocess - grayscale (DONE), frame delta, resize the frame so we have fewer pixels (DONE)
- Filter the action - parameter DONE
- Reward function - set this to the score


In [None]:
# OpenCV (cv2) is used for frame preprocessing; matplotlib is imported for plotting frames.
# TODO(review): both installs are unpinned — pin versions for reproducible runs.
%pip install opencv-python
%pip install matplotlib

In [None]:
# Import environment base class for a wrapper 
from gym import Env 
# Import the space shapes for the environment
from gym.spaces import Discrete, MultiBinary, Box
# Import numpy to calculate frame delta 
import numpy as np
# Import opencv for grayscaling
import cv2
# Import matplotlib for plotting the image
from matplotlib import pyplot as plt

from collections import deque

import math

In [None]:
# 1. frame
# 2. preprocess 200x256x3 -> 84x84x3 (resized, colour-quantised RGB)
# 3. change in pixels: current_frame-last_frame

In [None]:
# this is reward 2 (health, wins, and score)
class StreetFighter(Env):
    """Gym environment wrapping the gym-retro Street Fighter II emulator.

    Observations are 84x84 colour-quantised RGB frames. The emulator's own reward
    is discarded and replaced by ``calculate_reward``: the score delta plus a
    health-weighted bonus/penalty paid once when a round is decided.
    """

    def __init__(self):
        super().__init__()
        # Constants for reward calculation
        self.START_HEALTH = 176  # Starting health in Street Fighter II
        self.ROUND_WIN_MULTIPLIER = 2
        self.ROUND_LOSS_MULTIPLIER = -1

        self.observation_space = Box(
            low=0,
            high=255,
            shape=(84, 84, 3),  # Keep 3 channels for RGB
            dtype=np.uint8
        )
        self.action_space = MultiBinary(12)  # type of actions that can be taken
        self.game = retro.make(
            game='StreetFighterIISpecialChampionEdition-Genesis',
            use_restricted_actions=retro.Actions.FILTERED
        )

        # Values seen on the previous step; calculate_reward compares against them.
        self.enemy_health = self.START_HEALTH
        self.agent_health = self.START_HEALTH
        self.score = 0

    def reset(self):
        """Reset the emulator and the reward-tracking state.

        Returns:
            The preprocessed first observation.
        """
        obs = self.preprocess(self.game.reset())
        self.previous_frame = obs

        # Reset game state variables
        self.score = 0
        self.enemy_health = self.START_HEALTH
        self.agent_health = self.START_HEALTH

        return obs

    def preprocess(self, observation):
        """Resize a raw frame to 84x84 and quantise its colours.

        Args:
            observation: image array as returned by the emulator.

        Returns:
            The resized, quantised and contrast-scaled frame.
        """
        # Resize first to reduce computation
        resized = cv2.resize(observation, (84, 84), interpolation=cv2.INTER_AREA)

        # Keep only the top 3 bits of every channel (8 levels per channel)
        quantized = resized & 0b11100000

        # Stretch the remaining levels (scale by 1.2, offset by 10) so that
        # different elements are easier to tell apart
        quantized = cv2.convertScaleAbs(quantized, alpha=1.2, beta=10)

        return quantized

    def calculate_reward(self, info):
        """Compute the shaped reward for the step described by ``info``.

        Must be called before the tracked ``score`` / ``*_health`` attributes are
        updated with ``info``, because the previous values are used to detect
        changes.

        Args:
            info: dict with at least ``'score'``, ``'health'`` and ``'enemy_health'``.

        Returns:
            Score delta scaled by 0.1, plus a one-off round bonus (win) or penalty
            (loss) weighted by the survivor's remaining health ratio.
        """
        # 1. Score reward
        reward = (info['score'] - self.score) * .1

        # 2. Round outcome rewards with health.
        # The outcome is paid only on the step where a health value first drops
        # to <= 0; otherwise it would be granted again on every following step
        # for as long as that value stays at or below zero.
        if info['enemy_health'] <= 0:  # Victory
            if self.enemy_health > 0:
                # Use the agent's current health (not the previous step's),
                # clamped so a simultaneous KO cannot turn the bonus negative.
                health_ratio = max(info['health'], 0) / self.START_HEALTH
                reward += self.ROUND_WIN_MULTIPLIER * health_ratio
        elif info['health'] <= 0:  # Loss
            if self.agent_health > 0:
                health_ratio = max(info['enemy_health'], 0) / self.START_HEALTH
                reward += self.ROUND_LOSS_MULTIPLIER * health_ratio

        return reward

    def step(self, action):
        """Advance the emulator by one step.

        Args:
            action: action forwarded unchanged to the underlying retro environment.

        Returns:
            ``(obs, reward, done, info)`` where ``obs`` is preprocessed and
            ``reward`` comes from ``calculate_reward`` (the emulator reward is ignored).
        """
        obs, _, done, info = self.game.step(action)
        obs = self.preprocess(obs)

        # Must run before the tracking update below (it compares old vs. new values)
        reward = self.calculate_reward(info)

        # Update health tracking
        self.enemy_health = info['enemy_health']
        self.agent_health = info['health']
        self.score = info['score']

        return obs, reward, done, info

    def render(self, *args, **kwargs):
        """Render the underlying emulator; extra arguments are accepted and ignored."""
        self.game.render()

    def close(self):
        """Close the underlying emulator."""
        self.game.close()

# Hyperparameter Tune

https://pytorch.org/get-started/locally/  <- use this site to download pytorch

In [None]:
# Install pinned PyTorch packages from the cu121 wheel index.
%pip install torch==2.1.2 torchvision==0.16.2 torchaudio==2.1.2 --index-url https://download.pytorch.org/whl/cu121
# these specific versions were needed for CUDA

In [None]:
# Stable-Baselines3 with its extras, pinned to 1.3.0.
# NOTE(review): presumably pinned for compatibility with gym==0.21.0 — confirm before upgrading.
%pip install stable-baselines3[extra]==1.3.0

In [None]:
# Optuna drives the hyperparameter search below.
# TODO(review): unpinned — pin a version for reproducible runs.
%pip install optuna

In [None]:
# Import the optimization framework - HPO
import optuna
# Import the RL algorithms (PPO, A2C, DQN)
from stable_baselines3 import PPO, A2C, DQN
# Bring in the eval policy method for metric calculation
from stable_baselines3.common.evaluation import evaluate_policy
# Import the sb3 monitor for logging 
from stable_baselines3.common.monitor import Monitor
# Import the vec wrappers to vectorize and frame stack
from stable_baselines3.common.vec_env import DummyVecEnv, VecFrameStack
# Import os to deal with filepaths
import os

In [None]:
# Output folders: LOG_DIR is passed to Monitor and tensorboard_log,
# OPT_DIR receives the model saved by each hyperparameter trial.
LOG_DIR, OPT_DIR = './logs/', './opt/'

In [None]:
# alternative version to use later to bypass factor 64 error

# PPO optimization parameters
# Search-space bounds, one (low, high) tuple per hyperparameter. The optimize_*
# functions unpack these tuples into the trial.suggest_* calls.

# PPO. Note: optimize_ppo samples n_steps in steps of 64 from this range and picks
# batch_size from a fixed list, so batch_size_range is informational only.
PPO_PARAMS = dict(
    # 1. Core parameters
    n_steps_range=(2048, 4096),
    gamma_range=(0.95, 0.9999),
    learning_rate_range=(5e-8, 1e-6),
    clip_range_range=(0.1, 0.3),
    gae_lambda_range=(0.9, 0.98),
    # 2. Advanced parameters
    ent_coef_range=(1e-8, 1e-3),
    vf_coef_range=(0.5, 1.0),
    n_epochs_range=(5, 15),
    batch_size_range=(64, 256),
)

# A2C
A2C_PARAMS = dict(
    n_steps_range=(2, 30),                # rollout length per update
    gamma_range=(0.9, 0.9999),            # discount factor
    learning_rate_range=(1e-9, 1e-3),     # optimizer step size
    ent_coef_range=(1e-8, 1e-3),          # entropy coefficient
    vf_coef_range=(0.2, 1.0),             # value-function loss coefficient
    gae_lambda_range=(0.9, 0.98),         # GAE lambda
)

# DQN
DQN_PARAMS = dict(
    buffer_size_range=(20000, 40000),             # replay buffer size
    gamma_range=(0.95, 0.9999),                   # discount factor
    learning_rate_range=(1e-5, 5e-5),             # optimizer step size
    batch_size_range=(32, 64),                    # minibatch size
    train_freq_range=(4, 8),                      # steps between updates
    target_update_interval_range=(1000, 3000),    # steps between target-network syncs
    exploration_fraction_range=(0.4, 0.6),        # share of training spent annealing epsilon
    exploration_final_eps_range=(0.08, 0.12),     # final epsilon
    learning_starts_range=(10000, 20000),         # steps collected before learning begins
)

# Define the optimization function for PPO
def optimize_ppo(trial):
    """Sample one set of PPO hyperparameters from ``PPO_PARAMS``.

    Uses ``trial.suggest_float`` (as ``optimize_dqn`` does) instead of the
    deprecated ``suggest_uniform`` / ``suggest_loguniform`` helpers; the sampled
    distributions are unchanged.

    Args:
        trial: Optuna trial used to draw the values.

    Returns:
        dict of keyword arguments for the ``PPO`` constructor.
    """
    n_steps_low, n_steps_high = PPO_PARAMS['n_steps_range']
    # Multiples of 64 starting at the lower bound; the upper bound is exclusive.
    n_steps_choices = list(range(n_steps_low, n_steps_high, 64))

    return {
        'n_steps': trial.suggest_categorical('n_steps', n_steps_choices),
        'gamma': trial.suggest_float('gamma', *PPO_PARAMS['gamma_range'], log=True),
        'learning_rate': trial.suggest_float('learning_rate', *PPO_PARAMS['learning_rate_range'], log=True),
        'clip_range': trial.suggest_float('clip_range', *PPO_PARAMS['clip_range_range']),
        'gae_lambda': trial.suggest_float('gae_lambda', *PPO_PARAMS['gae_lambda_range']),
        'ent_coef': trial.suggest_float('ent_coef', *PPO_PARAMS['ent_coef_range'], log=True),
        'vf_coef': trial.suggest_float('vf_coef', *PPO_PARAMS['vf_coef_range']),
        'n_epochs': trial.suggest_int('n_epochs', *PPO_PARAMS['n_epochs_range']),
        'batch_size': trial.suggest_categorical('batch_size', [64, 128, 256]),

        # Fixed Parameters (Stability Controls)
        'max_grad_norm': 0.5,          # Prevents explosive gradients
        # 'clip_range_vf': None,         # Uses same clipping as policy
        # 'target_kl': None,             # No KL divergence target
    }

def optimize_a2c(trial):
    """Sample one set of A2C hyperparameters from ``A2C_PARAMS``.

    Uses ``trial.suggest_float`` (as ``optimize_dqn`` does) instead of the
    deprecated ``suggest_uniform`` / ``suggest_loguniform`` helpers; the sampled
    distributions are unchanged.

    Args:
        trial: Optuna trial used to draw the values.

    Returns:
        dict of keyword arguments for the ``A2C`` constructor.
    """
    return {
        'n_steps': trial.suggest_int('n_steps', *A2C_PARAMS['n_steps_range']),
        'gamma': trial.suggest_float('gamma', *A2C_PARAMS['gamma_range'], log=True),
        'learning_rate': trial.suggest_float('learning_rate', *A2C_PARAMS['learning_rate_range'], log=True),
        'ent_coef': trial.suggest_float('ent_coef', *A2C_PARAMS['ent_coef_range'], log=True),
        'vf_coef': trial.suggest_float('vf_coef', *A2C_PARAMS['vf_coef_range']),
        'gae_lambda': trial.suggest_float('gae_lambda', *A2C_PARAMS['gae_lambda_range']),
        # Fixed Parameters (Stability Controls)
        'max_grad_norm': 0.5,          # Prevents explosive gradients
    }

def optimize_dqn(trial):
    """Sample one set of DQN hyperparameters from ``DQN_PARAMS``.

    Args:
        trial: Optuna trial used to draw the values.

    Returns:
        dict of keyword arguments for the ``DQN`` constructor.
    """
    bounds = DQN_PARAMS
    sampled = {}

    # Values are drawn in a fixed order, one suggest_* call per hyperparameter.
    sampled['buffer_size'] = trial.suggest_int('buffer_size', *bounds['buffer_size_range'])
    sampled['gamma'] = trial.suggest_float('gamma', *bounds['gamma_range'])
    sampled['learning_rate'] = trial.suggest_float(
        'learning_rate', *bounds['learning_rate_range'], log=True
    )
    sampled['batch_size'] = trial.suggest_int('batch_size', *bounds['batch_size_range'])
    sampled['train_freq'] = trial.suggest_int('train_freq', *bounds['train_freq_range'])
    sampled['target_update_interval'] = trial.suggest_int(
        'target_update_interval', *bounds['target_update_interval_range']
    )
    sampled['exploration_fraction'] = trial.suggest_float(
        'exploration_fraction', *bounds['exploration_fraction_range']
    )
    sampled['exploration_final_eps'] = trial.suggest_float(
        'exploration_final_eps', *bounds['exploration_final_eps_range']
    )
    sampled['learning_starts'] = trial.suggest_int(
        'learning_starts', *bounds['learning_starts_range']
    )

    return sampled

In [None]:
# Example model path inside OPT_DIR (formatted with trial number 1).
# NOTE(review): optimize_agent assigns its own local SAVE_PATH per trial, so this
# module-level value does not appear to be read anywhere in the visible notebook.
SAVE_PATH = os.path.join(OPT_DIR, 'trial_{}_best_model'.format(1))

In [None]:
# Registry mapping an algorithm name to (SB3 model class, hyperparameter sampler).
# NOTE(review): SB3's DQN is documented as supporting only Discrete action spaces,
# while StreetFighter declares MultiBinary(12) — confirm the 'DQN' entry can run.
ALGORITHMS = {
    'PPO': (PPO, optimize_ppo),
    'A2C': (A2C, optimize_a2c),
    'DQN': (DQN, optimize_dqn),
}

def optimize_agent(trial, algo_name='PPO'):
    """Train and evaluate one Optuna trial and return its mean evaluation reward.

    Builds a fresh ``StreetFighter`` environment, trains the selected algorithm
    for 100000 timesteps with hyperparameters sampled for this trial, evaluates
    it over 10 episodes and saves the model under ``OPT_DIR``.

    Args:
        trial: Optuna trial providing the hyperparameter samples.
        algo_name: key of ``ALGORITHMS`` selecting the model class and sampler.

    Returns:
        The mean evaluation reward, or ``-1000`` if any stage failed (the error
        is printed so the study can carry on with the next trial).
    """
    env = None
    try:
        # Print trial start
        print(f"\nStarting Trial {trial.number}")

        # Select algorithm and get hyperparameters
        ModelClass, optimize_fn = ALGORITHMS[algo_name]
        model_params = optimize_fn(trial)

        # Monitor is given LOG_DIR as its log location; create it up front so a
        # fresh run does not depend on the folder already existing.
        os.makedirs(LOG_DIR, exist_ok=True)

        # Create environment with error checking
        try:
            env = StreetFighter()
        except Exception as e:
            print(f"Environment creation failed: {str(e)}")
            raise

        try:
            monitored_env = Monitor(env, LOG_DIR)
            env = monitored_env
            env = DummyVecEnv([lambda: monitored_env])
            env = VecFrameStack(env, 4, channels_order='last')
        except Exception as e:
            print(f"Environment wrapper failed: {str(e)}")
            raise

        # Initialize model with error checking
        try:
            model = ModelClass('CnnPolicy', env, tensorboard_log=LOG_DIR, verbose=1, **model_params)
            print(f"Model initialized on device: {model.device}")
        except Exception as e:
            print(f"Model initialization failed: {str(e)}")
            raise

        # Training with error checking
        try:
            model.learn(total_timesteps=100000)
            print(f"Training completed for trial {trial.number}")
        except Exception as e:
            print(f"Training failed: {str(e)}")
            raise

        # Evaluation with error checking
        try:
            mean_reward, _ = evaluate_policy(model, env, n_eval_episodes=10)
            print(f"Evaluation completed with mean reward: {mean_reward}")
        except Exception as e:
            print(f"Evaluation failed: {str(e)}")
            raise

        # Save model
        SAVE_PATH = os.path.join(OPT_DIR, f'trial_{trial.number}_best_model')
        model.save(SAVE_PATH)

        return mean_reward

    except Exception as e:
        print(f"\nTrial {trial.number} failed with error:\n{str(e)}\n")
        print(f"Error type: {type(e)}")
        import traceback
        print(traceback.format_exc())
        return -1000

    finally:
        # Always release the environment, including when a stage above failed:
        # previously a failed trial left it open for the rest of the study.
        # NOTE(review): an emulator left open is expected to make the next
        # trial's retro.make fail — confirm against gym-retro.
        if env is not None:
            try:
                env.close()
            except Exception as close_error:
                print(f"Environment close failed: {str(close_error)}")

In [None]:
# Algorithm used for the hyperparameter search and for the final training run.
ALGO = 'PPO' # one of the ALGORITHMS keys: 'PPO', 'A2C', 'DQN'

In [None]:
# Create the Optuna study. optimize_agent returns the mean evaluation reward and a
# higher reward is better, so the study maximizes the objective.
study = optuna.create_study(direction='maximize')
# Run 25 trials (previously 50); each trial trains for 100000 timesteps inside optimize_agent.
study.optimize(lambda trial: optimize_agent(trial, algo_name=ALGO), n_trials=25)

In [None]:
# Show the hyperparameters of the best trial found so far.
study.best_params

In [None]:
# Show the best trial itself; its number is used later to locate the saved model.
study.best_trial

# Setup Callback

In [None]:
# Import base callback 
from stable_baselines3.common.callbacks import BaseCallback

In [None]:
class TrainAndLoggingCallback(BaseCallback):
    """Save a checkpoint of the model every ``check_freq`` callback calls.

    Args:
        check_freq: number of ``_on_step`` calls between two checkpoints.
        save_path: directory for the checkpoints; ``None`` disables saving.
        verbose: verbosity level forwarded to ``BaseCallback``.
    """

    def __init__(self, check_freq, save_path, verbose=1):
        super().__init__(verbose)
        self.check_freq = check_freq
        self.save_path = save_path

    def _init_callback(self):
        """Create the checkpoint directory if a save path was given."""
        if self.save_path is not None:
            os.makedirs(self.save_path, exist_ok=True)

    def _on_step(self):
        """Write ``best_model_<n_calls>`` every ``check_freq`` calls.

        Returns:
            Always ``True``, so training is never interrupted by this callback.
        """
        # Honour save_path=None here as well, matching _init_callback; without
        # this guard os.path.join(None, ...) would raise at the first checkpoint.
        if self.save_path is not None and self.n_calls % self.check_freq == 0:
            model_path = os.path.join(self.save_path, 'best_model_{}'.format(self.n_calls))
            self.model.save(model_path)

        return True

In [None]:
# Directory that receives the periodic training checkpoints written by the callback.
CHECKPOINT_DIR = './train/'

In [None]:
# Checkpoint the model into CHECKPOINT_DIR every 5000 callback calls.
callback = TrainAndLoggingCallback(check_freq=5000, save_path=CHECKPOINT_DIR)

# Train Model

In [None]:
# Create environment
# Monitor is given LOG_DIR as its log location; create the folder first so this
# cell also works on a fresh run where ./logs/ does not exist yet.
os.makedirs(LOG_DIR, exist_ok=True)
monitored_env = Monitor(StreetFighter(), LOG_DIR)
# The factory returns a dedicated name, so rebinding `env` below cannot affect it.
env = DummyVecEnv([lambda: monitored_env])
# Stack the 4 most recent frames along the channel axis.
env = VecFrameStack(env, 4, channels_order='last')

# # loading model from zip (temp, comment out later)
# model_path = os.path.join(OPT_DIR, 'trial_4_best_model')
# model_from_zip = PPO.load(model_path)

In [None]:
def closest_factor_64_round_down(num):
    """Round ``num`` down to a multiple of 64.

    Args:
        num: value to round.

    Returns:
        ``num`` minus its remainder modulo 64.
    """
    _, remainder = divmod(num, 64)
    return num - remainder

In [None]:
# # loading model params from zip (temp, comment out later)
# model_params = {
#     'n_steps': model_from_zip.n_steps,           
#     'gamma': model_from_zip.gamma,          
#     'learning_rate': model_from_zip.learning_rate,   
#     'clip_range': model_from_zip.clip_range,        
#     'gae_lambda': model_from_zip.gae_lambda,  
#     'ent_coef': model_from_zip.ent_coef,        
#     'vf_coef': model_from_zip.vf_coef,            
#     'n_epochs': model_from_zip.n_epochs,              
#     'batch_size': model_from_zip.batch_size,
# }
# Start from the best hyperparameters found by the Optuna study.
model_params = study.best_params
if ALGO == 'PPO':
    # Round n_steps down to a multiple of 64 (see closest_factor_64_round_down).
    model_params['n_steps'] = closest_factor_64_round_down(model_params['n_steps'])
# model_params['learning_rate'] = 5e-7 -> if really slow at training
# Display the parameters that will be passed to the model constructor.
model_params

In [None]:
# Build a fresh model for the selected algorithm with the tuned hyperparameters.
# 'PPO' and 'A2C' map to their classes; any other ALGO value falls back to DQN.
model = None
model = {'PPO': PPO, 'A2C': A2C}.get(ALGO, DQN)(
    'CnnPolicy', env, tensorboard_log=LOG_DIR, verbose=1, **model_params
)

In [None]:
# Reload previous weights from HPO.
# `model.load(...)` is a classmethod that returns a NEW model, so calling it and
# discarding the result left `model` with its freshly initialised weights.
# `set_parameters` loads the saved parameters into the existing model in place,
# keeping the hyperparameters it was constructed with.
best_trial = study.best_trial.number
model.set_parameters(os.path.join(OPT_DIR, f'trial_{best_trial}_best_model.zip'))

In [None]:
# Total timesteps for the final training run (previously 5 million; 100k is enough for a demo).
TRAINING_TIMESTEPS = 10_000_000

In [None]:
# Kick off training; the callback writes periodic checkpoints while learning runs.
model.learn(total_timesteps=TRAINING_TIMESTEPS, callback=callback) 

In [None]:
# tensorboard --logdir=. 
# cd to logs
# ^ use to visually see learning progress

# Evaluate Model

In [None]:
# Load the checkpoint named after the final training step from CHECKPOINT_DIR
# (the folder the callback saves into) instead of repeating a hard-coded path.
# Such a file only exists when TRAINING_TIMESTEPS is a multiple of the callback's check_freq.
# 'PPO' and 'A2C' map to their classes; any other ALGO value falls back to DQN.
model = {'PPO': PPO, 'A2C': A2C}.get(ALGO, DQN).load(
    os.path.join(CHECKPOINT_DIR, f'best_model_{TRAINING_TIMESTEPS}.zip')
)

In [None]:
# Evaluate the loaded model for a single rendered episode; the second return value is discarded.
mean_reward, _ = evaluate_policy(model, env, render=True, n_eval_episodes=1)

In [None]:
# Display the mean reward of the evaluation episode.
mean_reward

# Testing Model

In [None]:
# Reset the (vectorized, frame-stacked) environment and keep the first observation.
obs = env.reset()

In [None]:
# Inspect the shape of the stacked observation.
obs.shape

In [None]:
# Take a single step with the model's predicted action and display the result.
env.step(model.predict(obs)[0])

In [None]:
# Play the game with the trained model, rendering every frame.
for game in range(1):
    # Start every game from a fresh state. Resetting `obs` and `done` inside the
    # loop lets range(n) with n > 1 really play n games; the old in-loop
    # `if done:` reset could never run because the loop condition is `not done`.
    obs = env.reset()
    done = False
    while not done:
        env.render()
        action = model.predict(obs)[0]
        print(action)
        obs, reward, done, info = env.step(action)
        # Slow the game down so it can be watched
        time.sleep(0.01)