# Solve Super-Mario-Bros with SageMaker RL + Ray
---

![](../xxx-rl-mario-ray/mario.png)

---
## Introduction

This notebook starts from the cart-pole balancing problem, in which a pole is attached by an un-actuated joint to a cart that moves along a frictionless track. Instead of applying control theory, that example solves the problem with reinforcement learning on Amazon SageMaker and Ray RLlib, and you can choose either TensorFlow or PyTorch as your underlying DL framework. The code cells below apply the same approach to Super Mario Bros (`SuperMarioBros-v0`), training an RLlib IMPALA agent with the PyTorch framework.

(For a similar example using Coach library, see this [link](../rl_cartpole_coach/rl_cartpole_coach_gymEnv.ipynb). Another Cart-pole example using Coach library and offline data can be found [here](../rl_cartpole_batch_coach/rl_cartpole_batch_coach.ipynb).)

1. *Objective*: Prevent the pole from falling over
2. *Environment*: The environment used in this example is part of OpenAI Gym, corresponding to the version of the cart-pole problem described by Barto, Sutton, and Anderson [1]
3. *State*: Cart position, cart velocity, pole angle, pole velocity at tip	
4. *Action*: Push cart to the left, push cart to the right
5. *Reward*: Reward is 1 for every step taken, including the termination step

## Pre-requisites 
### Install dependencies
To get started, install the required libraries (uncomment the lines below as needed).

In [None]:
# !pip install -U 'ray[rllib, tune, serve]'
# !pip install gym[atari] autorom[accept-rom-license]
# !pip install box2d-py
# !pip install pygame
# !pip install tqdm
# !pip install gym-super-mario-bros
# !pip install ffmpeg

### Imports
We'll import the Python libraries we need and set up the environment with a few prerequisites for permissions and configurations.

In [None]:
import torch 
import os
# Detect the hardware available on this machine; both values feed the
# RLlib configuration further down (``num_gpus`` and ``num_workers``).
num_gpus = torch.cuda.device_count()
# os.cpu_count() returns None when the count cannot be determined; fall back
# to 1 so that later arithmetic such as ``num_cpus - 1`` cannot raise.
num_cpus = os.cpu_count() or 1
print('GPUs', num_gpus)
print('CPUs', num_cpus)

In [None]:
import numpy as np 
from tabulate import tabulate
from tqdm import tqdm 
import logging
import gym
from gym import wrappers
import gym_super_mario_bros
from gym_super_mario_bros.actions import SIMPLE_MOVEMENT,COMPLEX_MOVEMENT
import ray
from ray import tune, air
from ray.tune.registry import register_env
from ray.tune.logger import pretty_print
from ray.rllib.agents.ppo import PPOTrainer, DEFAULT_CONFIG
from ray.rllib.algorithms.impala import Impala, ImpalaConfig

In [None]:
from gym_super_mario_bros.actions import SIMPLE_MOVEMENT, COMPLEX_MOVEMENT
from nes_py.wrappers import JoypadSpace
from ray.rllib.env.wrappers.atari_wrappers import (MonitorEnv,
                                          NoopResetEnv,
                                          WarpFrame,
                                          FrameStack)


class EpisodicLifeEnv(gym.Wrapper):
    """Report a lost life as the end of an episode.

    The wrapped game is only truly reset once it signals ``done`` itself;
    after a mere life loss, ``reset`` just advances the game by one no-op
    step. The life counter is read from ``env.unwrapped._life``.
    """

    def __init__(self, env):
        """Wrap ``env`` and start with no lives recorded."""
        gym.Wrapper.__init__(self, env)
        # Life count observed after the most recent step/reset.
        self.lives = 0
        # True when the wrapped env itself reported ``done`` on the last step;
        # starts True so the very first ``reset`` performs a real reset.
        self.was_real_done = True

    def step(self, action):
        """Step the wrapped env; flag ``done`` when a life has been lost.

        Returns the usual ``(obs, reward, done, info)`` tuple.
        """
        observation, reward, real_done, info = self.env.step(action)
        self.was_real_done = real_done

        remaining = self.env.unwrapped._life
        # Only a count that dropped but is still above zero is treated as a
        # lost life; otherwise the wrapped env's own ``done`` is passed on.
        life_lost = 0 < remaining < self.lives
        self.lives = remaining

        done = True if life_lost else real_done
        return observation, reward, done, info

    def reset(self, **kwargs):
        """Reset the wrapped env only if its last ``done`` was a real one.

        After a life loss the game continues, so a single action-0 step is
        taken instead and its observation is returned.
        """
        if not self.was_real_done:
            # Advance past the lost-life state without restarting the game.
            observation, _reward, _done, _info = self.env.step(0)
        else:
            observation = self.env.reset(**kwargs)
        self.lives = self.env.unwrapped._life
        return observation


    
