In [1]:
import os

DIRECTORIO_BASE = os.path.join(os.getcwd(), "resultados_entrenamiento")
os.makedirs(DIRECTORIO_BASE, exist_ok=True)
print("Directorio base configurado:", DIRECTORIO_BASE)
print("Los resultados se guardarán en:", os.path.abspath(DIRECTORIO_BASE))

Directorio base configurado: /Users/isaackeitor/Desktop/Galaxian/resultados_entrenamiento
Los resultados se guardarán en: /Users/isaackeitor/Desktop/Galaxian/resultados_entrenamiento


In [2]:
# Utilidades para preprocesamiento del entorno Atari
import gymnasium as gym
import ale_py
import numpy as np
import cv2
from collections import deque


class EnvolturaPreprocesamiento(gym.Wrapper):
    """
    Preprocesamiento personalizado de Atari:
    - Frame skipping (salto de cuadros)
    - Conversión a escala de grises
    - Reescalado a 84x84 píxeles
    """

    def __init__(self, entorno_base, salto_cuadros=4, tam_pantalla=84, escala_grises=True):
        super().__init__(entorno_base)
        self.salto_cuadros = salto_cuadros
        self.tam_pantalla = tam_pantalla
        self.escala_grises = escala_grises

        forma_obs = (tam_pantalla, tam_pantalla)
        if not escala_grises:
            forma_obs += (3,)

        self.observation_space = gym.spaces.Box(
            low=0, high=255, shape=forma_obs, dtype=np.uint8
        )

    def procesar_cuadro(self, cuadro):
        """Convierte y reescala el cuadro"""
        if self.escala_grises:
            cuadro = cv2.cvtColor(cuadro, cv2.COLOR_RGB2GRAY)
            cuadro = cv2.resize(cuadro, (self.tam_pantalla, self.tam_pantalla), interpolation=cv2.INTER_AREA)
            return cuadro
        else:
            cuadro = cv2.resize(cuadro, (self.tam_pantalla, self.tam_pantalla), interpolation=cv2.INTER_AREA)
            return cuadro

    def step(self, accion):
        """Ejecuta múltiples pasos y acumula recompensas"""
        recompensa_acumulada = 0.0
        terminado = truncado = False
        for _ in range(self.salto_cuadros):
            observacion_raw, recomp, term, trunc, informacion = self.env.step(accion)
            recompensa_acumulada += recomp
            terminado |= term
            truncado |= trunc
            if terminado or truncado:
                break
        cuadro_procesado = self.procesar_cuadro(observacion_raw)
        return cuadro_procesado, recompensa_acumulada, terminado, truncado, informacion

    def reset(self, **kwargs):
        observacion_raw, informacion = self.env.reset(**kwargs)
        cuadro_procesado = self.procesar_cuadro(observacion_raw)
        return cuadro_procesado, informacion


class EnvolturaApilamiento(gym.Wrapper):
    """
    Apila los últimos N cuadros para capturar dinámica temporal.
    """

    def __init__(self, entorno_base, num_apilar=4):
        super().__init__(entorno_base)
        self.num_apilar = num_apilar
        self.cuadros_memoria = deque([], maxlen=num_apilar)
        bajo = np.repeat(entorno_base.observation_space.low[np.newaxis, ...], num_apilar, axis=0)
        alto = np.repeat(entorno_base.observation_space.high[np.newaxis, ...], num_apilar, axis=0)
        self.observation_space = gym.spaces.Box(
            low=bajo.min(), high=alto.max(), dtype=entorno_base.observation_space.dtype, 
            shape=(num_apilar, *entorno_base.observation_space.shape)
        )

    def reset(self, **kwargs):
        observacion, informacion = self.env.reset(**kwargs)
        for _ in range(self.num_apilar):
            self.cuadros_memoria.append(observacion)
        return self._obtener_obs(), informacion

    def step(self, accion):
        observacion, recompensa, terminado, truncado, informacion = self.env.step(accion)
        self.cuadros_memoria.append(observacion)
        return self._obtener_obs(), recompensa, terminado, truncado, informacion

    def _obtener_obs(self):
        return np.stack(self.cuadros_memoria, axis=0)


def crear_entorno_galaxian(semilla: int | None = None, modo_render: str | None = None):
    """
    Crea el entorno ALE/Galaxian-v5 con preprocesamiento completo.
    - Reescalado a 84x84
    - Escala de grises
    - Frame skip = 4
    - Frame stack = 4
    """
    entorno = gym.make("ALE/Galaxian-v5", render_mode=modo_render)
    if semilla is not None:
        entorno.reset(seed=semilla)

    entorno = EnvolturaPreprocesamiento(entorno, salto_cuadros=4, tam_pantalla=84, escala_grises=True)
    entorno = EnvolturaApilamiento(entorno, num_apilar=4)

    return entorno

In [3]:
# Implementación de DQN (Deep Q-Network) desde cero
import os
import csv
import random
from collections import deque
from typing import Tuple, Deque, List

import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import matplotlib
matplotlib.use("Agg")  # backend sin interfaz gráfica para guardar PNG
import matplotlib.pyplot as plt

# Detectar dispositivo: MPS (Mac M1/M2/M3/M4) > CUDA > CPU
if torch.backends.mps.is_available():
    dispositivo = torch.device("mps")
    print("✅ Usando GPU de Apple Silicon (MPS)")
elif torch.cuda.is_available():
    dispositivo = torch.device("cuda")
    print("✅ Usando GPU CUDA")
else:
    dispositivo = torch.device("cpu")
    print("⚠️ Usando CPU (entrenamiento será más lento)")


# ============================================================
# Arquitectura de Red Neuronal DQN
# ============================================================
class RedDQN(nn.Module):
    """
    Red convolucional profunda para aproximar Q(s,a).
    Arquitectura estilo Nature DQN.
    Formato esperado: (Batch, Canales, Alto, Ancho)
    """
    def __init__(self, forma_entrada: Tuple[int, int, int], num_acciones: int):
        super().__init__()
        canales, alto, ancho = forma_entrada
        self.canales_esperados = canales

        self.extractor_caracteristicas = nn.Sequential(
            nn.Conv2d(canales, 32, kernel_size=8, stride=4),
            nn.ReLU(inplace=True),
            nn.Conv2d(32, 64, kernel_size=4, stride=2),
            nn.ReLU(inplace=True),
            nn.Conv2d(64, 64, kernel_size=3, stride=1),
            nn.ReLU(inplace=True),
            nn.Flatten()
        )

        # Inferir tamaño del vector aplanado
        with torch.no_grad():
            tensor_prueba = torch.zeros(1, canales, alto, ancho)
            tam_aplanado = self.extractor_caracteristicas(tensor_prueba).shape[1]

        self.cabeza_valores_q = nn.Sequential(
            nn.Linear(tam_aplanado, 512),
            nn.ReLU(inplace=True),
            nn.Linear(512, num_acciones)
        )

    def forward(self, tensor_entrada: torch.Tensor) -> torch.Tensor:
        """
        tensor_entrada: (B,C,H,W) o (B,H,W,C)
        Retorna: Q(s,·) de forma (B, num_acciones)
        """
        if tensor_entrada.ndim != 4:
            raise ValueError(f"Esperado tensor 4D, recibido tensor_entrada.ndim={tensor_entrada.ndim}")

        # Auto-detectar formato y permutar si es necesario
        if tensor_entrada.shape[1] != self.canales_esperados and tensor_entrada.shape[-1] == self.canales_esperados:
            tensor_entrada = tensor_entrada.permute(0, 3, 1, 2)

        # Normalizar a rango [0,1]
        tensor_entrada = tensor_entrada.float() / 255.0
        caracteristicas = self.extractor_caracteristicas(tensor_entrada)
        return self.cabeza_valores_q(caracteristicas)


