In [None]:
# IMPORTAÇÕES
# ==========================================

# Bibliotecas para manipulação do sistema e ambiente de aprendizado por reforço
import os

# Bibliotecas para operações matemáticas e aleatoriedade
import numpy as np
import random
import logging  # Para registro de mensagens e depuração
import itertools  # Para geração de combinações e permutações
import statistics
import cv2
import csv
import math
import heapq
import json
from collections import deque



# Biblioteca para manipulações de DataFrame
import pandas as pd

# Biblioteca para operações de Deep Learning com TensorFlow
import tensorflow as tf
from tensorflow.keras.layers import Layer, Conv2D, Flatten, Dense, InputLayer, MultiHeadAttention, LayerNormalization, Reshape, MultiHeadAttention, GlobalAveragePooling2D, TimeDistributed, MaxPooling2D, BatchNormalization, Dropout
from tensorflow.keras.optimizers import Adam  # Otimizador para ajuste dos pesos da rede neural
from tensorflow.keras.losses import MeanSquaredError, Huber
from tensorflow.keras.regularizers import l2  # Regularização L2 para controle de overfitting
from tensorflow.keras.models import load_model, Model  # Carregar modelos pré-treinados, 



from tensorflow.keras.layers import (InputLayer, Conv2D, BatchNormalization, Flatten, 
                                     Dense, Dropout, Activation, Concatenate, Multiply, Layer)
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.losses import MeanSquaredError, Huber
import tensorflow.keras.backend as K



# Verificar dispositivos físicos disponíveis (por exemplo, GPU)
#device_lib.list_local_devices()
tf.config.list_physical_devices('GPU')



### CLASS
--------------------------------------

In [2]:
#Class: Agents
# ==========================================

class Agent:
    def __init__(self, x, y, env, breed, channel):
        self.x = x
        self.y = y
        self.env = env
        self.breed = breed
        self.channel = channel
        self.is_alive = True
        self.is_done = False
        self.current_target = None
        self.current_ally = None
        self.last_coord = self.x, self.y
        self.kid = False

        # Category         | Breed | Color  |     Channel      | Color - Att | Channel - Att
        # ---------------------------------------------------------------------------
        # Predator         |   0   | Red    |    [1, 0, 0]     |   Yellow     | [1, 1, 0]
        # Vegetation       |   1   | Green  |    [0, 1, 0]     |  --------    | ---------
        # Prey             |   2   | Blue   |    [0, 0, 1]     |   Cyan       | [0, 1, 1]
        # Outside Grid     |   -   | White  |    [1, 1, 1]     |  --------    | ---------
        # Empty Grid Cell  |   -   | Black  |    [0, 0, 0]     |  --------    | ---------
        # Reserved Training|   -   | Gray   | [0.5, 0.5, 0.5]  |  --------    | ---------

    def reset(self, env):
        self.x, self.y = env.new_position()
        self.is_alive = True
        self.target = None
        self.is_done = False
        self.current_target = None
        self.current_ally = None
        
        # Restaurar o `channel` para a cor padrão com base no `breed`
        if self.breed == 0:  # Predador
            self.channel = [1.0, 0.0, 0.0]
        elif self.breed == 2:  # Presa
            self.channel = [0.0, 0.0, 1.0]

    def step(self, action):
        penalty = self.env.move_agent(self, action)
        done, reward, ler = self.check_goal()
        state = self.env.render_agent(self)

        return state, reward + penalty, done, ler
    
class Prey(Agent):
    def __init__(self, x, y, env, id, nn = None, is_on = True):
        super().__init__(x, y, env, breed=2, channel=[0.0, 0.0, 1.0])
        self.name = f"Prey_{id}"
        self.in_danger = False
        self.is_on = is_on
        self.model_nn = nn

    def check_goal(self):
        done = False
        reward = 5.0  # Recompensa padrão para incentivar a exploração segura.
        feedback = ""
        target_detected = False
        ally_detected = False

        closest_target_distance = float('inf')
        closest_ally_distance = float('inf')

        # Procura por agente vivo, presa ou predador, mais próxima ou mantém o foco na atual.
        for agent in self.env.agents:
            # Verifica se é predador
            if agent.breed == 0 and agent.is_alive:
                distance = self.env.chebyshev_distance(self.x, self.y, agent.x, agent.y)
                if distance <= self.env.ray and (self.current_target is None or distance < closest_target_distance):
                    closest_target_distance = distance
                    self.current_target = agent
                    target_detected = True

            # Verifica se é presa com informação de predador
            elif agent.breed == 2 and agent.is_alive and agent.channel == [0, 1, 1]:
                distance = self.env.chebyshev_distance(self.x, self.y, agent.x, agent.y)
                if distance <= self.env.ray and (self.current_ally is None or distance < closest_ally_distance):
                    closest_ally_distance = distance
                    self.current_ally = agent
                    ally_detected = True

        
        # Verificação de fuga do predador
        if self.in_danger and closest_target_distance > 3:
            done = True
            self.in_danger = False
            reward += 20.0  # Recompensa máxima por escapar do perigo
            feedback = f"[PREY]: Evasão do alvo"
        
        # Critérios de Recompensa/Penalidade
        elif target_detected and 0 < closest_target_distance <= 3:
            # Penalidade por proximidade com predador
            self.in_danger = True
            if closest_target_distance == 3:
                reward -= 1.0
            elif closest_target_distance == 2:
                reward -= 3.0
            elif closest_target_distance == 1:
                reward -= 5.0
            feedback = f"[PREY]: Proximidade predador: {closest_target_distance}"

        elif ally_detected and not target_detected and 0 < closest_ally_distance <= 3:
            # Recompensa por proximidade com aliado (se não houver predador no campo de visão)
            if closest_ally_distance == 3:
                reward -= 0.1
            elif closest_ally_distance == 2:
                reward -= 0.3
            elif closest_ally_distance == 1:
                reward -= 0.5
            feedback += f"[PREY]: Proximidade aliado: {closest_ally_distance}"

        else:
            feedback += "[PREY]: Explorando mapa"


        # Atualiza channel
        for agent in self.env.agents:
            if agent.is_alive and agent.breed == 0:
                distance = self.env.chebyshev_distance(self.x, self.y, agent.x, agent.y)
                if distance <= self.env.ray:
                    self.channel = [0.0, 1.0, 1.0]
                    break
                else:
                    self.channel = [0.0, 0.0, 1.0]

        return done, reward, feedback

class Predator(Agent):
    def __init__(self, x, y, env, id, nn = None, is_on = True):
        super().__init__(x, y, env, breed=0, channel=[1.0, 0.0, 0.0])
        self.name = f"Predador_{id}"
        self.last_hunt = None
        self.is_on = is_on

    def check_goal(self):
        done = False
        reward = -0.05  # Penalidade leve para incentivar movimentação
        feedback = ""
        target_detected = False
        ally_detected = False

        closest_target_distance = float('inf')
        closest_ally_distance = float('inf')

        # Procura por agente vivo, presa ou predador, mais próxima ou mantém o foco na atual.
        for agent in self.env.agents:
            # Verifica se é presa
            if agent.is_alive and agent.breed == 2:
                distance = self.env.chebyshev_distance(self.x, self.y, agent.x, agent.y)
                if distance <= self.env.ray and (self.current_target is None or distance < closest_target_distance):
                    closest_target_distance = distance
                    self.current_target = agent
                    target_detected = True

            # Verifica se é predador com informação de presa
            elif agent.breed == 0 and agent.is_alive and agent.channel == [1.0, 1.0, 0.0]:
                distance = self.env.chebyshev_distance(self.x, self.y, agent.x, agent.y)
                if distance <= self.env.ray and (self.current_ally is None or distance < closest_ally_distance):
                    closest_ally_distance = distance
                    self.current_ally = agent
                    ally_detected = True

        # Critérios de Recompensa
        if target_detected and closest_target_distance == 0:
            done = True
            reward += 20.0  # Recompensa máxima por capturar a presa
            self.current_target.is_alive = False
            feedback = f"[PREDATOR]: Alvo capturado"
            # remover da lista esta na classe de treinamento
            self.current_target = None

        elif target_detected and 0 < closest_target_distance <= 3:
            # Recompensa por proximidade com a presa
            if closest_target_distance == 3:
                reward += 0.5
            elif closest_target_distance == 2:
                reward += 1.0
            elif closest_target_distance == 1:
                reward += 2.0
            feedback = f"[PREDATOR]: Proximidade presa: {closest_target_distance}"

        elif ally_detected and 0 < closest_ally_distance <= 3:
            # Recompensa por proximidade com aliado (que tem informação de presa)
            if closest_ally_distance == 3:
                reward += 0.1
            elif closest_ally_distance == 2:
                reward += 0.3
            elif closest_ally_distance == 1:
                reward += 0.5
            feedback = f"[PREDATOR]: Proximidade aliado: {closest_ally_distance}"

        else:
            feedback += "[PREDATOR]: Explorando mapa"

        # Atualiza channel
        for agent in self.env.agents:
            if agent.is_alive and agent.breed == 2:
                distance = self.env.chebyshev_distance(self.x, self.y, agent.x, agent.y)
                if distance <= self.env.ray:
                    self.channel = [1.0, 1.0, 0.0]
                    break
                else:
                    self.channel = [1.0, 0.0, 0.0]

        return done, reward, feedback

In [3]:
#Class: Obstacle
# ==========================================

class Obstacle:
    def __init__(self, x, y):
        self.x = x
        self.y = y
        self.channel = [0.0, 1.0, 0.0]  # Cor verde para o obstáculo (RGB)

    def reset(self, env):
        self.channel = [0.0, 1.0, 0.0]

        # Escolhe uma posição aleatória no grid que esteja livre
        position = env.new_position()
        if position:
            self.x, self.y = position
        else:
            raise ValueError("Não foi possível encontrar uma posição livre para o obstáculo.")

In [4]:
#Class: Environment
# ==========================================

