# SpaceInvaders DQN (self-contained)

Todo el código está en este notebook: wrappers, modelo, agente, entrenamiento/evaluación y análisis del log.

In [None]:
# %pip install "tensorflow==2.10.0" "keras-rl2==1.0.5" "gym==0.25.2" "ale-py==0.7.5" "opencv-python<4.8" "matplotlib<3.6" "numpy<1.24" "Pillow<10"


In [11]:
import os
import random
from collections import deque
from pathlib import Path

import cv2
import gym
import numpy as np
import tensorflow as tf
from rl.agents.dqn import DQNAgent
from rl.callbacks import FileLogger, ModelIntervalCheckpoint
from rl.core import Processor
from rl.memory import SequentialMemory
from rl.policy import EpsGreedyQPolicy, LinearAnnealedPolicy
from tensorflow.keras.layers import Conv2D, Dense, Flatten, Permute
from tensorflow.keras.models import Sequential
from tensorflow.keras.optimizers import Adam

In [12]:
# Basic configuration: environment id, hyperparameter constants and output paths.
ENV_NAME = "SpaceInvaders-v0"
SEED = 42
INPUT_SHAPE = (84, 84)
WINDOW_LENGTH = 4
STAGE_STEPS_DEFAULT = 3_000_000

# Output directories live under the current working directory and are created
# up front (no-op when they already exist).
BASE_DIR = Path.cwd()
WEIGHTS_DIR = BASE_DIR.joinpath("weights")
LOGS_DIR = BASE_DIR.joinpath("logs")
WEIGHTS_DIR.mkdir(parents=True, exist_ok=True)
LOGS_DIR.mkdir(parents=True, exist_ok=True)

WEIGHTS_PATH = WEIGHTS_DIR.joinpath("dqn_spaceinvaders_v0_weights.h5f")
LOG_PATH = LOGS_DIR.joinpath("training_log.json")

In [13]:
# Wrappers compatibles con gym clásico y keras-rl2
class MaxAndSkipEnv(gym.Wrapper):
    """Repeat each action for ``skip`` frames and max-pool the last two observations.

    ``step`` applies the same action up to ``skip`` times on the wrapped env,
    sums the rewards, stops early when the wrapped env reports ``done``, and
    returns the element-wise maximum of the two most recent observations.
    """

    def __init__(self, env, skip=4):
        """Wrap ``env``; ``skip`` is the number of times each action is repeated."""
        super().__init__(env)
        self._skip = skip
        # Only the two most recent observations are kept for max-pooling.
        self._obs_buffer = deque(maxlen=2)

    def reset(self, **kwargs):
        """Reset the wrapped env and restart the observation buffer.

        Keyword arguments (for example ``seed``) are forwarded to the wrapped
        env; previously they were accepted but silently discarded.
        """
        self._obs_buffer.clear()
        obs = self.env.reset(**kwargs)
        self._obs_buffer.append(obs)
        return obs

    def step(self, action):
        """Apply ``action`` up to ``skip`` times.

        Returns ``(max_frame, total_reward, done, info)`` where ``info`` is the
        one returned by the last inner step.
        """
        total_reward = 0.0
        done = False
        info = {}
        for _ in range(self._skip):
            obs, reward, done, info = self.env.step(action)
            self._obs_buffer.append(obs)
            total_reward += reward
            if done:
                # The loop stops at the first terminal step, so ``done`` already
                # holds the aggregated termination flag.
                break
        max_frame = np.maximum(self._obs_buffer[0], self._obs_buffer[-1])
        return max_frame, total_reward, done, info


class FireResetEnv(gym.Wrapper):
    """Make sure the game actually starts after a reset (SpaceInvaders may require FIRE)."""

    def reset(self, **kwargs):
        """Reset the wrapped env, then take one step with action index 1.

        Keyword arguments (for example ``seed``) are forwarded to the wrapped
        env; previously they were accepted but silently discarded. If the extra
        step ends the episode, the wrapped env is reset again with the same
        arguments.
        """
        self.env.reset(**kwargs)
        # Action 1 is the FIRE action according to this wrapper's purpose.
        obs, _, done, _ = self.env.step(1)
        if done:
            obs = self.env.reset(**kwargs)
        return obs


