# Install libraries

In [None]:
%pip install -q pip==23.3.2
%pip install -q gym_super_mario_bros nes_py
%pip install -q wheel==0.38.4 setuptools==66.0.0 gym==0.21.0
%pip install -q numpy==1.26.4 opencv-python==4.8.0.76

In [None]:
%pip install -q torch==2.1.1 torchvision==0.16.1 torchaudio==2.1.1 --index-url https://download.pytorch.org/whl/cu121

In [None]:
%pip install -q stable-baselines3[extra]==1.5.0

In [1]:
import torch

# Pick the GPU when PyTorch can see one, otherwise fall back to the CPU,
# and report the choice.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(
    f"CUDA is available. Using device: {torch.cuda.get_device_name(0)}"
    if device.type == "cuda"
    else "CUDA is not available. Using CPU."
)

CUDA is available. Using device: NVIDIA GeForce RTX 4060 Laptop GPU


# Setup environment and wrappers


In [2]:
import gym_super_mario_bros
from gym_super_mario_bros.actions import SIMPLE_MOVEMENT
from nes_py.wrappers import JoypadSpace
from gym.wrappers import GrayScaleObservation, ResizeObservation
from stable_baselines3.common.vec_env import VecFrameStack, DummyVecEnv
from stable_baselines3.common.monitor import Monitor

In [3]:
from gym import RewardWrapper

class CustomRewardWrapper(RewardWrapper):
    """Reward shaping layered on top of the wrapped environment's reward.

    On every step the in-game score gained since the previous step (divided
    by ``score_scale``) is added to the reward. When the episode ends, a
    fixed amount is added if ``info["flag_get"]`` is truthy and subtracted
    otherwise. The result is finally divided by ``reward_scale``.

    ``step`` is overridden directly because the shaping needs ``info`` and
    ``done``, not just the raw reward.

    Args:
        env: Environment to wrap. Its ``step`` must return an ``info`` dict
            containing the keys ``"score"`` and ``"flag_get"``.
        score_scale: Divisor applied to the per-step score delta.
        terminal_reward: Amount added (flag reached) or subtracted (any
            other ending) on the final step of an episode.
        reward_scale: Divisor applied to the shaped reward before returning.

    Raises:
        ValueError: If ``score_scale`` or ``reward_scale`` is zero.
    """

    def __init__(self, env, score_scale=40., terminal_reward=50, reward_scale=10):
        super().__init__(env)
        if score_scale == 0 or reward_scale == 0:
            raise ValueError("score_scale and reward_scale must be non-zero")
        self.score_scale = score_scale
        self.terminal_reward = terminal_reward
        self.reward_scale = reward_scale
        # Score reported by the previous step; used to compute the delta.
        self.curr_score = 0

    def reset(self, **kwargs):
        """Reset the wrapped env and forget the previous episode's score."""
        self.curr_score = 0
        return self.env.reset(**kwargs)

    def step(self, action):
        """Step the wrapped env and return ``(state, shaped_reward, done, info)``."""
        state, reward, done, info = self.env.step(action)

        # Reward only the score gained during this step, not the running total.
        reward += (info["score"] - self.curr_score) / self.score_scale
        self.curr_score = info["score"]

        if done:
            if info["flag_get"]:
                reward += self.terminal_reward
            else:
                reward -= self.terminal_reward

        return state, reward / self.reward_scale, done, info

In [4]:
from gym import Wrapper

class SkipFrame(Wrapper):
    """Repeat each action for up to ``skip`` frames and sum the rewards.

    Args:
        env: Environment to wrap.
        skip: Number of consecutive frames on which the same action is
            applied. Must be at least 1.

    Raises:
        ValueError: If ``skip`` is smaller than 1. Without this check the
            first ``step`` would fail with ``UnboundLocalError`` because the
            loop body never runs.
    """

    def __init__(self, env, skip):
        super().__init__(env)
        if skip < 1:
            raise ValueError(f"skip must be >= 1, got {skip!r}")
        self._skip = skip

    def step(self, action):
        """Apply ``action`` repeatedly; stop early if the episode ends.

        Returns:
            The last observation, the summed reward over the executed
            frames, the last ``done`` flag and the last ``info`` dict.
        """
        total_reward = 0.0
        for _ in range(self._skip):
            obs, reward, done, info = self.env.step(action)
            total_reward += reward
            if done:
                # Do not step an environment that has already finished.
                break
        return obs, total_reward, done, info

In [17]:
def create_env():
    """Build the vectorised, frame-stacked Super Mario Bros. environment.

    The wrappers are applied in this order: joypad action space
    (``SIMPLE_MOVEMENT``), custom reward shaping, 4-frame action repeat,
    grayscale conversion (channel dimension kept), resize to 84x84 and
    ``Monitor``. The result is placed in a ``DummyVecEnv`` and stacked over
    4 frames along the last axis.

    Returns:
        The ``VecFrameStack`` environment.
    """
    wrapper_chain = (
        lambda inner: JoypadSpace(inner, SIMPLE_MOVEMENT),
        CustomRewardWrapper,
        lambda inner: SkipFrame(inner, skip=4),
        lambda inner: GrayScaleObservation(inner, keep_dim=True),
        lambda inner: ResizeObservation(inner, (84, 84)),
        Monitor,
    )

    mario_env = gym_super_mario_bros.make('SuperMarioBros-v0')
    for wrap in wrapper_chain:
        mario_env = wrap(mario_env)

    vec_env = DummyVecEnv([lambda: mario_env])
    return VecFrameStack(vec_env, 4, channels_order='last')

