# Importar Juego

In [None]:
from google.colab import drive
import sys

# Mount Google Drive so files stored there are reachable under /content/drive.
drive.mount('/content/drive')
# Add the folder that holds the Azul module to the import search path.
sys.path.append('/content/drive/My Drive/TFE/Azul/')

from Azul import AzulGame, AzulState
# Single game instance reused by the training and evaluation cells.
game = AzulGame()


# Codificación Estados

In [None]:
import numpy as np

def state_to_vector(state):
    """Flatten a game state into a 1-D float32 feature vector.

    Layout of the returned vector, in order:
      * for every factory: the tile count per color, in B, Y, R, K, W order;
      * for the center: the tile count per color followed by a 0/1 flag that
        is set when the '1' marker is in the center;
      * for every player: the board cells, then the pattern-line cells
        (0 for an empty cell, otherwise the 1-based color position), then
        the number of tiles on the floor;
      * a one-hot encoding of ``state.current_player``.

    Args:
        state: object exposing ``factories``, ``center``, ``players`` and
            ``current_player``. Each player is a mapping with ``'board'``,
            ``'pattern_lines'`` and ``'floor'`` entries.

    Returns:
        numpy.ndarray of dtype float32.

    Raises:
        ValueError: if a factory tile, or a non-empty board / pattern-line
            cell, is not one of the five known colors.
    """
    palette = ['B', 'Y', 'R', 'K', 'W']

    def cell_code(cell):
        # Empty cells become 0; a color becomes its 1-based palette position.
        return palette.index(cell) + 1 if cell != '' else 0

    features = []

    # Factories: one color histogram each.
    for factory in state.factories:
        histogram = [0] * len(palette)
        for tile in factory:
            histogram[palette.index(tile)] += 1
        features += histogram

    # Center: color histogram (non-color entries are skipped) plus the
    # first-player marker flag.
    center_histogram = [0] * len(palette)
    for tile in state.center:
        if tile in palette:
            center_histogram[palette.index(tile)] += 1
    features += center_histogram
    features.append(1 if '1' in state.center else 0)

    # Players: board cells, pattern-line cells, then the floor size (only
    # the number of floor tiles matters, not their colors).
    for player in state.players:
        for grid_key in ('board', 'pattern_lines'):
            for line in player[grid_key]:
                features.extend(cell_code(cell) for cell in line)
        features.append(len(player['floor']))

    # One-hot marker for the player to move.
    features.extend(
        1 if state.current_player == seat else 0
        for seat in range(len(state.players))
    )

    return np.array(features, dtype=np.float32)

# Codificación Acciones

In [None]:
# Game parameters
num_factories = 6  # Number of factories (the center counts as one)
num_colors = 5     # Number of distinct tile colors
num_rows = 6       # Number of pattern lines plus the floor line

# Maps each tile color letter to its numeric code
color_mapping = {'W': 0, 'Y': 1, 'B': 2, 'R': 3, 'K': 4}

def encode_action(factory_num, tile_color, row_num):
    """Pack a (factory, color, row) move into a single integer index.

    Args:
        factory_num: factory number; -1 (the center) is stored as slot 5.
        tile_color: color letter, a key of ``color_mapping``.
        row_num: destination row; -1 (the floor) is stored as slot 5.

    Returns:
        ``factory * (num_colors * num_rows) + color * num_rows + row``
        computed on the remapped values.

    Raises:
        KeyError: if ``tile_color`` is not in ``color_mapping``.
    """
    # The -1 sentinels take the last slot of their respective axis.
    factory_slot = 5 if factory_num == -1 else factory_num
    row_slot = 5 if row_num == -1 else row_num

    color_code = color_mapping[tile_color]

    # Mixed-radix packing: factory is the most significant digit, row the least.
    return (factory_slot * num_colors + color_code) * num_rows + row_slot


In [None]:
# Reverse lookup: numeric color code -> color letter
inverse_color_mapping = {code: letter for letter, code in color_mapping.items()}

