# Assault (Atari) con DQN + PER

Este notebook entrena un agente DQN con Prioritized Experience Replay (PER) en el entorno ALE/Assault-v5.
Incluye preprocesamiento, checkpoints, logging con TensorBoard y evaluación frente a un baseline aleatorio.

In [None]:
# Instalar dependencias (compatible con Colab)
!pip -q install gymnasium[atari,accept-rom-license] ale-py autorom torch tensorboard
!AutoROM --accept-license

import os
import json
import time
import subprocess
import numpy as np
import torch
import gymnasium as gym
from gymnasium.wrappers import RecordVideo

import sys
sys.path.append(os.getcwd())

from src.envs.atari import make_atari_env
from src.agents.dqn_per import DQNPerAgent, EpsilonSchedule, PERBetaSchedule
from src.utils.checkpoints import save_checkpoint, load_checkpoint, save_config

# Artifact layout: everything this notebook writes lives under
# <cwd>/artifacts/assault, split into checkpoints, TensorBoard logs and videos.
ROOT_DIR = os.getcwd()
ART_DIR = os.path.join(ROOT_DIR, "artifacts", "assault")
CKPT_DIR = os.path.join(ART_DIR, "checkpoints")
LOG_DIR = os.path.join(ART_DIR, "logs")
VIDEO_DIR = os.path.join(ART_DIR, "videos")
for d in [CKPT_DIR, LOG_DIR, VIDEO_DIR]:
    os.makedirs(d, exist_ok=True)

# Seed the NumPy and PyTorch global generators; the same SEED is later handed
# to the environment factory.
SEED = 123
np.random.seed(SEED)
torch.manual_seed(SEED)

# Report the runtime so the printed output can be copied into the report.
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
print("Dispositivo:", DEVICE)
print("Torch:", torch.__version__)
print("Gymnasium:", gym.__version__)
try:
    # errors="replace" keeps this probe best-effort even if the tool emits
    # bytes that are not valid UTF-8.
    print(subprocess.check_output(["nvidia-smi"]).decode(errors="replace"))
except (OSError, subprocess.CalledProcessError):
    # OSError covers a missing/non-executable binary; CalledProcessError covers
    # a non-zero exit status. Anything else is a real bug and should surface.
    print("nvidia-smi no disponible")

In [None]:
# Hyperparameters and run configuration.
# Every value below is read by a later cell; the comments name the consumer.
config = {
    # Environment: forwarded to make_atari_env for the training env. The
    # evaluation env reuses frame_skip but always passes clip_rewards=False.
    "env_id": "ALE/Assault-v5",
    "frame_skip": 4,
    "clip_rewards": True,
    # Upper bound of the training loop's step counter.
    "total_steps": 2_000_000,
    # agent.update() is only called once the step counter exceeds this value.
    "learning_starts": 50_000,
    # Passed to agent.update() on every call.
    "batch_size": 32,
    # Forwarded to the DQNPerAgent constructor.
    "buffer_size": 200_000,
    "gamma": 0.99,
    "lr": 1e-4,
    "target_update_interval": 10_000,
    # A checkpoint is written whenever the step counter is a multiple of this.
    "checkpoint_interval": 200_000,
    # Positional arguments of EpsilonSchedule (start, end, decay steps).
    "eps_start": 1.0,
    "eps_end": 0.05,
    "eps_decay_steps": 1_000_000,
    # Positional arguments of PERBetaSchedule (start, end, steps).
    "beta_start": 0.4,
    "beta_end": 1.0,
    "beta_steps": 1_000_000,
    # Forwarded to the DQNPerAgent constructor as `alpha`.
    # NOTE(review): presumably the PER prioritization exponent — confirm in DQNPerAgent.
    "alpha": 0.6,
    # Recorded for reference; the seeding itself uses the module-level SEED.
    "seed": SEED
}
# Hand the configuration to save_config together with the checkpoint directory.
save_config(config, CKPT_DIR)

In [None]:
# Training environment, built by the project's Atari factory from the config.
env = make_atari_env(
    config["env_id"],
    clip_rewards=config["clip_rewards"],
    frame_skip=config["frame_skip"],
    seed=SEED,
)
# Keep the space descriptors around: the agent constructor needs both.
obs_shape, num_actions = env.observation_space.shape, env.action_space.n
print("Obs shape:", obs_shape, "Acciones:", num_actions)

