In [None]:
#!pip install gym-super-mario-bros==7.4.0

In [None]:
import torch
from torch import nn
from torchvision import transforms as T
from PIL import Image
import numpy as np
from pathlib import Path
from collections import deque
import random, datetime, os, copy
import cv2
# Gym is an OpenAI toolkit for RL
import gym
from gym.spaces import Box
from gym.wrappers import FrameStack, GrayScaleObservation, TransformObservation

# NES Emulator for OpenAI Gym
from nes_py.wrappers import JoypadSpace

# Super Mario environment for OpenAI Gym
import gym_super_mario_bros
from skimage import transform
from tqdm import tqdm

In [None]:
# Initialize Super Mario environment: world 1, stage 1.
# NOTE(review): the original note here says that with gym v0.26 the render mode
# has to be set to 'human' to see results on the screen — confirm against the
# installed gym version.

env = gym_super_mario_bros.make("SuperMarioBros-1-1-v0")

# Limit the action-space to the two button combinations listed below, so the
# agent only chooses between
#   0. walk right
#   1. jump right
env = JoypadSpace(env, [["right"], ["right", "A"]])

# Smoke test: take one "walk right" step and show what the environment returns.
# The 4-tuple unpacking relies on the older gym step() signature.
env.reset()
next_state, reward, done, info = env.step(action=0)
print(f"{next_state.shape},\n {reward},\n {done},\n {info}")

In [None]:
# Number of discrete actions the wrapped environment accepts (JoypadSpace was
# given two button combinations when the environment was built).
action_size = env.action_space.n
action_size

In [None]:
# Sanity check: drive Mario with uniformly random actions for at most 2000
# steps, rendering every step, and stop early once the episode ends.
state = env.reset()
for t in range(2000):
    action = env.action_space.sample()
    env.render()
    # The info dict returned by step() is not needed here and is discarded.
    state, reward, done, _ = env.step(action)
    if done:
        break 

#env.close()

In [None]:
#env.close()

# Preprocess Environment

Environment data is returned to the agent in `next_state`. As the shape printed above shows, each state is an RGB image stored as an array of shape (240, 256, 3), i.e. height × width × color channels. Often that is more information than our agent needs; for instance, Mario’s actions do not depend on the color of the pipes or the sky!

We use Wrappers to preprocess environment data before sending it to the agent.

GrayScaleObservation is a common wrapper to transform an RGB image to grayscale; doing so reduces the size of the state representation without losing useful information. Because it is applied with keep_dim=False below, the channel axis is dropped and each state becomes a 2-D array of shape (240, 256).

ResizeObservation downsamples each observation into a square image. After this step each state has shape (84, 84).

SkipFrame is a custom wrapper that inherits from gym.Wrapper and implements the step() function. Because consecutive frames don’t vary much, we can skip n-intermediate frames without losing much information. The n-th frame aggregates rewards accumulated over each skipped frame.

FrameStack is a wrapper that allows us to squash consecutive frames of the environment into a single observation point to feed to our learning model. This way, we can identify if Mario was landing or jumping based on the direction of his movement in the previous several frames.

In [None]:
class ResizeObservation(gym.ObservationWrapper):
    """Observation wrapper that rescales every frame to a fixed height and width.

    Args:
        env: environment whose observations should be resized.
        shape: target size; an int gives a square ``(shape, shape)`` frame,
            any other value is converted to a tuple and used as-is.
    """

    def __init__(self, env, shape):
        super().__init__(env)
        self.shape = (shape, shape) if isinstance(shape, int) else tuple(shape)

        # Dimensions after the first two (e.g. a channel axis) are carried over
        # unchanged into the advertised observation space.
        trailing_dims = self.observation_space.shape[2:]
        self.observation_space = Box(
            low=0,
            high=255,
            shape=self.shape + trailing_dims,
            dtype=np.uint8,
        )

    def observation(self, observation):
        """Resize one frame to ``self.shape`` and return it as a uint8 array."""
        scaled = transform.resize(observation, self.shape)
        # resize hands back floats; multiply by 255 and cast back to uint8
        return (scaled * 255).astype(np.uint8)


class SkipFrame(gym.Wrapper):
    """Wrapper that repeats each chosen action for several consecutive frames."""

    def __init__(self, env, skip):
        """Return only every `skip`-th frame

        Args:
            env: environment to wrap.
            skip: how many times each action is repeated per `step` call.

        Raises:
            ValueError: if `skip` is smaller than 1. With no repetitions `step`
                would never call the wrapped environment and would have no
                observation or info to return.
        """
        if skip < 1:
            raise ValueError(f"skip must be >= 1, got {skip}")
        super().__init__(env)
        self._skip = skip

    def step(self, action):
        """Repeat action, and sum reward

        Returns the observation, done flag and info of the last frame that was
        actually played, together with the reward summed over all played frames.
        """
        total_reward = 0.0
        done = False
        for i in range(self._skip):
            # Accumulate reward and repeat the same action
            obs, reward, done, info = self.env.step(action)
            total_reward += reward
            # Stop repeating as soon as the episode ends
            if done:
                break
        return obs, total_reward, done, info


