# VizDoom tournament

In [1]:
%load_ext autoreload
%autoreload 2

In [2]:
import numpy as np
import torch
from torch.nn import functional as F
from torchvision import transforms

from arena import VizdoomMPEnv

In [3]:
def resize(x):
    """Resize a frame tensor to 128x128 with ``F.interpolate``.

    Inputs with fewer than four dimensions get a leading batch dimension
    first, because ``F.interpolate`` with a 2-D target size needs a
    ``(batch, channels, height, width)`` layout.
    """
    batched = x if x.ndim >= 4 else x.unsqueeze(0)
    return F.interpolate(batched, (128, 128))


def minmax(x):
    """Scale ``x`` by a fixed factor of 1/255.

    Despite the name, no per-input minimum or maximum is computed; the value
    is simply divided by 255.0.
    """
    scaled = x / 255.0
    return scaled


def to_tensor(x):
    """Convert a NumPy array to a torch tensor; pass anything else through.

    The conversion uses ``torch.from_numpy``. Non-array inputs are returned
    unchanged (the very same object).
    """
    if not isinstance(x, np.ndarray):
        return x
    return torch.from_numpy(x)


# Frame preprocessing pipeline, applied in this order: convert to a torch
# tensor, divide by 255, then resize to 128x128 (adding a batch dimension
# when the input has fewer than four dimensions).
frame_transform = transforms.Compose([to_tensor, minmax, resize])

In [None]:
# Two-player environment without bots; `frame_transform` is handed to the
# environment as the player transform.
# NOTE(review): presumably the transform is applied to every player's frame
# before it is returned from reset()/step() — confirm in arena.VizdoomMPEnv.
env = VizdoomMPEnv(
    num_players=2,
    num_bots=0,
    episode_timeout=5000,
    player_transforms=frame_transform,
)

In [None]:
# Display the environment's action and observation spaces as the cell output.
env.action_space, env.observation_space

## Random policy (2 players)

In [None]:
# Roll out a uniformly random policy for two episodes of at most 100 steps
# each, collecting every observation per player so the games can be rendered
# afterwards. Frames from both episodes accumulate in the same lists.
frames = {player: [] for player in range(env.num_players)}

for episode in range(2):
    ep_return = dict.fromkeys(range(env.num_players), 0.0)
    ep_step = 0
    obs = env.reset()
    for player, frame in enumerate(obs):
        frames[player].append(frame)
    for step in range(100):
        act = env.action_space.sample()
        obs, rwd, done, info = env.step(act)
        # Add each player's reward (looked up by position) to its running total.
        ep_return = {
            player: total + rwd[idx]
            for idx, (player, total) in enumerate(ep_return.items())
        }
        for player, frame in enumerate(obs):
            frames[player].append(frame)
        if done:
            # The summary is only printed when the episode actually terminates
            # within the step cap.
            print("ep steps: {}; ep return: {}".format(ep_step, ep_return))
            break
        ep_step += 1

In [None]:
from IPython.display import HTML

from arena.render import render_episode


# Turn the collected per-player frames into an animation and embed it in the
# notebook as an HTML5 video (the HTML object must stay the last expression
# of the cell so that it is displayed).
ani = render_episode(frames)
HTML(ani.to_html5_video())

## Deep Q-learning
Single agent, against bots

In [8]:
import random
import torch
from torch import nn
import torch.optim as optim
import torch.nn.functional as F
from collections import deque

In [9]:
# Use the GPU when one is available and fall back to the CPU otherwise, so the
# notebook also runs on machines without CUDA.
device = "cuda" if torch.cuda.is_available() else "cpu"

GAMMA = 0.95  # discount factor applied to the bootstrapped next-state value
EPISODES = 100  # number of training episodes
BATCH_SIZE = 512  # transitions sampled from the replay buffer per update
REPLAY_BUFFER_SIZE = 20000  # maximum number of stored transitions
LEARNING_RATE = 1e-4  # Adam learning rate
EPSILON_START = 1.0  # initial exploration rate
EPSILON_END = 0.1  # lower bound of the exploration rate
# Divisor of the linear schedule: epsilon = EPSILON_START - steps / EPSILON_DECAY,
# clipped at EPSILON_END (reached after 9000 steps with these values).
EPSILON_DECAY = 10000

