# RL I: Environnement 

## 1- Le dictionnaire P

P:{$S_0$: {$a_0$: [(s'), (s'), ...], $a_1$: [(s'), (s'), ...]},
   $S_1$: {$a_0$: [(s'), (s'), ...], $a_1$: [(s'), (s'), ...]}}

P[S][a] --> [(s'), (s'), ...]

(s') = (p(s'|s,a), s', r(s,a,s'), done?)

## 2- Exemple du Row Gridword

|-100|0|0|0|+10|
|||s|||

In [1]:
class GridworldRow():
    def __init__(self, size=5):
        self.size = size

        self.nS = size  # number of states
        self.nA = 2     # number of actions: left, right

        self.MAX_X = size - 1 

        P = {}

        for s in range(self.nS):
            dynamics_s = {}
            for a in range(self.nA):
                s_prime_list = []

                p = 1 if s!=0 and s != self.MAX_X else 0

                if a == 0:  # left
                    s_prime = max(s - 1, 0)
                else:
                    s_prime = min(s + 1, self.MAX_X)
                
                if (s_prime == 0):
                    reward = -100
                    done = True
                elif (s_prime == self.MAX_X):
                    reward = 10
                    done = True
                else:
                    reward = 0
                    done = False

                s_prime_list.append((p, s_prime, reward, done))
                dynamics_s.update({a: s_prime_list})
            P.update({s: dynamics_s})
        self.P = P

In [2]:
env = GridworldRow()
env.P  # Access the transition dynamics

{0: {0: [(0, 0, -100, True)], 1: [(0, 1, 0, False)]},
 1: {0: [(1, 0, -100, True)], 1: [(1, 2, 0, False)]},
 2: {0: [(1, 1, 0, False)], 1: [(1, 3, 0, False)]},
 3: {0: [(1, 2, 0, False)], 1: [(1, 4, 10, True)]},
 4: {0: [(0, 3, 0, False)], 1: [(0, 4, 10, True)]}}

# RL II: Implémentation de Policy Evaluation, Improvement, Iteration

## 1. Policy Evaluation

Calcul de la value function $v_{\pi}$ de la policy aléatoire $\pi$ dans le Row GridWorld

L'algorithme de *Policy Evaluation* calcule $v_{\pi} pour $\pi$, pour chaque state S. Pour ce faire il applique itérativement la première équation d'espérance de Bellman.

$$ V(s) \leftarrow \sum_a \pi(a|s)\sum_{s'}p(s'|s, a)[r(s, a, s') + \gamma V(s')], \forall s$$

In [3]:
import numpy as np 
env = GridworldRow()
pi = np.ones([env.nS, env.nA]) / env.nA     # uniform random policy
V = np.zeros([env.nS, 1])

gamma = 0.9 # discount factor
theta = 1e-10 # stopping threshold

On décompose le problème :

$$
V_\pi(s) \;=\; \sum_{a}\pi(a\mid s)\,Q_\pi(s,a), \quad \forall s.
$$

avec
$$
Q_\pi(s,a)\;=\;\sum_{s'} p(s'\mid s,a)\Bigl[r(s,a,s')+\gamma\,V_\pi(s')\Bigr].
$$

Calculons donc $Q_\pi(s,a)$ à partir de $V_\pi$.

In [4]:
def compute_q_value_s_a(env, V, s, a, gamma):
    q_sa = 0
    for p, s_prime, r, done in env.P[s][a]:
        q_sa += p*(r + gamma*V[s_prime])
    return float(q_sa[0])

boucle d'update

In [5]:
import numpy as np

def policy_evaluation(env, pi, V, gamma=0.99, theta=1e-8, max_iter=10_000):
    """
    Évalue une politique pi en calculant V_pi via des mises à jour itératives (Bellman expectation backup).

    Paramètres
    ----------
    env : environnement (discret)
        Doit exposer env.nS (nb d'états), env.nA (nb d'actions), et la dynamique utilisée
        par compute_q_value_s_a(...).
    pi : array-like, shape (nS, nA)
        Politique stochastique : pi[s][a] = P(a|s).
    V : np.ndarray, shape (nS,)
        Valeur d'état initiale. Modifiée en place et aussi renvoyée.
    gamma : float
        Facteur d'actualisation.
    theta : float
        Seuil de convergence (norme infinie sur la variation de V).
    max_iter : int
        Garde-fou pour éviter une boucle infinie si problème de convergence.

    Retour
    ------
    V : np.ndarray
        Estimation de V_pi.
    n_iter : int
        Nombre d'itérations effectuées.
    """
    V = np.asarray(V, dtype=float)  # assure le type float
    pi = np.asarray(pi, dtype=float)

    for n_iter in range(1, max_iter + 1):
        delta = 0.0

        for s in range(env.nS):
            v_old = V[s]

            v_new = 0.0
            for a in range(env.nA):
                prob_a = pi[s, a]
                q_sa = compute_q_value_s_a(env, V, s, a, gamma)
                v_new += prob_a * q_sa

            delta = max(delta, abs(v_new - v_old)) 
            V[s] = v_new

        # Critère de convergence
        if delta < theta:
            return V, n_iter

    # Si on sort ici, on n'a pas convergé dans max_iter itérations
    return V, n_iter


