## RLDMUU 2025
#### Policy Iteration
jakub.tluczek@unine.ch

In [19]:
import numpy as np

In [20]:
class DiscreteMDP:
    def __init__(self, n_states, n_actions, P = None, R = None):
        self.n_states = n_states 
        self.n_actions = n_actions 
        if (P is None):
            self.P = np.zeros([n_states, n_actions, n_states]) 
            for s in range(self.n_states):
                for a in range(self.n_actions):
                    self.P[s,a] = np.random.dirichlet(np.ones(n_states))
        else:
            self.P = P
        if (R is None):
            self.R = np.zeros([n_states, n_actions])
            for s in range(self.n_states):
                for a in range(self.n_actions):
                    self.R[s,a] = np.round(np.random.uniform(), decimals=1)
        else:
            self.R = R
        
        for s in range(self.n_states):
            for a in range(self.n_actions):
                assert(abs(np.sum(self.P[s,a,:])-1) <= 1e-3)
                assert((self.P[s,a,:] <= 1).all())
                assert((self.P[s,a,:] >= 0).all())
                
    def get_transition_probability(self, state, action, next_state):
        return self.P[state, action, next_state]
    
    def get_transition_probabilities(self, state, action):
        return self.P[state, action]
    
    def get_reward(self, state, action):
        return self.R[state, action]


In [21]:

class ChainMDP(DiscreteMDP):
    """
    Problem where we need to take the same action n_states-1 time in a row to get a highly rewarding state
    The optimal policy greatly depends on the discount factor we choose.
    """

    def __init__(self, n_states=20):
        assert  n_states > 1

        n_actions = 2
        super().__init__(n_states=n_states, n_actions=n_actions)

        self.R[:] = 0.
        self.P[:] = 0.

        self.R[:, 1] = -1 / (n_states-1)
        self.R[n_states-1, 1] = 1.
        self.R[:, 0] = 1/n_states

        for i in range(self.n_states-1):
            if i > 0:
                self.P[i, 0, i-1] = 1.
            else:
                self.P[i, 0, i] = 1.

            self.P[i, 1, i+1] = 1.

        self.P[self.n_states-1, :, self.n_states-1] = 1.

### Policy iteration

Your first task will be to implement the policy iteration algorithm. Let's start with policy evaluation. Given a policy $\pi$, you have to evaluate this policy on all states. 

