<a href="https://colab.research.google.com/github/santiagoec/CM0891-Aprendizaje-Automatico/blob/master/04_Aprendizaje_Reforzado_Q_Learning.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Aprendizaje Reforzado en Python

Este libro contiene un estudio de **Aprendizaje Reforzado** utilizando Python. Esto hace parte del curso de Aprendizaje Automático de la Maestría de Ciencia de Datos.  Consiste en la implementación del algoritmo Q-learning en un Proceso de Decisión de Markov

Los integrantes de este trabajo:

* Santiago Echeverri Calderon
* Edgar Leandro Jimenez Jaimes

Basado en el artículo de Venelin Valkov  
https://medium.com/@curiousily/solving-an-mdp-with-q-learning-from-scratch-deep-reinforcement-learning-for-hackers-part-1-45d1d360c120

In [0]:
import numpy as np
import random
from copy import deepcopy

## 1. Creación del entorno

Creación de una cuadricula para un proceso de decisión de Markov simple con 16 estados posibles

In [2]:
# Defición de las variables

OBSTACULO = "o"
ROBOT = "r"
TESORO = "t"
VACIO = "*"

# Definición del entorno o cuadricula
grid = [
    [ROBOT, VACIO, VACIO, OBSTACULO],
    [VACIO, OBSTACULO,VACIO, VACIO],
    [VACIO,VACIO,VACIO,OBSTACULO],
    [VACIO,OBSTACULO,VACIO,TESORO]
]


for row in grid:
    print(' '.join(row))

r * * o
* o * *
* * * o
* o * t


Definición de una clase para poder acceder al estado y poder conocer la posición del robot:

In [0]:
# Definición de clases 
class State:
    
    def __init__(self, grid, robot_pos):
        self.grid = grid
        self.robot_pos = robot_pos
        
    def __eq__(self, other):
        return isinstance(other, State) and self.grid == other.grid and self.robot_pos == other.robot_pos
    
    def __hash__(self):
        return hash(str(self.grid) + str(self.robot_pos))
    
    def __str__(self):
        return f"State(grid={self.grid}, robot_pos={self.robot_pos})"

El agente puede realizar 4 acciones: SUBIR, BAJAR, IR A LA IZQUIERDA, IR A LA DERECHA.

In [0]:
# Definición del las acciones y asignaciones variables

UP = 0
DOWN = 1
LEFT = 2
RIGHT = 3

ACTIONS = [UP, DOWN, LEFT, RIGHT]

Definir y guardar la posicón inicial:

In [0]:
start_state = State(grid=grid, robot_pos=[0, 0]) # Posicion inicial donde se encuentra el robot

Función para ejecutar las acciones (movimiento)


In [0]:
# Funcion para el ejecutar las acciones (movimiento) y exploración

def act(state, action):
    
    def new_robot_pos(state, action):
        p = deepcopy(state.robot_pos)
        if action == UP:
            p[0] = max(0, p[0] - 1)
        elif action == DOWN:
            p[0] = min(len(state.grid) - 1, p[0] + 1)
        elif action == LEFT:
            p[1] = max(0, p[1] - 1)
        elif action == RIGHT:
            p[1] = min(len(state.grid[0]) - 1, p[1] + 1)
        else:
            raise ValueError(f"Unknown action {action}")
        return p
            
    p = new_robot_pos(state, action)
    grid_item = state.grid[p[0]][p[1]]
    
    new_grid = deepcopy(state.grid)
    
    if grid_item == OBSTACULO:
        reward = -100
        is_done = True
        new_grid[p[0]][p[1]] += ROBOT
    elif grid_item == TESORO:
        reward = 1000
        is_done = True
        new_grid[p[0]][p[1]] += ROBOT
    elif grid_item == VACIO:
        reward = -1
        is_done = False
        old = state.robot_pos
        new_grid[old[0]][old[1]] = VACIO
        new_grid[p[0]][p[1]] = ROBOT
    elif grid_item == ROBOT:
        reward = -1
        is_done = False
    else:
        raise ValueError(f"Unknown grid item {grid_item}")
    
    return State(grid=new_grid, robot_pos=p), reward, is_done

## 2. Entrenamiento de modelo (exploración del entorno)

In [0]:
# Parámetros de la exploración

random.seed(42)                                  # Semilla para aleatorios 

# Definición de pasos a seguir