In [None]:
# Single learning player against two bots; replaces the two-player `env`
# created earlier in the notebook. Frames go through `frame_transform`.
env = VizdoomMPEnv(
    num_players=1,
    num_bots=2,
    episode_timeout=5000,
    player_transforms=frame_transform,
)

In [None]:
class DQN(nn.Module):
    """Convolutional Q-network mapping an image batch to one value per action.

    Three Conv2d -> BatchNorm2d -> SiLU stages are followed by a two-layer
    fully connected head. The head expects ``32 * 6 * 6`` flattened features,
    which is what the convolution stack produces for 3-channel 128x128 inputs.

    Args:
        action_space: Number of discrete actions, i.e. the output width.
    """

    def __init__(self, action_space=6):
        super().__init__()

        # (in_channels, out_channels, kernel_size, stride) for each conv stage.
        conv_specs = (
            (3, 32, 8, 4),
            (32, 64, 4, 2),
            (64, 32, 3, 2),
        )
        conv_stack = []
        for in_ch, out_ch, kernel, stride in conv_specs:
            conv_stack.append(
                nn.Conv2d(in_ch, out_ch, kernel_size=kernel, stride=stride)
            )
            conv_stack.append(nn.BatchNorm2d(out_ch))
            conv_stack.append(nn.SiLU())
        self.conv_layers = nn.Sequential(*conv_stack)

        hidden = nn.Linear(32 * 6 * 6, 512)
        head = nn.Linear(512, action_space)
        self.fc_layers = nn.Sequential(hidden, nn.SiLU(), head)

    def forward(self, x):
        """Return the Q-values for ``x``, shaped ``(batch, action_space)``."""
        features = self.conv_layers(x)
        # Flatten everything except the batch dimension for the linear head.
        flat = features.view(features.size(0), -1)
        return self.fc_layers(flat)


# Build the Q-network with one output per discrete action of the environment
# and move it to the selected device.
dqn = DQN(action_space=env.action_space.n).to(device)
# Adam optimises all network parameters with the configured learning rate.
optimizer = optim.Adam(dqn.parameters(), lr=LEARNING_RATE)

# Report the total number of parameters, in thousands.
print(f"Parameters: {sum(p.numel() for p in dqn.parameters()) / 1e3:.1f}K")

In [17]:
# Replay buffer: a bounded deque, so once REPLAY_BUFFER_SIZE transitions are
# stored the oldest ones are dropped automatically as new ones are appended.
replay_buffer = deque(maxlen=REPLAY_BUFFER_SIZE)


# `torch.no_grad()` is instantiated explicitly: the bare `@torch.no_grad` form
# is only accepted by recent PyTorch releases, the called form works everywhere.
@torch.no_grad()
def epsilon_greedy(state, epsilon):
    """Pick an action with an epsilon-greedy policy.

    Args:
        state: Observation tensor fed to the global ``dqn`` network.
            NOTE(review): assumed to hold a single observation with a leading
            batch dimension — ``argmax()`` is taken over the flattened output.
        epsilon: Probability of taking a random action.

    Returns:
        A sample from ``env.action_space`` with probability ``epsilon``,
        otherwise the index of the largest Q-value as a Python int.
    """
    if random.random() < epsilon:
        return env.action_space.sample()

    # Greedy branch. The network is evaluated in whatever train/eval mode it
    # is currently in; this function does not switch modes.
    q_values = dqn(state.to(device))
    return q_values.argmax().item()

## Training loop

In [None]:
# Online DQN training: act epsilon-greedily, store every transition in the
# replay buffer and, once the buffer holds more than BATCH_SIZE transitions,
# take one gradient step per environment step on a random mini-batch.
epsilon = EPSILON_START
steps_done = 0
q_loss_list = []  # Q-loss of every optimisation step (not one entry per episode)
reward_list = []  # Total return of every episode