class CustomReward(gym.Wrapper):
    """Reshape the environment reward using the in-game score and flag.

    On every step the increase of ``info['score']`` since the previous step
    (divided by 40) is added to the reward, a bonus of 350 is added when the
    episode ends with ``info['flag_get']`` set, and the total is divided by
    10. The episode is also cut short once ``info['time']`` drops to 300.
    """

    def __init__(self, env):
        """Wrap ``env`` and start score tracking from zero."""
        super(CustomReward, self).__init__(env)
        # Score seen on the previous step, used to compute the per-step delta.
        self._current_score = 0

    def reset(self, **kwargs):
        """Reset the wrapped env and forget the previous episode's score.

        Without this, the first step after a reset would compare the new
        score against the last score of the previous game and yield a large
        spurious negative reward.
        """
        self._current_score = 0
        return self.env.reset(**kwargs)

    def step(self, action):
        """Step the wrapped env and return ``(state, reward, done, info)``
        with the reshaped reward described on the class."""
        state, reward, done, info = self.env.step(action)

        # Reward score gains made during this step.
        reward += (info['score'] - self._current_score) / 40.0
        self._current_score = info['score']

        # End the episode early once the in-game timer has run down to 300.
        if info['time'] <= 300:
            done = True

        # Bonus for finishing with the flag. No penalty is applied otherwise
        # (a -50 penalty existed here but was disabled).
        if done and info['flag_get']:
            reward += 350.0

        return state, reward / 10.0, done, info


def env_creator(env_name):
    """Build the fully wrapped training environment for ``env_name``.

    Wrappers are applied from the inside out in this order: CustomReward,
    JoypadSpace (COMPLEX_MOVEMENT), MonitorEnv, NoopResetEnv (noop_max=30),
    EpisodicLifeEnv, WarpFrame (84) and FrameStack (4).
    """
    base = gym_super_mario_bros.make(env_name)
    # Reward shaping and the discrete action set.
    shaped = JoypadSpace(CustomReward(base), COMPLEX_MOVEMENT)
    # Episode bookkeeping, reset behaviour and life handling.
    episodic = EpisodicLifeEnv(NoopResetEnv(MonitorEnv(shaped), noop_max=30))
    # Observation preprocessing.
    return FrameStack(WarpFrame(episodic, 84), 4)


def print_results(result, iteration, config):
    """Print a one-row summary table for a training iteration.

    Args:
        result: Result dict of a training step; the keys
            ``episodes_total``, ``timesteps_total``, ``episode_len_mean``
            and ``episode_reward_mean`` are read.
        iteration: Index of the training iteration being reported.
        config: Algorithm config dict; ``num_gpus``, ``num_workers`` and
            ``num_envs_per_worker`` are read.
    """
    column_names = [
        'Agent',
        'GPUs',
        'Workers',
        'Envs per Worker',
        'Iteration',
        'Episodes',
        'Steps',
        'Episode Length(mean)',
        'Mean Reward',
    ]
    summary_row = [
        'IMPALA',
        config['num_gpus'],
        config['num_workers'],
        config['num_envs_per_worker'],
        iteration,
        result['episodes_total'],
        result['timesteps_total'],
        result['episode_len_mean'],
        round(result['episode_reward_mean'], 3),
    ]

    rendered = tabulate([summary_row],
                        headers=column_names,
                        tablefmt='psql',
                        showindex="never")
    print(rendered)
    # Blank line so consecutive tables do not run together.
    print()





### Create and register environment to Ray

In [None]:
# Environment id passed to gym_super_mario_bros.make(); the same string is
# used as the name under which the wrapped environment is registered with Ray.
env_name = 'SuperMarioBros-v0'

In [None]:
def env_creator_lambda(config):
    """Return a freshly wrapped environment for ``env_name``.

    ``config`` is accepted because the registered creator is called with an
    environment config argument, but it is not used here.
    """
    return env_creator(env_name)

# Register the creator so the ``'env': env_name`` config entry resolves to it.
register_env(env_name, env_creator_lambda)

## Initialize Ray Cluster for training

In [None]:
# Shut down any Ray instance left over from an earlier run of this cell.
ray.shutdown()
# Start Ray with worker logs not forwarded to the driver and Ray's own
# logging limited to FATAL.
ray.init(ignore_reinit_error=True, log_to_driver=False, logging_level=logging.FATAL)

## Initialize RL algorithm