class NoopResetEnv(gym.Wrapper):
    """Play a random number of NOOP actions right after every reset."""

    def __init__(self, env, noop_max=30):
        """Wrap ``env``; ``noop_max`` is the inclusive upper bound on NOOP steps."""
        super().__init__(env)
        self.noop_max = noop_max
        self.noop_action = 0

    def reset(self, **kwargs):
        """Reset the wrapped env and advance it by 1..``noop_max`` NOOP steps.

        The count is drawn from NumPy's global RNG. If a NOOP step ends the
        episode, the wrapped env is reset again and the remaining NOOPs go on.
        """
        frame = self.env.reset(**kwargs)
        remaining = np.random.randint(1, self.noop_max + 1)
        while remaining > 0:
            frame, _reward, finished, _info = self.env.step(self.noop_action)
            if finished:
                frame = self.env.reset(**kwargs)
            remaining -= 1
        return frame


class EpisodicLifeEnv(gym.Wrapper):
    """Report ``done=True`` when a life is lost (not game over); training only.

    The wrapper remembers whether the wrapped env itself reported ``done``
    (``was_real_done``). ``reset`` restarts the game only when it was really
    over; after a mere life loss it continues the same game. Previously every
    life loss led to a full game reset, so the agent could never reach states
    beyond the first lost life.
    """

    def __init__(self, env, lives_key="ale.lives"):
        """Wrap ``env``; ``lives_key`` is the ``info`` key holding the lives counter."""
        super().__init__(env)
        self.lives_key = lives_key
        # 0 means "unknown yet": the first step after a real reset fills it in.
        self.lives = 0
        # True until the first reset so that the very first reset is a real one.
        self.was_real_done = True

    def reset(self, **kwargs):
        """Start a new training episode.

        A real reset of the wrapped env happens when the game was actually over
        or when the caller passes keyword arguments (for example ``seed``),
        which are taken as a request for a hard reset. Otherwise one no-op step
        (action 0) is taken to move on from the life-loss state.
        """
        if self.was_real_done or kwargs:
            obs = self.env.reset(**kwargs)
            self.lives = 0
        else:
            obs, _, done, info = self.env.step(0)
            lives = info.get(self.lives_key, None)
            if lives is not None:
                self.lives = lives
            if done:
                # The no-op step happened to end the game: fall back to a real reset.
                obs = self.env.reset(**kwargs)
                self.lives = 0
        self.was_real_done = False
        return obs

    def step(self, action):
        """Step the wrapped env and turn a life loss into ``done=True``.

        If ``info`` has no ``lives_key`` entry the step is passed through
        unchanged.
        """
        obs, reward, done, info = self.env.step(action)
        # Remember the wrapped env's own verdict before overriding it.
        self.was_real_done = done
        lives = info.get(self.lives_key, None)
        if lives is not None:
            if self.lives == 0:
                self.lives = lives
            if (lives < self.lives) and (lives > 0):
                done = True
            self.lives = lives
        return obs, reward, done, info

In [14]:
# Processor para preprocesado Atari
class AtariProcessor(Processor):
    """Observation, batch and reward preprocessing for the Atari agent."""

    def process_observation(self, observation):
        """Convert an RGB frame to grayscale ``uint8`` with array shape ``INPUT_SHAPE``."""
        gray = cv2.cvtColor(observation, cv2.COLOR_RGB2GRAY)
        # cv2.resize takes dsize as (width, height), while INPUT_SHAPE is used by
        # the model as (rows, cols). Swap explicitly so a non-square INPUT_SHAPE
        # is not silently transposed (no change for the square 84x84 case).
        height, width = INPUT_SHAPE
        resized = cv2.resize(gray, (width, height), interpolation=cv2.INTER_AREA)
        return resized.astype(np.uint8)

    def process_state_batch(self, batch):
        """Cast the batch to ``float32`` and divide by 255."""
        return batch.astype("float32") / 255.0

    def process_reward(self, reward):
        """Clip the reward to the range [-1, 1]."""
        return np.clip(reward, -1.0, 1.0)

In [15]:
# Modelo CNN
def build_model(nb_actions: int):
    """Build the convolutional Q-network.

    Input shape is ``(WINDOW_LENGTH,) + INPUT_SHAPE``; a ``Permute`` layer moves
    the window axis last before three ReLU convolutions, a 512-unit dense layer
    and a linear output with ``nb_actions`` units.
    """
    stacked_input_shape = (WINDOW_LENGTH,) + INPUT_SHAPE
    layers = [
        Permute((2, 3, 1), input_shape=stacked_input_shape),
        Conv2D(32, (8, 8), strides=(4, 4), activation="relu"),
        Conv2D(64, (4, 4), strides=(2, 2), activation="relu"),
        Conv2D(64, (3, 3), strides=(1, 1), activation="relu"),
        Flatten(),
        Dense(512, activation="relu"),
        Dense(nb_actions, activation="linear"),
    ]
    return Sequential(layers)

