<a href="https://colab.research.google.com/github/vischia/adfm_2024-2025/blob/master/practice_3_super_mario_bros_with_rl/notebook_interface.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## 1. Install dependencies

Run this once in a fresh environment.


In [None]:
# Install the notebook's Python dependencies. numpy, gym, gym-super-mario-bros
# and nes-py are pinned to exact versions; torch and torchvision are unpinned.
!pip install numpy==1.26.4 gym==0.26.2 gym-super-mario-bros==7.4.0 nes-py==8.2.1 torch torchvision


## 1.bis: Run this only if you need rendering in Google Colab (installs `freeglut3` and `freeglut3-dev` via `apt`)

In [None]:
# Set to True to also install the freeglut system packages with apt.
runOnColab=False

if runOnColab:
    ! apt install freeglut3 freeglut3-dev
# Installed regardless of runOnColab: the next cell imports both packages.
! pip install pyvirtualdisplay matplotlib

In [None]:
import matplotlib.pyplot as plt
from IPython import display as ipythondisplay

from pyvirtualdisplay import Display
# Create and start a non-visible (visible=0) 400x300 virtual display.
# IPython's display module is imported above as `ipythondisplay`, so this
# `display` variable does not clash with it.
# NOTE(review): presumably needs a virtual X server (e.g. Xvfb) on the host — confirm.
display = Display(visible=0, size=(400, 300))
display.start()


# ENVIRONMENT

In [None]:

import numpy as np
import torch
import gym
import gym_super_mario_bros
from gym_super_mario_bros.actions import SIMPLE_MOVEMENT
from nes_py.wrappers import JoypadSpace
from gym.wrappers import GrayScaleObservation, ResizeObservation


def make_mario_env(world: int = 1, stage: int = 1):
    """
    Build a wrapped Super Mario Bros environment for one level.

    The base environment "SuperMarioBros-{world}-{stage}-v0" is wrapped, in
    this order, so that it:
      - accepts the discrete SIMPLE_MOVEMENT action set,
      - produces grayscale frames,
      - produces frames resized to 84x84.

    Args:
        world: World number inserted into the environment id.
        stage: Stage number inserted into the environment id.

    Returns:
        The fully wrapped environment.
    """
    base_env = gym_super_mario_bros.make(
        f"SuperMarioBros-{world}-{stage}-v0",
        apply_api_compatibility=True,
        render_mode="rgb_array",  # use "human" if running on your laptop.
    )
    with_actions = JoypadSpace(base_env, SIMPLE_MOVEMENT)
    in_grayscale = GrayScaleObservation(with_actions, keep_dim=False)
    return ResizeObservation(in_grayscale, 84)


def preprocess_state(state, device):
    """
    Turn an observation into a normalized, flattened network input.

    Args:
        state: Array-like image with pixel values on a 0-255 scale.
        device: Torch device on which the resulting tensor is created.

    Returns:
        A float32 tensor of shape (1, N), where N is the number of elements
        in ``state`` and every value has been divided by 255.
    """
    pixels = np.asarray(state, dtype=np.float32)
    # Scale to [0, 1] and lay the pixels out as a single row (batch of one).
    row = (pixels / 255.0).reshape(1, -1)
    return torch.tensor(row, dtype=torch.float32, device=device)


# Network

In [None]:

import torch
import torch.nn as nn
import torch.nn.functional as F


class DQN(nn.Module):
    """Fully connected Q-network operating on a flattened image.

    Two hidden layers of 256 units with ReLU activations are followed by a
    linear output layer producing one value per action.
    """

    def __init__(self, input_dim: int, n_actions: int):
        super().__init__()
        hidden_units = 256
        # Layers are created in fc1, fc2, fc3 order; the attribute names are
        # also the keys of the module's state_dict.
        self.fc1 = nn.Linear(input_dim, hidden_units)
        self.fc2 = nn.Linear(hidden_units, hidden_units)
        self.fc3 = nn.Linear(hidden_units, n_actions)

    def forward(self, x):
        """Return the per-action outputs for the input batch ``x``."""
        for hidden_layer in (self.fc1, self.fc2):
            x = F.relu(hidden_layer(x))
        # No activation on the output layer.
        return self.fc3(x)


