In [None]:
#import package
import retro
import numpy as np
import cv2
import os
import time

#gym
import gym
from gym import Env
from gym.spaces import MultiBinary, Box

# stable baseline
from stable_baselines3 import A2C
from stable_baselines3 import PPO
from stable_baselines3.common.evaluation import evaluate_policy
from stable_baselines3.common.vec_env import DummyVecEnv, VecFrameStack
from stable_baselines3.common.monitor import Monitor

In [None]:
# Path prefix for saved checkpoints; the training loop appends '_<index>.zip' to it.
kModelName = 'model/Ryu'
# Location handed to Monitor and used as the tensorboard log directory in the training cell.
# NOTE(review): the path mentions TRPO while the training cell builds an A2C model — confirm the naming.
kLogDir = './logs_TRPO_CNNvsMPL/logs_TRPO_MLP_OP'

In [3]:
# Shell command (IPython `!`) that runs the retro.import module against the ./roms folder.
# NOTE(review): the recorded output for this cell reports "Imported 0 games" — confirm ./roms points at the ROM files.
!python3 -m retro.import ./roms # Run this from the roms folder, or where you have your game roms 

Imported 0 games


In [4]:
#class for StreetFighter environment
class StreetFighter(Env):
    """Gym environment wrapping the Street Fighter II retro game with a shaped reward.

    Observations are 100x100 single-channel uint8 frames and actions are a
    12-element binary vector. The reward returned by the emulator is discarded
    and replaced by one computed from health changes, hit streaks ("combos")
    and the match result reported in ``info``.
    """

    # Health value both fighters are assumed to have at the start of an episode.
    START_HEALTH = 176
    # Amount each combo counter loses per step.
    COMBO_DECAY = 0.02
    # Reward (or penalty) added per point of the current combo counter.
    COMBO_BONUS = 8
    # Reward (or penalty) applied when the episode ends with a match decided.
    MATCH_REWARD = 50

    def __init__(self, state):
        """Create the underlying retro game.

        Args:
            state: Name of the retro save state to load, passed to ``retro.make``.
        """
        super().__init__()

        self.observation_space = Box(low=0, high=255, shape=(100, 100, 1), dtype=np.uint8)
        self.action_space = MultiBinary(12)
        self.game = retro.make(
            game='StreetFighterIISpecialChampionEdition-Genesis',
            use_restricted_actions=retro.Actions.FILTERED,
            state=state,
        )

        # Initialise the trackers here as well, so that step() does not fail
        # with AttributeError if it is called before the first reset().
        self.previous_frame = None
        self._reset_episode_state()

    def _reset_episode_state(self):
        """Reset the health and combo trackers used by the reward shaping."""
        self.health = self.START_HEALTH
        self.enemy_health = self.START_HEALTH
        self.combo_count = 0
        self.enemy_combo_count = 0

    def step(self, action):
        """Advance the game by one step and compute the shaped reward.

        Args:
            action: Action forwarded unchanged to the underlying retro game.

        Returns:
            Tuple ``(obs, reward, done, info)`` where ``obs`` is the
            preprocessed frame, ``reward`` is the shaped reward, and ``done``
            and ``info`` come straight from the underlying game.
        """
        # The emulator's own reward is intentionally ignored.
        obs, _, done, info = self.game.step(action)
        obs = self.preprocess(obs)

        reward = 0

        # Combo counters decay a little every step. Clamp at zero: without the
        # clamp the counter can dip below zero and turn the combo bonus of the
        # next hit into a penalty (and vice versa for the enemy counter).
        self.combo_count = max(0, self.combo_count - self.COMBO_DECAY)
        self.enemy_combo_count = max(0, self.enemy_combo_count - self.COMBO_DECAY)

        if done:
            # Episode finished: only the match outcome is rewarded.
            if info['matches_won'] == 1:
                reward += self.MATCH_REWARD
            elif info['enemy_matches_won'] == 1:
                reward -= self.MATCH_REWARD
        else:
            enemy_damage = self.enemy_health - info['enemy_health']
            damage = self.health - info['health']

            if enemy_damage > 0 and damage > 0:
                # Double hit: net damage only, combo counters are left alone.
                reward += enemy_damage - damage
            elif enemy_damage > 0:
                # Player landed a hit: bonus grows with the current streak.
                reward += enemy_damage + self.combo_count * self.COMBO_BONUS
                self.combo_count += 1
                self.enemy_combo_count = 0
            elif damage > 0:
                # Player was hit: penalty grows with the enemy's streak.
                reward -= damage + self.enemy_combo_count * self.COMBO_BONUS
                self.enemy_combo_count += 1
                self.combo_count = 0

        # Remember the current health values for the next step's deltas.
        self.health = info['health']
        self.enemy_health = info['enemy_health']

        return obs, reward, done, info

    def render(self, *args, **kwargs):
        """Render the underlying game; extra arguments are accepted but ignored."""
        self.game.render()

    def reset(self):
        """Reset the game and the reward trackers.

        Returns:
            The preprocessed first frame of the new episode.
        """
        obs = self.preprocess(self.game.reset())
        # Stored for callers that inspect it; nothing in this class reads it back.
        self.previous_frame = obs

        self._reset_episode_state()
        return obs

    def preprocess(self, observation):
        """Convert a colour frame to a 100x100x1 grayscale array.

        Args:
            observation: Colour image as returned by the underlying game.

        Returns:
            Array of shape (100, 100, 1).
        """
        # NOTE(review): the frame is treated as BGR here; confirm the channel
        # order the emulator produces. Left unchanged so that previously
        # trained checkpoints keep seeing the same observations.
        gray = cv2.cvtColor(observation, cv2.COLOR_BGR2GRAY)
        resized = cv2.resize(gray, (100, 100), interpolation=cv2.INTER_AREA)
        return np.reshape(resized, (100, 100, 1))

    def close(self):
        """Close the underlying retro game."""
        self.game.close()

