# Q-Learning para Labirintos Individuais

Este notebook implementa um algoritmo Q-Learning otimizado para resolver labirintos individuais. O código treina um agente separado para cada labirinto e então testa seu desempenho, coletando métricas para comparação.

A diferença principal em relação ao algoritmo multi-labirintos é que aqui cada agente é especializado em um único labirinto, o que geralmente resulta em melhor desempenho, mas não generaliza bem para labirintos novos.

## Importações e Configuração

In [None]:
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.patches as mpatches
import time
import os
import csv
import tracemalloc
from collections import deque
import glob
import pickle

# Define as ações: Cima, Baixo, Esquerda, Direita
actions = [(-1, 0), (1, 0), (0, -1), (0, 1)]

# Hiperparâmetros do Q-Learning
learning_rate = 0.3
discount_factor = 0.9
epsilon = 0.1  # Taxa de exploração final
num_episodes_per_maze = 5000  # Episódios por labirinto

## Classe para Representação do Labirinto

In [None]:
class Maze:
    def __init__(self, maze_array):
        self.maze = maze_array
        self.maze_height, self.maze_width = maze_array.shape
        self.start_position = (0, 1)
        self.goal_position = (self.maze_width - 1, self.maze_height - 2)

    def show_maze(self, path=None):
        """Visualização simples do labirinto"""
        plt.figure(figsize=(5, 5))
        plt.imshow(self.maze, cmap='gray')
        plt.text(self.start_position[0], self.start_position[1], 'S', ha='center', va='center', color='red', fontsize=10)
        plt.text(self.goal_position[0], self.goal_position[1], 'G', ha='center', va='center', color='green', fontsize=10)

        if path:
            for position in path:
                plt.text(position[0], position[1], "#", ha='center', va='center', color='blue', fontsize=8)

        plt.xticks([]), plt.yticks([])
        plt.show()

## Agente Q-Learning para Labirinto Único

Esta implementação do agente Q-Learning é otimizada para treinar em um único labirinto, mantendo uma Q-table específica para o tamanho desse labirinto.

In [None]:
class QLearningAgent:
    def __init__(self, state_shape, num_actions=4, learning_rate=0.1, discount_factor=0.9,
                 exploration_start=1.0, exploration_end=0.01, num_episodes=100):
        # Inicializa a Q-table para o tamanho específico do labirinto
        self.q_table = np.zeros((state_shape[0], state_shape[1], num_actions))
        self.learning_rate = learning_rate
        self.discount_factor = discount_factor
        self.exploration_start = exploration_start
        self.exploration_end = exploration_end
        self.num_episodes = num_episodes
        self.num_actions = num_actions

        # Métricas de treinamento
        self.training_time_ms = 0
        self.training_episodes = 0
        self.total_updates = 0

    def get_exploration_rate(self, current_episode):
        """Calcula a taxa de exploração decrescente ao longo do tempo"""
        exploration_rate = self.exploration_start * (self.exploration_end / self.exploration_start) ** (current_episode / self.num_episodes)
        return exploration_rate

    def get_action(self, state, maze, current_episode, train=True):
        """Seleciona uma ação usando política epsilon-greedy durante treinamento ou greedy durante teste"""
        if train:
            exploration_rate = self.get_exploration_rate(current_episode)
            if np.random.rand() < exploration_rate:
                return np.random.randint(self.num_actions)
            else:
                return np.argmax(self.q_table[state])
        else:  # Durante o teste, sempre use a melhor ação conhecida
            return np.argmax(self.q_table[state])

    def update_q_table(self, state, action, next_state, reward):
        """Atualiza a Q-table usando a equação de Bellman"""
        best_next_action = np.argmax(self.q_table[next_state])
        current_q_value = self.q_table[state][action]
        new_q_value = current_q_value + self.learning_rate * (
            reward + self.discount_factor * self.q_table[next_state][best_next_action] - current_q_value
        )
        self.q_table[state][action] = new_q_value

## Funções para Execução de Episódios e Treinamento