def decode_action(action_index):
    """Unpack an action index into (factory_num, tile_color, row_num).

    Inverse of ``encode_action``: slot 5 on the factory axis is returned as
    -1 (the center) and slot 5 on the row axis is returned as -1 (the floor).

    Args:
        action_index: integer index as produced by ``encode_action``.

    Returns:
        Tuple ``(factory_num, tile_color, row_num)`` where ``tile_color`` is
        the color letter.

    Raises:
        KeyError: if the decoded color code has no entry in
            ``inverse_color_mapping``.
    """
    # Peel off the mixed-radix digits, most significant first.
    factory_slot, within_factory = divmod(action_index, num_colors * num_rows)
    color_code, row_slot = divmod(within_factory, num_rows)

    # Slot 5 stands for the -1 sentinel on both axes.
    factory_num = -1 if factory_slot == 5 else factory_slot
    row_num = -1 if row_slot == 5 else row_slot

    return factory_num, inverse_color_mapping[color_code], row_num


# Configuración Agente DQN

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.optim.lr_scheduler import StepLR
import pandas as pd

# Run on the GPU when CUDA is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

In [None]:
# Dimension of the state vector fed to the network.
# NOTE(review): must equal the length of state_to_vector's output for the
# game configuration in use — confirm if the number of factories or players changes.
input_dim = 115
# Total number of possible actions:
# num_factories * num_colors * num_rows = 6 * 5 * 6.
num_actions = 180

In [None]:
class DuelingDQN(nn.Module):
    """Dueling Q-network: a shared MLP trunk feeding value and advantage heads.

    The trunk is three 128-unit fully connected layers, each followed by a
    ReLU. The heads are combined as ``Q = V + (A - mean(A))``, where the mean
    is taken over the action dimension (dim 1), so the input must be batched.

    Args:
        input_dim: number of features in one state vector.
        num_actions: number of Q-values produced per state.
    """

    def __init__(self, input_dim, num_actions):
        super().__init__()
        width = 128
        # NOTE: bn1, bn2 and bn3 are registered but never applied in
        # forward(). They still contribute entries to state_dict() and to
        # parameters(), so their position in this list is kept as is.
        self.fc1 = nn.Linear(input_dim, width)
        self.bn1 = nn.BatchNorm1d(width)
        self.fc2 = nn.Linear(width, width)
        self.bn2 = nn.BatchNorm1d(width)
        self.fc3 = nn.Linear(width, width)
        self.bn3 = nn.BatchNorm1d(width)
        self.V = nn.Linear(width, 1)              # state-value head
        self.A = nn.Linear(width, num_actions)    # advantage head

    def forward(self, state):
        """Return the Q-values for a batch of state vectors."""
        hidden = state
        for layer in (self.fc1, self.fc2, self.fc3):
            hidden = F.relu(layer(hidden))

        value = self.V(hidden)
        advantage = self.A(hidden)

        # Centering the advantages makes the V/A decomposition identifiable.
        centered_advantage = advantage - advantage.mean(dim=1, keepdim=True)
        return value + centered_advantage


In [None]:
# Main (online) network and target network
main_nn = DuelingDQN(input_dim, num_actions).to(device)
target_nn = DuelingDQN(input_dim, num_actions).to(device)
# Start the target network from exactly the same weights as the main one.
target_nn.load_state_dict(main_nn.state_dict())

# Loss function and optimizer
optimizer = torch.optim.Adam(main_nn.parameters(), lr=1e-4, weight_decay=1e-5)
loss_fn = nn.SmoothL1Loss()  # Huber loss

# Learning rate scheduler: multiplies the learning rate by 0.97 every 500
# calls to scheduler.step().
scheduler = StepLR(optimizer, step_size=500, gamma=0.97)
# Lower bound for the learning rate; StepLR does not enforce it itself, the
# training loop clamps the optimizer's learning rate to this value.
min_lr = 1e-5


# Funciones auxiliares

