# **Lunar Lander con Q-Learning**

### **1. Bibliotecas**

In [1]:
import sys
import gymnasium as gym
import numpy as np
import matplotlib.pyplot as plt
from collections import deque
import random
import pygame
from pygame.locals import *

### **2. Jugando a mano**

A continuación se puede jugar un episodio del lunar lander. Se controlan los motores con el teclado. Notar que solo se puede realizar una acción a la vez (que es parte del problema), y que en esta implementación, izq toma precedencia sobre derecha, que toma precedencia sobre el motor principal.

In [16]:
# Inicializar pygame (para el control con el teclado) y el ambiente
pygame.init()
env = gym.make('LunarLander-v2', render_mode='human')
env.reset()
pygame.display.set_caption('Lunar Lander')

clock = pygame.time.Clock()
done = False

while not done:
    for event in pygame.event.get():
        if event.type == QUIT:
            done = True
            break

    keys = pygame.key.get_pressed()

    # Map keys to actions
    if keys[K_LEFT]:
        action = 3  # Fire left orientation engine
    elif keys[K_RIGHT]:
        action = 1 # Fire right orientation engine
    elif keys[K_UP]:
        action = 2  # Fire main engine
    else:
        action = 0  # Do nothing

    _, _, terminated, truncated, _ = env.step(action)
    env.render()
    clock.tick(10)

    if terminated or truncated:
        done = True

env.close()
pygame.quit()

## **3. Discretizando el estado**

El estado consiste de posiciones y velocidades en (x,y,theta) y en información de contacto de los pies con la superficie.

Como varios de estos son continuos, tenemos que discretizarlos para aplicar nuestro algoritmo de aprendizaje por refuerzo tabular.

In [3]:
# Cuántos bins queremos por dimensión
# Pueden considerar variar este parámetro
bins_per_dim = 16

#          Estado:
#          (x,            y,            x_vel,        y_vel,        theta,        theta_vel,    pie_izq_en_contacto, pie_derecho_en_contacto)
NUM_BINS = [bins_per_dim, bins_per_dim, bins_per_dim, bins_per_dim, bins_per_dim, bins_per_dim, 2, 2]

env = gym.make('LunarLander-v2', render_mode='human')
env.reset()

# Tomamos los rangos del env
OBS_SPACE_HIGH = env.observation_space.high
OBS_SPACE_LOW = env.observation_space.low
OBS_SPACE_LOW[1] = 0 # Para la coordenada y (altura), no podemos ir más abajo que la zona de aterrizaje (que está en el 0, 0)

# Los bins para cada dimensión
bins = [
    np.linspace(OBS_SPACE_LOW[i], OBS_SPACE_HIGH[i], NUM_BINS[i] - 1)
    for i in range(len(NUM_BINS) - 2) # last two are binary
]
# Se recomienda observar los bins para entender su estructura
# print ("Bins: ", bins)

def discretize_state(state, bins):
    """Discretize the continuous state into a tuple of discrete indices, taking the closest bin."""
    state_disc = list()
    for i in range(len(state)):
        if i >= len(bins):  # For binary features (leg contacts)
            state_disc.append(int(state[i]))
        else:
            # Encuentra el índice del valor más cercano en los bins
            closest_index = np.argmin(np.abs(bins[i] - state[i]))
            state_disc.append(closest_index)
    return tuple(state_disc)

In [3]:
# Ejemplos
print(discretize_state([0.0, 0.0, 0, 0, 0, 0, 1, 1], bins)) # En la zona de aterrizaje y quieto
print(discretize_state([0, 1.5, 0, 0, 0, 0, 0, 0], bins)) # Comenzando la partida, arriba y en el centro

(10, 1, 10, 10, 10, 10, 1, 1)
(10, 19, 10, 10, 10, 10, 0, 0)


## **4. Agentes y la interacción con el entorno**

Vamos a definir una interfaz para nuestro agente:

In [4]:
class Agente:
    def elegir_accion(self, estado, max_accion, explorar = True) -> int:
        """Elegir la accion a tomar en el estado actual y el espacio de acciones
            - estado: el estado en el que se encuentra actualmente el agente
            - max_accion: el espacio de acciones posibles
            - explorar: si se debe elegir una acción de forma que explore el espacio de estados, o eligiendo la que mejor recompensa cree que devuelve
        """
        pass

    def aprender(self, estado_anterior, estado_siguiente, accion, recompensa, terminado) -> None:
        """Aprender a partir de la tupla 
            - estado_anterior: el estado desde que se empezó
            - estado_siguiente: el estado al que se llegó
            - accion: la acción que llevo al agente desde estado_anterior a estado_siguiente
            - recompensa: la recompensa recibida en la transicion
            - terminado: si el episodio terminó en esta transición
        """
        pass

    def fin_episodio(self) -> None:
        """Actualizar estructuras al final de un episodio"""
        pass

