In [1]:
import gym
from gym.wrappers import Monitor
import retro
from gym import Env
from gym.spaces import Box, Discrete
import numpy as np
import cv2
from stable_baselines3 import DQN
from stable_baselines3.common.evaluation import evaluate_policy
from stable_baselines3.common.callbacks import EvalCallback

class StreetFighter(Env):
    def __init__(self):
        super().__init__()
        self.observation_space = Box(low=0, high=255, shape=(84, 84, 1), dtype=np.uint8)
        self.action_space = Discrete(12)
        self.game = retro.make(game='StreetFighterIISpecialChampionEdition-Genesis', use_restricted_actions=retro.Actions.FILTERED)
        self.action_map = self.create_action_map()
    
    def create_action_map(self):
        actions = []
        for i in range(12):
            action = np.zeros(12)
            action[i] = 1
            actions.append(action)
        return actions
    
    def reset(self):
        obs = self.game.reset()
        obs = self.preprocess(obs)
        self.previous_frame = obs
        self.score = 0
        return obs
    
    def preprocess(self, observation):
        # Convert the image to HSV color space
        hsv = cv2.cvtColor(observation, cv2.COLOR_BGR2HSV) 
        # Define color ranges for red, blue, and green in HSV
        lower_red1 = np.array([0, 50, 50])
        upper_red1 = np.array([10, 255, 255])
        lower_red2 = np.array([170, 50, 50])
        upper_red2 = np.array([180, 255, 255])
        lower_blue = np.array([100, 50, 50])
        upper_blue = np.array([140, 255, 255])
        lower_green = np.array([40, 50, 50])
        upper_green = np.array([80, 255, 255])       
        # Create masks for red, blue, and green colors
        mask_red1 = cv2.inRange(hsv, lower_red1, upper_red1)
        mask_red2 = cv2.inRange(hsv, lower_red2, upper_red2)
        mask_blue = cv2.inRange(hsv, lower_blue, upper_blue)
        mask_green = cv2.inRange(hsv, lower_green, upper_green)
        # Combine the masks
        mask_red = cv2.bitwise_or(mask_red1, mask_red2)
        mask = cv2.bitwise_or(mask_red, mask_blue)
        mask = cv2.bitwise_or(mask, mask_green)
        # Apply the mask to the original image
        filtered = cv2.bitwise_and(observation, observation, mask=mask) 
        # Convert the filtered image to grayscale
        gray = cv2.cvtColor(filtered, cv2.COLOR_BGR2GRAY)
        # Resize the image
        resize = cv2.resize(gray, (84, 84), interpolation=cv2.INTER_CUBIC)
        # Add the channels value
        channels = np.reshape(resize, (84, 84, 1))
        return channels
    
    def step(self, action):
        mapped_action = self.action_map[action]
        obs, reward, done, info = self.game.step(mapped_action)
        obs = self.preprocess(obs)
        frame_delta = obs - self.previous_frame
        self.previous_frame = obs
        reward = (info['score'] - self.score) + 0.1  # Add a small reward for staying alive
        self.score = info['score']
        return frame_delta, reward, done, info

    def render(self, *args, **kwargs):
        self.game.render()
    
    def close(self):
        self.game.close()

# Create and wrap the environment
env = StreetFighter()
env = Monitor(env, './logs/', force=True)

# Define the DQN model with tuned parameters
model = DQN(
    'CnnPolicy',
    env,
    verbose=1,
    learning_rate=0.0005,  # Slightly higher learning rate for faster convergence
    buffer_size=200000,  # Larger buffer size for more diverse experiences
    learning_starts=5000,  # Start learning after more steps
    batch_size=64,  # Larger batch size for more stable updates
    tau=0.1,
    gamma=0.98,  # Discount factor for future rewards
    train_freq=4,  # Train every 4 steps
    target_update_interval=1000,  # More frequent target network updates
)

# Define a callback for evaluation
eval_callback = EvalCallback(env, best_model_save_path='./logs/', log_path='./logs/', eval_freq=5000, n_eval_episodes=5, render=False)

# Train the model
model.learn(total_timesteps=10, callback=eval_callback)

# Save the trained model
model.save("DQN_StreetFighter")

# Close the training environment to avoid emulator conflict
env.close()

# Create a new evaluation environment and wrap it with Monitor
eval_env = StreetFighter()
eval_env = Monitor(eval_env, './logs/', force=True)

# Load the trained model
model = DQN.load("DQN_StreetFighter")

# Evaluate the model
mean_reward, _ = evaluate_policy(model, eval_env, render=True, n_eval_episodes=10)
print(f"Mean reward: {mean_reward}")

