# **Lunar Lander con Q-Learning**

### **1. Bibliotecas**

In [1]:
import sys
import gymnasium as gym
import numpy as np
import matplotlib.pyplot as plt
from collections import deque
import random

### **2. Jugando a mano**

A continuación se puede jugar un episodio del lunar lander. Se controlan los motores con el teclado. Notar que solo se puede realizar una acción a la vez (que es parte del problema), y que en esta implementación, izq toma precedencia sobre derecha, que toma precedencia sobre el motor principal.

In [2]:
import pygame
from pygame.locals import *

# Inicializar pygame (para el contro|l con el teclado) y el ambiente
pygame.init()
env = gym.make('LunarLander-v2', render_mode='human')
env.reset()
pygame.display.set_caption('Lunar Lander')

clock = pygame.time.Clock()
done = False

while not done:
    for event in pygame.event.get():
        if event.type == QUIT:
            done = True
            break

    keys = pygame.key.get_pressed()

    # Map keys to actions
    if keys[K_LEFT]:
        action = 3  # Fire left orientation engine
    elif keys[K_RIGHT]:
        action = 1 # Fire right orientation engine
    elif keys[K_UP]:
        action = 2  # Fire main engine
    else:
        action = 0  # Do nothing

    _, _, terminated, truncated, _ = env.step(action)
    env.render()
    clock.tick(10)

    if terminated or truncated:
        done = True

env.close()
pygame.quit()

## **3. Discretizando el estado**

El estado consiste de posiciones y velocidades en (x,y,theta) y en información de contacto de los pies con la superficie.

Como varios de estos son continuos, tenemos que discretizarlos para aplicar nuestro algoritmo de aprendizaje por refuerzo tabular.

In [3]:
# Cuántos bins queremos por dimensión
# Pueden considerar variar este parámetro
bins_per_dim = 20

# Estado:
# (x, y, x_vel, ely_v, theta, theta_vel, pie_izq_en_contacto, pie_derecho_en_contacto)
NUM_BINS = [bins_per_dim, bins_per_dim, bins_per_dim, bins_per_dim, bins_per_dim, bins_per_dim, 2, 2]

env = gym.make('LunarLander-v2')
env.reset()

# Tomamos los rangos del env
OBS_SPACE_HIGH = env.observation_space.high
OBS_SPACE_LOW = env.observation_space.low
OBS_SPACE_LOW[1] = 0 # Para la coordenada y (altura), no podemos ir más abajo que la zona dea aterrizae (que está en el 0, 0)

# Los bins para cada dimensión
bins = [
    np.linspace(OBS_SPACE_LOW[i], OBS_SPACE_HIGH[i], NUM_BINS[i] - 1)
    for i in range(len(NUM_BINS) - 2) # last two are binary
]
# Se recomienda observar los bins para entender su estructura
#print ("Bins: ", bins)

def discretize_state(state, bins):
    """Discretize the continuous state into a tuple of discrete indices."""
    state_disc = list()
    for i in range(len(state)):
        if i >= len(bins):  # For binary features (leg contacts)
            state_disc.append(int(state[i]))
        else:
            state_disc.append(
                np.digitize(state[i], bins[i])
            )
    return tuple(state_disc)

In [4]:
# Ejemplos
print(discretize_state([0.0, 0.0, 0, 0, 0, 0, 1, 1], bins)) # En la zona de aterrizaje y quieto
print(discretize_state([0, 1.5, 0, 0, 0, 0, 0, 0], bins)) # Comenzando la partida, arriba y en el centro

(np.int64(10), np.int64(1), np.int64(10), np.int64(10), np.int64(10), np.int64(10), 1, 1)
(np.int64(10), np.int64(19), np.int64(10), np.int64(10), np.int64(10), np.int64(10), 0, 0)


## **4. Agentes y la interacción con el entorno**

Vamos a definir una interfaz para nuestro agente:

In [5]:
class Agente:
    def elegir_accion(self, estado, max_accion, explorar = True) -> int:
        """Elegir la accion a tomar en el estado actual y el espacio de acciones
            - estado_anterior: el estado desde que se empezó
            - estado_siguiente: el estado al que se llegó
            - accion: la acción que llevo al agente desde estado_anterior a estado_siguiente
            - recompensa: la recompensa recibida en la transicion
            - terminado: si el episodio terminó
        """
        pass

    def aprender(self, estado_anterior, estado_siguiente, accion, recompensa, terminado):
        """Aprender a partir de la tupla 
            - estado_anterior: el estado desde que se empezó
            - estado_siguiente: el estado al que se llegó
            - accion: la acción que llevo al agente desde estado_anterior a estado_siguiente
            - recompensa: la recompensa recibida en la transicion
            - terminado: si el episodio terminó en esta transición
        """
        pass
    def fin_episodio(self):
        """Actualizar estructuras al final de un episodio"""
        pass


Para un agente aleatorio, la implementación sería:

In [6]:
import random

class AgenteAleatorio(Agente):
    def elegir_accion(self, estado, max_accion, explorar = True) -> int:
        # Elige una acción al azar
        return random.randrange(max_accion)

    def aprender(self, estado_anterior, estado_siguiente, accion, recompensa, terminado):
        # No aprende
        pass

Luego podemos definir una función para ejecutar un episodio con un agente dado:

In [7]:
def ejecutar_episodio(agente, aprender = True, render = None, max_iteraciones=500):
    entorno = gym.make('LunarLander-v2').env

    iteraciones = 0
    recompensa_total = 0

    termino = False
    truncado = False
    estado_anterior, info = entorno.reset()
    while iteraciones < max_iteraciones and not termino and not truncado:
        # Le pedimos al agente que elija entre las posibles acciones (0..entorno.action_space.n)
        accion = agente.elegir_accion(estado_anterior, entorno.action_space.n, not aprender)
        # Realizamos la accion
        estado_siguiente, recompensa, termino, truncado, info = entorno.step(accion)
        # Le informamos al agente para que aprenda
        if (aprender):
            agente.aprender(estado_anterior, estado_siguiente, accion, recompensa, termino)

        estado_anterior = estado_siguiente
        iteraciones += 1
        recompensa_total += recompensa
    if (aprender):
        agente.fin_episodio()
    entorno.close()

    return recompensa_total

In [8]:
# Nota: hay que transformar esta celda en código para ejecutar (Esc + y)

# Ejecutamos un episodio con el agente aleatorio y modo render 'human', para poder verlo
ejecutar_episodio(AgenteAleatorio(), render = 'human')

np.float64(-247.67675496724314)

Podemos ejecutar este ambiente muchas veces y tomar métricas al respecto

In [9]:
agente = AgenteAleatorio()
recompensa_episodios = []

exitos = 0
num_episodios = 100
for i in range(num_episodios):
    recompensa = ejecutar_episodio(agente)
    # Los episodios se consideran exitosos si se obutvo 200 o más de recompensa total
    if (recompensa >= 200):
        exitos += 1
    recompensa_episodios += [recompensa]

import numpy
print(f"Tasa de éxito: {exitos / num_episodios}. Se obtuvo {numpy.mean(recompensa_episodios)} de recompensa, en promedio")

Tasa de éxito: 0.0. Se obtuvo -166.12152614441925 de recompensa, en promedio


### **5. Programando un agente que aprende**

La tarea a realizar consiste en programar un agente de aprendizaje por refuerzos:

In [10]:
import numpy as np
import random

