In [None]:
import random

from coderone.dungeon.agent import GameState, PlayerState
from coderone.dungeon.game import Game, Recorder

import numpy as np

import torch
import torch.nn as nn
import torch.nn.functional as F

from tqdm import tqdm

In [None]:
# Action strings handed to the game, indexed by the network's output position.
# NOTE(review): presumably '' = no-op, u/d/l/r = movement, p = place bomb —
# confirm against the coderone agent API.
ACTIONS = [
    '',
    'u',
    'd',
    'l',
    'r',
    'p',
]

In [None]:
class DQN(nn.Module):
    """Small convolutional Q-network mapping a map tensor to one Q-value per action.

    Args:
        map_size: ``(channels, height, width)`` of a single input state.
        n_actions: number of Q-values produced for each state.

    Raises:
        ValueError: if ``map_size`` is too small to survive the two unpadded
            3x3 convolutions.
    """

    def __init__(self, map_size=(8, 10, 12), n_actions=6):
        super().__init__()
        channels, height, width = map_size

        # Shapes below are for the default (8, 10, 12) input.
        self.conv1 = nn.Conv2d(channels, 16, kernel_size=3, stride=1, padding=1)  # 16x10x12
        self.conv2 = nn.Conv2d(16, 32, kernel_size=3, stride=2)  # 32x4x5
        self.conv3 = nn.Conv2d(32, 64, kernel_size=3, stride=1)  # 64x2x3

        # Derive the flattened feature count from map_size instead of
        # hard-coding 64*6, so non-default map sizes work as well.
        # conv1 preserves the spatial size (3x3 kernel, stride 1, padding 1).
        out_h = self._conv_out(self._conv_out(height, 3, 2), 3, 1)
        out_w = self._conv_out(self._conv_out(width, 3, 2), 3, 1)
        if out_h < 1 or out_w < 1:
            raise ValueError(
                f"map_size {tuple(map_size)} is too small for the convolution stack"
            )
        self.fc = nn.Linear(64 * out_h * out_w, n_actions)

    @staticmethod
    def _conv_out(size, kernel, stride, padding=0):
        """Output length of one spatial dimension after a Conv2d layer."""
        return (size + 2 * padding - kernel) // stride + 1

    def forward(self, x):
        """Return Q-values of shape ``(batch, n_actions)`` for a batch of states."""
        x = F.relu(self.conv1(x))
        x = F.relu(self.conv2(x))
        x = F.relu(self.conv3(x))
        x = x.flatten(start_dim=1)  # (batch, 64 * out_h * out_w)
        return self.fc(x)

In [None]:
class Agent:
    """Externally driven player: replays whatever move was last handed to it.

    The move is injected through ``set_next_move`` and read back through
    ``next_move``; the most recent states passed to ``update`` are kept on
    the instance so other code can inspect them.
    """

    # Class-level defaults, shadowed per instance once the setters run.
    _next_move = ''
    game_state = None
    player_state = None

    def update(self, game_state: GameState, player_state: PlayerState):
        """Remember the latest game and player state."""
        self.game_state, self.player_state = game_state, player_state

    def next_move(self):
        """Return the move most recently stored by ``set_next_move``."""
        return self._next_move

    def set_next_move(self, next_move):
        """Store the move that ``next_move`` will report."""
        self._next_move = next_move

class Opponent:
    """Baseline player that ignores the game and acts uniformly at random."""

    def update(self, game_state: GameState, player_state: PlayerState):
        """Accept the state update and discard it; this player keeps no state."""
        return None

    def next_move(self):
        """Pick one entry of ``ACTIONS`` at random."""
        chosen_action = random.choice(ACTIONS)
        return chosen_action

