In [None]:
# 1) Instalação de dependências
!pip -q install -q gym pyvirtualdisplay tqdm
!apt-get update -qq && apt-get install -y xvfb ffmpeg

In [None]:
import numpy as np
np.bool8 = np.bool_

import gym
from collections import defaultdict
import matplotlib.pyplot as plt
from matplotlib import animation
from IPython.display import HTML
import itertools
import pandas as pd
from tqdm.notebook import tqdm

In [None]:
# 2) Configurar display virtual para render no Colab
from pyvirtualdisplay import Display
display = Display(visible=0, size=(1400, 900))
display.start()

In [None]:
# 3) Criar ambiente para teste
env = gym.make('CartPole-v1')

In [None]:
# 4) Discretização do espaço de estados contínuo
def create_bins():
    # Definimos bins para cada dimensão do espaço de observações
    bins = [
        np.linspace(-4.8, 4.8, 20),      # para a posição do carrinho
        np.linspace(-5, 5, 20),          # para a velocidade do carrinho
        np.linspace(-0.418, 0.418, 20),  # para o ângulo do bastão em radianos
        np.linspace(-5, 5, 20)           # para a velocidade angular do bastão
    ]
    return bins

# Função para discretizar uma observação contínua
def discretize(observation, bins):
    """
    Converte um estado contínuo em um estado discreto usando bins predefinidos.

    Args:
        observation: Array com 4 valores contínuos [posição, velocidade, ângulo, velocidade_angular]
        bins: Lista de bins para cada dimensão

    Returns:
        Tupla com valores discretizados
    """
    return tuple(int(np.digitize(observation[i], bins[i])) for i in range(len(observation)))

In [None]:
# 5) Função para escolher ação usando política epsilon-greedy
def choose_action(state, Q, epsilon, action_space):
    """
    Seleciona uma ação usando a política epsilon-greedy.

    Args:
        state: Estado discretizado atual
        Q: Tabela Q
        epsilon: Probabilidade de exploração
        action_space: Espaço de ações do ambiente

    Returns:
        Ação escolhida (0 ou 1)
    """
    if np.random.random() < epsilon:
        # Exploração: escolhe uma ação aleatória
        return action_space.sample()
    else:
        # Exploração: escolhe a ação com maior valor Q
        return np.argmax(Q[state])

In [None]:
# 6) Implementação do Q-learning
def q_learning(env, params, verbose=False):
    """
    Implementa o algoritmo Q-learning para o problema CartPole

    Args:
        env: Ambiente Gym
        params: Dicionário com os parâmetros alpha, gamma, epsilon_initial,
                epsilon_decay, epsilon_min, num_episodes
        verbose: Flag para imprimir informações durante o treinamento

    Returns:
        Q: Tabela Q treinada
        stats: Dicionário com estatísticas do treinamento
    """
    # Extrair parâmetros
    alpha = params['alpha']
    gamma = params['gamma']
    epsilon = params['epsilon_initial']
    epsilon_decay = params['epsilon_decay']
    epsilon_min = params['epsilon_min']
    num_episodes = params['num_episodes']

    # Criar bins para discretização
    bins = create_bins()

    # Inicialização da tabela Q
    Q = defaultdict(lambda: np.zeros(env.action_space.n))

    # Lista para acompanhar o desempenho
    episode_rewards = []
    episode_lengths = []

    # Loop principal de treinamento
    for episode in range(1, num_episodes + 1):
        # Reset do ambiente e do estado inicial
        reset_output = env.reset()
        if isinstance(reset_output, tuple):
            state_obs, _ = reset_output
        else:
            state_obs = reset_output

        # Discretiza o estado inicial
        state = discretize(state_obs, bins)

        # Inicialização de variáveis para o episódio atual
        total_reward = 0
        done = False
        steps = 0

        # Loop até o fim do episódio
        while not done:
            # Escolhe uma ação usando epsilon-greedy
            action = choose_action(state, Q, epsilon, env.action_space)

            # Executa a ação no ambiente
            step_out = env.step(action)
            if len(step_out) == 5:
                next_state_obs, reward, terminated, truncated, _ = step_out
                done = terminated or truncated
            else:
                next_state_obs, reward, done, _ = step_out

            # Discretiza o próximo estado
            next_state = discretize(next_state_obs, bins)

            # Atualiza a tabela Q usando a equação de Bellman
            # Q(s,a) = Q(s,a) + α[r + γ max Q(s',a') - Q(s,a)]
            best_next_action = np.argmax(Q[next_state])
            td_target = reward + gamma * Q[next_state][best_next_action]
            td_error = td_target - Q[state][action]
            Q[state][action] += alpha * td_error

            # Atualiza o estado atual
            state = next_state
            total_reward += reward
            steps += 1

        # Decaimento do epsilon após cada episódio
        epsilon = max(epsilon_min, epsilon * epsilon_decay)

        # Armazena métricas para análise
        episode_rewards.append(total_reward)
        episode_lengths.append(steps)

        # Exibe progresso a cada 100 episódios se verbose=True
        if verbose and episode % 100 == 0:
            avg_reward = np.mean(episode_rewards[-100:])
            print(f"Episódio {episode}/{num_episodes}, Recompensa Média: {avg_reward:.2f}, Epsilon: {epsilon:.4f}")

    # Cálculo de estatísticas finais
    stats = {
        'avg_reward_last_100': np.mean(episode_rewards[-100:]),
        'max_episode_length': max(episode_lengths),
        'episode_rewards': episode_rewards,
        'episode_lengths': episode_lengths
    }

    return Q, stats