# Replay Buffer

In [None]:

import random
from collections import deque


class ReplayMemory:
    """Bounded store of transitions used for experience replay.

    Backed by a ``deque`` with ``maxlen=capacity``, so once the buffer is
    full every new transition pushes out the oldest one.
    """

    def __init__(self, capacity: int):
        self.memory = deque(maxlen=capacity)

    def push(self, state, action, reward, next_state, done):
        """Append one transition, stored as a 5-tuple."""
        transition = (state, action, reward, next_state, done)
        self.memory.append(transition)

    def sample(self, batch_size: int):
        """Return ``batch_size`` distinct stored transitions chosen at random.

        ``random.sample`` raises ``ValueError`` when ``batch_size`` is larger
        than the number of stored transitions.
        """
        return random.sample(self.memory, batch_size)

    def __len__(self):
        """Return the number of transitions currently stored."""
        return len(self.memory)


# Agent

In [None]:

import torch.nn.utils as nn_utils

class MarioDQNAgent:
    """DQN agent skeleton holding the networks, optimizer and replay memory.

    ``select_action`` and ``optimize_model`` are student exercises; both raise
    ``NotImplementedError`` until they are implemented.
    """

    def __init__(
        self,
        n_actions: int,
        input_dim: int = 84 * 84,
        lr: float = 1e-4,
        gamma: float = 0.99,
        replay_capacity: int = 50000,
        device: str = "cpu",
    ):
        """Create the policy/target networks, the optimizer and the memory.

        Args:
            n_actions: Number of discrete actions (network output size).
            input_dim: Length of the flattened observation vector.
            lr: Learning rate of the Adam optimizer.
            gamma: Discount factor, stored for use in the update step.
            replay_capacity: Maximum number of transitions kept in memory.
            device: Torch device the networks are moved to.
        """
        self.n_actions = n_actions
        self.device = device
        self.gamma = gamma

        # The policy network is built first, then the target network, which
        # starts as an exact copy and is kept in eval mode.
        policy = DQN(input_dim, n_actions).to(device)
        target = DQN(input_dim, n_actions).to(device)
        target.load_state_dict(policy.state_dict())
        target.eval()
        self.policy_net = policy
        self.target_net = target

        # Only the policy network's parameters are handed to the optimizer.
        self.optimizer = torch.optim.Adam(policy.parameters(), lr=lr)
        self.memory = ReplayMemory(replay_capacity)

    def select_action(self, state, epsilon: float) -> int:
        """STUDENT TODO: epsilon-greedy action selection.

        - With probability ``epsilon``, return a random action index in
          [0, n_actions - 1].
        - Otherwise, return argmax_a Q(state, a) computed with policy_net.
        """
        raise NotImplementedError("select_action is not implemented")

    def optimize_model(self, batch_size: int):
        """STUDENT TODO: perform one DQN update from the replay memory.

        Steps:
            1. Return early when fewer than ``batch_size`` transitions are stored.
            2. Sample a batch of (state, action, reward, next_state, done).
            3. Compute the current Q(s, a) and the targets:
                   target = r + gamma * max_a' Q_target(s', a') * (1 - done)
            4. Compute the Huber (smooth L1) loss and take a gradient step.
        """
        raise NotImplementedError("optimize_model is not implemented")


# Play interface

In [None]:
import numpy as np
from gym_super_mario_bros.actions import SIMPLE_MOVEMENT


