In [None]:
import warnings
# Install a global "ignore" filter: every warning raised through the `warnings`
# module after this point is suppressed for the rest of the session.
# NOTE(review): presumably meant to quiet library deprecation noise, but it also
# hides warnings from this notebook's own code -- consider narrowing by category.
warnings.filterwarnings('ignore')

## Testing cell

In [None]:
import gym
import time
from nes_py.wrappers import JoypadSpace
import gym_super_mario_bros
from gym_super_mario_bros.actions import SIMPLE_MOVEMENT

video_dir_path = 'mario_videos_test'

# Recording is off by default. When enabled, the environment is created with
# render_mode="rgb_array" so the recorder has frames to capture instead of the
# interactive "human" window.
record_video = False

env = gym_super_mario_bros.make(
    'SuperMarioBros-1-1-v0',
    apply_api_compatibility=True,
    render_mode="rgb_array" if record_video else "human",
)
if record_video:
    env = gym.wrappers.RecordVideo(
        env,
        video_folder=video_dir_path,
        episode_trigger=lambda episode_id: True,
        # strftime keeps the file name free of the spaces/colons time.ctime() emits
        name_prefix='mario-video-{}'.format(time.strftime('%Y%m%d-%H%M%S')),
    )

env = JoypadSpace(env, SIMPLE_MOVEMENT)

# Run one short episode with random actions; always release the emulator,
# even if a step raises.
try:
    env.reset()
    while True:
        action = env.action_space.sample()
        state, reward, done, truncated, info = env.step(action)
        # time.sleep(1/30)  # uncomment to slow playback down
        # Stop on episode end, or once the in-game timer drops below 370.
        if done or truncated or info['time'] < 370:
            break
finally:
    env.close()

# Only announce a video when one was actually recorded.
if record_video:
    print("Your mario video is saved in {}".format(video_dir_path))

In [None]:
import gym
import torch
import torch.nn as nn
from tqdm import tqdm
import numpy as np
import random


In [None]:
import gym_super_mario_bros

# Build a raw environment only to inspect its spaces, then release it:
# the later cells create (and wrap) their own environments.
env = gym_super_mario_bros.make('SuperMarioBros-1-1-v0')
try:
    print(env.observation_space.shape)  # Dimensions of a frame
    print(env.action_space.n)  # Number of actions our agent can take
finally:
    env.close()

In [None]:
from gym import spaces
from torchvision import transforms as T

class SkipFrame(gym.Wrapper):
    """Repeat every action for `skip` frames and return only the last observation."""

    def __init__(self, env, skip):
        """
        Args:
            env: environment to wrap.
            skip: how many consecutive frames each action is applied to (>= 1).

        Raises:
            ValueError: if `skip` is smaller than 1 (no frame would ever be stepped).
        """
        super().__init__(env)
        if skip < 1:
            raise ValueError("skip must be >= 1, got {}".format(skip))
        self._skip = skip

    def step(self, action):
        """Apply `action` up to `skip` times and sum the rewards.

        Returns:
            (obs, total_reward, done, truncated, info) taken from the last inner
            step that was executed, with `total_reward` summed over all of them.
        """
        total_reward = 0.0
        for _ in range(self._skip):
            obs, reward, done, truncated, info = self.env.step(action)
            total_reward += reward
            # Stop repeating as soon as the episode ends for either reason;
            # stepping a finished episode is undefined until reset().
            if done or truncated:
                break
        return obs, total_reward, done, truncated, info
    
class GrayScaleObservation(gym.ObservationWrapper):
    """Observation wrapper that converts each frame with torchvision's Grayscale transform."""

    def __init__(self, env):
        super().__init__(env)
        # Declared space keeps only the first two dimensions of the wrapped space.
        height_width = self.observation_space.shape[:2]
        self.observation_space = spaces.Box(
            low=0, high=255, shape=height_width, dtype=np.uint8
        )
        self.transform = T.Grayscale()

    def permute_orientation(self, observation):
        """Reorder an [H, W, C] array into a float [C, H, W] tensor."""
        channels_first = np.transpose(observation, (2, 0, 1))
        # copy() so the tensor is built from its own contiguous buffer
        return torch.tensor(channels_first.copy(), dtype=torch.float)

    def observation(self, observation):
        """Return the grayscale version of `observation` as a torch tensor."""
        return self.transform(self.permute_orientation(observation))


