# Imports

In [None]:
import gymnasium as gym
import ale_py       
import numpy as np
import matplotlib.pyplot as plt
from tqdm import tqdm

# Partie 1: Frozen Lake

Nous allons commencer notre TP avec un environnement type grille: le [Frozen Lake](https://gymnasium.farama.org/environments/toy_text/frozen_lake). Cela vous aidera également à vous familiariser avec l'API Gym, largement utilisée de le monde du RL.

Commencez par lire la [documentation](https://gymnasium.farama.org/environments/toy_text/frozen_lake) de l'environnement Frozen Lake.

## Print utils

In [None]:
def int_action_to_str(int_action):
    if int_action == 0:
        return "left"
    if int_action == 1:
        return "down"
    if int_action == 2:
        return "right"
    else:
      return "up"

for i in range(4):
    print(f"Corresponding action to {i} : {int_action_to_str(i)}")

In [None]:
def print_values(values, grid_width):
    for i in range(int(len(values)/grid_width)):
        print(values[i*grid_width:(i+1)*grid_width])

print(f"Example of printing values in a grid of width 4:")
print_values(np.arange(16), 4)

def print_policy(policy, grid_width):
    for i in range(int(len(policy)/grid_width)):
        print([int_action_to_str(_a) for _a in policy[i*grid_width:(i+1)*grid_width]])

print()
print(f"Example of printing policy in a grid of width 4:")
print_policy(np.arange(16)%4, 4)

In [None]:
def print_q_values(q_values, grid_width, precision : int = 2):
    for i in range(int(len(q_values)/grid_width)):
        _q_values_to_print = []
        for _q_values in q_values[i*grid_width:(i+1)*grid_width]:
            _q_values_to_print.append({int_action_to_str(_k): round(_v, precision).item() for _k, _v in enumerate(_q_values)})
        print(_q_values_to_print)
        
print(f"Example of printing q_values in a grid of width 4:")
print_q_values(np.random.rand(16, 4), 4)

## A) Version déterministe

Créons une première instance de l'environnement avec une carte spécifique:

In [None]:
env = gym.make(
    'FrozenLake-v1', 
    desc=["SFFF", "FHFH", "FFFH", "HFFG"], 
    is_slippery=False, 
    render_mode="rgb_array",   # "rgb_array" produces RGB frames to plot, "human" opens a window (doesn't work on notebooks)
    )

Il est nécessaire de reset l'environnement pour lancer un épisode:

In [None]:
obs, info = env.reset()

In [None]:
img = env.render()

plt.imshow(img)
plt.axis('off')
plt.show()

Regardez la fonction de transition:

In [None]:
def print_transition_function(env, state, action, ):
    print(f"From state {state} when playing action {action}:")
    for next_state_transition in env.unwrapped.P[state][action]:
        print(f"- Reaching state {next_state_transition[1]} along with reward {next_state_transition[2]} with probability {next_state_transition[0]}")
        
# State: initial
# Action: right
print_transition_function(env, state=0, action=2)

In [None]:
# State: cell 4 (next ot a hole)
# Action: right
print_transition_function(env, state=4, action=2)

In [None]:
# State: cell 14 (next to the goal)
# Action: right
print_transition_function(env, state=14, action=2)

Cette classe permet d'aggréger les frames des env.render() puis d'afficher l'épisode à la fin. 

In [None]:
import matplotlib.pyplot as plt
from matplotlib import animation
from IPython.display import HTML

class EpisodeAnimator:
    def __init__(self, interval: int = 200):
        """
        interval: delay between frames in milliseconds
        """
        self.frames = []
        self.interval = interval
    
    def add_frame(self, frame):
        """Add a single frame (numpy RGB array) to the animation."""
        self.frames.append(frame)
    
    def display_animation(self):
        """Render and display the animation inline in a Jupyter notebook."""
        if not self.frames:
            raise ValueError("No frames to animate. Use add_frame() first.")
        
        fig = plt.figure()
        img = plt.imshow(self.frames[0])
        plt.axis('off')

        def animate(i):
            img.set_data(self.frames[i])
            return [img]
        
        ani = animation.FuncAnimation(
            fig, animate, frames=len(self.frames),
            interval=self.interval, blit=True
        )
        plt.close(fig)  # Prevent static last frame from showing
        return HTML(ani.to_jshtml())

Cette fonction joue (évalue) une certaine politique dans un certain environnement précisés pendant 1 épisode, et retourne les récompenses cumulées ainsi que les frames de l'épisode.

In [None]:
from typing import Callable


def eval_policy_1_episode(env : gym.Env, policy : Callable, do_animation : bool = True):
    observation, info = env.reset()
    done = False
    total_reward = 0
    step = 0
    animator = EpisodeAnimator(interval=300) if do_animation else None
    
    while not done:
        
        if do_animation:
            animator.add_frame(env.render())

        action = policy(observation)
        observation, reward, terminated, truncated, info = env.step(action)
        done = terminated or truncated
        total_reward += reward
        step += 1
    
    if do_animation:
        animator.add_frame(env.render()) # add last frame
        animation = animator.display_animation()
    else:
        animation = None
        
    return {"total_reward": total_reward, "steps": step, "animation" : animation}

Codez désormais une stratégie permettant de résoudre le maze:

In [None]:
def policy_hardcoded(observation):
    # Implement a hardcoded policy
    if observation in [0, 4, 10]:
        action = 1
    else:
        action = 2
    return action

result = eval_policy_1_episode(env, policy_hardcoded, do_animation=True)
print(f"Total reward: {result['total_reward']}")
print(f"Total steps: {result['steps']}")
print(f"Animation:")
result['animation']

Testez à nouveau votre stratégie avec cette nouvelle carte. Que fait-elle ?

In [None]:
env = gym.make('FrozenLake-v1', desc=["SHFF", "FHFH", "FFHF", "FFFG"], is_slippery=False, render_mode="rgb_array")

result = eval_policy_1_episode(env, policy_hardcoded, do_animation=True)

print(f"Total reward: {result['total_reward']}")
print(f"Total steps: {result['steps']}")
print(f"Animation:")
result['animation']

## B) Version stochastique

Passons maintenant à une verison stochastique où notre agent peut "glisser".

In [None]:
env = gym.make('FrozenLake-v1', desc=["SFFF", "FHFH", "FFFH", "HFFG"], is_slippery=True, render_mode="rgb_array")

In [None]:
result = eval_policy_1_episode(env, policy_hardcoded, do_animation=True)

print(f"Total reward: {result['total_reward']}")
print(f"Total steps: {result['steps']}")
print(f"Animation:")
result['animation']

Regardons ce que cela change sur la fonction de transition:

In [None]:
# State: initial
# Action: right
print_transition_function(env, state=0, action=2)

In [None]:
# State: cell 4 (next ot a hole)
# Action: right
print_transition_function(env, state=4, action=2)

In [None]:
# State: cell 14 (next to the goal)
# Action: right
print_transition_function(env, state=14, action=2)

Essayez maintenant de trouver une stratégie sûre pour la carte suivante:

In [None]:
env = gym.make('FrozenLake-v1', desc=["SFFF", "FFFF", "HHFF", "HHFG"], is_slippery=True, render_mode="rgb_array")
obs, info = env.reset()
array = env.render()
plt.imshow(array)

Votre stratégie sera testée plusieurs fois pour s'assurer de sa fiabilité.

In [None]:
def eval_policy_n_episodes(env, policy, n=5, do_animation=True):
    """Evaluate a policy over multiple episodes.

    Args:
        env (gym.Env): The environment to evaluate the policy on.
        policy (Callable): the policy to evaluate : policy(observation) -> action
        n (int, optional): The number of episodes to evaluate the policy on. Defaults to 5.
        do_animation (bool, optional): Whether to render the environment during evaluation. Only the last episode is rendered. Defaults to True.

    Returns:
        dict: A dictionary containing the evaluation results.
    """
    results = {}
    for i in tqdm(range(n)):
        result_episode = eval_policy_1_episode(env, policy, do_animation=do_animation and i == n-1)
        for key, value in result_episode.items():
            if key not in results:
                results[key] = []
            results[key].append(value)
    return results


In [None]:
env = gym.make('FrozenLake-v1', desc=["SFFF", "FFFF", "HHFF", "HHFG"], is_slippery=True, render_mode="rgb_array")

def policy_hardcoded_2(observation):
    # ---- <your code here> ----
    action = ...
    # --------------------------
    return action

results = eval_policy_n_episodes(env, policy_hardcoded, n=10, do_animation=True)

print(f"Total reward: {results['total_reward']}")
print(f"Total steps: {results['steps']}")
print(f"Animation (last episode):")
results['animation'][-1]

# Partie 3: Q-Learning

## Framework for Reinforcement Learning Agents


Pour faire les choses proprement nous allons définir une classe de base pour les agents de RL.
Cette classe définit un interface python à respecter pour tout agent de RL. Les méthodes suivantes doivent être implémentées :

- `__init__(self, env: gym.Env, **kwargs)`: Le constructeur prend un environnement OpenAI Gym comme entrée (et d'autre paramètres dépendant de l'agent si besoin)
- `act(self, obs)`: Cette méthode doit retourner l'action à entreprendre en fonction de l'observation de l'état actuel.
- `learn(self, obs, action, reward, next_obs, done)`: Cette méthode doit mettre à jour la politique de l'agent en fonction de l'expérience acquise sur la dernière transition. La variable `done` indique si l'épisode a été terminé lors de cette transition.


In [None]:
class AgentRL:
    
    def __init__(self, env : gym.Env):
        self.env = env
    
    def act(self, obs):
        raise NotImplementedError()
    
    def learn(self, obs, action, reward, next_obs, done):
        raise NotImplementedError()

Un exemple d'agent (aléatoire) respectant ce framework :

In [None]:
class RandomAgent(AgentRL):
    
    def act(self, obs):
        return self.env.action_space.sample()

    def learn(self, obs, action, reward, next_obs, done):
        pass  # do nothing

Un autre exemple d'agent basé sur une fonction politique spécifiée :

In [None]:
class HardcodedAgent(AgentRL):
    
    def __init__(self, env : gym.Env, policy : Callable):
        super().__init__(env)
        self.policy = policy

    def act(self, obs):
        return self.policy(obs)

    def learn(self, obs, action, reward, next_obs, done):
        pass  # do nothing

Une fois votre agent implémenté, ces deux fonctions serviront respectivement à l'entraîner et à l'évaluer.

In [None]:
def train_agent_n_episodes(env : gym.Env, agent : AgentRL, n : int = 20, verbose_freq : int = None) -> dict:
    """Train the agent for a number of episodes.

    Args:
        env (gym.Env): The environment to train the agent in.
        agent (AgentRL): The agent to train.
        n (int, optional): The number of episodes to train the agent for. Defaults to 20.
        verbose_freq (int, optional): Episode frequency to print progress. If None, no progress is printed. Defaults to None.

    Returns:
        dict: A dictionary containing the total rewards and steps taken in each episode.
    """
    results = {
        "total_rewards": [],
        "steps": [],
    }
    
    # Training loop
    for episode in tqdm(range(n)):
        observation, info = env.reset()
        total_reward = 0
        step = 0
        done = False
        while not done:
            action = agent.act(observation)
            next_observation, reward, terminated, truncated, info = env.step(action)
            done = terminated or truncated
            total_reward += reward
            agent.learn(observation, action, reward, next_observation, done)
            step += 1
            observation = next_observation
        
        # Log results
        results["total_rewards"].append(total_reward)
        results["steps"].append(step)
        if verbose_freq is not None and (episode + 1) % verbose_freq == 0:
            avg_reward = np.mean(results["total_rewards"][-verbose_freq:])
            avg_steps = np.mean(results["steps"][-verbose_freq:])
            print(f"Episode {episode + 1}/{n} - Average Reward: {avg_reward:.2f}, Average Steps: {avg_steps:.2f}")
    
    return results

def eval_agent_n_episodes(env : gym.Env, agent : AgentRL, n : int = 5, do_animation : bool = True) -> dict:
    return eval_policy_n_episodes(env, agent.act, n=n, do_animation=do_animation)

Pour commencer, nous utiliserons cet environnement très simple et sans trous.

In [None]:
env = gym.make('FrozenLake-v1', desc=["SFF", "FFF", "FFG"], is_slippery=False, render_mode="rgb_array")
obs, info = env.reset()
env.render()
plt.imshow(env.render())

## A) Q-Learning

Implémentez à présent votre premier algorithme : le Q-Learning.

Rappel : l'algorithme de Q-Learning garde en mémoire une table de Q-values estimées $\hat{Q}$ qui à chaque couple d'état action $(s,a)$ associe une valeur $\hat{Q}(s,a)$ représentant une estimation de la valeur future de l'état $s$ après avoir effectué l'action $a$. Il met à jour cette table en utilisant la formule suivante :

$$\hat{Q}(s,a) \leftarrow \hat{Q}(s,a) + \alpha \left( r + \gamma \max_{a'} \hat{Q}(s',a') - \hat{Q}(s,a) \right)$$

où :
- $\alpha$ est le taux d'apprentissage,
- $r$ est la récompense reçue après avoir effectué l'action $a$ dans l'état $s$,
- $s'$ est l'état résultant de l'action $a$,
- $\gamma$ est le discount factor.

Note : dans un état terminal (indiqué par la valeur de la variable `done`), la valeur de $\hat{Q}(s',a')$ est nulle car il n'y a pas d'état suivant. Une formule plus correcte pour la mise à jour dans ce cas est :

$$\hat{Q}(s,a) \leftarrow \hat{Q}(s,a) + \alpha \left( r - \hat{Q}(s,a) \right)$$

### Conseils :

- Vous aurez besoin de maintenir une table de Q-values $\hat{Q}$ pour chaque paire d'état-action $(s,a)$, initialisés arbitrairement.
- N'oubliez pas la notion d'exploration : vous devez parfois choisir une action aléatoire plutôt que celle qui maximise la Q-value estimée, afin d'explorer de nouvelles régions de l'espace d'état/actions.
- Le nombre d'actions et d'état existant est accessible par `env.action_space.n` et `env.observation_space.n`.
- A vous de trouver de bonnes valeurs d'hyperparamètres (pour $\alpha$, $\gamma$, et $\epsilon$).


In [None]:
class QLearningAgent(AgentRL):
    
    def __init__(
        self, 
        env : gym.Env, 
        alpha : float,
        gamma : float,
        epsilon : float,
        ):
        super().__init__(env)
        # ---- <your code here> ----
        
        # --------------------------
        
    def act(self, obs):
        # ---- <your code here> ----
        
        # --------------------------
        pass
    
    def learn(self, obs, action, reward, next_obs, done):
        # ---- <your code here> ----
        
        # --------------------------
        pass
    
agent = QLearningAgent(
    env = env,
    # ---- <your code here> ----
    
    # --------------------------
)



# Train for 2 episodes here to debug
train_results = train_agent_n_episodes(
    env = env,
    agent = agent,
    n = 2,
)
print(train_results)

Entrainez ensuite votre agent Q-Learning sur l'environnement.

In [None]:
env = gym.make('FrozenLake-v1', desc=["SFF", "FFF", "FFG"], is_slippery=False, render_mode="rgb_array")
agent = agent

train_results = train_agent_n_episodes(
    env = env,
    agent = agent,
    n = 200,
)

Après entrainement, vous pouvez afficher les courbes d'apprentissage depuis `train_results`.

In [None]:
import matplotlib.pyplot as plt

def plot_results(results: dict):
    """
    Plots each metric in results on a separate subplot.
    Example results:
        {
            'total_rewards': [0.0, 1.0, 0.0],
            'steps': [100, 95, 100]
        }
    """
    n_metrics = len(results)
    fig, axes = plt.subplots(n_metrics, 1, figsize=(10, 4 * n_metrics), sharex=True)

    if n_metrics == 1:
        axes = [axes]  # Make iterable if only one metric

    for ax, (key, values) in zip(axes, results.items()):
        ax.plot(values, label=key)
        ax.set_title(key.replace("_", " ").title())
        ax.set_ylabel("Value")
        ax.grid(True)
        ax.legend()

    axes[-1].set_xlabel("Episode")
    plt.tight_layout()
    plt.show()


In [None]:
plot_results(train_results)

Evaluons à présent notre agent.

Remarque (facultatif) : il pourrait être pertinent de passer l'agent en mode évaluation lorsqu'il n'est pas en cours d'entraînement. Ici cela correspondrait à désactiver l'exploration (c'est-à-dire, utiliser une politique déterministe).

In [None]:
eval_results = eval_agent_n_episodes(
    env = env,
    agent = agent,
    n = 10,
    )

print(f"Total reward: {eval_results['total_reward']}, mean = {np.mean(eval_results['total_reward'])}")
print(f"Total steps: {eval_results['steps']}, mean = {np.mean(eval_results['steps'])}")
print(f"Animation (last episode):")
eval_results['animation'][-1]

## B) SARSA

Implémentons à présent SARSA, dont la formule d'apprentissage est : $Q(s, a) \leftarrow Q(s, a) + \alpha \left( r + \gamma Q(s', a') - Q(s, a) \right)$

Vous aurez donc à prendre $a' = \pi(s')$ dans votre code, plutôt que de prendre le maximum de $Q(s', \cdot)$.

In [None]:
# Define environment
env = gym.make('FrozenLake-v1', desc=["SFF", "FFF", "FFG"], is_slippery=False, render_mode="rgb_array")

# Define agent
class SARSA_Agent(AgentRL):
    
    def __init__(
        self, 
        env : gym.Env, 
        alpha : float,
        gamma : float,
        epsilon : float,
        ):
        
        super().__init__(env)
        self.alpha = alpha
        self.gamma = gamma
        self.epsilon = epsilon
        self.q_table = np.zeros((env.observation_space.n, env.action_space.n))
        self.last_obs = None
        self.last_action = None

    def act(self, obs):
        if np.random.rand() < self.epsilon:
            action = np.random.choice(self.env.action_space.n)
        else:
            action = np.argmax(self.q_table[obs])
        return action

    def learn(self, obs, action, reward, next_obs, done):
        if done:
            target = reward
        else:
            next_action = self.act(next_obs)
            target = reward + self.gamma * self.q_table[next_obs, next_action]
        self.q_table[obs, action] += self.alpha * (target - self.q_table[obs, action])

agent = SARSA_Agent(
    env=env,
    alpha=0.1,
    gamma=0.9,
    epsilon=0.1
)

# Train the agent
training_results = train_agent_n_episodes(env, agent, n=200)

# Plot training curves
plot_results(training_results)

# Evaluate the agent
eval_results = eval_agent_n_episodes(env, agent, n=10)

print(f"Total reward: {eval_results['total_reward']}, mean = {np.mean(eval_results['total_reward'])}")
print(f"Total steps: {eval_results['steps']}, mean = {np.mean(eval_results['steps'])}")
print(f"Animation (last episode):")
eval_results['animation'][-1]

## C) Environnement plus difficile

Entrainons nos agents Q-Learning et SARSA dans l'environnement "The Cliff" du cours. 

Cet environnement est en l'état assez difficile parce que le signal de récompense est très sparse. L'agent ne devrait pas réussir normalement.

In [None]:
env = gym.make('FrozenLake-v1', desc=["FFFFF", "FFFFF", "SHHHG"], is_slippery=False, render_mode="rgb_array")
obs, info = env.reset()
img = env.render()
plt.imshow(img)

In [None]:
agent = QLearningAgent(env, alpha=0.8, gamma=0.99, epsilon=0.33)

# Train the agent
training_results = train_agent_n_episodes(env, agent, n=1000)

# Plot training curves
plot_results(training_results)

# Evaluate the agent
eval_results = eval_agent_n_episodes(env, agent , n=10)
print(f"Total reward: {eval_results['total_reward']}, mean = {np.mean(eval_results['total_reward'])}")
print(f"Total steps: {eval_results['steps']}, mean = {np.mean(eval_results['steps'])}")
print(f"Animation (last episode):")
eval_results['animation'][-1]

Comment rendre cet environnement plus facile à apprendre pour notre agent ? Le problème est que l'agent doit explorer beaucoup avant de tomber sur un signal de reward (lorsqu'il arrive au coffre). Toutes les autres trajectoires offrent une récompense totale nulle, et l'agent va en pratique très souvent tomber dans les trous (qui terminent l'épisode avec une reward nulle) et est donc bloqué.

Les solutions proposés sont les suivantes :
- 1) Rendre punitif les trous, pour cela on peut utiliser un "wrapper" d'environnement. Les wrapper sont des classes qui modifient le comportement d'un environnement Gym sans en changer le code source. Dans notre cas, on peut penser à donner une récompense négative lorsque l'épisode est terminé (`done=True`) mais qu'il n'a pas trouvé le coffre (`reward` < 1). Vous pouvez vous inspirer du code suivant (double click sur cette cellule pour copier le code) :