# ============================================================
# Buffer de Replay (Memoria de Experiencias)
# ============================================================
class BufferReplay:
    def __init__(self, capacidad_maxima: int):
        self.almacen: Deque = deque(maxlen=capacidad_maxima)

    def agregar(self, estado, accion, recompensa, estado_sig, terminado):
        self.almacen.append((estado, accion, recompensa, estado_sig, terminado))

    def muestrear(self, tam_lote: int):
        lote = random.sample(self.almacen, tam_lote)
        estados, acciones, recompensas, estados_sig, terminados = map(np.array, zip(*lote))
        return estados, acciones, recompensas, estados_sig, terminados

    def __len__(self):
        return len(self.almacen)


# ============================================================
# Política de Evaluación DQN
# ============================================================
class PoliticaDQN:
    def __init__(self, red_q: 'RedDQN'):
        self.red_q = red_q.to(dispositivo)
        self.red_q.eval()

    @torch.no_grad()
    def __call__(self, observacion: np.ndarray, info: dict) -> int:
        """Selecciona acción greedy (argmax Q)"""
        if observacion.ndim != 3:
            raise ValueError(f"Esperado obs 3D, recibido observacion.ndim={observacion.ndim}")

        obs_lote = np.expand_dims(observacion, axis=0)
        obs_tensor = torch.from_numpy(obs_lote).to(dispositivo)
        valores_q = self.red_q(obs_tensor)
        return int(torch.argmax(valores_q, dim=1).item())


# ============================================================
# Funciones de Logging y Visualización
# ============================================================
def _calcular_media_movil(valores: List[float], ventana: int = 100):
    """Calcula media móvil con ventana deslizante"""
    if len(valores) == 0:
        return []
    resultado = []
    suma_acum = 0.0
    cola = []
    for v in valores:
        cola.append(v)
        suma_acum += v
        if len(cola) > ventana:
            suma_acum -= cola.pop(0)
        resultado.append(suma_acum / len(cola))
    return resultado

def _guardar_grafica_y_csv(dir_checkpoints: str, recompensas: List[float], episodio: int):
    """Guarda gráfica PNG y registro CSV de recompensas"""
    os.makedirs(dir_checkpoints, exist_ok=True)

    # Guardar registro CSV
    ruta_csv = os.path.join(dir_checkpoints, "registro_recompensas.csv")
    archivo_nuevo = not os.path.exists(ruta_csv)
    with open(ruta_csv, "a", newline="") as archivo:
        escritor = csv.writer(archivo)
        if archivo_nuevo:
            escritor.writerow(["episodio", "recompensa"])
        escritor.writerow([episodio, recompensas[-1]])

    # Generar gráfica con nuevos colores
    plt.figure(figsize=(10, 5))
    plt.plot(recompensas, label="Recompensa por episodio", color="#FF6B35", linewidth=1.2, alpha=0.7)
    media_movil = _calcular_media_movil(recompensas, ventana=100)
    if len(media_movil) > 0:
        plt.plot(media_movil, label="Media Móvil (100 eps)", color="#00A896", linewidth=2.5)
    plt.xlabel("Episodio", fontsize=12)
    plt.ylabel("Recompensa Total", fontsize=12)
    plt.title("Progreso de Entrenamiento - DQN", fontsize=14, fontweight="bold")
    plt.legend(loc="upper left")
    plt.grid(True, alpha=0.3)
    ruta_png = os.path.join(dir_checkpoints, f"recompensas_ep{episodio}.png")
    plt.tight_layout()
    plt.savefig(ruta_png, dpi=120)
    plt.close()
    print(f"[REGISTRO] Gráfica y CSV guardados: {ruta_png} / {ruta_csv}")


