In [None]:
import os
import torch
import numpy as np
import torch.nn as nn
import torch.optim as optim
import random
import torch.nn.functional as F
import vizdoom as vzd

In [None]:
# Set up ViZDoom environment
def setup_vizdoom():
    """Create, configure and initialise a ViZDoom game for the "basic" scenario.

    The game is configured with:
      * scenario file ``basic.wad`` from the ViZDoom scenarios directory, map ``map01``
      * a 160x120 screen in RGB24 format, shown in a visible window
      * three buttons, in this order: MOVE_LEFT, MOVE_RIGHT, ATTACK
      * PLAYER mode and a living reward of -1

    Returns:
        The ``vzd.DoomGame`` instance after ``init()`` has been called on it.
    """
    scenario_file = os.path.join(vzd.scenarios_path, "basic.wad")
    # The position of each button here is the position callers must use in
    # the boolean action vectors they pass to the game.
    buttons = [vzd.Button.MOVE_LEFT, vzd.Button.MOVE_RIGHT, vzd.Button.ATTACK]

    doom = vzd.DoomGame()

    # Scenario / map selection
    doom.set_doom_scenario_path(scenario_file)
    doom.set_doom_map("map01")

    # Rendering
    doom.set_screen_resolution(vzd.ScreenResolution.RES_160X120)
    doom.set_screen_format(vzd.ScreenFormat.RGB24)
    doom.set_window_visible(True)

    # Controls and rewards
    doom.set_available_buttons(buttons)
    doom.set_mode(vzd.Mode.PLAYER)
    doom.set_living_reward(-1)

    doom.init()
    return doom

In [None]:
class QNetwork(nn.Module):
    """Small convolutional network mapping an RGB frame to one Q-value per action.

    Architecture: two (3x3 conv, padding 1 -> ReLU -> 4x4 max-pool) stages with
    16 and 32 output channels, followed by a two-layer fully connected head
    (128 hidden units, ``num_actions`` outputs).

    Args:
        image_height: Height in pixels of the input frames.
        image_width: Width in pixels of the input frames.
        num_actions: Number of Q-values produced per input frame.
    """

    def __init__(self, image_height: int, image_width: int, num_actions: int):
        super().__init__()
        pool_size = 4
        rows, cols = image_height, image_width

        # Stage 1. The padded 3x3 convolution keeps the spatial size, so only
        # the pooling layer changes it (integer division by the pool size).
        self.conv1 = nn.Conv2d(in_channels=3, out_channels=16, kernel_size=3, padding=1)
        self.pool1 = nn.MaxPool2d(kernel_size=pool_size)
        rows, cols = rows // pool_size, cols // pool_size

        # Stage 2, same pattern with twice as many channels.
        self.conv2 = nn.Conv2d(in_channels=16, out_channels=32, kernel_size=3, padding=1)
        self.pool2 = nn.MaxPool2d(kernel_size=pool_size)
        rows, cols = rows // pool_size, cols // pool_size

        # Fully connected head operating on the flattened stage-2 feature map.
        flat_features = rows * cols * 32
        self.fc = nn.Sequential(
            nn.Linear(flat_features, 128),
            nn.ReLU(),
            nn.Linear(128, num_actions),
        )

    def forward(self, x):
        """Return Q-values of shape (batch, num_actions) for a (batch, 3, H, W) input."""
        features = self.pool1(F.relu(self.conv1(x)))
        features = self.pool2(F.relu(self.conv2(features)))
        batch = features.size(0)
        flat = features.view(batch, -1)  # one row of features per sample
        return self.fc(flat)