```python
class PunishTerminationWrapper(gym.Wrapper):
    """
    Punishes any termination that doesn't give +1 reward with -1.
    """
    def step(self, action):
        obs, reward, terminated, truncated, info = self.env.step(action)
        
        # ---- <your code here> ----
        
        # --------------------------
        
        return obs, reward, terminated, truncated, info

env = PunishTerminationWrapper(env)
```

- 2) Utiliser la politique d'exploration UCB (Upper-Confidence Bound) plutôt que epsilon-greedy. Cette politique est fondamentalement curieuse envers les couples (états, actions) qu'elle n'a jamais vu, et peut donc explorer la totalité de notre environnement beaucoup plus rapidement. Notez que la politique UCB est très efficace mais n'est pas applicable lorsque l'espace des états ou des actions est non discret. Pour rappel, la formule de la politique UCB est la suivante : $UCB(s, a) = Q(s, a) + c \sqrt{\frac{\ln N(s)}{N(s, a)}}$ avec $N()$ dénotant le nombre de fois qu'un état ou qu'un couple (état, action) a été visité.

- 3) Une méthode incitant l'exploration est l'Initialisation Optimiste. Cette méthode consiste à initialiser les valeurs de la fonction de valeur (ou Q-Values) à des valeurs optimistes. Les politiques qui se basent sur la Q-Value vont ainsi explorer davantage les actions qui n'ont pas encore été essayées, car elles commenceront avec une valeur Q élevée, puis celle-ci sera ajustée à la baisse au fur et à mesure de l'apprentissage, ce qui incite l'agent à passer à d'autres actions.