In [18]:
# Single shared environment: passed to PPO for training and reused by the test rollout.
env = create_env()

# Training

In [8]:
import os 
from stable_baselines3 import PPO

In [9]:
# Directory the checkpoint callback saves models into.
CHECKPOINT_DIR = './train/'
# Directory handed to PPO as `tensorboard_log`.
LOG_DIR = './logs/'

In [10]:
# PPO agent with a CNN policy, trained on the frame-stacked `env`.
# NOTE(review): no `seed` is passed, so separate runs are not expected to be
# reproducible — confirm whether that is intended.
model = PPO(
    'CnnPolicy',
    env,
    verbose=1,
    learning_rate=0.0001,
    n_steps=512,  # the recorded training log advances total_timesteps in steps of 512
    device ="auto",  # resolved to CUDA in the recorded run ("Using cuda device")
    gae_lambda=1.0,
    ent_coef=0.01,
    gamma=0.9,
    batch_size=64,
    n_epochs=10,  # the recorded log shows n_updates = 10 after the first update
    tensorboard_log=LOG_DIR
)

Using cuda device
Wrapping the env in a VecTransposeImage.


In [11]:
from stable_baselines3.common.callbacks import BaseCallback
# import numpy as np

class TrainingProgressMonitor(BaseCallback):
    """Save a model checkpoint every ``check_freq`` timesteps.

    Args:
        check_freq: A checkpoint is written whenever
            ``model.num_timesteps`` is a positive multiple of this value.
            Must be at least 1.
        save_path: Directory for the checkpoints; created when the callback
            is initialised. ``None`` disables saving entirely.
        verbose: Forwarded to ``BaseCallback``.

    Raises:
        ValueError: If ``check_freq`` is smaller than 1 (a value of 0 would
            otherwise raise ``ZeroDivisionError`` on the first step).
    """

    def __init__(self, check_freq, save_path, verbose=1):
        super().__init__(verbose)
        if check_freq < 1:
            raise ValueError(f"check_freq must be >= 1, got {check_freq!r}")
        self.check_freq = check_freq
        self.save_path = save_path

    def _init_callback(self):
        """Create the checkpoint directory if a save path was given."""
        if self.save_path is not None:
            os.makedirs(self.save_path, exist_ok=True)

    def _on_step(self):
        """Write a checkpoint when due; always return True to keep training."""
        # Mirror _init_callback: without a save path there is nothing to write.
        if self.save_path is None:
            return True

        timesteps = self.model.num_timesteps
        if timesteps > 0 and timesteps % self.check_freq == 0:
            model_path = os.path.join(self.save_path, f'model_{timesteps}_steps')
            self.model.save(model_path)
        return True

In [12]:
# Checkpoint every 200,000 timesteps into CHECKPOINT_DIR.
callback = TrainingProgressMonitor(check_freq=200000, save_path=CHECKPOINT_DIR)

In [13]:
# 4,000,000 is a multiple of the callback's check_freq (200,000), so a
# checkpoint named 'model_4000000_steps' is expected at the end of training.
model.learn(total_timesteps=4_000_000, callback=callback)

Logging to ./logs/PPO_1
----------------------------
| time/              |     |
|    fps             | 108 |
|    iterations      | 1   |
|    time_elapsed    | 4   |
|    total_timesteps | 512 |
----------------------------
----------------------------------------
| rollout/                |            |
|    ep_len_mean          | 770        |
|    ep_rew_mean          | 172        |
| time/                   |            |
|    fps                  | 93         |
|    iterations           | 2          |
|    time_elapsed         | 10         |
|    total_timesteps      | 1024       |
| train/                  |            |
|    approx_kl            | 0.01647126 |
|    clip_fraction        | 0.157      |
|    clip_range           | 0.2        |
|    entropy_loss         | -1.94      |
|    explained_variance   | -0.00917   |
|    learning_rate        | 0.0001     |
|    loss                 | 1.65       |
|    n_updates            | 10         |
|    policy_gradient_loss | -0.0135

<stable_baselines3.ppo.ppo.PPO at 0x193ff268d60>

# Test

In [14]:
# Reload the checkpoint saved at 4,000,000 timesteps; the file name follows the
# checkpoint callback's 'model_{}_steps' pattern inside './train/'.
model = PPO.load('./train/model_4000000_steps', env=env)

Wrapping the env in a VecTransposeImage.


In [16]:
import time

# Play the trained policy in a rendered window until interrupted
# (Kernel -> Interrupt raises KeyboardInterrupt).
state = env.reset()

try:
    while True:
        action, _ = model.predict(state)
        # `env` is a vectorised environment, which resets a finished
        # sub-environment by itself and returns the first observation of the
        # next episode. A manual env.reset() on `done` would reset a second time.
        state, reward, done, info = env.step(action)
        env.render()
        time.sleep(1/10000)

except KeyboardInterrupt:
    print("\nInterrupted by user. Closing environment...")

finally:
    # Close the emulator window on any exit path, then rebuild the
    # environment so this cell can be run again.
    env.close()
    env = create_env()


Interrupted by user. Closing environment...