$$ V^{\pi}(s) = \sum_{s'} P(s' | s, \pi(s)) [R(s, \pi(s)) + \gamma V^{\pi}(s')] $$ 

You can either program the policy evaluation using dynamic programming, or by using the equation:

$$ V^{\pi} = \left[\mathbb{I} - \gamma \mathbb{P} \right]^{-1} r $$ 

where $\mathbb{I}$ is an identity matrix, $\mathbb{P}$ a probability matrix and $r$ a reward vector.

In [22]:
mdp = ChainMDP()
np.array([mdp.get_transition_probabilities(s, 0) for s in range(mdp.n_states)])

array([[1., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
        0., 0., 0., 0.],
       [1., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
        0., 0., 0., 0.],
       [0., 1., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
        0., 0., 0., 0.],
       [0., 0., 1., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
        0., 0., 0., 0.],
       [0., 0., 0., 1., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
        0., 0., 0., 0.],
       [0., 0., 0., 0., 1., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
        0., 0., 0., 0.],
       [0., 0., 0., 0., 0., 1., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
        0., 0., 0., 0.],
       [0., 0., 0., 0., 0., 0., 1., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
        0., 0., 0., 0.],
       [0., 0., 0., 0., 0., 0., 0., 1., 0., 0., 0., 0., 0., 0., 0., 0.,
        0., 0., 0., 0.],
       [0., 0., 0., 0., 0., 0., 0., 0., 1., 0., 0., 0., 0., 0., 0., 0.,
        0., 0., 0., 0.],
       [0., 0., 0., 0., 0., 0.

In [44]:
def policy_evaluation_dynamic_programming(mdp: DiscreteMDP, gamma: float, policy: list[int], n_iters: int):
    V = np.zeros(mdp.n_states)
    Q = np.zeros(mdp.n_states)
    for _ in range(n_iters):
        for s in range(mdp.n_states):
            a = int(policy[s])
            P = mdp.get_transition_probabilities(s, a)
            Q[s] = mdp.get_reward(s, a)
            for next_s in range(mdp.n_states):
                Q[s] += gamma * P[next_s] * V[next_s]
        V = Q
    return V

def policy_evaluation_matrix_multiplication(mdp: DiscreteMDP, gamma: float, policy: list[int]):
    I = np.eye(mdp.n_states)
    R = np.array([mdp.R[s, int(policy[s])] for s in range(mdp.n_states)])
    P = np.array([mdp.get_transition_probabilities(s, int(policy[s])) for s in range(mdp.n_states)])
    return np.linalg.inv(I - gamma * P) @ R  


Now implement policy iteration by evaluating policy at each state and set the policy to:

$$ \pi(s) = \arg\max_{a \in A} Q^\pi (s,a) $$

Iterate until you reach maximal number of iterations or until newly computed values don't differ by more than some $\epsilon$ or until $\pi$ doesn't change.

In [45]:
def policy_iteration(mdp: DiscreteMDP, gamma: float, n_iters: int, n_eval_iters: int, use_dp: bool = False):
    policy = np.zeros(mdp.n_states)
    V = np.zeros(mdp.n_states)
    Q = np.zeros([mdp.n_states, mdp.n_actions])
    for _ in range(n_iters):
        if use_dp:
            V = policy_evaluation_dynamic_programming(mdp, gamma, policy, n_eval_iters)
        else:
            V = policy_evaluation_matrix_multiplication(mdp, gamma, policy)

        # Policy improvement
        for s in range(mdp.n_states):
            for a in range(mdp.n_actions):
                P = mdp.get_transition_probabilities(s,a)
                Q[s,a] = mdp.get_reward(s, a) 
                for next_s in range(mdp.n_states):
                    Q[s,a] += gamma * P[next_s] * V[next_s]
            policy[s] = np.argmax(Q[s,:])
    
    return policy, V
        

### Value iteration

Your second task will be to implement Value Iteration algorithm. At each timestep $t$, you have to compute a new value function by maximizing the Bellman equation.

$$ V_t (s) = \max_a \left( \sum_{s'} P(s'|s, a) [R(s,a) + \gamma V_{t-1}(s')] \right) $$

Once $V$ converges to some $V^{*}$ (or once you reach the limit of iterations), you can extract the policy for each state:

$$ \pi^{*}(s) = \arg\max_a \sum_{s'} P(s' | s, a) [R(s,a) + \gamma V^{*}(s')] $$ 

In [46]:
def value_iteration(mdp: DiscreteMDP, gamma: float, n_iters: int):
    policy = np.zeros(mdp.n_states)
    V = np.zeros(mdp.n_states)
    Q = np.zeros([mdp.n_states, mdp.n_actions])
    for _ in range(n_iters):
        for s in range(mdp.n_states):
            for a in range(mdp.n_actions):
                Q[s,a] = mdp.get_reward(s,a)
                P = mdp.get_transition_probabilities(s,a)
                for next_s in range(mdp.n_states):
                    Q[s,a] += gamma * P[next_s] * V[next_s]
        V = np.max(Q, axis=1)
        policy = np.argmax(Q, axis=1)
    return policy, V

### Results

Run both methods on the provided MDP for a given Chain MDP instance and compare the results 

In [48]:
mdp = ChainMDP()

N_ITERS = 10_000
N_EVAL_ITERS = 100
GAMMA = .9

policy, V = policy_iteration(mdp, GAMMA, N_ITERS, N_EVAL_ITERS)
print("POLICY ITERATION")
print(f"POLICY:\n{policy}")
print(f"V\n{V}")
policy, V = value_iteration(mdp, GAMMA, N_ITERS)
print("VALUE ITERATION")
print(f"POLICY:\n{policy}")
print(f"V\n{V}")

POLICY ITERATION
POLICY:
[1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1.]
V
[ 0.89563339  1.05362774  1.22917702  1.42423178  1.64095929  1.88176763
  2.14933245  2.4466267   2.77695364  3.14398358  3.55179462  4.004918
  4.50838842  5.0678      5.68936842  6.38        7.14736842  8.
  8.94736842 10.        ]
VALUE ITERATION
POLICY:
[1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1]
V
[ 0.89563339  1.05362774  1.22917702  1.42423178  1.64095929  1.88176763
  2.14933245  2.4466267   2.77695364  3.14398358  3.55179462  4.004918
  4.50838842  5.0678      5.68936842  6.38        7.14736842  8.
  8.94736842 10.        ]
