In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import random
import gym
import gym_super_mario_bros
from gym_super_mario_bros.actions import SIMPLE_MOVEMENT
from nes_py.wrappers import JoypadSpace
from gym.wrappers import RecordVideo
from torch.utils.tensorboard import SummaryWriter
import seaborn as sns
import matplotlib.pyplot as plt
import cv2
import time 

We will use Deep Q-networks.

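A tabular Q-function would need 5×256^(84×84×4) entries if a state were a stack of 4 consecutive 84×84 grayscale frames (256 intensity levels per pixel) and there were 5 possible actions. (The code below actually feeds the network a single 84×84 frame and uses the 7-action `SIMPLE_MOVEMENT` set, so the exact count differs, but the conclusion is the same.)
-> This number is astronomically large, so we have to resort to function approximation, in which a neural network approximates the Q-table.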

$$Q^*(s_t, a_t) \leftarrow Q^*(s_t, a_t) + \alpha\left(r_{t+1} + \gamma \max_{a} Q_\theta(s_{t+1}, a) - Q^*(s_t, a_t)\right)$$

In [None]:
# Deep Q-Network
class DQNSolver(nn.Module):
    """Convolutional Deep Q-Network bundled with its optimizer and replay memory.

    The network maps a batch of image observations of shape
    ``(batch, *input_shape)`` to one Q-value per action. Transitions are kept
    in fixed-size ring buffers that live on the same device as the network.

    Args:
        input_shape: Shape of one observation as ``(channels, height, width)``.
        n_actions: Number of discrete actions (size of the output layer).
        memory_size: Capacity of the replay memory, in transitions.
        batch_size: Number of transitions sampled per training step.
        gamma: Discount factor applied to the bootstrapped next-state value.
        lr: Learning rate of the Adam optimizer.
    """

    def __init__(self, input_shape, n_actions, memory_size=10000, batch_size=32, gamma=0.99, lr=1e-4):
        super(DQNSolver, self).__init__()

        # Here we are using a simple CNN model since the task is pretty simple. When we start another project,
        # we could improve this part by using a different model or a pretrained model.

        self.conv = nn.Sequential(
            nn.Conv2d(input_shape[0], 32, kernel_size=8, stride=4),
            nn.ReLU(),
            nn.Conv2d(32, 64, kernel_size=4, stride=2),
            nn.ReLU(),
            nn.Conv2d(64, 64, kernel_size=3, stride=1),
            nn.ReLU()
        )
        conv_out_size = self._get_conv_out(input_shape)
        self.fc = nn.Sequential(
            nn.Linear(conv_out_size, 512),
            nn.ReLU(),
            nn.Linear(512, n_actions)
        )
        self.memory_size = memory_size
        self.memory_sample_size = batch_size
        self.gamma = gamma
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        # Move the parameters first so the optimizer is built over the final tensors.
        self.to(self.device)
        self.optimizer = optim.Adam(self.parameters(), lr=lr)
        self.loss_fn = nn.SmoothL1Loss()

        # Experience replay memory
        # Each row is one transition: (state, action, reward, next_state, done)
        self.STATE_MEM = torch.zeros((memory_size, *input_shape), device=self.device)
        # Actions are stored as int64 because they are used as gather() indices.
        self.ACTION_MEM = torch.zeros((memory_size, 1), dtype=torch.long, device=self.device)
        self.REWARD_MEM = torch.zeros((memory_size, 1), device=self.device)
        self.STATE2_MEM = torch.zeros((memory_size, *input_shape), device=self.device)
        self.DONE_MEM = torch.zeros((memory_size, 1), device=self.device)
        self.ending_position = 0  # next slot to write (wraps around)
        self.num_in_queue = 0  # number of valid transitions stored so far

    def _get_conv_out(self, shape):
        """Return the flattened size of the conv stack's output for one input of `shape`."""
        with torch.no_grad():  # shape probe only; no graph needed
            o = self.conv(torch.zeros(1, *shape))
        return int(np.prod(o.size()))

    def forward(self, x):
        """Compute Q-values for a batch `x` of shape ``(batch, *input_shape)``.

        The input is moved to the device the weights currently live on, so
        callers may pass CPU tensors even when the model is on CUDA.
        """
        x = x.to(self.conv[0].weight.device)
        conv_out = self.conv(x).view(x.size()[0], -1)
        return self.fc(conv_out)

    def remember(self, state, action, reward, state2, done):
        """Store one transition in the ring buffer, overwriting the oldest when full.

        Args:
            state: Observation tensor before the action.
            action: Index of the action taken.
            reward: Scalar reward received.
            state2: Observation tensor after the action.
            done: Whether the episode terminated on this transition.
        """
        slot = self.ending_position
        self.STATE_MEM[slot] = state.float()
        self.ACTION_MEM[slot] = int(action)
        self.REWARD_MEM[slot] = float(reward)
        self.STATE2_MEM[slot] = state2.float()
        self.DONE_MEM[slot] = float(done)
        self.ending_position = (slot + 1) % self.memory_size
        self.num_in_queue = min(self.num_in_queue + 1, self.memory_size)

    # recall prevents overfitting to recent experiences - standard RL without replay memory learns from consecutive states only
    # recall reuses experiences
    def recall(self):
        """Sample `memory_sample_size` distinct stored transitions uniformly at random.

        Returns:
            Tuple ``(state, action, reward, next_state, done)`` of batched tensors.

        Raises:
            ValueError: If fewer transitions are stored than the sample size.
        """
        idx = random.sample(range(self.num_in_queue), self.memory_sample_size)
        # The buffers already live on self.device, so no extra transfer is needed.
        return (
            self.STATE_MEM[idx],
            self.ACTION_MEM[idx],
            self.REWARD_MEM[idx],
            self.STATE2_MEM[idx],
            self.DONE_MEM[idx],
        )

    def experience_replay(self):
        """Run one optimization step on a random replay batch.

        Does nothing until at least `memory_sample_size` transitions are stored.
        """
        if self.memory_sample_size > self.num_in_queue:
            return
        STATE, ACTION, REWARD, STATE2, DONE = self.recall()

        # Bellman target: r + gamma * max_a' Q(s', a'), with no bootstrap on terminal states.
        # The target is a fixed regression label, so it is built without gradient
        # tracking; otherwise the loss would also push Q(s', .) toward Q(s, a).
        with torch.no_grad():
            next_q = self(STATE2).max(1).values.unsqueeze(1)
            target = REWARD + self.gamma * next_q * (1 - DONE)

        self.optimizer.zero_grad()
        current = self(STATE).gather(1, ACTION)
        loss = self.loss_fn(current, target)
        loss.backward()
        self.optimizer.step()

    def save_model(self, path="dqn_model.pth"):
        """Write the network weights (state dict) to `path`."""
        torch.save(self.state_dict(), path)

    def load_model(self, path="dqn_model.pth"):
        """Load network weights from `path` and switch the module to eval mode.

        `map_location` remaps the stored tensors to this model's device, so a
        checkpoint written on a CUDA machine can be loaded on a CPU-only one.
        """
        self.load_state_dict(torch.load(path, map_location=self.device))
        self.eval()

In [3]:
class SkipFrame(gym.Wrapper):
    """Repeat each action for up to `skip` consecutive frames.

    The rewards of the repeated frames are summed, and the observation, flags
    and info of the last executed frame are returned.

    Args:
        env: Environment whose ``step`` returns the 5-tuple
            ``(obs, reward, done, trunc, info)``.
        skip: Number of frames each action is repeated for (must be >= 1).

    Raises:
        ValueError: If `skip` is smaller than 1.
    """

    def __init__(self, env, skip=4):
        super(SkipFrame, self).__init__(env)
        if skip < 1:
            # With zero iterations step() would have nothing to return.
            raise ValueError(f"skip must be >= 1, got {skip}")
        self.skip = skip

    def step(self, action):
        """Apply `action` repeatedly and return the accumulated result."""
        total_reward = 0.0
        for _ in range(self.skip):
            obs, reward, done, trunc, info = self.env.step(action)
            total_reward += reward
            # Stop on truncation as well as termination: stepping an episode
            # that has already ended is undefined until reset() is called.
            if done or trunc:
                break
        return obs, total_reward, done, trunc, info


In [None]:


# Frame processing wrapper
class ProcessFrame84(gym.ObservationWrapper):
    """Observation wrapper that emits 84x84 single-channel frames.

    Each frame is converted with ``cv2.COLOR_RGB2GRAY`` (so the wrapped env is
    assumed to produce RGB images), shrunk to 84x84 using area interpolation,
    and given a leading channel axis, yielding shape ``(1, 84, 84)``.
    """

    def __init__(self, env):
        super().__init__(env)
        frame_shape = (1, 84, 84)
        self.observation_space = gym.spaces.Box(
            low=0,
            high=255,
            shape=frame_shape,
            dtype=np.uint8,
        )

    def observation(self, obs):
        """Return the preprocessed version of the raw frame `obs`."""
        gray = cv2.cvtColor(obs, cv2.COLOR_RGB2GRAY)
        small = cv2.resize(gray, (84, 84), interpolation=cv2.INTER_AREA)
        # Channel-first layout, as expected by the PyTorch conv layers.
        return small[np.newaxis, ...]


# Environment Setup
def make_env(render_mode="human", skip=10):
    """Build the wrapped Super Mario Bros 1-1 environment.

    Args:
        render_mode: Render mode forwarded to ``gym_super_mario_bros.make``.
            Defaults to ``"human"`` (an on-screen window).
        skip: Number of frames each chosen action is repeated for.

    Returns:
        The environment restricted to ``SIMPLE_MOVEMENT`` actions, with frame
        skipping and 84x84 grayscale preprocessing applied.
    """
    env = gym_super_mario_bros.make("SuperMarioBros-1-1-v0", apply_api_compatibility=True, render_mode=render_mode)
    env = JoypadSpace(env, SIMPLE_MOVEMENT)
    # Skip first, preprocess second: only the frame that is actually returned
    # gets converted/resized, instead of every intermediate skipped frame.
    # The observations seen by the agent are the same either way.
    env = SkipFrame(env, skip=skip)  # Repeats each action for `skip` frames
    env = ProcessFrame84(env)
    return env

# Q-Value Visualization
def plot_q_values(model, state):
    """Show a heatmap of the Q-values `model` predicts for a single `state`.

    Args:
        model: Network mapping a batch of observations to per-action Q-values.
        state: One observation without a batch axis (array-like or tensor).
    """
    # Feed the input on the same device as the weights; a CPU tensor passed to
    # a CUDA model would otherwise raise a device-mismatch error.
    device = next(model.parameters()).device
    batch = torch.as_tensor(state, dtype=torch.float32, device=device).unsqueeze(0)
    with torch.no_grad():
        q_values = model(batch).cpu().numpy()
    sns.heatmap(q_values, annot=True, cmap="coolwarm")
    plt.xlabel("Actions")
    plt.ylabel("Q-Value")
    plt.show()

# Training Loop
if __name__ == "__main__":
    env = make_env()
    env = RecordVideo(env, "./video", episode_trigger=lambda e: e % 10 == 0)
    input_shape = (1, 84, 84)
    n_actions = env.action_space.n

    num_episodes = 50000
    max_steps_per_episode = 10000
    stuck_threshold = 60  # Abort the episode after this many consecutive steps without x-progress

    gamma = 0.99  # Discount factor for future rewards
    epsilon = 1.0  # Initial exploration rate
    epsilon_min = 0.05  # Minimum exploration rate
    epsilon_decay = 0.9995  # Multiplicative decay applied once per episode

    # Pass gamma explicitly so the value configured here is the one used in training.
    model = DQNSolver(input_shape, n_actions, gamma=gamma)

    # Initialize TensorBoard (a single writer; a second one would leave an empty run behind)
    writer = SummaryWriter("runs/mario_dqn")

    try:
        for episode in range(num_episodes):
            state, _ = env.reset()
            # Add the batch axis and keep the observation on the model's device.
            state = torch.as_tensor(state, dtype=torch.float32, device=model.device).unsqueeze(0)

            total_reward = 0
            steps = 0
            stuck_counter = 0
            last_position = 0

            while steps < max_steps_per_episode:
                env.render()

                # ε-Greedy action selection
                if random.random() < epsilon:
                    action = env.action_space.sample()  # Explore
                else:
                    with torch.no_grad():
                        action = model(state).argmax().item()  # Exploit

                next_state, reward, done, trunc, info = env.step(action)
                next_state = torch.as_tensor(next_state, dtype=torch.float32, device=model.device).unsqueeze(0)

                # If Mario is stuck (no horizontal progress since the previous step)
                if info["x_pos"] == last_position:
                    stuck_counter += 1
                    reward -= 0.3  # Penalty for standing still
                else:
                    stuck_counter = 0
                last_position = info["x_pos"]

                # Only true termination is stored as `done`: a truncated episode
                # should still bootstrap from the next state's value.
                model.remember(state, action, reward, next_state, done)
                model.experience_replay()

                total_reward += reward
                state = next_state
                steps += 1

                if stuck_counter > stuck_threshold:
                    print(f"Episode {episode + 1}: Mario got stuck! Resetting...")
                    break

                # A truncated episode must be reset just like a terminated one.
                if done or trunc:
                    break

            epsilon = max(epsilon_min, epsilon * epsilon_decay)

            # Log progress
            writer.add_scalar("Total Reward", total_reward, episode)
            writer.add_scalar("Steps", steps, episode)

            print(f"Episode {episode + 1}/{num_episodes}, Reward: {total_reward}, Steps: {steps}")

            if (episode + 1) % 100 == 0:
                model.save_model()
    finally:
        # Release the emulator window/video recorder and flush TensorBoard
        # events even when training is interrupted or fails.
        env.close()
        writer.close()


  logger.warn(
  logger.warn(
  logger.warn(
  logger.warn(
  if not isinstance(terminated, (bool, np.bool8)):


Episode 1/50000, Reward: 172.7, Steps: 26
Episode 2/50000, Reward: 1020.9000000000003, Steps: 114
Episode 3/50000, Reward: 618.7, Steps: 48
Episode 4/50000, Reward: 221.0, Steps: 16
Episode 5/50000, Reward: 598.0999999999999, Steps: 64
Episode 6/50000, Reward: 213.7, Steps: 21
Episode 7/50000, Reward: 793.1000000000004, Steps: 93
Episode 8/50000, Reward: 580.4, Steps: 58
Episode 9/50000, Reward: 205.39999999999998, Steps: 27
Episode 10/50000, Reward: 238.0, Steps: 16
Episode 11/50000, Reward: 169.7, Steps: 39
Episode 12/50000, Reward: 202.7, Steps: 24
Episode 13/50000, Reward: 221.0, Steps: 19
Episode 14/50000, Reward: 237.1, Steps: 22
Episode 15/50000, Reward: 197.7, Steps: 29
Episode 16/50000, Reward: 242.0, Steps: 13
Episode 17/50000, Reward: 245.0, Steps: 14
Episode 18/50000, Reward: 998.2000000000005, Steps: 129
Episode 19/50000, Reward: 247.0, Steps: 13
Episode 20/50000, Reward: 209.0, Steps: 18
Episode 21/50000, Reward: 597.3000000000003, Steps: 91
Episode 22/50000, Reward: 565.

In [None]:
import torch

def load_model(self, filename="mario_dqn.pth"):
    """Load a saved state dict from `filename` into the module `self` and set eval mode.

    Args:
        self: The ``torch.nn.Module`` whose weights are replaced.
        filename: Path of a checkpoint written with ``torch.save(state_dict, ...)``.
    """
    # Deserialize onto the CPU so a checkpoint written on a CUDA machine loads
    # anywhere; load_state_dict then copies the values into the module's own
    # parameters on whatever device they already occupy.
    state_dict = torch.load(filename, map_location="cpu")
    self.load_state_dict(state_dict)
    self.eval()
    print(f"Model loaded from {filename}")


In [None]:
import time
import gym_super_mario_bros
from nes_py.wrappers import JoypadSpace
from gym_super_mario_bros.actions import SIMPLE_MOVEMENT

def demo(model_path="dqn_model.pth"):
    """Play one episode greedily with a trained model, rendering on screen.

    Args:
        model_path: Checkpoint to load. The default matches the file that
            ``DQNSolver.save_model()`` writes when called without arguments,
            which is how the training loop saves.
    """
    # Use the same wrapper stack as training (including the frame skip) so the
    # policy sees the action timing it was trained with.
    env = make_env()
    try:
        model = DQNSolver((1, 84, 84), env.action_space.n)
        model.load_model(model_path)

        state, _ = env.reset()
        state = torch.as_tensor(state, dtype=torch.float32, device=model.device).unsqueeze(0)

        done = False
        trunc = False
        # Stop on truncation too; stepping past the end of an episode is undefined.
        while not (done or trunc):
            env.render()
            time.sleep(0.05)  # slow playback down enough to watch

            with torch.no_grad():
                action = model(state).argmax().item()  # Select best action

            next_state, _, done, trunc, _ = env.step(action)
            state = torch.as_tensor(next_state, dtype=torch.float32, device=model.device).unsqueeze(0)
    finally:
        # Close the emulator window even if loading or stepping fails.
        env.close()


# Run the greedy-policy demo once; this needs a saved model checkpoint on disk.
demo()
