In [1]:
!pip install vizdoom

Collecting vizdoom
  Downloading vizdoom-1.2.4-cp311-cp311-manylinux_2_28_x86_64.whl.metadata (11 kB)
Downloading vizdoom-1.2.4-cp311-cp311-manylinux_2_28_x86_64.whl (28.1 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m28.1/28.1 MB[0m [31m62.5 MB/s[0m eta [36m0:00:00[0m:00:01[0m00:01[0m
[?25hInstalling collected packages: vizdoom
Successfully installed vizdoom-1.2.4


**Import Libraries**

In [3]:
import gymnasium as gym
import vizdoom as vzd
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from collections import deque
import random
import cv2
import matplotlib.pyplot as plt

**Create Game Environment**

In [21]:
# Factory for the ViZDoom game used by the training loop
def create_vizdoom_env():
    """Create and initialise a ViZDoom game from the bundled ``basic.cfg``.

    The configuration file is loaded first and then a handful of rendering
    options are overridden: the window is hidden, HUD / crosshair / weapon /
    decals / particles are switched off, and the screen resolution is set to
    160x120.

    Returns:
        The initialised ``vzd.DoomGame`` instance (``init()`` already called).
    """
    config_path = vzd.scenarios_path + "/basic.cfg"

    doom = vzd.DoomGame()
    doom.load_config(config_path)

    # No on-screen window while training
    doom.set_window_visible(False)

    # Turn off every optional overlay / effect, in the same order each time
    render_switches = (
        doom.set_render_hud,
        doom.set_render_crosshair,
        doom.set_render_weapon,
        doom.set_render_decals,
        doom.set_render_particles,
    )
    for switch_off in render_switches:
        switch_off(False)

    doom.set_screen_resolution(vzd.ScreenResolution.RES_160X120)

    doom.init()
    return doom

**Store Experiences for Learning**

In [6]:
# ReplayBuffer class for storing the past experiences
class ReplayBuffer:
    """Experience replay memory that also maintains the rolling frame history.

    ``frame_queue`` holds the last ``history_len`` frames of the running
    episode. Each :meth:`push` concatenates those frames into the current
    state, appends the new frame, concatenates again to obtain the next state
    and stores ``(state, action, reward, next_state, done)`` in ``buffer``.

    Args:
        capacity: maximum number of *transitions* kept; once full, the oldest
            transition is evicted on every push.
        history_len: number of consecutive frames that make up one state.
        frame_shape: shape every individual frame must have.

    Raises:
        ValueError: if ``history_len`` is smaller than 1.

    Note:
        Every stored transition holds two full frame stacks (state and next
        state), so memory use grows with ``2 * history_len`` frames per
        transition.
    """

    def __init__(self, capacity = 100000, history_len = 4, frame_shape = (1, 84, 84)):
        if history_len < 1:
            raise ValueError("history_len must be at least 1")
        self.capacity = capacity
        self.history_len = history_len
        self.frame_shape = tuple(frame_shape)
        self.frame_queue = deque(maxlen = history_len)
        self.buffer = deque(maxlen = capacity)

    def reset_history(self, first_frame: torch.Tensor):
        """Start a new episode: fill the frame history with copies of ``first_frame``.

        Raises:
            AssertionError: if ``first_frame`` does not have ``frame_shape``
                (the same check that :meth:`push` applies to new frames).
        """
        assert first_frame.shape == self.frame_shape, (
            f"expected frame of shape {self.frame_shape}, got {tuple(first_frame.shape)}"
        )

        self.frame_queue.clear()
        for _ in range(self.history_len):
            # clone so the queued frames never alias the caller's tensor
            self.frame_queue.append(first_frame.clone())

    def push(self, new_frame: torch.Tensor, action: int, reward: float, done: bool):
        """Record the transition produced by ``action`` that led to ``new_frame``.

        Args:
            new_frame: frame observed after the action; must have ``frame_shape``.
            action: index of the action taken.
            reward: reward received for the action.
            done: whether the episode ended with this step.

        Raises:
            AssertionError: if ``new_frame`` has the wrong shape.
            RuntimeError: if :meth:`reset_history` has not been called yet, so
                there is no frame history to build the state from.
        """
        assert new_frame.shape == self.frame_shape, (
            f"expected frame of shape {self.frame_shape}, got {tuple(new_frame.shape)}"
        )
        if len(self.frame_queue) < self.history_len:
            raise RuntimeError("reset_history() must be called before push()")

        # Frames are concatenated along dim 0; with the default (1, H, W) frames
        # this yields a (history_len, H, W) stack (dim 0 is the history axis,
        # not a batch axis).
        state_stack = torch.cat(list(self.frame_queue), dim=0)

        # The bounded deque drops the oldest frame when the new one is appended
        self.frame_queue.append(new_frame)
        next_state_stack = torch.cat(list(self.frame_queue), dim=0)

        self.buffer.append((state_stack, action, reward, next_state_stack, done))

    def sample(self, batch_size: int):
        """Draw ``batch_size`` distinct transitions uniformly at random.

        Returns:
            Five lists ``(states, actions, rewards, next_states, dones)``,
            each of length ``batch_size``.

        Raises:
            ValueError: if ``batch_size`` exceeds the number of stored
                transitions (raised by ``random.sample``).
        """
        batch = random.sample(self.buffer, batch_size)
        states, actions, rewards, next_states, dones = zip(*batch)

        return list(states), list(actions), list(rewards), list(next_states), list(dones)

    def __len__(self):
        """Number of transitions currently stored."""
        return len(self.buffer)

**Preprocess the input states**

In [7]:
def preprocess(state):
    """Turn a raw screen buffer into a normalised 84x84 grayscale frame tensor.

    Args:
        state: channel-first colour image array of shape (3, H, W); with the
            160x120 resolution configured in ``create_vizdoom_env`` that is
            (3, 120, 160).
            NOTE(review): assumes the planar-RGB ``CRCGCB`` screen format
            (ViZDoom's default channel-first format) — confirm if the screen
            format is ever changed in the config.

    Returns:
        ``torch.float32`` tensor of shape (1, 84, 84) with values scaled by 1/255.
    """
    # cv2 takes image in the shape (H, W, C)
    img = np.moveaxis(state, 0, -1)  # (3, H, W) -> (H, W, 3)
    # Channels arrive in R, G, B order, so use the RGB conversion; BGR2GRAY
    # would apply the red and blue luminance weights to the wrong channels.
    gray = cv2.cvtColor(img, cv2.COLOR_RGB2GRAY)
    resized = cv2.resize(gray, (84, 84))
    return torch.tensor(resized, dtype=torch.float32).unsqueeze(0) / 255.0  # shape: (1, 84, 84)

**Model architecture**

In [22]:
class CNNDQN(nn.Module):
    """Dueling convolutional Q-network over a stack of frames.

    Three Conv-BatchNorm-SiLU stages extract features; two fully connected
    heads then estimate the state value V(s) and the per-action advantages
    A(s, a), which are combined as ``Q = V + A - mean(A)``.

    Args:
        action_space: number of discrete actions (width of the Q output).
        in_channels: number of stacked frames fed to the first convolution.
        frame_size: ``(height, width)`` of each input frame.
    """

    def __init__(self, action_space, in_channels = 4, frame_size = (84, 84)):
        super().__init__()
        self.conv = nn.Sequential(
            nn.Conv2d(in_channels, 32, 8, 4),      # input: (N, in_channels, H, W)
            nn.BatchNorm2d(32),
            nn.SiLU(),

            nn.Conv2d(32, 64, 4, 2),
            nn.BatchNorm2d(64),
            nn.SiLU(),

            nn.Conv2d(64, 64, 3, 1),
            nn.BatchNorm2d(64),
            nn.SiLU()
        )

        # Automatic feature size detection. The probe runs in eval mode so the
        # all-zeros dummy batch does not update the BatchNorm running
        # statistics (or num_batches_tracked) before any real data is seen.
        self.conv.eval()
        with torch.no_grad():
            dummy = torch.zeros(1, in_channels, *frame_size)
            flat_dim = self.conv(dummy).flatten(1).size(1)
        self.conv.train()  # back to the default mode of a freshly built module

        self.value   = nn.Sequential(
            nn.Linear(flat_dim, 512), nn.SiLU(),
            nn.Linear(512, 1)
        )
        self.advantage = nn.Sequential(
            nn.Linear(flat_dim, 512), nn.SiLU(),
            nn.Linear(512, action_space)
        )

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Return Q-values of shape (N, action_space) for a batch of frame stacks."""
        z = self.conv(x)
        z = z.flatten(1)                          # (N, flat_dim)
        v = self.value(z)                         # (N, 1)
        a = self.advantage(z)                     # (N, action_space)
        # Subtracting the mean advantage makes the V/A decomposition identifiable
        q = v + a - a.mean(dim=1, keepdim=True)
        return q


**Training setup**

In [None]:
# ---- Training hyperparameters (read by train()) ----
EPISODES = 2_000       # number of episodes to play
BATCH_SIZE = 32        # transitions per gradient step; also the minimum buffer fill before learning
GAMMA = 0.99           # discount factor in the Q-learning target
LR = 0.0001            # Adam learning rate
EPS_START = 1.0        # initial exploration rate
EPS_MIN = 0.1          # floor for the exploration rate
EPS_DECAY = 0.995      # epsilon is multiplied by this after every episode
TARGET_SYNC = 10       # copy policy -> target network (and log) every this many episodes
GRAD_CLIP = 5.0        # max gradient norm passed to clip_grad_norm_



def train():
    """Train a dueling Double-DQN agent on the ViZDoom environment.

    Builds the game, a policy/target pair of ``CNNDQN`` networks and a
    ``ReplayBuffer``, then plays ``EPISODES`` episodes with epsilon-greedy
    exploration. After every environment step, once the buffer holds at least
    ``BATCH_SIZE`` transitions, one Double-DQN gradient step is taken on a
    random mini-batch. Every ``TARGET_SYNC`` episodes the target network is
    synchronised and a progress line is printed; epsilon is decayed
    multiplicatively after each episode down to ``EPS_MIN``.

    The reward printed in the progress line is the sum of the *clipped*
    per-step rewards (each clipped to [-1, 1]), not the raw game score.

    Returns:
        The trained policy network.
    """
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

    game = create_vizdoom_env()
    # Always close the game, including on errors or a KeyboardInterrupt
    # raised when the cell is stopped mid-training.
    try:
        n_actions = game.get_available_buttons_size()

        # Two networks: the policy net is trained, the target net is a
        # periodically refreshed copy used to evaluate the bootstrap target.
        policy_net = CNNDQN(n_actions).to(device)
        target_net = CNNDQN(n_actions).to(device)
        target_net.load_state_dict(policy_net.state_dict())
        target_net.eval()

        optimiser = optim.Adam(policy_net.parameters(), lr=LR)
        loss_fn = nn.SmoothL1Loss()
        buffer = ReplayBuffer(capacity=100_000)

        epsilon = EPS_START

        for ep in range(EPISODES):
            game.new_episode()

            # The first frame of the episode seeds the whole frame history
            first_frame = preprocess(game.get_state().screen_buffer)
            buffer.reset_history(first_frame)

            total_reward, steps = 0.0, 0

            while not game.is_episode_finished():
                steps += 1

                # epsilon-greedy action
                if torch.rand(1).item() < epsilon:
                    action_idx = torch.randint(0, n_actions, (1,)).item()
                else:
                    # build current frame stack and add batch dim -> (1, C, H, W)
                    state_stack = torch.cat(list(buffer.frame_queue), dim=0).unsqueeze(0).to(device)
                    # Inference on a single sample: use BatchNorm running
                    # statistics instead of per-sample batch statistics.
                    policy_net.eval()
                    with torch.no_grad():
                        action_idx = policy_net(state_stack).argmax().item()
                    policy_net.train()

                # act (repeat = 1 frame); exactly one button is pressed per step
                reward = game.make_action([1 if i == action_idx else 0 for i in range(n_actions)])

                # clip reward to [-1, 1]
                reward = max(min(reward, 1), -1)

                done = game.is_episode_finished()
                if not done:
                    new_frame = preprocess(game.get_state().screen_buffer)  # (1,84,84)
                else:
                    # no state is read after the episode ends; store a blank frame
                    new_frame = torch.zeros_like(first_frame)

                # store transition (buffer handles stacking internally)
                buffer.push(new_frame, action_idx, reward, done)
                total_reward += reward

                if len(buffer) >= BATCH_SIZE:
                    states, actions, rewards, next_states, dones = buffer.sample(BATCH_SIZE)

                    states      = torch.stack(states).to(device)
                    next_states = torch.stack(next_states).to(device)
                    actions     = torch.tensor(actions).long().to(device)
                    rewards     = torch.tensor(rewards).float().to(device)
                    dones       = torch.tensor(dones).float().to(device)

                    # current Q(s,a); squeeze only dim 1 so a batch of one
                    # keeps its batch dimension
                    q_pred = policy_net(states).gather(1, actions.unsqueeze(1)).squeeze(1)

                    # Double-DQN target: policy net picks the next action,
                    # target net evaluates it; terminal steps do not bootstrap
                    with torch.no_grad():
                        next_actions = policy_net(next_states).argmax(1, keepdim=True)
                        q_next = target_net(next_states).gather(1, next_actions).squeeze(1)
                        q_tgt  = rewards + GAMMA * q_next * (1 - dones)

                    # optimise
                    loss = loss_fn(q_pred, q_tgt)
                    optimiser.zero_grad()
                    loss.backward()
                    nn.utils.clip_grad_norm_(policy_net.parameters(), GRAD_CLIP)
                    optimiser.step()

            if ep % TARGET_SYNC == 0:
                target_net.load_state_dict(policy_net.state_dict())
                print(f"Ep {ep:4d} │ steps {steps:3d} │ reward {total_reward:6.2f} │ epsilon {epsilon:.2f}")

            # ε decay
            epsilon = max(EPS_MIN, epsilon * EPS_DECAY)
    finally:
        game.close()

    return policy_net

In [23]:
# Run the training loop; a progress line is printed every TARGET_SYNC episodes.
train()

Ep    0 │ steps 300 │ reward -300.00 │ epsilon 1.00
Ep   10 │ steps   7 │ reward  -5.00 │ epsilon 0.95
Ep   20 │ steps 300 │ reward -300.00 │ epsilon 0.90
Ep   30 │ steps 300 │ reward -300.00 │ epsilon 0.86
Ep   40 │ steps 165 │ reward -163.00 │ epsilon 0.82
Ep   50 │ steps 300 │ reward -300.00 │ epsilon 0.78
Ep   60 │ steps 300 │ reward -300.00 │ epsilon 0.74
Ep   70 │ steps  29 │ reward -27.00 │ epsilon 0.70
Ep   80 │ steps  23 │ reward -21.00 │ epsilon 0.67
Ep   90 │ steps 300 │ reward -300.00 │ epsilon 0.64
Ep  100 │ steps 300 │ reward -300.00 │ epsilon 0.61
Ep  110 │ steps   6 │ reward  -4.00 │ epsilon 0.58
Ep  120 │ steps 300 │ reward -300.00 │ epsilon 0.55
Ep  130 │ steps 192 │ reward -190.00 │ epsilon 0.52
Ep  140 │ steps  10 │ reward  -8.00 │ epsilon 0.50
Ep  150 │ steps  26 │ reward -24.00 │ epsilon 0.47
Ep  160 │ steps 300 │ reward -300.00 │ epsilon 0.45
Ep  170 │ steps 300 │ reward -300.00 │ epsilon 0.43


KeyboardInterrupt: 