In [None]:
!pip install gym_super_mario_bros==7.3.0 nes_py

In [None]:
!pip install stable-baselines3[extra]

In [None]:
conda update --all

In [None]:
conda install pytorch torchvision torchaudio -c pytorch

In [None]:
conda install pytorch torchvision torchaudio cudatoolkit=11.3 -c pytorch

In [None]:
conda install pytorch torchvision torchaudio cudatoolkit=10.2 -c pytorch-lts

In [None]:
conda install freetype=2.10.4

In [None]:
import gym

# Import the game
import gym_super_mario_bros

# Import the joypad wrapper
from nes_py.wrappers import JoypadSpace

# Import the simplified controls
from gym_super_mario_bros.actions import SIMPLE_MOVEMENT, RIGHT_ONLY 

In [None]:
# Import frame stack and grayscaling wrapper
from gym.wrappers import FrameStack, GrayScaleObservation

# Import vectorization wrappers
from stable_baselines3.common.vec_env import VecFrameStack, DummyVecEnv

# Import Matplotlib to show the impact of stack framing
from matplotlib import pyplot as plt

In [None]:
# We will use a subset of possible actions
SIMPLE_MOVEMENT
#RIGHT_ONLY 
#CUSTUM_RIGHT_ONLY = [['right', 'B'], ['right', 'A', 'B']]
#CUSTUM_RIGHT_ONLY

In [None]:
class CustomReward(gym.Wrapper):
    def __init__(self, env):
        super(CustomReward, self).__init__(env)
        self._current_score = 0

    def step(self, action):
        state, reward, done, info = self.env.step(action)
        reward += (info["score"] - self._current_score) / 40.0
        self._current_score = info["score"]
        if done:
            if info["flag_get"]:
                reward += 350.0
            else:
                reward -= 50.0
        return state, reward / 10.0, done, info

In [None]:
class CustomRewardNoMovingRightReward(gym.Wrapper):
    def __init__(self, env):
        super(CustomRewardNoMovingRightReward, self).__init__(env)
        self._current_score = 0

        # starting point
        self._current_x_pos = 40

        self._max_x_pos_memory = 0
        self._previous_x_pos_memory = 0
        self._steps_run_wrong_direction = 0

    def step(self, action):
        state, reward, done, info = self.env.step(action)
        score_diff = info["score"] - self._current_score
        reward += score_diff / 10.0

        if self._max_x_pos_memory < info["x_pos"]:
            self._max_x_pos_memory = info["x_pos"]
        else:
            self._steps_run_wrong_direction += 1

        #  Handle when the agent hits the left wall
        standstill = self._previous_x_pos_memory == info["x_pos"]
        if standstill:
            self._steps_run_wrong_direction += 1

        self._previous_x_pos_memory = info["x_pos"]

        # reward movement also for left direction /normal reward for left is -3
        # Make sure to force progress after moving in the wrong direction too long.
        if info["x_pos"] < self._current_x_pos and self._steps_run_wrong_direction < 750:
            reward += 4
        elif standstill and self._steps_run_wrong_direction > 750:
            reward -= 2
        elif score_diff > 1:
            self._steps_run_wrong_direction = 0

        self._current_score = info["score"]
        self._current_x_pos = info["x_pos"]

        if done:
            if info["flag_get"]:
                reward += 350.0
            else:
                reward -= 100.0
                self._current_x_pos = 40
        return state, reward, done, info

In [None]:
class CustomRewardNoMovingRightReward(gym.Wrapper):
    def __init__(self, env):
        super(CustomRewardNoMovingRightReward, self).__init__(env)
        self._current_score = 0
        self._current_time = 400

        # starting point
        self._current_x_pos = 40

        self._max_x_pos_memory = 0
        self._previous_x_pos_memory = 0
        self._steps_run_wrong_direction = 0

    def step(self, action):
        state, reward, done, info = self.env.step(action)
        score_diff = info["score"] - self._current_score
        reward += score_diff / 10.0

        time_diff = self._current_time - info["time"]

        if self._max_x_pos_memory < info["x_pos"]:
            self._max_x_pos_memory = info["x_pos"]
        else:
            self._steps_run_wrong_direction += time_diff

        #  Handle when the agent hits the left wall
        standstill = self._previous_x_pos_memory == info["x_pos"]
        if standstill:
            self._steps_run_wrong_direction += time_diff

        self._previous_x_pos_memory = info["x_pos"]

        # reward movement also for left direction /normal reward for left is -3
        # Make sure to force progress after moving in the wrong direction too long.
        if info["x_pos"] < self._current_x_pos:
            reward -= min((self._steps_run_wrong_direction / 10000), 1)
        elif standstill:
            reward -= 0.2
        """elif score_diff > 1:
            self._steps_run_wrong_direction = 0"""

        self._current_score = info["score"]
        self._current_x_pos = info["x_pos"]

        if done:
            if info["flag_get"]:
                reward += 350.0
            else:
                reward -= 100.0
                self._current_x_pos = 40
        return state, reward/10, done, info