N_STATES = 16                                    # Posibles estados del entorno
N_EPISODES = 100                                   # Ajustamos los episodios de exploracion
MAX_EPISODE_STEPS = 100                          # Número máximo de pasos por episodio
MIN_ALPHA = 0.02                                 # Valor mínimo de la tasa de aprendizaje
alphas = np.linspace(1.0, MIN_ALPHA, N_EPISODES) # Array decreciente de valores alfa para cada episodio
gamma = 1.0
eps = 0.2                                        # epsilon de balance entre exploración y explotación
q_table = dict()                                 # Diccionario para almacenar la tabla 'q'

In [0]:
# Función para calcular el valor 'q' de cada acción para un estado dado

def q(state, action=None):
    
    if state not in q_table:
        q_table[state] = np.zeros(len(ACTIONS))
        
    if action is None:
        return q_table[state]
    
    return q_table[state][action]

In [0]:
# Función para definir que accion tomar

def choose_action(state):
    if random.uniform(0, 1) < eps:
        return random.choice(ACTIONS) 
    else:
        return np.argmax(q(state))

In [20]:
# Exploración del entorno según los parámetros definidos

for e in range(N_EPISODES):
    
    state = start_state
    total_reward = 0
    alpha = alphas[e]
    
    for _ in range(MAX_EPISODE_STEPS):
        action = choose_action(state)
        next_state, reward, done = act(state, action)
        total_reward += reward
        
        q(state)[action] = q(state, action) + \
                alpha * (reward + gamma *  np.max(q(next_state)) - q(state, action))
        state = next_state
        if done:
            break
    print(f"Episode {e + 1}: total reward -> {total_reward}")

Episode 1: total reward -> -113
Episode 2: total reward -> -104
Episode 3: total reward -> -102
Episode 4: total reward -> -108
Episode 5: total reward -> -101
Episode 6: total reward -> -107
Episode 7: total reward -> -107
Episode 8: total reward -> -101
Episode 9: total reward -> -111
Episode 10: total reward -> -104
Episode 11: total reward -> -106
Episode 12: total reward -> -105
Episode 13: total reward -> -106
Episode 14: total reward -> -108
Episode 15: total reward -> -115
Episode 16: total reward -> -108
Episode 17: total reward -> 995
Episode 18: total reward -> 989
Episode 19: total reward -> -112
Episode 20: total reward -> -103
Episode 21: total reward -> -111
Episode 22: total reward -> 990
Episode 23: total reward -> 985
Episode 24: total reward -> 994
Episode 25: total reward -> 995
Episode 26: total reward -> -105
Episode 27: total reward -> -103
Episode 28: total reward -> -103
Episode 29: total reward -> -101
Episode 30: total reward -> -103
Episode 31: total reward 

## 3. Prueba del modelo

Recordemos el estado inicial y la posicón del robot:

In [21]:
for row in start_state.grid:
    print(' '.join(row))

print('Posición inicial del robot: ', start_state.robot_pos)

r * * o
* o * *
* * * o
* o * t
Posición inicial del robot:  [0, 0]


Calculemos los valores *q* para cada posible acción en el estado inicial


In [22]:
r = q(start_state)
print(f"up={r[UP]}, down={r[DOWN]}, left={r[LEFT]}, right={r[RIGHT]}")

up=792.9452455350621, down=211.5794085743094, left=968.1105568278055, right=994.9999999820002


En el resultado anterior se observa que la acción de mayor valor *q* es **RIGHT**, lo que significa que esta debería ser la acción a ejecutar pues es la que maximiza la recompensa


Ejecutemos ahora esa acción, y veamos en donde quedó el robot, cuál fue la recompensa de la acción y si ya finalizó la tarea:


In [23]:
new_state, reward, done = act(start_state, RIGHT)

print("Nueva cuadricula del entorno: ")

for row in new_state.grid:
    print(' '.join(row))

print('Posición del robot: ', new_state.robot_pos)
print('Recompensa de la acción: ', reward)
print('¿Tarea terminada?: ', done)

Nueva cuadricula del entorno: 
* r * o
* o * *
* * * o
* o * t
Posición del robot:  [0, 1]
Recompensa de la acción:  -1
¿Tarea terminada?:  False


En la cuadricula se puede observar que el robot se desplazó a la derecha, quedando en la posición (0,1).  Como llegó a una cuadricula vacía su recompensa fue -1.  Y como no ha llegado al tesoro, ni se chocó con un obstáculo, la tarea aún no termina

Automaticemos ahora el proceso de desiciones del robot:

In [24]:
max_iter = 100                  # Definición de maximas iteraciones
finished = False                # Parametro de parada
actual_state = start_state      # Estado incial 
i = 0                           # Contador