# ============================================================
# Función de Entrenamiento DQN
# ============================================================
def entrenar_dqn(
    directorio_checkpoints: str,
    episodios_totales: int = 500,
    capacidad_replay: int = 100_000,
    tam_lote: int = 32,
    factor_descuento: float = 0.99,
    tasa_aprendizaje: float = 1e-4,
    epsilon_inicial: float = 1.0,
    epsilon_final: float = 0.1,
    episodios_decaimiento_eps: int = 300,
    intervalo_actualizacion_target: int = 1_000,
    pasos_inicio_entrenamiento: int = 10_000,
    intervalo_guardado: int = 50,
    intervalo_graficas: int = 50,
    pasos_maximos_por_episodio: int | None = None,
    semilla_aleatoria: int = 42,
):
    """
    Entrena un agente DQN para Galaxian.
    Guarda checkpoints y gráficas en directorio_checkpoints.
    """
    os.makedirs(directorio_checkpoints, exist_ok=True)

    # Crear entorno con preprocesamiento
    entorno_juego = crear_entorno_galaxian(semilla=semilla_aleatoria, modo_render=None)

    # Detectar forma de observación
    observacion, _ = entorno_juego.reset()
    if observacion.ndim != 3:
        entorno_juego.close()
        raise ValueError(f"Observación inesperada: ndim={observacion.ndim}")

    if observacion.shape[0] in (1, 3, 4):
        forma_entrada = (observacion.shape[0], observacion.shape[1], observacion.shape[2])
    else:
        forma_entrada = (observacion.shape[2], observacion.shape[0], observacion.shape[1])

    num_acciones = entorno_juego.action_space.n

    # Inicializar redes
    red_q_principal = RedDQN(forma_entrada, num_acciones).to(dispositivo)
    red_q_objetivo = RedDQN(forma_entrada, num_acciones).to(dispositivo)
    red_q_objetivo.load_state_dict(red_q_principal.state_dict())
    red_q_objetivo.eval()

    optimizador = optim.Adam(red_q_principal.parameters(), lr=tasa_aprendizaje)
    memoria_experiencias = BufferReplay(capacidad_replay)

    pasos_globales = 0

    def calcular_epsilon(ep: int) -> float:
        if ep >= episodios_decaimiento_eps:
            return epsilon_final
        fraccion = ep / float(episodios_decaimiento_eps)
        return epsilon_inicial + fraccion * (epsilon_final - epsilon_inicial)

    registro_recompensas: List[float] = []

    for episodio_actual in range(1, episodios_totales + 1):
        observacion, _ = entorno_juego.reset()
        finalizado = False
        recompensa_total = 0.0
        epsilon_actual = calcular_epsilon(episodio_actual)
        pasos_en_episodio = 0

        while not finalizado:
            pasos_globales += 1
            pasos_en_episodio += 1

            # Política epsilon-greedy
            if random.random() < epsilon_actual:
                accion_elegida = entorno_juego.action_space.sample()
            else:
                with torch.no_grad():
                    obs_lote = np.expand_dims(observacion, axis=0)
                    obs_tensor = torch.from_numpy(obs_lote).to(dispositivo)
                    valores_q = red_q_principal(obs_tensor)
                    accion_elegida = int(torch.argmax(valores_q, dim=1).item())

            siguiente_obs, recompensa_step, terminado, truncado, _ = entorno_juego.step(accion_elegida)
            finalizado = terminado or truncado
            recompensa_total += float(recompensa_step)

            memoria_experiencias.agregar(observacion, accion_elegida, recompensa_step, siguiente_obs, finalizado)
            observacion = siguiente_obs

            # Entrenamiento cuando hay suficientes experiencias
            if len(memoria_experiencias) >= pasos_inicio_entrenamiento:
                estados, acciones, recompensas, estados_sig, terminados = memoria_experiencias.muestrear(tam_lote)

                estados_t = torch.from_numpy(estados).to(dispositivo)
                estados_sig_t = torch.from_numpy(estados_sig).to(dispositivo)
                acciones_t = torch.from_numpy(acciones).long().to(dispositivo)
                recompensas_t = torch.from_numpy(recompensas).float().to(dispositivo)
                terminados_t = torch.from_numpy(terminados.astype(np.float32)).to(dispositivo)

                # Calcular Q(s,a) actual
                valores_q = red_q_principal(estados_t)
                q_valores_accion = valores_q.gather(1, acciones_t.unsqueeze(1)).squeeze(1)

                # Calcular target: r + gamma * max Q_target(s',a')
                with torch.no_grad():
                    q_siguientes = red_q_objetivo(estados_sig_t).max(1)[0]
                    q_objetivo = recompensas_t + factor_descuento * q_siguientes * (1.0 - terminados_t)

                perdida = nn.functional.mse_loss(q_valores_accion, q_objetivo)

                optimizador.zero_grad()
                perdida.backward()
                nn.utils.clip_grad_norm_(red_q_principal.parameters(), 10.0)
                optimizador.step()

            # Actualizar red objetivo
            if pasos_globales % intervalo_actualizacion_target == 0:
                red_q_objetivo.load_state_dict(red_q_principal.state_dict())

            if pasos_maximos_por_episodio is not None and pasos_en_episodio >= pasos_maximos_por_episodio:
                break

        registro_recompensas.append(recompensa_total)
        print(f"[DQN] Episodio {episodio_actual}/{episodios_totales} | Recompensa: {recompensa_total:.1f} | ε={epsilon_actual:.3f} | Memoria={len(memoria_experiencias)}")

        # Guardar checkpoint
        if episodio_actual % intervalo_guardado == 0:
            ruta_ckpt = os.path.join(directorio_checkpoints, f"dqn_galaxian_ep{episodio_actual}.pth")
            torch.save({
                "red_q": red_q_principal.state_dict(),
                "red_objetivo": red_q_objetivo.state_dict(),
                "optimizador": optimizador.state_dict(),
                "episodio": episodio_actual,
                "pasos_globales": pasos_globales,
                "recompensas": registro_recompensas,
                "forma_entrada": forma_entrada,
                "num_acciones": num_acciones,
            }, ruta_ckpt)
            print(f"[CHECKPOINT] Guardado en: {ruta_ckpt}")

        # Guardar gráficas
        if episodio_actual % intervalo_graficas == 0:
            _guardar_grafica_y_csv(directorio_checkpoints, registro_recompensas, episodio_actual)

    entorno_juego.close()

    # Guardar modelo final
    ruta_final = os.path.join(directorio_checkpoints, "dqn_galaxian_final.pth")
    torch.save(red_q_principal.state_dict(), ruta_final)
    print(f"[FINALIZADO] Modelo final guardado en: {ruta_final}")

    return red_q_principal

✅ Usando GPU de Apple Silicon (MPS)


In [4]:
# Entrenar DQN con configuración local
carpeta_dqn = os.path.join(DIRECTORIO_BASE, "dqn")
red_q_entrenada = entrenar_dqn(
    directorio_checkpoints=carpeta_dqn, 
    episodios_totales=10000, 
    intervalo_guardado=500, 
    intervalo_graficas=500
)

A.L.E: Arcade Learning Environment (version 0.8.1+53f58b7)
[Powered by Stella]


[DQN] Episodio 1/10000 | Recompensa: 760.0 | ε=0.997 | Memoria=179
[DQN] Episodio 2/10000 | Recompensa: 1140.0 | ε=0.994 | Memoria=418
[DQN] Episodio 3/10000 | Recompensa: 300.0 | ε=0.991 | Memoria=521
[DQN] Episodio 2/10000 | Recompensa: 1140.0 | ε=0.994 | Memoria=418
[DQN] Episodio 3/10000 | Recompensa: 300.0 | ε=0.991 | Memoria=521
[DQN] Episodio 4/10000 | Recompensa: 700.0 | ε=0.988 | Memoria=701
[DQN] Episodio 4/10000 | Recompensa: 700.0 | ε=0.988 | Memoria=701
[DQN] Episodio 5/10000 | Recompensa: 1390.0 | ε=0.985 | Memoria=990
[DQN] Episodio 6/10000 | Recompensa: 730.0 | ε=0.982 | Memoria=1114
[DQN] Episodio 5/10000 | Recompensa: 1390.0 | ε=0.985 | Memoria=990
[DQN] Episodio 6/10000 | Recompensa: 730.0 | ε=0.982 | Memoria=1114
[DQN] Episodio 7/10000 | Recompensa: 740.0 | ε=0.979 | Memoria=1324
[DQN] Episodio 8/10000 | Recompensa: 580.0 | ε=0.976 | Memoria=1490
[DQN] Episodio 7/10000 | Recompensa: 740.0 | ε=0.979 | Memoria=1324
[DQN] Episodio 8/10000 | Recompensa: 580.0 | ε=0.976 

KeyboardInterrupt: 

In [5]:
# Implementación de A2C (Advantage Actor-Critic) desde cero
import os
from typing import Tuple, List

import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim

# Detectar dispositivo: MPS (Mac M1/M2/M3/M4) > CUDA > CPU
if torch.backends.mps.is_available():
    dispositivo = torch.device("mps")
    print("✅ Usando GPU de Apple Silicon (MPS)")