# Apply Wrappers to environment.
# Each line wraps the previous `env` and rebinds the name, so the order below
# is the order in which every raw frame is processed.
# NOTE: running this cell a second time wraps the already-wrapped `env` again.
env = SkipFrame(env, skip=4)                           # repeat each action for 4 frames, summing rewards
env = GrayScaleObservation(env, keep_dim=False)        # RGB -> grayscale, without a channel axis
env = ResizeObservation(env, shape=84)                 # downsample to 84 x 84 (uint8)
env = TransformObservation(env, f=lambda x: x / 255.)  # divide pixel values by 255 -> floats
env = FrameStack(env, num_stack=4)                     # stack the 4 most recent frames


In [None]:
#env.reset()

After applying the above wrappers to the environment, the final wrapped state consists of 4 gray-scaled consecutive frames stacked together. Each time Mario makes an action, the environment responds with a state of this structure. The structure is represented by a 3-D array of size [4, 84, 84].



# Train

In [None]:
from model import MarioNet, QPixelNetwork
from agent import Agent
import numpy as np
import matplotlib.pyplot as plt
from torchvision import transforms
from torch.autograd import Variable
import torchvision.transforms.functional as F
from PIL import Image
from skimage.transform import resize
from collections import deque, namedtuple

In [None]:
# Build the learning agent. state_dim matches the 4 stacked 84 x 84 frames
# produced by the wrapped environment; action_dim is the number of discrete actions.
# NOTE(review): Agent is imported from agent.py, which is not shown here —
# confirm how it interprets `seed`.
agent = Agent(state_dim=(4, 84, 84), action_dim=env.action_space.n, seed=42)

In [None]:
# Pick the first CUDA GPU when PyTorch reports one, otherwise fall back to the CPU.
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
# image_size = 84

# def process_state(x):
#     x = np.squeeze(x)
#     x_resize = resize(x, (image_size,image_size,3), anti_aliasing=True)
#     x_modified = Variable(torch.from_numpy(x_resize).float().to(device).view(3,image_size,image_size))
#     return x_modified

# def process_stack(frames):
#     frames_list = list(frames)
#     torch_stack = torch.stack(frames_list).unsqueeze(dim=0)
#     torch_stack = torch_stack.view(torch_stack.size(0),torch_stack.size(2), torch_stack.size(1), torch_stack.size(3), torch_stack.size(4) )
#     return torch_stack



In [None]:
#state_copy= []

def mario_dqn(episodes_num=40000, seed=0):
    """Train the notebook-level `agent` on the notebook-level `env`.

    For every episode the environment is reset and the agent acts until the
    episode is done or the flag has been reached. Each transition is pushed
    into the agent's replay cache and followed by one `agent.learn()` call.
    Every 100 episodes the mean score of the last (up to) 100 episodes is
    printed and the network weights are saved under `checkpoints/`.

    Args:
        episodes_num: number of episodes to play.
        seed: accepted for backward compatibility; the body does not use it.

    Returns:
        list with the summed reward of every episode, in play order.
    """
    # torch.save does not create missing parent folders. Create the checkpoint
    # folder up front so a missing directory is not discovered only after the
    # first 100 episodes of training.
    os.makedirs('checkpoints', exist_ok=True)

    scores = []
    score_window = deque(maxlen=100)  # rolling window for the printed average
    for episode in tqdm(range(1, episodes_num+1)):
        # reset the environment
        state = env.reset()           # get the current state
        score = 0
        while True:
            action = agent.act(state)        # select an action
            next_state, reward, done, info = env.step(action)
            score += reward                                # update the score
            # add to replay
            agent.cache(state, next_state, action, reward, done)
            # one learning step; the values it returns are not needed here
            agent.learn()

            state = next_state

            # Check if end of game (episode finished or flag reached)
            if done or info['flag_get']:
                break

        score_window.append(score)
        scores.append(score)

        if episode % 100 == 0:
            print('\rEpisode: {}\tAverage Score: {:.2f}'.format(episode, np.mean(score_window)))
            torch.save(agent.net.state_dict(), 'checkpoints/checkpoint_mario_{}.pth'.format(episode))

    return scores

In [None]:

# Run the training loop with its defaults (40000 episodes) and keep the
# per-episode scores it returns.
scores = mario_dqn()
#torch.save(agent.QN_local.state_dict(), 'checkpoint_pixels.pth')

In [None]:
# Restore the weights saved after episode 400 of training. map_location remaps
# the stored tensors onto the device selected earlier in the notebook, so a
# checkpoint written on a GPU machine also loads on a CPU-only one.
agent.net.load_state_dict(
    torch.load('checkpoints/checkpoint_mario_400.pth', map_location=device)
)

# Play one rendered episode with the restored network and report its total reward.
state = env.reset()
score = 0
while True:
    action = agent.act(state)
    env.render(mode="human")
    state, reward, done, info = env.step(action)
    score += reward
    # Stop when the episode ends or Mario reaches the flag
    if done or info['flag_get']:
        break
print(score)

#env.close()

In [None]:
# Close the environment once all rollouts are finished.
env.close()