class Env:
    def __init__(self, sizeX, sizeY, ray=3):
        self.sizeX = sizeX
        self.sizeY = sizeY
        self.ray = ray
        self.agents = []
        self.objects = []
        self.obstacles = []  # Lista separada para obstáculos
        self.actions = 9  # Número de ações possíveis


    def new_position(self):
        # Cria uma lista de todas as posições possíveis.
        iterables = [range(self.sizeX), range(self.sizeY)]
        points = list(itertools.product(*iterables))

        # Cria uma lista das posições atuais ocupadas pelos agentes e obstáculos.
        current_positions = [(agent.x, agent.y) for agent in self.agents]
        current_positions += [(obstacle.x, obstacle.y) for obstacle in self.obstacles]

        # Filtra as posições possíveis, removendo as ocupadas.
        available_points = [point for point in points if point not in current_positions]

        # Escolhe aleatoriamente uma das posições disponíveis.
        if available_points:
            return random.choice(available_points)
        else:
            # Loga a mensagem indicando que não há posições disponíveis.
            logging.info("Não foi possível encontrar uma nova posição disponível.")
            return None

    def add_obstacle(self, obstacle):
        # Define uma posição disponível para o obstáculo
        position = self.new_position()
        if position:
            obstacle.x, obstacle.y = position  # Atribui as coordenadas ao obstáculo
            self.objects.append(obstacle)
        else:
            logging.info("Não foi possível adicionar um novo obstáculo: nenhuma posição disponível.")

    def add_agent(self, agent):
        # Define uma posição disponível para o agente
        position = self.new_position()
        if position:
            agent.x, agent.y = position  # Atribui as coordenadas ao agente
            self.objects.append(agent)
        else:
            logging.info("Não foi possível adicionar um novo agente: nenhuma posição disponível.")

    def reset(self):
        # Verifica se a lista de objetos está vazia
        if not self.objects:
            logging.info("Não é possível prosseguir: nenhum objeto foi adicionado ao ambiente.")
            return  # Interrompe o método se `self.objects` estiver vazio

        # Limpa as listas de agentes e obstáculos para reiniciar o ambiente
        self.agents = []
        self.obstacles = []

        # Embaralha a lista de objetos para variar a ordem dos elementos no ambiente
        random.shuffle(self.objects)

        # Adiciona os agentes e obstáculos à lista apropriada e os reseta
        for item in self.objects:
            item.reset(self)  # Reposiciona cada objeto no ambiente
            if isinstance(item, Agent):  # Considerando uma classe Agent
                self.agents.append(item)
            elif isinstance(item, Obstacle):  # Considerando uma classe Obstacle
                self.obstacles.append(item)

    def is_position_empty_and_valid(self, x, y):
        # Verifica se a posição está dentro dos limites do ambiente
        if x < 0 or x >= self.sizeX or y < 0 or y >= self.sizeY:
            return False  # A posição está fora dos limites do ambiente

        # Verifica se a posição está ocupada por algum agente
        for agent in self.agents:
            if agent.x == x and agent.y == y:
                return False  # A posição está ocupada

        # Verifica se a posição está ocupada por algum obstáculo
        for obstacle in self.obstacles:
            if obstacle.x == x and obstacle.y == y:
                return False  # A posição está ocupada por um obstáculo

    def get_agent_at_position(self, x, y):
        for agent in self.agents:
            if agent.x == x and agent.y == y:
                return agent
        return None

    def move_agent(self, agent, action):
        # Inicializa a penalidade padrão e os valores de penalidade
        ZERO = 0.0
        PENALIZE = -10.0
        direction = action

        # Inicializa os incrementos de movimento
        new_x, new_y = 0, 0

        # Define os incrementos de movimento com base na direção
        if direction == 0:  # Para cima
            new_y = -1
        elif direction == 1:  # Para cima e direita (diagonal)
            new_x = 1
            new_y = -1
        elif direction == 2:  # Para direita
            new_x = 1
        elif direction == 3:  # Para baixo e direita (diagonal)
            new_x = 1
            new_y = 1
        elif direction == 4:  # Para baixo
            new_y = 1
        elif direction == 5:  # Para baixo e esquerda (diagonal)
            new_x = -1
            new_y = 1
        elif direction == 6:  # Para esquerda
            new_x = -1
        elif direction == 7:  # Para cima e esquerda (diagonal)
            new_x = -1
            new_y = -1
        elif direction == 8:  # Ficar parado
            new_x = 0
            new_y = 0

        # Calcula a nova posição absoluta do agente
        target_x = agent.x + new_x
        target_y = agent.y + new_y

        # Verifica se a nova posição contém um obstáculo
        if any(obstacle.x == target_x and obstacle.y == target_y for obstacle in self.obstacles):
            print("Agente tentou ocupar um obstáculo!")
            return PENALIZE  # Penalidade por tentar ocupar a posição de um obstáculo

        # Verifica se o movimento está dentro dos limites do ambiente
        if target_x < 0 or target_x >= self.sizeX or target_y < 0 or target_y >= self.sizeY:
            print("Agente fora do limite!")
            return PENALIZE

        # Verifica se a nova posição contém outro agente
        other_agent = self.get_agent_at_position(target_x, target_y)
        if other_agent:
            # Lógica para a presa
            if agent.breed == 2:  # Presa
                print("Presa não pode ocupar a posição de outro agente!")
                return PENALIZE  # Penalidade por tentar ocupar a posição de outro agente

            # Lógica para o predador
            elif agent.breed == 0:  # Predador
                if other_agent.breed == 0:  # Outro predador na posição
                    print("Predador não pode ocupar a posição de outro predador!")
                    return PENALIZE  # Penalidade por tentar ocupar a posição de outro predador

        # Atualiza a posição do agente se o movimento for válido
        agent.x, agent.y = target_x, target_y
        return ZERO

    def render_env(self):
        a = np.zeros([self.sizeY, self.sizeX, 3])

        for agent in self.agents:
            if agent.x is not None and agent.y is not None:
                a[agent.y, agent.x, :] = agent.channel

        for obstacle in self.obstacles:
            a[obstacle.y, obstacle.x, :] = obstacle.channel  # Renderiza obstáculos

        return a

    def render_agent(self, agent):
        # Renderiza o ambiente para obter a matriz RGB atual
        a = self.render_env()

        # Calcula o tamanho do recorte com base em self.ray
        recorte_tamanho = 2 * self.ray + 1

        # Inicializa o recorte temporário com cor amarela
        recorte_temp = np.ones((recorte_tamanho, recorte_tamanho, 3)) * np.array([1.0, 1.0, 1.0])  # fora do Grid - GRAY

        # Calcula as coordenadas do recorte dentro do ambiente
        inicio_x = agent.x - self.ray
        inicio_y = agent.y - self.ray
        fim_x = inicio_x + recorte_tamanho
        fim_y = inicio_y + recorte_tamanho

        # Calcula os limites de sobreposição entre o recorte e o ambiente
        sobreposicao_inicio_x = max(inicio_x, 0)
        sobreposicao_inicio_y = max(inicio_y, 0)
        sobreposicao_fim_x = min(fim_x, self.sizeX)
        sobreposicao_fim_y = min(fim_y, self.sizeY)

        # Calcula os índices de destino no recorte temporário
        destino_inicio_x = sobreposicao_inicio_x - inicio_x
        destino_inicio_y = sobreposicao_inicio_y - inicio_y
        destino_fim_x = destino_inicio_x + sobreposicao_fim_x - sobreposicao_inicio_x
        destino_fim_y = destino_inicio_y + sobreposicao_fim_y - sobreposicao_inicio_y

        # Copia a sobreposição do ambiente para o recorte temporário
        recorte_temp[destino_inicio_y:destino_fim_y, destino_inicio_x:destino_fim_x] = \
            a[sobreposicao_inicio_y:sobreposicao_fim_y, sobreposicao_inicio_x:sobreposicao_fim_x]

        # Pinta o elemento central do recorte de branco, ajustando a posição baseada em self.ray
        centro = self.ray
        recorte_temp[centro, centro, :] = np.array([0.5, 0.5, 0.5])  # Cinza - Destaque de Agente em treinamento

        return recorte_temp

    def population_count(self):
        """Retorna a quantidade de presas, predadores e obstáculos no ambiente."""
        predator_count = 0
        prey_count = 0

        for agent in self.agents:
            if agent.is_alive is True:
                if agent.breed == 0:
                    predator_count += 1
                elif agent.breed == 2:
                    prey_count += 1

        obstacle_count = len(self.obstacles)  # Conta o total de obstáculos na lista `self.obstacles`
        return prey_count, predator_count, obstacle_count

    def remove_agent(self, agent):
        self.agents.remove(agent)

    @staticmethod
    def chebyshev_distance(x1, y1, x2, y2):
        distance = max(abs(x2 - x1), abs(y2 - y1))
        return distance


In [5]:
#Class: PrioritizedReplayBuffer
# ==========================================


class PrioritizedReplayBuffer:
    def __init__(self, capacity, alpha=0.6):
        self.capacity = capacity
        self.alpha = alpha
        self.buffer = []
        self.priorities = []
        self.pos = 0

    def add(self, transition, td_error):
        """Adiciona uma transição ao buffer com prioridade baseada no erro TD."""
        max_priority = max(self.priorities, default=1.0)  # Prioridade máxima inicial
        if len(self.buffer) < self.capacity:
            self.buffer.append(transition)
            self.priorities.append(max_priority)
        else:
            self.buffer[self.pos] = transition
            self.priorities[self.pos] = max_priority
        self.pos = (self.pos + 1) % self.capacity

    def sample(self, batch_size, beta=0.4):
        """Amostra um minibatch com base nas prioridades."""
        priorities = np.array(self.priorities)
        probabilities = priorities ** self.alpha
        probabilities /= probabilities.sum()

        indices = np.random.choice(len(self.buffer), batch_size, p=probabilities)
        samples = [self.buffer[i] for i in indices]

        # Calcular pesos de importância
        weights = (len(self.buffer) * probabilities[indices]) ** (-beta)
        weights /= weights.max()

        return samples, indices, weights

    def update_priorities(self, indices, td_errors):
        """Atualiza as prioridades com base nos novos erros TD."""
        for i, td_error in zip(indices, td_errors):
            self.priorities[i] = abs(td_error) + 1e-5  # Adiciona pequeno valor para estabilidade numérica


### MODELS
---------------------------------------------------------

In [None]:
# Support
# ==========================================

lr = 0.0001
l2_regularization = 0.01

In [6]:
# Class - RADAR
# ==========================================

class ColorCombDepthwiseConv2D(Layer):
    def __init__(self, kernel_size=(7, 7), activation='relu', padding='same', **kwargs):
        super(ColorCombDepthwiseConv2D, self).__init__(**kwargs)
        self.kernel_size = kernel_size
        self.activation = activation
        self.padding = padding

        # Convoluções individuais para cores puras
        self.conv_r = Conv2D(1, kernel_size=self.kernel_size, padding=self.padding, activation=None, name="conv_r")
        self.bn_r = BatchNormalization(name="bn_r")
        
        self.conv_g = Conv2D(1, kernel_size=self.kernel_size, padding=self.padding, activation=None, name="conv_g")
        self.bn_g = BatchNormalization(name="bn_g")
        
        self.conv_b = Conv2D(1, kernel_size=self.kernel_size, padding=self.padding, activation=None, name="conv_b")
        self.bn_b = BatchNormalization(name="bn_b")

        # Convoluções para combinações específicas (Magenta, Ciano, Amarelo)
        self.conv_magenta = Conv2D(1, kernel_size=self.kernel_size, padding=self.padding, activation=None, name="conv_magenta")
        self.bn_magenta = BatchNormalization(name="bn_magenta")
        
        self.conv_cyan = Conv2D(1, kernel_size=self.kernel_size, padding=self.padding, activation=None, name="conv_cyan")
        self.bn_cyan = BatchNormalization(name="bn_cyan")
        
        self.conv_yellow = Conv2D(1, kernel_size=self.kernel_size, padding=self.padding, activation=None, name="conv_yellow")
        self.bn_yellow = BatchNormalization(name="bn_yellow")

    def call(self, inputs, training=False):
        r, g, b = tf.split(inputs, num_or_size_splits=3, axis=-1)

        # Convoluções individuais com BatchNormalization
        r_out = self.bn_r(self.conv_r(r), training=training)
        g_out = self.bn_g(self.conv_g(g), training=training)
        b_out = self.bn_b(self.conv_b(b), training=training)

        # Combinações de canais com BatchNormalization
        magenta_out = self.bn_magenta(self.conv_magenta(r + b), training=training)
        cyan_out = self.bn_cyan(self.conv_cyan(g + b), training=training)
        yellow_out = self.bn_yellow(self.conv_yellow(r + g), training=training)

        # Concatenando saídas
        outputs = Concatenate(axis=-1, name="concat_colors")([r_out, g_out, b_out, magenta_out, cyan_out, yellow_out])

        # Ativação
        outputs = Activation(self.activation, name="activation_colors")(outputs)
        return outputs


class SpatialAttentionModule(Layer):
    def __init__(self, kernel_size=4):
        super(SpatialAttentionModule, self).__init__()
        self.conv = Conv2D(1, kernel_size=kernel_size, padding='same', activation=None, name="attention_conv")
    
    def call(self, inputs):
        avg_pool = tf.reduce_mean(inputs, axis=-1, keepdims=True)
        squared_inputs = K.square(inputs)
        l2_pool = tf.sqrt(tf.reduce_mean(squared_inputs, axis=-1, keepdims=True) + 1e-6)

        concat = Concatenate(axis=-1, name="concat_attention")([avg_pool, l2_pool])
        attention_map = self.conv(concat)
        attention_map = Activation('sigmoid', name="sigmoid_attention")(attention_map)
        return Multiply(name="apply_attention")([inputs, attention_map])




In [None]:
# Model: nn_dqn
# ==========================================