In [None]:
class QLearningAgent:
    """Deep Q-learning agent (online network + target network) for a ViZDoom game.

    Args:
        env: An initialised game object. The agent calls ``get_state()``,
            ``get_available_buttons()``, ``new_episode()``, ``make_action()``
            and ``is_episode_finished()`` on it.
        lr: Learning rate of the Adam optimiser for the online network.
        gamma: Discount factor applied to bootstrapped next-state values.
        epsilon: Initial probability of picking a random action.
        epsilon_decay: Multiplicative decay applied to ``epsilon`` after every episode.
        epsilon_min: Lower bound for ``epsilon``.
    """

    def __init__(self, env, lr=0.0003, gamma=0.95, epsilon=1.0, epsilon_decay=0.995, epsilon_min=0.01):
        self.env = env

        # The network input size is read from a live frame. If no state is
        # available (presumably because no episode is running), start one
        # rather than failing on ``None.screen_buffer``.
        state = self.env.get_state()
        if state is None:
            self.env.new_episode()
            state = self.env.get_state()
        self.image_height, self.image_width, _ = state.screen_buffer.shape
        self.num_actions = len(env.get_available_buttons())

        self.q_network = QNetwork(self.image_height, self.image_width, self.num_actions)
        self.target_network = QNetwork(self.image_height, self.image_width, self.num_actions)
        self.target_network.load_state_dict(self.q_network.state_dict())
        self.target_network.eval()

        self.optimizer = optim.Adam(self.q_network.parameters(), lr=lr)
        self.gamma = gamma
        self.epsilon = epsilon
        self.epsilon_decay = epsilon_decay
        self.epsilon_min = epsilon_min

        # One action per available button: row i presses only button i.
        # Derived from num_actions so the table can never disagree with the
        # network's output size (for three buttons this is the same table as
        # a hand-written [T,F,F], [F,T,F], [F,F,T]).
        self.actions = [
            [pressed == button for button in range(self.num_actions)]
            for pressed in range(self.num_actions)
        ]

    def select_action(self, obs):
        """Pick an action index with an epsilon-greedy policy.

        Args:
            obs: A processed observation (channels-first array, see ``_process_obs``).

        Returns:
            int: index into ``self.actions``.
        """
        if random.random() < self.epsilon:
            return random.randint(0, self.num_actions - 1)
        obs = torch.as_tensor(obs, dtype=torch.float32).unsqueeze(0)  # add batch dim
        with torch.no_grad():
            q_values = self.q_network(obs)
        return torch.argmax(q_values).item()

    def train(self, num_episodes, batch_size=32, target_update=10, replay_buffer_size=5000):
        """Run the training loop.

        Args:
            num_episodes: Number of episodes to play.
            batch_size: Number of transitions sampled per gradient step; no
                step is taken until the buffer holds at least this many.
            target_update: The target network is synchronised with the online
                network every ``target_update`` episodes (including episode 0).
            replay_buffer_size: Maximum number of stored transitions; the
                oldest one is dropped when the limit is exceeded.
        """
        replay_buffer = []
        for episode in range(num_episodes):
            self.env.new_episode()
            obs = self._process_obs(self.env.get_state().screen_buffer)
            episode_reward = 0

            while not self.env.is_episode_finished():
                action_idx = self.select_action(obs)
                action = self.actions[action_idx]
                reward = self.env.make_action(action)

                # A finished episode has no next frame; None marks the
                # transition as terminal for _train_step.
                next_obs = (
                    self._process_obs(self.env.get_state().screen_buffer)
                    if not self.env.is_episode_finished()
                    else None
                )
                replay_buffer.append((obs, action_idx, reward, next_obs))
                if len(replay_buffer) > replay_buffer_size:
                    replay_buffer.pop(0)

                obs = next_obs
                episode_reward += reward

                if len(replay_buffer) >= batch_size:
                    self._train_step(replay_buffer, batch_size)

            self.epsilon = max(self.epsilon * self.epsilon_decay, self.epsilon_min)

            if episode % target_update == 0:
                self.target_network.load_state_dict(self.q_network.state_dict())

            print(f"Episode {episode + 1}/{num_episodes}, Reward: {episode_reward}, Epsilon: {self.epsilon:.2f}")

    def _train_step(self, replay_buffer, batch_size):
        """Sample a mini-batch and take one optimiser step on the MSE TD error.

        Target for each transition: ``reward + gamma * max_a Q_target(next_obs, a)``,
        where the bootstrap term is 0 for terminal transitions (``next_obs is None``).

        Args:
            replay_buffer: Sequence of ``(obs, action_idx, reward, next_obs)`` tuples.
            batch_size: Number of transitions to sample (without replacement).
        """
        batch = random.sample(replay_buffer, batch_size)
        obs_batch, action_batch, reward_batch, next_obs_batch = zip(*batch)

        # Stack in numpy first: building a tensor from a list of arrays is slow.
        obs_batch = torch.as_tensor(np.stack(obs_batch), dtype=torch.float32)
        action_batch = torch.tensor(action_batch, dtype=torch.long)
        reward_batch = torch.tensor(reward_batch, dtype=torch.float32)

        # Boolean mask of the rows that have a next observation. The bootstrap
        # values must be written back through this mask so that each one lands
        # on the row of the transition it belongs to; terminal rows stay at 0.
        non_terminal = torch.tensor(
            [nxt is not None for nxt in next_obs_batch], dtype=torch.bool
        )
        max_next_q_values = torch.zeros(len(batch), dtype=torch.float32)
        if bool(non_terminal.any()):
            next_obs = torch.as_tensor(
                np.stack([nxt for nxt in next_obs_batch if nxt is not None]),
                dtype=torch.float32,
            )
            # The target network only provides constants for the loss.
            with torch.no_grad():
                max_next_q_values[non_terminal] = self.target_network(next_obs).max(dim=1)[0]

        target_q_values = reward_batch + (self.gamma * max_next_q_values)

        q_values = self.q_network(obs_batch)
        # squeeze(1), not squeeze(): keeps shape (batch,) even when batch == 1.
        current_q_values = q_values.gather(1, action_batch.unsqueeze(1)).squeeze(1)

        loss = F.mse_loss(current_q_values, target_q_values)
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

    def _process_obs(self, obs):
        """Convert an (H, W, C) frame to a channels-first float32 array scaled by 1/255.

        float32 is used because frames are converted to float32 tensors before
        reaching the network anyway, and it halves the memory the replay
        buffer needs compared with numpy's default float64.
        """
        obs = np.ascontiguousarray(obs.transpose(2, 0, 1), dtype=np.float32)
        return obs / 255.0

In [None]:
if __name__ == "__main__":
    # Entry point: build the game, train an agent for 500 episodes, then shut
    # the game down.
    env = setup_vizdoom()
    try:
        agent = QLearningAgent(env)
        agent.train(num_episodes=500)
    finally:
        # Always release the Doom instance, even if agent construction or
        # training raises or the run is interrupted from the keyboard.
        env.close()