# Reinforcement Learning in a nutshell

(Apprendimento per Rinforzo in poche parole)

![“agent-environment-loop”](./assets/AE_loop.png)

Copyright © 2024 [Farama Foundation](https://farama.org)

## Cliff Walking: living on the edge

- Useremo [`Cliff walking`](https://gymnasium.farama.org/environments/toy_text/cliff_walking/) come esempio:

  ![](./assets/cliffw_reset.png)

### Agente

Cosa (non) conosce il nostro "agente":

1. posizione
   - **non** sono coordinate!
2. possibili azioni (0, 1, 2, 3)
   - **non** sa a cosa corrispondono!
3. **non** conosce la mappa!

### Ambiente

- ogni azione sposta l'agente
- precipizio riporta al via
- **Premio** (reward):
  - -1 ogni movimento
  - -100 precipizio
  - +100 obiettivo

## [Farama Gymnasium](https://gymnasium.farama.org): una palestra per AI

- interfaccia Python e librerie per interagire con ambienti simulati
- astrazione di un ambiente per la simulazione
  - osservazioni
  - azioni

### Interfaccia

Interfaccia costruita intorno all'ambiente di simulazione ([`Env`](https://gymnasium.farama.org/api/env/#gymnasium.Env))

- [`make()`](https://gymnasium.farama.org/api/registry/#gymnasium.make): crea un ambiente di simulazione
- [`Env.reset()`](https://gymnasium.farama.org/api/env/#gymnasium.Env.reset): inizializza l'ambiente
- [`Env.step()`](https://gymnasium.farama.org/api/env/#gymnasium.Env.step); esegue un'azione
- [`Env.render()`](https://gymnasium.farama.org/api/env/#gymnasium.Env.render): genera una rappresentazione della situazione

### Interazione con la simulazione

- **episodio**: una simulazione
  - inizializzazione
  - ciclo azione-osservazione
- conclusione
  - situazione conclusiva (e.g. crash)
  - dopo in certo numero di azioni

```python
import gymnasium as gym

env = gym.make("CliffWalking-v0", render_mode="human")
observation, info = env.reset()

episode_over = False
while not episode_over:
    action = choose_action(observation)
    observation, reward, terminated, truncated, info = env.step(action)

    episode_over = terminated or truncated

env.close()
```

Per approfondimenti consultare la [documentazione](https://gymnasium.farama.org/introduction/basic_usage/)

## Q-learning

- obiettivo: imparare una *strategia*
  - *cosa fare in una situazione*
  - useremo "situazione"/"contesto"/"stato" come sinonimi
- tabella $Stato\rightarrow Azione$

### Q-table

- massimizzare il *risultato*
- tabella $Stato \times Azione \rightarrow Valore$
- il valore è il risultato *stimato* del fare l'azione nel contesto
  - somma delle future ricompense
- una possibile strategia:
  - scegli l'azione che massimizza il risultato

### Imparare la Q-table

- immaginiamo di avere una tabella $Q$
- la miglioriamo con l'*esperienza*:
  - proviamo $a$ in $s$, finiamo in $s'$ con una ricompensa di $r$
  - usiamo $r$ per migliorare $Q(s,a)$:
    $$Q(s,a) \leftarrow Q(s,a) + \delta_r$$
- continuiamo finché siamo soddisfatti della tabella
  - otteniamo "buoni" risultati

### Bellman Equation

> proviamo $a$ in $s$, finiamo in $s'$ con una ricompensa di $r$

$$Q(s,a) \leftarrow \underbrace{Q(s,a)}_{\text{valore corrente}} + \underbrace{\alpha}_{\text{tasso di apprendimento}} \cdot \bigg( \underbrace{\underbrace{r}_{\text{ricompensa}} + \underbrace{\gamma}_{\text{fattore di sconto}} \cdot \underbrace{\max_{a'}Q(s', a')}_{\text{valore futuro stimato}}}_{\text{valore appreso}} - \underbrace{Q(s,a)}_{\text{valore corrente}}\bigg)$$

- **tasso di apprendimento** ($\alpha$): quanto influisce la nuova informazione
- **fattore di sconto** ($\gamma$): l'importanza del valore futuro

### Esplorazione o riutilizzo?

- come scegliere la prossima azione?
  - se l'agente non prova, non impara
  - se prova a caso, rischia di esplorare strategie poco utili
- bilanciamento tra esplorazione e sfruttamento di quello già imparato
  - sceglie un'azione a caso con una certa probabilità
  - altrimenti sceglie quella col valore migliore

## Q-Learning per Cliff Walking

In [None]:
from typing import Any, Callable, Optional, Sequence, SupportsFloat

import gymnasium as gym
from gymnasium.core import ActType, ObsType, RenderFrame

from matplotlib import pyplot as plt
from matplotlib.animation import ArtistAnimation, TimedAnimation

from IPython import display
import numpy as np

def run_episode(env: gym.Env, agent: Optional[Callable[[ObsType], ActType]]=None) -> tuple[ObsType, SupportsFloat, bool, bool, dict[str, Any]]:
    """Run an episode of the environment using the provided agent (default to random), returning the last `step` method output"""
    observation, info = env.reset()
    if agent == None:
        agent = lambda o: env.action_space.sample()
    while True:
        action = agent(observation)
        observation, reward, terminated, truncated, info = env.step(action)
        if terminated or truncated:
            return observation, reward, terminated, truncated, info

def plot_rewards(rewards: Sequence[float], window: int=10):
    running_avg = np.convolve(rewards, np.ones(window), 'valid') / window
    fig, ax = plt.subplots()
    ax.plot(rewards)
    ax.plot(window + np.arange(len(running_avg)), running_avg)

def plt_animation(frames: Sequence[RenderFrame], fps: int) -> TimedAnimation:
    """Generate a Pyplot animation from a sequence of Gymnasium environment RGB rendering."""
    fig, ax = plt.subplots()
    ax.set_axis_off()
    imgs = []
    if len(frames) > 0:
        imgs.append([ax.imshow(frames[0])])
        for a in frames[1:]:
            imgs.append([ax.imshow(a, animated=True)])
    # prevent showing pyplot default window
    plt.close(fig)
    return ArtistAnimation(fig, imgs, interval=int(1000/fps), repeat=False, blit=True)

def show_video(frames: Sequence[RenderFrame], fps: int=10):
    return display.HTML(plt_animation(frames, fps=fps).to_html5_video())

def show_image(frame: RenderFrame) -> None:
    fig, ax = plt.subplots()
    ax.set_axis_off()
    ax.imshow(frame)


### Creiamo l'ambiente

Inizializziamo l'ambiente per gli esperimenti

In [None]:
cliffw = gym.wrappers.RecordEpisodeStatistics(
    gym.wrappers.TimeLimit(
        gym.make("CliffWalking-v0", render_mode="rgb_array"),
        max_episode_steps=200))

cliffw_rec = gym.wrappers.RenderCollection(cliffw, pop_frames=False)

cliffw

Vediamo come "funziona" `CliffWalking-v0`:

In [None]:
cliffw.action_space

In [None]:
osservazione, info = cliffw.reset()
print(f'Osservazione: {osservazione}')
show_image(cliffw.render())

Proviamo una delle azioni:

In [None]:
azione = 0
observation, reward, _, _, _ = cliffw.step(azione)
print(f'Azione: {azione}, Nuova osservazione: {observation}, Ricompensa: {reward}')
show_image(cliffw.render())

### Comportamento casuale

Vediamo i risultati di un comportamento totalmente casuale

- ogni istante sceglie un'azione a caso
  ```python
  azione = env.action_space.sample()
  ```

In [None]:
observation, reward, terminated, truncated, info = run_episode(cliffw_rec)
print(f'Punteggio finale: {info['episode']['r']}')
show_video(cliffw_rec.render(), fps=6)

### Algoritmo di Q-Learning

Vediamo una semplice implementazione dell'algoritmo di Q-Learning

In [None]:
import numpy as np


def q_learn(env: gym.Env, seed: Optional[int]=None,
            episodes: int=300,
            max_steps: int=200,
            epsilon: float=0.1,
            alpha: float=0.5,
            gamma: float=0.9) -> tuple[Any, Sequence[float]]:

    rng = np.random.default_rng(seed=seed)

    Q = np.zeros((env.observation_space.n, env.action_space.n))
    rewards: list[float] = []

    for episode in range(episodes):
        obs, info = env.reset(seed=seed)
        tot_reward = 0
        for _ in range(max_steps):
            if rng.random() > epsilon:
                action = rng.choice(np.nonzero(np.isclose(Q[obs], np.max(Q[obs]))))[0]
            else:
                action = env.action_space.sample()
            new_obs, reward, terminated, truncated, info = env.step(action)
            tot_reward += reward
            Q[obs, action] += alpha * (reward + gamma * np.max(Q[new_obs]) - Q[obs, action])
            obs = new_obs
            if terminated or truncated:
                break
        rewards.append(tot_reward)
        if episode % 10 == 0:
            print(f"Episode: {episode}, Reward: {tot_reward}")

    return Q, rewards

#### Apprendimento

Proviamo ad applicare l'algoritmo e vedere come si comporta l'apprendimento:

- si parte con una tabella arbitraria
- aggiorniamo la tabella per un certo numero di episodi
- visualizziamo i risultati dei singoli episodi

In [None]:
Q, rewards = q_learn(cliffw, episodes=300, max_steps=200, alpha=.5, gamma=.9, epsilon=.1)
plot_rewards(rewards)

#### Valutazione

Compariamo l'agente "casuale" con la strategia imparata

- generiamo un numero di episodi
- visualizziamo i risultati

In [None]:
plot_rewards([ run_episode(cliffw)[4]['episode']['r'] for _ in range(200) ])

In [None]:
plot_rewards([ run_episode(cliffw, agent=lambda o: np.argmax(Q[o]))[4]['episode']['r'] for _ in range(200) ])

#### La strategia

Visualizziamo la strategia

In [None]:
observation, reward, terminated, truncated, info = run_episode(cliffw_rec, agent=lambda o: np.argmax(Q[o]))
show_video(cliffw_rec.render(), fps=6)

Vediamo com'è fatta la Q-table:

In [None]:
map_shape = cliffw.unwrapped.shape
actions = ['⬆︎', '➡︎', '⬇︎', '⬅︎']

directions = [''.join(actions[a] for a in np.nonzero(np.isclose(r, np.max(r)))[0]) for r in Q]

plt.table(np.reshape(np.array(directions), map_shape), cellLoc='center', loc='center').axes.set_axis_off()

## È veramente così semplice?

Vediamo un ambiente più complicato, il [Lunar Lander](https://gymnasium.farama.org/environments/box2d/lunar_lander/)

![lunar lander](./assets/lunar_lander.gif)

Copyright © 2024 [Farama Foundation](https://farama.org)

### Complicato?

- osservazione include posizione e velocità
- valori continui

#### Pilota automatico

Vediamo l'atterraggio col pilota automatico

In [None]:
!python ./scripts/user_lander.py autopilot

Usa una [strategia algoritmica](https://github.com/Farama-Foundation/Gymnasium/blob/d2707290b5ae8d3070f11b9f5e701d5ca8bfaa5f/gymnasium/envs/box2d/lunar_lander.py#L794):

```python
angle_targ = s[0] * 0.5 + s[2] * 1.0  # angle should point towards center
if angle_targ > 0.4:
    angle_targ = 0.4  # more than 0.4 radians (22 degrees) is bad

# ...

else:
    a = 0
    if hover_todo > np.abs(angle_todo) and hover_todo > 0.05:
        a = 2
    elif angle_todo < -0.05:
        a = 3
    elif angle_todo > +0.05:
        a = 1
return a
```

#### Pilota umano

Ora prova tu, i controlli sono:

- barra spaziatrice per motore principale
- freccia destra/sinistra per motori laterali
- chiudere la finestra per uscire

In [None]:
!python ./scripts/user_lander.py