[![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/pablo-sampaio/rl_facil/blob/main/cap03/cap03-main.ipynb)

# Capítulo 3 - Ambientes, Retornos e Funções de Valor


In [5]:
from IPython.display import clear_output
from IPython.display import display, Image
import time
import sys

if 'google.colab' in sys.modules:
  !pip install gymnasium
  clear_output()

In [6]:
import gymnasium as gym
import numpy as np

A maior parte do notebook foi pensada para o ambiente `FrozenLake`, mas você pode testar com outros ambientes. (Fica como exercício ajustar as partes onde o código não funcionar para outros ambientes).

In [7]:
# vamos focar nesses três ambientes por serem mais simples
# ver mais em: 

env = gym.make("FrozenLake-v1")

#env = gym.make("MountainCar-v0")
#env = gym.make("CartPole-v1")
#env = gym.make("Taxi-v3")
#env = gym.make("Blackjack-v1")

![Figura mostrando interação agente(política)-ambiente](figura_mdp.png "Interação agente-ambiente")

## 1 - Episódio e Trajetória

Um **episódio** é uma execução completa da tarefa (ou do ambiente gym).

E a **trajetória** é a sequência de estados (observações), ações e recompensas do episódio. Assumindo um episódio de $T$ passos (ações aplicadas):

$S_0 \rightarrow A_0 \rightarrow R_1 \rightarrow S_1 \rightarrow A_1 \rightarrow R_2 \rightarrow S_2 \rightarrow \cdots S_{T-1} \rightarrow A_{n-1} \rightarrow R_T \rightarrow S_T$

Vamos ilustrar um episódio em um MDP usando o ambiente **`env`** escolhido no código acima.


In [8]:
i = 0

# inicia um novo episódio
state, _ = env.reset()
print(f"S0 = {state}")

done = False

# roda até o episódio encerrar
while not done:
    action = env.action_space.sample()
    print(f" A{i} = {action}")

    next_state, reward, terminated, truncated, _ = env.step(action)
    done = terminated or truncated
    i += 1

    print(f"  R{i} = {reward}")
    print(f"S{i} = {next_state}")

    state = next_state
    #time.sleep(0.1)

env.close()

S0 = 0
 A0 = 1
  R1 = 0.0
S1 = 0
 A1 = 2
  R2 = 0.0
S2 = 4
 A2 = 1
  R3 = 0.0
S3 = 5


Os detalhes do *episódio* que mostramos acima são chamamos de *trajetória* (ou *rollout*).

Dependendo do algoritmo, vamos precisar analisar essas informações em trios (S,A,R) ou quádruplas (S,A,R,S) ou até quíntuplas (S,A,R,S',A').

Abaixo, vamos guardar uma trajetória como uma lista de trios. Cada trio desses será chamado de um *passo* da trajetória.

In [9]:
state, _ = env.reset()
trajectory = []

done = False

while not done:
    action = env.action_space.sample()

    next_state, reward, terminated, truncated, _ = env.step(action)
    trajectory.append( (state, action, reward) )
    
    done = terminated or truncated
    state = next_state

# o último estado pode ser incluído como um trio incompleto, se preciso
# porém, o restante do notebook, assume que não tem essa informação
#trajectory.append( (obs, None, None) )

env.close()

print("Trajetória como sequência de trios (STATE, ACTION, REWARD):")
trajectory

Trajetória como sequência de trios (STATE, ACTION, REWARD):


[(0, 2, 0.0), (4, 3, 0.0), (0, 3, 0.0), (0, 2, 0.0), (1, 1, 0.0)]

Em termos teóricos, um ambiente é modelado matematicamente como um **Markov Decision Process** (MDP), como vimos em sala.

## 2 - Política

A escolha de uma ação a cada estado é feita pela chamada **política**. Até aqui, estamos usando uma política aleatória (que escolhe qualquer das ações disponíveis com igual probabilidade).

O nosso objetivo (para as próximas aulas) é ver os algoritmos que aprendem uma política "boa".

Para o restante deste notebook, vamos usar uma política simples que chamamos de **policy_0**. Ela foi pensada para o ambiente `FrozenLake`:
- com 45% de chance, escolhe a ação **1** (mover para baixo)
- com 45% de chance, escolhe a ação **2** (para a direita)
- com 10% de chance, escolhe qualquer ação aleatoriamente

Veja o código dela:

In [10]:
def policy_0(state):
    num_actions = env.action_space.n
    x = np.random.random()
    if x <= 0.45:
        return 1
    elif x <= 0.90:
      return 2
    else:
        return np.random.randint(0, num_actions)

In [11]:
def policy_1(state):
    num_actions = env.action_space.n
    x = np.random.random()
    if x <= 0.55:
        return 1
    else:
      return 2

Também vamos definir uma função `run_episode()` para gerar uma trajetória (de 1 episódio completo) usando uma política dada como parâmetro:

In [12]:
def run_episode(env, agent_policy):
    obs, _ = env.reset()
    trajectory = []

    done = False

    while not done:
        action = agent_policy(obs)
        next_obs, reward, terminated, truncated, _ = env.step(action)
        done = terminated or truncated
        trajectory.append( (obs, action, reward) )
        obs = next_obs

    env.close()

    return trajectory

Agora, vamos criar uma trajetória com a política proposta antes. Veja que passamos a função `policy_0` como parâmetro.

In [13]:
run_episode(env, policy_0)

[(0, 1, 0.0), (4, 2, 0.0)]

A seguir, vamos criar uma trajetória que termine em sucesso (recompensa final == `1.0`), pensando no ambiente `FrozenLake`:

In [14]:
while trajectory[-1][2] < 1.0:
  trajectory = run_episode(env, policy_0)

trajectory

[(0, 1, 0.0),
 (4, 2, 0.0),
 (8, 2, 0.0),
 (9, 2, 0.0),
 (10, 2, 0.0),
 (14, 1, 1.0)]

## 3 - Calcular os Retornos

O *retorno (final)* $G$ é uma medida da recompensa total obtida ao longo de um episódio.

Em um MDP, o objetivo é otimizar o valor médio de $G$, para infinitos episódios.


### 3.1 - Retorno (completo) do episódio ($G$)



Para um episódio com $n$ passos, o **retorno (não-descontado)** é calculado assim:

$ G = R_1 + R_2 + R_3 + \cdots + R_n = \displaystyle\sum_{i=1}^{n} R_i$

No código a seguir, vamos calcular o *retorno não-descontado* da trajetória calculada antes.

*Observação*: Em código, como "return" é uma palavra reservada de Python, o *retorno* do episódio será representando por nomes como:
- `sum_rewards`
- ou `episode_return`
- ou `episode_reward`
- ou versões abreviadas desses nomes.


In [15]:
episode_reward = 0.0
for (s, a, r) in trajectory:
    episode_reward += r

print("Retorno não-descontado:", episode_reward)

Retorno não-descontado: 1.0


Porém, é mais usado o **retorno descontado** de um episódio.

Neste caso, $G$ é uma soma que "atenua" recompensas mais distantes, valorizando mais as recompensas iniciais. (Você prefere receber 100 reais agora, de uma vez, ou em 100 parcelas de 1 real?)

Para isso, a cada passo, a recompensa tem uma *redução* na sua relevância, dada por um parâmetro $\gamma\;$, tal que $0 < \gamma \leq 1$.

Para um episódio com $n$ passos, o *retorno descontado* é calculado assim:

$ G = R_1 + \gamma R_2 + \gamma^2 R_3 + \cdots + \gamma^{(n-1)} R_n = \displaystyle\sum_{i=1}^{n} \gamma^{(i-1)} R_i$

Vamos criar uma função para fazer esse cálculo, a partir de uma dada trajetória (de 1 episódio):

In [16]:
def get_episode_return(trajectory, gamma):
    step = 0
    episode_reward = 0.0
    for (s, a, r) in trajectory:
        episode_reward += (gamma ** step) * r
        step += 1
    return episode_reward

A seguir, calculamos o *retorno descontado* da trajetória calculada na seção anterior, assumindo um valor específico de $\gamma$ (variável `GAMMA`):

In [17]:
GAMMA = 0.99

epi_return = get_episode_return(trajectory, GAMMA)
print("Retorno descontado:", epi_return)

Retorno descontado: 0.9509900498999999


### 3.2 - Retornos parciais ($G_i$)



Também podemos calcular um retorno parcial, a partir de um passo específico $i$ de um dado episódio:

$$
\begin{align*}
   && &\quad G_0     &= &\;\;R_1 &+ \gamma &R_2 &+ \gamma^2 &R_3 &+ \gamma^3 &R_4 &+ \gamma^4 &R_5 &+ \;\cdots \;&+ \gamma^{n-1} &R_n & \;\;\;(= G) \\
   && &\quad G_1     &= &        &         &R_2 &+ \gamma   &R_3 &+ \gamma^2 &R_4 &+ \gamma^3 &R_5 &+ \;\cdots \;&+ \gamma^{n-2} &R_n &       \\
   && &\quad G_2     &= &        &         &    &           &R_3 &+ \gamma   &R_4 &+ \gamma^2 &R_5 &+ \;\cdots \;&+ \gamma^{n-3} &R_n &       \\
   && &\quad G_3     &= &        &         &    &           &    &           &R_4 &+ \gamma   &R_5 &+ \;\cdots \;&+ \gamma^{n-4} &R_n &       \\
   && &\quad         &\cdots   & & & & & & & & & & & &    & \\
   && &\quad G_{n-1} &= &        & & & & & & & & & & &R_n & \\
   && &\quad G_n     &= &\;\;0 & & & & & & & & & & &      & \\
\end{align*}
$$

Podemos calcular um retorno parcial $G_i$ simplesmente omitindo os $i$ passos inicias da trajectória:

In [18]:
get_episode_return(trajectory[3:], GAMMA)

0.9801

In [19]:
for i in range(len(trajectory)+1):
    Gi = get_episode_return(trajectory[i:], GAMMA)
    print(f"Retorno parcial G_{i} :", Gi)

Retorno parcial G_0 : 0.9509900498999999
Retorno parcial G_1 : 0.96059601
Retorno parcial G_2 : 0.970299
Retorno parcial G_3 : 0.9801
Retorno parcial G_4 : 0.99
Retorno parcial G_5 : 1.0
Retorno parcial G_6 : 0.0


Observe novamente a série de equações anteriores para os retornos parciais $G_0$, $G_1$, $G_2$, etc.

Percebe que existe apenas uma pequena mudança entre cada equação (para $G_i$) e a equação logo abaixo?

De fato, existe uma relação (recursiva) entre $G_i$ e $G_{i+1}$ que pode ser expressa assim:
$$
   G_{i} = R_{i+1} + \gamma G_{i+1}
$$

(Por exemplo: $G_0 = R_1 + \gamma G_1$).


Usando essa relação recursiva, podemos calcular todos os retornos parciais de maneira mais simples:

In [20]:
# calcula os retornos parciais a partir de cada passo
# em ordem invertida (G_i, para cada i de n a 0)
i = len(trajectory)

Gi = 0.0
print(f"G_{i} =", Gi)

for (s, a, r) in reversed(trajectory):
    i = i - 1
    Gi = r + GAMMA*Gi
    print(f"G_{i} =", Gi)


G_6 = 0.0
G_5 = 1.0
G_4 = 0.99
G_3 = 0.9801
G_2 = 0.9702989999999999
G_1 = 0.96059601
G_0 = 0.9509900498999999


## 4 - Funções de Valor (para uma Política Dada)


Elas são usadas para definir o que é uma política "melhor" do que outra em MDP. E são usadas diretamente em alguns algoritmos (família *value-based*).

Veremos dois tipos de função de valor. Ambas dão os valores *retornos esperados* (médios) **para uma política específica**!

Elas são calculadas a partir dos retornos parciais, que vimos antes.

### 4.1 - Função de valor do estado $V(s)$

Esta função dá o retorno esperado a partir de cada estado $s$, para uma política específica. Aqui estamos pensando na *identidade* do estado $s$, independente de sua posição na trajetória.

De forma matemática, ela é definida assim:

$$V(s) = E[G_t | S_t=s, \mathrm{\text{t qualquer}}]$$

Ou seja, $V(s)$ é o valor médio dos valores $G_t$ calculados a partir do estado $s$ quando ele aparece em uma posição $t$ qualquer do episódio. 

#### Algoritmo para Estimativa de Valor por Monte Carlo

Um algoritmo para calcular o valor de $V(s)$ para um $s$ específico é dado a seguir. Ele é um algoritmo de Monte Carlo para o **problema de predição**, ou seja, para calcular a estimativa de uma função de valor.

**Pseudocódigo**

1. Inicialize o histórico de retornos para todos os estados.
2. Repita "muitas" vezes:
   - Rode um episódio seguindo a política.
   - Para cada passo no episódio, começando do estado $s_i$:
     - calcule o retorno parcial $G_i$.
     - guarde $G_i$ no histórico de retornos para o estado $s_i$.
3. Calcule a estimativa de valor para cada estado $s$:
   - Para cada estado $s$:
     - $V(s)$ = média dos retornos no histórico para o estado $s$.

### Implementação

Implementamos esta ideia abaixo, guardando cada retorno em um dicionário `returns_history` (um histórico dos retornos) indexado pelo $s$ onde se originou o retorno parcial.

Esta implementação assume ambiente de *estado discreto*, representados por inteiros iniciados em 0. 
- Exemplos de ambientes assim: *FrozenLake-v1* e *Taxi-v3*.

Nestes casos, o $V$ pode ser representado como um array, e o estado serve como índice de onde está guardado o valor daquele estado.

In [21]:
# associa cada estado a uma lista de retornos parciais
# obtidos a partir do estado (em um episódio qualquer)
returns_history = dict()

# roda muitos episódios, com a política desejada
for epi in range(20000):
    trajectory = run_episode(env, policy_0)

    # calcula os retornos a cada passo (G_i, para cada i de n a 0) do episódio
    # guardando o valor em returns_history
    Gi = 0.0
    for (s, a, r) in reversed(trajectory):
        Gi = r + GAMMA*Gi
        if s not in returns_history.keys():
            returns_history[s] = [ Gi ]
        else:
            returns_history[s].append(Gi)

# associa cada estado à média dos retornos parciais
V = np.zeros(env.observation_space.n)

# calcula V
for s in returns_history.keys():
    V[s] = np.mean( returns_history[s] )

V

array([0.03539695, 0.02521524, 0.04803949, 0.01579362, 0.04419733,
       0.        , 0.09595079, 0.        , 0.10048972, 0.22846086,
       0.27130909, 0.        , 0.        , 0.37533681, 0.64659988,
       0.        ])

Abaixo, redimensionamos o array para deixá-lo bidimensional. Assim, conseguimos comparar com a imagem do ambiente e entender as posições de maior retorno esperado, quando seguimos a política em questão. 

In [24]:
# Mostra o mapa de FrozenLake
display(Image(url="https://gymnasium.farama.org/_images/frozen_lake.gif"))

# Mostra a função V na mesma disposição do mapa do FrozenLake
print(V.reshape(4,4))

[[0.03539695 0.02521524 0.04803949 0.01579362]
 [0.04419733 0.         0.09595079 0.        ]
 [0.10048972 0.22846086 0.27130909 0.        ]
 [0.         0.37533681 0.64659988 0.        ]]


### 4.2 - Função de valor da ação $Q(s,a)$

O $Q(s,a)$ responde a esta pergunta:

*Quando estava no estado* **s** *e fez a ação* **a** *, qual o retorno esperado (se continuar seguindo a política no restante do episódio)?*

Assim, de maneira análoga ao $V(s)$, o $Q(s,a)$ representa o retorno esperado a partir do par $(s,a)$:

$$Q(s,a) = E[G_t | S_t=s, A_t=a]$$



#### Algoritmo

Existe um algoritmo de Monte-Carlo para calcular $Q(s,a)$, que é análogo ao anterior.

---
1. rode infinitos episódios com a política
2. analise cada episódio, e a cada passo iniciado no estado $s_i$ seguido de uma ção $a_i$:
   - calcule o retorno parcial $G_i$
   - salve $G_i$ no histórico do par $(s_i, a_i)$
3. Para cada par $(s,a)$:
   - $Q(s, a)$ = média de todos os retornos do histórico do par $(s, a)$

---

O código é semelhante ao anterior. Porém, aqui, vamos usar um dicionário `returns_history` indexado pelo par $(s,a)$ onde se originou cada retorno $G_i$.

Tente completar a implementação.

In [23]:
# associa cada par (estado, ação) a uma lista de retornos parciais
# obtidos em episódios quaisquer onde esse par apareceu
returns_history = dict()

# roda muitos episódios, com a política desejada
for epi in range(20000):
    trajectory = run_episode(env, policy_0)

    # calcula os retornos a cada passo (G_i, para cada i de n a 0) do episódio
    # guardando o valor em returns_history
    Gi = 0.0
    for (s, a, r) in reversed(trajectory):
        # FAÇA: complete o código (no lugar do pass)...
        pass

# matriz para associar cada par (estado, ação) à média dos retornos parciais
Qtable = np.zeros(shape=(env.observation_space.n, env.action_space.n))

# calcula Q
# FAÇA: complete o código...

Qtable

array([[0., 0., 0., 0.],
       [0., 0., 0., 0.],
       [0., 0., 0., 0.],
       [0., 0., 0., 0.],
       [0., 0., 0., 0.],
       [0., 0., 0., 0.],
       [0., 0., 0., 0.],
       [0., 0., 0., 0.],
       [0., 0., 0., 0.],
       [0., 0., 0., 0.],
       [0., 0., 0., 0.],
       [0., 0., 0., 0.],
       [0., 0., 0., 0.],
       [0., 0., 0., 0.],
       [0., 0., 0., 0.],
       [0., 0., 0., 0.]])

Note que a função $Q$ calculado para estados discretos e ações discretas (ambos representados por inteiros) pode ser representada como uma matriz. Ela pode ser vista como uma _tabela_ com linhas representando os *estados* e colunas representando as *ações*.

Por esse motivo, chamamos essa representação de **Q-table** (tabela-Q).

## 5 - Preparando para os Métodos Baseados em Q-table

Agora, sim, podemos falar um pouquinho sobre os *algoritmos de controle* da aprendizagem por reforço, que são os algoritmos capazes de aprender políticas "boas", ou seja, políticas que dão altos retornos.

Coloque-se no lugar do algoritmo e suponha que você inicie com uma política qualquer (provavelmente ruim) e que você tenha calculado o *Q* dessa política com uma Q-table.

**De que forma você poderia melhorar a política olhando para os valores de Q?**

**Ou, como escolher a melhor ação a cada estado, usando a Q-table?**

---

No próxima parte do curso, veremos um algoritmo para RL, também da família Monte-Carlo, onde a política é implicitamente representada pelo $Q$.

Este método repete $N$ vezes esses passos:
1. Rode um episódio, usando a política representada pela tabela $Q$
   - salve a trajetória
1. Depois, calcule os valores de $G_i$ e use esses valores para atualizar $Q$
   - ao atualizar $Q$, a política eventualmente muda

---

Veremos mais detalhes no próximo capítulo.