In [5]:
from stable_baselines3.common.callbacks import BaseCallback
class TensorboardCallback(BaseCallback):
    """Callback that records one additional scalar to the logger on every step.

    The recorded value is a placeholder: a fresh random number stored under
    the key "random_value".
    """

    def __init__(self, verbose=0):
        super(TensorboardCallback, self).__init__(verbose)

    def _on_step(self) -> bool:
        # Placeholder metric; swap in a real quantity when one is available.
        self.logger.record("random_value", np.random.random())
        # Always report True back to the caller.
        return True



In [68]:
# Opponents to train against, in order; one save state per opponent.
kCharacters  = ['Balrog', 'Blanka', 'ChunLi', 'Dhalsim', 'EHonda', 'Guile', 'Ken', 'MBison', 'Ryu', 'Sagat', 'Vega', 'Zangief' ]

# A single model is created on the first opponent and then trained further
# against each following opponent; a checkpoint is saved after every opponent.
model = None
for i, opponent in enumerate(kCharacters):
    state = f'L4_Ryu_{opponent}'

    env = StreetFighter(state)
    try:
        env = Monitor(env, kLogDir)
        # Bind the current env through a default argument so the factory does
        # not depend on the later rebinding of the name `env`.
        env = DummyVecEnv([lambda e=env: e])
        env = VecFrameStack(env, 4, channels_order='last')

        if model is None:
            model = A2C("MlpPolicy", env, verbose=0, tensorboard_log=kLogDir)

        model.set_env(env)
        model.learn(total_timesteps=20_000)
        model.save(f'{kModelName}_{i}.zip')
    finally:
        # Always release the emulator, even when training fails or is
        # interrupted; otherwise the next StreetFighter(...) cannot be created
        # in this kernel without a manual env.close().
        env.close()


In [67]:
# Closes whatever `env` is currently bound to in the kernel.
# NOTE(review): presumably manual cleanup after an interrupted run — confirm this cell is still needed.
env.close()

