# Solución de MDPs por medio de iteración de políticas

Para este laboratorio utilizaremos el método de iteración de políticas para solucionar el problema del motor del vehículo definido en el laboratorio de MDPs.

Para ello utilizaremos el ambiente definido de los motores y construiremos el método de iteración de políticas para resolverlo.

## Ambiente

Para recordar la definición del ambiente se realiza en la clase `MDP`. Esta clase recibe como parámetro una codificación de los estados, la dimensión del MDP dada por la cantidad de estados en el ambiente (dados como una tupla) y el estado inicial. 

La codificación del ambiente se define como una tabla de tamaño `estados x acciones` que para cada uno de los estados (filas) contiene una lista de tuplas con la información de la probabilidad asociada a la acción de la columna, el estado de llegada de ejecutar la acción y la recompensa asociada a la ejecución de la acción. La codificación de la tabla se muestra a continuación, donde los estados son `cold=0`, `warm=1` y `overheated=2`.

![Encoding_table](table.png)

Las dimensiones del tablero se almacenan en los astributos `nrows` y `ncols` de la clase `MDP`.
Los valores codificados en el tablero se almacenarán en dos atributos de la clase, un atributo `transitions` que mantiene la probabilidad de transición de un estado a otro dada una acción como una lista de tuplas `(probability, state)` y un atributo `rewards` que mantiene la información las recompensas obtenida en el paso entre estados definido como una lista `(reward, state)`.

Adicionalmente la clase tiene los atributos `initial_state` y `state` que corresponden al estado inicial, dado por parámetro, y el estado actual del agente en el ambiente, inicializado en el estado inicial.

Note también que el estado `overheated` no tienen ninguna acción asociada a él y por lo tanto lo consideramos como un estado final.

In [1]:
# Ambiente de motores
#Librerias requeridas
import random

class MDP:
    def __init__(self, table, dimensions, initial_state):
        self.nrows, self.ncols = dimensions
        self.transitions = [[0 for _ in range(self.ncols)] for _ in range(self.nrows)]
        self.rewards = [[0 for _ in range(self.ncols)] for _ in range(self.nrows)]
        
        for i in range(self.nrows):
            for j in range(self.ncols):
                state = table[i][j]
                if state is not None:
                    list_transitions = []
                    list_rewards = []
                    for inf in state:
                        p, s, r = inf
                        list_transitions.append((p,s))
                        list_rewards.append((r,s))
                    self.transitions[i][j] = list_transitions
                    self.rewards[i][j] = list_rewards
                else:
                    self.transitions[i][j] = None
                    self.rewards[i][j] = None
        self.initial_state = initial_state
        self.state = self.initial_state
    
    def get_current_state(self):
        return self.state
    
    def get_possible_actions(self, state:int) -> list[str]:
        actions = ['slow', 'fast']
        if state == 2:
            actions = []
        return actions
    
    def get_action_index(self, action: str) -> int:
        actions = ['slow', 'fast']
        index = 0
        for a in actions:
            if action == a:
                return index
            index += 1
    
    def get_possible_states(self, state, action) -> tuple[list[float], list[int], list[int]]:
        action_index = self.get_action_index(action)
        new_state = None
        rewards=[]
        states=[]
        probabilities = []
        for i in range(len(self.transitions[state][action_index])):
            prob, new_state = self.transitions[state][action_index][i]
            reward, _ = self.rewards[state][action_index][i]
            probabilities.append(prob)
            rewards.append(reward)
            states.append(new_state)
        return probabilities, rewards, states
        
    def simulate_action(self, state, action):
        act = -1
        if action == 'slow':
            act = 0
        else:
            act = 1
        start_state_action = self.transitions[state][act]
        prob = 0
        index = -1
        new_state = -1
        for op in start_state_action:
            r = random.random() 
            if prob < r and r <= prob + op[0]:
                new_state = op[1]
            prob += op[0]
            index += 1
        return self.rewards[state][act][index][0], new_state
    
    def do_action(self, action):
        act = -1
        state = self.state
        if action == 'slow':
            act = 0
        else:
            act = 1
        start_state_action = self.transitions[state][act]
        prob = 0
        index = -1
        for op in start_state_action:
            r = random.random() 
            if prob < r and r <= prob + op[0]:
                self.state = op[1]
            prob += op[0]
            index += 1
        return self.rewards[state][act][index][0], self.state

    def reset(self):
        self.state = self.initial_state
    
    def is_terminal(self, state):
        return self.get_possible_actions(state) == []

## Creación del agente 

#### 1. definición de la clase de iteración de políticas

Implemente la clase `PolicyIteration` que define cinco atributos:

- `mdp` que corresponde al MDP a resolver
- `discount` que corresponde al factor de decuento a utilizar.
- `iterations` que corresponde a el número de iteraciones a realizar
- `values` que corresponde a un mapa con los valores calculados para los estados del MDP. Los estados se definen como una tupla de los valores posibles del estado. En el caso de Gridworld, la tupla es la posición de cada casilla. Inicialmente definiremos el mapa como un mapa vacío, el cual poblaremos con los estados descubiertos durante cada iteración.
- `policy` que corresponde a un mapa con los nombres de las acciones a ejecutar para cada uno de los estados del ambiente. La política se debe inicializar con una acción aleatoria para cada estado. Si el estado no tiene acciones posibles, su política se debe inicializar en `None`. 

Al momento de crear la clase `PolicyIteration`, esta debe recibir por parámetro, la definición del MDP `MDP`, el valor de descuento `discount` (inicializado por defecto en 0.9 si no se pasa ningún valor) y la cantidad de iteraciones a ejecutar `iterations` (con un valor de 64 por defecto, si no se pasa ningún valor).

In [2]:
import random

class PolicyIteration():
    def __init__(self, mdp: MDP, discount: float =0.9, iterations:int=64):
        self.mdp = mdp
        self.discount = discount
        self.iterations = iterations
        self.values = {} 
        self.policy = {} 
        
        for s in range(self.mdp.nrows):
            
            actions = self.mdp.get_possible_actions(s)
            if not actions:
                self.policy[s] = None
            else:
                self.policy[s] = random.choice(actions)
    
    def set_policy(self, policy:list[str]):
        self.policy = policy

    def get_policy(self, state:int) -> str:
        return self.policy.get(state, None)
    
    def get_value(self, state:int) -> float:
        return self.values.get(state, 0.0)
    
    def value_evaluation(self, state:int, action:str) -> float:
        
        if self.mdp.is_terminal(state):
            return 0.0
            
        probabilities, rewards, next_states = self.mdp.get_possible_states(state, action)
        value_sum = 0.0
        for p, r, s_prime in zip(probabilities, rewards, next_states):
            value_sum += p * (r + self.discount * self.get_value(s_prime))
        return value_sum
        
    def compute_new_value(self, state:int) -> tuple[float,str]:
        
        if self.mdp.is_terminal(state):
            return 0.0, None 

        actions = self.mdp.get_possible_actions(state)
        if not actions:
            return 0.0, None

        best_value = float('-inf')
        best_action = None
        
        for a in actions:
            current_value = self.value_evaluation(state, a)
            if current_value > best_value:
                best_value = current_value
                best_action = a
                
        return best_value, best_action

    def get_action(self, state:int) -> str:

        if self.mdp.is_terminal(state):
            return None
        return self.policy.get(state)

    def compute_new_action(self, state:int) -> str:
        if self.mdp.is_terminal(state):
            return None
            
        _ , best_action = self.compute_new_value(state)
        return best_action

    def policy_evaluation(self, new_policy:dict[int, str]) -> bool:
        has_changed = False
        for state in range(self.mdp.nrows):
            if self.policy.get(state) != new_policy.get(state):
                has_changed = True
                break
        return has_changed
    
    def policy_iteration(self) -> tuple[bool, dict[int,str]]:
        tolerance = 1e-4 

        for _ in range(100): 
            delta = 0
            new_values = self.values.copy()
            
            for state in range(self.mdp.nrows):
                old_value = self.get_value(state)
                
                if self.mdp.is_terminal(state):
                    new_value = 0.0
                else:
                    action = self.get_policy(state) 
                    new_value = self.value_evaluation(state, action) 
                    
                new_values[state] = new_value
                delta = max(delta, abs(old_value - new_value))
                
            self.values = new_values
            if delta < tolerance: 
                break

        new_policy = {}
        policy_stable = True
        
        for state in range(self.mdp.nrows):
            old_action = self.get_policy(state)
            new_action = self.compute_new_action(state) 
            
            new_policy[state] = new_action
            
            if old_action != new_action:
                policy_stable = False

        converged = policy_stable
        self.policy = new_policy
        
        return converged, self.policy

    def run_policy_iteration(self) -> tuple[dict[float], dict[str]]:
        done = False
        i = 1
        policy = {}
        while not done and i <= self.iterations:
            done, policy = self.policy_iteration()
            i += 1
        
        if done:
            print(f"La política converge en {i-1} iteraciones")
            print(f"Política : {policy}")
        else:
            print(f"La política no convergió después de {self.iterations} iteraciones")
            print(f"Política final: {policy}")
            
        return self.values, self.policy

In [3]:
#Ambiente general de prueba para todos los casos siguientes