class nn_dqn(tf.keras.Model):
    def __init__(self, num_actions=9, input_shape=(7, 7, 3)):
        super(nn_dqn, self).__init__()
        self.num_actions = num_actions
        self.optimizer = Adam(learning_rate=lr)
        self.loss_fn = MeanSquaredError()

        # Entrada e camadas convolucionais
        self.input_layer = InputLayer(input_shape=input_shape)

        # Camadas convolucionais
        self.conv1 = Conv2D(32, (4, 4), strides=(1, 1), activation=None, padding='same', name="conv1_layer")
        self.bn1 = BatchNormalization(name="bn1_layer")
        self.dropout_conv1 = Dropout(0.3, name="dropout_conv1_layer")  # Dropout após conv1

        self.conv2 = Conv2D(64, (3, 3), strides=(1, 1), activation=None, padding='same', name="conv2_layer")
        self.bn2 = BatchNormalization(name="bn2_layer")
        self.dropout_conv2 = Dropout(0.3, name="dropout_conv2_layer")  # Dropout após conv2

        # Flatten e camadas densas
        self.flatten = Flatten(name="flatten_layer")
        self.dropout_flatten = Dropout(0.4, name="dropout_flatten_layer")  # Dropout após Flatten

        self.dense1 = Dense(64, activation='relu', name="dense1_layer", kernel_regularizer=l2(l2_regularization))
        self.dropout_dense1 = Dropout(0.5, name="dropout_dense1_layer")  # Dropout após dense1
        
        self.dense2 = Dense(32, activation='relu', name="dense2_layer", kernel_regularizer=l2(l2_regularization))
        self.dropout_dense2 = Dropout(0.5, name="dropout_dense2_layer")  # Dropout após dense2

        self.dense_output = Dense(num_actions, activation='linear', name="dense_output_layer", kernel_regularizer=l2(l2_regularization))

    def call(self, inputs, training=False):
        x = self.input_layer(inputs)

        # Camada convolucional 1
        x = self.conv1(x)
        x = self.bn1(x, training=training)
        x = tf.nn.relu(x, name="relu1")
        x = self.dropout_conv1(x, training=training)  # Dropout aplicado após conv1

        # Camada convolucional 2
        x = self.conv2(x)
        x = self.bn2(x, training=training)
        x = tf.nn.relu(x, name="relu2")
        x = self.dropout_conv2(x, training=training)  # Dropout aplicado após conv2

        # Atenção espacial e Flatten
        x = self.flatten(x)
        x = self.dropout_flatten(x, training=training)  # Dropout aplicado após Flatten

        # Camadas densas
        x = self.dense1(x)
        x = self.dropout_dense1(x, training=training)  # Dropout aplicado após dense1
        x = self.dense2(x)
        x = self.dropout_dense2(x, training=training)  # Dropout aplicado após dense2

        Q_values = self.dense_output(x)
        return Q_values


    def training_step(self, batch_data):
        states, actions, targetQ = batch_data
        with tf.GradientTape() as tape:
            Q_values = self(states, training=True)
            actions_onehot = tf.one_hot(actions, self.num_actions, dtype=tf.float32)
            Q = tf.reduce_sum(Q_values * actions_onehot, axis=1)
            loss = self.loss_fn(targetQ, Q)
        grads = tape.gradient(loss, self.trainable_variables)
        self.optimizer.apply_gradients(zip(grads, self.trainable_variables))
        return loss

    def predict_action(self, state):
        Q_values = self(state)
        return tf.argmax(Q_values, axis=1)[0].numpy()  # Retorna a ação com o maior valor Q como um número Python

    def save_model(self, file_path):
        self.save(file_path)
        print(f"Modelo salvo em: {file_path}")

    def load_model(self, file_path):
        model_loaded = tf.keras.models.load_model(file_path)
        return model_loaded
    


In [8]:
# Model: nn_per
# ==========================================

class nn_per(tf.keras.Model):
    def __init__(self, num_actions=9, input_shape=(7, 7, 3), discount_factor=0.99):
        super(nn_per, self).__init__()
        self.num_actions = num_actions
        self.discount_factor = discount_factor
        self.optimizer = Adam(learning_rate=lr)
        self.loss_fn = MeanSquaredError()

        # Entrada e camada de combinação de cores
        self.input_layer = InputLayer(input_shape=input_shape, name="input_layer")

        # Camadas convolucionais
        self.conv1 = Conv2D(32, (4, 4), strides=(1, 1), activation=None, padding='same', name="conv1_layer")
        self.bn1 = BatchNormalization(name="bn1_layer")
        self.dropout_conv1 = Dropout(0.3, name="dropout_conv1_layer")

        self.conv2 = Conv2D(64, (3, 3), strides=(1, 1), activation=None, padding='same', name="conv2_layer")
        self.bn2 = BatchNormalization(name="bn2_layer")
        self.dropout_conv2 = Dropout(0.3, name="dropout_conv2_layer")

        # Flatten e camadas densas
        self.flatten = Flatten(name="flatten_layer")
        self.dropout_flatten = Dropout(0.3, name="dropout_flatten_layer")

        self.dense1 = Dense(64, activation='relu', name="dense1_layer", kernel_regularizer=l2(l2_regularization))
        self.dropout_dense1 = Dropout(0.4, name="dropout_dense1_layer")

        self.dense2 = Dense(32, activation='relu', name="dense2_layer", kernel_regularizer=l2(l2_regularization))
        self.dropout_dense2 = Dropout(0.4, name="dropout_dense2_layer")

        self.dense_output = Dense(num_actions, activation='linear', name="dense_output_layer", kernel_regularizer=l2(l2_regularization))

    def call(self, inputs, training=False):
        x = self.input_layer(inputs)

        x = self.conv1(x)
        x = self.bn1(x, training=training)
        x = tf.nn.relu(x, name="relu1")
        x = self.dropout_conv1(x, training=training)  # Aplicação do Dropout após conv1

        x = self.conv2(x)
        x = self.bn2(x, training=training)
        x = tf.nn.relu(x, name="relu2")
        x = self.dropout_conv2(x, training=training)  # Aplicação do Dropout após conv2

        x = self.flatten(x)
        x = self.dropout_flatten(x, training=training)  # Aplicação do Dropout após Flatten

        x = self.dense1(x)
        x = self.dropout_dense1(x, training=training)  # Aplicação do Dropout após dense1
        x = self.dense2(x)
        x = self.dropout_dense2(x, training=training)  # Aplicação do Dropout após dense2

        Q_values = self.dense_output(x)
        return Q_values
    

    def training_step(self, batch_data):
        states, actions, targetQ, weights = batch_data
        with tf.GradientTape() as tape:
            Q_values = self(states, training=True)
            actions_onehot = tf.one_hot(actions, self.num_actions, dtype=tf.float32)
            Q = tf.reduce_sum(Q_values * actions_onehot, axis=1)
            td_errors = targetQ - Q
            weighted_loss = tf.reduce_mean(weights * tf.square(td_errors))

        grads = tape.gradient(weighted_loss, self.trainable_variables)
        self.optimizer.apply_gradients(zip(grads, self.trainable_variables))

        return weighted_loss  # Apenas a perda


    def predict_action(self, state):
        Q_values = self(state)
        return tf.argmax(Q_values, axis=1)[0].numpy()  # Retorna a ação com o maior valor Q como um número Python

    def save_model(self, file_path):
        self.save(file_path)
        print(f"Modelo salvo em: {file_path}")

    def load_model(self, file_path):
        model_loaded = tf.keras.models.load_model(file_path)
        return model_loaded




In [None]:
# Model: nn_dueling
# ==========================================

class nn_dueling(tf.keras.Model):
    def __init__(self, num_actions=9, input_shape=(7, 7, 3)):
        super(nn_dueling, self).__init__()
        self.num_actions = num_actions
        self.optimizer = Adam(learning_rate=lr)
        self.loss_fn = MeanSquaredError()
        
        # Entrada e camada de combinação de cores
        self.input_layer = InputLayer(input_shape=input_shape, name="input_layer")

        # Camadas convolucionais
        self.conv1 = Conv2D(32, (4, 4), strides=(1, 1), activation=None, padding='same', name="conv1_layer")
        self.bn1 = BatchNormalization(name="bn1_layer")
        self.dropout_conv1 = Dropout(0.3, name="dropout_conv1_layer")  # Dropout na primeira camada convolucional

        self.conv2 = Conv2D(64, (3, 3), strides=(1, 1), activation=None, padding='same', name="conv2_layer")
        self.bn2 = BatchNormalization(name="bn2_layer")
        self.dropout_conv2 = Dropout(0.3, name="dropout_conv2_layer")  # Dropout na segunda camada convolucional


        # Flatten
        self.flatten = Flatten(name="flatten_layer")
        self.dropout_flatten = Dropout(0.4, name="dropout_flatten_layer")  # Dropout após Flatten

        # Camadas densas compartilhadas
        self.dense_shared1 = Dense(64, activation='relu', name="shared_dense1", kernel_regularizer=l2(l2_regularization))
        self.dropout_shared = Dropout(0.4, name="dropout_shared_layer")  # Dropout na camada compartilhada

        # Rede para Valor (V)
        self.value_dense = Dense(32, activation='relu', name="value_dense", kernel_regularizer=l2(l2_regularization))
        self.dropout_value = Dropout(0.4, name="dropout_value_layer")  # Dropout na rede de valor
        self.value_output = Dense(1, activation='linear', name="value_output", kernel_regularizer=l2(l2_regularization))

        # Rede para Vantagem (A)
        self.advantage_dense = Dense(32, activation='relu', name="advantage_dense", kernel_regularizer=l2(l2_regularization))
        self.dropout_advantage = Dropout(0.4, name="dropout_advantage_layer")  # Dropout na rede de vantagem
        self.advantage_output = Dense(num_actions, activation='linear', name="advantage_output", kernel_regularizer=l2(l2_regularization))

    def call(self, inputs, training=False):
        x = self.input_layer(inputs)

        # Camada convolucional 1
        x = self.conv1(x)
        x = self.bn1(x, training=training)
        x = tf.nn.relu(x, name="relu1")
        x = self.dropout_conv1(x, training=training)  # Dropout aplicado

        # Camada convolucional 2
        x = self.conv2(x)
        x = self.bn2(x, training=training)
        x = tf.nn.relu(x, name="relu2")
        x = self.dropout_conv2(x, training=training)  # Dropout aplicado
        

        x = self.flatten(x)
        x = self.dropout_flatten(x, training=training)  # Dropout aplicado após Flatten

        # Camadas compartilhadas
        x = self.dense_shared1(x)
        x = self.dropout_shared(x, training=training)  # Dropout aplicado na camada compartilhada

        # Valor (V)
        v = self.value_dense(x)
        v = self.dropout_value(v, training=training)  # Dropout aplicado na rede de valor
        v = self.value_output(v)

        # Vantagem (A)
        a = self.advantage_dense(x)
        a = self.dropout_advantage(a, training=training)  # Dropout aplicado na rede de vantagem
        a = self.advantage_output(a)

        # Combina V e A para calcular Q
        q = v + (a - tf.reduce_mean(a, axis=1, keepdims=True))
        return q
    
    def training_step(self, batch_data):
        states, actions, targetQ = batch_data
        with tf.GradientTape() as tape:
            Q_values = self(states, training=True)
            actions_onehot = tf.one_hot(actions, self.num_actions, dtype=tf.float32)
            Q = tf.reduce_sum(Q_values * actions_onehot, axis=1)
            loss = self.loss_fn(targetQ, Q)
        grads = tape.gradient(loss, self.trainable_variables)
        self.optimizer.apply_gradients(zip(grads, self.trainable_variables))
        return loss

    def predict_action(self, state):
        Q_values = self(state)
        return tf.argmax(Q_values, axis=1)[0].numpy()  # Retorna a ação com o maior valor Q como um número Python

    def save_model(self, file_path):
        self.save(file_path)
        print(f"Modelo salvo em: {file_path}")

    def load_model(self, file_path):
        model_loaded = tf.keras.models.load_model(file_path)
        return model_loaded
    



In [None]:
# Model: nn_double
# ==========================================