In [16]:
# Entorno y semillas
def make_env(seed: int, training: bool):
    """Create the wrapped, seeded and already-reset environment.

    Args:
        seed: Seed applied to the base environment and to its action space.
        training: When True, ``EpisodicLifeEnv`` is added to the wrapper stack.

    Returns:
        The wrapped environment, reset once and ready to use.
    """
    base_env = gym.make(ENV_NAME)
    # Seed the base env directly instead of through the wrapper stack: a wrapper
    # whose reset(**kwargs) swallows the seed would not raise TypeError, so the
    # fallback below would never run and the env would silently stay unseeded.
    try:
        base_env.reset(seed=seed)
    except TypeError:
        # reset() does not accept ``seed`` here; use the seed() method instead.
        base_env.seed(seed)
    base_env.action_space.seed(seed)

    env = NoopResetEnv(base_env, noop_max=30)
    env = FireResetEnv(env)
    if training:
        env = EpisodicLifeEnv(env, lives_key="ale.lives")
    env = MaxAndSkipEnv(env, skip=4)
    # Reset through the full stack so the returned env is in a started state.
    env.reset()
    return env


def set_seeds(seed: int = SEED):
    """Seed NumPy, Python's ``random`` module and TensorFlow with ``seed``."""
    for apply_seed in (np.random.seed, random.seed, tf.random.set_seed):
        apply_seed(seed)

In [17]:
# Agente DQN
def build_agent(env):
    """Build and compile the DQN agent for ``env``.

    The network output size comes from ``env.action_space.n``. Returns the
    compiled ``DQNAgent``.
    """
    n_actions = env.action_space.n
    q_network = build_model(n_actions)

    replay_memory = SequentialMemory(limit=1_000_000, window_length=WINDOW_LENGTH)

    # Epsilon is annealed linearly from value_max to value_min over nb_steps.
    # Alternative values noted by the author: value_max 0.1 / 1.0,
    # value_min 0.02 / 0.1, nb_steps 500_000 / 1_000_000.
    exploration_policy = LinearAnnealedPolicy(
        EpsGreedyQPolicy(),
        attr="eps",
        value_max=0.02,
        value_min=0.005,
        value_test=0,
        nb_steps=300_000,
    )

    agent_settings = dict(
        model=q_network,
        nb_actions=n_actions,
        policy=exploration_policy,
        memory=replay_memory,
        processor=AtariProcessor(),
        nb_steps_warmup=20_000,  # alternative noted by the author: 100_000
        gamma=0.99,
        target_model_update=5e-3,
        batch_size=32,
        train_interval=4,
        enable_double_dqn=True,
        enable_dueling_network=True,
        dueling_type="avg",
        delta_clip=1.0,
    )
    agent = DQNAgent(**agent_settings)

    # Alternative learning rate noted by the author: 1e-4.
    optimizer = Adam(learning_rate=5e-5, clipnorm=10.0)
    agent.compile(optimizer, metrics=["mae"])
    return agent

In [18]:
# Entrenamiento y evaluación
def train(dqn, env, nb_steps: int = STAGE_STEPS_DEFAULT):
    """Train ``dqn`` on ``env`` for ``nb_steps`` steps, resuming from saved weights if any.

    Existing weights are detected through the ``<WEIGHTS_PATH>.index`` file.
    During training, metrics are logged to ``LOG_PATH`` and weights are
    checkpointed to ``WEIGHTS_PATH``; the final weights overwrite that same path.
    """
    WEIGHTS_PATH.parent.mkdir(parents=True, exist_ok=True)
    weights_file = str(WEIGHTS_PATH)

    has_previous_weights = Path(f"{WEIGHTS_PATH}.index").exists()
    if not has_previous_weights:
        print("No se encontraron pesos previos, entrenando desde cero.")
    else:
        print(f"Cargando pesos: {WEIGHTS_PATH}")
        dqn.load_weights(weights_file)

    training_callbacks = [
        FileLogger(str(LOG_PATH), interval=100),
        ModelIntervalCheckpoint(weights_file, interval=50_000),
    ]
    dqn.fit(
        env,
        nb_steps=nb_steps,
        log_interval=10_000,
        visualize=False,
        verbose=2,
        callbacks=training_callbacks,
    )

    dqn.save_weights(weights_file, overwrite=True)
    print(f"Pesos guardados en: {WEIGHTS_PATH}")