In [None]:
# 7) Função para avaliar a política aprendida
def evaluate_policy(env, Q, num_eval_episodes=10):
    """
    Avalia a política aprendida em novos episódios

    Args:
        env: Ambiente Gym
        Q: Tabela Q treinada
        num_eval_episodes: Número de episódios para avaliação

    Returns:
        float: Recompensa média nos episódios de avaliação
    """
    bins = create_bins()
    eval_rewards = []

    for _ in range(num_eval_episodes):
        reset_output = env.reset()
        if isinstance(reset_output, tuple):
            obs, _ = reset_output
        else:
            obs = reset_output
        state = discretize(obs, bins)
        done = False
        total_reward = 0

        while not done:
            action = np.argmax(Q[state])  # Política gulosa (sem exploração)
            step_out = env.step(action)
            if len(step_out) == 5:
                obs, reward, terminated, truncated, _ = step_out
                done = terminated or truncated
            else:
                obs, reward, done, _ = step_out
            state = discretize(obs, bins)
            total_reward += reward

        eval_rewards.append(total_reward)

    return np.mean(eval_rewards)

In [None]:
# 8) Função para gerar animação da política treinada
def visualize_policy(env, Q):
    """
    Cria uma animação da política aprendida

    Args:
        env: Ambiente Gym
        Q: Tabela Q treinada

    Returns:
        HTML: Animação da execução da política
    """
    bins = create_bins()
    frames = []
    reset_output = env.reset()
    if isinstance(reset_output, tuple):
        obs, _ = reset_output
    else:
        obs = reset_output
    state = discretize(obs, bins)
    done = False
    total_reward = 0

    while not done:
        frames.append(env.render(mode='rgb_array'))
        action = np.argmax(Q[state])  # Política gulosa (sem exploração)
        step_out = env.step(action)
        if len(step_out) == 5:
            obs, reward, terminated, truncated, _ = step_out
            done = terminated or truncated
        else:
            obs, reward, done, _ = step_out
        state = discretize(obs, bins)
        total_reward += reward

    print(f"Avaliação: Recompensa total = {total_reward}")

    # Criação da animação
    fig = plt.figure(figsize=(8, 6))
    patch = plt.imshow(frames[0])
    plt.axis('off')

    def animate(i):
        patch.set_data(frames[i])
        return (patch,)

    anim = animation.FuncAnimation(fig, animate, frames=len(frames), interval=50)
    return HTML(anim.to_jshtml())

In [None]:
# 9) Função para testar múltiplas combinações de parâmetros
def grid_search_parameters(param_grid, num_episodes=500, num_eval_episodes=5):
    """
    Realiza uma busca em grade para encontrar os melhores parâmetros

    Args:
        param_grid: Dicionário com listas de valores para cada parâmetro
        num_episodes: Número de episódios para treinamento
        num_eval_episodes: Número de episódios para avaliação

    Returns:
        pd.DataFrame: DataFrame com os resultados da busca
        dict: Melhores parâmetros encontrados
        object: Tabela Q treinada com os melhores parâmetros
    """
    # Criar todas as combinações possíveis de parâmetros
    param_keys = list(param_grid.keys())
    param_values = list(param_grid.values())
    param_combinations = list(itertools.product(*param_values))

    results = []
    best_reward = -np.inf
    best_params = None
    best_Q = None

    # Testar cada combinação de parâmetros
    print(f"Testando {len(param_combinations)} combinações de parâmetros...")

    for i, values in enumerate(tqdm(param_combinations)):
        params = {
            'num_episodes': num_episodes,
            **{param_keys[j]: values[j] for j in range(len(param_keys))}
        }

        # Treinar o agente com os parâmetros atuais
        Q, stats = q_learning(env, params)

        # Avaliar a política aprendida
        eval_reward = evaluate_policy(env, Q, num_eval_episodes)

        # Registrar resultados
        result = {
            **params,
            'avg_reward_training': stats['avg_reward_last_100'],
            'avg_reward_eval': eval_reward
        }
        results.append(result)

        # Atualizar os melhores parâmetros se necessário
        if eval_reward > best_reward:
            best_reward = eval_reward
            best_params = params.copy()
            best_Q = Q

    # Criar DataFrame com os resultados
    results_df = pd.DataFrame(results)

    # Ordenar por desempenho de avaliação
    results_df = results_df.sort_values('avg_reward_eval', ascending=False).reset_index(drop=True)

    return results_df, best_params, best_Q