# Close the evaluation environment
eval_env.close()

  logger.warn(


Using cpu device
Wrapping the env with a `Monitor` wrapper
Wrapping the env in a DummyVecEnv.
Wrapping the env in a VecTransposeImage.


  logger.warn(


Mean reward: 2631.7999605834484


## Tuned Parameters

In [2]:
import gym
from gym.wrappers import Monitor
import retro
from gym import Env
from gym.spaces import Box, Discrete
import numpy as np
import cv2
from stable_baselines3 import DQN
from stable_baselines3.common.evaluation import evaluate_policy
from stable_baselines3.common.callbacks import EvalCallback

class StreetFighter(Env):
    def __init__(self):
        super().__init__()
        self.observation_space = Box(low=0, high=255, shape=(84, 84, 1), dtype=np.uint8)
        self.action_space = Discrete(12)
        self.game = retro.make(game='StreetFighterIISpecialChampionEdition-Genesis', use_restricted_actions=retro.Actions.FILTERED)
        self.action_map = self.create_action_map()
    
    def create_action_map(self):
        actions = []
        for i in range(12):
            action = np.zeros(12)
            action[i] = 1
            actions.append(action)
        return actions
    
    def reset(self):
        obs = self.game.reset()
        obs = self.preprocess(obs)
        self.previous_frame = obs
        self.score = 0
        return obs
    
    def preprocess(self, observation):
        # Convert the image to HSV color space
        hsv = cv2.cvtColor(observation, cv2.COLOR_BGR2HSV) 
        # Define color ranges for red, blue, and green in HSV
        lower_red1 = np.array([0, 50, 50])
        upper_red1 = np.array([10, 255, 255])
        lower_red2 = np.array([170, 50, 50])
        upper_red2 = np.array([180, 255, 255])
        lower_blue = np.array([100, 50, 50])
        upper_blue = np.array([140, 255, 255])
        lower_green = np.array([40, 50, 50])
        upper_green = np.array([80, 255, 255])       
        # Create masks for red, blue, and green colors
        mask_red1 = cv2.inRange(hsv, lower_red1, upper_red1)
        mask_red2 = cv2.inRange(hsv, lower_red2, upper_red2)
        mask_blue = cv2.inRange(hsv, lower_blue, upper_blue)
        mask_green = cv2.inRange(hsv, lower_green, upper_green)
        # Combine the masks
        mask_red = cv2.bitwise_or(mask_red1, mask_red2)
        mask = cv2.bitwise_or(mask_red, mask_blue)
        mask = cv2.bitwise_or(mask, mask_green)
        # Apply the mask to the original image
        filtered = cv2.bitwise_and(observation, observation, mask=mask) 
        # Convert the filtered image to grayscale
        gray = cv2.cvtColor(filtered, cv2.COLOR_BGR2GRAY)
        # Resize the image
        resize = cv2.resize(gray, (84, 84), interpolation=cv2.INTER_CUBIC)
        # Add the channels value
        channels = np.reshape(resize, (84, 84, 1))
        return channels
    
    def step(self, action):
        mapped_action = self.action_map[action]
        obs, reward, done, info = self.game.step(mapped_action)
        obs = self.preprocess(obs)
        frame_delta = obs - self.previous_frame
        self.previous_frame = obs
        reward = (info['score'] - self.score) + 0.1  # Add a small reward for staying alive
        self.score = info['score']
        return frame_delta, reward, done, info

    def render(self, *args, **kwargs):
        self.game.render()
    
    def close(self):
        self.game.close()

# Create and wrap the environment
env = StreetFighter()
env = Monitor(env, './logs/', force=True)

# Define the DQN model with tuned parameters
model = DQN(
    'CnnPolicy',
    env,
    verbose=1,
    learning_rate=0.0005,  # Slightly higher learning rate for faster convergence
    buffer_size=200000,  # Larger buffer size for more diverse experiences
    learning_starts=5000,  # Start learning after more steps
    batch_size=64,  # Larger batch size for more stable updates
    tau=0.1,
    gamma=0.98,  # Discount factor for future rewards
    train_freq=5,  # Train every 4 steps
    target_update_interval=1000,  # More frequent target network updates
)

# Define a callback for evaluation
eval_callback = EvalCallback(env, best_model_save_path='./logs/', log_path='./logs/', eval_freq=1000, n_eval_episodes=5, render=False)

# Train the model
model.learn(total_timesteps=1000, callback=eval_callback)

# Save the trained model
model.save("DQN_StreetFighter")

# Close the training environment to avoid emulator conflict
env.close()

# Create a new evaluation environment and wrap it with Monitor
eval_env = StreetFighter()
eval_env = Monitor(eval_env, './logs/', force=True)

# Load the trained model
model = DQN.load("DQN_StreetFighter")

# Evaluate the model
mean_reward, _ = evaluate_policy(model, eval_env, render=True, n_eval_episodes=10)
print(f"Mean reward: {mean_reward}")

# Close the evaluation environment
eval_env.close()

Using cpu device
Wrapping the env with a `Monitor` wrapper
Wrapping the env in a DummyVecEnv.
Wrapping the env in a VecTransposeImage.




Mean reward: 3353.899971485138