class nn_double(tf.keras.Model):
    def __init__(self, num_actions=9, input_shape=(7, 7, 3), discount_factor=0.99):
        super(nn_double, self).__init__()
        self.num_actions = num_actions
        self.discount_factor = discount_factor

        # Otimizador e função de perda
        self.optimizer = Adam(learning_rate=lr)
        self.loss_fn = MeanSquaredError()

        # Camada de entrada e processamento inicial de cores
        self.input_layer = InputLayer(input_shape=input_shape, name="input_layer")


        # Camadas convolucionais da rede principal
        self.conv1 = Conv2D(32, (4, 4), strides=(1, 1), activation=None, padding="same")
        self.batch_norm1 = BatchNormalization()
        self.relu1 = tf.keras.layers.ReLU()

        self.conv2 = Conv2D(64, (3, 3), strides=(1, 1), activation=None, padding="same")
        self.batch_norm2 = BatchNormalization()
        self.relu2 = tf.keras.layers.ReLU()


        # Camadas densas
        self.flatten = Flatten()
        self.dense1 = Dense(64, activation="relu", kernel_regularizer=l2(l2_regularization))
        self.dense2 = Dense(32, activation="relu", kernel_regularizer=l2(l2_regularization))
        self.output_layer = Dense(num_actions, activation="linear", kernel_regularizer=l2(l2_regularization))

        # Camadas da rede-alvo (adicionadas as mesmas modificações)
        self.target_conv1 = Conv2D(32, (4, 4), strides=(1, 1), activation=None, padding="same")
        self.target_batch_norm1 = BatchNormalization()
        self.target_relu1 = tf.keras.layers.ReLU()

        self.target_conv2 = Conv2D(64, (3, 3), strides=(1, 1), activation=None, padding="same")
        self.target_batch_norm2 = BatchNormalization()
        self.target_relu2 = tf.keras.layers.ReLU()



        self.target_flatten = Flatten()
        self.target_dense1 = Dense(64, activation="relu", kernel_regularizer=l2(l2_regularization))
        self.target_dense2 = Dense(32, activation="relu", kernel_regularizer=l2(l2_regularization))
        self.target_output_layer = Dense(num_actions, activation="linear", kernel_regularizer=l2(l2_regularization))

    def call(self, inputs, training=False):
        """Chama a rede principal para inferência."""
        x = self.input_layer(inputs)

        x = self.conv1(x)
        x = self.batch_norm1(x, training=training)
        x = self.relu1(x)

        x = self.conv2(x)
        x = self.batch_norm2(x, training=training)
        x = self.relu2(x)


        x = self.flatten(x)
        x = self.dense1(x)
        x = self.dense2(x)
        return self.output_layer(x)

    def target_call(self, inputs, training=False):
        """Chama a rede-alvo para inferência."""
        x = self.input_layer(inputs)
        

        x = self.target_conv1(x)
        x = self.target_batch_norm1(x, training=training)
        x = self.target_relu1(x)

        x = self.target_conv2(x)
        x = self.target_batch_norm2(x, training=training)
        x = self.target_relu2(x)

        
        x = self.target_flatten(x)
        x = self.target_dense1(x)

        x = self.target_dense2(x)
        return self.target_output_layer(x)

    def update_target_network(self):
        """Sincroniza os pesos da rede principal para a rede-alvo."""
        self.target_conv1.set_weights(self.conv1.get_weights())
        self.target_batch_norm1.set_weights(self.batch_norm1.get_weights())
        self.target_conv2.set_weights(self.conv2.get_weights())
        self.target_batch_norm2.set_weights(self.batch_norm2.get_weights())
        self.target_dense1.set_weights(self.dense1.get_weights())
        self.target_dense2.set_weights(self.dense2.get_weights())
        self.target_output_layer.set_weights(self.output_layer.get_weights())

    def training_step(self, batch_data):
        """Realiza uma etapa de treinamento com Double DQN."""
        states, actions, targetQ = batch_data
        next_states, rewards, dones = targetQ
        with tf.GradientTape() as tape:
            # Predições da rede principal
            Q_values = self(states, training=True)
            actions_onehot = tf.one_hot(actions, self.num_actions, dtype=tf.float32)
            Q = tf.reduce_sum(Q_values * actions_onehot, axis=1)

            # Double DQN: Calcula o valor-alvo
            main_Q_values_next = self(next_states)  # Rede principal para selecionar a melhor ação
            next_actions = tf.argmax(main_Q_values_next, axis=1)
            target_Q_values_next = self.target_call(next_states)  # Rede-alvo para calcular Q
            target_Q = tf.reduce_sum(target_Q_values_next * tf.one_hot(next_actions, self.num_actions), axis=1)
            target_Q = rewards + (1 - dones) * self.discount_factor * target_Q

            # Calcula o loss
            loss = self.loss_fn(target_Q, Q)

        # Gradientes e atualização dos pesos
        grads = tape.gradient(loss, self.trainable_variables)
        self.optimizer.apply_gradients(zip(grads, self.trainable_variables))
        return loss

    def predict_action(self, state):
        """Prediz a melhor ação para um dado estado."""
        Q_values = self(tf.expand_dims(state, axis=0))
        return tf.argmax(Q_values, axis=1).numpy()[0]

    def save_model(self, file_path):
        """Salva os pesos do modelo principal."""
        self.save_weights(file_path)
        print(f"Modelo salvo em: {file_path}")

    def load_model(self, file_path):
        """Carrega os pesos do modelo principal."""
        self.load_weights(file_path)
        print(f"Modelo carregado de: {file_path}")






In [None]:
# Model: nn_radar_dqn
# ==========================================

class nn_radar_dqn(tf.keras.Model):
    def __init__(self, num_actions=9, input_shape=(7, 7, 3)):
        super(nn_radar_dqn, self).__init__()
        self.num_actions = num_actions
        self.optimizer = Adam(learning_rate=lr)
        self.loss_fn = MeanSquaredError()

        # Entrada e camadas convolucionais
        self.input_layer = InputLayer(input_shape=input_shape)
        self.color_comb_layer = ColorCombDepthwiseConv2D(kernel_size=(7, 7), activation='relu', name="color_comb_layer")

        # Camadas convolucionais
        self.conv1 = Conv2D(32, (4, 4), strides=(1, 1), activation=None, padding='same', name="conv1_layer")
        self.bn1 = BatchNormalization(name="bn1_layer")
        self.dropout_conv1 = Dropout(0.3, name="dropout_conv1_layer")  # Dropout após conv1

        self.conv2 = Conv2D(64, (3, 3), strides=(1, 1), activation=None, padding='same', name="conv2_layer")
        self.bn2 = BatchNormalization(name="bn2_layer")
        self.dropout_conv2 = Dropout(0.3, name="dropout_conv2_layer")  # Dropout após conv2

        # Camada de atenção espacial
        self.spatial_attention = SpatialAttentionModule(kernel_size=4)

        # Flatten e camadas densas
        self.flatten = Flatten(name="flatten_layer")
        self.dropout_flatten = Dropout(0.4, name="dropout_flatten_layer")  # Dropout após Flatten

        self.dense1 = Dense(64, activation='relu', name="dense1_layer", kernel_regularizer=l2(l2_regularization))
        self.dropout_dense1 = Dropout(0.5, name="dropout_dense1_layer")  # Dropout após dense1
        
        self.dense2 = Dense(32, activation='relu', name="dense2_layer", kernel_regularizer=l2(l2_regularization))
        self.dropout_dense2 = Dropout(0.5, name="dropout_dense2_layer")  # Dropout após dense2

        self.dense_output = Dense(num_actions, activation='linear', name="dense_output_layer", kernel_regularizer=l2(l2_regularization))

    def call(self, inputs, training=False):
        x = self.input_layer(inputs)
        x = self.color_comb_layer(x)

        # Camada convolucional 1
        x = self.conv1(x)
        x = self.bn1(x, training=training)
        x = tf.nn.relu(x, name="relu1")
        x = self.dropout_conv1(x, training=training)  # Dropout aplicado após conv1

        # Camada convolucional 2
        x = self.conv2(x)
        x = self.bn2(x, training=training)
        x = tf.nn.relu(x, name="relu2")
        x = self.dropout_conv2(x, training=training)  # Dropout aplicado após conv2

        # Atenção espacial e Flatten
        x = self.spatial_attention(x)
        x = self.flatten(x)
        x = self.dropout_flatten(x, training=training)  # Dropout aplicado após Flatten

        # Camadas densas
        x = self.dense1(x)
        x = self.dropout_dense1(x, training=training)  # Dropout aplicado após dense1
        x = self.dense2(x)
        x = self.dropout_dense2(x, training=training)  # Dropout aplicado após dense2

        Q_values = self.dense_output(x)
        return Q_values


    def training_step(self, batch_data):
        states, actions, targetQ = batch_data
        with tf.GradientTape() as tape:
            Q_values = self(states, training=True)
            actions_onehot = tf.one_hot(actions, self.num_actions, dtype=tf.float32)
            Q = tf.reduce_sum(Q_values * actions_onehot, axis=1)
            loss = self.loss_fn(targetQ, Q)
        grads = tape.gradient(loss, self.trainable_variables)
        self.optimizer.apply_gradients(zip(grads, self.trainable_variables))
        return loss

    def predict_action(self, state):
        Q_values = self(state)
        return tf.argmax(Q_values, axis=1)[0].numpy()  # Retorna a ação com o maior valor Q como um número Python

    def save_model(self, file_path):
        self.save(file_path)
        print(f"Modelo salvo em: {file_path}")

    def load_model(self, file_path):
        model_loaded = tf.keras.models.load_model(file_path)
        return model_loaded
    

In [18]:
# Model: nn_radar_per
# ==========================================

class nn_radar_per(tf.keras.Model):
    def __init__(self, num_actions=9, input_shape=(7, 7, 3), discount_factor=0.99):
        super(nn_radar_per, self).__init__()
        self.num_actions = num_actions
        self.discount_factor = discount_factor
        self.optimizer = Adam(learning_rate=lr)
        self.loss_fn = MeanSquaredError()

        # Entrada e camada de combinação de cores
        self.input_layer = InputLayer(input_shape=input_shape, name="input_layer")
        self.color_comb_layer = ColorCombDepthwiseConv2D(kernel_size=(7, 7), activation='relu', name="color_comb_layer")

        # Camadas convolucionais
        self.conv1 = Conv2D(32, (4, 4), strides=(1, 1), activation=None, padding='same', name="conv1_layer")
        self.bn1 = BatchNormalization(name="bn1_layer")
        self.dropout_conv1 = Dropout(0.3, name="dropout_conv1_layer")

        self.conv2 = Conv2D(64, (3, 3), strides=(1, 1), activation=None, padding='same', name="conv2_layer")
        self.bn2 = BatchNormalization(name="bn2_layer")
        self.dropout_conv2 = Dropout(0.3, name="dropout_conv2_layer")
        
        # Camada de atenção espacial
        self.spatial_attention = SpatialAttentionModule(kernel_size=4)

        # Flatten e camadas densas
        self.flatten = Flatten(name="flatten_layer")
        self.dropout_flatten = Dropout(0.3, name="dropout_flatten_layer")

        self.dense1 = Dense(64, activation='relu', name="dense1_layer", kernel_regularizer=l2(l2_regularization))
        self.dropout_dense1 = Dropout(0.4, name="dropout_dense1_layer")

        self.dense2 = Dense(32, activation='relu', name="dense2_layer", kernel_regularizer=l2(l2_regularization))
        self.dropout_dense2 = Dropout(0.4, name="dropout_dense2_layer")

        self.dense_output = Dense(num_actions, activation='linear', name="dense_output_layer", kernel_regularizer=l2(l2_regularization))

    def call(self, inputs, training=False):
        x = self.input_layer(inputs)
        x = self.color_comb_layer(x)

        x = self.conv1(x)
        x = self.bn1(x, training=training)
        x = tf.nn.relu(x, name="relu1")
        x = self.dropout_conv1(x, training=training)  # Aplicação do Dropout após conv1

        x = self.conv2(x)
        x = self.bn2(x, training=training)
        x = tf.nn.relu(x, name="relu2")
        x = self.dropout_conv2(x, training=training)  # Aplicação do Dropout após conv2

        x = self.spatial_attention(x)
        x = self.flatten(x)
        x = self.dropout_flatten(x, training=training)  # Aplicação do Dropout após Flatten

        x = self.dense1(x)
        x = self.dropout_dense1(x, training=training)  # Aplicação do Dropout após dense1
        x = self.dense2(x)
        x = self.dropout_dense2(x, training=training)  # Aplicação do Dropout após dense2

        Q_values = self.dense_output(x)
        return Q_values
    

    def training_step(self, batch_data):
        states, actions, targetQ, weights = batch_data
        with tf.GradientTape() as tape:
            Q_values = self(states, training=True)
            actions_onehot = tf.one_hot(actions, self.num_actions, dtype=tf.float32)
            Q = tf.reduce_sum(Q_values * actions_onehot, axis=1)
            td_errors = targetQ - Q
            weighted_loss = tf.reduce_mean(weights * tf.square(td_errors))

        grads = tape.gradient(weighted_loss, self.trainable_variables)
        self.optimizer.apply_gradients(zip(grads, self.trainable_variables))

        return weighted_loss  # Apenas a perda


    def predict_action(self, state):
        Q_values = self(state)
        return tf.argmax(Q_values, axis=1)[0].numpy()  # Retorna a ação com o maior valor Q como um número Python

    def save_model(self, file_path):
        self.save(file_path)
        print(f"Modelo salvo em: {file_path}")

    def load_model(self, file_path):
        model_loaded = tf.keras.models.load_model(file_path)
        return model_loaded




In [None]:
# Model: nn_radar_dueling
# ==========================================