In [None]:
# 10) Definição dos parâmetros a serem testados
param_grid = {
    'alpha': [0.1, 0.2],  # Taxa de aprendizado
    'gamma': [0.95, 0.99, 0.999],  # Fator de desconto
    'epsilon_initial': [1.0, 5.0],  # Valor inicial para exploração
    'epsilon_decay': [0.99, 0.995],  # Taxa de decaimento do epsilon
    'epsilon_min': [0.001, 0.01]  # Valor mínimo do epsilon
}

In [None]:
# 11) Executar a busca de parâmetros
results_df, best_params, best_Q = grid_search_parameters(
    param_grid,
    num_episodes=3000,  # Reduzido para testes mais rápidos
    num_eval_episodes=5
)

In [None]:
# 12) Mostrar os melhores resultados
print("\nMelhores Parâmetros Encontrados:")
for param, value in best_params.items():
    print(f"{param}: {value}")

print(f"\nMelhor Recompensa Média em Avaliação: {results_df.iloc[0]['avg_reward_eval']:.2f}")

In [None]:
# 13) Visualizar os resultados em um gráfico
plt.figure(figsize=(10, 6))
plt.bar(range(min(10, len(results_df))), results_df['avg_reward_eval'].head(10))
plt.xticks(range(min(10, len(results_df))), [f"Config {i+1}" for i in range(min(10, len(results_df)))])
plt.title('Top 10 Configurações de Parâmetros')
plt.xlabel('Configuração')
plt.ylabel('Recompensa Média em Avaliação')
plt.tight_layout()
plt.show()

In [None]:
# 14) Treinar o modelo final com os melhores parâmetros
print("\nTreinando o modelo final com os melhores parâmetros...")
final_params = best_params.copy()
final_params['num_episodes'] = 3000  # Mais episódios para o modelo final
final_Q, final_stats = q_learning(env, final_params, verbose=True)

In [None]:
# 15) Plotar resultados do treinamento final
plt.figure(figsize=(12, 10))

plt.subplot(211)
plt.plot(final_stats['episode_rewards'])
plt.title('Recompensas por Episódio (Melhores Parâmetros)')
plt.xlabel('Episódio')
plt.ylabel('Recompensa Total')

# Média móvel de 100 episódios
window_size = 100
moving_avg = np.convolve(final_stats['episode_rewards'], np.ones(window_size)/window_size, mode='valid')
plt.plot(range(window_size-1, len(final_stats['episode_rewards'])), moving_avg, 'r-', linewidth=2)
plt.legend(['Recompensa por Episódio', 'Média Móvel (100 episódios)'])

plt.subplot(212)
plt.plot(final_stats['episode_lengths'])
plt.title('Duração dos Episódios (Melhores Parâmetros)')
plt.xlabel('Episódio')
plt.ylabel('Número de Passos')

# Média móvel de 100 episódios para duração
moving_avg_length = np.convolve(final_stats['episode_lengths'], np.ones(window_size)/window_size, mode='valid')
plt.plot(range(window_size-1, len(final_stats['episode_lengths'])), moving_avg_length, 'r-', linewidth=2)
plt.legend(['Duração por Episódio', 'Média Móvel (100 episódios)'])

plt.tight_layout()
plt.show()

In [None]:
# 16) Visualizar a política final
print("\nVisualizando a política final treinada...")
animation_html = visualize_policy(env, final_Q)
display(animation_html)

In [None]:
# 17) Salvar os resultados
results_df.to_csv('parameter_search_results.csv', index=False)
print("Resultados da busca de parâmetros salvos em 'parameter_search_results.csv'")

In [None]:
# 18) Exibir tabela com as 5 melhores configurações
print("\nTop 5 Melhores Configurações:")
display(results_df.head(5))

W: Skipping acquire of configured file 'main/source/Sources' as repository 'https://r2u.stat.illinois.edu/ubuntu jammy InRelease' does not seem to provide it (sources.list entry misspelt?)
Reading package lists... Done
Building dependency tree... Done
Reading state information... Done
ffmpeg is already the newest version (7:4.4.2-0ubuntu0.22.04.1).
xvfb is already the newest version (2:21.1.4-2ubuntu1.7~22.04.14).
0 upgraded, 0 newly installed, 0 to remove and 91 not upgraded.
Testando 48 combinações de parâmetros...


  deprecation(
  deprecation(


  0%|          | 0/48 [00:00<?, ?it/s]