In [None]:
class UniformBuffer:
    """Fixed-capacity experience replay buffer that samples uniformly.

    Transitions are stored as ``(state, action, reward, next_state, done)``
    tuples. Once ``size`` transitions are held, every new transition
    overwrites the oldest one (ring buffer).

    Args:
        size: maximum number of transitions kept.
        device: torch device on which sampled batches are created.
    """

    def __init__(self, size, device):
        self._size = size
        self.buffer = []
        self.device = device
        self._next_idx = 0  # slot the next add() writes to

    def add(self, state, action, reward, next_state, done):
        """Store one transition, overwriting the oldest when the buffer is full."""
        transition = (state, action, reward, next_state, done)
        if self._next_idx >= len(self.buffer):
            self.buffer.append(transition)
        else:
            self.buffer[self._next_idx] = transition
        self._next_idx = (self._next_idx + 1) % self._size

    def __len__(self):
        """Return the number of transitions currently stored."""
        return len(self.buffer)

    def sample(self, num_samples):
        """Draw ``num_samples`` transitions uniformly at random, with replacement.

        Args:
            num_samples: number of transitions in the returned batch.

        Returns:
            Tuple ``(states, actions, rewards, next_states, dones)`` of
            tensors on ``self.device``. ``actions`` is int64 (usable as a
            ``gather`` index); ``rewards`` and ``dones`` are float32.

        Raises:
            ValueError: if the buffer is empty and ``num_samples`` is
                positive (raised by ``np.random.choice``).
        """
        picks = np.random.choice(len(self.buffer), num_samples)
        batch = [self.buffer[i] for i in picks]

        # Build each column with plain np.array(): the former
        # np.array(x, copy=False) raises ValueError on NumPy >= 2 whenever a
        # copy cannot be avoided (e.g. when x is a Python int action index).
        def column(position, dtype=None):
            values = np.array([item[position] for item in batch], dtype=dtype)
            return torch.as_tensor(values, device=self.device)

        states = column(0)
        actions = column(1, np.int64)
        rewards = column(2, np.float32)
        next_states = column(3)
        dones = column(4, np.float32)

        return states, actions, rewards, next_states, dones

In [None]:
import random

def select_action(state, epsilon, legal_actions):
    """Choose a move with an epsilon-greedy policy restricted to legal moves.

    Args:
        state: state feature vector for the position to act in.
        epsilon: probability of picking a uniformly random legal move.
        legal_actions: sequence of ``(factory_num, tile_color, row_num)``
            moves that are legal in this position.

    Returns:
        When exploring, one element of ``legal_actions``. Otherwise the
        ``(factory_num, tile_color, row_num)`` tuple returned by
        ``decode_action`` for the legal move with the highest Q-value.
    """
    # Explore: any legal move, uniformly at random.
    if random.random() < epsilon:
        return random.choice(legal_actions)

    # Exploit: score all actions with the main network, without autograd.
    batch = torch.tensor(state, dtype=torch.float32).unsqueeze(0).to(device)
    with torch.no_grad():
        q_row = main_nn(batch)[0]

    # Only legal moves compete; each is looked up by its flat action index.
    candidate_indices = [encode_action(*move) for move in legal_actions]
    best_index = max(candidate_indices, key=lambda index: q_row[index].item())

    # Hand back the move in (factory_num, tile_color, row_num) form.
    return decode_action(best_index)

In [None]:
def train_step(states, actions, rewards, next_states, dones):
    """Run one Double-DQN gradient step on a batch of transitions.

    The main network picks the best action in each next state, the target
    network evaluates that action, and the main network's Q-value for the
    action actually taken is regressed towards
    ``reward + (1 - done) * discount * Q_target(next_state, best_action)``.

    Args:
        states: tensor of state vectors, one row per transition.
        actions: int64 tensor with the index of the action taken in each
            transition.
        rewards: float tensor of rewards, one per transition.
        next_states: tensor of next-state vectors, one row per transition.
        dones: float tensor, 1.0 where the transition ended the game.

    Returns:
        The scalar loss tensor for this batch.
    """
    # The bootstrap target is a constant for the optimisation, so it is
    # built without autograd instead of building a graph and detaching it.
    with torch.no_grad():
        # Action selection with the main network ...
        best_next_actions = main_nn(next_states).argmax(dim=-1, keepdim=True)
        # ... action evaluation with the target network. squeeze(-1) drops
        # only the gathered column, so a batch of one keeps its batch axis.
        next_q = target_nn(next_states).gather(1, best_next_actions).squeeze(-1)
        target = rewards + (1.0 - dones) * discount * next_q

    # Q-values of the actions that were actually taken.
    taken_q = main_nn(states).gather(1, actions.unsqueeze(dim=-1)).squeeze(-1)

    loss = loss_fn(taken_q, target)

    optimizer.zero_grad()
    loss.backward()

    # Clip gradients to prevent them from exploding.
    torch.nn.utils.clip_grad_norm_(main_nn.parameters(), max_norm=1.0)

    optimizer.step()

    return loss