In [None]:
def finish_episode(agent, maze, current_episode, train=True, goal_reward=100, wall_penalty=-10, step_penalty=-1):
    """Executa um episódio completo e retorna métricas"""
    current_state = maze.start_position
    is_done = False
    episode_reward = 0
    episode_step = 0
    path = [current_state]
    nos_visitados = set()
    nos_visitados.add(current_state)

    # Inicia medição de memória e tempo
    tracemalloc.start()
    tempo_inicio = time.perf_counter()

    while not is_done:
        action = agent.get_action(current_state, maze, current_episode, train=train)
        next_state_coords = actions[action]
        next_state = (current_state[0] + next_state_coords[0], current_state[1] + next_state_coords[1])

        # Verifica se está fora dos limites ou bateu na parede
        if (next_state[0] < 0 or next_state[0] >= maze.maze_width or
            next_state[1] < 0 or next_state[1] >= maze.maze_height or
            maze.maze[next_state[1]][next_state[0]] == 0):  # 0 é parede nos .npy
            reward = wall_penalty
            next_state = current_state  # Permanece no estado atual
        # Verifica se chegou ao objetivo
        elif next_state == maze.goal_position:
            path.append(next_state)
            nos_visitados.add(next_state)
            reward = goal_reward
            is_done = True
        # Deu mais um passo, mas não chegou ao objetivo
        else:
            path.append(next_state)
            nos_visitados.add(next_state)
            reward = step_penalty

        episode_reward += reward
        episode_step += 1

        if train:
            agent.update_q_table(current_state, action, next_state, reward)

        current_state = next_state

        # Limite de passos para evitar loops infinitos
        if episode_step > 10000:
            is_done = True
            if path[-1] != maze.goal_position:
                path = []  # Zera o caminho se não chegou ao objetivo

    # Finaliza medição de memória e tempo
    tempo_fim = time.perf_counter()
    memoria_usada = tracemalloc.get_traced_memory()[1]
    tracemalloc.stop()

    return {
        "caminho": path,
        "nos_visitados": list(nos_visitados),
        "tempo_ms": (tempo_fim - tempo_inicio) * 1000,
        "memoria_bytes": memoria_usada,
        "comprimento": len(path) if path and path[-1] == maze.goal_position else 0,
        "qtd_nos_visitados": len(nos_visitados),
        "encontrou_caminho": path and path[-1] == maze.goal_position
    }

def treinar_agente_em_labirinto_unico(agente, maze, num_episodes):
    """Treina o agente em um único labirinto por um número de episódios"""
    print(f"Iniciando treinamento em um único labirinto ({maze.maze_height}x{maze.maze_width})...")
    tempo_inicio_treino = time.perf_counter()

    agente.training_episodes = num_episodes
    total_updates = 0

    for episodio in range(num_episodes):
        if (episodio + 1) % 100 == 0:
            print(f"  Episódio de Treinamento {episodio + 1}/{num_episodes}")
        resultado = finish_episode(agente, maze, episodio, train=True)
        total_updates += len(resultado["caminho"])

    tempo_fim_treino = time.perf_counter()
    agente.training_time_ms = (tempo_fim_treino - tempo_inicio_treino) * 1000
    agente.total_updates = total_updates

    print(f"Treinamento concluído em {agente.training_time_ms/1000:.2f} segundos!")
    print(f"Total de episódios: {agente.training_episodes}")
    print(f"Total de atualizações da Q-table: {agente.total_updates}")

## Carregamento e Visualização de Labirintos

In [None]:
def carregar_labirintos(diretorio):
    """Carrega todos os labirintos de um diretório"""
    labirintos = []
    arquivos = sorted(glob.glob(os.path.join(diretorio, "*.npy")))

    for arquivo in arquivos:
        labirinto_array = np.load(arquivo)
        labirintos.append(Maze(labirinto_array))

    return labirintos

def get_dynamic_figsize(width_data, height_data, min_inches=8, max_inches=20):
    """Calcula o tamanho dinâmico da figura com base nas dimensões dos dados"""
    dpi = plt.rcParams.get('figure.dpi', 100.0)

    ideal_w_inches = width_data / dpi
    ideal_h_inches = height_data / dpi

    final_w_inches = max(min_inches, min(max_inches, ideal_w_inches))
    final_h_inches = max(min_inches, min(max_inches, ideal_h_inches))

    # Ajustes para manter a proporção adequada
    if (ideal_w_inches > max_inches or ideal_h_inches > max_inches) and not (ideal_w_inches > max_inches and ideal_h_inches > max_inches):
        if ideal_w_inches > ideal_h_inches:
            ratio = height_data / width_data
            final_h_inches = final_w_inches * ratio
        elif ideal_h_inches > ideal_w_inches:
            ratio = width_data / height_data
            final_w_inches = final_h_inches * ratio

        final_w_inches = max(min_inches, final_w_inches)
        final_h_inches = max(min_inches, final_h_inches)

    elif ideal_w_inches < min_inches and ideal_h_inches < min_inches and ideal_w_inches != ideal_h_inches:
        if ideal_w_inches > ideal_h_inches:
            ratio = ideal_h_inches / ideal_w_inches
            final_h_inches = final_w_inches * ratio
        else:
            ratio = ideal_w_inches / ideal_h_inches
            final_w_inches = final_h_inches * ratio

    return (final_w_inches, final_h_inches)