v, n_it = policy_evaluation(env, pi, V)
print(v, n_it)

[[  0.        ]
 [-71.62197274]
 [-43.68075301]
 [-16.62197274]
 [  0.        ]] 32


## 2. Policy improvement

Déduction de $\pi$' grâce à $v_{\pi}$ dans le Row Gridworld
Il attribue à $\pi$' pour chaque state, l'action qui maximize le return $q_{\pi}(s, a)$ de la manière suivante:

$$\pi'(s) \leftarrow argmax_a Q(s, a), \forall s$$

In [6]:
def policy_improvement(env, V, gamma=0.9):
    """
    Améliore une politique à partir d'une fonction de valeur V (greedy policy improvement).

    Retour
    ------
    pi_new : np.ndarray, shape (nS, nA)
        Politique déterministe greedy : pi_new[s, a*] = 1 pour l'action a* argmax_a Q(s,a).
    """
    pi_new = np.zeros((env.nS, env.nA), dtype=float)

    for s in range(env.nS):
        # Calcule tous les Q(s,a) pour l'état s
        q_values = np.zeros(env.nA, dtype=float)
        for a in range(env.nA):
            q_values[a] = compute_q_value_s_a(env, V, s, a, gamma)

        best_a = int(np.argmax(q_values))
        # Politique déterministe: 1 sur best_a, 0 ailleurs
        pi_new[s, best_a] = 1.0

    return pi_new


new_pi = policy_improvement(env, V, gamma=0.9)
new_pi

array([[1., 0.],
       [0., 1.],
       [0., 1.],
       [0., 1.],
       [1., 0.]])

## 3. Policy iteration

Résolution complète du Row GridWorld

Enchaînement de la tâche de *prédiction* et de *contrôle*. Prédiction: **Policy Evaluation**, Contrôle: **Policy improvement**

In [7]:
import numpy as np

def policy_iteration(env, V=None, pi=None, gamma=0.9, theta=1e-8, max_iter=10_000):
    """
    Policy Iteration (itération sur les politiques) :
    1) Policy Evaluation : calcule V^{pi}
    2) Policy Improvement : pi <- greedy(V^{pi})
    Jusqu'à stabilité de la politique.

    Paramètres
    ----------
    env : env discret 
    V : np.ndarray (nS,) ou None
        Valeur initiale. Si None -> vecteur de zéros.
    pi : np.ndarray (nS,nA) ou None
        Politique initiale (stochastique). Si None -> uniforme.
    gamma : float
        Discount.
    theta : float
        Seuil de stabilité de la politique (on compare old_pi vs new_pi).
    max_iter : int
        Nombre max d'itérations de policy iteration.

    Retour
    ------
    V : np.ndarray (nS,)
    pi : np.ndarray (nS,nA)
    n_outer : int (nombre d'itérations externes)
    """
    # Initialisations robustes
    if V is None:
        V = np.zeros(env.nS, dtype=float)
    else:
        V = np.asarray(V, dtype=float)

    if pi is None:
        pi = np.ones((env.nS, env.nA), dtype=float) / env.nA
    else:
        pi = np.asarray(pi, dtype=float)

    for n_outer in range(1, max_iter + 1):
        old_pi = pi.copy() 

        # 1) Évaluation de la politique courante
        V, _ = policy_evaluation(env, old_pi, V, gamma=gamma, theta=theta)

        # 2) Amélioration (politique greedy)
        pi = policy_improvement(env, V, gamma=gamma)

        # 3) Test de stabilité de la politique
        delta = np.max(np.abs(pi - old_pi))

        if delta < theta:
            return V, pi, n_outer

    return V, pi, n_outer


V_star, pi_star, n = policy_iteration(env, V, pi)
print(V_star, pi_star, n)

    

[[ 0. ]
 [ 8.1]
 [ 9. ]
 [10. ]
 [ 0. ]] [[1. 0.]
 [0. 1.]
 [0. 1.]
 [0. 1.]
 [1. 0.]] 2
