<div style="width: 100%; clear: both;">
<div style="float: left; width: 50%;">
<img src="http://www.uoc.edu/portal/_resources/common/imatges/marca_UOC/UOC_Masterbrand.jpg", align="left">
</div>
<div style="float: right; width: 50%;">
<p style="margin: 0; padding-top: 22px; text-align:right;">M2.883 · Aprendizaje por refuerzo</p>
<p style="margin: 0; text-align:right;">Máster universitario en Ciencia de datos (<i>Data science</i>)</p>
<p style="margin: 0; text-align:right; padding-button: 100px;">Estudios de Informática, Multimedia y Telecomunicación</p>
</div>
</div>
<div style="width:100%;">&nbsp;</div>


# Módulo 1: ejemplos de Gymnasium

En este _notebook_ cargaremos algunos de los escenarios de Gymnasium y veremos la interacción entre algunos agentes y estos escenarios o entornos.

En primer lugar instalaremos la librería gymnasium (si no la tenemos instalada) .

In [1]:
!pip install gymnasium



Para tener retrocompatibilidad con código creado para OpenAI Gym

In [2]:
import gymnasium as gym

## 1. CartPole
En este primer ejemplo vamos a cargar el entorno CartPole y realizaremos algunas pruebas.

### 1.1. Carga de datos

El siguiente código carga los paquetes necesarios para el ejemplo, crea el entorno mediante el método `make` e imprime por pantalla la dimensión del espacio de acciones (dos acciones: 0 = izquierda y 1 = derecha), del espacio de observaciones (cuatro observaciones: posición del carro, velocidad del carro, ángulo del poste y velocidad del poste en la punta) y el rango de la variable de recompensa (de menos infinito a más infinito).

In [3]:
import numpy as np

env = gym.make('CartPole-v1')
print("Action space is {} ".format(env.action_space))
print("Observation space is {} ".format(env.observation_space))
print("Reward range is {} ".format(env.reward_range))

Action space is Discrete(2) 
Observation space is Box([-4.8000002e+00 -3.4028235e+38 -4.1887903e-01 -3.4028235e+38], [4.8000002e+00 3.4028235e+38 4.1887903e-01 3.4028235e+38], (4,), float32) 
Reward range is (-inf, inf) 


Seguidamente, reseteamos el entorno (acción que hay que realizar siempre después de la creación de éste) e inicializamos las variables que guardarán el número de pasos ejecutados (t), la recompensa acumulada (`total_reward`) y la variable que nos indicará cuándo finaliza un episodio (done).

In [4]:
# Environment reset
obs, info = env.reset()
t, total_reward, done = 0, 0, False

### 1.2. Ejecución de un episodio

A continuación, realizaremos la ejecución de un episodio del entorno CartPole utilizando un agente que selecciona las acciones de forma aleatoria.

El siguiente código realiza la ejecución de un episodio del entorno (este finaliza cuando la variable `done` toma el valor `True`). El agente se implementa mediante el método  `env.action_space.sample()` que selecciona una acción al azar. Se imprime por pantalla para cada paso (_time step_) la observación que genera el entorno (los cuatro valores comentados anteriormente), la acción seleccionada y la recompensa obtenida en ese paso (+ 1 en cada acción hasta que finaliza el episodio).

In [5]:
while not done:
    
    # Get random action (this is the implementation of the agent)
    action = env.action_space.sample()
    
    # Execute action and get response
    new_obs, reward, terminated, truncated, info = env.step(action)
    done = terminated or truncated
    print("Obs: {} -> Action: {} and reward: {}".format(np.round(obs, 3), action, reward))
    
    obs = new_obs
    total_reward += reward
    t += 1
    
total_reward += reward
t += 1
print("Obs: {} -> Action: {} and reward: {}".format(np.round(obs, 3), action, reward))