def evaluate(dqn, env, episodes: int = 110):
    """Run ``episodes`` test episodes and report whether the reward requirement is met.

    Prints the minimum and mean episode reward over the last (up to) 100
    episodes. The requirement is met only when at least 100 episodes are
    available and their minimum reward is greater than 20.

    Args:
        dqn: Agent exposing ``test(env, nb_episodes, visualize, verbose)`` whose
            result has a ``history`` dict with an ``"episode_reward"`` sequence.
        env: Environment handed to ``dqn.test``.
        episodes: Number of test episodes to run.
    """
    print(f"Iniciando evaluación ({episodes} episodios)...")
    history = dqn.test(env, nb_episodes=episodes, visualize=False, verbose=0)
    rewards = history.history["episode_reward"]

    # Without any finished episode np.min/np.mean would raise on an empty
    # sequence; report the failed requirement explicitly instead.
    if len(rewards) == 0:
        print("No se registraron episodios; no hay recompensas que evaluar.")
        print("ESTADO: REQUISITO NO CUMPLIDO")
        return

    # A negative slice already returns the whole sequence when it is shorter.
    last_100 = rewards[-100:]
    min_score = float(np.min(last_100))
    mean_score = float(np.mean(last_100))
    print(f"Min reward (últimos {len(last_100)}): {min_score}")
    print(f"Mean reward (últimos {len(last_100)}): {mean_score}")
    if len(last_100) >= 100 and min_score > 20:
        print("ESTADO: REQUISITO CUMPLIDO (min(last100) > 20)")
    else:
        print("ESTADO: REQUISITO NO CUMPLIDO")

In [19]:
# Utilidades de análisis
import json

def load_training_log(path: Path = LOG_PATH):
    """Read the JSON training log at ``path`` and return the parsed content.

    Raises ``FileNotFoundError`` when the file does not exist.
    """
    with open(path, "r") as log_file:
        raw_text = log_file.read()
    return json.loads(raw_text)

def reward_summary(log_data):
    """Summarize the ``"episode_reward"`` series of a training log.

    Returns an empty dict when the key is missing or the series is empty.
    Otherwise returns count, mean, std, min, max and the mean of the last
    (up to) 100 entries, all as plain Python numbers.
    """
    episode_rewards = log_data.get("episode_reward")
    if episode_rewards:
        series = np.asarray(episode_rewards, dtype=float)
        recent = series[-100:]
        summary = {}
        summary["count"] = int(series.size)
        summary["mean"] = float(series.mean())
        summary["std"] = float(series.std())
        summary["min"] = float(series.min())
        summary["max"] = float(series.max())
        # The series is never empty here, so the tail mean is always defined.
        summary["last_mean_100"] = float(recent.mean())
        return summary
    return {}

def plot_metric(log_data, metric: str, save_path=None):
    """Plot one metric from a training log and optionally save the figure.

    The log lists hold one entry per episode, so the x-axis is labelled
    "Episode" (it used to say "Steps"). When the log has an ``"episode"``
    column of matching length, it is used for the x values.

    Args:
        log_data: Mapping from metric name to a list of values.
        metric: Key of the series to plot.
        save_path: Optional path; when given, the figure is written there.

    Returns:
        The ``(fig, ax)`` pair of the created plot.

    Raises:
        ImportError: If matplotlib cannot be imported.
        KeyError: If ``metric`` is not present in ``log_data``.
    """
    # Imported lazily so the rest of the notebook works without matplotlib.
    try:
        import matplotlib.pyplot as plt
    except ImportError as exc:
        raise ImportError("matplotlib no está disponible en el entorno actual.") from exc
    if metric not in log_data:
        raise KeyError(f"Métrica '{metric}' no encontrada en el log.")
    values = log_data[metric]
    episodes = log_data["episode"] if "episode" in log_data else None
    fig, ax = plt.subplots(figsize=(10, 4))
    if episodes is not None and len(episodes) == len(values):
        ax.plot(episodes, values)
    else:
        # Fall back to the positional index when no usable episode column exists.
        ax.plot(values)
    ax.set_title(metric)
    ax.set_xlabel("Episode")
    ax.set_ylabel(metric)
    ax.grid(True, alpha=0.3)
    if save_path:
        fig.savefig(save_path, dpi=150, bbox_inches="tight")
    return fig, ax