class ResizeObservation(gym.ObservationWrapper):
    """Observation wrapper that resizes frames and divides pixel values by 255."""

    def __init__(self, env, shape):
        """
        Args:
            env: environment to wrap.
            shape: target size; an int means a square (shape, shape),
                anything else is converted with tuple().
        """
        super().__init__(env)
        self.shape = (shape, shape) if isinstance(shape, int) else tuple(shape)

        # Keep whatever dimensions follow the first two of the wrapped space.
        trailing_dims = self.observation_space.shape[2:]
        self.observation_space = spaces.Box(
            low=0, high=255, shape=self.shape + trailing_dims, dtype=np.uint8
        )

        resize = T.Resize(self.shape, antialias=True)
        rescale = T.Normalize(0, 255)  # (x - 0) / 255
        self.transforms = T.Compose([resize, rescale])

    def observation(self, observation):
        """Resize and rescale `observation`, then drop a leading size-1 dimension."""
        processed = self.transforms(observation)
        return processed.squeeze(0)


In [None]:
import gym
import gym_super_mario_bros
from gym.wrappers import RecordVideo, FrameStack
from nes_py.wrappers import JoypadSpace
from gym_super_mario_bros.actions import RIGHT_ONLY

import time

video_dir_path = 'mario_videos'


def make_env(env, video_dir_path=None, skip=4, shape=84, num_stack=4):
    """Apply the preprocessing wrappers used by the agent to `env`.

    Wrapper order: JoypadSpace(RIGHT_ONLY) -> optional RecordVideo ->
    SkipFrame -> GrayScaleObservation -> ResizeObservation -> FrameStack.

    Args:
        env: a freshly created Super Mario Bros environment.
        video_dir_path: when not None, every episode is recorded into this folder.
        skip: number of frames each action is repeated for.
        shape: side length (or (H, W)) the frames are resized to.
        num_stack: number of consecutive frames stacked into one observation.

    The defaults reproduce the 4 x 84 x 84 observations used elsewhere in
    this notebook.

    Returns:
        The wrapped environment.
    """
    env = JoypadSpace(env, RIGHT_ONLY)  # Reduce action space

    if video_dir_path is not None:
        env = RecordVideo(
            env,
            video_folder=video_dir_path,
            episode_trigger=lambda episode_id: True,
            # Compact timestamp: time.ctime() contains spaces and colons, which
            # are awkward (and on some filesystems invalid) in file names.
            name_prefix='mario-video-{}'.format(time.strftime('%Y%m%d-%H%M%S'))
        )

    #env = RecordEpisodeStatistics(env)  # Track stats
    env = SkipFrame(env, skip=skip)
    env = GrayScaleObservation(env)  # Convert to grayscale
    env = ResizeObservation(env, shape)  # Resize frames
    env = FrameStack(env, num_stack=num_stack)  # Stack consecutive frames
    return env


In [None]:
class DQNSolver(nn.Module):
    """Convolutional Q-network: three conv layers followed by two linear layers."""

    def __init__(self, input_shape, n_actions):
        """
        Args:
            input_shape: (channels, height, width) of one observation. When None,
                the previous hard-coded default (4, 84, 84) is used.
            n_actions: number of discrete actions (size of the output layer).
        """
        super(DQNSolver, self).__init__()
        # Honour the caller's shape instead of silently overriding it.
        input_shape = (4, 84, 84) if input_shape is None else tuple(input_shape)

        self.conv = nn.Sequential(
            nn.Conv2d(input_shape[0], 32, kernel_size=8, stride=4),
            nn.ReLU(),
            nn.Conv2d(32, 64, kernel_size=4, stride=2),
            nn.ReLU(),
            nn.Conv2d(64, 64, kernel_size=3, stride=1),
            nn.ReLU()
        )

        conv_out_size = self._get_conv_out(input_shape)
        self.fc = nn.Sequential(
            nn.Linear(conv_out_size, 512),
            nn.ReLU(),
            nn.Linear(512, n_actions)
        )

    def _get_conv_out(self, shape):
        """Return the flattened feature count the conv stack yields for `shape`."""
        # Probe with a batch of one; no gradients are needed for a shape check.
        with torch.no_grad():
            o = self.conv(torch.zeros(1, *shape))
        return int(np.prod(o.size()))

    def forward(self, x: torch.Tensor):
        """Map a batch of observations (batch first) to one Q-value per action."""
        # Drops a trailing size-1 dimension if present; a no-op otherwise.
        x = x.squeeze(-1)
        conv_out = self.conv(x).reshape(x.size()[0], -1)
        return self.fc(conv_out)