def run_agent_episode(agent: MarioDQNAgent, world: int = 1, stage: int = 1, render: bool = True) -> float:
    """Play one greedy (epsilon=0) episode with ``agent`` and return its reward.

    Args:
        agent: Agent whose ``select_action`` chooses every action.
        world: World number of the level to play.
        stage: Stage number of the level to play.
        render: When True, draw each frame inline in the notebook; when False,
            nothing is rendered or displayed.

    Returns:
        The sum of the rewards collected during the episode.
    """
    env = make_mario_env(world, stage)
    total_reward = 0.0
    try:
        obs, info = env.reset()
        state = preprocess_state(obs, agent.device)

        # Create the image artist once and only swap its pixels afterwards;
        # calling plt.imshow on every step would keep stacking images on the
        # same axes. render() may return None for non-array render modes, in
        # which case inline drawing is skipped.
        first_frame = env.render() if render else None
        image = plt.imshow(first_frame) if first_frame is not None else None

        done = False
        while not done:
            action = agent.select_action(state, epsilon=0.0)
            next_obs, reward, terminated, truncated, info = env.step(action)
            done = terminated or truncated
            total_reward += reward
            state = preprocess_state(next_obs, agent.device)

            if image is not None:
                image.set_data(env.render())
                ipythondisplay.clear_output(wait=True)
                ipythondisplay.display(plt.gcf())

        if image is not None:
            ipythondisplay.clear_output(wait=True)
    finally:
        # Release the emulator even if the agent or the environment raises.
        env.close()

    print(f"[Agent] Total reward: {total_reward:.2f}")
    return total_reward


def run_random_episode(world: int = 1, stage: int = 1, render: bool = True) -> float:
    """Play one episode with uniformly sampled actions and return its reward.

    Args:
        world: World number of the level to play.
        stage: Stage number of the level to play.
        render: When True, draw each frame inline in the notebook; when False,
            nothing is rendered or displayed.

    Returns:
        The sum of the rewards collected during the episode.
    """
    env = make_mario_env(world, stage)
    total_reward = 0.0
    try:
        obs, info = env.reset()

        # Create the image artist once and only swap its pixels afterwards;
        # calling plt.imshow on every step would keep stacking images on the
        # same axes. render() may return None for non-array render modes, in
        # which case inline drawing is skipped.
        first_frame = env.render() if render else None
        image = plt.imshow(first_frame) if first_frame is not None else None

        done = False
        while not done:
            action = env.action_space.sample()
            obs, reward, terminated, truncated, info = env.step(action)
            done = terminated or truncated
            total_reward += reward

            if image is not None:
                image.set_data(env.render())
                ipythondisplay.clear_output(wait=True)
                ipythondisplay.display(plt.gcf())

        if image is not None:
            ipythondisplay.clear_output(wait=True)
    finally:
        # Release the emulator even if stepping the environment raises.
        env.close()

    print(f"[Random] Total reward: {total_reward:.2f}")
    return total_reward


def evaluate_agent_vs_random(agent: MarioDQNAgent, episodes: int = 5):
    """Compare ``agent`` with a random policy and print the average rewards.

    For each of ``episodes`` matches the agent plays one episode and then the
    random policy plays one; both calls are made with ``render=False``.

    Args:
        agent: Agent to evaluate.
        episodes: Number of agent/random episode pairs to run.
    """
    rewards = {"agent": [], "random": []}
    for match_no in range(1, episodes + 1):
        print(f"\n=== Match {match_no}/{episodes} ===")
        # The agent always plays before the random policy within a match.
        agent_reward = run_agent_episode(agent, render=False)
        rewards["agent"].append(agent_reward)
        random_reward = run_random_episode(render=False)
        rewards["random"].append(random_reward)

    agent_mean = np.mean(rewards["agent"])
    random_mean = np.mean(rewards["random"])
    print("\n=== Summary (computer vs computer) ===")
    print(f"Agent average reward:  {agent_mean:.2f}")
    print(f"Random average reward: {random_mean:.2f}")


