In [2]:
# Importar librerias

import gymnasium as gym
import numpy as np

import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from IPython.display import clear_output 

In [3]:
env = gym.make("LunarLander-v3",render_mode="human")

In [4]:
env.reset()

(array([-0.00279675,  1.4215324 , -0.28330046,  0.47166222,  0.00324757,
         0.06417184,  0.        ,  0.        ], dtype=float32),
 {})

In [None]:
# Play without training
env.reset()
steps = 0
G = 0
done = False

#while not done:
for i in range(300):
    action = env.action_space.sample()
    state, reward, done, truncated,info = env.step(action)
    
    G += reward
    steps += 1
    
print("Timesteps taken: {}".format(steps))
print("Total Reward: {}".format(G))

Timesteps taken: 300
Total Reward: -22528.48669500898


: 

In [None]:
import numpy as np
import tensorflow as tf
from tensorflow.keras import layers
import random
from collections import deque

In [None]:
# Hiperparámetros del agente y del entorno
EPOCHS = 20000  # Número máximo de episodios de entrenamiento
MAX_STEPS = 1000  # Máximo de pasos por episodio
BATCH_SIZE = 64  # Tamaño del lote para el entrenamiento
GAMMA = 0.99  # Factor de descuento para recompensas futuras
LEARNING_RATE = 0.001  # Tasa de aprendizaje de la red
EPSILON_DECAY = 0.995  # Factor de decaimiento de epsilon (exploración)
EPSILON_MIN = 0.01  # Valor mínimo de epsilon
EPSILON = 1.0  # Valor inicial de epsilon (exploración total)
UPDATE_TARGET_NETWORK = 1000  # Frecuencia de actualización de la red objetivo
UPDATE_AFTER_ACTIONS = 4  # Frecuencia de entrenamiento de la red principal
MAX_MEMORY_LENGTH = 100000  # Capacidad máxima del buffer de experiencia

# Crear el entorno de LunarLander
env = gym.make("LunarLander-v3", render_mode="none")

# Definir la arquitectura de la red neuronal Q (Q-network)
def build_model(state_shape, action_size):
    # Red neuronal simple con dos capas ocultas
    model = tf.keras.Sequential([
        layers.Dense(64, activation='relu', input_shape=state_shape),
        layers.Dense(64, activation='relu'),
        layers.Dense(action_size, activation='linear')  # Salida: un valor Q por acción
    ])
    model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=LEARNING_RATE), loss='mse')
    return model

# Inicializar las redes principal y objetivo
state_shape = (env.observation_space.shape[0],)  # Dimensión del estado
action_size = env.action_space.n  # Número de acciones posibles
model = build_model(state_shape, action_size)  # Red principal
model_target = build_model(state_shape, action_size)  # Red objetivo
model_target.set_weights(model.get_weights())  # Sincronizar pesos al inicio

# Inicializar el buffer de experiencia (replay buffer)
replay_buffer = deque(maxlen=MAX_MEMORY_LENGTH)

# Variables para el entrenamiento
episode_count = 0  # Contador de episodios
frame_count = 0  # Contador de pasos totales
episode_reward_history = []  # Historial de recompensas por episodio

# Bucle principal de entrenamiento
while episode_count < EPOCHS:
    state, info = env.reset()  # Reiniciar el entorno y obtener el estado inicial
    state = np.array(state)
    episode_reward = 0  # Recompensa acumulada en el episodio
    done = False

    # Un episodio completo
    for timestep in range(MAX_STEPS):
        # Selección de acción: política epsilon-greedy
        # Durante las primeras iteraciones, forzamos exploración; luego, exploramos con probabilidad epsilon
        if frame_count < EPSILON_DECAY * EPOCHS or EPSILON > np.random.rand(1)[0]:
            action = np.random.choice(action_size)  # Exploración: acción aleatoria
        else:
            # Explotación: elegir la mejor acción según la red principal
            state_tensor = tf.convert_to_tensor(state, dtype=tf.float32)
            state_tensor = tf.expand_dims(state_tensor, 0)  # Añadir dimensión batch
            action_values = model(state_tensor, training=False)
            action = tf.argmax(action_values[0]).numpy()

        # Ejecutar la acción en el entorno
        next_state, reward, done, truncated, _ = env.step(action)
        next_state = np.array(next_state)
        episode_reward += reward  # Acumular recompensa

        # Guardar la experiencia en el buffer de repetición
        replay_buffer.append((state, action, reward, next_state, done))

        state = next_state  # Actualizar el estado actual
        frame_count += 1  # Incrementar el contador de pasos

        # Entrenar la red principal si hay suficientes muestras y toca entrenar
        if len(replay_buffer) > BATCH_SIZE and frame_count % UPDATE_AFTER_ACTIONS == 0:
            # Seleccionar un lote aleatorio del buffer
            batch = random.sample(replay_buffer, BATCH_SIZE)
            states, actions, rewards, next_states, dones = zip(*batch)

            states = np.array(states)
            next_states = np.array(next_states)
            actions = np.array(actions)
            rewards = np.array(rewards)
            dones = np.array(dones, dtype=float)

            # Calcular los valores Q futuros usando la red objetivo
            future_rewards = model_target.predict(next_states)
            # Q-valor objetivo: recompensa inmediata + valor futuro descontado (si no ha terminado)
            updated_q_values = rewards + GAMMA * np.max(future_rewards, axis=1) * (1 - dones)

            # Calcular los Q-valores actuales para las acciones tomadas
            with tf.GradientTape() as tape:
                q_values = model(states)
                action_mask = tf.one_hot(actions, action_size)  # Enmascarar la acción tomada
                q_action = tf.reduce_sum(q_values * action_mask, axis=1)

                # Calcular la pérdida (error cuadrático medio)
                loss = tf.reduce_mean(tf.square(updated_q_values - q_action))

            # Aplicar los gradientes para actualizar la red principal
            grads = tape.gradient(loss, model.trainable_variables)
            model.optimizer.apply_gradients(zip(grads, model.trainable_variables))

            # Mostrar información de entrenamiento cada 100 pasos
            if frame_count % 100 == 0:
                print(f"Frame {frame_count} | Loss: {loss.numpy():.4f} | Epsilon: {EPSILON:.2f}")

        # Actualizar la red objetivo cada cierto número de pasos
        if frame_count % UPDATE_TARGET_NETWORK == 0:
            model_target.set_weights(model.get_weights())

        # Reducir epsilon (menos exploración con el tiempo, pero nunca menor que EPSILON_MIN)
        EPSILON = max(EPSILON * EPSILON_DECAY, EPSILON_MIN)

        # Si el episodio termina (por done o truncado), salir del bucle
        if done or truncated:
            break

    # Guardar la recompensa del episodio y calcular la media móvil
    episode_reward_history.append(episode_reward)
    if len(episode_reward_history) > 100:
        episode_reward_history.pop(0)
    running_reward = np.mean(episode_reward_history)

    # Mostrar información cada 100 episodios
    if episode_count % 100 == 0:
        print(f"Episode {episode_count} | Episode Reward: {episode_reward} | Running Reward: {running_reward}")

    # Comprobar si la tarea se considera resuelta
    if running_reward > 200:
        print(f"Solved at episode {episode_count}!")
        break

    episode_count += 1  # Siguiente episodio