In [None]:
class DQNAgent:
    """DQN agent with a tensor-backed replay buffer and a periodically synced target network."""

    def __init__(self, state_space, action_space, max_memory_size, batch_size, gamma, lr, exploration_max, exploration_min, exploration_decay):
        """
        Args:
            state_space: shape of one observation, used to size the replay buffer
                and passed to DQNSolver.
            action_space: number of discrete actions.
            max_memory_size: capacity of the replay buffer (oldest entries are overwritten).
            batch_size: number of transitions sampled per learning step.
            gamma: discount factor.
            lr: Adam learning rate.
            exploration_max: initial epsilon.
            exploration_min: lower bound for epsilon.
            exploration_decay: multiplicative factor applied by update_exploration_rate().
        """
        self.state_space = state_space
        self.action_space = action_space
        self.max_memory_size = max_memory_size
        self.memory_sample_size = batch_size
        self.gamma = gamma
        self.lr = lr
        self.exploration_max = exploration_max
        self.exploration_min = exploration_min
        self.exploration_decay = exploration_decay
        self.exploration_rate = self.exploration_max
        self.step = 0  # Number of experience_replay() calls so far
        self.copy = 1000  # Copy target model weights every 1000 steps

        # Memory Buffers (ring buffer; ending_position is the next write slot)
        self.STATE_MEM = torch.zeros((max_memory_size, *state_space))
        self.ACTION_MEM = torch.zeros((max_memory_size, 1))
        self.REWARD_MEM = torch.zeros((max_memory_size, 1))
        self.STATE2_MEM = torch.zeros((max_memory_size, *state_space))
        self.DONE_MEM = torch.zeros((max_memory_size, 1))
        self.ending_position = 0
        self.num_in_queue = 0

        # Neural Networks
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.local_net = DQNSolver(state_space, action_space).to(self.device)
        self.target_net = DQNSolver(state_space, action_space).to(self.device)
        self.target_net.load_state_dict(self.local_net.state_dict())
        self.target_net.eval()

        self.optimizer = torch.optim.Adam(self.local_net.parameters(), lr=lr)
        self.l1 = nn.SmoothL1Loss()

    def save(self, path):
        """Write the local network's weights to `path`."""
        torch.save(self.local_net.state_dict(), path)

    def load(self, path):
        """Load local-network weights from `path` and mirror them into the target network."""
        # map_location lets a checkpoint saved on another device load on this one.
        state_dict = torch.load(path, map_location=self.device)
        self.local_net.load_state_dict(state_dict)
        self.copy_model()

    def act(self, state, evaluate=False):
        """Select an action using an epsilon-greedy policy.

        Args:
            state: a single observation (without batch dimension).
            evaluate: when True, always act greedily.

        Returns:
            A float tensor of shape (1, 1) holding the chosen action index.
        """
        if random.random() < self.exploration_rate and not evaluate:
            return torch.tensor([[random.randrange(self.action_space)]], dtype=torch.float32)

        with torch.no_grad():
            # as_tensor avoids re-wrapping (and copying) an existing tensor.
            batch = torch.as_tensor(state, dtype=torch.float32, device=self.device).unsqueeze(0)
            return self.local_net(batch).argmax(dim=1, keepdim=True).cpu().float()

    def copy_model(self):
        """Copy local network weights to target network"""
        self.target_net.load_state_dict(self.local_net.state_dict())

    def update_exploration_rate(self):
        """Decay exploration rate, never going below exploration_min."""
        self.exploration_rate *= self.exploration_decay
        self.exploration_rate = max(self.exploration_min, self.exploration_rate)

    def remember(self, state, action, reward, state2, done):
        """Store one transition (all arguments are tensors) in the ring buffer."""
        self.STATE_MEM[self.ending_position] = state.float()
        self.ACTION_MEM[self.ending_position] = action.float()
        self.REWARD_MEM[self.ending_position] = reward.float()
        self.STATE2_MEM[self.ending_position] = state2.float()
        self.DONE_MEM[self.ending_position] = done.float()
        self.ending_position = (self.ending_position + 1) % self.max_memory_size  # FIFO tensor
        self.num_in_queue = min(self.num_in_queue + 1, self.max_memory_size)

    def recall(self):
        """Sample `batch_size` stored transitions (with replacement) onto the device."""
        idx = random.choices(range(self.num_in_queue), k=self.memory_sample_size)

        STATE = self.STATE_MEM[idx].to(self.device)
        ACTION = self.ACTION_MEM[idx].to(self.device)
        REWARD = self.REWARD_MEM[idx].to(self.device)
        STATE2 = self.STATE2_MEM[idx].to(self.device)
        DONE = self.DONE_MEM[idx].to(self.device)

        return STATE, ACTION, REWARD, STATE2, DONE

    def experience_replay(self):
        """Run one learning step on a sampled batch.

        The target network is synced every `self.copy` calls. Nothing is learned
        until the buffer holds at least one batch worth of transitions.
        """
        if self.step % self.copy == 0:
            self.copy_model()
        # Advance the counter on every call; without this the sync above fires
        # every time and the target network never lags the local one.
        self.step += 1

        if self.memory_sample_size > self.num_in_queue:
            return

        STATE, ACTION, REWARD, STATE2, DONE = self.recall()

        # DQN target with a separate target network:
        #   Q(S, A) <- r + gamma * max_a Q_target(S', a), with no bootstrap on terminal states.
        # The target is a constant for the loss, so no graph is built through target_net.
        with torch.no_grad():
            next_q = self.target_net(STATE2).max(1).values.unsqueeze(1)
            target = REWARD + torch.mul(self.gamma * next_q, 1 - DONE)

        self.optimizer.zero_grad()
        current = self.local_net(STATE).gather(1, ACTION.long())
        loss = self.l1(current, target)
        loss.backward()
        self.optimizer.step()