Para un agente aleatorio, la implementación sería:

In [30]:
import random

class AgenteAleatorio(Agente):
    def elegir_accion(self, estado, max_accion, explorar = True) -> int:
        # Elige una acción al azar
        return random.randrange(max_accion)

    def aprender(self, estado_anterior, estado_siguiente, accion, recompensa, terminado) -> None:
        # No aprende
        pass

    def fin_episodio(self) -> None:
        # Nada que actualizar
        pass

Luego podemos definir una función para ejecutar un episodio con un agente dado:

In [5]:
def ejecutar_episodio(agente, aprender=True, render=None, max_iteraciones=500):
    entorno = gym.make('LunarLander-v2', render_mode=render).env
    
    iteraciones = 0
    recompensa_total = 0

    termino = False
    truncado = False
    estado_anterior, info = entorno.reset()
    while iteraciones < max_iteraciones and not termino and not truncado:
        # Le pedimos al agente que elija entre las posibles acciones (0..entorno.action_space.n)
        accion = agente.elegir_accion(estado_anterior, entorno.action_space.n, aprender)
        # Realizamos la accion
        estado_siguiente, recompensa, termino, truncado, info = entorno.step(accion)
        # Le informamos al agente para que aprenda
        if (aprender):
            agente.aprender(estado_anterior, estado_siguiente, accion, recompensa, termino)

        estado_anterior = estado_siguiente
        iteraciones += 1
        recompensa_total += recompensa
        
    if (aprender):
        agente.fin_episodio()

    entorno.close()
    return recompensa_total

In [31]:
# Ejecutamos un episodio con el agente aleatorio y modo render 'human', para poder verlo
ejecutar_episodio(AgenteAleatorio(), render = 'human')

-85.00346673972187

Podemos ejecutar este ambiente muchas veces y tomar métricas al respecto

In [None]:
#AgenteAleatorio = AgenteAleatorio()
recompensa_episodios = []

exitos = 0
num_episodios = 100
for i in range(num_episodios):
    recompensa = ejecutar_episodio(AgenteAleatorio, render='human')
    # Los episodios se consideran exitosos si se obutvo 200 o más de recompensa total
    if (recompensa >= 200):
        exitos += 1
    recompensa_episodios += [recompensa]

import numpy
print(f"Tasa de éxito: {exitos / num_episodios}. Se obtuvo {numpy.mean(recompensa_episodios)} de recompensa, en promedio")

### **5. Programando un agente que aprende**

La tarea a realizar consiste en programar un agente de aprendizaje por refuerzos:

In [22]:
import numpy as np
import random

'''
Hiperparametros
  Politica de exploracion (random, epsilon-greedy, softmax)
  Politica de aprendizaje (Aprender en el momento / Aprender al final)
  Temperatura / k
  Alfa / Learning rate
  Cantidad de bins
'''