elif torch.cuda.is_available():
    dispositivo = torch.device("cuda")
    print("✅ Usando GPU CUDA")
else:
    dispositivo = torch.device("cpu")
    print("⚠️ Usando CPU (entrenamiento será más lento)")


# ============================================================
# Arquitectura Actor-Crítico
# ============================================================
class RedA2C(nn.Module):
    """
    Red neuronal con arquitectura Actor-Crítico.
    Backbone convolucional compartido con dos cabezas:
    - Actor: predice distribución de acciones (logits)
    - Crítico: predice valor del estado V(s)
    """
    def __init__(self, forma_entrada: Tuple[int, int, int], num_acciones: int):
        super().__init__()
        canales, alto, ancho = forma_entrada
        self.canales_esperados = canales

        self.extractor_caracteristicas = nn.Sequential(
            nn.Conv2d(canales, 32, kernel_size=8, stride=4),
            nn.ReLU(inplace=True),
            nn.Conv2d(32, 64, kernel_size=4, stride=2),
            nn.ReLU(inplace=True),
            nn.Conv2d(64, 64, kernel_size=3, stride=1),
            nn.ReLU(inplace=True),
            nn.Flatten()
        )

        with torch.no_grad():
            tensor_prueba = torch.zeros(1, canales, alto, ancho)
            tam_aplanado = self.extractor_caracteristicas(tensor_prueba).shape[1]

        self.cabeza_actor = nn.Sequential(
            nn.Linear(tam_aplanado, 256),
            nn.ReLU(inplace=True),
            nn.Linear(256, num_acciones)
        )
        self.cabeza_critico = nn.Sequential(
            nn.Linear(tam_aplanado, 256),
            nn.ReLU(inplace=True),
            nn.Linear(256, 1)
        )

    def forward(self, tensor_entrada: torch.Tensor):
        """
        Retorna:
          - logits de política: (B, num_acciones)
          - valores de estado V(s): (B,)
        """
        if tensor_entrada.ndim != 4:
            raise ValueError(f"Esperado tensor 4D, recibido tensor_entrada.ndim={tensor_entrada.ndim}")

        if tensor_entrada.shape[1] != self.canales_esperados and tensor_entrada.shape[-1] == self.canales_esperados:
            tensor_entrada = tensor_entrada.permute(0, 3, 1, 2)

        tensor_entrada = tensor_entrada.float() / 255.0
        caracteristicas = self.extractor_caracteristicas(tensor_entrada)
        logits_politica = self.cabeza_actor(caracteristicas)
        valores_estado = self.cabeza_critico(caracteristicas).squeeze(-1)
        return logits_politica, valores_estado


# ============================================================
# Política de Evaluación A2C
# ============================================================
class PoliticaA2C:
    def __init__(self, red_ac: 'RedA2C'):
        self.red_ac = red_ac.to(dispositivo)
        self.red_ac.eval()

    @torch.no_grad()
    def __call__(self, observacion: np.ndarray, info: dict) -> int:
        """Estrategia greedy para evaluación"""
        if observacion.ndim != 3:
            raise ValueError(f"Esperado obs 3D, recibido observacion.ndim={observacion.ndim}")

        obs_lote = np.expand_dims(observacion, axis=0)
        obs_tensor = torch.from_numpy(obs_lote).to(dispositivo)
        logits, _ = self.red_ac(obs_tensor)
        probabilidades = torch.softmax(logits, dim=-1)
        accion = torch.argmax(probabilidades, dim=-1).item()
        return int(accion)


# ============================================================
# Función de Entrenamiento A2C
# ============================================================
def entrenar_a2c(
    directorio_checkpoints: str,
    episodios_totales: int = 500,
    factor_descuento: float = 0.99,
    tasa_aprendizaje: float = 2.5e-4,
    coef_entropia: float = 0.01,
    coef_valor: float = 0.5,
    longitud_rollout: int = 5,
    intervalo_guardado: int = 50,
    pasos_maximos_por_episodio: int | None = None,
    semilla_aleatoria: int = 123,
    lambda_gae: float = 0.95,
):
    """
    Entrena un agente A2C para Galaxian con rollouts n-step y GAE(λ).
    """
    os.makedirs(directorio_checkpoints, exist_ok=True)

    entorno_juego = crear_entorno_galaxian(semilla=semilla_aleatoria, modo_render=None)

    observacion, _ = entorno_juego.reset()
    if observacion.ndim != 3:
        entorno_juego.close()
        raise ValueError(f"Observación inesperada: ndim={observacion.ndim}")

    if observacion.shape[0] in (1, 3, 4):
        forma_entrada = (observacion.shape[0], observacion.shape[1], observacion.shape[2])
    else:
        forma_entrada = (observacion.shape[2], observacion.shape[0], observacion.shape[1])

    num_acciones = entorno_juego.action_space.n

    red_ac = RedA2C(forma_entrada, num_acciones).to(dispositivo)
    optimizador = optim.RMSprop(red_ac.parameters(), lr=tasa_aprendizaje, eps=1e-5)

    indice_episodio = 0
    registro_recompensas: List[float] = []

    while indice_episodio < episodios_totales:
        observacion, _ = entorno_juego.reset()
        finalizado = False
        recompensa_episodio = 0.0
        pasos_en_episodio = 0

        while not finalizado:
            # Recolectar rollout
            log_probs_lista = []
            valores_lista = []
            recompensas_lista = []
            finalizados_lista = []
            entropias_lista = []

            for _ in range(longitud_rollout):
                if finalizado:
                    break

                obs_lote = np.expand_dims(observacion, axis=0)
                obs_tensor = torch.from_numpy(obs_lote).to(dispositivo)

                logits, valor = red_ac(obs_tensor)
                probabilidades = torch.softmax(logits, dim=-1)
                distribucion = torch.distributions.Categorical(probabilidades)

                accion_muestreada = distribucion.sample()
                entropia = distribucion.entropy().mean()

                siguiente_obs, recompensa_step, terminado, truncado, _ = entorno_juego.step(accion_muestreada.item())
                finalizado = terminado or truncado

                log_probs_lista.append(distribucion.log_prob(accion_muestreada).squeeze(0))
                valores_lista.append(valor.squeeze(0))
                recompensas_lista.append(torch.tensor(recompensa_step, dtype=torch.float32, device=dispositivo))
                finalizados_lista.append(torch.tensor(float(finalizado), device=dispositivo))
                entropias_lista.append(entropia)

                observacion = siguiente_obs
                recompensa_episodio += float(recompensa_step)
                pasos_en_episodio += 1

                if pasos_maximos_por_episodio is not None and pasos_en_episodio >= pasos_maximos_por_episodio:
                    finalizado = True
                    break

            # Bootstrap del valor siguiente
            if finalizado:
                valor_siguiente = torch.zeros(1, device=dispositivo)
            else:
                obs_siguiente_lote = np.expand_dims(observacion, axis=0)
                obs_siguiente_tensor = torch.from_numpy(obs_siguiente_lote).to(dispositivo)
                _, valor_siguiente = red_ac(obs_siguiente_tensor)
                valor_siguiente = valor_siguiente.detach()

            # Calcular retornos con GAE(λ)
            retornos_lista = []
            ventaja_acumulada = 0
            valor_futuro = valor_siguiente
            for recomp, terminal, val in zip(reversed(recompensas_lista), reversed(finalizados_lista), reversed(valores_lista)):
                valor_futuro = valor_futuro * (1.0 - terminal)
                delta_temporal = recomp + factor_descuento * valor_futuro - val
                ventaja_acumulada = delta_temporal + factor_descuento * lambda_gae * (1.0 - terminal) * ventaja_acumulada
                valor_futuro = val
                retornos_lista.insert(0, ventaja_acumulada + val)

            retornos_tensor = torch.stack(retornos_lista)
            valores_tensor = torch.stack(valores_lista)
            log_probs_tensor = torch.stack(log_probs_lista)
            entropias_tensor = torch.stack(entropias_lista) if len(entropias_lista) > 0 else torch.tensor(0.0, device=dispositivo)

            ventajas = retornos_tensor - valores_tensor

            perdida_politica = -(log_probs_tensor * ventajas.detach()).mean()
            perdida_valor = ventajas.pow(2).mean()
            perdida_entropia = entropias_tensor.mean() if entropias_tensor.ndim > 0 else entropias_tensor

            perdida_total = perdida_politica + coef_valor * perdida_valor - coef_entropia * perdida_entropia

            optimizador.zero_grad()
            perdida_total.backward()
            nn.utils.clip_grad_norm_(red_ac.parameters(), 0.5)
            optimizador.step()

            if finalizado:
                break

        indice_episodio += 1
        registro_recompensas.append(recompensa_episodio)
        print(f"[A2C] Episodio {indice_episodio}/{episodios_totales} | Recompensa: {recompensa_episodio:.1f}")

        # Guardar checkpoint
        if indice_episodio % intervalo_guardado == 0:
            ruta_ckpt = os.path.join(directorio_checkpoints, f"a2c_galaxian_ep{indice_episodio}.pth")
            torch.save({
                "red": red_ac.state_dict(),
                "optimizador": optimizador.state_dict(),
                "episodio": indice_episodio,
                "recompensas": registro_recompensas,
                "forma_entrada": forma_entrada,
                "num_acciones": num_acciones,
            }, ruta_ckpt)
            print(f"[CHECKPOINT] Guardado en: {ruta_ckpt}")

    entorno_juego.close()

    ruta_final = os.path.join(directorio_checkpoints, "a2c_galaxian_final.pth")
    torch.save(red_ac.state_dict(), ruta_final)
    print(f"[FINALIZADO] Modelo final guardado en: {ruta_final}")

    return red_ac