class Environment:
    """Episode-style wrapper around a two-player ``Game`` for training.

    ``reset`` builds a fresh game and returns the initial state tensor;
    ``step`` applies one action and returns ``(state, reward, done)``.

    Args:
        device: torch device the state tensors are placed on. Defaults to
            ``"cuda"``, matching the previously hard-coded behaviour.
    """

    def __init__(self, device="cuda"):
        self.device = device
        self.last_reward = 0
        self.reset()

    def reset(self):
        """Start a new game and return its initial state tensor."""
        # The reward baseline must be cleared for every episode; otherwise the
        # first step of a new game is charged the previous game's total reward.
        self.last_reward = 0

        self.agent = Agent()
        self.game = Game(row_count=10, column_count=12, max_iterations=1800, recorder=Recorder())
        self.game.add_agent(self.agent, "Agent")
        self.game.add_agent(Opponent(), "Opponent")
        self.game.generate_map()

        # Tick once so the agent has a game/player state to encode.
        self.game.tick(0)
        return self.map_to_vec(self.agent.game_state, self.agent.player_state)

    def step(self, action):
        """Apply ``action`` for one tick.

        Returns:
            A ``(state_repr, reward, done)`` tuple. ``reward`` is the change in
            ``player_state.reward`` since the previous step, adjusted by +10 if
            the game just ended with ``hp > 0`` and by -10 if it ended otherwise.
        """
        self.agent.set_next_move(action)
        self.game.tick(0)
        game_state, player_state = self.agent.game_state, self.agent.player_state
        state_repr = self.map_to_vec(game_state, player_state)

        # player_state.reward is cumulative, so expose only the per-step delta.
        reward = player_state.reward - self.last_reward
        self.last_reward = player_state.reward

        done = self.game.is_over
        if done:
            reward += 10 if player_state.hp > 0 else -10

        return state_repr, reward, done

    def map_to_vec(self, game_state: GameState, player_state: PlayerState):
        """Encode the game as a ``(1, 8, height, width)`` one-hot tensor.

        Channels: 0 indestructible blocks, 1 soft blocks, 2 ore blocks,
        3 bombs, 4 ammo, 5 treasure, 6 opponents, 7 the agent itself.
        Positions are read as ``(x, y)`` and written at ``[channel, y, x]``.
        """
        width, height = game_state.size

        # Fill on the CPU and transfer once: element-wise writes into a GPU
        # tensor issue a separate device operation per cell.
        vec = torch.zeros((8, height, width))  # Channels come first

        # NOTE(review): the coderone GameState.opponents API may require the
        # caller's player id (e.g. game_state.opponents(player_state.id)) —
        # confirm against the installed coderone version.
        layers = (
            game_state.indestructible_blocks,
            game_state.soft_blocks,
            game_state.ore_blocks,
            game_state.bombs,
            game_state.ammo,
            game_state.treasure,
            game_state.opponents(),
        )
        for channel, positions in enumerate(layers):
            for position in positions:
                vec[channel, position[1], position[0]] = 1

        vec[7, player_state.location[1], player_state.location[0]] = 1

        # Add the batch dimension expected by the network.
        return vec.unsqueeze(0).to(self.device)


In [None]:
# Training hyperparameters.
LEARNING_RATE = 0.001
GAMMA = 0.999  # discount factor applied to the bootstrapped next-state value
N_EPISODES = 100

model = DQN().cuda()
optimizer = torch.optim.Adam(model.parameters(), lr=LEARNING_RATE)
criterion = nn.MSELoss()
environment = Environment()


progress_bar = tqdm()
for epoch in range(N_EPISODES):
    state_tensor = environment.reset()
    total_loss = 0

    while True:
        q_values = model(state_tensor)

        # Greedy action selection.
        action_idx = q_values.argmax().item()
        next_state_tensor, reward, done = environment.step(ACTIONS[action_idx])

        # The TD target is a constant for this update: compute it without
        # autograd so the loss only back-propagates through q_values, and do
        # not bootstrap from the next state once the episode has ended.
        with torch.no_grad():
            if done:
                max_next_q = torch.zeros((), device=q_values.device)
            else:
                max_next_q = model(next_state_tensor).max()
        target_q = reward + (GAMMA * max_next_q)

        loss = criterion(q_values[0][action_idx], target_q)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        state_tensor = next_state_tensor
        step_loss = loss.item()
        total_loss += step_loss

        progress_bar.update(1)
        progress_bar.set_description(f"E{epoch + 1}, loss: {step_loss:.4f}")

        if done:
            break

progress_bar.close()