class nn_radar_dueling(tf.keras.Model):
    def __init__(self, num_actions=9, input_shape=(7, 7, 3)):
        super(nn_radar_dueling, self).__init__()
        self.num_actions = num_actions
        self.optimizer = Adam(learning_rate=lr)
        self.loss_fn = MeanSquaredError()
        
        # Entrada e camada de combinação de cores
        self.input_layer = InputLayer(input_shape=input_shape, name="input_layer")
        self.color_comb_layer = ColorCombDepthwiseConv2D(kernel_size=(7, 7), activation='relu', name="color_comb_layer")

        # Camadas convolucionais
        self.conv1 = Conv2D(32, (4, 4), strides=(1, 1), activation=None, padding='same', name="conv1_layer")
        self.bn1 = BatchNormalization(name="bn1_layer")
        self.dropout_conv1 = Dropout(0.3, name="dropout_conv1_layer")  # Dropout na primeira camada convolucional

        self.conv2 = Conv2D(64, (3, 3), strides=(1, 1), activation=None, padding='same', name="conv2_layer")
        self.bn2 = BatchNormalization(name="bn2_layer")
        self.dropout_conv2 = Dropout(0.3, name="dropout_conv2_layer")  # Dropout na segunda camada convolucional
     
        # Camada de atenção espacial
        self.spatial_attention = SpatialAttentionModule(kernel_size=4)

        # Flatten
        self.flatten = Flatten(name="flatten_layer")
        self.dropout_flatten = Dropout(0.4, name="dropout_flatten_layer")  # Dropout após Flatten

        # Camadas densas compartilhadas
        self.dense_shared1 = Dense(64, activation='relu', name="shared_dense1", kernel_regularizer=l2(l2_regularization))
        self.dropout_shared = Dropout(0.4, name="dropout_shared_layer")  # Dropout na camada compartilhada

        # Rede para Valor (V)
        self.value_dense = Dense(32, activation='relu', name="value_dense", kernel_regularizer=l2(l2_regularization))
        self.dropout_value = Dropout(0.4, name="dropout_value_layer")  # Dropout na rede de valor
        self.value_output = Dense(1, activation='linear', name="value_output", kernel_regularizer=l2(l2_regularization))

        # Rede para Vantagem (A)
        self.advantage_dense = Dense(32, activation='relu', name="advantage_dense", kernel_regularizer=l2(l2_regularization))
        self.dropout_advantage = Dropout(0.4, name="dropout_advantage_layer")  # Dropout na rede de vantagem
        self.advantage_output = Dense(num_actions, activation='linear', name="advantage_output", kernel_regularizer=l2(l2_regularization))

    def call(self, inputs, training=False):
        x = self.input_layer(inputs)
        x = self.color_comb_layer(x)

        # Camada convolucional 1
        x = self.conv1(x)
        x = self.bn1(x, training=training)
        x = tf.nn.relu(x, name="relu1")
        x = self.dropout_conv1(x, training=training)  # Dropout aplicado

        # Camada convolucional 2
        x = self.conv2(x)
        x = self.bn2(x, training=training)
        x = tf.nn.relu(x, name="relu2")
        x = self.dropout_conv2(x, training=training)  # Dropout aplicado
        
        # Atenção espacial
        x = self.spatial_attention(x)
        x = self.flatten(x)
        x = self.dropout_flatten(x, training=training)  # Dropout aplicado após Flatten

        # Camadas compartilhadas
        x = self.dense_shared1(x)
        x = self.dropout_shared(x, training=training)  # Dropout aplicado na camada compartilhada

        # Valor (V)
        v = self.value_dense(x)
        v = self.dropout_value(v, training=training)  # Dropout aplicado na rede de valor
        v = self.value_output(v)

        # Vantagem (A)
        a = self.advantage_dense(x)
        a = self.dropout_advantage(a, training=training)  # Dropout aplicado na rede de vantagem
        a = self.advantage_output(a)

        # Combina V e A para calcular Q
        q = v + (a - tf.reduce_mean(a, axis=1, keepdims=True))
        return q
    
    def training_step(self, batch_data):
        states, actions, targetQ = batch_data
        with tf.GradientTape() as tape:
            Q_values = self(states, training=True)
            actions_onehot = tf.one_hot(actions, self.num_actions, dtype=tf.float32)
            Q = tf.reduce_sum(Q_values * actions_onehot, axis=1)
            loss = self.loss_fn(targetQ, Q)
        grads = tape.gradient(loss, self.trainable_variables)
        self.optimizer.apply_gradients(zip(grads, self.trainable_variables))
        return loss

    def predict_action(self, state):
        Q_values = self(state)
        return tf.argmax(Q_values, axis=1)[0].numpy()  # Retorna a ação com o maior valor Q como um número Python

    def save_model(self, file_path):
        self.save(file_path)
        print(f"Modelo salvo em: {file_path}")

    def load_model(self, file_path):
        model_loaded = tf.keras.models.load_model(file_path)
        return model_loaded
    

In [None]:
# Model: nn_radar_double
# ==========================================

class nn_radar_double(tf.keras.Model):
    def __init__(self, num_actions=9, input_shape=(7, 7, 3), discount_factor=0.99):
        super(nn_radar_double, self).__init__()
        self.num_actions = num_actions
        self.discount_factor = discount_factor

        # Otimizador e função de perda
        self.optimizer = Adam(learning_rate=lr)
        self.loss_fn = MeanSquaredError()

        # Camada de entrada e processamento inicial de cores
        self.input_layer = InputLayer(input_shape=input_shape, name="input_layer")
        self.color_comb_layer = ColorCombDepthwiseConv2D(kernel_size=(7, 7), activation='relu', name="color_comb_layer")

        # Camadas convolucionais da rede principal
        self.conv1 = Conv2D(32, (4, 4), strides=(1, 1), activation=None, padding="same")
        self.batch_norm1 = BatchNormalization()
        self.relu1 = tf.keras.layers.ReLU()

        self.conv2 = Conv2D(64, (3, 3), strides=(1, 1), activation=None, padding="same")
        self.batch_norm2 = BatchNormalization()
        self.relu2 = tf.keras.layers.ReLU()

        # Módulo de atenção espacial
        self.spatial_attention = SpatialAttentionModule(kernel_size=4)

        # Camadas densas
        self.flatten = Flatten()
        self.dense1 = Dense(64, activation="relu", kernel_regularizer=l2(l2_regularization))
        self.dense2 = Dense(32, activation="relu", kernel_regularizer=l2(l2_regularization))
        self.output_layer = Dense(num_actions, activation="linear", kernel_regularizer=l2(l2_regularization))

        # Camadas da rede-alvo (adicionadas as mesmas modificações)
        self.target_color_comb_layer = ColorCombDepthwiseConv2D(kernel_size=(7, 7), activation='relu', name="target_color_comb_layer")
        self.target_conv1 = Conv2D(32, (4, 4), strides=(1, 1), activation=None, padding="same")
        self.target_batch_norm1 = BatchNormalization()
        self.target_relu1 = tf.keras.layers.ReLU()

        self.target_conv2 = Conv2D(64, (3, 3), strides=(1, 1), activation=None, padding="same")
        self.target_batch_norm2 = BatchNormalization()
        self.target_relu2 = tf.keras.layers.ReLU()

        self.target_spatial_attention = SpatialAttentionModule(kernel_size=4)

        self.target_flatten = Flatten()
        self.target_dense1 = Dense(64, activation="relu", kernel_regularizer=l2(l2_regularization))
        self.target_dense2 = Dense(32, activation="relu", kernel_regularizer=l2(l2_regularization))
        self.target_output_layer = Dense(num_actions, activation="linear", kernel_regularizer=l2(l2_regularization))

    def call(self, inputs, training=False):
        """Chama a rede principal para inferência."""
        x = self.input_layer(inputs)
        x = self.color_comb_layer(x)

        x = self.conv1(x)
        x = self.batch_norm1(x, training=training)
        x = self.relu1(x)

        x = self.conv2(x)
        x = self.batch_norm2(x, training=training)
        x = self.relu2(x)

        x = self.spatial_attention(x)
        x = self.flatten(x)
        x = self.dense1(x)
        x = self.dense2(x)
        return self.output_layer(x)

    def target_call(self, inputs, training=False):
        """Chama a rede-alvo para inferência."""
        x = self.input_layer(inputs)
        x = self.target_color_comb_layer(x)

        x = self.target_conv1(x)
        x = self.target_batch_norm1(x, training=training)
        x = self.target_relu1(x)

        x = self.target_conv2(x)
        x = self.target_batch_norm2(x, training=training)
        x = self.target_relu2(x)

        x = self.target_spatial_attention(x)
        x = self.target_flatten(x)
        x = self.target_dense1(x)

        x = self.target_dense2(x)
        return self.target_output_layer(x)

    def update_target_network(self):
        """Sincroniza os pesos da rede principal para a rede-alvo."""
        self.target_color_comb_layer.set_weights(self.color_comb_layer.get_weights())
        self.target_conv1.set_weights(self.conv1.get_weights())
        self.target_batch_norm1.set_weights(self.batch_norm1.get_weights())
        self.target_conv2.set_weights(self.conv2.get_weights())
        self.target_batch_norm2.set_weights(self.batch_norm2.get_weights())
        self.target_dense1.set_weights(self.dense1.get_weights())
        self.target_dense2.set_weights(self.dense2.get_weights())
        self.target_output_layer.set_weights(self.output_layer.get_weights())

    def training_step(self, batch_data):
        """Realiza uma etapa de treinamento com Double DQN."""
        states, actions, targetQ = batch_data
        next_states, rewards, dones = targetQ
        with tf.GradientTape() as tape:
            # Predições da rede principal
            Q_values = self(states, training=True)
            actions_onehot = tf.one_hot(actions, self.num_actions, dtype=tf.float32)
            Q = tf.reduce_sum(Q_values * actions_onehot, axis=1)

            # Double DQN: Calcula o valor-alvo
            main_Q_values_next = self(next_states)  # Rede principal para selecionar a melhor ação
            next_actions = tf.argmax(main_Q_values_next, axis=1)
            target_Q_values_next = self.target_call(next_states)  # Rede-alvo para calcular Q
            target_Q = tf.reduce_sum(target_Q_values_next * tf.one_hot(next_actions, self.num_actions), axis=1)
            target_Q = rewards + (1 - dones) * self.discount_factor * target_Q

            # Calcula o loss
            loss = self.loss_fn(target_Q, Q)

        # Gradientes e atualização dos pesos
        grads = tape.gradient(loss, self.trainable_variables)
        self.optimizer.apply_gradients(zip(grads, self.trainable_variables))
        return loss

    def predict_action(self, state):
        """Prediz a melhor ação para um dado estado."""
        Q_values = self(tf.expand_dims(state, axis=0))
        return tf.argmax(Q_values, axis=1).numpy()[0]

    def save_model(self, file_path):
        """Salva os pesos do modelo principal."""
        self.save_weights(file_path)
        print(f"Modelo salvo em: {file_path}")

    def load_model(self, file_path):
        """Carrega os pesos do modelo principal."""
        self.load_weights(file_path)
        print(f"Modelo carregado de: {file_path}")





### TRAIN
-------------------------