✅ Usando GPU de Apple Silicon (MPS)


In [6]:
# Entrenar A2C con configuración local
carpeta_a2c = os.path.join(DIRECTORIO_BASE, "a2c")
red_a2c_entrenada = entrenar_a2c(
    directorio_checkpoints=carpeta_a2c, 
    episodios_totales=50
)

[A2C] Episodio 1/50 | Recompensa: 570.0
[A2C] Episodio 2/50 | Recompensa: 390.0
[A2C] Episodio 2/50 | Recompensa: 390.0
[A2C] Episodio 3/50 | Recompensa: 210.0
[A2C] Episodio 3/50 | Recompensa: 210.0
[A2C] Episodio 4/50 | Recompensa: 880.0
[A2C] Episodio 4/50 | Recompensa: 880.0
[A2C] Episodio 5/50 | Recompensa: 590.0
[A2C] Episodio 5/50 | Recompensa: 590.0
[A2C] Episodio 6/50 | Recompensa: 970.0
[A2C] Episodio 6/50 | Recompensa: 970.0
[A2C] Episodio 7/50 | Recompensa: 740.0
[A2C] Episodio 7/50 | Recompensa: 740.0
[A2C] Episodio 8/50 | Recompensa: 300.0
[A2C] Episodio 8/50 | Recompensa: 300.0
[A2C] Episodio 9/50 | Recompensa: 1090.0
[A2C] Episodio 9/50 | Recompensa: 1090.0
[A2C] Episodio 10/50 | Recompensa: 1270.0
[A2C] Episodio 10/50 | Recompensa: 1270.0
[A2C] Episodio 11/50 | Recompensa: 780.0
[A2C] Episodio 11/50 | Recompensa: 780.0
[A2C] Episodio 12/50 | Recompensa: 1000.0
[A2C] Episodio 12/50 | Recompensa: 1000.0
[A2C] Episodio 13/50 | Recompensa: 730.0
[A2C] Episodio 13/50 | Reco

In [9]:
# Implementación de Dueling Double DQN con Prioritized Experience Replay (PER)
import os
import csv
import random
from typing import Tuple, List

import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import matplotlib
matplotlib.use("Agg")
import matplotlib.pyplot as plt

# Detectar dispositivo: MPS (Mac M1/M2/M3/M4) > CUDA > CPU
if torch.backends.mps.is_available():
    dispositivo = torch.device("mps")
    print("✅ Usando GPU de Apple Silicon (MPS)")
elif torch.cuda.is_available():
    dispositivo = torch.device("cuda")
    print("✅ Usando GPU CUDA")
else:
    dispositivo = torch.device("cpu")
    print("⚠️ Usando CPU (entrenamiento será más lento)")


# ===========================
#  Árboles de Segmentos para PER
# ===========================
class ArbolSegmentos:
    def __init__(self, capacidad, funcion_reduccion):
        assert capacidad > 0 and (capacidad & (capacidad - 1)) == 0, \
            "Capacidad debe ser potencia de 2"
        self.capacidad = capacidad
        self.arbol = np.zeros(2 * capacidad, dtype=np.float32)
        self.funcion_reduccion = funcion_reduccion

    def actualizar(self, indice, valor):
        i = indice + self.capacidad
        self.arbol[i] = valor
        i //= 2
        while i >= 1:
            self.arbol[i] = self.funcion_reduccion(self.arbol[2 * i], self.arbol[2 * i + 1])
            i //= 2

    def reducir(self, inicio, fin):
        resultado_izq = None
        resultado_der = None
        inicio += self.capacidad
        fin += self.capacidad
        while inicio <= fin:
            if (inicio % 2) == 1:
                resultado_izq = self.arbol[inicio] if resultado_izq is None else self.funcion_reduccion(resultado_izq, self.arbol[inicio])
                inicio += 1
            if (fin % 2) == 0:
                resultado_der = self.arbol[fin] if resultado_der is None else self.funcion_reduccion(self.arbol[fin], resultado_der)
                fin -= 1
            inicio //= 2
            fin //= 2
        if resultado_izq is None:
            return resultado_der
        if resultado_der is None:
            return resultado_izq
        return self.funcion_reduccion(resultado_izq, resultado_der)

    def __getitem__(self, indice):
        return self.arbol[indice + self.capacidad]