class AgenteRL(Agente):
    def __init__(self, bins, num_acciones, episodios, epsilon=1.0, epsilon_min=0.05):
        super().__init__()
        self.epsilon = epsilon  # Factor de exploración
        self.epsilon_min = epsilon_min  # Valor mínimo de epsilon
        self.bins = bins  # Bins para discretizar el estado
        self.q_table = np.zeros((*[len(b) + 1 for b in bins], 2, 2, num_acciones))  # Q-table
        self.visit_counts = np.zeros_like(self.q_table)  # Tabla para contar visitas de estado-acción
        self.epsilon_decay = (self.epsilon-self.epsilon_min)/episodios  # Decaimiento de epsilon
        print(self.epsilon_decay)
    def elegir_accion(self, estado, max_accion, explorar=True) -> int:
        # Discretizamos el estado antes de elegir una acción
        estado_discreto = discretize_state(estado, self.bins)
        
        # Política epsilon-greedy para elegir acción
        if explorar and np.random.rand() < self.epsilon:
            # Explorar: elige una acción aleatoria
            return np.random.randint(max_accion)
        else:
            # Explotar: elige la acción con el mayor valor Q para el estado actual
            return np.argmax(self.q_table[estado_discreto])

    def aprender(self, estado_anterior, estado_siguiente, accion, recompensa, terminado):
        # Discretizamos los estados antes de actualizar la Q-table
        estado_anterior_discreto = discretize_state(estado_anterior, self.bins)
        estado_siguiente_discreto = discretize_state(estado_siguiente, self.bins)
        
        # Incrementamos el contador de visitas para el par (estado, acción)
        self.visit_counts[estado_anterior_discreto + (accion,)] += 1
        
        # Calculamos alpha dinámicamente como 1 / número de visitas
        visitas = self.visit_counts[estado_anterior_discreto + (accion,)]
        alpha = 1 / visitas
        
        # Actualiza la Q-table usando la ecuación de Q-Learning
        max_accion_siguiente = np.max(self.q_table[estado_siguiente_discreto])  # Mejor acción posible en el siguiente estado
        q_actual = self.q_table[estado_anterior_discreto + (accion,)]  # Valor actual de Q para el estado y la acción
        
        # Si el episodio terminó, no hay valor futuro, de lo contrario aplicamos la ecuación de Bellman
        if terminado:
            q_nuevo = recompensa  # Si terminó, la recompensa es lo único que importa
        else:
            q_nuevo = recompensa + max_accion_siguiente  # Bellman update

        # Actualizamos la Q-table con el nuevo valor Q
        self.q_table[estado_anterior_discreto + (accion,)] += alpha * (q_nuevo - q_actual)

        # Reducimos epsilon para que exploremos menos con el tiempo (decaimiento)


    def fin_episodio(self):
        self.epsilon = max(self.epsilon_min, self.epsilon - self.epsilon_decay)
        # Función que se llama al final de cada episodio para realizar tareas adicionales si es necesario
        pass


Y ejecutar con el muchos episodios:

In [11]:
# Nota: hay que transformar esta celda en código para ejecutar (Esc + y)
# Advertencia: este bloque es un loop infinito si el agente se deja sin implementar
from tqdm import tqdm 
entorno = gym.make('LunarLander-v2').env
num_episodios = 50000
agente = AgenteRL(bins=bins, num_acciones=entorno.action_space.n, episodios=num_episodios)
exitos = 0
recompensa_episodios = []
with tqdm(total=num_episodios) as pbar:
    for i in range(num_episodios):
        recompensa = ejecutar_episodio(agente)
        if (recompensa >= 200):
            print(f"Episodio {i} exitoso, recompensa: {recompensa}, Se obtuvo {numpy.mean(recompensa_episodios)} de recompensa, en promedio")
            exitos += 1
        recompensa_episodios += [recompensa]
        pbar.set_postfix(recompensa=recompensa, promedio=numpy.mean(recompensa_episodios))
        pbar.update(1)
print(f"Tasa de éxito: {exitos / num_episodios}. Se obtuvo {numpy.mean(recompensa_episodios)} de recompensa, en promedio")

0.00018999999999999998


 16%|█▌        | 809/5000 [00:21<01:56, 35.83it/s, promedio=-151, recompensa=-335]  

Episodio 801 exitoso, recompensa: 237.62340986975698, Se obtuvo -151.1781231950822 de recompensa, en promedio


 62%|██████▏   | 3106/5000 [01:27<00:56, 33.61it/s, promedio=-139, recompensa=-76.7]  