while  (i != max_iter or finished == True):
  # Escoger la accion segun el estado
  action = choose_action(actual_state)
  # Ejecutar la accion definda y calcular la recompensa
  new_state, reward, done = act(actual_state, action)
  # print(i, action, done)
  # Definir el nuevo estado y si ya llegó el al objetivo
  actual_state = new_state
  finished = done
  i += 1
  #total_reward = []
  #total_reward += reward
  # Si se llegó al objetivo imprime
  if grid[new_state.robot_pos[0]][new_state.robot_pos[1]] == 't':
    print("Lo lograste!, encontraste el tesoro!:",new_state.robot_pos)
    print("Iteraciones realizadas : ",i)
    print("Cuadricula del entorno final : ")
    for row in new_state.grid:
      print(' '.join(row))
    break
  elif grid[new_state.robot_pos[0]][new_state.robot_pos[1]] == 'o':
    print("El robot quedó en el obstaculo, entrenalo mejor e intenta de nuevo")
    print("Quedaste en la posición: ",new_state.robot_pos)
    print("Iteraciones realizadas : ",i)
    print("Cuadricula del entorno final : ")
    for row in new_state.grid:
      print(' '.join(row))
    break
  elif i== max_iter:
   print('El algoritmo aun no converge, prueba mas iteraciones o mas episodios')


Lo lograste!, encontraste el tesoro!: [3, 3]
Iteraciones realizadas :  6
Cuadricula del entorno final : 
* * * o
* o * *
* * * o
* o r tr


Vamos a explorar los cambios a partir de hiperparametros

EL CODIGO A CONTINUACIÓN ES PARA ITERAR MANUALMENTE ENTRE EPISODIOS E ITERACIONES Y COPIAR LOS RESULTADOS EN PRUEBAS

In [0]:
N_EPISODES = 20  #[20,50,100]   
max_iter = 10 #[10,30,50,100]
actual_state = start_state      # Estado incial 

start_state = State(grid=grid, robot_pos=[0, 0]) # Posicion inicial donde se encuentra el robot
N_STATES = 16                                    # Posibles estados del entorno
MAX_EPISODE_STEPS = 100                          # Número máximo de pasos por episodio
MIN_ALPHA = 0.02                                 # Valor mínimo de la tasa de aprendizaje
alphas = np.linspace(1.0, MIN_ALPHA, N_EPISODES) # Array decreciente de valores alfa para cada episodio
gamma = 1.0
eps = 0.2                                        # epsilon de balance entre exploración y explotación
q_table = dict()                                 # Diccionario para almacenar la tabla 'q'

# Exploración del entorno según los parámetros definidos

for e in range(N_EPISODES):
    
    state = start_state
    total_reward = 0
    alpha = alphas[e]
    
    for _ in range(MAX_EPISODE_STEPS):
        action = choose_action(state)
        next_state, reward, done = act(state, action)
        total_reward += reward
        
        q(state)[action] = q(state, action) + \
                alpha * (reward + gamma *  np.max(q(next_state)) - q(state, action))
        state = next_state
        if done:
            break
    #print(f"Episode {e + 1}: total reward -> {total_reward}")

#max_iter = 100                  # Definición de maximas iteraciones
finished = False                # Parametro de parada
actual_state = start_state      # Estado incial 
i = 0                           # Contador

while  (i != max_iter or finished == True):
  # Escoger la accion segun el estado
  action = choose_action(actual_state)
  # Ejecutar la accion definda y calcular la recompensa
  new_state, reward, done = act(actual_state, action)
  # print(i, action, done)
  # Definir el nuevo estado y si ya llegó el al objetivo
  actual_state = new_state
  finished = done
  i += 1
  #total_reward = []
  #total_reward += reward
  # Si se llegó al objetivo imprime
  if grid[new_state.robot_pos[0]][new_state.robot_pos[1]] == 't':
    print("Lo lograste!, encontraste el tesoro!:",new_state.robot_pos)
    print("Iteraciones realizadas : ",i)
    print("El numero maximo de iteraciones era", max_iter)
    print("Cuadricula del entorno final : ")
    for row in new_state.grid:
      print(' '.join(row))
    break
  elif grid[new_state.robot_pos[0]][new_state.robot_pos[1]] == 'o':
    print("El robot quedó en el obstaculo, entrenalo mejor e intenta de nuevo")
    print("Quedaste en la posición: ",new_state.robot_pos)
    print("Iteraciones realizadas : ",i)
    print("Cuadricula del entorno final : ")
    for row in new_state.grid:
      print(' '.join(row))
    break
  elif i== max_iter:
   print('El algoritmo aun no converge, prueba mas iteraciones')