for episode in range(EPISODES):
    ep_return = 0.0
    ep_step = 0
    obs = env.reset()
    obs = obs[0]  # Single player: keep only the first player's observation

    # Each episode is cut off after at most 100 environment steps.
    for step in range(100):
        act = epsilon_greedy(obs, epsilon)
        next_obs, rwd, done, info = env.step(act)

        # Single player adjustments: unpack the first player's reward/observation
        rwd = rwd[0]
        next_obs = next_obs[0]

        # Store the transition (s, a, r, s', done) in the replay buffer
        replay_buffer.append((obs, act, rwd, next_obs, done))

        obs = next_obs
        ep_return += rwd

        # Train if buffer has enough samples
        if len(replay_buffer) > BATCH_SIZE:
            batch = random.sample(replay_buffer, BATCH_SIZE)
            states, actions, rewards, next_states, dones = zip(*batch)

            # Observations are concatenated along dim 0.
            # NOTE(review): assumes each stored observation already carries a
            # leading batch dimension of 1 (added by `resize`) — confirm.
            states = torch.cat(states).to(device, dtype=torch.float32)
            next_states = torch.cat(next_states, 0).to(device, dtype=torch.float32)
            actions = torch.tensor(actions, dtype=torch.long).to(device)
            rewards = torch.tensor(rewards, dtype=torch.float32).to(device)
            dones = torch.tensor(dones, dtype=torch.float32).to(device)

            # Q(s, a) for the actions that were actually taken
            q_values = dqn(states).gather(1, actions.unsqueeze(1)).squeeze(1)

            # One-step TD target, bootstrapped from the same network (there is
            # no separate target network); (1 - dones) removes the bootstrap
            # term for terminal transitions.
            with torch.no_grad():
                next_q_values = dqn(next_states).max(1).values
                target_q_values = rewards + GAMMA * next_q_values * (1 - dones)

            loss = F.mse_loss(q_values, target_q_values)
            q_loss_list.append(loss.item())  # Store loss

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

        # Update epsilon: linear decay in the global step count, clipped at
        # EPSILON_END; it is computed from the step count before the increment.
        epsilon = max(EPSILON_END, EPSILON_START - steps_done / EPSILON_DECAY)
        steps_done += 1

        if done:
            break
        else:
            # Only non-terminal steps are counted.
            ep_step += 1

    reward_list.append(ep_return)

    # Moving average of the return over the last (up to) 10 episodes
    avg_reward = np.mean(reward_list[-10:])
    print(f"Episode {episode + 1}/{EPISODES}, steps: {ep_step}, epsilon: {epsilon:.2f}")
    print(f"\tReturn: {ep_return:.2f}, avg Reward (last 10): {avg_reward:.2f}")
    if len(q_loss_list) > 0:
        # Mean loss of the last (up to) 10 optimisation steps
        avg_q_loss = np.mean(q_loss_list[-10:])
        print(f"\tAvg Q-loss: {avg_q_loss:.4f}\n")

In [None]:
# Greedy evaluation rollout: play one full episode with the trained network,
# always taking the action with the largest Q-value, and record every frame
# so the episode can be rendered afterwards.
frames = {k: [] for k in range(env.num_players)}

ep_return = {k: 0.0 for k in range(env.num_players)}
ep_step = 0
done = False
obs = env.reset()
for i, o in enumerate(obs):
    frames[i].append(o)

# Evaluate in inference mode: eval() makes BatchNorm use its running
# statistics instead of normalising each single frame by its own statistics
# (and stops it from updating them), and no_grad() avoids building an
# autograd graph for every forward pass. The previous mode is restored
# afterwards so that training can be resumed unchanged.
was_training = dqn.training
dqn.eval()
try:
    with torch.no_grad():
        while not done:
            state = obs[0].to(device)  # single learning player
            act = dqn(state).argmax().item()
            obs, rwd, done, info = env.step(act)
            ep_return = {k: ep_return[k] + rwd[i] for i, k in enumerate(ep_return)}
            for i, o in enumerate(obs):
                frames[i].append(o)
            if not done:
                # Only non-terminal steps are counted.
                ep_step += 1
finally:
    dqn.train(was_training)

print("ep steps: {}; ep return: {}".format(ep_step, ep_return))

In [None]:
from IPython.display import HTML

from arena.render import render_episode


# Render the frames recorded during the greedy rollout and embed the result
# in the notebook as an HTML5 video (kept as the last expression of the cell
# so that it is displayed).
ani = render_episode(frames)
HTML(ani.to_html5_video())