Vous pouvez implémenter une ou plusieurs de ces solutions pour améliorer l'apprentissage de vos agents, puis run à nouveau la cellule précédente.


# Partie 4: Algorithme Deep Q-Network (DQN)

Il est temps de passer à un environnnement plus compliqué: Breakout (Casse-Briques).

Cet environnement a un espace d'observation continu et de grande dimension (210x160x3 pixels). L'apprentissage de la Q-Value passera donc non plus par une table mais par un réseau de neurones.

In [None]:
env_breakout = env = gym.make("BreakoutNoFrameskip-v4", render_mode="rgb_array")
print("Observation space:", env_breakout.observation_space)
print("Action space:", env_breakout.action_space)
print(f"Actions' meaning: {env_breakout.unwrapped.get_action_meanings()}")
obs,_ = env_breakout.reset()
plt.imshow(env_breakout.render(),vmin=0,vmax=255)

## A) Preprocessing

Nous allons utiliser des wrappers déjà fournis par Gymnasium pour:
- processer l'image (réduire sa taille, tout mettre en gris...)
- stacker les N dernières images (permettant de tracker le mouvement des objets)

In [None]:
from gymnasium.wrappers import AtariPreprocessing, FrameStackObservation

In [None]:
env_breakout = env = gym.make("Breakout-v4", render_mode="rgb_array", frameskip=1)
env_breakout = AtariPreprocessing(env_breakout)
env_breakout = FrameStackObservation(env_breakout, 4)
env_breakout.reset()
env_breakout.step(0)

