<a href="https://www.kaggle.com/code/oscarandresgutierrez/dqn-doom-v0?scriptVersionId=252095619" target="_blank"><img align="left" alt="Kaggle" title="Open in Kaggle" src="https://kaggle.com/static/images/open-in-kaggle.svg"></a>

In [None]:
!pip install vizdoom==1.2.4 gymnasium pyvirtualdisplay imageio imageio-ffmpeg \
        opencv-python-headless torch torchvision --quiet


In [None]:
# Start a 640×480 virtual display with visible=0 before anything else is imported.
# NOTE(review): presumably needed so frames can be rendered on a headless host — confirm.
from pyvirtualdisplay import Display; Display(visible=0, size=(640,480)).start()

import gymnasium as gnm, numpy as np, random, collections, time, cv2, imageio, torch
import torch.nn as nn, torch.nn.functional as F
from vizdoom import gymnasium_wrapper          # registers the VizDoom environments

# Run on the GPU when CUDA is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")


In [None]:
import vizdoom as vzd
import gymnasium as gnm
import cv2, numpy as np

# ── 84×84 grayscale (no extra channel axis) ───────────────────────
class GrayResize(gnm.ObservationWrapper):
    """
    Replace the dict observation with a small grayscale frame.

    Takes the image stored under ``obs["screen"]`` (converted with
    ``COLOR_RGB2GRAY``, i.e. treated as RGB), resizes it and returns a
    uint8 ndarray of shape ``shape`` with no channel axis.

    Args:
        env: environment whose observations expose a ``"screen"`` image.
        shape: output frame shape as ``(height, width)``.
    """
    def __init__(self, env, shape=(84, 84)):
        super().__init__(env)
        self.shape = tuple(shape)
        self.observation_space = gnm.spaces.Box(
            0, 255, self.shape, dtype=np.uint8
        )

    def observation(self, obs):
        """Return the grayscale, resized version of ``obs["screen"]``."""
        frame = cv2.cvtColor(obs["screen"], cv2.COLOR_RGB2GRAY)
        height, width = self.shape
        # cv2.resize expects dsize as (width, height), the reverse of the
        # (rows, cols) order used for the observation space above; passing
        # self.shape directly would transpose any non-square target size.
        return cv2.resize(frame, (width, height), interpolation=cv2.INTER_AREA)

# ── reward shaping (damage taken + missed shot) ───────────────────
class RewardShaper(gnm.Wrapper):
    """
    Add two shaping terms to the environment reward.

    * ``k_hp`` times the health lost during the step.
    * ``k_shot`` whenever the ATTACK button was pressed and the kill
      counter did not change during the step.

    Args:
        env: wrapped environment; ``env.unwrapped.game`` must be the
            underlying ViZDoom game object.
        k_hp: coefficient applied to the health lost in a step.
        k_shot: reward added for a shot that produced no kill.
    """
    def __init__(self, env, k_hp=-0.1, k_shot=-0.05):
        super().__init__(env)
        self.k_hp, self.k_shot = k_hp, k_shot
        self.game = self.env.unwrapped.game
        # Position of ATTACK inside the button vector reported by the game.
        btns = list(self.game.get_available_buttons())
        self.attack_idx = btns.index(vzd.Button.ATTACK)
        # `frag_prev` keeps its original name but stores the previous kill count.
        self.hp_prev = self.frag_prev = None

    def _kills(self):
        # KILLCOUNT counts monsters killed in the episode. FRAGCOUNT only
        # counts player/bot frags, so in a single-player scenario it stays
        # constant and would flag every shot as a miss.
        return self.game.get_game_variable(vzd.GameVariable.KILLCOUNT)

    def reset(self, **kw):
        """Reset the env and record the starting health and kill count."""
        obs, info = self.env.reset(**kw)
        self.hp_prev   = self.game.get_game_variable(vzd.GameVariable.HEALTH)
        self.frag_prev = self._kills()
        return obs, info

    def step(self, act_idx):
        """Step the env and return the transition with the shaped reward."""
        obs, r, term, trunc, info = self.env.step(act_idx)
        hp    = self.game.get_game_variable(vzd.GameVariable.HEALTH)
        kills = self._kills()

        # Only health losses count; healing never yields a bonus.
        dmg  = max(self.hp_prev - hp, 0)
        shot = bool(self.game.get_last_action()[self.attack_idx])
        miss_pen = self.k_shot if shot and kills == self.frag_prev else 0

        shaped = r + self.k_hp * dmg + miss_pen
        self.hp_prev, self.frag_prev = hp, kills
        return obs, shaped, term, trunc, info

