<a href="https://colab.research.google.com/github/timmyt110/CS166_Fall2025/blob/main/CSCI_166__DQN_on_New_Atari_Domains.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# DQN on Seaquest --- Adapted from Pong Starter (09.21.2025)

In [2]:
# Remove any previously installed AutoROM variants so the pinned install below
# starts clean; all pip output (stdout and stderr) is discarded.
!pip uninstall -y AutoROM AutoROM.accept-rom-license autorom >/dev/null 2>&1

# Install a pinned release of the license-accepting AutoROM package
!pip install -q "AutoROM.accept-rom-license==0.6.1"

# Run the AutoROM CLI as a module of the current interpreter
import os, sys, subprocess, pathlib
os.environ["ALE_ACCEPT_LICENSE"] = "YES"
# ROMs are installed under ~/.atari_roms (the directory is created if missing)
rom_dir = pathlib.Path.home()/".atari_roms"
rom_dir.mkdir(parents=True, exist_ok=True)

cmd = [sys.executable, "-m", "AutoROM", "--accept-license", "--install-dir", str(rom_dir)]
print("Running:", " ".join(cmd))
# check=True raises CalledProcessError if the CLI exits with a non-zero status
subprocess.run(cmd, check=True)

# Point ALE to the ROM directory
# NOTE(review): presumably ale-py reads ALE_PY_ROM_DIR to locate ROMs — confirm
# for the installed ale-py version.
os.environ["ALE_PY_ROM_DIR"] = str(rom_dir)
print("✅ ROMs installed to:", rom_dir)