In [None]:
def soft_update(target_model, main_model, tau=0.01):
    """Blend the main model's parameters into the target model in place.

    Every target parameter becomes ``tau * main + (1 - tau) * target``.
    Parameters are paired in ``parameters()`` order, so both models must have
    the same architecture. Only parameters are blended; module buffers are
    left untouched.

    Args:
        target_model: module whose parameters are updated.
        main_model: module providing the new values; it is not modified.
        tau: interpolation weight given to the main model's parameters.
    """
    # Update under no_grad with in-place copy_ rather than through `.data`,
    # so the write stays visible to autograd's version tracking.
    with torch.no_grad():
        for target_param, main_param in zip(target_model.parameters(), main_model.parameters()):
            target_param.copy_(tau * main_param + (1.0 - tau) * target_param)


# Retomar entrenamiento

In [None]:
# Load the saved checkpoint.
# map_location lets a checkpoint written on a GPU runtime load on a CPU one.
# SECURITY: weights_only=False is required because the checkpoint also stores
# the replay buffer object (key 'memory'), which is unpickled on load.
# Unpickling can execute arbitrary code, so only load files you created.
checkpoint = torch.load('/content/drive/My Drive/TFE/model.pth',
                        map_location=device, weights_only=False)

# Load the model weights
main_nn.load_state_dict(checkpoint['model_state_dict'])
main_nn.train()
# The checkpoint holds no target-network weights; restart the target network
# from the restored main network instead of from its random initialisation.
target_nn.load_state_dict(main_nn.state_dict())

# Restore the optimizer state into the EXISTING optimizer. Creating a new
# optimizer here would leave `scheduler` attached to the old one, so
# scheduler.step() would no longer change the learning rate used in training.
optimizer.load_state_dict(checkpoint['optimizer_state_dict'])

# Restore the scheduler position when the checkpoint provides it.
if 'scheduler_state_dict' in checkpoint:
    scheduler.load_state_dict(checkpoint['scheduler_state_dict'])

# Load epsilon.
# NOTE: the hyperparameter cell also assigns `epsilon`; if it is run after
# this cell, the value restored here is overwritten.
epsilon = checkpoint['epsilon']


# Entrenamiento

In [None]:
# Hyperparameters
num_episodes = 50000  # training episodes (the training loop runs num_episodes + 1 iterations)
evaluation_episodes = 20  # games played against the random player at each evaluation
# Initial exploration rate.
# NOTE(review): this assignment overwrites an epsilon restored from a
# checkpoint if the resume cell was run before this one — confirm cell order.
epsilon = 1.0
epsilon_decay = 0.9998  # multiplicative decay applied to epsilon once per episode
epsilon_final = 0.01  # lower bound for epsilon
batch_size = 64  # transitions sampled from the replay buffer per training step
discount = 0.9  # discount factor used in the training target
buffer_size = 200000  # capacity of the replay buffer

In [None]:
# Training loop: the agent plays full games, stores every transition in the
# replay buffer and performs one training step per move once the buffer
# holds more than batch_size transitions.
buffer = UniformBuffer(size=buffer_size, device=device)
last_100_ep_rewards = []  # sliding window with the rewards of the latest 100 episodes
losses = []  # loss of every training step performed so far
metrics = []  # NOTE(review): never appended to in this cell