In [6]:
# Watch a saved model play one episode against Guile, printing every non-zero reward.
env = StreetFighter('L4_Ryu_Guile')
try:
    # Bind the current env through a default argument so the factory does not
    # depend on the later rebinding of the name `env`.
    env = DummyVecEnv([lambda e=env: e])
    env = VecFrameStack(env, 4, channels_order='last')
    model = A2C.load('Ryu_A2C_MLP_1.zip')
    model.set_env(env)
    for episode in range(1):
        obs = env.reset()
        done = False
        total_reward = 0
        while not done:
            action, _ = model.predict(obs)
            obs, reward, done, info = env.step(action)
            env.render()
            time.sleep(0.001)
            if reward != 0:
                print(f'Reward: {reward}')
            total_reward += reward
        # The original call passed (total_reward, episode) in the wrong order,
        # printing the reward as the episode number and vice versa.
        print('Total Reward for episode {} is {}'.format(episode, total_reward))
        time.sleep(2)
finally:
    # Release the emulator even if loading the model or playing fails.
    env.close()

Reward: [40.]
Reward: [7.]
Reward: [46.]
Reward: [-24.]
Reward: [-24.]
Reward: [-33.]
Reward: [12.]
Reward: [-11.]
Reward: [25.]
Reward: [26.]
Reward: [18.]
Reward: [3.]
Reward: [50.]
Total Reward for episode [135.] is 0


In [64]:
# Closes whatever `env` is currently bound to in the kernel.
# NOTE(review): presumably manual cleanup after an interrupted run — confirm this cell is still needed.
env.close()

In [None]:
# Play `rounds` episodes against every opponent with a saved model and report
# the per-opponent and overall win rates.
kCharacters  = ['Balrog', 'Blanka', 'ChunLi', 'Dhalsim', 'EHonda', 'Guile', 'Ken', 'MBison', 'Ryu', 'Sagat', 'Vega', 'Zangief' ]
model = A2C.load('Ryu_A2C_MLP_1.zip')

rounds = 100
total_win_count = 0

for opponent in kCharacters:

    state = f'L4_Ryu_{opponent}'
    env = StreetFighter(state)
    try:
        # Bind the current env through a default argument so the factory does
        # not depend on the later rebinding of the name `env`.
        env = DummyVecEnv([lambda e=env: e])
        env = VecFrameStack(env, 4, channels_order='last')

        win_count = 0

        for i in range(rounds):
            obs = env.reset()
            done = False

            while not done:
                action, _ = model.predict(obs)
                obs, reward, done, info = env.step(action)

            # `info` now belongs to the step that ended the episode.
            if info[0]['matches_won'] == 1:
                win_count += 1
    finally:
        # Release the emulator even if an episode fails part-way.
        env.close()

    # Computed once per opponent (it used to be reassigned inside the rounds
    # loop and was undefined when rounds == 0). Multiplying before dividing
    # avoids float noise such as 7.000000000000001.
    win_rate = 100.0 * win_count / rounds if rounds else 0.0
    print( f'Vs {opponent} {win_count} / {rounds} = {win_rate}%')
    total_win_count += win_count

total_games = rounds * len(kCharacters)
total_win_rate = 100.0 * total_win_count / total_games if total_games else 0.0
print( f'total_win_rate {total_win_rate}' )

NameError: name 'A2C' is not defined

In [91]:
# Closes whatever `env` is currently bound to in the kernel.
# NOTE(review): presumably manual cleanup after an interrupted run — confirm this cell is still needed.
env.close()

0

Vs Balrog 53 / 100 = 53.0%
Vs Blanka 68 / 100 = 68.0%
Vs ChunLi 81 / 100 = 81.0%
Vs Dhalsim 86 / 100 = 86.0%
Vs EHonda 72 / 100 = 72.0%
Vs Guile 63 / 100 = 63.0%
Vs Ken 30 / 100 = 30.0%
Vs MBison 33 / 100 = 33.0%
Vs Ryu 22 / 100 = 22.0%
Vs Sagat 7 / 100 = 7.000000000000001%
Vs Vega 43 / 100 = 43.0%
Vs Zangief 50 / 100 = 50.0%
total_win_rate 50.66666666666667