In [None]:
# Agent: build the two schedules first, then the DQN+PER agent that receives them.
# Both schedules take (start, end, steps) positionally, in that order.
eps_schedule = EpsilonSchedule(
    *(config[key] for key in ("eps_start", "eps_end", "eps_decay_steps"))
)
beta_schedule = PERBetaSchedule(
    *(config[key] for key in ("beta_start", "beta_end", "beta_steps"))
)

# Hyperparameters whose config key matches the constructor keyword one-to-one.
agent = DQNPerAgent(
    obs_shape=obs_shape,
    num_actions=num_actions,
    device=DEVICE,
    eps_schedule=eps_schedule,
    beta_schedule=beta_schedule,
    **{
        name: config[name]
        for name in ("gamma", "lr", "target_update_interval", "buffer_size", "alpha")
    },
)

In [None]:
# TensorBoard writer used by the training loop; event files go under LOG_DIR.
from torch.utils.tensorboard import SummaryWriter

writer = SummaryWriter(LOG_DIR)

def obs_to_array(obs):
    """Return ``obs`` as a NumPy array, moving the last axis to the front for 3-D input.

    Input of any other rank is returned exactly as ``np.asarray`` converts it,
    with no axis reordering.
    """
    # NOTE(review): the 3-D case presumably turns HWC frames into CHW — confirm
    # against the observation layout produced by make_atari_env.
    frame = np.asarray(obs)
    return frame.transpose(2, 0, 1) if frame.ndim == 3 else frame

def preprocess(obs):
    """Convert an observation to a float32 array divided by 255.

    The array conversion (and any axis reordering) is delegated to
    ``obs_to_array``; this function only casts and rescales.
    """
    as_float = obs_to_array(obs).astype(np.float32)
    return as_float / 255.0

In [None]:
# Training loop with periodic checkpoints.
#
# Each iteration: pick an action from the preprocessed observation, step the
# environment, store the transition in the agent's replay, run one update once
# `learning_starts` has been passed, and log scalars to TensorBoard.
resume_path = None  # set to a checkpoint path to resume a previous run
start_step = 1
if resume_path:
    state = load_checkpoint(resume_path, device=DEVICE)
    agent.load_state(state)
    # Continue from the restored step instead of restarting at 1; restarting
    # would overwrite agent.step_count, rewind the step passed to the epsilon
    # schedule and rewrite earlier checkpoints.
    # NOTE(review): assumes load_state() restores agent.step_count — confirm.
    # If it does not (attribute missing/None/0), this falls back to step 1.
    start_step = int(getattr(agent, "step_count", 0) or 0) + 1
    print("Reanudado desde", resume_path)

obs, info = env.reset()
episode_reward = 0.0
episode_len = 0
start_time = time.time()

for step in range(start_step, config["total_steps"] + 1):
    agent.step_count = step
    obs_proc = preprocess(obs)
    action = agent.select_action(obs_proc)
    next_obs, reward, terminated, truncated, info = env.step(action)
    done = terminated or truncated

    # Store only true termination as the terminal flag. A truncated episode
    # (time limit) has not reached a terminal state, so its last transition
    # should still bootstrap from next_obs.
    # NOTE(review): presumably DQNPerAgent.update uses this flag to mask the
    # bootstrap term — confirm.
    # NOTE(review): the replay receives unscaled arrays while select_action
    # receives the /255 version; presumably update() rescales — confirm.
    agent.replay.add(
        obs_to_array(obs),
        action,
        reward,
        obs_to_array(next_obs),
        float(terminated),
    )
    episode_reward += reward
    episode_len += 1

    if step > config["learning_starts"]:
        update_out = agent.update(config["batch_size"])
        # update() may return None; only log when it produced values.
        if update_out is not None:
            loss, td_error = update_out
            writer.add_scalar("train/loss", loss, step)
            writer.add_scalar("train/td_error", td_error, step)

    eps_value = agent.eps_schedule.value(step)
    writer.add_scalar("train/epsilon", eps_value, step)

    if done:
        # Episode boundary (terminated or truncated): log and start a new one.
        writer.add_scalar("rollout/episode_return", episode_reward, step)
        writer.add_scalar("rollout/episode_length", episode_len, step)
        obs, info = env.reset()
        episode_reward = 0.0
        episode_len = 0
    else:
        obs = next_obs

    if step % config["checkpoint_interval"] == 0:
        state = agent.save_state()
        ckpt_path = save_checkpoint(state, CKPT_DIR, step)
        print("Guardado", ckpt_path)

# Push any buffered scalars to disk so TensorBoard shows the full run.
writer.flush()
train_time = time.time() - start_time
print(f"Tiempo de entrenamiento (s): {train_time:.1f}")