for episode in range(num_episodes+1):
    state = game.get_initial_state()
    game.draw_tiles(state)
    done = False
    loss = None
    ep_reward = 0

    while True:
        # Select an action (epsilon-greedy over the legal moves)
        legal_actions = game.get_legal_moves(state)
        state_vector = state_to_vector(state)
        action = select_action(state_vector, epsilon, legal_actions)

        # Get the next state and the reward for the selected action
        next_state, reward = game.step(state, action)
        reward /= 100  # scale the reward down before storing it
        # NOTE(review): the next state is encoded before the end-of-round
        # handling below, so the stored vector does not reflect the wall
        # update or the newly drawn tiles — confirm this is intended.
        next_state_vector = state_to_vector(next_state)
        ep_reward += reward

        # Check for the end of the round
        if game.check_end_of_round(next_state):
            next_state.move_tiles_to_wall()
            # The end of the game is only checked when a round has ended.
            done = game.check_end_of_game(next_state)
            game.draw_tiles(next_state)

        factory_num, tile_color, row_num = action
        action_index = encode_action(factory_num, tile_color, row_num)

        # Store the transition in the buffer
        buffer.add(state_vector, action_index, reward, next_state_vector, done)

        # Learn from one sampled batch
        if len(buffer) > batch_size:
            states, actions, rewards, next_states, dones = buffer.sample(batch_size)
            loss = train_step(states, actions, rewards, next_states, dones)
            losses.append(loss.item())

        # Advance to the next state
        state = next_state

        if done:
            break

    # Keep only the rewards of the latest 100 episodes.
    if len(last_100_ep_rewards) == 100:
        last_100_ep_rewards = last_100_ep_rewards[1:]
    last_100_ep_rewards.append(ep_reward)

    # Monitoring statistics, refreshed every 50 episodes.
    # NOTE(review): avg_reward, loss_value and avg_q_value are computed but
    # not used anywhere in this cell; only avg_loss is printed below.
    if episode % 50 == 0:

        avg_reward = np.mean(last_100_ep_rewards)
        # Mean over the latest 100 training steps (NaN while losses is empty).
        avg_loss = np.mean(losses[-100:])
        loss_value = loss.item() if loss is not None else None

        # Average Q-value for the last state in which an action was selected
        state_tensor = torch.tensor(state_vector, dtype=torch.float32).unsqueeze(0).to(device)
        with torch.no_grad():
            q_values = main_nn(state_tensor)
        avg_q_value = q_values.mean().item()


    # Decay epsilon so the agent explores less over time
    epsilon = max(epsilon_final, epsilon_decay * epsilon)

    # Step the scheduler once per episode; the learning rate is then clamped
    # so it never stays below min_lr
    scheduler.step()

    # Check whether the current learning rate is below the allowed minimum.
    # current_lr keeps the value read before clamping; that value is the one
    # printed in the evaluation report below.
    current_lr = optimizer.param_groups[0]['lr']
    if current_lr < min_lr:
        # If it is, reset the learning rate to the minimum
        for param_group in optimizer.param_groups:
            param_group['lr'] = min_lr


    # Move the target network slightly towards the main network once per episode.
    soft_update(target_nn, main_nn, tau=0.01)


    # Save the main model's weights and the training state every 200 episodes
    if episode % 200 == 0:
        checkpoint = {
          'episode': episode,  # Current episode number
          'model_state_dict': main_nn.state_dict(),  # Model weights
          'optimizer_state_dict': optimizer.state_dict(),  # Optimizer state
          'scheduler_state_dict': scheduler.state_dict(),
          'memory' : buffer,  # The whole replay buffer object is pickled into the file
          'epsilon': epsilon
        }
        torch.save(checkpoint, '/content/drive/My Drive/TFE/model.pth')


    # Evaluation against a random player every 100 episodes.
    # The agent moves when current_player == 0 and always acts greedily
    # (epsilon = 0); the other player's moves come from game.random_player.
    if episode % 100 == 0:
        total_reward = 0
        total_s1 = 0
        total_s2 = 0
        total_duration = 0

        for eval_episode in range(evaluation_episodes):
            random_state = game.get_initial_state()
            game.draw_tiles(random_state)
            done = False
            random_reward = 0
            DQN_reward = 0
            actions = 0  # number of moves made by the agent in this game

            while True:
                if random_state.current_player == 0:
                    legal_actions = game.get_legal_moves(random_state)
                    action = select_action(state_to_vector(random_state), 0, legal_actions)
                    next_state, reward = game.step(random_state, action)
                    DQN_reward += reward
                    actions += 1
                else:
                    action = game.random_player(random_state)
                    next_state, reward = game.step(random_state, action)
                    random_reward += reward

                # End-of-round handling on the state produced by the move
                if game.check_end_of_round(next_state):
                    next_state.move_tiles_to_wall()
                    game.draw_tiles(next_state)

                if game.check_end_of_game(next_state):
                    done = True

                random_state = next_state

                if done:
                    break

            # Final scores: player 0 is the agent, player 1 the random player
            punt1 = random_state.players[0]['score']
            punt2 = random_state.players[1]['score']

            total_s1 += punt1
            total_s2 += punt2
            total_reward += DQN_reward
            total_duration += actions

        # Averages over the evaluation games
        average_bounty = total_reward / evaluation_episodes
        average_s1 = total_s1 / evaluation_episodes
        average_s2 = total_s2 / evaluation_episodes
        average_duration = total_duration / evaluation_episodes

        # Print the results to the console
        print(f"Episodio {episode}: Recompensa Promedio = {average_bounty:.4f}, Puntuación Promedio = {average_s1}, Punt random: {average_s2}, Acciones/partida = {average_duration}, lr: {current_lr}, Pérdida Promedio = {avg_loss}")