In [None]:
# Trying a random agent in Breakout
agent = RandomAgent(env_breakout)

# Evaluate the agent
eval_results = eval_agent_n_episodes(env_breakout, agent, n=10)
print(f"Total reward: {eval_results['total_reward']}, mean = {np.mean(eval_results['total_reward'])}")
print(f"Total steps: {eval_results['steps']}, mean = {np.mean(eval_results['steps'])}")
print(f"Animation (last episode):")
eval_results['animation'][-1]

## B) Replay Buffer

Comme dit dans le cours, plutôt que d'apprendre totalement "online" (c'est-à-dire en utilisant la dernière transition collectée pour faire une mise à jour de la Q-Value), nous allons stocker les transitions dans un buffer, et faire des mises à jour en samplant aléatoirement des transitions dans ce buffer.

Implémentons notre replay buffer permettant de récolter et de sampler aléatoirement des transitions collectées.

In [None]:
import random
import torch
import numpy as np

class ReplayBuffer:
    def __init__(self, capacity, device):
        self.capacity = capacity # capacity of the buffer
        self.data = []
        self.index = 0 # index of the next cell to be filled
        self.device = device
    def append(self, obs, action, reward, next_obs, done):
        if len(self.data) < self.capacity:
            self.data.append(None)
        self.data[self.index] = (obs, action, reward, next_obs, done)
        self.index = (self.index + 1) % self.capacity
    def sample(self, batch_size):
        batch = random.sample(self.data, batch_size)
        return list(map(lambda x:torch.Tensor(np.array(x)).to(self.device), list(zip(*batch))))
    def __len__(self):
        return len(self.data)