def run_human_episode(world: int = 1, stage: int = 1):
    """Let a human play one episode by typing action indices.

    The SIMPLE_MOVEMENT actions are listed once, then an index is requested
    for every step. Invalid input is reported and asked for again. Pressing
    Ctrl+C at any time ends the episode early. The total reward collected is
    printed at the end.

    Args:
        world: World number of the level to play.
        stage: Stage number of the level to play.
    """
    env = make_mario_env(world, stage)
    total_reward = 0.0
    try:
        obs, info = env.reset()

        # Create the image artist once; later frames only replace its pixels.
        # render() may return None for non-array render modes, in which case
        # inline drawing is skipped.
        first_frame = env.render()
        image = plt.imshow(first_frame) if first_frame is not None else None

        print("\nActions (SIMPLE_MOVEMENT):")
        for idx, action in enumerate(SIMPLE_MOVEMENT):
            print(f"{idx}: {action}")
        print("Press Ctrl+C to quit.")

        done = False
        try:
            while not done:
                try:
                    a = int(input("Choose action index: "))
                except ValueError:
                    print("Please enter a valid integer.")
                    continue
                if not 0 <= a < len(SIMPLE_MOVEMENT):
                    print("Invalid index, try again.")
                    continue

                obs, reward, terminated, truncated, info = env.step(a)
                done = terminated or truncated
                total_reward += reward

                if image is not None:
                    # Show the frame that results from the chosen action.
                    image.set_data(env.render())
                    ipythondisplay.clear_output(wait=True)
                    ipythondisplay.display(plt.gcf())
        except KeyboardInterrupt:
            # Ctrl+C is the advertised way to quit: fall through to the
            # cleanup and the summary instead of propagating the interrupt.
            pass

        ipythondisplay.clear_output(wait=True)
    finally:
        # Release the emulator even if stepping the environment raises.
        env.close()

    print(f"[Human] Total reward: {total_reward:.2f}")


In [None]:
# Training infrastructure 

In [None]:
def train_dqn(
    num_episodes: int = 200,
    batch_size: int = 32,
    epsilon_start: float = 1.0,
    epsilon_end: float = 0.1,
    epsilon_decay_episodes: int = 150,
    target_update_interval: int = 1000,
) -> MarioDQNAgent:
    """STUDENT TODO: implement the full DQN training loop.

    Args:
        num_episodes: Number of training episodes to run.
        batch_size: Batch size to pass to agent.optimize_model.
        epsilon_start: Exploration rate used for the first episode.
        epsilon_end: Exploration rate reached once the decay has finished.
        epsilon_decay_episodes: Number of episodes over which epsilon is
            linearly decayed from epsilon_start to epsilon_end.
        target_update_interval: Intended number of environment steps between
            two updates of the target network (count them in global_step).

    Returns:
        The MarioDQNAgent created by this function.

    Raises:
        NotImplementedError: On the first episode, until the loop below is
            implemented.

    Suggested algorithm:
        - Create env with make_mario_env()
        - Instantiate MarioDQNAgent.
        - For each episode:
            * Reset env, preprocess initial state (reset returns a pair, as
              in the play functions of this notebook):
                  obs, info = env.reset()
                  state = preprocess_state(obs, device)
            * Compute epsilon via linear decay:
                  frac = min(episode_idx / epsilon_decay_episodes, 1.0)
                  epsilon = epsilon_start + frac * (epsilon_end - epsilon_start)
            * Loop until done:
                - Choose action via agent.select_action(state, epsilon).
                - Step env (step returns five values):
                      obs, reward, terminated, truncated, info = env.step(action)
                      done = terminated or truncated
                - Preprocess next observation.
                - Store transition in replay memory.
                - Call agent.optimize_model(batch_size).
                - Periodically (every target_update_interval steps) copy the
                  policy_net weights into target_net.
                - Accumulate total_reward.
        - Close env and return the trained agent.
    """
    # Use the GPU when one is available.
    device = "cuda" if torch.cuda.is_available() else "cpu"
    print(f"Using device: {device}")
    env = make_mario_env()
    n_actions = env.action_space.n
    agent = MarioDQNAgent(n_actions=n_actions, device=device)

    # Step counter provided for the training loop; unused until it is implemented.
    global_step = 0

    for episode in range(num_episodes):
        # TODO: implement the main training loop as described above.
        raise NotImplementedError("train_dqn main loop is not implemented")

    env.close()
    return agent



## 2. Train the agent


In [None]:
# Train for 10 episodes here instead of train_dqn's default of 200.
agent = train_dqn(num_episodes=10)


## 3. Evaluate and play


In [None]:
# Watch the trained agent play one episode, then compare it with a random
# policy over two matches. Manual play stays opt-in because it blocks the
# cell waiting for keyboard input.
run_agent_episode(agent, render=True)
evaluate_agent_vs_random(agent, episodes=2)
# run_human_episode()  # uncomment to control Mario manually