In [11]:
class TrainingSessionDoubleDQN:
    def __init__(self, task_name, env, model_nn_predator, model_nn_prey,
                 num_episodes=1000, num_steps=10, buffer_capacity=1000, batch_size=32):
        self.task_name = task_name
        self.directory = f"C:/Users/beLIVE/IA/DECISYS/final-models/train"
        self.env = env
        self.model_nn_predator = model_nn_predator
        self.model_nn_prey = model_nn_prey
        self.num_episodes = num_episodes
        self.num_steps = num_steps
        self.replay_buffer_predators = deque(maxlen=buffer_capacity)
        self.replay_buffer_preys = deque(maxlen=buffer_capacity)

        # Configuração de epsilon
        self.epsilon_initial = 1.0
        self.epsilon_min = 0.01
        self.epsilon_decay_rate = np.log(self.epsilon_initial / self.epsilon_min) / self.num_episodes
        self.epsilon = self.epsilon_initial  # Inicializa com o valor inicial

        self.discount_factor = 0.99
        self.batch_size = batch_size
        self.save_interval = 250

    def update_epsilon_old(self, episode):
        """
        Atualiza o valor de epsilon com decaimento exponencial suave ao longo de todo o treinamento.
        """
        self.epsilon = self.epsilon_min + (self.epsilon_initial - self.epsilon_min) * np.exp(-self.epsilon_decay_rate * episode)

    def update_epsilon(self, episode):
        """
        Atualiza o valor de epsilon com decaimento exponencial suave ao longo de todo o treinamento.
        """
        epsilon_decay_rate = np.log(self.epsilon_initial / self.epsilon_min) / self.num_episodes
        self.epsilon = self.epsilon_min + (self.epsilon_initial - self.epsilon_min) * np.exp(-epsilon_decay_rate * episode)

    def train_model_old(self, buffer, model):
        """
        Treina o modelo com um minibatch do buffer de replay, usando Double DQN.
        """
        if len(buffer) < self.batch_size:
            return None
        minibatch = random.sample(buffer, self.batch_size)

        loss_total = []
        for state, action, reward, next_state, done in minibatch:
            state_expanded = np.expand_dims(state, axis=0)
            next_state_expanded = np.expand_dims(next_state, axis=0)

            # Formatação para o método `training_step`
            batch_data = (state_expanded, [action], (next_state_expanded, reward, done))
            loss = model.training_step(batch_data)
            loss_total.append(loss.numpy().item())

        return f"Mean Loss: {np.mean(loss_total):.4f}"
    

    def train_model(self, buffer, model):
        """
        Treina o modelo com um minibatch do buffer de replay, usando Double DQN.
        """
        if len(buffer) < self.batch_size:
            return None
        
        # Amostra um minibatch
        minibatch = random.sample(buffer, self.batch_size)
        states, actions, rewards, next_states, dones = zip(*minibatch)
        states = np.array(states)
        next_states = np.array(next_states)
        actions = np.array(actions)
        rewards = np.array(rewards)
        dones = np.array(dones, dtype=np.float32)

        # Formata os dados para o modelo
        batch_data = (states, actions, (next_states, rewards, dones))
        loss = model.training_step(batch_data)
        return f"Mean Loss: {loss.numpy():.4f}"


    def run(self):
        """
        Executa o treinamento principal.
        """
        for episode in range(self.num_episodes):
            self.env.reset()
            self.update_epsilon(episode)
            episode_rewards = {'predator': [], 'prey': []}
            episode_losses = {'predator': [], 'prey': []}
            episode_dones = {'predator': 0, 'prey': 0}

            for step in range(self.num_steps):
                if not any(a.breed == 2 and a.is_alive for a in self.env.agents) or not any(a.breed == 0 and a.is_alive for a in self.env.agents):
                    break

                for agent in self.env.agents:
                    if not agent.is_alive:
                        continue

                    # Obtem o estado atual e seleciona a ação
                    state = self.env.render_agent(agent)
                    state_expanded = np.expand_dims(state, 0)
                    model = self.model_nn_prey if agent.breed == 2 else self.model_nn_predator

                    # Seleção de ação com epsilon-greedy
                    if np.random.rand() > self.epsilon:
                        Q_values = model(state_expanded)
                        action = tf.argmax(Q_values, axis=1).numpy()[0]
                    else:
                        action = np.random.randint(0, 9)

                    # Executa a ação e coleta o feedback
                    next_state, reward, done, feedback = agent.step(action)

                    # Atualiza o buffer de replay
                    buffer = self.replay_buffer_preys if agent.breed == 2 else self.replay_buffer_predators
                    buffer.append((state, action, reward, next_state, done))

                    # Done status
                    if done:
                        agent.target = None
                        agent.is_done = False
                        agent.current_target = None
                        self.env.agents = [a for a in self.env.agents if a.is_alive]

                    # Atualiza métricas de recompensas
                    if agent.breed == 0:  # Predador
                        episode_rewards['predator'].append(reward)
                        episode_dones['predator'] += 1 if done else 0
                    elif agent.breed == 2:  # Presa
                        episode_rewards['prey'].append(reward)
                        episode_dones['prey'] += 1 if done else 0



           # Treina os modelos ao final de cada episódio
            loss_predators = self.train_model(self.replay_buffer_predators, self.model_nn_predator)
            if loss_predators:
                episode_losses['predator'].append(float(loss_predators.split(":")[1].strip()))
            loss_preys = self.train_model(self.replay_buffer_preys, self.model_nn_prey)
            if loss_preys:
                episode_losses['prey'].append(float(loss_preys.split(":")[1].strip()))

            # Logs e métricas
            print(f"Episode: {episode}, Predator Loss: {loss_predators}, Prey Loss: {loss_preys}, Epsilon: {self.epsilon:.4f}")
            
            # Salva as métricas do episódio
            self.save_to_file(episode, step, episode_rewards, episode_losses, self.epsilon)

            # Sincronizar as redes-alvo a cada 10 episódios
            if (episode + 1) % 10 == 0:
                self.model_nn_predator.update_target_network()
                self.model_nn_prey.update_target_network()



            # Salva os modelos periodicamente
            if episode == 10 or (episode) % self.save_interval == 0:
                file_path_model_prey = os.path.join(self.directory, f'model_{self.task_name}_prey_{episode}.h5')
                file_path_model_predator = os.path.join(self.directory, f'model_{self.task_name}_predator_{episode}.h5')
                self.model_nn_prey.save_model(file_path_model_prey)
                self.model_nn_predator.save_model(file_path_model_predator)

    def save_to_file(self, episode, step, episode_rewards, losses, epsilon):

        stats = {}
        for agent_type in ['predator', 'prey']:
            rewards = episode_rewards[agent_type]
            losses_values = losses[agent_type]

            stats[agent_type] = {
                'mean_reward': np.mean(rewards) if rewards else 0,
                'median_reward': np.median(rewards) if rewards else 0,
                'std_reward': np.sum(rewards) if rewards else 0,
                'mean_loss': np.mean(losses_values) if losses_values else 0,
                'median_loss': np.median(losses_values) if losses_values else 0,
                'std_loss': np.sum(losses_values) if losses_values else 0
            }
        total_prey, total_predators, _ = self.env.population_count()
        # Criação de arquivos separados
        for agent_type in ['predator', 'prey']:
            file_name = f"{self.task_name}_{agent_type}.txt"
            with open(file_name, "a") as file:
                file.write(
                    f"Episode: {episode}, step: {step}, Pop: {total_prey}/{total_predators}, "
                    f"Epsilon: {epsilon:.2f}, "
                    f"Mean Loss: {stats[agent_type]['mean_loss']:.2f}, Median Loss: {stats[agent_type]['median_loss']:.2f}, "
                    f"Std Loss: {stats[agent_type]['std_loss']:.2f}, "
                    f"Mean Reward: {stats[agent_type]['mean_reward']:.2f}, "
                    f"Median Reward: {stats[agent_type]['median_reward']:.2f}, "
                    f"Std Reward: {stats[agent_type]['std_reward']:.2f}\n"
                )


In [12]:
class TrainingSessionPER:
    def __init__(self, task_name, env, model_nn_predator, model_nn_prey, 
                 num_episodes=1000, num_steps=10, buffer_capacity=1000, batch_size=32, alpha=0.8, beta_start=0.4, beta_frames=5000):
        #, alpha=0.6, beta_start=0.4, beta_frames=1000):
        self.task_name = task_name
        self.directory = f"C:/Users/beLIVE/IA/DECISYS/final-models/train"
        self.env = env
        self.model_nn_predator = model_nn_predator
        self.model_nn_prey = model_nn_prey
        self.num_episodes = num_episodes
        self.num_steps = num_steps

        # Buffers de replay com prioridade
        self.replay_buffer_predators = PrioritizedReplayBuffer(buffer_capacity, alpha)
        self.replay_buffer_preys = PrioritizedReplayBuffer(buffer_capacity, alpha)

        # Configuração de epsilon
        self.epsilon_initial = 1.0
        self.epsilon_min = 0.01
        self.epsilon_decay_rate = np.log(self.epsilon_initial / self.epsilon_min) / self.num_episodes
        self.epsilon = self.epsilon_initial

        self.beta_start = beta_start
        self.beta_frames = beta_frames
        self.batch_size = batch_size
        
        # Adicionando save_interval
        self.save_interval = 250 # Intervalo para salvar os modelos

    def beta_by_frame(self, frame_idx):
        """Aumenta beta linearmente ao longo dos frames."""
        return min(1.0, self.beta_start + frame_idx * (1.0 - self.beta_start) / self.beta_frames)


    def update_epsilon(self, episode):
        """
        Atualiza o valor de epsilon com decaimento exponencial suave ao longo de todo o treinamento.
        """
        epsilon_decay_rate = np.log(self.epsilon_initial / self.epsilon_min) / self.num_episodes
        self.epsilon = self.epsilon_min + (self.epsilon_initial - self.epsilon_min) * np.exp(-epsilon_decay_rate * episode)

    def train_model(self, buffer, model, frame_idx):
        if len(buffer.buffer) < self.batch_size:
            return None

        # Obter beta ajustado para o frame atual
        beta = self.beta_by_frame(frame_idx)

        # Amostragem do buffer com prioridades
        minibatch, indices, weights = buffer.sample(self.batch_size, beta)

        # Processar minibatch em tensores para eficiência
        states, actions, rewards, next_states, dones = map(np.array, zip(*minibatch))

        # Predições para os próximos estados (vetorizado)
        next_q_values = model(next_states, training=False)  # Mantém cálculo em TensorFlow
        max_next_q_values = tf.reduce_max(next_q_values, axis=1)

        # Calcular TD Targets diretamente em TensorFlow
        td_targets = rewards + (1 - dones) * model.discount_factor * max_next_q_values

        # Predições para os estados atuais
        current_q_values = model(states, training=False)
        selected_q_values = tf.gather_nd(
            current_q_values, 
            tf.stack([tf.range(self.batch_size), actions], axis=1)
        )

        # TD Errors calculados diretamente
        td_errors = tf.abs(td_targets - selected_q_values)

        # Pesos normalizados (opcional, para maior estabilidade)
        normalized_weights = tf.convert_to_tensor(weights, dtype=tf.float32)
        if tf.reduce_sum(normalized_weights) > 0:
            normalized_weights /= tf.reduce_sum(normalized_weights)

        # Executa o treinamento do modelo
        loss = model.training_step((states, actions, td_targets, normalized_weights))

        # Atualizar prioridades no buffer de replay
        buffer.update_priorities(indices, td_errors.numpy().tolist())

        return f"Mean Loss: {loss.numpy():.4f}"


    def train_model_old(self, buffer, model, frame_idx):
        if len(buffer.buffer) < self.batch_size:
            return None

        beta = self.beta_by_frame(frame_idx)
        minibatch, indices, weights = buffer.sample(self.batch_size, beta)

        # Transformar minibatch para processamento vetorizado
        states, actions, rewards, next_states, dones = map(np.array, zip(*minibatch))
        
        # Predições para o próximo estado
        next_q_values = model(next_states).numpy()
        max_next_q_values = np.max(next_q_values, axis=1)
        
        # Calcula TD Target
        td_targets = rewards + (1 - dones) * model.discount_factor * max_next_q_values

        # Predições para o estado atual
        current_q_values = model(states).numpy()
        td_errors = np.abs(td_targets - current_q_values[np.arange(self.batch_size), actions])

        # Normaliza os pesos
        weights = weights / np.sum(weights) if np.sum(weights) > 0 else weights
        loss = model.training_step((states, actions, td_targets, weights))

        # Atualiza as prioridades no buffer
        buffer.update_priorities(indices, td_errors.tolist())

        return f"Mean Loss: {loss.numpy():.4f}"



    def train_model_old(self, buffer, model, frame_idx):
        if len(buffer.buffer) < self.batch_size:
            return None

        beta = self.beta_by_frame(frame_idx)
        minibatch, indices, weights = buffer.sample(self.batch_size, beta)
        states, actions, rewards, next_states, dones = map(np.array, zip(*minibatch))
        
        next_q_values = model(next_states).numpy()
        max_next_q_values = np.max(next_q_values, axis=1)
        td_targets = rewards + (1 - dones) * model.discount_factor * max_next_q_values

        current_q_values = model(states).numpy()
        td_errors = np.abs(td_targets - current_q_values[np.arange(self.batch_size), actions])

        weights = weights / np.max(weights) if np.max(weights) > 0 else weights
        loss = model.training_step((states, actions, td_targets, weights))
        buffer.update_priorities(indices, td_errors.tolist())

        return f"Mean Loss: {loss.numpy():.4f}"



    def run(self):
        """Executa o treinamento principal."""
        frame_idx = 0
        for episode in range(self.num_episodes):
            self.env.reset()
            self.update_epsilon(episode)
            episode_rewards = {'predator': [], 'prey': []}
            episode_losses = {'predator': [], 'prey': []}
            episode_dones = {'predator': 0, 'prey': 0}

            for step in range(self.num_steps):
                if not any(a.breed == 2 and a.is_alive for a in self.env.agents) or not any(a.breed == 0 and a.is_alive for a in self.env.agents):
                    break

                for agent in self.env.agents:
                    if not agent.is_alive:
                        continue

                    # Obtem o estado atual e seleciona a ação
                    state = self.env.render_agent(agent)
                    state_expanded = np.expand_dims(state, 0)
                    model = self.model_nn_prey if agent.breed == 2 else self.model_nn_predator

                    # Seleção de ação com epsilon-greedy
                    if np.random.rand() > self.epsilon:
                        Q_values = model(state_expanded)
                        action = tf.argmax(Q_values, axis=1).numpy()[0]
                    else:
                        action = np.random.randint(0, 9)

                    # Executa a ação e coleta o feedback
                    next_state, reward, done, feedback = agent.step(action)

                    # Atualiza o buffer de replay
                    buffer = self.replay_buffer_preys if agent.breed == 2 else self.replay_buffer_predators
                    td_error = abs(reward + (1 - done) * model.discount_factor * np.max(model(np.expand_dims(next_state, axis=0))) - model(state_expanded)[0, action])
                    buffer.add((state, action, reward, next_state, done), td_error)


                    # Done status
                    if done:
                        agent.target = None
                        agent.is_done = False
                        agent.current_target = None
                        self.env.agents = [a for a in self.env.agents if a.is_alive]

                    # Atualiza métricas
                    if agent.breed == 0:  # Predador
                        episode_rewards['predator'].append(reward)
                        episode_dones['predator'] += 1 if done else 0
                    elif agent.breed == 2:  # Presa
                        episode_rewards['prey'].append(reward)
                        episode_dones['prey'] += 1 if done else 0

                    # Exibe detalhes do agente
                    pop_prey, pop_predator, _ = self.env.population_count()
                    print(f"{episode}/{step}: {agent.name}, Pop: {pop_prey}/{pop_predator}, "
                        f"Position: ({agent.x}, {agent.y}), Action: {action}, Reward: {reward}, Done: {done}, Feedback: {feedback}")


                frame_idx += 1

            # Treina os modelos ao final de cada episódio
            loss_predators = self.train_model(self.replay_buffer_predators, self.model_nn_predator, frame_idx)
            if loss_predators:
                episode_losses['predator'].append(float(loss_predators.split(":")[1].strip()))
            loss_preys = self.train_model(self.replay_buffer_preys, self.model_nn_prey, frame_idx)
            if loss_preys:
                episode_losses['prey'].append(float(loss_preys.split(":")[1].strip()))

             # Salva as métricas do episódio
            self.save_to_file(episode, step, episode_rewards, episode_losses, self.epsilon)

            # Salva os modelos periodicamente
            if episode == 10 or (episode) % self.save_interval == 0:
                file_path_model_prey = os.path.join(self.directory, f'model_{self.task_name}_prey_{episode}.h5')
                file_path_model_predator = os.path.join(self.directory, f'model_{self.task_name}_predator_{episode}.h5')
                try:
                    self.model_nn_prey.save_weights(file_path_model_prey)
                    print(f"Pesos do modelo de presa salvos com sucesso em {file_path_model_prey}")
                except Exception as e:
                    print(f"Ocorreu um erro ao salvar os pesos do modelo de presa: {e}")
                try:
                    self.model_nn_predator.save_weights(file_path_model_predator)
                    print(f"Pesos do modelo de predador salvos com sucesso em {file_path_model_predator}")
                except Exception as e:
                    print(f"Ocorreu um erro ao salvar os pesos do modelo de predador: {e}")

    def save_to_file(self, episode, step, episode_rewards, losses, epsilon):

        stats = {}
        for agent_type in ['predator', 'prey']:
            rewards = episode_rewards[agent_type]
            losses_values = losses[agent_type]

            stats[agent_type] = {
                'mean_reward': np.mean(rewards) if rewards else 0,
                'median_reward': np.median(rewards) if rewards else 0,
                'std_reward': np.sum(rewards) if rewards else 0,
                'mean_loss': np.mean(losses_values) if losses_values else 0,
                'median_loss': np.median(losses_values) if losses_values else 0,
                'std_loss': np.sum(losses_values) if losses_values else 0
            }
        total_prey, total_predators, _ = self.env.population_count()
        # Criação de arquivos separados
        for agent_type in ['predator', 'prey']:
            file_name = f"{self.task_name}_{agent_type}.txt"
            with open(file_name, "a") as file:
                file.write(
                    f"Episode: {episode}, step: {step}, Pop: {total_prey}/{total_predators}, "
                    f"Epsilon: {epsilon:.2f}, "
                    f"Mean Loss: {stats[agent_type]['mean_loss']:.2f}, Median Loss: {stats[agent_type]['median_loss']:.2f}, "
                    f"Std Loss: {stats[agent_type]['std_loss']:.2f}, "
                    f"Mean Reward: {stats[agent_type]['mean_reward']:.2f}, "
                    f"Median Reward: {stats[agent_type]['median_reward']:.2f}, "
                    f"Std Reward: {stats[agent_type]['std_reward']:.2f}\n"
                )