Episodio 3099 exitoso, recompensa: 256.4353615160943, Se obtuvo -139.15252797415457 de recompensa, en promedio


 63%|██████▎   | 3164/5000 [01:29<00:52, 35.14it/s, promedio=-138, recompensa=-114] 

Episodio 3155 exitoso, recompensa: 255.81002126280282, Se obtuvo -138.6241433770526 de recompensa, en promedio


 65%|██████▌   | 3267/5000 [01:32<00:48, 35.97it/s, promedio=-138, recompensa=-62.9] 

Episodio 3260 exitoso, recompensa: 226.97952850575945, Se obtuvo -137.92351933993478 de recompensa, en promedio


 70%|██████▉   | 3498/5000 [01:39<00:52, 28.55it/s, promedio=-138, recompensa=-89]   

Episodio 3493 exitoso, recompensa: 242.88857667686682, Se obtuvo -138.13138240018125 de recompensa, en promedio


 87%|████████▋ | 4327/5000 [02:03<00:18, 35.65it/s, promedio=-135, recompensa=-90.3]

Episodio 4319 exitoso, recompensa: 205.22620011267173, Se obtuvo -134.85963229315425 de recompensa, en promedio


 95%|█████████▌| 4770/5000 [02:16<00:07, 31.52it/s, promedio=-133, recompensa=-284] 

Episodio 4764 exitoso, recompensa: 212.7902372148168, Se obtuvo -133.38595993738966 de recompensa, en promedio


 98%|█████████▊| 4879/5000 [02:20<00:03, 34.46it/s, promedio=-133, recompensa=-216]  

Episodio 4873 exitoso, recompensa: 212.8868346410261, Se obtuvo -133.18379793179625 de recompensa, en promedio


 99%|█████████▊| 4926/5000 [02:21<00:01, 37.77it/s, promedio=-133, recompensa=-131] 

Episodio 4921 exitoso, recompensa: 231.3635688222462, Se obtuvo -132.7164449144809 de recompensa, en promedio


100%|██████████| 5000/5000 [02:23<00:00, 34.76it/s, promedio=-132, recompensa=-247] 

Tasa de éxito: 0.0018. Se obtuvo -132.46999727136506 de recompensa, en promedio





In [12]:
# Nota: hay que transformar esta celda en código para ejecutar (Esc + y)
# Advertencia: este bloque es un loop infinito si el agente se deja sin implementar

entorno = gym.make('LunarLander-v2').env
exitos = 0
recompensa_episodios = []
num_episodios = 1000
for i in range(num_episodios):
    recompensa = ejecutar_episodio(agente,aprender=False)
    # Los episodios se consideran exitosos si se obutvo 200 o más de recompensa total
    if (recompensa >= 200):
        print(f"Episodio {i} exitoso, recompensa: {recompensa}")
        exitos += 1
    recompensa_episodios += [recompensa]
print(f"Tasa de éxito: {exitos / num_episodios}. Se obtuvo {numpy.mean(recompensa_episodios)} de recompensa, en promedio")

Episodio 94 exitoso, recompensa: 235.39945199091608
Episodio 364 exitoso, recompensa: 289.58388033933284
Episodio 405 exitoso, recompensa: 216.4251363080483
Tasa de éxito: 0.003. Se obtuvo -140.51044915298328 de recompensa, en promedio


Analizar los resultados de la ejecución anterior, incluyendo:
 * Un análisis de los parámetros utilizados en el algoritmo (aprendizaje, política de exploración)
 * Un análisis de algunos 'cortes' de la matriz Q y la política (p.e. qué hace la nave cuando está cayendo rápidamente hacia abajo, sin rotación)
 * Un análisis de la evolución de la recompensa promedio
 * Un análisis de los casos de éxito
 * Un análisis de los casos en el que el agente falla
 * Qué limitante del agente de RL les parece que afecta más negativamente su desempeño. Cómo lo mejorarían? 

In [13]:
# Analizar los resultados aqui