class AgenteRL(Agente):

    def __init__(self, k=1, epsilon=0.1, politica_exploracion='epsilon-greedy', aprender_inmediatamente=True) -> None:
        '''
        Parametros
        ----------
        k: Temperatura
        epsilon: Probabilidad de explotacion en caso de que politica_exploracion=epsilon-greedy
        politica_exploracion: Funcion que determinará la accion a tomar en caso de encontrarse explorando. ( random, epsilon-greedy, softmax )
        aprender_inmediatamente: Si el agente debe aprender en el momento o al final del episodio haciendo un reccorido "hacia atras"

        Utiliza una función de aprendizaje Q-Learning no-determinista, mediante la formula:
        Q(s, a) = (1 - alfa) * Q(s, a) + alfa * (recompensa + max(Q(s', a')))
        donde a=1/cantidad_visitas(s, a)
        '''
        super().__init__()
        self.Q = {}
        self.cantidad_visitas = {}
        self.k = k
        self.epsilon = epsilon
        self.politica_exploracion = politica_exploracion
        self.aprender_inmediatamente = aprender_inmediatamente

        # En caso de que aprenda al finalizar el episodio, se guardan las acciones tomadas
        self.acciones_tomadas = []

    def elegir_accion(self, estado, max_accion, explorar=True) -> int:
        estado = discretize_state(estado, bins)

        if estado not in self.Q:
            self.Q[estado] = {i: 0 for i in range(max_accion)}
            for accion in range(max_accion):
                self.cantidad_visitas[(estado, accion)] = 0

        if (estado[6] == 1 and estado[7] == 1):
            return 0        

        if explorar: # Explorar

            if (self.politica_exploracion == 'random'):
                return random.randrange(max_accion)
            
            elif self.politica_exploracion == 'epsilon-greedy':
                if random.random() < self.epsilon:
                    return random.randrange(max_accion)
                else:
                    return max(self.Q[estado], key=self.Q[estado].get)
                
            elif self.politica_exploracion == 'softmax':
                # Voy probando varias veces y no funciona :(
                '''
                q_values = [self.Q[estado][a] for a in range(max_accion)]
    
                min_q, max_q = min(q_values), max(q_values)
                
                if max_q - min_q > 0:
                    normalized_q_values = [(q - min_q) / (max_q - min_q) for q in q_values]
                else:
                    normalized_q_values = [0] * len(q_values)  
                
                k_values = [self.k**q for q in normalized_q_values]
                sum_k_values = sum(k_values)
                probs = [k / sum_k_values for k in k_values]
                
                return np.random.choice(range(max_accion), p=probs)
                '''

                '''
                q_values = [self.Q[estado][a] for a in range(max_accion)]
                q_values_scaled = [q / self.k for q in q_values]
                max_q = max(q_values_scaled)
                q_values_scaled = [q - max_q for q in q_values_scaled]
                exp_q = [np.exp(q) for q in q_values_scaled]
                sum_exp_q = sum(exp_q)
                probs = [q / sum_exp_q for q in exp_q]
                return np.random.choice(range(max_accion), p=probs)
                '''

        else: # Explotacion
            return max(self.Q[estado], key=self.Q[estado].get)

    def aprender(self, estado_anterior, estado_actual, accion, recompensa, terminado) -> None:
        estado_anterior = discretize_state(estado_anterior, bins)
        estado_actual   = discretize_state(estado_actual, bins)

        for estado in [estado_anterior, estado_actual]:
            if estado not in self.Q:
                self.Q[estado] = {i: 0 for i in range(4)}
                for a in range(4):
                    self.cant_visitas[(estado, a)] = 0
            
        self.cantidad_visitas[(estado_anterior, accion)] += 1 

        if (self.aprender_inmediatamente):
            alfa = 1 / self.cantidad_visitas[(estado_anterior, accion)]
            self.Q[estado_anterior][accion] = (1 - alfa) * self.Q[estado_anterior][accion] + alfa * (recompensa + max(self.Q[estado_actual].values()))
        else:
            self.acciones_tomadas.append((estado_anterior, estado_actual, accion, recompensa))     


    def fin_episodio(self) -> None:
        '''
        En caso de que el agente aprenda al final del episodio, se recorre hacia atras las acciones tomadas durante
        el episodio y se actualizan los valores de Q
        En caso contrario, no se hace nada
        '''
        if (not self.aprender_inmediatamente):
            for i in range(len(self.acciones_tomadas)-1, -1, -1):
                estado_anterior, estado_actual, accion, recompensa = self.acciones_tomadas[i]
                alfa = 1 / self.cantidad_visitas[(estado_anterior, accion)]
                self.Q[estado_anterior][accion] = (1 - alfa) * self.Q[estado_anterior][accion] + alfa * (recompensa + max(self.Q[estado_actual].values()))

            self.acciones_tomadas = []

Y ejecutar con el muchos episodios:

In [20]:
entorno = gym.make('LunarLander-v2').env
agente = AgenteRL(politica_exploracion='epsilon-greedy', epsilon=0.1, aprender_inmediatamente=False)
exitos = 0
recompensa_parcial=[]
recompensa_episodios = []
num_episodios = 200000
for i in range(num_episodios):
    recompensa = ejecutar_episodio(agente, aprender=True, max_iteraciones=1000)
    
    # Los episodios se consideran exitosos si se obutvo 200 o más de recompensa total
    if (recompensa >= 200):
        exitos += 1
    recompensa_episodios += [recompensa]
    recompensa_parcial += [recompensa]
    
    if (i % 100 == 0):
        print(f'Episodio: {i}')
        print(f'Recompensa parcial promedio: {np.mean(recompensa_parcial)}')
        print('')
        recompensa_parcial = []
    
