# Approximation paramétrique de $Q(s,a)$
---

## 1. Approximation de la fonction valeur d’action

Lorsque l’espace d’états $S$ est continu ou très grand, on approxime la fonction $Q(s,a)$ par un modèle paramétrique :

$$
Q_\theta(s,a)
$$

où $\theta \in \mathbb{R}^d$ désigne les paramètres.

On suppose que l’espace d’actions $A$ est discret alors que nous n'impososn pas ça sur S. 

---

## 2. Problème d’approximation

On cherche à approximer la fonction $Q_\pi(s,a)$ associée à une politique $\pi$.

On définit la fonction coût quadratique :

$$
J(\theta)
=
\frac{1}{2}
\mathbb{E}
\left[
\left(
Q_\pi(s,a) - Q_\theta(s,a)
\right)^2
\right]
$$

où l’espérance est prise selon la distribution induite par la politique $\pi$.

---

## 3. Gradient exact

Le gradient s’écrit :

$$
\nabla_\theta J(\theta)
=
\mathbb{E}
\left[
\left(
Q_\theta(s,a) - Q_\pi(s,a)
\right)
\nabla_\theta Q_\theta(s,a)
\right]
$$

---

## 4. Descente de gradient stochastique

On cherche à estimer le meilleur $\theta$, pour cela on fait une descente de gradient

$$
\theta
\leftarrow
\theta
-
\alpha
\nabla_\theta J(\theta)
$$

---

### Approximation de la descente de gradient 

On peut approximer la formule exacte du gradient en considérant une seule réalisation $(s,a)$.

On remplace l’espérance par :

$$
\nabla_\theta J(\theta)
\approx
\left(
Q_\theta(s,a) - Q_\pi(s,a)
\right)
\nabla_\theta Q_\theta(s,a)
$$

Ce qui conduit à la mise à jour stochastique :

$$
\theta
\leftarrow
\theta
-
\alpha
\left(
Q_\theta(s,a) - Q_\pi(s,a)
\right)
\nabla_\theta Q_\theta(s,a)
$$

---

## 5. Remplacement de $Q_\pi$ par une cible TD

La quantité $Q_\pi(s,a)$ est inconnue.