board = [[[(1,0,1)], [(0.5, 0, 2), (0.5,1,2)]],
         [[(0.5, 0, 1), (0.5,1,1)], [(1, 2, -10)]],
         [None, None]]

env = MDP(board, (3,2), 0)

In [4]:
#Pruebas estructura del agente

a = PolicyIteration(env, 0.8, 10)

try:
    a.mdp
    assert type(a.mdp) is MDP, "El tipo del mdp debe ser MDP (el tipo de la clase)"
except:
    print("El atributo mdp no está definido")
try:
    a.discount
    assert type(a.discount) is float, "El tipo del factor de descuento debe ser float"
    assert 0 < a.discount and a.discount <=1, "El factor de descuento debe ser un valor entre 0 (excluido) y 1 (incluido)"
except:
    print("El atributo discount no está definido")
try:
    a.iterations
    assert type(a.iterations) is int, "El tipo de la cantidad de iteraciones debe ser entero"
except:
    print("El atributo iterations no está definido")

try:
    a.values
    assert type(a.values) is dict or type(a.values) is dict[int,float], "El tipo del mapa de valeres debe ser dict (el tipo de los mapas en python). El mapa puede estar isntanciado en su llave y valor si fue inicializado previamente"
except:
    print("El atributo values no está definido")


In [5]:
#Pruebas adicionales


#### 2. Política del agente

Implemente la función `get_policy` que recibe un estado por parámetro y retorna el nombre de la acción (política) actual para dicho estado.

In [6]:
#Pruebas política

a = PolicyIteration(env, 0.8, 10)

try:
    a.get_policy
except:
    print("La función get_policy no está definida")

assert type(a.get_policy(1)) is str, "La función get_policy debe retornan un string con el nombre de la acción a ejecutar en el estado dado, o None si del estado no se puede ejecutar ninguna acción"


In [7]:
#Pruebas adicionales


#### 3. Valores de estados 

Defina la función `get_value` que recibe un estado (el entero representando el estado) como parámetro y retorna el valor correspondiente para dicho estado. Si el estado no ha sido visitado, la función debe retornar 0, el valor inicial para el estado.

In [8]:
#Pruebas de get_value

a = PolicyIteration(env, 0.8, 10)


try:
    a.get_value
except:
    print("La función get_value no está definida")

assert type(a.get_value(0)) is int or type(a.get_value(0)) is float, f"La función debe retornar un valor entero o un flotante"


In [9]:
#Pruebas adicionales


#### 3. Evaluación de valores 

Defina la función `value_evaluation` que tiene como proposito calcular el valor de un estado (sin modificar el atributo de valores), dada una acción.
Esta función recibe por parámetro un estado y una acción (el string del nombre) a ejecutar en el estado (entero en este caso) y retorna el valor calculado para dicho estado. 

Notas:
- El valor por defecto de todos los estados debe ser 0.
- Para poder realizar este cálculo requerimos la nueva función del ambiente `get_possible_states`. La nueva función recibe el nombre de la acción a ejecutar para el estado actual del ambiente y retorna una lista con las probabilidades de transiciones entre los estados, una lista de recompensas y una lista de los estados de llegada del estado actual.

In [10]:
#Pruebas de value_evaluation

a = PolicyIteration(env, 0.8, 10)

try:
    a.value_evaluation
except:
    print("La función value_evaluation no está definida")

assert type(a.value_evaluation(0, 'fast')) is int or type(a.value_evaluation(0,'fast')) is float, f"La función value_evaluation debe retornar un valor de tipo entero o flotante, no {type(a.value_evaluation(0, 'fast'))}"

In [11]:
#Pruebas adicionales


#### 4. Cálculo de valor 

Defina la función `compute_new_value` que de forma similar a `value_evaluation` calcula el valor para el estado. La diferencia con la función anterior, es que `compute_new_value` calcula el valor máximo de todas las acciones posibles, dado un estado y no solo el valor de una única acción. 
Esta función recibe un estado como parámetro y calcula el nuevo valor para el estado siguiendo la fórmula de iteración de políticas. Esta función calcula el nuevo valor para el estado actual del agente (`s`) a partir de las acciones, las recompensas y los valores de los posibles estados de llegada (`s'`) de acuerdo a la función de transición (o ruido), obteniendo el valor máximo de todas las posibles acciones en el estado de entrada.

Adicional al valor máximo, esta función debe retornar el nombre de la acción que nos lleva a ese máximo valor.

In [12]:
#Pruebas de compute_new_value

a = PolicyIteration(env, 0.8, 50)

try:
    a.compute_new_value
except:
    print("La función compute_new_value no está definida")

assert type(a.compute_new_value(0)) is tuple, "La función compute_new_value debe retornar una tupla (de tipo entero o flotante), no {type(a.compute_new_value(0))}"