print(f"Tasa de éxito: {exitos / num_episodios}. Se obtuvo {np.mean(recompensa_episodios)} de recompensa, en promedio")

Episodio: 0
Recompensa parcial promedio: 245.35356572086934

Episodio: 100
Recompensa parcial promedio: 3.3364254901326973

Episodio: 200
Recompensa parcial promedio: -5.386503777088735

Episodio: 300
Recompensa parcial promedio: 8.324409562916209

Episodio: 400
Recompensa parcial promedio: -18.24223016285598

Episodio: 500
Recompensa parcial promedio: 18.603376136572123

Episodio: 600
Recompensa parcial promedio: 16.159709447470394

Episodio: 700
Recompensa parcial promedio: -15.88301758518149

Episodio: 800
Recompensa parcial promedio: -1.5870692120729148

Episodio: 900
Recompensa parcial promedio: 4.881306871822027

Episodio: 1000
Recompensa parcial promedio: -19.767102589115094

Episodio: 1100
Recompensa parcial promedio: 13.763906854221588

Episodio: 1200
Recompensa parcial promedio: -11.31539204429864

Episodio: 1300
Recompensa parcial promedio: -37.02650068392963

Episodio: 1400
Recompensa parcial promedio: 13.224657346972714

Episodio: 1500
Recompensa parcial promedio: -27.5889

KeyboardInterrupt: 

Analizar los resultados de la ejecución anterior, incluyendo:
 * Un análisis de los parámetros utilizados en el algoritmo (aprendizaje, política de exploración)
 * Un análisis de algunos 'cortes' de la matriz Q y la política (p.e. qué hace la nave cuando está cayendo rápidamente hacia abajo, sin rotación)
 * Un análisis de la evolución de la recompensa promedio
 * Un análisis de los casos de éxito
 * Un análisis de los casos en el que el agente falla
 * Qué limitante del agente de RL les parece que afecta más negativamente su desempeño. Cómo lo mejorarían? 

In [21]:
entorno = gym.make('LunarLander-v2').env
exitos = 0
recompensa_episodios = []
num_episodios = 1000
for i in range(num_episodios):
    recompensa = ejecutar_episodio(agente, aprender = False, max_iteraciones=1000)
    recompensa_episodios += [recompensa]
    # Los episodios se consideran exitosos si se obutvo 200 o más de recompensa total
    if (recompensa >= 200):
        exitos += 1
        print(f'Episodio: {i}')
        print(f'Recompensa: {recompensa}')
        print(f'Recompensa promedio: {np.mean(recompensa_episodios)}')
        print('')

    
print(f"Tasa de éxito: {exitos / num_episodios}. Se obtuvo {np.mean(recompensa_episodios)} de recompensa, en promedio")

Episodio: 0
Recompensa: 302.76429676931855
Recompensa promedio: nan



  return _methods._mean(a, axis=axis, dtype=dtype,
  ret = ret.dtype.type(ret / rcount)


Episodio: 6
Recompensa: 239.41004553694577
Recompensa promedio: -23.170199066852174

Episodio: 9
Recompensa: 234.0533557878106
Recompensa promedio: 8.081132353349727

Episodio: 18
Recompensa: 257.3529935827414
Recompensa promedio: -8.798332854190933

Episodio: 19
Recompensa: 223.6439866242992
Recompensa promedio: 5.2096316951212955

Episodio: 34
Recompensa: 213.58223946839553
Recompensa promedio: -18.158430585490947

Episodio: 38
Recompensa: 250.5290524483476
Recompensa promedio: -13.006138465322953

Episodio: 44
Recompensa: 217.33306540209145
Recompensa promedio: -10.778786890886359

Episodio: 50
Recompensa: 207.4394328331374
Recompensa promedio: -11.006006750466238

Episodio: 62
Recompensa: 300.9749612241151
Recompensa promedio: -29.71259351234987

Episodio: 64
Recompensa: 231.4753753071301
Recompensa promedio: -24.275950060192727

Episodio: 68
Recompensa: 217.60454972342663
Recompensa promedio: -20.902501761008935

Episodio: 69
Recompensa: 246.60290582133663
Recompensa promedio: -17