In [None]:
def train(agent: DQNAgent, env: gym.Env, num_episodes: int = 10) -> DQNAgent:
    """Train `agent` on `env` for `num_episodes` episodes.

    After every environment step the transition is stored and one replay step
    is run. After every episode epsilon is decayed, the weights are saved to
    'mario_model.pth' and the episode's total reward is printed.

    Args:
        agent: the agent to train (modified in place).
        env: environment following the 5-tuple step API.
        num_episodes: number of episodes to play.

    Returns:
        The same `agent` instance.
    """

    def _to_tensor(obs):
        # Observations may arrive as an (obs, info) tuple or as a LazyFrames-like
        # object; __array__() materialises the stacked frames.
        frames = obs[0] if isinstance(obs, tuple) else obs
        return torch.Tensor(frames.__array__())

    total_rewards = []

    for ep_num in tqdm(range(num_episodes)):
        state, info = env.reset()
        state = _to_tensor(state)

        total_reward = 0
        while True:
            action = agent.act(state)

            state_next, reward, terminal, trunc, info = env.step(int(action[0]))
            total_reward += reward

            state_next = _to_tensor(state_next)
            # Only a true terminal state is stored as "done": a truncated episode
            # should still bootstrap from the next state.
            agent.remember(
                state,
                action,
                torch.tensor([reward]),
                state_next,
                torch.tensor([int(terminal)]),
            )
            agent.experience_replay()

            state = state_next
            # Leave the loop on truncation as well; the environment must be
            # reset before it can be stepped again.
            if terminal or trunc:
                break

        # Decay epsilon once per episode; previously it was never decayed, so
        # the agent kept exploring at exploration_max for the whole run.
        agent.update_exploration_rate()

        total_rewards.append(total_reward)
        agent.save('mario_model.pth')
        print("Total reward after episode {} is {}".format(ep_num + 1, total_rewards[-1]))

    return agent

In [None]:
# This environment is only needed to read the wrapped observation/action
# spaces; the training cell creates its own, so release this one right away.
env = gym_super_mario_bros.make('SuperMarioBrosRandomStages-v0', apply_api_compatibility=True, render_mode='rgb_array')
env = make_env(env)#, 'training_videos')
try:
    observation_space = env.observation_space.shape
    action_space = env.action_space.n
finally:
    env.close()