Obs: [-0.04  -0.044 -0.045  0.008] -> Action: 0 and reward: 1.0
Obs: [-0.041 -0.239 -0.045  0.286] -> Action: 0 and reward: 1.0
Obs: [-0.045 -0.433 -0.04   0.565] -> Action: 0 and reward: 1.0
Obs: [-0.054 -0.628 -0.028  0.845] -> Action: 0 and reward: 1.0
Obs: [-0.067 -0.823 -0.011  1.128] -> Action: 1 and reward: 1.0
Obs: [-0.083 -0.627  0.011  0.832] -> Action: 0 and reward: 1.0
Obs: [-0.096 -0.823  0.028  1.128] -> Action: 0 and reward: 1.0
Obs: [-0.112 -1.018  0.05   1.43 ] -> Action: 0 and reward: 1.0
Obs: [-0.132 -1.214  0.079  1.738] -> Action: 0 and reward: 1.0
Obs: [-0.157 -1.41   0.114  2.054] -> Action: 0 and reward: 1.0
Obs: [-0.185 -1.606  0.155  2.379] -> Action: 1 and reward: 1.0
Obs: [-0.217 -1.412  0.202  2.138] -> Action: 1 and reward: 1.0
Obs: [-0.245 -1.22   0.245  1.914] -> Action: 1 and reward: 1.0


Finalmente, imprimimos los resultados y cerramos el entorno.

In [6]:
print("Episode finished after {} timesteps and reward was {} ".format(t, total_reward))
env.close()

Episode finished after 13 timesteps and reward was 13.0 


### 1.3. Simulando varios episodios

El siguiente fragmento de código repite el proceso del apartado anterior para el número de episodios definido en la variable `num_episodes`. Se renderizan los episodios para mostrarlos por pantalla en una ventana externa (no funciona en Google Colab).

In [7]:
env = gym.make("CartPole-v1", render_mode="human") # if don't want to render: env = gym.make("CartPole-v1")

num_episodes = 10

for episode in range(num_episodes):

    # Environment reset
    obs, info = env.reset()
    t, total_reward, done = 0, 0, False
    
    print('Running episode {} '.format(episode+1))
    
    while not done:
    
        # Get random action (this is the implementation of the agent)
        action = env.action_space.sample()
    
        # Execute action and get response
        new_obs, reward, terminated, truncated, info = env.step(action)
        done = terminated or truncated
        print("Obs: {} -> Action: {} and reward: {}".format(np.round(obs, 3), action, reward))
    
        obs = new_obs
        total_reward += reward
        t += 1
        
    total_reward += reward
    t += 1
    print("Obs: {} -> Action: {} and reward: {}".format(np.round(obs, 3), action, reward))
    print("Episode {} finished after {} timesteps and reward was {} ".format(episode+1, t, total_reward))
    print('')
    
env.close()

