In [None]:
!pip install gym_super_mario_bros==7.3.0 nes_py

In [None]:
!pip install stable-baselines3[extra]

In [4]:
import gym_super_mario_bros
from nes_py.wrappers import JoypadSpace
from gym_super_mario_bros.actions import SIMPLE_MOVEMENT

## Setting up the game environment with gynamsium

In [5]:
env = gym_super_mario_bros.make('SuperMarioBros-v0')
env = JoypadSpace(env, SIMPLE_MOVEMENT)

  logger.warn(


In [7]:
# Create a flag - restart or not
done = True
# Loop through each frame in the game
for step in range(100000):
    # Start the game to begin with
    if done:
        # Start the game
        env.reset()
    state, reward, done, info = env.step(env.action_space.sample())
    env.render()
# Close the game
env.close()

NoSuchDisplayException: ignored

## Pre-processing the environment
The environment the playing agent is exposed to is in RGB color space, that means it has 3 layers to it, Red, Green and Blue. Our goal is to flatten it to only one color channel, specifically grayscale and then vectorize the frames to feed into our model.

In [None]:
from gym.wrappers import GrayScaleObservation
# Import Vectorization Wrappers
from stable_baselines3.common.vec_env import VecFrameStack, DummyVecEnv
from matplotlib import pyplot as plt

In [None]:
# Create the base environment
env = gym_super_mario_bros.make('SuperMarioBros-v0')
env = JoypadSpace(env, SIMPLE_MOVEMENT)
# Grayscale
env = GrayScaleObservation(env, keep_dim=True)
env = DummyVecEnv([lambda: env])
# Stack the frames
env = VecFrameStack(env, 4, channels_order='last')

In [None]:
state = env.reset()

In [None]:
state, reward, done, info = env.step([5])

In [None]:
plt.figure(figsize=(20,16))
for idx in range(state.shape[3]):
    plt.subplot(1,4,idx+1)
    plt.imshow(state[0][:,:,idx])
plt.show()

## Training the Model using Proximal Policy Optimization (PPO)



In [None]:
import os
# Import PPO for algos
from stable_baselines3 import PPO
# Import Base Callback for saving models
from stable_baselines3.common.callbacks import BaseCallback

In [None]:
class TrainAndLoggingCallback(BaseCallback):

    def __init__(self, check_freq, save_path, verbose=1):
        super(TrainAndLoggingCallback, self).__init__(verbose)
        self.check_freq = check_freq
        self.save_path = save_path

    def _init_callback(self):
        if self.save_path is not None:
            os.makedirs(self.save_path, exist_ok=True)

    def _on_step(self):
        if self.n_calls % self.check_freq == 0:
            model_path = os.path.join(self.save_path, 'best_model_{}'.format(self.n_calls))
            self.model.save(model_path)

        return True

In [None]:
CHECKPOINT_DIR = './train/'
LOG_DIR = './logs/'

In [None]:
callback = TrainAndLoggingCallback(check_freq=10000, save_path=CHECKPOINT_DIR)

In [None]:
model = PPO('CnnPolicy', env, verbose=1, tensorboard_log=LOG_DIR, learning_rate=0.000001,
            n_steps=512)

In [None]:
model.learn(total_timesteps=1000000, callback=callback)

In [None]:
model.save('thisisatestmodel')


## Testing out the model

In [None]:
model = PPO.load('./train/best_model_1000000')
state = env.reset()
# Loop through the game
while True:

    action, _ = model.predict(state)
    state, reward, done, info = env.step(action)
    env.render()