In [13]:
class TrainingSession:
    def __init__(self, task_name, env, model_nn_predator, model_nn_prey, num_episodes=1000, num_steps=10, buffer_capacity=1000, batch_size=32):

        self.task_name = task_name
        self.directory = f"C:/Users/beLIVE/IA/DECISYS/final-models/train"
        self.env = env
        self.model_nn_predator = model_nn_predator
        self.model_nn_prey = model_nn_prey
        self.num_episodes = num_episodes
        self.num_steps = num_steps
        self.replay_buffer_predators = deque(maxlen=buffer_capacity)
        self.replay_buffer_preys = deque(maxlen=buffer_capacity)

        # Configuração de epsilon
        self.epsilon_initial = 1.0
        self.epsilon_min = 0.01
        self.half_decay_period = num_episodes // 2
        #self.decay_constant = self.half_decay_period / np.log(self.epsilon_initial / self.epsilon_min)
        self.epsilon_decay_rate = np.log(self.epsilon_initial / self.epsilon_min) / self.num_episodes
        self.epsilon = self.epsilon_initial  # Inicializa com o valor inicial


        self.discount_factor = 0.99
        self.batch_size = batch_size
        self.save_interval = 250

    def update_epsilon(self, episode):
        """
        Atualiza o valor de epsilon com decaimento exponencial suave ao longo de todo o treinamento.
        """
        epsilon_decay_rate = np.log(self.epsilon_initial / self.epsilon_min) / self.num_episodes
        self.epsilon = self.epsilon_min + (self.epsilon_initial - self.epsilon_min) * np.exp(-epsilon_decay_rate * episode)


    
    def update_epsilon_old(self, episode):
        """
        Atualiza o valor de epsilon para exploração baseado no episódio atual.
        O epsilon decai exponencialmente até a metade do período total de episódios.
        Após isso, mantém o valor mínimo.
        """
        if episode < self.half_decay_period:
            self.epsilon = self.epsilon_min + (self.epsilon_initial - self.epsilon_min) * np.exp(-episode / self.decay_constant)
        else:
            self.epsilon = self.epsilon_min

    def train_model(self, buffer, model):
        """
        Treina o modelo com um minibatch completo do buffer de replay.
        """
        if len(buffer) < self.batch_size:
            return None

        # Amostragem eficiente do buffer
        indices = np.random.choice(len(buffer), self.batch_size, replace=False)
        minibatch = [buffer[i] for i in indices]

        # Separação das transições
        states, actions, rewards, next_states, dones = zip(*minibatch)
        states = tf.convert_to_tensor(np.array(states, dtype=np.float32))
        actions = tf.convert_to_tensor(np.array(actions, dtype=np.int32))
        rewards = tf.convert_to_tensor(np.array(rewards, dtype=np.float32))
        next_states = tf.convert_to_tensor(np.array(next_states, dtype=np.float32))
        dones = tf.convert_to_tensor(np.array(dones, dtype=np.float32))

        # Predições e cálculo vetorizado com TensorFlow
        current_q_values = model(states)
        next_q_values = model(next_states)
        max_next_q_values = tf.reduce_max(next_q_values, axis=1)

        # Cálculo dos valores-alvo
        targetQ = rewards + (1 - dones) * self.discount_factor * max_next_q_values

        # Treinamento
        batch_data = (states, actions, targetQ)
        loss = model.training_step(batch_data)
        return f"Loss: {loss.numpy():.4f}"




    def train_model_new_old(self, buffer, model):
        """
        Treina o modelo com um minibatch completo do buffer de replay.
        """
        if len(buffer) < self.batch_size:
            return None

        # Amostragem do buffer
        minibatch = random.sample(buffer, self.batch_size)
        states, actions, rewards, next_states, dones = map(np.array, zip(*minibatch))

        # Predições dos Q-values
        current_q_values = model(states).numpy()
        next_q_values = model(next_states).numpy()

        # Cálculo vetorizado dos valores-alvo
        max_next_q_values = np.max(next_q_values, axis=1)
        targetQ = rewards + (1 - dones) * self.discount_factor * max_next_q_values

        # Treinamento
        batch_data = (states, actions, targetQ)
        loss = model.training_step(batch_data)
        return f"Loss: {loss.numpy():.4f}"

    def train_model_old(self, buffer, model):
        """
        Treina o modelo com um minibatch completo do buffer de replay.
        """
        # Certifique-se de que há dados suficientes no buffer
        if len(buffer) < self.batch_size:
            return None

        # Amostra um minibatch do replay buffer
        minibatch = random.sample(buffer, self.batch_size)

        # Separando os componentes do minibatch
        states, actions, rewards, next_states, dones = zip(*minibatch)

        # Converte os componentes para numpy arrays
        states = np.array(states)
        next_states = np.array(next_states)
        actions = np.array(actions)
        rewards = np.array(rewards)
        dones = np.array(dones)

        # Predição dos Q-values atuais e futuros
        current_q_values = model(states, training=False).numpy()
        next_q_values = model(next_states, training=False).numpy()

        # Calcula os valores-alvo (targetQ) para as ações tomadas
        targetQ = np.zeros(self.batch_size)  # Vetor 1D para os valores-alvo
        for i in range(self.batch_size):
            target_q_value = rewards[i]
            if not dones[i]:  # Adiciona o valor descontado do próximo estado se o episódio não terminou
                target_q_value += self.discount_factor * np.max(next_q_values[i])
            targetQ[i] = target_q_value  # Define o valor-alvo para a ação tomada

        # Realiza uma etapa de treinamento com o minibatch completo
        batch_data = (states, actions, targetQ)
        loss = model.training_step(batch_data)

        return f"Loss: {loss.numpy():.4f}"



    def run(self):
        """
        Executa o treinamento principal.
        """
        for episode in range(self.num_episodes):
            self.env.reset()
            self.update_epsilon(episode)
            episode_rewards = {'predator': [], 'prey': []}
            episode_losses = {'predator': [], 'prey': []}
            episode_dones = {'predator': 0, 'prey': 0}

            for step in range(self.num_steps):
                # Verifica se ainda há predadores e presas vivos
                if not any(a.breed == 2 and a.is_alive for a in self.env.agents) or not any(a.breed == 0 and a.is_alive for a in self.env.agents):
                    break

                for agent in self.env.agents:
                    if not agent.is_alive:
                        continue

                    # Obtem o estado atual e seleciona a ação
                    state = self.env.render_agent(agent)
                    state_expanded = np.expand_dims(state, 0)
                    model = self.model_nn_prey if agent.breed == 2 else self.model_nn_predator

                    # Seleção de ação com epsilon-greedy
                    if np.random.rand() > self.epsilon:
                        Q_values = model.predict(state_expanded, verbose=0)
                        action = np.argmax(Q_values[0])
                    else:
                        action = np.random.randint(0, 9)

                    # Executa a ação e coleta o feedback
                    next_state, reward, done, feedback = agent.step(action)

                    

                    # Atualiza o buffer de replay
                    buffer = self.replay_buffer_preys if agent.breed == 2 else self.replay_buffer_predators
                    buffer.append((state, action, reward, next_state, done))

                    
                    
                    # Done status
                    if done:
                        agent.target = None
                        agent.is_done = False
                        agent.current_target = None
                        self.env.agents = [a for a in self.env.agents if a.is_alive]

                    # Atualiza métricas de recompensas e estados terminais
                    if agent.breed == 0:  # Predador
                        episode_rewards['predator'].append(reward)
                        episode_dones['predator'] += 1 if done else 0
                    elif agent.breed == 2:  # Presa
                        episode_rewards['prey'].append(reward)
                        episode_dones['prey'] += 1 if done else 0

                    # Exibe detalhes do agente
                    pop_prey, pop_predator, _ = self.env.population_count()
                    print(f"{episode}/{step}: {agent.name}, Pop: {pop_prey}/{pop_predator}, "
                        f"Position: ({agent.x}, {agent.y}), Action: {action}, Reward: {reward}, Done: {done}, Feedback: {feedback}")



            # Treina os modelos no final do episódio
            loss_predators = self.train_model(self.replay_buffer_predators, self.model_nn_predator)
            if loss_predators:
                episode_losses['predator'].append(float(loss_predators.split(":")[1].strip()))
            loss_preys = self.train_model(self.replay_buffer_preys, self.model_nn_prey)
            if loss_preys:
                episode_losses['prey'].append(float(loss_preys.split(":")[1].strip()))

            # Salva as métricas do episódio
            self.save_to_file(episode, step, episode_rewards, episode_losses, self.epsilon)

            # Salva os modelos periodicamente
            if episode == 10 or (episode) % self.save_interval == 0:

                # Nome do arquivo para salvar os pesos do modelo
                file_path_model_prey = os.path.join(self.directory, f'model_{self.task_name}_prey_{episode}.h5')
                # Nome do arquivo para salvar os pesos do modelo
                file_path_model_predator = os.path.join(self.directory, f'model_{self.task_name}_predator_{episode}.h5')
                try:
                    self.model_nn_prey.save_weights(file_path_model_prey)
                    print(f"Pesos do modelo de presa salvos com sucesso em {file_path_model_prey}")
                except Exception as e:
                    print(f"Ocorreu um erro ao salvar os pesos do modelo de presa: {e}")
                try:
                    self.model_nn_predator.save_weights(file_path_model_predator)
                    print(f"Pesos do modelo de presa salvos com sucesso em {file_path_model_predator}")
                except Exception as e:
                    print(f"Ocorreu um erro ao salvar os pesos do modelo de presa: {e}")


    def save_to_file(self, episode, step, episode_rewards, losses, epsilon):

        stats = {}
        for agent_type in ['predator', 'prey']:
            rewards = episode_rewards[agent_type]
            losses_values = losses[agent_type]

            stats[agent_type] = {
                'mean_reward': np.mean(rewards) if rewards else 0,
                'median_reward': np.median(rewards) if rewards else 0,
                'std_reward': np.sum(rewards) if rewards else 0,
                'mean_loss': np.mean(losses_values) if losses_values else 0,
                'median_loss': np.median(losses_values) if losses_values else 0,
                'std_loss': np.sum(losses_values) if losses_values else 0
            }
        total_prey, total_predators, _ = self.env.population_count()
        # Criação de arquivos separados
        for agent_type in ['predator', 'prey']:
            file_name = f"{self.task_name}_{agent_type}.txt"
            with open(file_name, "a") as file:
                file.write(
                    f"Episode: {episode}, step: {step}, Pop: {total_prey}/{total_predators}, "
                    f"Epsilon: {epsilon:.2f}, "
                    f"Mean Loss: {stats[agent_type]['mean_loss']:.2f}, Median Loss: {stats[agent_type]['median_loss']:.2f}, "
                    f"Std Loss: {stats[agent_type]['std_loss']:.2f}, "
                    f"Mean Reward: {stats[agent_type]['mean_reward']:.2f}, "
                    f"Median Reward: {stats[agent_type]['median_reward']:.2f}, "
                    f"Std Reward: {stats[agent_type]['std_reward']:.2f}\n"
                )