def visualizar_caminho_qlearning(maze, caminho, nos_visitados, title_suffix="", save_path=None):
    """Visualiza o labirinto com o caminho encontrado e nós visitados. Opcionalmente salva a figura."""
    visualizacao = np.array(maze.maze.copy(), dtype=int)

    # Marcar nós visitados com valor 3
    for x, y in nos_visitados:
        if visualizacao[y][x] == 1:  # Só marca se for um espaço vazio
            visualizacao[y][x] = 3

    # Marcar caminho encontrado com valor 2
    if caminho and caminho[-1] == maze.goal_position:
        for x, y in caminho:
            visualizacao[y][x] = 2

    # Tamanho dinâmico da figura
    figsize = get_dynamic_figsize(maze.maze_width, maze.maze_height)
    plt.figure(figsize=figsize)

    # Mostrar o labirinto com os caminhos
    plt.imshow(visualizacao, cmap='viridis', vmin=0, vmax=3, interpolation='nearest')
    plt.axis('off')
    plt.title(f"Caminho e Nós Visitados (Q-Learning) {title_suffix}")

    # Marcar início e fim com símbolos mais visíveis
    plt.text(maze.start_position[0], maze.start_position[1], "S",
             ha='center', va='center', color='white', fontsize=12, fontweight='bold')
    plt.text(maze.goal_position[0], maze.goal_position[1], "G",
             ha='center', va='center', color='white', fontsize=12, fontweight='bold')

    # Adicionar legenda
    legenda = [
        mpatches.Patch(color=plt.colormaps['viridis'](0/3), label='Parede (0)'),
        mpatches.Patch(color=plt.colormaps['viridis'](1/3), label='Espaço livre (1)'),
        mpatches.Patch(color=plt.colormaps['viridis'](2/3), label='Caminho final (2)'),
        mpatches.Patch(color=plt.colormaps['viridis'](3/3), label='Nós visitados (3)')
    ]
    plt.legend(handles=legenda, loc='upper left', bbox_to_anchor=(1.01, 1.0))
    plt.tight_layout(rect=[0, 0, 0.85, 1])

    # Salvar a figura se um caminho for fornecido
    if save_path:
        diretorio = os.path.dirname(save_path)
        if diretorio:
            os.makedirs(diretorio, exist_ok=True)
        plt.savefig(save_path, dpi=150, bbox_inches='tight')
        print(f"Figura salva em: {save_path}")

    plt.show()

    # Informações adicionais sobre o resultado
    encontrou_caminho = caminho and caminho[-1] == maze.goal_position
    print(f"Caminho encontrado: {encontrou_caminho}")
    if encontrou_caminho:
        print(f"Comprimento do caminho: {len(caminho)}")
    print(f"Número de nós visitados: {len(nos_visitados)}")

## Função Principal: Treinar e Avaliar em Labirintos Individuais

Esta função treina um agente específico para cada labirinto e coleta métricas de desempenho.