## C) Réseau de neurones pour la Q-Value

Nous allons utiliser un réseau de neurones convolutifs (car adapté aux images) à 3 couches pour approximer la Q-Value.

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class AtariCNN(nn.Module):
    def __init__(self, in_channels=4, n_actions=6):
        super(AtariCNN, self).__init__()
        self.conv1 = nn.Conv2d(in_channels, 32, kernel_size=8, stride=4)
        self.conv2 = nn.Conv2d(32, 64, kernel_size=4, stride=2)
        self.conv3 = nn.Conv2d(64, 64, kernel_size=3, stride=1)
        self.fc4 = nn.Linear(7 * 7 * 64, 512)
        self.head = nn.Linear(512, n_actions)

    def forward(self, x):
        x = F.relu(self.conv1(x))
        x = F.relu(self.conv2(x))
        x = F.relu(self.conv3(x))
        x = F.relu(self.fc4(x.view(x.size(0), -1)))
        return self.head(x)

A votre tour d'implémenter une fonction calculant les Q-values pour un state et jouant greedy à partir de ces Q-values (i.e. retournant l'index de la plus haute Q-value):

In [None]:
def greedy_action(network, obs):
    device = "cuda" if next(network.parameters()).is_cuda else "cpu"
    with torch.no_grad():        
        # ---- <your code here> ----
        action = ...
        # --------------------------
    return action