In [None]:
# Helpers de evaluacion
def greedy_action(agent, obs):
    """Return the index of the highest-scoring action for one observation.

    ``obs`` is turned into a float32 tensor on ``DEVICE``, given a leading
    batch axis, and passed through ``agent.online`` with gradients disabled.
    The argmax over axis 1 of the output is returned as a plain ``int``.
    """
    batch = torch.tensor(obs, dtype=torch.float32, device=DEVICE)[None]
    with torch.no_grad():
        best = agent.online(batch).argmax(dim=1)
    return int(best.item())

def run_eval(env_id, n_episodes=10, seed_offset=0, use_random=False, record_video=False):
    """Run evaluation episodes and return their undiscounted returns.

    A fresh environment is created for the evaluation with reward clipping
    disabled, so the returns are on the environment's own reward scale.

    Args:
        env_id: Environment id forwarded to ``make_atari_env``.
        n_episodes: Number of episodes to run.
        seed_offset: Added to ``SEED`` for the environment seed; episode ``ep``
            is reset with ``SEED + seed_offset + ep``.
        use_random: If true, actions come from ``action_space.sample()``;
            otherwise from ``greedy_action`` on the module-level ``agent``.
        record_video: If true, the environment is created with
            ``render_mode="rgb_array"`` and wrapped in ``RecordVideo`` writing
            to ``VIDEO_DIR``.

    Returns:
        ``(rewards, mean, std)``: the per-episode returns as a list, and their
        mean and standard deviation as floats.
    """
    eval_env = make_atari_env(
        env_id,
        seed=SEED + seed_offset,
        frame_skip=config["frame_skip"],
        clip_rewards=False,
        render_mode="rgb_array" if record_video else None,
    )
    rewards = []
    # try/finally so the emulator (and any open video file) is released even
    # if an episode raises.
    try:
        if record_video:
            eval_env = RecordVideo(eval_env, video_folder=VIDEO_DIR, name_prefix="assault_eval")
        if use_random:
            # Seed the sampler so the random baseline is reproducible; the
            # action space has its own generator, separate from the env seed.
            eval_env.action_space.seed(SEED + seed_offset)
        for ep in range(n_episodes):
            obs, info = eval_env.reset(seed=SEED + seed_offset + ep)
            done = False
            ep_reward = 0.0
            while not done:
                if use_random:
                    action = eval_env.action_space.sample()
                else:
                    # Only the greedy policy consumes the observation, so the
                    # preprocessing is skipped for the random baseline.
                    action = greedy_action(agent, preprocess(obs))
                obs, reward, terminated, truncated, info = eval_env.step(action)
                done = terminated or truncated
                ep_reward += reward
            rewards.append(ep_reward)
    finally:
        eval_env.close()
    mean_r = float(np.mean(rewards))
    std_r = float(np.std(rewards))
    return rewards, mean_r, std_r

# Call .eval() on the online network, then score the greedy policy over 10 episodes.
agent.online.eval()
eval_rewards, eval_mean, eval_std = run_eval(
    env_id=config["env_id"], n_episodes=10, use_random=False
)
print(f"DQN+PER recompensa media: {eval_mean:.2f} +/- {eval_std:.2f}")

# Same protocol with actions drawn from action_space.sample() as the baseline.
rand_rewards, rand_mean, rand_std = run_eval(
    env_id=config["env_id"], n_episodes=10, use_random=True
)
print(f"Politica aleatoria recompensa media: {rand_mean:.2f} +/- {rand_std:.2f}")

In [None]:
# Record a single greedy evaluation episode into VIDEO_DIR, using a seed offset
# different from the one used for the scored evaluation runs.
_ = run_eval(
    env_id=config["env_id"],
    n_episodes=1,
    seed_offset=9999,
    record_video=True,
)
print("Video guardado en:", VIDEO_DIR)

## TensorBoard

Ejecuta en Colab:

```
%load_ext tensorboard
%tensorboard --logdir artifacts/assault/logs
```

## Reporte técnico (completar después del entrenamiento)
- Algoritmo: DQN + PER
- Hiperparámetros: ver config.json
- Librerías y versiones: impresas en la celda de setup
- Hardware: salida de nvidia-smi en la celda de setup
- Tiempo de entrenamiento: impreso al finalizar el loop de entrenamiento
- Resultados: media y desviación estándar de la recompensa en 10 episodios, comparadas con el baseline aleatorio
- Conclusiones: agrega observaciones y siguientes pasos