# Evaluación modelo

In [None]:
# Load the checkpoint for evaluation.
# map_location lets a checkpoint written on a GPU runtime load on a CPU one.
# SECURITY: weights_only=False is needed because the training cell stores the
# replay buffer object in the checkpoint (key 'memory'); unpickling can
# execute arbitrary code, so only load files you created.
checkpoint = torch.load('/content/drive/My Drive/TFE/model.pth',
                        map_location=device, weights_only=False)

# Load the model weights and switch the network to evaluation mode
main_nn.load_state_dict(checkpoint['model_state_dict'])
main_nn.eval()


In [None]:
import csv

# Evaluation: the trained agent (player 0, always greedy) plays against the
# random player (player 1) and the final scores are averaged.
num_episodes = 1000  # Number of episodes (games) to evaluate
total_score = 0
total_s1 = 0
total_s2 = 0

for episode in range(num_episodes):
    state = game.get_initial_state()
    game.draw_tiles(state)
    done = False
    episode_score = 0

    while True:
        #state.display_state()
        if state.current_player == 0:
            legal_actions = game.get_legal_moves(state)
            # Encode the state at decision time. Encoding it right after the
            # previous move (before the end-of-round handling) could hand the
            # network a vector that misses the newly drawn tiles.
            state_vector = state_to_vector(state)
            action = select_action(state_vector, epsilon=0, legal_actions=legal_actions)  # epsilon=0 for evaluation
            next_state, reward = game.step(state, action)  # Apply the action
            episode_score += reward

        else:
            action = game.random_player(state)
            next_state, reward = game.step(state, action)

        # End-of-round handling acts on the state produced by the move, the
        # same way the evaluation inside the training loop does.
        if game.check_end_of_round(next_state):
            next_state.move_tiles_to_wall()
            game.draw_tiles(next_state)

        if game.check_end_of_game(next_state):
            done = True

        # Advance to the next state
        state = next_state

        if done:
            break

    # Final scores: player 0 is the agent, player 1 the random player
    punt1 = state.players[0]['score']
    punt2 = state.players[1]['score']
    total_score += episode_score
    total_s1 += punt1
    total_s2 += punt2


    print(f"Partida {episode + 1}: Recompensa del agente = {episode_score}")
    print(f"Puntuacion 1: {punt1}, Puntuacion 2: {punt2}")
    # Pauses after every game until the user presses Enter.
    input("Presiona Enter para continuar...")

# Compute the average scores
average_score = total_score / num_episodes
average_s1 = total_s1 / num_episodes
average_s2 = total_s2 / num_episodes
average = (average_s1 + average_s2) / 2
print(f"Puntuación promedio del agente en {num_episodes} partidas: p1: {average_s1}, p2: {average_s2}, Recompensa: {average_score}")