obs,_ = env_breakout.reset()
network_q_value = AtariCNN()
if torch.cuda.is_available():  # Set the DQN to cuda if possible
    network_q_value = network_q_value.to("cuda")
    
greedy_action(network_q_value, obs)   # test the function

## D) Algorithme DQN

Il est temps d'implémenter l'entraînement de notre DQN.
Nous allons utiliser:
- une exploration epsilon-greedy avec un epsilon qui décroît linéairement
- une phase de warm-up au début où l'agent joue aléatoirement pour remplir le replay buffer
- un target Q-network pour stabiliser l'entrainement : il s'agit d'une copie du Q-network qui "suit" le Q-network principal plus lentement (par Polyak averaging). Le target Q-network est utilisé pour calculer la target dans la loss de DQN :
$$\text{target} = r + \gamma \max_{a'} Q_{\text{target}}(s', a')$$

$$Q_{\text{target}} \leftarrow \tau Q + (1-\tau) Q_{\text{target}}$$

Avec $\tau \ll 1$.

C'est à vous d'implémenter la méthode `act` et le calcul de la loss à partir des batch de transitions samplées dans le replay buffer.

In [None]:
from copy import deepcopy


class DQNAgent(AgentRL):
    
    def __init__(
        self, 
        env : gym.Env, 
        network_q_value : nn.Module,
        replay_buffer : ReplayBuffer,
        batch_size : int,
        learning_rate : float,
        gamma : float,
        epsilon_start : float,
        epsilon_end : float,
        epsilon_decay_per_episode : int,
        warmup_episodes : int = 20,
        update_target_tau : float = 0.005,
        device : str = "cpu",
    ):
        super().__init__(env)
        
        # Hyperparameters
        self.batch_size = batch_size   # batch size for sampling from the replay buffer and do a gradient step
        self.gamma = gamma
        self.epsilon_start = epsilon_start
        self.epsilon_end = epsilon_end
        self.epsilon_decay = epsilon_decay_per_episode  # how much substract to epsilon per episode
        self.gradient_step_freq = 4  # do a gradient step every gradient_step_freq steps
        self.learning_rate = learning_rate
        self.warmup_episodes = warmup_episodes # number of episodes to fill the replay buffer before starting training
        self.update_target_tau = update_target_tau  # target network update rate (Polyak averaging)
        
        # Internal variables
        self.network_q_value = network_q_value.to(device)
        self.network_q_value_target = deepcopy(self.network_q_value).to(device)
        self.replay_buffer = replay_buffer
        self.optimizer = torch.optim.Adam(self.network_q_value.parameters(), lr=self.learning_rate)
        self.device = device
        self.training_steps = 0  # number of training steps done so far
        self.training_episodes = 0  # number of training episodes done so far
        self.epsilon = epsilon_start
        
    def act(self, obs):
        # ---- <your code here> ----
        action = ...
        # --------------------------
    
    def learn(self, obs, action, reward, next_obs, done):
        
        # Store the transition in the replay buffer
        self.replay_buffer.append(obs, action, reward, next_obs, done)
        self.training_steps += 1
        
        # Linearly decay epsilon
        if done:
            self.training_episodes += 1
            self.epsilon -= self.epsilon_decay
            self.epsilon = max(self.epsilon, self.epsilon_end)
        
        # Skip if not enough samples in the replay buffer or not time for a gradient step
        if len(self.replay_buffer) < self.batch_size or self.training_steps % self.gradient_step_freq != 0:
            return
        
        # Sample a batch of transitions from the replay buffer and do some conversions
        obs_batch, action_batch, reward_batch, next_obs_batch, done_batch = self.replay_buffer.sample(self.batch_size)
        obs_batch = obs_batch / 255.0 # dim = (batch_size, 4, 84, 84)
        next_obs_batch = next_obs_batch / 255.0 # dim = (batch_size, 4, 84, 84)
        action_batch = action_batch.long().unsqueeze(1) # dim = (batch_size, 1)
        reward_batch = reward_batch.unsqueeze(1)  # dim = (batch_size, 1)
        done_batch = done_batch.unsqueeze(1)  # dim = (batch_size, 1)
        
        # Compute current Q values
        # ---- <your code here> ----
        current_q_values = ...
        # --------------------------        
        
        # Compute target Q values
        with torch.no_grad():
            # ---- <your code here> ----
            target_q_values = ...
            # --------------------------
            pass
        
        # Compute loss
        # ---- <your code here> ----
        loss = ... 
        # --------------------------
        
        
        # Optimize the model
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()
        
        # Update the target network with Polyak averaging
        with torch.no_grad():
            for param, target_param in zip(self.network_q_value.parameters(), self.network_q_value_target.parameters()):
                target_param.data.copy_(
                    (1 - self.update_target_tau) * target_param.data + self.update_target_tau * param.data
                )
        
            
            

# Train for 2 episodes here to debug
device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"Using device: {device}")