In [None]:
# Settings handed to the IMPALA algorithm constructor.
config = dict(
    env=env_name,
    framework="torch",
    # One rollout worker per CPU, minus one.
    num_workers=num_cpus - 1,
    num_gpus=num_gpus,
    train_batch_size=5000,
    recreate_failed_workers=True,
    num_envs_per_worker=1,
    log_level='ERROR',
    create_env_on_driver=True,
    evaluation_num_workers=1,
)


In [None]:
# Build the IMPALA algorithm instance from the settings above.
agent = Impala(config=config)

### Start Training

In [None]:
%%time

# Per-iteration training history, one entry per call to agent.train().
rewards = []
episodes = []
steps = []
episode_lens = []

iter_num = 1000
# Save a checkpoint after every this-many completed iterations. Counting
# completed iterations (iteration + 1) means the final iteration is
# checkpointed too when iter_num is a multiple of this value.
checkpoint_every = 500

for iteration in tqdm(range(iter_num)):
    result = agent.train()

    # Record the learning process.
    rewards.append(result['episode_reward_mean'])
    episodes.append(result['episodes_total'])
    steps.append(result['timesteps_total'])
    episode_lens.append(result['episode_len_mean'])

    print_results(result, iteration, config)
    # pretty_print() only returns the formatted text, so it must be printed
    # explicitly to show up in the cell output.
    print(pretty_print(result['perf']))

    if (iteration + 1) % checkpoint_every == 0:
        checkpoint = agent.save()
        print('Checkpoint saved at', checkpoint)
        
        

### Plot results

In [None]:
import matplotlib.pyplot as plt 
# Plot the mean episode reward per training iteration and save it as a PNG.
# The output directory is created first, because savefig() does not create
# missing parent directories.
os.makedirs('results', exist_ok=True)

plt.figure(figsize=(6, 4))
# Use the number of rewards actually recorded rather than iter_num, so the
# plot still works if training was interrupted before all iterations ran.
plt.plot(range(len(rewards)), rewards)
plt.xlabel('Training Iteration', fontsize=12)
plt.ylabel('episode reward (mean)', fontsize=12)
plt.title('SuperMario', fontsize=12)
plt.tight_layout()
plt.savefig('results/super-mario-reward_gpus_%s_workers_%s'%(num_gpus, config['num_workers'])+'.png', dpi=100)
plt.show()




### Evaluate 

![](../xxx-rl-mario-ray/recordings/record_episode_0.gif)

In [None]:
from PIL import Image

In [None]:
# Show the current working directory; the relative 'results/' and 'records/'
# output paths used in this notebook are resolved against it.
os.getcwd()

In [None]:
def get_orig_env(env_name):
    """Create ``env_name`` with only CustomReward and the COMPLEX_MOVEMENT
    JoypadSpace applied, i.e. without the remaining wrappers that
    ``env_creator`` adds on top."""
    base = gym_super_mario_bros.make(env_name)
    return JoypadSpace(CustomReward(base), COMPLEX_MOVEMENT)


In [None]:
# Roll out the trained agent for a few episodes and save each one as a GIF.
# ``env`` is the fully wrapped environment the agent acts in; ``orig_env`` is
# stepped with the same actions and supplies the frames that are recorded.
#
# NOTE(review): ``orig_env`` is fully reset at the start of every episode,
# whereas ``env`` contains NoopResetEnv and EpisodicLifeEnv (whose reset()
# only takes a no-op step after a lost life). The two emulators can therefore
# be in different states, and the recorded frames may not match what the
# agent actually played -- consider recording frames from ``env`` instead.
env = env_creator(env_name)
orig_env = get_orig_env(env_name)

epi_num = 10
# Only every ``frame_stride``-th frame goes into the GIF.
frame_stride = 5

# Image.save() does not create missing directories.
os.makedirs("records", exist_ok=True)

for epi in tqdm(range(epi_num)):
    frame_list = []
    cum_reward = 0
    done = False
    _ = orig_env.reset()
    state = env.reset()
    step = 0
    while not done:
        action = agent.compute_single_action(state)
        state, reward, done, info = env.step(action)
        cum_reward += reward
        image, _, _, _ = orig_env.step(action)
        # Keep only the frames that end up in the GIF instead of holding
        # every frame of the episode in memory.
        if step % frame_stride == 0:
            frame_list.append(Image.fromarray(image, mode='RGB'))
        step += 1
    print('episode %s, reward %s, step_num %s'%(epi, cum_reward, step))
    # The first kept frame is the base image and the rest are appended, so no
    # frame is written twice.
    frame_list[0].save(
        "records/record_episode_%s.gif"%(epi), save_all=True, append_images=frame_list[1:], duration=1, loop=1
    )

        
    

In [None]:
# Shut Ray down now that training and evaluation are finished.
ray.shutdown()