class ArbolSuma(ArbolSegmentos):
    def __init__(self, capacidad):
        super().__init__(capacidad, funcion_reduccion=lambda a, b: a + b)

    def suma_total(self):
        return self.arbol[1]

    def encontrar_suma_prefijo(self, suma_objetivo):
        """Encuentra índice i tal que sum(0..i) >= suma_objetivo"""
        idx = 1
        while idx < self.capacidad:
            izquierda = 2 * idx
            if self.arbol[izquierda] >= suma_objetivo:
                idx = izquierda
            else:
                suma_objetivo -= self.arbol[izquierda]
                idx = izquierda + 1
        return idx - self.capacidad


class ArbolMinimo(ArbolSegmentos):
    def __init__(self, capacidad):
        super().__init__(capacidad, funcion_reduccion=min)

    def minimo_total(self):
        return self.arbol[1]


# =======================================
#  Buffer de Replay Priorizado
# =======================================
class BufferReplayPriorizado:
    def __init__(self, capacidad_maxima: int, alfa_prioridad: float = 0.6, epsilon_per: float = 1e-6):
        potencia_2 = 1
        while potencia_2 < capacidad_maxima:
            potencia_2 *= 2
        self.capacidad = potencia_2
        self.alfa_prioridad = alfa_prioridad
        self.epsilon_per = epsilon_per

        self.posicion = 0
        self.tamanio = 0

        self.estados = [None] * self.capacidad
        self.acciones = np.zeros(self.capacidad, dtype=np.int64)
        self.recompensas = np.zeros(self.capacidad, dtype=np.float32)
        self.estados_siguientes = [None] * self.capacidad
        self.terminados = np.zeros(self.capacidad, dtype=np.bool_)

        self.arbol_suma = ArbolSuma(self.capacidad)
        self.arbol_minimo = ArbolMinimo(self.capacidad)
        self.prioridad_maxima = 1.0

        for i in range(self.capacidad):
            self.arbol_suma.actualizar(i, 0.0)
            self.arbol_minimo.actualizar(i, float("inf"))

    def __len__(self):
        return self.tamanio

    def agregar(self, estado, accion, recompensa, estado_sig, terminal):
        idx = self.posicion
        self.estados[idx] = estado
        self.acciones[idx] = accion
        self.recompensas[idx] = recompensa
        self.estados_siguientes[idx] = estado_sig
        self.terminados[idx] = terminal

        prioridad = (self.prioridad_maxima + self.epsilon_per) ** self.alfa_prioridad
        self.arbol_suma.actualizar(idx, prioridad)
        self.arbol_minimo.actualizar(idx, prioridad)

        self.posicion = (self.posicion + 1) % self.capacidad
        self.tamanio = min(self.tamanio + 1, self.capacidad)

    def muestrear(self, tam_lote: int, beta_importancia: float = 0.4):
        """Devuelve (indices, pesos_is, batch)"""
        indices_salida = []
        estados_salida = []
        acciones_salida = np.empty(tam_lote, dtype=np.int64)
        recompensas_salida = np.empty(tam_lote, dtype=np.float32)
        estados_sig_salida = []
        terminados_salida = np.empty(tam_lote, dtype=np.float32)

        suma_total = self.arbol_suma.suma_total()
        segmento = suma_total / tam_lote
        probabilidad_minima = self.arbol_minimo.minimo_total() / suma_total
        peso_maximo = (probabilidad_minima * self.tamanio) ** (-beta_importancia)

        for i in range(tam_lote):
            a = segmento * i
            b = segmento * (i + 1)
            masa = random.random() * (b - a) + a
            idx = self.arbol_suma.encontrar_suma_prefijo(masa)
            indices_salida.append(idx)
            estados_salida.append(self.estados[idx])
            acciones_salida[i] = self.acciones[idx]
            recompensas_salida[i] = self.recompensas[idx]
            estados_sig_salida.append(self.estados_siguientes[idx])
            terminados_salida[i] = float(self.terminados[idx])

        # Pesos de importance sampling
        probabilidades = np.array([self.arbol_suma[idx] / suma_total for idx in indices_salida], dtype=np.float32)
        pesos_is = (probabilidades * self.tamanio) ** (-beta_importancia)
        pesos_is = pesos_is / peso_maximo
        pesos_is = pesos_is.astype(np.float32)

        return np.array(indices_salida), pesos_is, (np.array(estados_salida), acciones_salida, recompensas_salida, np.array(estados_sig_salida), terminados_salida)

    def actualizar_prioridades(self, indices, prioridades):
        for idx, prioridad in zip(indices, prioridades):
            prioridad = float(prioridad + self.epsilon_per)
            self.arbol_suma.actualizar(idx, prioridad ** self.alfa_prioridad)
            self.arbol_minimo.actualizar(idx, prioridad ** self.alfa_prioridad)
            self.prioridad_maxima = max(self.prioridad_maxima, prioridad)


# ===========================
#  Arquitectura Dueling DQN
# ===========================
class RedDuelingDQN(nn.Module):
    """
    Arquitectura Dueling DQN:
    Q(s,a) = V(s) + (A(s,a) - mean(A))
    Dos streams separados para Valor y Ventaja.
    """
    def __init__(self, forma_entrada: Tuple[int, int, int], num_acciones: int):
        super().__init__()
        canales, alto, ancho = forma_entrada
        self.canales_esperados = canales

        self.extractor_caracteristicas = nn.Sequential(
            nn.Conv2d(canales, 32, kernel_size=8, stride=4),
            nn.ReLU(inplace=True),
            nn.Conv2d(32, 64, kernel_size=4, stride=2),
            nn.ReLU(inplace=True),
            nn.Conv2d(64, 64, kernel_size=3, stride=1),
            nn.ReLU(inplace=True),
            nn.Flatten()
        )
        with torch.no_grad():
            tam_aplanado = self.extractor_caracteristicas(torch.zeros(1, canales, alto, ancho)).shape[1]

        self.stream_valor = nn.Sequential(
            nn.Linear(tam_aplanado, 512), nn.ReLU(inplace=True),
            nn.Linear(512, 1)
        )
        self.stream_ventaja = nn.Sequential(
            nn.Linear(tam_aplanado, 512), nn.ReLU(inplace=True),
            nn.Linear(512, num_acciones)
        )

    def forward(self, tensor_entrada):
        if tensor_entrada.ndim != 4:
            raise ValueError(f"tensor_entrada.ndim={tensor_entrada.ndim}, esperado 4")
        if tensor_entrada.shape[1] != self.canales_esperados and tensor_entrada.shape[-1] == self.canales_esperados:
            tensor_entrada = tensor_entrada.permute(0, 3, 1, 2)
        tensor_entrada = tensor_entrada.float() / 255.0
        caracteristicas = self.extractor_caracteristicas(tensor_entrada)
        valor_estado = self.stream_valor(caracteristicas)
        ventaja_acciones = self.stream_ventaja(caracteristicas)
        valores_q = valor_estado + (ventaja_acciones - ventaja_acciones.mean(dim=1, keepdim=True))
        return valores_q


