In [23]:
#misc
import numpy as np
import random
# env
import gymnasium as gym
from vizdoom import gymnasium_wrapper
# learning
import torch
import torch.nn as nn
import torch.autograd as autograd
import stable_baselines3

# Game Loop

In [24]:
env = gym.make("VizdoomBasic-v0")

obs, info = env.reset(seed=42)
for _ in range(100):
    action = env.action_space.sample()

    obs, rew, term, trun, info = env.step(action)

    if term or trun:
        obs, info = env.reset()

env.close()



# Setup for DQN

We first need a **Transition** class which represents a `(state, action) -> (state', reward)` datapoint.

Then we need a **Replay Memory** class to store and utilize these transitions.

In [25]:
class Transition():
    def __init__(self) -> None:
        self.state
        self.action
        self.next_state
        self.reward

In [26]:
class ReplayMemory(object):
    def __init__(self):
        self.memory = np.array()

    def push(self, *args):
        """Save a transition"""
        self.memory.append(Transition(*args))

    def sample(self, batch_size):
        return random.sample(self.memory, batch_size)

    def __len__(self):
        return len(self.memory)

# DQN Class

In [27]:
class DQN(nn.Module):
    def __init__(self, n_actions) -> None:
        super(DQN, self).__init__()

        self.conv1 = nn.Sequential(
            nn.Conv2d(1, 8, kernel_size=3, stride=2, bias=False),
            nn.BatchNorm2d(8),
            nn.ReLU()
        )

        self.conv2 = nn.Sequential(
            nn.Conv2d(8, 8, kernel_size=3, stride=2, bias=False),
            nn.BatchNorm2d(8),
            nn.ReLU()
        )

        self.conv3 = nn.Sequential(
            nn.Conv2d(8, 8, kernel_size=3, stride=1, bias=False),
            nn.BatchNorm2d(8),
            nn.ReLU(),
        )

        self.conv4 = nn.Sequential(
            nn.Conv2d(8, 16, kernel_size=3, stride=1, bias=False),
            nn.BatchNorm2d(16),
            nn.ReLU(),
        )

        self.state_fc = nn.Sequential(nn.Linear(96, 64), nn.ReLU(), nn.Linear(64, 1))

        self.advantage_fc = nn.Sequential(
            nn.Linear(96, 64), nn.ReLU(), nn.Linear(64, n_actions)
        )

    def forward(self, x):
        x = self.conv1(x)
        x = self.conv2(x)
        x = self.conv3(x)
        x = self.conv4(x)
        x = x.view(-1, 192)
        x1 = x[:, :96]  # input for the net to calculate the state value
        x2 = x[:, 96:]  # relative advantage of actions in the state
        state_value = self.state_fc(x1).reshape(-1, 1)
        advantage_values = self.advantage_fc(x2)
        x = state_value + (
            advantage_values - advantage_values.mean(dim=1).reshape(-1, 1)
        )

        return x

# Training

We will add some boilerplate for the training, and some helper functions.

In [31]:
# Utilize GPU for training if GPU present
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Initialize RNG seed
seed:int = 42 #rng seed
random.seed(seed)
np.random.seed(seed)
torch.manual_seed(seed)

# HYPERPARAMETERS
total_timesteps:int = 500000 # timestep max of an experiment
lr:float = 0.0001
buffer_size:int = 10000 # experience replay buffer size
gamma: float = 0.99 # discount factor
batch_size: int = 128 # batch size for experience replay buffer sampling
epsilon_max: float = 1 # starting epsilon value (exploration/exploitation)
epsilon_min:float = 0.05 # ending epsilon value
epsilon_duration:float = 0.5 # time spent before min epsilon is reached
training_start:int = 10000 # steps needed before training begins
tnur: int = 1 # target network update rate
tnuf: int = 500 # target network update frequency
qntf: int = 10 # qnetwork training frequency

Epsilon decay let's us start by picking random actions, then slowly start picking actions that yield high rewards. We first explore a wide array of options, and once we have an idea of what works and what doesn't, we start exploiting that knowledge and ldive deeper.

In [29]:
def epsilon_decay(current_timestep: int):
    slope = (epsilon_min - epsilon_max) / total_timesteps
    return max(slope * current_timestep + epsilon_max, epsilon_min)

In [30]:
def get_action(obs, policy_net:DQN, current_timestep:int):
    rng = random.random()
    epsilon = epsilon_decay(current_timestep)
    if rng > epsilon:
        # action with highest q_value
        q_values = policy_net(torch.Tensor(obs).to(device))
        action = torch.argmax(q_values).cpu().numpy()
    else:
        # random action 
        action = env.action_space.sample()
    return action
    