agent = DQNAgent(state_space=observation_space,
                    action_space=action_space,
                    max_memory_size=30000,
                    batch_size=64,
                    gamma=0.90,
                    lr=0.00025,
                    exploration_max=0.90,
                    exploration_min=0.02,
                    exploration_decay=0.99)

In [None]:
import os
# Resume from an earlier checkpoint when one exists.
if os.path.exists('mario_model.pth'):
    agent.load('mario_model.pth')

env = gym_super_mario_bros.make('SuperMarioBrosRandomStages-v0', apply_api_compatibility=True, render_mode='rgb_array')
env = make_env(env)#, 'training_videos')

# Close the emulator even if training is interrupted or raises.
try:
    trained_agent = train(agent, env, num_episodes=200)
finally:
    env.close()
trained_agent.save('mario_model.pth')

In [None]:
def evaluate_agent(agent, env: gym.Env, num_episodes=1, show: bool = False, min_time: int = 250) -> float:
    """Runs the trained agent in the environment without training.

    Args:
        agent: object exposing act(state, evaluate=True).
        env: environment following the 5-tuple step API whose `info` dict
            contains a 'time' entry.
        num_episodes: number of evaluation episodes.
        show: when True, sleep 1/30 s per step and print per-episode and
            average rewards.
        min_time: an episode is cut off once info['time'] drops below this value.

    Returns:
        Mean total reward over the evaluated episodes.
    """

    def _to_tensor(obs):
        # Accept an (obs, info) tuple or a LazyFrames-like object.
        frames = obs[0] if isinstance(obs, tuple) else obs
        return torch.Tensor(frames.__array__())

    total_rewards = []

    for ep_num in range(num_episodes):
        state, info = env.reset()
        state = _to_tensor(state)
        total_reward = 0
        done = False

        while not done:
            action = agent.act(state, evaluate=True)  # Use deterministic policy
            state_next, reward, terminated, truncated, info = env.step(int(action.item()))
            if show:
                time.sleep(1/30)
            # End on termination, truncation, or the in-game timer cut-off.
            done = terminated or truncated or info['time'] < min_time

            total_reward += reward
            state = _to_tensor(state_next)

        total_rewards.append(total_reward)
        if show:
            print(f"Evaluation Episode {ep_num + 1}: Total Reward = {total_reward}")

    mean_reward = float(np.mean(total_rewards))
    if show:
        print(f"Average Reward over {num_episodes} episodes: {mean_reward}")
    return mean_reward

In [1]:
# Evaluate the trained agent once on every stage (worlds 1-8, stages 1-4).
worlds = list(range(1, 9))
stages = list(range(1, 5))
rewards = []
for world in worlds:
    for stage in stages:
        env = gym_super_mario_bros.make(f'SuperMarioBros-{world}-{stage}-v0', apply_api_compatibility=True, render_mode='rgb_array')
        env = make_env(env)
        # Release the emulator even if this stage's evaluation fails.
        try:
            rewards.append(evaluate_agent(trained_agent, env))
        finally:
            env.close()
        print("World {} Stage {}: Reward = {}".format(world, stage, rewards[-1]))

print(f"Average Reward over all stages: {np.mean(rewards)}")

# Finally, play stage 1-1 in a visible window.
env = gym_super_mario_bros.make('SuperMarioBros-1-1-v0', apply_api_compatibility=True, render_mode='human')
env = make_env(env)
try:
    rewards.append(evaluate_agent(trained_agent, env, show=True))
finally:
    env.close()

In [None]:
from record import record_mario_gameplay
# Evaluate the trained agent once on every stage (worlds 1-8, stages 1-4).
worlds = list(range(1, 9))
stages = list(range(1, 5))
rewards = []
for world in worlds:
    for stage in stages:
        env = gym_super_mario_bros.make(f'SuperMarioBros-{world}-{stage}-v0', apply_api_compatibility=True, render_mode='rgb_array')
        env = make_env(env)
        # Release the emulator even if this stage's evaluation fails.
        try:
            rewards.append(evaluate_agent(trained_agent, env))
        finally:
            env.close()
        print("World {} Stage {}: Reward = {}".format(world, stage, rewards[-1]))

# This will play and record Mario with random actions
gif_path = record_mario_gameplay()
print(f"GIF saved to: {gif_path}")

  logger.warn(
  logger.warn(
  logger.warn(


AttributeError: 'NoneType' object has no attribute '__array_interface__'