# Lista de Exercícios 1: Processos de Decisão de Markov e Programação Dinâmica

#### Disciplina: Aprendizado por Reforço
#### Professor: Luiz Chaimowicz
#### Monitores: Marcelo Lemos e Ronaldo Vieira

---

## Instruções

- Leia atentamente toda a lista de exercícios e familiarize-se com o código fornecido **antes** de começar a implementação.
- Os locais onde você deverá escrever suas soluções estão demarcados com comentários `# YOUR CODE HERE` ou `YOUR ANSWER HERE`.
- **Não altere o código fora das áreas indicadas, nem adicione ou remova células. O nome deste arquivo também não deve ser modificado.**
- Antes de submeter, certifique-se de que o código esteja funcionando do início ao fim sem erros.
- Submeta apenas este notebook com as suas soluções no Moodle.
- Prazo de entrega: 15/04/2025. **Submissões fora do prazo terão uma penalização de -20% da nota final por dia de atraso.**
- Utilize a [documentação do Gymnasium](https://gymnasium.farama.org/) para auxiliar sua implementação.
- Em caso de dúvidas entre em contato pelo fórum "Dúvidas com relação aos exercícios e trabalho de curso" no moodle da Disciplina.

---

## Frozen Lake

O ambiente Frozen Lake é uma simulação clássica utilizada para treinamento de agentes em aprendizado por reforço. Neste ambiente, o agente navega por um lago congelado representado por um grid de tamanho $n \times m$, com o objetivo de alcançar um alvo. O lago contém dois tipos de células: (1) células com gelo sólido, que são seguras para o agente se mover, e (2) as células com buracos, nas quais o agente cai e falha a missão. Embora o Gymnasium já possua uma implementação do Frozen Lake, neste exercício iremos implementá-lo do zero.

No início de cada episódio, o agente é posicionado na célula $[0, 0]$ enquanto o alvo é posicionado na célula mais distante do agente, na posição $[n-1, m-1]$ em um mapa de tamanho $n \times m$. A cada passo, o agente recebe uma observação indicando sua posição atual no lago e tem a possibilidade de escolher entre quatro ações possíveis: mover-se para cima, para baixo, para a esquerda ou para a direita. No entanto, devido à superfície escorregadia do lago, ele nem sempre se move na direção desejada, podendo acabar se movendo em uma direção perpendicular à escolhida. O agente recebe uma recompensa de 1 se alcançar o alvo e zero em todos os outros estados. Um episódio termina quando o agente alcança o objetivo ou cai na água.

Neste exercício, vamos trabalhar sempre com o mesmo mapa $4 \times 4$, representado na figura abaixo.

![Frozen Lake Map](https://gymnasium.farama.org/_images/frozen_lake.gif)

Sua primeira tarefa será implementar o ambiente Frozen Lake utilizando o arcabouço fornecido pelo Gymnasium. Abaixo, você encontrará um código inicial que deverá ser utilizado em sua implementação. Siga essas instruções para garantir que seu código está de acordo com o esperado:

1. Na função `__init__`, já definimos o mapa que será utilizado e armazenamos essa informação na variável `_description`. Nesse mapa, a letra 'S' representa a posição inicial do agente, a letra 'G' indica o alvo, as letras 'F' representam gelo sólido (que é seguro) e as letras 'H' marcam os buracos. No entanto, ainda é necessário adicionar mais algumas informações no ambiente, especificamente sobre a representação das observações e das ações. Embora existam várias maneiras de representar os espaços de observações e de ação, neste exercício, você deve usar a forma mais simples possível, que pode ser representada por um único valor discreto. Na função `__init__`, defina o espaço de observações e o espaço de ações, atribuindo-os às variáveis `self.observation_space` e `self.action_space`, respectivamente. Utilize apenas a classe `gymnasium.spaces.Discrete` nesta tarefa.

2. Antes de prosseguirmos com as funcionalidades do gymnasium, vamos implementar algumas funções auxiliares para facilitar as próximas etapas. Implemente a função `_get_obs`, que retorna a observação atual do ambiente. Além disso, implemente a função `_set_state`, que recebe um valor inteiro correspondente a uma posição no lago e coloca o agente nesta localização.

3. A função `reset` deve resetar o ambiente e inicializar um novo episódio, posicionando o agente na célula $[0, 0]$ e fazendo todos os ajustes internos necessários. Esta função deve retornar uma tupla contendo a observação inicial e as informações do ambiente. Neste exercício, vamos retornar um dicionario vazio `{}` para as informações. Lembre-se que a observação deve ser um único valor discreto, como definido no item 1. Implemente a função `reset`.

4. A função `step()` é responsável por atualizar o ambiente com base na ação executada pelo agente. Ela recebe como entrada a ação escolhida pelo agente, um parâmetro seed e uma variável options, e calcula o novo estado atual com base na função de transição previamente definida. Neste exercício, você pode ignorar os parâmetros seed e options, pois não precisaremos deles. Neste ambiente que estamos desenvolvendo, o agente tem 80% de chance de se mover na direção desejada e 20% de chance de se mover em uma direção perpendicular à escolhida, distribuída igualmente entre os dois sentidos possíveis (10% para cada um). **As ações do agente devem ser representadas pelos valores 0 (mover-se para a esquerda), 1 (mover-se para baixo), 2 (mover-se para a direita) e 3 (mover-se para cima)**. Além disso, a função atribui uma recompensa ao agente e verifica se o episódio chegou ao fim. Implemente a função step() para que ela retorne a observação do estado atual, a recompensa recebida, um valor booleano indicando se o estado é terminal, um valor booleano informando se o episódio foi truncado e as informações do ambiente. Não se preocupe com estes dois últimos valores, nós não vamos utilizá-los.

In [2]:
import sys
import numpy as np
import gymnasium as gym

In [7]:
class FrozenLake(gym.Env):
    def __init__(self):
        self._description = np.asarray([
            "SFFF",
            "FHFH",
            "FFFH",
            "HFFG"
        ], dtype='c')

        self.n_rows, self.m_cols = self._description.shape

        # Observation space id a n * m grid
        self.observation_space = gym.spaces.Discrete(self.n_rows * self.m_cols)

        # 0 = Left, 1 = Down, 2 = Right, 3 = Up
        self.action_space = gym.spaces.Discrete(4)


    def _get_obs(self):
        row, col = self.agent_pos
        return row * self.m_cols + col


    def _set_state(self, state):
        row = state // self.m_cols
        col = state % self.m_cols
        self.agent_pos = (row, col)


    def reset(self, seed = None, options = None):
        super().reset(seed=seed)

        self.agent_pos = (0, 0)

        obs = self._get_obs()
        return obs, {}


    def step(self, action):
        directions_and_moves = {
        # Direction, (line, column). Not (x, y)
            0: (0, -1),  # Left
            1: (1, 0),   # Down
            2: (0, 1),   # Right
            3: (-1, 0)   # Up
        }

        perpendiculars_and_actions = {
            0: [1, 3], # Down, Up
            1: [0, 2], # Left, Right
            2: [1, 3], # Down, Up
            3: [0, 2]  # Left, Right
        }

        probability_of_actions = [0.8, 0.1, 0.1]
        possible_actions = [action] + perpendiculars_and_actions[action]

        # The real action will be defined based on the defined probabilities
        action = np.random.choice(possible_actions, p=probability_of_actions)

        new_row = self.agent_pos[0] + directions_and_moves[action][0]
        new_col = self.agent_pos[1] + directions_and_moves[action][1]

        # Check if the new position is valid
        if new_row > 0 and new_row < self.n_rows:
            if new_col > 0 and new_col < self.m_cols:
                self.agent_pos = (new_row, new_col)

        new_cell = self._description[self.agent_pos[0], self.agent_pos[1]].decode()

        if new_cell == 'G':
            reward = 1
            is_terminal = True

        elif new_cell == 'H':
            reward = 0
            is_terminal = True

        else:
            reward = 0
            is_terminal = False

        is_truncated = None
        environment_info = {}

        return self._get_obs(), reward, is_terminal, is_truncated, environment_info


In [6]:
# REMOVE

frozenLake = FrozenLake()
print(frozenLake._description[0, 0])


b'S'


In [None]:
# Não altere ou remova esta célula

In [None]:
# Não altere ou remova esta célula

In [None]:
# Não altere ou remova esta célula

In [None]:
# Não altere ou remova esta célula

## Policy Iteration

Agora que estamos familiarizados com o ambiente FrozenLake, nosso objetivo é encontrar uma política ótima para esse ele.  Desta vez, utilizaremos a versão oficial do Frozen Lake disponibilizado pelo Gymnasium. Ele possui algumas propriedades que facilitarão as próximas implementações. **Fique atento às transições do ambiente, pois elas são diferentes das do exercício anterior.** Neste ambiente oficial, o agente possui apenas $1/3$ de chance de se mover na direção desejada e $2/3$ de chance de se mover perpendicularmente. Sua tarefa será implementar o algoritmo *Policy Iteration*, conforme ilustrado abaixo.

![Policy Iteration](policy_iteration.png)

5. A implementação será realizada em etapas. Comece implentando a função `init_policy_iteration`, que inicializa e retorna dois arrays. O primeiro array armazenará os valores esperados de cada estado $V(s)$, enquanto o segundo conterá a política do agente: para cada estado, ele indicará a ação que o agente deve realizar. Ambos os arrays devem ser inicializados com zeros.

In [None]:
def init_policy_iteration(env: gym.Env) -> tuple[np.ndarray[float], np.ndarray[int]]:
    # YOUR CODE HERE
    raise NotImplementedError()

In [None]:
# Não altere ou remova esta célula

6. Agora, vamos computar o valor esperado $V(s) = \sum_{s', r}p(s',r|s, a)[r + \gamma V(s')]$. Implemente a função `compute_expected_value`que recebe como parâmetros o ambiente, o vetor $V$, um estado, uma ação, o valor de $\gamma$ (fator de desconto), e retorna o valor esperado. Não altere os valores de $V$ nesta função.

**Importante:** A variável `env.unwrapped.P[state][action]` contém as transições do ambiente, retornando uma lista com todas as transições possíveis para o par (state, action). Cada elemento dessa lista inclui, na seguinte ordem: a probabilidade da transição, o estado $s'$ alcançado, a recompensa recebida e um indicador de estado terminal. 

In [None]:
def compute_expected_value(env: gym.Env, V: np.ndarray[float], state: int, action: int, gamma: float) -> float:
    # YOUR CODE HERE
    raise NotImplementedError()

In [None]:
# Não altere ou remova esta célula

7. O pŕoximo passo será avaliar a política do agente. Implemente o loop de avaliação de política do policy iteration na função `evaluate_policy`. Ela receberá o ambiente, a política do agente, o vetor $V$, o valor $\gamma$, e o valor $\theta$. Esta função não precisa retornar nada.

In [None]:
def evaluate_policy(env: gym.Env, policy: np.ndarray[int], V: np.ndarray[float], gamma: float, theta: float) -> None:
    # YOUR CODE HERE
    raise NotImplementedError()

In [None]:
# Não altere ou remova esta célula

8. A seguir, vamos implementar a atualização da política. Na função `improve_policy` implemente uma iteração da atualização da política. Ela recebe o ambiente, a política do agente, o vetor $V$, e o valor $\gamma$. Ela deverá retornar um booleano indicando se política está estável.

In [None]:
def improve_policy(env: gym.Env, policy: np.ndarray[int], V: np.ndarray[float], gamma: float) -> bool:
    # YOUR CODE HERE
    raise NotImplementedError()

In [None]:
# Não altere ou remova esta célula

A célula abaixo implementa a estrutura do algoritmo *Policy Iteration* utilizando as funções desenvolvidas nas etapas anteriores. Não é necessário realizar nenhuma implementação nesta parte.

In [None]:
def policy_iteration(env: gym.Env, gamma: float, theta: float) -> tuple[np.ndarray[float], np.ndarray[int]]:
    V, policy = init_policy_iteration(env)
    
    while True:
        evaluate_policy(env, policy, V, gamma, theta)
        policy_stable = improve_policy(env, policy, V, gamma)
        if policy_stable:
            break
    return V, policy

In [None]:
def print_policy(env:gym.Env, policy: np.ndarray[int]):
    action_map = ['←', '↓', '→', '↑']
    policy_grid = np.full((4, 4), "")
    
    for index, action in enumerate(policy):
        row, col = divmod(index, 4)
        if env.unwrapped.desc[row, col] == b'H':
            policy_grid[row, col] = '▢'
        elif env.unwrapped.desc[row, col] == b'G':
            policy_grid[row, col] = '◎'
        else:
            policy_grid[row, col] = action_map[action]
    np.savetxt(sys.stdout, policy_grid, fmt='%s', delimiter=' ')

Observe a política obtida pelo policy iteration na célula abaixo e responda as perguntas a seguir.

In [None]:
gamma = 0.99
theta = 1e-8

env = gym.make("FrozenLake-v1", map_name="4x4")
V, policy = policy_iteration(env, gamma, theta)
print_policy(env, policy)
env.close()

9. A política aprendida está de acordo com o comportamento esperado para este problema? Justifique sua resposta com base nos resultados observados.

YOUR ANSWER HERE

10. Quais estratégias poderiam ser adotadas para melhorar a política obtida?

YOUR ANSWER HERE

In [None]:
# Não altere ou remova esta célula

## Value Iteration

Neste exercício vamos encontrar uma política ótima para o Frozen Lake utilizando o algoritmo *Value Iteration* como descrito abaixo.

![Value Iteration](value_iteration.png)

11. Novamente vamos dividir este exercícios em etapas menores. O primeiro passo consiste em inicializar o vetor $V$, que armazenará os valores esperados para cada estado. Para isso, implemente a função `init_value_iteration`, que recebe um ambiente como parâmetro e retorna o vetor $V$. Este vetor deve ser inicializado com valores zero.

In [None]:
def init_value_iteration(env: gym.Env) -> np.ndarray[float]:
    # YOUR CODE HERE
    raise NotImplementedError()

In [None]:
# Não altere ou remova esta célula

12. Agora, vamos gerar uma política determinística a partir de um vetor $V$, conforme definido pela equação $\pi(s)= \textrm{argmax}_a \sum_{s', r}p(s', r|s, a)[r + \gamma V(s')]$. Implemente a função `generate_policy`, que recebe um ambiente e um vetor $V$, retornando a política determinística resultante.

**Dica:** Utilize a função `compute_expected_value` do exercício anterior para facilitar sua implementação.

In [None]:
def generate_policy(env: gym.Env, V: np.ndarray[float], gamma: float) -> np.ndarray[int]:
    # YOUR CODE HERE
    raise NotImplementedError()

In [None]:
# Não altere ou remova esta célula

13. Por fim, implemente o loop principal do *Value Iteration* na função `value_iteration`. Ela deverá retornar, nesta ordem, o array de valores $V$ e a política obtida.

In [None]:
def value_iteration(env: gym.Env, gamma:float, theta: float) -> tuple[np.ndarray[float], np.ndarray[int]]:
    # YOUR CODE HERE
    raise NotImplementedError()

Certifique-se que seu algoritmo esteja funcionando corretamente na célula abaixo.

In [None]:
gamma = 0.99
theta = 1e-8

env = gym.make("FrozenLake-v1", map_name="4x4")
V, policy = value_iteration(env, gamma, theta)
print_policy(env, policy)
env.close()

In [None]:
# Não altere ou remova esta célula