In [13]:
#Pruebas adicionales


#### 5. Acción a ejecutar 

Defina la función `get_action` que retorna la acción a tomar para un estado dado por parámetro, de acuerdo a los valores de los estados aledaños. Esta función debe observar el posible valor a obtener para cada una de sus acciones posibles y retornar aquella acción (su nombre) que lleva al mejor valor.
Tenga en cuenta que la evaluación de las acciones no debe modificar el valor actual del estado, o tener un efecto visible sobre el ambiente o el agente.

In [14]:
#Pruebas de get_action

a = PolicyIteration(env, 0.8, 10)


try:
    a.get_action
except:
    print("La función get_action no está definida")

assert type(a.get_action(0)) is str, "La función get_action debe retornar el nombre de una acción tipo string"


In [15]:
#Pruebas adicionales


#### 6. Calculo de acciones

Implemente la función `compute_new_action` que retorna la nueva mejor acción a realizar luego de ejecutar el cálculo del nuevo valor para un estado. La función recibe un estado como parámetro y retorna la mejor acción a ejecutar para ese estado (actualizando la política).


In [16]:
#Pruebas de compute_new_action

a = PolicyIteration(env, 0.8, 10)

try:
    a.compute_new_action
except:
    print("La función compute_new_action no está definida")

assert type(a.compute_new_action(0)) is str, "La función compute_new_action debe retornar un str (o None si no hay acción posible)"


In [17]:
#Pruebas adicionales


#### 7. Evaluación de políticas 

Implemente la función `policy_evaluation` recibe la nueva política calculada como parámetro y evalua si existió algún cambio. En caso de tener un cambio con la política actual, la función retorna `True`, de lo contrario retorna `False`.

Tenga en cuenta que la evaluación de la política se debe hacer sobre los estados del ambiente, en este caso `cold`, `hot`, `overheated`.

In [18]:
#Pruebas de policy_evaluation

a = PolicyIteration(env, 0.8, 50)

try:
    a.policy_evaluation
except:
    print("La función policy_evaluation no está definida")

assert type(a.policy_evaluation(a.policy)) is bool, "La función policy_evaluation debe retornar un booleano"


In [19]:
#Pruebas adicionales


#### 8. Iteración de políticas

Implemente la función `policy_iteration` encargada de ejecutar la iteración de políticas en tres pasos. 
1. Primero se realiza el cálculo de los nuevos valores. Esto quiere decir que para cada uno de los estados del ambiente se calcula su nuevo valor utilizando la política actual (con la función `value_evaluation`).
2. Segundo, para los nuevos valores se revisa la mejora política posible, calculando la mejor acción posible para cada uno de los estados (usando función `compute_new_action`). 
3. Tercero, se evalua la política calculada de las nuevas acciones en el paso anterior, actualizando la políticia si se encontraron mejores acciones. Si no hay una mejor acción, la función debe retornar la política calculada y un indicador (booleano) que hay convergencia para detener la iteración.

La función debe retornar `True` si la política converge y `False` si no lo hace, junto con la política calculada 

In [20]:
#Pruebas de policy_iteration

a = PolicyIteration(env, 0.8, 10)

### BEGIN TESTS
board = [[[(1,0,1)], [(0.5, 0, 2), (0.5,1,2)]],
         [[(0.5, 0, 1), (0.5,1,1)], [(1, 2, -10)]],
         [None, None]]
env = MDP(board, (3,2), 0)
a = PolicyIteration(env, 0.8, 10)

try:
    a.policy_iteration
except:
    print("La función policy_iteration no está definida")

assert type(a.policy_iteration()) is tuple, "La iteración de política debe retornar una tupla con un booleano y un mapa con la política"


In [21]:
#Pruebas adicionales



#### Ejecución del agente 

Finalmente definimos la función `run_policy_iteration` que no recibe ningún parámatetro y ejecuta el algoritmo de solución del MDP.

Esta función ejecuta el número de iteraciones dadas a la clase. Para cada una de las iteraciones se debe calcular el nuevo valor para cada uno de los estados del ambiente y la nueva política. 

Tenga en cuenta que el cálculo de los nuevos valores se debe realizar con los valores anteriores y únicamente se deben actualizar los valores cuando se actualicen los valores para todos los estados.

In [22]:
#Pruebas de ejecución del agente

a = PolicyIteration(env, 0.9, 100)

a.run_policy_iteration()


La política converge en 3 iteraciones
Política : {0: 'fast', 1: 'slow', 2: None}


({0: 15.499115145513922, 1: 14.499115145513922, 2: 0.0},
 {0: 'fast', 1: 'slow', 2: None})

In [23]:
print('ok')

ok