# ── environment factory ───────────────────────────────────────────
def make_env(seed=None):
    """
    Build the wrapped "VizdoomDefendLine-v0" environment.

    Wrapper order, innermost first: GrayResize -> FrameStack(4) -> RewardShaper.
    ``seed`` is applied to the action space only (it drives
    ``action_space.sample()``); it is not passed to ``reset`` here.
    """
    base = gnm.make("VizdoomDefendLine-v0", render_mode="rgb_array", frame_skip=4)
    # Stack the last 4 grayscale frames; gymnasium's FrameStack puts the stack
    # on the leading axis, i.e. (4, 84, 84) for the default GrayResize shape.
    stacked = gnm.wrappers.FrameStack(GrayResize(base), 4)
    env = RewardShaper(stacked)
    env.action_space.seed(seed)
    return env


In [None]:
class ReplayBuffer:
    """
    Fixed-capacity ring buffer of (s, a, r, s2, d) transitions.

    Observations are stored as uint8, actions as int64, rewards as float32
    and done flags as bool. Once ``cap`` transitions have been written, new
    ones overwrite the oldest slot.
    """

    def __init__(self, cap, obs_shape):
        self.cap = cap
        self.ptr = 0          # next slot to write
        self.full = False     # becomes True after the first wrap-around
        frames_shape = (cap, *obs_shape)
        self.s = np.empty(frames_shape, np.uint8)
        self.a = np.empty((cap,), np.int64)
        self.r = np.empty((cap,), np.float32)
        self.s2 = np.empty(frames_shape, np.uint8)
        self.d = np.empty((cap,), np.bool_)

    def add(self, s, a, r, s2, d):
        """Write one transition at the current slot and advance the pointer."""
        slot = self.ptr
        self.s[slot] = s
        self.a[slot] = a
        self.r[slot] = r
        self.s2[slot] = s2
        self.d[slot] = d
        self.ptr = (slot + 1) % self.cap
        if self.ptr == 0:
            # The pointer wrapped: every slot now holds valid data.
            self.full = True

    def sample(self, batch):
        """
        Draw ``batch`` random transitions (indices drawn independently, so
        repeats are possible) and return (s, a, r, s2, d) as tensors on the
        module-level ``device``.
        """
        upper = self.cap if self.full else self.ptr
        idx = np.random.randint(0, upper, batch)
        columns = (self.s, self.a, self.r, self.s2, self.d)
        return tuple(torch.as_tensor(col[idx], device=device) for col in columns)


In [None]:
class DoomDQN(nn.Module):
    """
    Convolutional Q-network: three conv layers followed by a two-layer MLP
    that outputs one Q-value per action.

    The input must have 4 channels and a spatial size for which the conv
    stack yields a 7×7 feature map (e.g. 84×84). Pixel values are divided
    by 255 inside ``forward``, so raw uint8 frames can be passed directly.
    """

    def __init__(self, n_actions):
        super().__init__()
        feature_layers = [
            nn.Conv2d(4, 32, kernel_size=8, stride=4),
            nn.ReLU(),
            nn.Conv2d(32, 64, kernel_size=4, stride=2),
            nn.ReLU(),
            nn.Conv2d(64, 64, kernel_size=3, stride=1),
            nn.ReLU(),
        ]
        self.conv = nn.Sequential(*feature_layers)
        self.head = nn.Sequential(
            nn.Linear(7 * 7 * 64, 512),
            nn.ReLU(),
            nn.Linear(512, n_actions),
        )

    def forward(self, x):
        """Return Q-values of shape (batch, n_actions) for a batch of frames."""
        scaled = x.float() / 255.0
        feats = self.conv(scaled)
        # Flatten everything except the batch axis before the linear head.
        return self.head(feats.view(feats.size(0), -1))