In [None]:
class CustomRewardNoMovingRightReward(gym.Wrapper):
    def __init__(self, env):
        super(CustomRewardNoMovingRightReward, self).__init__(env)
        self._current_score = 0
        self._number_of_lives = 2
        self._current_time = 400

        # starting point
        self._current_x_pos = 40
        self._max_x_pos_memory = 0
        self._previous_x_pos_memory = 0
        self._steps_run_wrong_direction = 0

    def step(self, action):
        def reset():
            self._current_x_pos = 40
            self._current_time = 400
            self._steps_run_wrong_direction = 0

        state, reward, done, info = self.env.step(action)
        score_diff = info["score"] - self._current_score
        life_loss = (self._number_of_lives - info["life"]) > 0

        # clip to avoid too high reward for mushroom/flower (1000p) and coins (200p)
        reward += min(score_diff, 150)

        time_diff = self._current_time - info["time"]

        if self._max_x_pos_memory < info["x_pos"]:
            self._max_x_pos_memory = info["x_pos"]
        else:
            self._steps_run_wrong_direction += time_diff

        #  Handle when the agent hits the left wall
        standstill = self._previous_x_pos_memory == info["x_pos"]
        if standstill:
            self._steps_run_wrong_direction += time_diff

        self._previous_x_pos_memory = info["x_pos"]

        # reward movement also for left direction /normal reward for left is -3
        # Make sure to force progress after moving in the wrong direction too long.
        if info["x_pos"] < self._current_x_pos:
            reward -= min((self._steps_run_wrong_direction / 100), 10)
        elif standstill:
            reward -= min((self._steps_run_wrong_direction / 100), 10)
        """elif score_diff > 1:
            self._steps_run_wrong_direction = 0"""

        self._current_score = info["score"]
        self._current_x_pos = info["x_pos"]
        self._current_time = info["time"]

        if done:
            if info["flag_get"]:
                reward += 350.0
            else:
                reward -= 100.0
                self._number_of_lives = 2
                reset()

        if life_loss:
            reward -= 50.0
            self._number_of_lives = info["life"]
            reset()

        return state, reward / 10, done, info

In [None]:
class CustomReward6(gym.Wrapper):
    def __init__(self, env):
        super(CustomReward6, self).__init__(env)
        self._current_score = 0
        self._number_of_lives = 2

    def step(self, action):
        state, reward, done, info = self.env.step(action)
        reward = (info["score"] - self._current_score) / 10

        if info["life"] == 255:
            life_loss = True
        else:
            life_loss = (self._number_of_lives - info["life"]) > 0

        self._current_score = info["score"]
        if done:
            if info["flag_get"]:
                reward += 350.0
            else:
                reward -= 100.0
                self._number_of_lives = 2
                self._current_score = 0

        if life_loss and not done:
            reward -= 50.0
            self._number_of_lives = info["life"]

        return state, reward, done, info

In [None]:
# Create the base environment
env = gym_super_mario_bros.make("SuperMarioBros-v0")
# My custom reward function
env = CustomReward6(env)
# Simplify the controls
customMovement = [['right', 'B'], ['right', 'A', 'B'], ['A'], ['left', 'B'], ['left', 'A', 'B']]
#env = JoypadSpace(env, customMovement)
env = JoypadSpace(env, SIMPLE_MOVEMENT)
# Grayscale
env = GrayScaleObservation(env, keep_dim=True)
# Wrap inside the Dummy Environment
env = DummyVecEnv([lambda: env])
# Stack the frames
env = VecFrameStack(env, 4, channels_order="last")

In [None]:
state = env.reset()

In [None]:
SIMPLE_MOVEMENT[env.action_space.sample()]

In [None]:
state, reward, done, info = env.step([env.action_space.sample()])
info

In [None]:
"""plt.figure(figsize=(10, 8))
for idx in range(state.shape[3]):
    plt.subplot(1, 4, idx + 1)
    plt.imshow(state[0][:, :, idx])
plt.show()"""

In [None]:
# Import os for file path management
import os
# Import PPO for algos
from stable_baselines3 import PPO
# Import base callback for saving models
from stable_baselines3.common.callbacks import BaseCallback

In [None]:
class TrainAndLoggingCallback(BaseCallback):

    def __init__(self, check_freq, save_path, verbose=1):
        super(TrainAndLoggingCallback, self).__init__(verbose)
        self.check_freq = check_freq
        self.save_path = save_path

    def _init_callback(self):
        if self.save_path is not None:
            os.makedirs(self.save_path, exist_ok=True)

    def _on_step(self):
        if self.n_calls % self.check_freq == 0:
            model_path = os.path.join(self.save_path, 'best_model_{}'.format(self.n_calls))
            self.model.save(model_path)

        return True

In [None]:
CHECKPOINT_DIR = './train-reward-score-right-and-left13/'
LOG_DIR = './logs/train-reward-score-right-and-left13/'

In [None]:
# Setup model saving callback
callback = TrainAndLoggingCallback(check_freq=100000, save_path=CHECKPOINT_DIR)

In [None]:
# This is the AI model started
model = PPO('CnnPolicy', env, verbose=1, tensorboard_log=LOG_DIR, learning_rate=0.00001, n_steps=512) 

In [None]:
# Train the AI model, this is where the AI model starts to learn
model.learn(total_timesteps=10000000, callback=callback)

In [None]:
model.save('thisisatestmodel')

In [None]:
# Load model
model = PPO.load('./train/best_model_4500000')

In [None]:
model.set_env(env)

In [None]:
env = CustomReward6(env)

In [None]:
state = env.reset()

In [None]:
# Start the game 
state = env.reset()
# Loop through the game
while True:
    action, _ = model.predict(state)
    state, reward, done, info = env.step(action)
    env.render()
    if reward > 3 or reward < -1:
        print(reward)