### RUN
----------------------

In [15]:
num_episodes = 1200
buffer = 1000
batch = 64

In [None]:
# GRAVAÇÃO: training-radar-dqn-double
# ==========================================

# Definindo o nome da tarefa e o log
task_name = "nn_dqn_double64"
input_shape = (7, 7, 3)
size = 10

# Criando os modelos Double DQN para predadores e presas
model_nn_predator = nn_dqn_double(num_actions=9, input_shape=input_shape)
model_nn_prey = nn_dqn_double(num_actions=9, input_shape=input_shape)

# Sincroniza os pesos com a rede-alvo
model_nn_predator.update_target_network()
model_nn_prey.update_target_network()

# Criando o ambiente
env = Env(sizeX=10, sizeY=10, ray=3)

# Definindo o tamanho do ambiente e número de episódios e passos

num_steps = 10

num_preys = 10
num_predators = 5

num_obstacle = int(env.sizeX * env.sizeX * 0.10)

# Povoando o ambiente com obstáculos
for o in range(num_obstacle):
    pos = env.new_position()
    if pos is not None:
        x, y = pos
        obstacle = Obstacle(x, y)
        env.add_obstacle(obstacle)
    else:
        print("Não foi possível encontrar uma posição nova para o obstacle.")

# Povoando o ambiente com presas
for j in range(num_preys):
    pos = env.new_position()
    if pos is not None:
        x, y = pos
        prey = Prey(x, y, env, j, model_nn_prey)
        env.add_agent(prey)
    else:
        print("Não foi possível encontrar uma posição nova para o prey.")

# Povoando o ambiente com predadores
for i in range(num_predators):
    pos = env.new_position()
    if pos is not None:
        x, y = pos
        predator = Predator(x, y, env, i, model_nn_predator)
        env.add_agent(predator)
    else:
        print("Não foi possível encontrar uma posição nova para o predator.")

# Criando a sessão de treinamento
training_session = TrainingSessionDoubleDQN(
    task_name=task_name,
    env=env,
    model_nn_predator=model_nn_predator,
    model_nn_prey=model_nn_prey,
    num_episodes=num_episodes,
    num_steps=num_steps,
    buffer_capacity=buffer, 
    batch_size=batch
)

# Executando o treinamento
training_session.run()

directory = f"C:/Users/beLIVE/IA/DECISYS/final-models/train"
file_path_model_prey = os.path.join(directory, f'model_{task_name}_prey_1000.h5')
file_path_model_predator = os.path.join(directory, f'model_{task_name}_predator_1000.h5')
model_nn_prey.save_model(file_path_model_prey)
model_nn_predator.save_model(file_path_model_predator)

In [None]:
# GRAVAÇÃO: training-radar-dqn-per
# ==========================================

# Definindo o nome da tarefa e o log
task_name = "nn_dqn_per64"
input_shape = (7, 7, 3)
size = 10

# Modelo Neural utilizado
model_nn_predator = nn_dqn_per()
model_nn_prey = nn_dqn_per()

model_nn_predator.build(input_shape=(None,) + input_shape)
model_nn_prey.build(input_shape=(None,) + input_shape)

# Criando o ambiente
env = Env(sizeX=size, sizeY=size, ray=3)

# Definindo o tamanho do ambiente e número de episódios e passos
num_steps = 10

num_preys = 10
num_predators = 5

num_obstacle = int(env.sizeX * env.sizeX * 0.10)

# Povoando o ambiente com obstáculos
for o in range(num_obstacle):
    pos = env.new_position()
    if pos is not None:
        x, y = pos
        obstacle = Obstacle(x, y)
        env.add_obstacle(obstacle)
    else:
        print("Não foi possível encontrar uma posição nova para o obstacle.")

# Povoando o ambiente com presas
for j in range(num_preys):
    pos = env.new_position()
    if pos is not None:
        x, y = pos
        prey = Prey(x, y, env, j, model_nn_prey)
        env.add_agent(prey)
    else:
        print("Não foi possível encontrar uma posição nova para o prey.")

# Povoando o ambiente com predadores
for i in range(num_predators):
    pos = env.new_position()
    if pos is not None:
        x, y = pos
        predator = Predator(x, y, env, i, model_nn_predator)
        env.add_agent(predator)
    else:
        print("Não foi possível encontrar uma posição nova para o predator.")

training_session = TrainingSessionPER(
    task_name=task_name,
    env=env,
    model_nn_predator=model_nn_predator,
    model_nn_prey=model_nn_prey,
    num_episodes=num_episodes,       # 5000 episódios
    num_steps=10,
    buffer_capacity=buffer,   # Buffer maior para treinos longos
    batch_size=batch,           # Tamanho do minibatch
    alpha=0.3,               # Menor impacto de TD-Errors extremos
    beta_start=0.4,          # Beta inicial
 ) 

# Executando o treinamento
training_session.run()


In [None]:
# GRAVAÇÃO: training-radar-dqn-dueling
# ==========================================

# Definindo o nome da tarefa e o log
task_name = "nn_dqn_dueling64"
input_shape = (7, 7, 3)
size = 10

# Modelo Neural utilizado
model_nn_predator = nn_dqn_dueling()
model_nn_prey = nn_dqn_dueling()

model_nn_predator.build(input_shape=(None,) + input_shape)
model_nn_prey.build(input_shape=(None,) + input_shape)

env = Env(sizeX=size, sizeY=size, ray=3)

# Definindo o tamanho do ambiente e número de episódios e passos
num_steps = 10

num_preys = 10
num_predators = 5

num_obstacle = int(env.sizeX * env.sizeX * 0.10)

# Povoando o ambiente com obstacle
for o in range(num_obstacle):
    pos = env.new_position()
    if pos is not None:
        x, y = pos
        obstacle = Obstacle(x, y)
        env.add_obstacle(obstacle)
    else:
        print("Não foi possível encontrar uma posição nova para o obstacle.")

# Povoando o ambiente com presas
for j in range(num_preys):
    pos = env.new_position()
    if pos is not None:
        x, y = pos
        prey = Prey(x, y, env, j, model_nn_prey)
        env.add_agent(prey)
    else:
        print("Não foi possível encontrar uma posição nova para o prey.")

# Povoando o ambiente com predadores
for i in range(num_predators):
    pos = env.new_position()
    if pos is not None:
        x, y = pos
        predator = Predator(x, y, env, i, model_nn_predator)
        env.add_agent(predator)
    else:
        print("Não foi possível encontrar uma posição nova para o predator.")

# Criando a sessão de treinamento
training_session = TrainingSession(task_name, env, model_nn_predator, model_nn_prey, num_episodes, num_steps, buffer_capacity=buffer, batch_size=batch)

# Executando o treinamento
training_session.run()



In [None]:
# GRAVAÇÃO: training-radar-dqn
# ==========================================

# Definindo o nome da tarefa e o log
task_name = "nn_dqn64"
input_shape = (7, 7, 3)
size = 10

# Modelo Neural utilizado
model_nn_predator = nn_dqn()
model_nn_prey = nn_dqn()

model_nn_predator.build(input_shape=(None,) + input_shape)
model_nn_prey.build(input_shape=(None,) + input_shape)

env = Env(sizeX=size, sizeY=size, ray=3)

# Definindo o tamanho do ambiente e número de episódios e passos
num_steps = 10

num_preys = 10
num_predators = 5

num_obstacle = int(env.sizeX * env.sizeX * 0.10)

# Povoando o ambiente com obstacle
for o in range(num_obstacle):
    pos = env.new_position()
    if pos is not None:
        x, y = pos
        obstacle = Obstacle(x, y)
        env.add_obstacle(obstacle)
    else:
        print("Não foi possível encontrar uma posição nova para o obstacle.")

# Povoando o ambiente com presas
for j in range(num_preys):
    pos = env.new_position()
    if pos is not None:
        x, y = pos
        prey = Prey(x, y, env, j, model_nn_prey)
        env.add_agent(prey)
    else:
        print("Não foi possível encontrar uma posição nova para o prey.")

# Povoando o ambiente com predadores
for i in range(num_predators):
    pos = env.new_position()
    if pos is not None:
        x, y = pos
        predator = Predator(x, y, env, i, model_nn_predator)
        env.add_agent(predator)
    else:
        print("Não foi possível encontrar uma posição nova para o predator.")

# Criando a sessão de treinamento
training_session = TrainingSession(task_name, env, model_nn_predator, model_nn_prey, num_episodes, num_steps, buffer_capacity=buffer, batch_size=batch)

# Executando o treinamento
training_session.run()