In [None]:
# ── helpers ──────────────────────────────────────────────────────
def to_chw(obs):
    """
    Return ``obs`` as an array whose 4-frame axis comes first.

    Size-1 axes are squeezed away first. For a 3-D result:
      • (84,84,4)   → (4,84,84)
      • (84,4,84)   → (4,84,84)
      • (4,84,84)   → (4,84,84)  (already correct, returned as is)

    Arrays that are not 3-D after squeezing are returned unchanged. A 3-D
    array with no axis of length 4 raises ``IndexError``.

    Args:
        obs: anything ``np.asarray`` accepts (ndarray, nested list, or an
            object exposing ``__array__`` such as a frame-stack container).

    Returns:
        ndarray, possibly a view of the input.
    """
    # np.asarray copies only when it has to. np.array(..., copy=False)
    # raises on NumPy >= 2 whenever a copy cannot be avoided, which is the
    # case for lists and lazily materialised frame stacks.
    arr = np.asarray(obs).squeeze()

    if arr.ndim == 3 and arr.shape[0] != 4:
        # Find the first axis of length 4 and move it to position 0.
        ch_axis = int(np.where(np.array(arr.shape) == 4)[0][0])
        arr = np.moveaxis(arr, ch_axis, 0)

    return arr

def record_checkpoint(net, step, max_frames=1_000, fps=30):
    """
    Save the network weights and a video of one greedy episode.

    Writes «dqn_step_<step>.pth» and «dqn_step_<step>.mp4», where <step> is
    formatted with thousands separators (e.g. ``dqn_step_500,000``).

    Args:
        net: Q-network used to pick the arg-max action at every step.
        step: training step number, used only to build the file names.
        max_frames: hard cap on the number of recorded frames.
        fps: frame rate of the written video.
    """
    tag = f"dqn_step_{step:,}"
    torch.save(net.state_dict(), f"{tag}.pth")

    env_eval = make_env()
    frames, ep_R = [], 0
    try:
        s, _ = env_eval.reset()
        s = to_chw(s)
        done = False
        with torch.no_grad():
            while not done and len(frames) < max_frames:
                a = net(torch.tensor(s[None], device=device)).argmax(1).item()
                nxt, r, terminated, truncated, _ = env_eval.step(a)
                # The episode is over on either flag; stepping a finished
                # environment again is not valid.
                done = terminated or truncated
                frames.append(env_eval.render())
                s, ep_R = to_chw(nxt), ep_R + r
    finally:
        # Release the evaluation environment even if the rollout fails.
        env_eval.close()
    imageio.mimsave(f"{tag}.mp4", frames, fps=fps)
    print(f"💾 guardado {tag}.pth   🎞️ {tag}.mp4   |  Reward={ep_R:.1f}")

# ── hyperparameters ─────────────────────────────────────────────
MAX_STEPS      = 5_000_000          # total environment steps
LEARN_START    = 10_000             # gradient updates begin after this many steps
BATCH          = 32                 # transitions per gradient update
GAMMA          = 0.99               # discount factor
LR             = 1e-4               # Adam learning rate
TARGET_SYNC    = 10_000             # copy online → target network every N steps
EPS_START, EPS_END, EPS_DECAY = 1.0, 0.05, 1_000_000   # linear ε schedule
BUFFER_SIZE    = 100_000            # replay buffer capacity (transitions)
SEED           = 42

# ── initialization ──────────────────────────────────────────────
# Seed every RNG the run draws from: `random` drives the ε-greedy coin flip,
# `np.random` drives replay sampling, and torch drives weight initialization.
# Seeding only the environment leaves all three unseeded.
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)

env       = make_env(SEED)
n_actions = env.action_space.n
q_net     = DoomDQN(n_actions).to(device)
# The target network starts as an exact copy of the online network.
tgt_net   = DoomDQN(n_actions).to(device); tgt_net.load_state_dict(q_net.state_dict())
opt       = torch.optim.Adam(q_net.parameters(), lr=LR)
buf       = ReplayBuffer(BUFFER_SIZE, (4,84,84))

# `rewards` collects one return per finished episode; `eps` is the current ε.
rewards, eps = [], EPS_START
state,_ = env.reset(seed=SEED); state = to_chw(state)
ep_R, t0 = 0, time.time()