## Flujo rápido
1. Fijar semillas y crear envs.
2. Construir agente.
3. Entrenar (opcional) y evaluar.

In [20]:
# Seed the global RNGs first: creating an environment already resets it, and
# that reset draws the number of NOOP steps from NumPy's global RNG.
set_seeds(SEED)
# Separate environments for training and evaluation, both created with SEED.
env_train = make_env(SEED, training=True)
env_test = make_env(SEED, training=False)
# The agent's network output size is taken from env_train's action space.
dqn = build_agent(env_train)
print('Dueling activado:', dqn.enable_dueling_network)

2025-12-29 12:42:00.139641: W tensorflow/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libcuda.so.1'; dlerror: libcuda.so.1: cannot open shared object file: No such file or directory; LD_LIBRARY_PATH: /home/runner/workspace/.venv-rl2/lib/python3.8/site-packages/cv2/../../lib64:
2025-12-29 12:42:00.139666: W tensorflow/stream_executor/cuda/cuda_driver.cc:326] failed call to cuInit: UNKNOWN ERROR (303)
2025-12-29 12:42:00.139679: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:156] kernel driver does not appear to be running on this host (0b3794c84d26): /proc/driver/nvidia/version does not exist
2025-12-29 12:42:00.140293: I tensorflow/core/platform/cpu_feature_guard.cc:142] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 AVX512F FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2025-1

Dueling activado: True


In [21]:
# Training. Lower nb_steps for short test runs.
train(dqn, env_train, nb_steps=STAGE_STEPS_DEFAULT)

No se encontraron pesos previos, entrenando desde cero.
Training for 3000000 steps ...




      55/3000000: episode: 1, duration: 0.345s, episode steps:  55, steps per second: 159, episode reward:  6.000, mean reward:  0.109 [ 0.000,  1.000], mean action: 3.982 [3.000, 4.000],  loss: --, mae: --, mean_q: --, mean_eps: --
     117/3000000: episode: 2, duration: 0.336s, episode steps:  62, steps per second: 184, episode reward:  8.000, mean reward:  0.129 [ 0.000,  1.000], mean action: 3.903 [0.000, 4.000],  loss: --, mae: --, mean_q: --, mean_eps: --
     170/3000000: episode: 3, duration: 0.234s, episode steps:  53, steps per second: 226, episode reward:  8.000, mean reward:  0.151 [ 0.000,  1.000], mean action: 3.925 [2.000, 4.000],  loss: --, mae: --, mean_q: --, mean_eps: --
     216/3000000: episode: 4, duration: 0.241s, episode steps:  46, steps per second: 191, episode reward:  4.000, mean reward:  0.087 [ 0.000,  1.000], mean action: 4.043 [4.000, 5.000],  loss: --, mae: --, mean_q: --, mean_eps: --
     277/3000000: episode: 5, duration: 0.292s, episode steps:  61, 

In [24]:
# Evaluation: load the saved weights if they exist, then evaluate on env_test.
# The presence of "<WEIGHTS_PATH>.index" is used as the "weights exist" check,
# the same check train() uses.
if Path(f"{WEIGHTS_PATH}.index").exists():
    print('Cargando pesos:', WEIGHTS_PATH)
    dqn.load_weights(str(WEIGHTS_PATH))
    evaluate(dqn, env_test, episodes=110)
else:
    print('No se encontraron pesos en', WEIGHTS_PATH)

Cargando pesos: /home/runner/workspace/notebooks/project/notebooks/weights/dqn_spaceinvaders_v0_weights.h5f
Iniciando evaluación (110 episodios)...
Min reward (últimos 100): 28.0
Mean reward (últimos 100): 38.78
ESTADO: REQUISITO CUMPLIDO (min(last100) > 20)


In [23]:
# Log analysis: print summary statistics of the episode rewards in the training log.
try:
    log_data = load_training_log(LOG_PATH)
    print(reward_summary(log_data))
    # fig, ax = plot_metric(log_data, 'loss')
    # fig
except FileNotFoundError:
    # No log file yet (training has not been run): report it instead of failing.
    print('No se encontró training_log.json en', LOG_PATH)

{'count': 17, 'mean': 7.647058823529412, 'std': 1.0256232808330996, 'min': 4.0, 'max': 8.0, 'last_mean_100': 7.647058823529412}