Running episode 1 
Obs: [ 0.001 -0.045 -0.016  0.017] -> Action: 0 and reward: 1.0
Obs: [ 0.    -0.24  -0.016  0.305] -> Action: 1 and reward: 1.0
Obs: [-0.004 -0.045 -0.01   0.007] -> Action: 1 and reward: 1.0
Obs: [-0.005  0.151 -0.009 -0.288] -> Action: 0 and reward: 1.0
Obs: [-0.002 -0.044 -0.015  0.001] -> Action: 1 and reward: 1.0
Obs: [-0.003  0.151 -0.015 -0.296] -> Action: 0 and reward: 1.0
Obs: [-0.    -0.044 -0.021 -0.008] -> Action: 1 and reward: 1.0
Obs: [-0.001  0.151 -0.021 -0.308] -> Action: 0 and reward: 1.0
Obs: [ 0.002 -0.043 -0.027 -0.022] -> Action: 1 and reward: 1.0
Obs: [ 0.001  0.152 -0.028 -0.323] -> Action: 0 and reward: 1.0
Obs: [ 0.004 -0.043 -0.034 -0.039] -> Action: 1 and reward: 1.0
Obs: [ 0.003  0.153 -0.035 -0.342] -> Action: 0 and reward: 1.0
Obs: [ 0.006 -0.042 -0.042 -0.061] -> Action: 1 and reward: 1.0
Obs: [ 0.006  0.154 -0.043 -0.367] -> Action: 1 and reward: 1.0
Obs: [ 0.009  0.35  -0.051 -0.673] -> Action: 1 and reward: 1.0
Obs: [ 0.016  0.546 -

## 2. Frozen Lake
En este segundo ejemplo vamos a cargar el entorno FrozenLake y volveremos a realizar algunas pruebas.

### 2.1. Carga de datos

De la misma forma que en el ejemplo inicial, el siguiente código carga los paquetes necesarios para el ejemplo, crea el entorno mediante el método `make` e imprime por pantalla la dimensión del espacio de acciones (0 = izquierda, 1 = derecha, 2 = abajo y 3 = arriba), el espacio de observaciones (un número del 0 al 15 que indica la posición del agente en el entorno) y el rango de la variable de recompensa (0 para cualquier acción excepto si se llega a la casilla de destino, en cuyo caso la recompensa es 1).

In [8]:
import time

env = gym.make("FrozenLake-v1")
print("Action space is {} ".format(env.action_space))
print("Observation space is {} ".format(env.observation_space))
print("Reward range is {} ".format(env.reward_range))

Action space is Discrete(4) 
Observation space is Discrete(16) 
Reward range is (0, 1) 


Para observar el mapa por defecto (S = casilla de partida, G = casilla de destino, H = agujero, F = casilla con hielo).

In [9]:
print(env.desc)

[[b'S' b'F' b'F' b'F']
 [b'F' b'H' b'F' b'H']
 [b'F' b'F' b'F' b'H']
 [b'H' b'F' b'F' b'G']]


### 2.2. Ejecución de un episodio

A continuación, realizaremos la ejecución de un episodio del entorno FrozenLake utilizando un agente que selecciona las acciones de forma aleatoria.

En el siguiente código inicializamos el entorno, definimos el máximo número de pasos por episodio (`max_steps`) y realizamos la ejecución de un episodio del entorno (este finaliza cuando la variable 'done' toma el valor 'True' o cuando se alcanza el número máximo de pasos estipulado). De nuevo, utilizamos un agente que implementa una política completamente aleatoria (`env.action_space.sample()`). Mediante el método `env.render()` podemos ir viendo la evolución del agente en el entorno desde la casilla de salida S hasta que llega a la casilla de destino G o cae en un agujero H.

In [10]:
# Environment reset
obs, info = env.reset()
t, total_reward, done = 0, 0, False
max_steps = 100

while t < max_steps:
    # Get random action (this is the implementation of the agent)
    action = env.action_space.sample()
    
    # Execute action and get response
    obs, reward, terminated, truncated, info = env.step(action)
    done = terminated or truncated
        
    t += 1
    if done:
        break
    time.sleep(0.1)

print("Episode finished after {} timesteps and reward was {} ".format(t, reward))
env.close()

Episode finished after 2 timesteps and reward was 0.0 


### 2.3. Simulando varios episodios

El siguiente fragmento de código repite el proceso del apartado anterior para el número de episodios definido en la variable `num_episodes`. Se renderizan los episodios para mostrarlos por pantalla en una ventana externa (no funciona en Google Colab).

In [11]:
env = gym.make("FrozenLake-v1", render_mode="human") # if don't want to render: env = gym.make("FrozenLake-v1")

num_episodes = 10

for episode in range(num_episodes):

    # Environment reset
    obs, info = env.reset()
    t, done = 0, False
    
    print('Running episode {} '.format(episode+1))

    while t < max_steps:
        # Get random action (this is the implementation of the agent)
        action = env.action_space.sample()
    
        # Execute action and get response
        obs, reward, terminated, truncated, info = env.step(action)
        done = terminated or truncated
        
        t += 1
        if done:
            break
        time.sleep(0.1)
      
    print("Episode {} finished after {} timesteps and reward was {} ".format(episode+1, t, reward))
    print('')

env.close()

Running episode 1 
Episode 1 finished after 6 timesteps and reward was 0.0 

Running episode 2 
Episode 2 finished after 15 timesteps and reward was 0.0 

Running episode 3 
Episode 3 finished after 4 timesteps and reward was 0.0 

Running episode 4 
Episode 4 finished after 2 timesteps and reward was 0.0 

Running episode 5 
Episode 5 finished after 8 timesteps and reward was 0.0 

Running episode 6 
Episode 6 finished after 2 timesteps and reward was 0.0 

Running episode 7 
Episode 7 finished after 18 timesteps and reward was 0.0 

Running episode 8 
Episode 8 finished after 5 timesteps and reward was 0.0 

Running episode 9 
Episode 9 finished after 6 timesteps and reward was 0.0 

Running episode 10 
Episode 10 finished after 11 timesteps and reward was 0.0 



### 2.4. Calculando la recompensa total de varios episodios

Para medir la eficiencia del agente, podemos calcular la recompensa total de varios episodios. Dado que en cada episodio la recompensa acumulada es 0 si no se llega a la celda de destino y 1 si se consigue el objetivo, medir la recompensa total acumulada de un número de episodios nos da una medida del porcentaje de éxito de nuestro agente.

El siguiente fragmento de código repite el proceso del apartado anterior para el número de episodios definido en la variable `num_episodes` y calcula el porcentaje de acierto del agente. Se omite la renderización del entorno con el objetivo de agilizar la ejecución.

In [12]:
env = gym.make("FrozenLake-v1")

num_episodes = 1000
total_reward = 0

for episode in range(num_episodes):

    # Environment reset
    obs, info = env.reset()
    t, done = 0, False
    
    while t < max_steps:
        # Get random action (this is the implementation of the agent)
        action = env.action_space.sample()
    
        # Execute action and get response
        obs, reward, terminated, truncated, info = env.step(action)
        done = terminated or truncated
        
        total_reward += reward
        t += 1
        if done:
            break
    
success_rate = total_reward*100/num_episodes
print("{} successes in {} episodes: {} % of success".format(total_reward, num_episodes, success_rate))

13.0 successes in 1000 episodes: 1.3 % of success


### 2.5. Entrenando a un agente

Tal y como hemos podido ver en el apartado anterior, como el agente utilizado elige las acciones al azar, es casi imposible llegar a la casilla de destino G con esta política (el porcentaje de éxito está en un 1 % o 2 %). Vamos a entrenar un agente utilizando el método Q-Learning. Este método (que se estudiará en módulos posteriores) puede implementarse mediante una tabla que va actualizándose a partir de la interacción del agente con el entorno.
El siguiente código implementa este método y realiza el entrenamiento del agente a partir de la ejecución de varios episodios.

__Nota__: recordad que las simulaciones ejecutadas tienen un componente aleatorio y los porcentajes pueden variar de una ejecución a otra.

Empezamos importando algunos paquetes:

In [13]:
import pickle

Inicializamos algunas variables del método que queremos implementar, entre las que se encuentran el número de episodios (`num_episodes`) y el número máximo de pasos por cada episodio (`max_steps`).

In [14]:
epsilon = 0.9
num_episodes = 100000
max_steps = 100

learning_rate = 0.81
gamma = 0.96

Inicializamos a cero todos los valores de la tabla de la función Q (de dieciseis estados por cuatro acciones cada estado), que acabará dándonos una idea de cuál es la mejor acción para cada estado.

In [15]:
Q = np.zeros((env.observation_space.n, env.action_space.n))
print(Q)

[[0. 0. 0. 0.]
 [0. 0. 0. 0.]
 [0. 0. 0. 0.]
 [0. 0. 0. 0.]
 [0. 0. 0. 0.]
 [0. 0. 0. 0.]
 [0. 0. 0. 0.]
 [0. 0. 0. 0.]
 [0. 0. 0. 0.]
 [0. 0. 0. 0.]
 [0. 0. 0. 0.]
 [0. 0. 0. 0.]
 [0. 0. 0. 0.]
 [0. 0. 0. 0.]
 [0. 0. 0. 0.]
 [0. 0. 0. 0.]]


El siguiente código define las funciones que caracterizan al agente (se estudiarán en módulos posteriores de este curso).

In [16]:
def choose_action(state):
    action=0
    if np.random.uniform(0, 1) < epsilon:
        action = env.action_space.sample()
    else:
        action = np.argmax(Q[state, :])
    return action

def learn(state, new_state, reward, action):
    predict = Q[state, action]
    target = reward + gamma * np.max(Q[new_state, :])
    Q[state, action] = Q[state, action] + learning_rate * (target - predict)

El siguiente código realiza tantas partidas del juego como se indican en la variable `num_episodes`. En cada partida (episodio), el agente va interactuando con el entorno y, como fruto de esa interacción, va actualizando los valores de la tabla _Q_. Se imprimen por pantalla aquellos episodios en los que el agente alcanza la casilla de destino.

In [17]:
# Start
for episode in range(num_episodes):
    state, info = env.reset()
    t = 0
    
    while t < max_steps:
        action = choose_action(state)  
        state2, reward, terminated, truncated, info = env.step(action)
        done = terminated or truncated  
        learn(state, state2, reward, action)
        state = state2
        t += 1
       
        if done:
            break

    if reward == 1:
        print("Episode {} finished after {} timesteps and reward was {} ".format(episode+1, t, reward)) 

Episode 157 finished after 11 timesteps and reward was 1.0 
Episode 234 finished after 21 timesteps and reward was 1.0 
Episode 321 finished after 17 timesteps and reward was 1.0 
Episode 323 finished after 14 timesteps and reward was 1.0 
Episode 364 finished after 8 timesteps and reward was 1.0 
Episode 365 finished after 22 timesteps and reward was 1.0 
Episode 367 finished after 10 timesteps and reward was 1.0 
Episode 377 finished after 11 timesteps and reward was 1.0 
Episode 496 finished after 20 timesteps and reward was 1.0 
Episode 522 finished after 13 timesteps and reward was 1.0 
Episode 579 finished after 14 timesteps and reward was 1.0 
Episode 581 finished after 13 timesteps and reward was 1.0 
Episode 650 finished after 25 timesteps and reward was 1.0 
Episode 695 finished after 25 timesteps and reward was 1.0 
Episode 737 finished after 21 timesteps and reward was 1.0 
Episode 786 finished after 22 timesteps and reward was 1.0 
Episode 805 finished after 14 timesteps a

Podemos ver los valores finales de la tabla _Q_ después del entrenamiento.

In [18]:
print(Q)

[[0.71415329 0.68091137 0.65231581 0.6030245 ]
 [0.58573481 0.02044371 0.09146671 0.60872957]
 [0.62365978 0.67736475 0.58684585 0.59654687]
 [0.4996932  0.11135843 0.57780738 0.59475426]
 [0.73103756 0.14409173 0.65337548 0.02630718]
 [0.         0.         0.         0.        ]
 [0.72106628 0.00422472 0.62225003 0.11753828]
 [0.         0.         0.         0.        ]
 [0.74641145 0.1600479  0.76029823 0.80784801]
 [0.14706561 0.7618735  0.15802398 0.68193294]
 [0.7649838  0.14928451 0.53621513 0.09613124]
 [0.         0.         0.         0.        ]
 [0.         0.         0.         0.        ]
 [0.81612647 0.9148098  0.84146532 0.92136879]
 [0.86387168 0.78894728 0.93198836 0.96642325]
 [0.         0.         0.         0.        ]]


### 2.6. Comprobando la mejora
En este último apartado comprobaremos que el agente diseñado consigue mejores prestaciones que el agente aleatorio.

El código es muy parecido al que hemos utilizado mientras entrenábamos al agente, pero se omite la parte de aprendizaje de este. Para ello, simularemos varios episodios utilizando los valores de la tabla _Q_ obtenida en el entrenamiento. Concretamente, el agente selecciona el valor máximo de la tabla _Q_ para cada estado:

In [19]:
def choose_action_max(state):
    action = np.argmax(Q[state, :])
    return action

De nuevo, calculamos la recompensa total de varios episodios y se calcula el porcentaje de acierto que, como puede comprobarse, es superior al del agente aleatorio.

In [20]:
from IPython.display import clear_output

num_episodes = 1000
total_reward = 0

# start
for episode in range(num_episodes):

    state, info = env.reset()
    
    t = 0
    while t < 100:
        action = choose_action_max(state)  
        state, reward, terminated, truncated, info = env.step(action)
        done = terminated or truncated  
        
        if done:
            break

    total_reward += reward
    
success_rate = total_reward*100/num_episodes
print("{} successes in {} episodes: {} % of success".format(total_reward, num_episodes, success_rate))

321.0 successes in 1000 episodes: 32.1 % of success