# ── training loop ───────────────────────────────────────────────
for step in range(1, MAX_STEPS + 1):

    # ε-greedy: random action with probability eps, otherwise arg-max Q
    if random.random() < eps:
        action = env.action_space.sample()
    else:
        with torch.no_grad():
            action = int(q_net(torch.tensor(state[None], device=device)).argmax(1))

    # transition
    # NOTE: gymnasium's step returns (obs, reward, terminated, truncated, info),
    # so here `done` holds the terminated flag and `term` holds the truncated flag.
    nxt, r, done, term, _ = env.step(action)
    nxt = to_chw(nxt)
    buf.add(state, action, r, nxt, done or term)
    state, ep_R = nxt, ep_R + r

    # learning (only once LEARN_START steps have been collected)
    if step > LEARN_START:
        s, a, rn, s2, d = buf.sample(BATCH)
        q_pred = q_net(s).gather(1, a.unsqueeze(1)).squeeze(1)
        with torch.no_grad():
            # Double-DQN target: the online net picks the next action,
            # the target net evaluates it.
            a2 = q_net(s2).argmax(1)
            q_next = tgt_net(s2).gather(1, a2.unsqueeze(1)).squeeze(1)
            # (~d) zeroes the bootstrap term for transitions stored as done.
            y = rn + GAMMA * q_next * (~d)
        loss = F.smooth_l1_loss(q_pred, y)

        opt.zero_grad(); loss.backward()
        # Clip the global gradient norm to 10 before the optimizer step.
        nn.utils.clip_grad_norm_(q_net.parameters(), 10)
        opt.step()

        if step % TARGET_SYNC == 0:
            tgt_net.load_state_dict(q_net.state_dict())
        # Linear ε decay, counted from the step at which learning started.
        eps = max(EPS_END, EPS_START - (step - LEARN_START) / EPS_DECAY)

    # end of episode: log the return and start a new episode
    if done or term:
        rewards.append(ep_R)
        ep_R = 0
        state,_ = env.reset(); state = to_chw(state)

    # logging: mean return over the last (up to) 100 finished episodes
    if step % 10000 == 0:
        avg = np.mean(rewards[-100:]) if rewards else 0
        print(f"Paso {step:,} | ε={eps:.3f} | Reward_100ep={avg:6.2f}")

    # 🔖 checkpoint + video every 500,000 steps
    if step % 500_000 == 0:
        record_checkpoint(q_net, step)

env.close()
# Final weights are written to "dqn_defendline_final.pth".
torch.save(q_net.state_dict(), "dqn_defendline_final.pth")
print(f"\nEntrenamiento finalizado en {(time.time()-t0)/60:.1f} min — modelo guardado.")


In [None]:
import matplotlib.pyplot as plt
from IPython.display import Video, display

def play_and_record(model_path, vid="dqn_defendline_eval.mp4", episodes=3):
    """
    Play greedy episodes with saved weights and record them to a video.

    Args:
        model_path: path to a ``state_dict`` saved from a ``DoomDQN``.
        vid: output video path.
        episodes: number of episodes to play; all frames go into one video.

    Returns:
        ``(vid, ep_ret)``: the video path and the list of per-episode returns.
    """
    env = make_env()
    frames, ep_ret = [], []
    try:
        net = DoomDQN(env.action_space.n).to(device)
        net.load_state_dict(torch.load(model_path, map_location=device))
        net.eval()

        for _ in range(episodes):
            s, _ = env.reset()
            s = to_chw(s)
            R, done = 0, False
            while not done:
                with torch.no_grad():
                    a = net(torch.tensor(s[None], device=device)).argmax(1).item()
                s2, r, terminated, truncated, _ = env.step(a)
                # Stop on truncation as well as termination; looking only at
                # the terminated flag would keep stepping a truncated episode.
                done = terminated or truncated
                frames.append(env.render())        # RGB frame
                s, R = to_chw(s2), R + r
            ep_ret.append(R)
    finally:
        # Always release the environment, even when loading or playing fails.
        env.close()
    imageio.mimsave(vid, frames, fps=30)
    return vid, ep_ret

# Evaluate the weights written at the end of training. The training cell
# saves them as "dqn_defendline_final.pth"; no visible code writes
# "dqn_defendline.pth".
vid_path, eval_R = play_and_record("dqn_defendline_final.pth")
display(Video(vid_path, embed=True, height=480))

# Training returns per episode, with the mean evaluation return as a dashed line.
eval_mean = np.mean(eval_R)
plt.figure(figsize=(6,3))
plt.plot(rewards, label="Train reward (ep.)")
plt.axhline(eval_mean, color="r", ls="--", label=f"Eval avg ({eval_mean:.1f})")
plt.xlabel("Episodio"); plt.ylabel("Reward"); plt.title("DQN – Vizdoom DefendLine")
plt.legend(); plt.tight_layout(); plt.show()