In [None]:
def comparar_qlearning_em_labirintos_individuais(diretorio_labirintos, num_episodes_per_maze,
                                        salvar_qtables=True, salvar_visualizacoes=True):
    """Treina e avalia um agente específico para cada labirinto"""
    labirintos = carregar_labirintos(diretorio_labirintos)
    all_results = []

    # Configuração dos diretórios de saída
    csv_output_dir = "resultados/q_learning"
    os.makedirs(csv_output_dir, exist_ok=True)

    csv_output_name = f"{csv_output_dir}/q_learning_individual_maze_baseline.csv"

    # Diretório para salvar Q-tables
    q_table_dir = "q_tables"
    if salvar_qtables:
        os.makedirs(q_table_dir, exist_ok=True)

    # Diretório para salvar visualizações
    visualizacoes_dir = f"{csv_output_dir}/visualizacoes"
    if salvar_visualizacoes:
        os.makedirs(visualizacoes_dir, exist_ok=True)

    with open(csv_output_name, "w", newline="") as f:
        writer = csv.writer(f)
        writer.writerow([
            "algoritmo", "tamanho_labirinto", "indice_labirinto",
            "tempo_execucao_ms", "nos_visitados", "encontrou_caminho",
            "comprimento_caminho", "memoria_kb",
            "tempo_treinamento_ms", "episodios_treinamento",
            "taxa_aprendizado", "fator_desconto",
            "exploracao_inicial", "exploracao_final",
            "total_atualizacoes"
        ])

        for idx, maze in enumerate(labirintos):
            maze_name = f"Labirinto {idx+1}/{len(labirintos)}"
            print(f"\n--- Processando {maze_name} ({maze.maze_height}x{maze.maze_width}) ---")

            # 1. Criar um agente específico para este labirinto
            agente_individual = QLearningAgent(
                state_shape=(maze.maze_height, maze.maze_width),
                learning_rate=learning_rate,
                discount_factor=discount_factor,
                exploration_start=1.0,
                exploration_end=epsilon,
                num_episodes=num_episodes_per_maze
            )

            # 2. Treinar o agente neste labirinto
            treinar_agente_em_labirinto_unico(agente_individual, maze, num_episodes=num_episodes_per_maze)

            # 3. Testar o agente no mesmo labirinto
            print(f"Testando agente em {maze_name} (labirinto de treinamento)...")
            resultado_teste = finish_episode(agente_individual, maze, 0, train=False)

            # 4. Caminho para salvar a visualização, se necessário
            save_path = None
            if salvar_visualizacoes:
                save_path = f"{visualizacoes_dir}/maze_{idx+1}_qlearning.png"

            # 5. Visualizar resultados
            visualizar_caminho_qlearning(maze, resultado_teste["caminho"],
                                         resultado_teste["nos_visitados"],
                                         title_suffix=f" no {maze_name}",
                                         save_path=save_path)

            # 6. Coletar e salvar os resultados no CSV
            row_data = [
                "Q-Learning_Baseline",
                f"{maze.maze_height}x{maze.maze_width}",
                idx + 1,
                round(resultado_teste["tempo_ms"], 2),
                resultado_teste["qtd_nos_visitados"],
                resultado_teste["encontrou_caminho"],
                resultado_teste["comprimento"],
                round(resultado_teste["memoria_bytes"] / 1024, 2),  # em KB
                round(agente_individual.training_time_ms, 2),
                agente_individual.training_episodes,
                agente_individual.learning_rate,
                agente_individual.discount_factor,
                agente_individual.exploration_start,
                agente_individual.exploration_end,
                agente_individual.total_updates
            ]
            writer.writerow(row_data)
            all_results.append(row_data)

            # 7. Salvar a Q-table para uso futuro
            if salvar_qtables:
                q_table_filename = f"{q_table_dir}/q_table_maze_{idx+1}_alpha{agente_individual.learning_rate}_gamma{agente_individual.discount_factor}_eps{agente_individual.exploration_end}_ep{agente_individual.training_episodes}.pkl"

                with open(q_table_filename, 'wb') as qtable_file:
                    pickle.dump(agente_individual.q_table, qtable_file)
                print(f"Q-table salva em: {q_table_filename}")

    print(f"\n✅ Todos os resultados salvos em: {csv_output_name}")
    return all_results

## Execução do Experimento

Execute esta célula para treinar e testar o algoritmo Q-Learning em cada labirinto individualmente.

In [None]:
# Configurações do experimento
diretorio_labirintos = "labirintos/pequenos"  # Diretório dos labirintos
salvar_qtables = False  # Define se as Q-tables serão salvas
salvar_visualizacoes = False  # Define se as visualizações serão salvas

# Para executar com menos episódios durante testes, descomente e ajuste a linha abaixo
# num_episodes_per_maze = 500  # Valor menor para testes rápidos

print(f"Iniciando experimento com labirintos do diretório: {diretorio_labirintos}")
print(f"Número de episódios por labirinto: {num_episodes_per_maze}")
print(f"Taxa de aprendizado (alpha): {learning_rate}")
print(f"Fator de desconto (gamma): {discount_factor}")
print(f"Taxa de exploração final (epsilon): {epsilon}")
print(f"Salvar Q-tables: {salvar_qtables}")
print(f"Salvar visualizações: {salvar_visualizacoes}")

# Execute a função para treinar e testar em cada labirinto individualmente
individual_baseline_results = comparar_qlearning_em_labirintos_individuais(
    diretorio_labirintos,
    num_episodes_per_maze,
    salvar_qtables=salvar_qtables,
    salvar_visualizacoes=salvar_visualizacoes
)

print("\nProcesso concluído.")