# ===========================
#  Política de Evaluación
# ===========================
class PoliticaDQN:
    def __init__(self, red_q: 'RedDuelingDQN'):
        self.red_q = red_q.to(dispositivo)
        self.red_q.eval()

    @torch.no_grad()
    def __call__(self, observacion: np.ndarray, info: dict) -> int:
        if observacion.ndim != 3:
            raise ValueError("obs debe ser 3D")
        obs_lote = np.expand_dims(observacion, axis=0)
        obs_tensor = torch.from_numpy(obs_lote).to(dispositivo)
        valores_q = self.red_q(obs_tensor)
        return int(torch.argmax(valores_q, dim=1).item())


# ===========================
#  Utilidades de logging
# ===========================
def _calcular_media_movil(valores: List[float], ventana: int = 100):
    if len(valores) == 0:
        return []
    resultado = []
    suma_acum = 0.0
    cola = []
    for v in valores:
        cola.append(v)
        suma_acum += v
        if len(cola) > ventana:
            suma_acum -= cola.pop(0)
        resultado.append(suma_acum / len(cola))
    return resultado

def _guardar_grafica_y_csv(dir_checkpoints: str, recompensas: List[float], episodio: int):
    os.makedirs(dir_checkpoints, exist_ok=True)
    
    # CSV
    ruta_csv = os.path.join(dir_checkpoints, "registro_recompensas.csv")
    archivo_nuevo = not os.path.exists(ruta_csv)
    with open(ruta_csv, "a", newline="") as archivo:
        escritor = csv.writer(archivo)
        if archivo_nuevo:
            escritor.writerow(["episodio", "recompensa"])
        escritor.writerow([episodio, recompensas[-1]])

    # Gráfica PNG con nuevos colores
    plt.figure(figsize=(10, 5))
    plt.plot(recompensas, label="Recompensa", color="#FF6B35", linewidth=1.2, alpha=0.7)
    media_movil = _calcular_media_movil(recompensas, ventana=100)
    if len(media_movil) > 0:
        plt.plot(media_movil, label="Media Móvil (100)", color="#00A896", linewidth=2.5)
    plt.xlabel("Episodio", fontsize=12)
    plt.ylabel("Recompensa Total", fontsize=12)
    plt.title("Progreso de Entrenamiento - Dueling DDQN + PER", fontsize=14, fontweight="bold")
    plt.legend(loc="upper left")
    plt.grid(True, alpha=0.3)
    ruta_png = os.path.join(dir_checkpoints, f"recompensas_ep{episodio}.png")
    plt.tight_layout()
    plt.savefig(ruta_png, dpi=120)
    plt.close()
    print(f"[REGISTRO] Gráfica y CSV guardados: {ruta_png} / {ruta_csv}")


# ===========================
#  Función de Entrenamiento
# ===========================
def entrenar_dueling_ddqn_per(
    directorio_checkpoints: str,
    episodios_totales: int = 50000,
    capacidad_buffer: int = 100_000,
    tam_lote: int = 32,
    factor_descuento: float = 0.99,
    tasa_aprendizaje: float = 1e-4,
    epsilon_inicial: float = 1.0,
    epsilon_final: float = 0.1,
    episodios_decaimiento_eps: int = 30000,
    intervalo_actualizacion_target: int = 1000,
    pasos_inicio_entrenamiento: int = 10_000,
    alfa_per: float = 0.6,
    beta_per_inicial: float = 0.4,
    beta_per_final: float = 1.0,
    episodios_annealing_beta: int = 50000,
    epsilon_per: float = 1e-6,
    intervalo_guardado: int = 500,
    intervalo_graficas: int = 200,
    pasos_maximos_por_episodio: int | None = None,
    semilla_aleatoria: int = 42,
):
    os.makedirs(directorio_checkpoints, exist_ok=True)

    entorno_juego = crear_entorno_galaxian(semilla=semilla_aleatoria, modo_render=None)

    observacion, _ = entorno_juego.reset()
    if observacion.ndim != 3:
        entorno_juego.close()
        raise ValueError("obs.ndim inesperado")
    if observacion.shape[0] in (1, 3, 4):
        forma_entrada = (observacion.shape[0], observacion.shape[1], observacion.shape[2])
    else:
        forma_entrada = (observacion.shape[2], observacion.shape[0], observacion.shape[1])
    num_acciones = entorno_juego.action_space.n

    red_q_principal = RedDuelingDQN(forma_entrada, num_acciones).to(dispositivo)
    red_q_objetivo = RedDuelingDQN(forma_entrada, num_acciones).to(dispositivo)
    red_q_objetivo.load_state_dict(red_q_principal.state_dict())
    red_q_objetivo.eval()

    optimizador = optim.Adam(red_q_principal.parameters(), lr=tasa_aprendizaje)
    memoria_experiencias = BufferReplayPriorizado(capacidad_buffer, alfa_prioridad=alfa_per, epsilon_per=epsilon_per)

    registro_recompensas: List[float] = []
    pasos_globales = 0

    def calcular_epsilon(ep):
        if ep >= episodios_decaimiento_eps:
            return epsilon_final
        fraccion = ep / float(episodios_decaimiento_eps)
        return epsilon_inicial + fraccion * (epsilon_final - epsilon_inicial)

    def calcular_beta(ep):
        fraccion = min(1.0, ep / float(episodios_annealing_beta))
        return beta_per_inicial + fraccion * (beta_per_final - beta_per_inicial)

    for episodio_actual in range(1, episodios_totales + 1):
        observacion, _ = entorno_juego.reset()
        finalizado = False
        recompensa_total = 0.0
        pasos_en_episodio = 0

        epsilon_actual = calcular_epsilon(episodio_actual)
        beta_actual = calcular_beta(episodio_actual)

        while not finalizado:
            pasos_globales += 1
            pasos_en_episodio += 1

            # Política ε-greedy
            if random.random() < epsilon_actual:
                accion_elegida = entorno_juego.action_space.sample()
            else:
                with torch.no_grad():
                    obs_lote = np.expand_dims(observacion, axis=0)
                    obs_tensor = torch.from_numpy(obs_lote).to(dispositivo)
                    valores_q = red_q_principal(obs_tensor)
                    accion_elegida = int(torch.argmax(valores_q, dim=1).item())

            siguiente_obs, recompensa_step, terminado, truncado, _ = entorno_juego.step(accion_elegida)
            finalizado = terminado or truncado
            recompensa_total += float(recompensa_step)

            memoria_experiencias.agregar(observacion, accion_elegida, recompensa_step, siguiente_obs, finalizado)
            observacion = siguiente_obs

            # Entrenamiento
            if len(memoria_experiencias) >= pasos_inicio_entrenamiento:
                indices, pesos_is, lote = memoria_experiencias.muestrear(tam_lote, beta_importancia=beta_actual)
                estados, acciones, recompensas, estados_sig, terminados = lote

                estados_t = torch.from_numpy(estados).to(dispositivo)
                estados_sig_t = torch.from_numpy(estados_sig).to(dispositivo)
                acciones_t = torch.from_numpy(acciones).long().to(dispositivo)
                recompensas_t = torch.from_numpy(recompensas).float().to(dispositivo)
                terminados_t = torch.from_numpy(terminados).float().to(dispositivo)
                pesos_is_t = torch.from_numpy(pesos_is).float().to(dispositivo)

                # Q(s,a) actual
                valores_q = red_q_principal(estados_t).gather(1, acciones_t.unsqueeze(1)).squeeze(1)

                # Double DQN: selección con red principal, evaluación con red objetivo
                with torch.no_grad():
                    q_principal_sig = red_q_principal(estados_sig_t)
                    acciones_optimas = torch.argmax(q_principal_sig, dim=1, keepdim=True)

                    q_objetivo_sig = red_q_objetivo(estados_sig_t)
                    q_siguientes = q_objetivo_sig.gather(1, acciones_optimas).squeeze(1)

                    q_objetivo_valores = recompensas_t + factor_descuento * q_siguientes * (1.0 - terminados_t)

                errores_td = q_objetivo_valores - valores_q
                perdida = (pesos_is_t * errores_td.pow(2)).mean()

                optimizador.zero_grad()
                perdida.backward()
                nn.utils.clip_grad_norm_(red_q_principal.parameters(), 10.0)
                optimizador.step()

                # Actualizar prioridades
                nuevas_prioridades = errores_td.detach().abs().cpu().numpy() + epsilon_per
                memoria_experiencias.actualizar_prioridades(indices, nuevas_prioridades)

            # Sincronizar red objetivo
            if pasos_globales % intervalo_actualizacion_target == 0:
                red_q_objetivo.load_state_dict(red_q_principal.state_dict())

            if pasos_maximos_por_episodio is not None and pasos_en_episodio >= pasos_maximos_por_episodio:
                break

        registro_recompensas.append(recompensa_total)
        print(f"[Dueling-DDQN+PER] Ep {episodio_actual}/{episodios_totales} | R: {recompensa_total:.1f} | ε={epsilon_actual:.3f} | β={beta_actual:.3f} | Memoria={len(memoria_experiencias)}")

        # Guardar checkpoint
        if episodio_actual % intervalo_guardado == 0:
            ruta_ckpt = os.path.join(directorio_checkpoints, f"dueling_ddqn_per_ep{episodio_actual}.pth")
            torch.save({
                "red_q": red_q_principal.state_dict(),
                "red_objetivo": red_q_objetivo.state_dict(),
                "optimizador": optimizador.state_dict(),
                "episodio": episodio_actual,
                "pasos_globales": pasos_globales,
                "recompensas": registro_recompensas,
                "forma_entrada": forma_entrada,
                "num_acciones": num_acciones,
            }, ruta_ckpt)
            print(f"[CHECKPOINT] Guardado: {ruta_ckpt}")

        # Guardar gráficas
        if episodio_actual % intervalo_graficas == 0:
            _guardar_grafica_y_csv(directorio_checkpoints, registro_recompensas, episodio_actual)

    entorno_juego.close()

    ruta_final = os.path.join(directorio_checkpoints, "dueling_ddqn_per_final.pth")
    torch.save(red_q_principal.state_dict(), ruta_final)
    print(f"[FINALIZADO] Modelo final guardado en: {ruta_final}")
    _guardar_grafica_y_csv(directorio_checkpoints, registro_recompensas, episodios_totales)

    return red_q_principal