[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/434.7 kB[0m [31m?[0m eta [36m-:--:--[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m434.7/434.7 kB[0m [31m17.4 MB/s[0m eta [36m0:00:00[0m
[?25h  Installing build dependencies ... [?25l[?25hdone
  Getting requirements to build wheel ... [?25l[?25hdone
  Preparing metadata (pyproject.toml) ... [?25l[?25hdone
  Building wheel for AutoROM.accept-rom-license (pyproject.toml) ... [?25l[?25hdone
Running: /usr/bin/python3 -m AutoROM --accept-license --install-dir /root/.atari_roms
✅ ROMs installed to: /root/.atari_roms


# Import Gymnasium and the ALE backend

In [3]:
import ale_py
import gymnasium as gym

# Configure the model save drive

In [4]:
# Mount Google Drive at /content/drive so model checkpoints can be written to it.
# This import only exists inside Google Colab.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [5]:
import os
# Checkpoint folder on the mounted Drive; created (with parents) if it does not exist.
save_dir = "/content/drive/MyDrive/PUBLIC/Models"
os.makedirs(save_dir, exist_ok=True)

# Model: imports and network definition

In [6]:
from dataclasses import dataclass
import argparse
import time
from datetime import datetime
import numpy as np
import collections
import typing as tt

import torch
import torch.nn as nn
import torch.optim as optim

from torch.utils.tensorboard.writer import SummaryWriter

In [7]:
#dqn_model
class DQN(nn.Module):
    """Convolutional Q-network: three conv layers followed by a two-layer MLP head.

    Args:
        input_shape: shape of a single observation; ``input_shape[0]`` is used
            as the number of input channels of the first convolution.
        n_actions: number of outputs of the final linear layer (one per action).
    """

    def __init__(self, input_shape, n_actions):
        super().__init__()

        in_channels = input_shape[0]
        self.conv = nn.Sequential(
            nn.Conv2d(in_channels, 32, kernel_size=8, stride=4),
            nn.ReLU(),
            nn.Conv2d(32, 64, kernel_size=4, stride=2),
            nn.ReLU(),
            nn.Conv2d(64, 64, kernel_size=3, stride=1),
            nn.ReLU(),
            nn.Flatten(),
        )

        # Push one all-zero observation through the conv stack to discover
        # how many flattened features the first linear layer must accept.
        probe = torch.zeros(1, *input_shape)
        flat_features = self.conv(probe).shape[-1]

        self.fc = nn.Sequential(
            nn.Linear(flat_features, 512),
            nn.ReLU(),
            nn.Linear(512, n_actions)
        )

    def forward(self, x: torch.ByteTensor):
        """Return the network output for a batch of observations.

        The input is cast to float and divided by 255.0 before the conv stack.
        """
        scaled = x.float() / 255.0
        features = self.conv(scaled)
        return self.fc(features)

In [8]:
# Install Stable Baselines3 for Atari wrappers
# NOTE(review): the version is not pinned, so a re-run may pull a different release.
!pip install -q stable-baselines3

[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/187.2 kB[0m [31m?[0m eta [36m-:--:--[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m187.2/187.2 kB[0m [31m10.6 MB/s[0m eta [36m0:00:00[0m
[?25h

In [9]:
from gymnasium import spaces
from stable_baselines3.common import atari_wrappers
import numpy as np
import collections
import typing as tt

class ImageToPyTorch(gym.ObservationWrapper):
    """Observation wrapper that moves the last axis of a 3-D observation to the front.

    The wrapped environment must expose a 3-D ``Box`` observation space; the
    advertised space is rebuilt with the reordered shape and scalar bounds
    taken from the minimum of ``low`` and the maximum of ``high``.
    """

    def __init__(self, env):
        super().__init__(env)
        src_space = self.observation_space
        assert isinstance(src_space, gym.spaces.Box)
        assert len(src_space.shape) == 3
        # presumably (height, width, channels) -> (channels, height, width)
        first, second, last = src_space.shape[0], src_space.shape[1], src_space.shape[-1]
        self.observation_space = gym.spaces.Box(
            low=src_space.low.min(),
            high=src_space.high.max(),
            shape=(last, first, second),
            dtype=src_space.dtype,
        )

    def observation(self, observation):
        """Return ``observation`` with axis 2 moved to position 0."""
        return np.transpose(observation, (2, 0, 1))


class BufferWrapper(gym.ObservationWrapper):
    """Observation wrapper that concatenates the last ``n_steps`` observations along axis 0.

    Args:
        env: environment with a ``Box`` observation space.
        n_steps: number of most recent observations kept and concatenated.
    """

    def __init__(self, env, n_steps):
        super().__init__(env)
        base_space = env.observation_space
        assert isinstance(base_space, spaces.Box)
        # The bounds are repeated along axis 0 to match the stacked observation.
        stacked_low = base_space.low.repeat(n_steps, axis=0)
        stacked_high = base_space.high.repeat(n_steps, axis=0)
        self.observation_space = gym.spaces.Box(
            stacked_low, stacked_high, dtype=base_space.dtype)
        self.buffer = collections.deque(maxlen=n_steps)

    def reset(self, *, seed: tt.Optional[int] = None, options: tt.Optional[dict[str, tt.Any]] = None):
        """Refill the history with zero frames, reset the inner env, and return the stacked first observation."""
        template = self.env.observation_space.low
        self.buffer.extend(
            np.zeros_like(template) for _ in range(self.buffer.maxlen)
        )
        first_obs, extra = self.env.reset(seed=seed, options=options)
        stacked = self.observation(first_obs)
        return stacked, extra

    def observation(self, observation: np.ndarray) -> np.ndarray:
        """Push ``observation`` into the history and return the whole history concatenated along axis 0."""
        self.buffer.append(observation)
        return np.concatenate(tuple(self.buffer), axis=0)


class FireResetEnv:
    """Thin proxy around an environment that takes action ``1`` right after every reset.

    Action index 1 is presumably FIRE for the games used in this notebook
    (TODO confirm against the environment's action meanings). The class does
    not subclass a Gym wrapper; it copies ``action_space`` and
    ``observation_space`` from the wrapped environment and forwards every
    other attribute lookup to it.

    Args:
        env: object exposing ``reset(**kwargs) -> (obs, info)``,
            ``step(action) -> (obs, reward, terminated, truncated, info)``,
            ``action_space`` and ``observation_space``.
    """

    def __init__(self, env):
        self.env = env
        self.action_space = env.action_space
        self.observation_space = env.observation_space

    def reset(self, **kwargs):
        """Reset the wrapped env, then step once with action 1.

        Returns:
            ``(obs, info)`` from the extra step. If that step ends the episode,
            the environment is reset again and that result is returned. If the
            step itself raises, a ``RuntimeWarning`` is issued and the plain
            reset observation is returned (best effort, as before, but no
            longer silent).

        Exceptions raised by ``env.reset`` are not suppressed.
        """
        obs, info = self.env.reset(**kwargs)
        try:
            step_obs, _, term, trunc, step_info = self.env.step(1)
        except Exception as exc:
            # Keep the best-effort behaviour, but make the failure visible
            # instead of swallowing it.
            import warnings
            warnings.warn(
                f"FireResetEnv: step(1) after reset failed ({exc!r}); "
                "returning the plain reset observation.",
                RuntimeWarning,
                stacklevel=2,
            )
            return obs, info
        if term or trunc:
            # The extra step finished the episode: start over.
            obs, info = self.env.reset(**kwargs)
            return obs, info
        return step_obs, step_info

    def step(self, a):
        """Forward ``a`` unchanged to the wrapped environment's ``step``."""
        return self.env.step(a)

    def __getattr__(self, name):
        # __getattr__ only runs when normal lookup fails. If ``env`` itself is
        # missing (e.g. a bare instance created by copy/pickle), delegating
        # would recurse forever, so fail with a normal AttributeError instead.
        if name == "env":
            raise AttributeError(name)
        return getattr(self.env, name)


def make_env(env_name: str, n_steps=4, render_mode=None, **kwargs):
    """Build the wrapped environment used for both training and recording.

    Args:
        env_name: Gymnasium environment id passed to ``gym.make``.
        n_steps: number of observations stacked by ``BufferWrapper``.
        render_mode: forwarded to ``gym.make``.
        **kwargs: any further keyword arguments forwarded to ``gym.make``.

    Returns:
        The environment wrapped, innermost first, by ``AtariWrapper``
        (reward clipping on, ``noop_max=30``), ``ImageToPyTorch``,
        ``BufferWrapper`` and ``FireResetEnv``.
    """
    print(f"Creating environment {env_name}")
    base_env = gym.make(env_name, render_mode=render_mode, **kwargs)
    preprocessed = atari_wrappers.AtariWrapper(base_env, clip_reward=True, noop_max=30)
    stacked = BufferWrapper(ImageToPyTorch(preprocessed), n_steps=n_steps)
    return FireResetEnv(stacked)

Gym has been unmaintained since 2022 and does not support NumPy 2.0 amongst other critical functionality.
Please upgrade to Gymnasium, the maintained drop-in replacement of Gym, or contact the authors of your software and request that they upgrade.
See the migration guide at https://gymnasium.farama.org/introduction/migration_guide/ for additional information.
  return datetime.utcnow().replace(tzinfo=utc)


In [10]:
# Base Configuration
DEFAULT_ENV_NAME = "ALE/Seaquest-v5"
# Training stops once the mean reward over the last 100 episodes exceeds this.
# NOTE(review): 19 looks inherited from the Pong starter — confirm it is meaningful here.
MEAN_REWARD_BOUND = 19

GAMMA = 0.99                 # multiplier on the next-state value in calc_loss
BATCH_SIZE = 32              # transitions sampled from the replay buffer per update
REPLAY_SIZE = 10000          # capacity of the replay buffer
LEARNING_RATE = 1e-4         # Adam learning rate
SYNC_TARGET_FRAMES = 1000    # target net is copied from the online net when frame_idx is a multiple of this
REPLAY_START_SIZE = 10000    # no gradient updates until the buffer holds this many transitions

SAVE_EPSILON = 0.5  # Only save if at least this much better
# Epsilon is decayed linearly from EPSILON_START by frame_idx / EPSILON_DECAY_LAST_FRAME
# and never drops below EPSILON_FINAL.
EPSILON_DECAY_LAST_FRAME = 150000
EPSILON_START = 1.0
EPSILON_FINAL = 0.01

# Tuple of tensors returned from a sampled minibatch in replay buffer
State = np.ndarray
Action = int
BatchTensors = tt.Tuple[
    torch.ByteTensor,           # current state
    torch.LongTensor,           # actions
    torch.Tensor,               # rewards
    torch.BoolTensor,           # done || trunc
    torch.ByteTensor            # next state
]

In [11]:
# ⚙️ Fast Training Config for Quick Test Run
# These assignments override the values of the same names set in the base
# configuration cell; run this cell only when a short test run is wanted.
MEAN_REWARD_BOUND = 5
REPLAY_START_SIZE = 1000
EPSILON_DECAY_LAST_FRAME = 10_000
SYNC_TARGET_FRAMES = 500

# REPLAY_SIZE = 5000  # optional
# BATCH_SIZE = 16     # optional

In [12]:
import os
# NOTE(review): Path is not used in the cells visible here — confirm before removing.
from pathlib import Path

# Define directories
# Checkpoints are written to both locations by the training loop.
save_dir_drive = "/content/drive/MyDrive/PUBLIC/Models"
save_dir_local = "saved_models"

# Create both directories if they don't exist
os.makedirs(save_dir_drive, exist_ok=True)
os.makedirs(save_dir_local, exist_ok=True)

# Safe model filename
# The environment id contains "/", which would be read as a path separator
# inside a file name, so it is replaced with "_".
env_name = DEFAULT_ENV_NAME
safe_env_name = env_name.replace("/", "_")

In [13]:
@dataclass
class Experience:
    """One environment transition as stored in the replay buffer."""
    state: State          # observation before the action
    action: Action        # action taken in `state`
    reward: float         # reward returned by the environment step
    done_trunc: bool      # True when the step terminated or truncated the episode
    new_state: State      # observation returned by the environment step


class ExperienceBuffer:
    """Fixed-capacity replay buffer backed by a ``collections.deque``.

    Once ``capacity`` items are stored, appending drops the oldest item.
    """

    def __init__(self, capacity: int):
        self.buffer = collections.deque(maxlen=capacity)

    def __len__(self):
        """Number of items currently stored."""
        return len(self.buffer)

    def append(self, experience: Experience):
        """Store one transition."""
        self.buffer.append(experience)

    def sample(self, batch_size: int) -> tt.List[Experience]:
        """Return ``batch_size`` distinct stored items chosen uniformly at random.

        Uses NumPy's global random state; ``np.random.choice`` raises
        ``ValueError`` if ``batch_size`` exceeds the number of stored items.
        """
        stored = len(self.buffer)
        picked = np.random.choice(stored, batch_size, replace=False)
        batch = []
        for position in picked:
            batch.append(self.buffer[position])
        return batch

In [14]:
class Agent:
    """Steps an environment with an epsilon-greedy policy and records every transition.

    Args:
        env: environment to act in; it is reset on construction.
        exp_buffer: replay buffer that receives one ``Experience`` per step.
    """

    def __init__(self, env: gym.Env, exp_buffer: ExperienceBuffer):
        self.env = env
        self.exp_buffer = exp_buffer
        self.state: tt.Optional[np.ndarray] = None
        self._reset()

    def _reset(self):
        """Start a new episode and zero the running episode reward."""
        first_obs, _info = self.env.reset()
        self.state = first_obs
        self.total_reward = 0.0

    @torch.no_grad()
    def play_step(self, net: DQN, device: torch.device,
                  epsilon: float = 0.0) -> tt.Optional[float]:
        """Take one environment step and store the transition.

        With probability ``epsilon`` a random action is sampled from the
        action space; otherwise the action with the largest network output
        for the current state is taken.

        Returns:
            The accumulated episode reward if this step ended the episode
            (the environment is then reset), otherwise ``None``.
        """
        explore = np.random.random() < epsilon
        if explore:
            action = self.env.action_space.sample()
        else:
            # Add a batch dimension of one before querying the network.
            state_batch = torch.as_tensor(self.state, device=device).unsqueeze(0)
            best_idx = torch.max(net(state_batch), dim=1)[1]
            action = int(best_idx.item())

        # Advance the environment by one step.
        new_state, reward, is_done, is_tr, _ = self.env.step(action)
        self.total_reward += reward
        episode_over = is_done or is_tr

        self.exp_buffer.append(
            Experience(
                state=self.state,
                action=action,
                reward=float(reward),
                done_trunc=episode_over,
                new_state=new_state,
            )
        )
        self.state = new_state

        if not episode_over:
            return None

        # Capture the total before _reset() zeroes it.
        finished_total = self.total_reward
        self._reset()
        return finished_total

In [15]:
def batch_to_tensors(batch: tt.List[Experience], device: torch.device) -> BatchTensors:
    """Convert a list of transitions into five tensors on ``device``.

    Returns:
        ``(states, actions, rewards, dones, new_states)``. States and new
        states keep the dtype of the stacked NumPy arrays; actions are long,
        rewards float and dones bool.
    """
    # Stack the observations with NumPy first so each becomes one contiguous array.
    stacked_states = np.asarray([exp.state for exp in batch])
    stacked_next = np.asarray([exp.new_state for exp in batch])

    cpu_tensors = (
        torch.as_tensor(stacked_states),
        torch.LongTensor([exp.action for exp in batch]),
        torch.FloatTensor([exp.reward for exp in batch]),
        torch.BoolTensor([exp.done_trunc for exp in batch]),
        torch.as_tensor(stacked_next),
    )
    return tuple(t.to(device) for t in cpu_tensors)

In [16]:
def calc_loss(batch: tt.List[Experience], net: DQN, tgt_net: DQN,
              device: torch.device) -> torch.Tensor:
    """Mean-squared error between the online network's values and one-step targets.

    For each transition the prediction is ``net``'s output for the taken
    action; the target is ``reward + GAMMA * max_a tgt_net(new_state)[a]``,
    with the second term forced to zero where ``done_trunc`` is set. No
    gradient flows through the target.
    """
    states_t, actions_t, rewards_t, dones_t, new_states_t = batch_to_tensors(batch, device)

    # Pick, per row, the output that corresponds to the action actually taken.
    action_index = actions_t.unsqueeze(-1)
    predicted = net(states_t).gather(1, action_index).squeeze(-1)

    with torch.no_grad():
        best_next, _ = tgt_net(new_states_t).max(1)
        best_next[dones_t] = 0.0
        best_next = best_next.detach()

    targets = best_next * GAMMA + rewards_t
    criterion = nn.MSELoss()
    return criterion(predicted, targets)

In [17]:
# Training cell: builds the environment, both networks and the optimizer, then
# runs the DQN loop until the mean reward bound or TRAIN_MAX_FRAMES is reached.
# A checkpoint is written (Drive + local) whenever the 100-episode mean reward
# improves by more than SAVE_EPSILON.
model_comment = f"test_epsdec{EPSILON_DECAY_LAST_FRAME}_rs{REPLAY_START_SIZE}_sync{SYNC_TARGET_FRAMES}"
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# keep runs short so you can record “early vs later” clips
TRAIN_MAX_FRAMES = 150_000  # adjust 50k–200k depending on time

env = make_env(env_name)  # env_name is set from DEFAULT_ENV_NAME in the paths cell
net = DQN(env.observation_space.shape, env.action_space.n).to(device)
tgt_net = DQN(env.observation_space.shape, env.action_space.n).to(device)
# Start the target network as an exact copy of the online network. Without
# this, TD targets before the first periodic sync come from an unrelated set
# of random weights.
tgt_net.load_state_dict(net.state_dict())
writer = SummaryWriter(comment=f"-{safe_env_name}-{model_comment}")
print(net)

buffer = ExperienceBuffer(REPLAY_SIZE)
agent = Agent(env, buffer)
epsilon = EPSILON_START

optimizer = optim.Adam(net.parameters(), lr=LEARNING_RATE)
total_rewards = []
frame_idx = 0
ts_frame = 0
ts = time.time()
best_m_reward = None

start_time = time.time()
# try/finally guarantees the environment and the TensorBoard writer are closed
# even when the cell is interrupted (e.g. Kernel -> Interrupt) or raises.
try:
    while True:
        frame_idx += 1
        # Linear decay from EPSILON_START, floored at EPSILON_FINAL.
        epsilon = max(EPSILON_FINAL, EPSILON_START - frame_idx / EPSILON_DECAY_LAST_FRAME)

        # play_step returns the episode total only on the step that ends an episode.
        reward = agent.play_step(net, device, epsilon)
        if reward is not None:
            total_rewards.append(reward)
            now = time.time()
            # Guard against a zero time delta on coarse clocks.
            speed = (frame_idx - ts_frame) / max(now - ts, 1e-9)
            elapsed = now - start_time  # in seconds
            ts_frame = frame_idx
            ts = now
            m_reward = np.mean(total_rewards[-100:])

            writer.add_scalar("epsilon", epsilon, frame_idx)
            writer.add_scalar("speed", speed, frame_idx)
            writer.add_scalar("reward_100", m_reward, frame_idx)
            writer.add_scalar("reward", reward, frame_idx)

            if best_m_reward is None or m_reward > best_m_reward + SAVE_EPSILON:
                print(f"{frame_idx}: done {len(total_rewards)} games, reward {m_reward:.3f}, "
                      f"eps {epsilon:.2f}, speed {speed:.2f} f/s, time {elapsed/60:.1f} min")
                timestamp = datetime.now().strftime("%Y%m%d-%H%M")
                model_filename = f"{safe_env_name}-best_{int(m_reward)}-{timestamp}-{model_comment}.dat"

                # Save to both paths
                model_path_drive = os.path.join(save_dir_drive, model_filename)
                model_path_local = os.path.join(save_dir_local, model_filename)
                torch.save(net.state_dict(), model_path_drive)
                torch.save(net.state_dict(), model_path_local)

                print(f"💾 Model saved to:\n - Google Drive: {model_path_drive}\n - Local:        {model_path_local}")
                if best_m_reward is not None:
                    print(f"Best reward updated {best_m_reward:.3f} -> {m_reward:.3f}")
                best_m_reward = m_reward

            if m_reward > MEAN_REWARD_BOUND:
                print("Solved in %d frames!" % frame_idx)
                break

        # No gradient updates until the replay buffer has warmed up.
        if len(buffer) < REPLAY_START_SIZE:
            if frame_idx >= TRAIN_MAX_FRAMES:
                print(f"Stopping at TRAIN_MAX_FRAMES={TRAIN_MAX_FRAMES} (buffer filling).")
                break
            continue

        # Periodically refresh the target network from the online network.
        if frame_idx % SYNC_TARGET_FRAMES == 0:
            tgt_net.load_state_dict(net.state_dict())

        optimizer.zero_grad()
        batch = buffer.sample(BATCH_SIZE)
        loss_t = calc_loss(batch, net, tgt_net, device)
        loss_t.backward()
        optimizer.step()

        if frame_idx >= TRAIN_MAX_FRAMES:
            print(f"Reached TRAIN_MAX_FRAMES={TRAIN_MAX_FRAMES}.")
            break
finally:
    env.close()
    writer.close()

Creating environment ALE/Seaquest-v5
DQN(
  (conv): Sequential(
    (0): Conv2d(4, 32, kernel_size=(8, 8), stride=(4, 4))
    (1): ReLU()
    (2): Conv2d(32, 64, kernel_size=(4, 4), stride=(2, 2))
    (3): ReLU()
    (4): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1))
    (5): ReLU()
    (6): Flatten(start_dim=1, end_dim=-1)
  )
  (fc): Sequential(
    (0): Linear(in_features=3136, out_features=512, bias=True)
    (1): ReLU()
    (2): Linear(in_features=512, out_features=18, bias=True)
  )
)
31: done 1 games, reward 2.000, eps 1.00, speed 354.56 f/s, time 0.0 min
💾 Model saved to:
 - Google Drive: /content/drive/MyDrive/PUBLIC/Models/ALE_Seaquest-v5-best_2-20250929-0045-test_epsdec10000_rs1000_sync500.dat
 - Local:        saved_models/ALE_Seaquest-v5-best_2-20250929-0045-test_epsdec10000_rs1000_sync500.dat


  return datetime.utcnow().replace(tzinfo=utc)


8538: done 265 games, reward 2.520, eps 0.15, speed 97.91 f/s, time 1.1 min
💾 Model saved to:
 - Google Drive: /content/drive/MyDrive/PUBLIC/Models/ALE_Seaquest-v5-best_2-20250929-0047-test_epsdec10000_rs1000_sync500.dat
 - Local:        saved_models/ALE_Seaquest-v5-best_2-20250929-0047-test_epsdec10000_rs1000_sync500.dat
Best reward updated 2.000 -> 2.520
15746: done 416 games, reward 3.030, eps 0.01, speed 122.52 f/s, time 2.1 min
💾 Model saved to:
 - Google Drive: /content/drive/MyDrive/PUBLIC/Models/ALE_Seaquest-v5-best_3-20250929-0048-test_epsdec10000_rs1000_sync500.dat
 - Local:        saved_models/ALE_Seaquest-v5-best_3-20250929-0048-test_epsdec10000_rs1000_sync500.dat
Best reward updated 2.520 -> 3.030
25953: done 609 games, reward 3.570, eps 0.01, speed 128.90 f/s, time 3.6 min
💾 Model saved to:
 - Google Drive: /content/drive/MyDrive/PUBLIC/Models/ALE_Seaquest-v5-best_3-20250929-0049-test_epsdec10000_rs1000_sync500.dat
 - Local:        saved_models/ALE_Seaquest-v5-best_3-2025

In [18]:
# Recorder Helper

import imageio.v2 as imageio
import numpy as np, torch

def record_clip(env_id, policy_net, eps=1.0, target_seconds=12, save_fps=15,
                slow_factor=2, out_path="video.mp4", seed=0, max_episodes=6):
    """Roll out an epsilon-greedy policy and save the rendered frames as a video.

    Args:
        env_id: environment id passed to ``make_env``.
        policy_net: network whose largest output selects the greedy action;
            inference runs on the device of its first parameter.
        eps: probability of taking a random action at each step.
        target_seconds: clip length; the output has exactly
            ``int(target_seconds * save_fps)`` frames (trimmed, or padded with
            the last frame).
        save_fps: frames per second in the output video (lower = slower playback)
        slow_factor: duplicate each rendered frame this many times (bigger = slower)
        out_path: file path handed to ``imageio.mimsave``.
        seed: seeds the local RNG (epsilon draws and per-episode reset seeds)
            and the environment's action space, so random actions are
            reproducible as well.
        max_episodes: upper bound on the number of episodes played.

    Raises:
        RuntimeError: if no frame was recorded.
    """
    target_frames = int(target_seconds * save_fps)
    frames = []
    # Resolve the device before creating the environment so that a failure
    # here cannot leave an open environment behind.
    dev = next(policy_net.parameters()).device
    rng = np.random.RandomState(seed)

    env = make_env(env_id, n_steps=4, render_mode="rgb_array")
    try:
        # action_space.sample() has its own RNG; seed it so `seed` really
        # controls the random actions too.
        env.action_space.seed(seed)

        episodes = 0
        while len(frames) < target_frames and episodes < max_episodes:
            s, _ = env.reset(seed=int(rng.randint(0, 1_000_000)))
            done = trunc = False
            while not (done or trunc) and len(frames) < target_frames:
                if rng.rand() < eps:
                    a = env.action_space.sample()
                else:
                    with torch.no_grad():
                        sv = torch.as_tensor(s, device=dev).unsqueeze(0)
                        q = policy_net(sv)
                        a = int(torch.argmax(q, dim=1).item())
                ns, r, done, trunc, _ = env.step(a)
                frame = env.render()
                # duplicate frames to slow apparent motion
                frames.extend([frame] * slow_factor)
                s = ns
            episodes += 1
    finally:
        # Always release the emulator, including on errors and interrupts.
        env.close()

    if not frames:
        raise RuntimeError("No frames recorded.")
    # trim/pad to exact length
    if len(frames) >= target_frames:
        frames = frames[:target_frames]
    else:
        frames = frames + [frames[-1]] * (target_frames - len(frames))
    imageio.mimsave(out_path, frames, fps=save_fps)
    print(f"Saved {out_path} — {len(frames)/save_fps:.1f}s at {save_fps} fps (slow_factor={slow_factor})")

In [20]:
# Make the videos

# Early: fresh net, random-ish, slowed
# A throwaway environment is created only to read the observation shape and
# action count needed to build an untrained DQN on the same device as `net`.
tmp_env = make_env(DEFAULT_ENV_NAME, render_mode="rgb_array")
tmp_net = DQN(tmp_env.observation_space.shape, tmp_env.action_space.n).to(next(net.parameters()).device)
tmp_env.close()

import os
os.makedirs("videos", exist_ok=True)

# eps=1.0 means every action is random, so the untrained weights are never consulted.
record_clip(DEFAULT_ENV_NAME, tmp_net, eps=1.0,  target_seconds=12, save_fps=15, slow_factor=2,
            out_path="videos/early_seaquest.mp4", seed=0)

# Later: your trained net, greedy-ish, slowed
record_clip(DEFAULT_ENV_NAME, net,     eps=0.01, target_seconds=12, save_fps=15, slow_factor=2,
            out_path="videos/later_seaquest.mp4", seed=1)

Creating environment ALE/Seaquest-v5
Creating environment ALE/Seaquest-v5




Saved videos/early_seaquest.mp4 — 12.0s at 15 fps (slow_factor=2)
Creating environment ALE/Seaquest-v5




Saved videos/later_seaquest.mp4 — 12.0s at 15 fps (slow_factor=2)


In [21]:
# Preview videos directly in Colab
# Shows the two clips written by the video-making cell, each 480 px wide.
from IPython.display import Video, display

display(Video("videos/early_seaquest.mp4", embed=True, width=480))
display(Video("videos/later_seaquest.mp4", embed=True, width=480))