On note $(s,a,r,s')$ une transition observée, où $s'$ est l’état obtenu après avoir exécuté l’action $a$ dans l’état $s$.

On utilise une approximation bootstrap. On note $y$ cette aproximation de $Q_\pi(s,a)$.

---

### Cas SARSA (on-policy)

On choisit une action $a'$ selon la politique $\pi$ au nouvel état $s'$.

La cible est :

$$
y = r + \gamma Q_\theta(s', a')
$$


### Cas Expected SARSA

On prend l’espérance sous la politique $\pi$ :

$$
y =
r + \gamma  \sum_{a' \in  \mathcal{A}} \pi(a'\mid s') Q_{\theta}(s',a')
$$



### Cas Q-learning (off-policy)

On prend la meilleure action au prochain état :

$$
y =
r + \gamma \max_{a'} Q_\theta(s',a')
$$


---

## 6. Replay Buffer

Un replay buffer est une structure de données garder en mémoire afin de mieux estimer l'espérance mentionner précédemment :

$$
\mathcal{D} = \{(s,a,r,s')\}
$$

Au lieu d’utiliser uniquement la transition courante, on échantillonne un mini-batch aléatoire dans $\mathcal{D}$.

Objectifs :

- casser les corrélations temporelles,
- stabiliser l’optimisation,
- réutiliser les données.

On obtient alors une descente de gradient stochastique sur la perte empirique.

---

## 7. Passage à l’apprentissage batch

Dans la mise à jour précédente, l’estimation du gradient repose sur une seule transition.

On peut améliorer la stabilité en utilisant un ensemble de transitions

$$
\mathcal{D} = \{(s_i, a_i, r_i, s'_i)\}_{i=1}^N
$$

On définit alors la perte empirique :

$$
J_N(\theta)
=
\frac{1}{2N}
\sum_{i=1}^N
\left(
Q_\theta(s_i,a_i) - y_i
\right)^2
$$

où, pour le cas Q-learning :

$$
y_i
=
r_i + \gamma \max_{a'} Q_\theta(s'_i,a')
$$

Le gradient empirique devient :

$$
\nabla_\theta J_N(\theta)
=
\frac{1}{N}
\sum_{i=1}^N
\left(
Q_\theta(s_i,a_i) - y_i
\right)
\nabla_\theta Q_\theta(s_i,a_i)
$$

---

## 8. Approximation linéaire

On suppose qu’il existe un vecteur de caractéristiques

$$
\phi(s,a) \in \mathbb{R}^d
$$

et on définit :

$$
Q_\theta(s,a)
=
\theta^\top \phi(s,a)
$$

avec $\theta \in \mathbb{R}^d$.

### Gradient

Dans ce cas :

$$
\nabla_\theta Q_\theta(s,a)
=
\phi(s,a)
$$

Puis on met à jour $\theta$ par une descente de gradient en approximant les quantités en utilisant les différentes méthodes vues plus haut.

---

## 9. Neural Fitted Q-Iteration (NFQ)

NFQ est une version batch de Q-learning avec approximation fonctionnelle.

Principe :

1. Collecter un ensemble de transitions $\mathcal{D}$.

2. Construire les cibles :

$$
y_i =
r_i + \gamma \max_{a'} Q_{\theta_k}(s'_i,a')
$$

où $\theta_k$ désigne les paramètres à l’itération précédente.

3. Résoudre le problème de régression :

$$
\theta_{k+1}
=
\arg\min_\theta
\frac{1}{N}
\sum_i
\left(
Q_\theta(s_i,a_i) - y_i
\right)^2
$$

On alterne ainsi :

- calcul des cibles avec les paramètres courants,
- résolution d’un problème de régression supervisée,
- mise à jour des paramètres.

NFQ est donc une procédure itérative de type :

$$
Q_{k+1}
\approx
\mathcal{T} Q_k
$$

où $\mathcal{T}$ est l’opérateur de Bellman optimal.

---

On va commencer par une descente de gradient sur un modèle linéaire dépendant de theta, il approxime la fonction Q. On doit puisqu'on ne conait pas la vraie focntion Q utiliser ce qu'on appelle un bootstrap, les deux différents qui existent sont présentés section 6.


In [9]:
import gymnasium as gym
import numpy as np
from collections import deque
import random
import matplotlib.pyplot as plt

In [2]:
env = gym.make("CartPole-v1") 
# On reprend ici on prend un en vironnment diférents où s est continue et on a que deux actions possibles 
# (gauche ou droite), le but est de faire tenir le pole en équilibre le plus longtemps possible.

n_actions = env.action_space.n
n_states = env.observation_space.shape[0]
# States ici est le quadruplet (position du chariot, vitesse du chariot, angle du pole, vitesse angulaire du pole)

print("States:", n_states)
print("Actions:", n_actions)

States: 4
Actions: 2


In [3]:
# On rappelle que l'idée de l'algorithme ici est d'effectuer une descente de gradient pour déterminer 
# les paramètres theta de la fonction d'action-valeur Q, qui est définie à partir du vecteur de paramètres theta 
# et de la fonction d'indexation index. Une fois que les paramètres theta sont déterminés, on peut utiliser 
# la fonction d'action-valeur Q pour déterminer la politique optimale.

alpha = 0.1 # taux d'apprentissage pour la descente de gradient
gamma = 0.99 # facteur de discount pour les récompenses futures, qui détermine l'importance accordée aux récompenses 
# futures par rapport aux récompenses immédiates.
epsilon = 0.1 # taux d'exploration pour la politique epsilon-greedy, 
#qui détermine la probabilité d'explorer de nouvelles actions
n_episodes = 100000 # nombre d'épisodes d'entraînement, qui détermine le nombre de fois que l'agent 
# va interagir avec l'environnement

In [4]:
def phi(state, action):
    x, x_dot, theta_pole, theta_dot = state
    # on utilise l'angle et la position du chariot, ainsi que leurs vitesses respectives, 
    # pour construire les features de notre fonction d'action-valeur Q.

    # On utilise ici une base de fonctions polynomiales pour approximer la fonction d'action-valeur Q.
    features = np.array([
        1.0,
        x,
        x_dot,
        theta_pole,
        theta_dot,
        x**2,
        x_dot**2,
        theta_pole**2,
        theta_dot**2,
        x * theta_pole,
        x_dot * theta_dot
    ])

    d_state = len(features)

    phi_sa = np.zeros(d_state * n_actions) # on crée un vecteur de features pour chaque action possible
    # Q_theta(s, a) = theta @ phi(s, a) est simplifiée en theta_a @ features, où theta_a est la partie 
    # de theta correspondant à l'action a (ici deux actriosn possibles, on a donc deux parties dans theta, 
    # chacune de taille d_state)

    start = action * d_state # début des features pour l'action choisie dans le vecteur phi_sa
    phi_sa[start:start+d_state] = features # on remplit les features pour l'action choisie dans 
    # le vecteur phi_sa, les autres restent à zéro

    return phi_sa

d = 11 * n_actions
theta = np.zeros(d)

def Q(theta, state, action):
    return theta @ phi(state, action) # la fonction d'action-valeur Q est définie comme le produit 
# scalaire entre les paramètres theta et les features phi(state, action), le but est toujours de trouver 
# les paramètres theta qui permettent d'approximer au mieux la fonction d'action-valeur Q.


In [5]:
for episode in range(n_episodes):
    s, _ = env.reset()
    done = False

    while not done:

        # epsilon-greedy
        if np.random.rand() < epsilon: # avec une probabilité epsilon, on choisit une action 
            # aléatoire pour encourager l'exploration
            a = env.action_space.sample()
        else:
            q_values = [Q(theta, s, a_) for a_ in range(n_actions)]
            a = np.argmax(q_values) # on choisit l'action qui maximise la fonction 
            # d'action-valeur Q pour l'état actuel s

        s_next, r, terminated, truncated, _ = env.step(a) # on exécute l'action choisie et on observe la transition
        # vers le nouvel état s_next, la récompense r, et les indicateurs de fin d'épisode terminated et truncated
        done = terminated or truncated # on vérifie si l'épisode est terminé

        
        # Target Q-learning
        q_next = max(Q(theta, s_next, a_) for a_ in range(n_actions))
        y = r + gamma * q_next #réalisation du bootstrap  (concept clef ici) pour calculer la cible y, 
        # qui est la récompense immédiate r plus la valeur maximale de Q pour le nouvel état s_next, estimée 
        # à partir de la fonction d'action-valeur Q actuelle et du facteur de discount gamma. On utilise 
        # l'approximation Q par le max (Q learning) pour estimer la valeur future et non la somme pondérée (SARSA).
        # On devrait modifier la mise à jour de q_next = sum pi(a|s_next) * Q(theta, s_next, a) for a in range(n_actions) 
        # pour faire du SARSA, mais ici on fait du Q-learning, qui est plus simple et plus rapide à calculer.
        
        """
        q_values = [Q(theta, s_next, a_) for a_ in range(n_actions)]
        q_max = max(q_values)
        q_min = min(q_values)
        expected_q = (1 - epsilon + epsilon/2) * q_max + (epsilon/2) * q_min
        y = r + gamma * expected_q
        # il n'y a que deux action possible, on peut faire une moyenne pondérée entre la valeur maximale et 
        # la valeur minimale de Q pour le nouvel état s_next, en utilisant les probabilités d'exploration et d'exploitation 
        # de la politique epsilon-greedy, 
        """

        """
        if np.random.rand() < epsilon:
            a_next = env.action_space.sample()
        else:
            q_values = [Q(theta, s_next, a_) for a_ in range(n_actions)]
            a_next = np.argmax(q_values)

        q_next = Q(theta, s_next, a_next) # pour faire du SARSA, on utilise la même action a pour calculer la valeur future,
        y = r + gamma * q_next # la cible y est calculée en utilisant la récompense immédiate r plus la valeur de Q pour 
        # le nouvel état s_next et l'action a, ce qui rend l'algorithme plus stable mais aussi plus lent à converger, 
        # #car on utilise une estimation plus précise de la valeur future, mais qui peut être plus bruit
        """


        td_error = y - Q(theta, s, a) # calcul de l'erreur temporelle-différence (TD error), 
        # qui mesure la différence entre la cible y et l'estimation actuelle de Q pour l'état s et l'action a,
        # cette erreur est utilisée pour mettre à jour les paramètres theta de la fonction d'action-valeur Q.
        # C'est une approximation de l'espérance par une seule réalisation, ce qui rend l'algorithme plus efficace
        # en termes de calcul. 

        # Mise à jour linéaire 
        theta += alpha * td_error * phi(s, a) # mise à jour des paramètres theta en utilisant l'erreur TD, 
        # le taux d'apprentissage alpha, et les features phi(s, a) (gradient de Q ici) pour l'état s et l'action a, 
        # ce qui permet d'ajuster les paramètres theta pour mieux approximer la fonction d'action-valeur Q.
        # On utlise l'approximation de l'espérance par une seule réalisation 

        s = s_next

  return theta @ phi(state, action) # la fonction d'action-valeur Q est définie comme le produit
  td_error = y - Q(theta, s, a) # calcul de l'erreur temporelle-différence (TD error),


In [6]:
def policy(theta, state):
    q_values = [Q(theta, state, a) for a in range(n_actions)]
    return np.argmax(q_values)

# Ici on extrait la politique finale à partir des paramètres theta appris, en choisissant pour chaque 
# état l'action qui maximise la fonction d'action-valeur Q.

In [7]:
def evaluate(env, theta, n_eval=50):
    rewards = []
    #Ici regarde le reward moyen obtenu par la politique apprise sur un certain nombre 
    # d'épisodes d'évaluation n_eval. Dans cette modélisation les récompenses sont de 1 à chaque étape 
    # tant que le pole ne tombe pas. 

    for _ in range(n_eval):
        state, _ = env.reset()
        done = False
        total_reward = 0

        while not done: # tant que le pole n'est pas tombé
            action = policy(theta, state) # on choisit l'action selon la politique apprise 
            state, reward, terminated, truncated, _ = env.step(action) # applique l'action à l'environnement 
            # et observe la transition
            done = terminated or truncated # vérifie si l'épisode est terminé
            total_reward += reward # accumule la récompense totale obtenue dans cet épisode

        rewards.append(total_reward)

    return np.mean(rewards) 



In [8]:
print("Average reward:", evaluate(env, theta))

Average reward: 9.18


On a donc implémenté une version d'approximation linéaire de Q. On a vu que le reward moyen n'était pas exeptionnel, nous allosn docn implémenté une version avec Replay Buffer pour améliorer l'estimation de l'espérance. 

In [99]:
buffer_size = 20000
batch_size = 64

replay_buffer = deque(maxlen=buffer_size)

def add_to_buffer(s, a, r, s_next, done):
    replay_buffer.append((s, a, r, s_next, done))

def sample_batch():
    return random.sample(replay_buffer, batch_size)

In [100]:
theta = np.zeros(d) #on réinitialise les paramètres theta à zéro pour entraîner un 
# nouveau modèle à partir du dataset collecté

for episode in range(n_episodes):
    s, _ = env.reset()
    done = False

    while not done:

        # epsilon-greedy
        if np.random.rand() < epsilon:
            a = env.action_space.sample()
        else:
            q_values = [Q(theta, s, a_) for a_ in range(n_actions)]
            a = np.argmax(q_values)

        s_next, r, terminated, truncated, _ = env.step(a)
        done = terminated or truncated

        # Ajouter au buffer
        add_to_buffer(s, a, r, s_next, done)

        s = s_next

        # Apprentissage si assez de données
        if len(replay_buffer) >= batch_size:

            batch = sample_batch() #baych aléatoire de transitions (s, a, r, s_next, done) est prélevé du 
            # replay buffer pour entraîner le modèle en evaluant l'espérance par une moyenne sur un batch de
            #  transitions plutôt que par une seule réalisation.

            theta_old = theta.copy()          # θ figé pour calcul des cibles
            grad = np.zeros_like(theta)

            for (s_b, a_b, r_b, s_next_b, done_b) in batch: # on somme sur les transitions du batch pour 
                #calculer la mise à jour des paramètres theta, ce qui permet d'obtenir une estimation plus 
                # stable de la cible y et de l'erreur TD, en réduisant la variance par rapport à l'utilisation 
                # d'une seule transition.

                if done_b:
                    y = r_b
                else:
                    q_next = max(Q(theta_old, s_next_b, a_)
                                 for a_ in range(n_actions)) 
                    y = r_b + gamma * q_next # on applique la même formule de bootstrap que précédement pour 
                    # calculer la cible y, mais cette fois-ci en utilisant les transitions du batch prélevé du 
                    # replay buffer. 

                td_error = y - Q(theta_old, s_b, a_b) # calcul de l'erreur temporelle-différence (TD error) 
                # pour la transition du batch, qui mesure la différence entre la cible y et l'estimation
                # actuelle de Q. 

                grad += td_error * phi(s_b, a_b)

            theta += (alpha/batch_size) * grad

  return theta @ phi(state, action) # la fonction d'action-valeur Q est définie comme le produit
  theta += alpha * td_error * phi(s_b, a_b)
  return theta @ phi(state, action) # la fonction d'action-valeur Q est définie comme le produit


In [123]:
print("Average reward (replay):", evaluate(env, theta))

Average reward (replay): 9.2


## Passage au NFQ

On observe une non-amélioration du reward moyen avec le replay buffer.  
Cela indique que le problème ne venait pas de l’estimation de l’espérance ni de la variance du gradient, mais plutôt de la capacité du modèle ou de la structure de l’optimisation.

Nous passons donc au NFQ.

---

## Replay Buffer

Avec un replay buffer :

- On échantillonne un mini-batch.
- On calcule les cibles avec les paramètres courants.
- On calcule le gradient et on fait un pas de descente.
- On met à jour les paramètres.
- On recommence immédiatement.

Les cibles sont donc recalculées fréquemment.

L’optimisation est incrémentale.

Il n’y a pas de séparation stricte entre la construction des cibles et l’optimisation.

---

## NFQ

Dans le NFQ, on travaille sur l’ensemble du dataset d’un seul coup.

À l’itération k :

1. On fixe les paramètres $\theta_k$

2. On construit toutes les cibles :

$$
y_i^{(k)} = r_i + \gamma \max_{a'} Q_{\theta_k}(s'_i, a')
$$

3. On optimise complètement le problème (soit par descente de gradient itérative durant laquelle les cibles ne changent  pas soit par résolution d'un système linéaire) :

$$
\min_\theta \sum_i \left( Q_\theta(s_i, a_i) - y_i^{(k)} \right)^2
$$

Pendant cette optimisation :

- les cibles restent fixes,
- seule la fonction d’approximation évolue.

On obtient ensuite $\theta_{k+1}$

Puis on reconstruit de nouvelles cibles.

---

## Interprétation structurelle

Replay buffer  
→ optimisation incrémentale avec recalcul fréquent des cibles.

NFQ  
→ application de l’opérateur de Bellman puis projection complète.

La dynamique correspond à :

$$
Q_{k+1} = \Pi(\mathcal{T} Q_k)
$$

---


In [None]:
def collect_dataset(env, n_transitions=50000):
    dataset = []
    state, _ = env.reset() # on initialise l'environnement et on obtient le premier état,
    # qui est un vecteur de taille 4

    while len(dataset) < n_transitions:
        action = env.action_space.sample()
        next_state, reward, terminated, truncated, _ = env.step(action)

        done = terminated or truncated

        dataset.append((state, action, reward, next_state, done))

        if done:
            state, _ = env.reset()
        else:
            state = next_state

    return dataset 

# l'idée ici est de collecter un dataset de transitions (s, a, r, s_next, done) en interagissant avec l'environnement

dataset = collect_dataset(env, n_transitions=50000)

In [128]:
theta = np.zeros(d) # on réinitialise les paramètres theta à zéro pour entraîner un 
# nouveau modèle à partir du dataset collecté

In [129]:
def nfq_iteration(theta, dataset, gamma, ridge=1e-4):
    # le ridge c'ets juste pour la régularisation de la solution de régression linéaire, 
    # qui permet d'éviter la non inversibilité de la matrice dans la solution de régression linéaire

    N = len(dataset)
    Phi = np.zeros((N, len(theta)))
    y = np.zeros(N)

    # Construction des cibles
    for i, (s, a, r, s_next, done) in enumerate(dataset):

        Phi[i] = phi(s, a)

        if done:
            target = r
        else:
            q_next = max(Q(theta, s_next, a_) for a_ in range(n_actions))
            target = r + gamma * q_next # construction des cibles y à partir des transitions du dataset, 
            # en utilisant la même formule de bootstrap que précédement.

        y[i] = target

    # Solution ridge : (ΦᵀΦ + λI)θ = Φᵀy
    A = Phi.T @ Phi + ridge * np.eye(len(theta))
    b = Phi.T @ y
    # résolution du système linéaire pour trouver les paramètres theta qui minimisent l'erreur 
    # quadratique entre les cibles y et les prédictions Phi @ theta, ce qui correspond à une 
    # étape de régression linéaire pour ajuster les paramètres theta de la fonction d'action-valeur Q 
    # en fonction du dataset collecté.

    theta_new = np.linalg.solve(A, b)
    

    return theta_new

In [130]:
n_iterations = 50

for k in range(n_iterations):
    theta = nfq_iteration(theta, dataset, gamma)
    # On itère plusieurs fois sur le dataset pour affiner les paramètres theta, 
    # ce qui permet d'améliorer l'approximation de la fonction d'action-valeur Q à partir du dataset collecté.
    # entre chaque itération, les paramètres theta sont mis à jour en fonction du dataset collecté, 
    # ce qui permet d'améliorer l'approximation de la fonction d'action-valeur Q à partir du dataset collecté.

In [138]:
print("Average reward:", evaluate(env, theta))

Average reward: 173.35


On remarque un reward bien superieur et un temps de calcul comparable à la version sans replay buffer. C'est une $\textcolor{red}{\text{large}}$ victoire. 