✅ Usando GPU de Apple Silicon (MPS)


In [10]:
# Entrenar Dueling DDQN + PER con configuración local
carpeta_dueling = os.path.join(DIRECTORIO_BASE, "dueling_ddqn_per")
modelo_dueling = entrenar_dueling_ddqn_per(
    directorio_checkpoints=carpeta_dueling,
    episodios_totales=100000,
    intervalo_guardado=500,
    intervalo_graficas=500
)

[Dueling-DDQN+PER] Ep 1/100000 | R: 360.0 | ε=1.000 | β=0.400 | Memoria=155
[Dueling-DDQN+PER] Ep 2/100000 | R: 870.0 | ε=1.000 | β=0.400 | Memoria=366
[Dueling-DDQN+PER] Ep 3/100000 | R: 400.0 | ε=1.000 | β=0.400 | Memoria=435
[Dueling-DDQN+PER] Ep 4/100000 | R: 150.0 | ε=1.000 | β=0.400 | Memoria=510
[Dueling-DDQN+PER] Ep 2/100000 | R: 870.0 | ε=1.000 | β=0.400 | Memoria=366
[Dueling-DDQN+PER] Ep 3/100000 | R: 400.0 | ε=1.000 | β=0.400 | Memoria=435
[Dueling-DDQN+PER] Ep 4/100000 | R: 150.0 | ε=1.000 | β=0.400 | Memoria=510
[Dueling-DDQN+PER] Ep 5/100000 | R: 180.0 | ε=1.000 | β=0.400 | Memoria=585
[Dueling-DDQN+PER] Ep 6/100000 | R: 470.0 | ε=1.000 | β=0.400 | Memoria=726
[Dueling-DDQN+PER] Ep 5/100000 | R: 180.0 | ε=1.000 | β=0.400 | Memoria=585
[Dueling-DDQN+PER] Ep 6/100000 | R: 470.0 | ε=1.000 | β=0.400 | Memoria=726
[Dueling-DDQN+PER] Ep 7/100000 | R: 270.0 | ε=1.000 | β=0.400 | Memoria=794
[Dueling-DDQN+PER] Ep 8/100000 | R: 480.0 | ε=1.000 | β=0.400 | Memoria=890
[Dueling-DDQ

KeyboardInterrupt: 

In [16]:
# Ejemplo: Reanudar entrenamiento desde checkpoint (opcional)
# Descomenta y modifica según necesites

ruta_checkpoint = os.path.join(carpeta_dueling, "dueling_ddqn_per_ep21000.pth")

# Cargar checkpoint (weights_only=False para checkpoints con objetos numpy)
checkpoint = torch.load(ruta_checkpoint, weights_only=False)

# Crear red y cargar pesos
forma_entrada = tuple(checkpoint['forma_entrada'])
num_acciones = checkpoint['num_acciones']
red_reanudada = RedDuelingDQN(forma_entrada, num_acciones).to(dispositivo)
red_reanudada.load_state_dict(checkpoint['red_q'])

episodio_inicio = checkpoint['episodio']
print(f"Checkpoint cargado desde episodio: {episodio_inicio}")

Checkpoint cargado desde episodio: 21000