agent = DQNAgent(
    env = env_breakout,
    network_q_value = AtariCNN(
        in_channels=env_breakout.observation_space.shape[0],
        n_actions=env_breakout.action_space.n,
    ),
    replay_buffer = ReplayBuffer(capacity=100000, device=device),
    batch_size = 64,
    learning_rate = 0.001,
    gamma = 0.95,
    epsilon_start = 1.0,
    epsilon_end = 0.1,
    epsilon_decay_per_episode = 0.9 / 100,  # from 1.0 to 0.1 in 100 episodes
    device = device,
)

train_results = train_agent_n_episodes(
    env = env_breakout,
    agent = agent,
    n = 2,
)
print(train_results)

C'est parti pour lancer l'entraînement de notre DQN !

In [None]:
# Train the agent
training_results = train_agent_n_episodes(env_breakout, agent, n=200, verbose_freq=10)

# Plot training curves
plot_results(training_results)

# Evaluate the agent
eval_results = eval_agent_n_episodes(env_breakout, agent, n=10)
print(f"Total reward: {eval_results['total_reward']}, mean = {np.mean(eval_results['total_reward'])}")
print(f"Total steps: {eval_results['steps']}, mean = {np.mean(eval_results['steps'])}")
print(f"Animation (last episode):")
eval_results['animation'][-1]

Tips :
- la loss `F.smooth_l1_loss` (aussi appelée Huber loss) est souvent plus stable que la MSE loss
- une amélioration possible est la variante Double DQN, qui consiste à utiliser le Q-network principal pour choisir l'action maximisante dans la target, mais le target Q-network pour évaluer cette action. La target devient donc :
$$\text{target} = r + \gamma Q_{\text{target}}(s', \arg\max_{a'} Q(s', a'))$$