# Policy Iteration

$\pi_0 \overset{\text{evaluate}}{\longrightarrow} v_{\pi_0} \overset{\text{improve}}{\longrightarrow} \pi_1 \overset{\text{evaluate}}{\longrightarrow} v_{\pi_1} \overset{\text{improve}}{\longrightarrow} \pi_2 \overset{\text{evaluate}}{\longrightarrow} \cdots \overset{\text{improve}}{\longrightarrow} \pi_* \overset{\text{evaluate}}{\longrightarrow} v_{\pi_*}$

We need the policy evaluation and policy improvement functions:

In [1]:
import numpy as np

def policy_evaluation(pi, P, gamma=1.0, theta=1e-10):
    """Iteratively compute the state-value function of policy ``pi``.

    Each sweep rebuilds the whole value table from the previous one and the
    loop stops once the largest per-state change drops below ``theta``.

    Args:
        pi: Callable mapping a state index to the action taken in that state.
        P: Transition model; ``P[s][a]`` is an iterable of
            ``(prob, next_state, reward, done)`` tuples.
        gamma: Discount factor applied to the value of the next state.
        theta: Convergence threshold on the maximum absolute change between
            two consecutive sweeps.

    Returns:
        A NumPy array of length ``len(P)`` holding the value of each state.
    """
    n_states = len(P)
    prev_V = np.zeros(n_states)
    while True:
        V = np.zeros(n_states)
        for s in range(n_states):
            for prob, next_state, reward, done in P[s][pi(s)]:
                # A transition flagged ``done`` ends the episode, so nothing
                # is bootstrapped from the state it lands in.
                future = 0.0 if done else gamma * prev_V[next_state]
                V[s] += prob * (reward + future)
        if np.max(np.abs(prev_V - V)) < theta:
            break
        # V is freshly allocated on every sweep, so no copy is needed.
        prev_V = V
    return V

In [2]:
def policy_improvement(V, P, gamma=1.0):
    """Return the policy that acts greedily with respect to ``V``.

    Args:
        V: Indexable of state values, one entry per state in ``P``.
        P: Transition model; ``P[s][a]`` is an iterable of
            ``(prob, next_state, reward, done)`` tuples. The number of
            actions is taken from ``P[0]``.
        gamma: Discount factor applied to the value of the next state.

    Returns:
        A callable mapping a state index to its greedy action. Ties are
        resolved by ``np.argmax``, i.e. the lowest action index wins.
    """
    Q = np.zeros((len(P), len(P[0])))
    for s in range(len(P)):
        for a in range(len(P[s])):
            for prob, next_state, reward, done in P[s][a]:
                # A transition flagged ``done`` ends the episode, so nothing
                # is bootstrapped from the state it lands in.
                future = 0.0 if done else gamma * V[next_state]
                Q[s][a] += prob * (reward + future)

    # Compute the greedy table once; the returned policy is then an O(1)
    # lookup instead of redoing the argmax and dict build on every call.
    greedy_actions = dict(enumerate(np.argmax(Q, axis=1)))
    return lambda s: greedy_actions[s]

We can thus obtain a sequence of monotonically improving policies and value functions:

In [3]:
def policy_iteration(P, gamma=1.0, theta=1e-10):
    """Alternate policy evaluation and greedy improvement until stable.

    Args:
        P: Transition model; ``P[s][a]`` is an iterable of
            ``(prob, next_state, reward, done)`` tuples. The action set is
            taken from the keys of ``P[0]``.
        gamma: Discount factor forwarded to evaluation and improvement.
        theta: Convergence threshold forwarded to ``policy_evaluation``.

    Returns:
        A ``(V, pi)`` tuple: the state-value array from the last evaluation
        and the final policy as a callable from state index to action.
    """
    n_states = len(P)

    # Start from a random policy: one randomly drawn action per state.
    # The lookup table is built once so that each pi(s) call is O(1).
    random_actions = dict(
        enumerate(np.random.choice(tuple(P[0].keys()), n_states))
    )
    pi = lambda s: random_actions[s]

    while True:
        # Snapshot the current policy's actions before it is replaced.
        old_actions = {s: pi(s) for s in range(n_states)}

        # Evaluate the current policy.
        V = policy_evaluation(pi, P, gamma, theta)

        # Act greedily with respect to that value function.
        pi = policy_improvement(V, P, gamma)

        # Improvement left every action unchanged: the policy is stable and
        # V (computed for the identical previous policy) is its value
        # function, so stop. Otherwise evaluate the new policy again.
        if old_actions == {s: pi(s) for s in range(n_states)}:
            break

    return V, pi

Let’s try it on the frozen-lake environment.

In [4]:
import gymnasium as gym

# Build the environment and pull its transition model out of the wrappers.
frozen_lake = gym.make('FrozenLake-v1')
# P[state][action] is iterated as (prob, next_state, reward, done) tuples by
# policy_evaluation and policy_improvement.
P = frozen_lake.env.unwrapped.P
# State index that evaluate() counts as a successful episode end.
goal_state = 15

# Action indices, in the same order as print_policy's default symbols
# ('<', 'v', '>', '^').
LEFT, DOWN, RIGHT, UP = range(4)

In [5]:
# Run policy iteration on the FrozenLake model with a discount factor of 0.99.
V_best_p, pi_best_p = policy_iteration(P, gamma=0.99)

We can use the following helper functions to print the policy and its state-value function:

In [7]:
def print_policy(pi, P, action_symbols=('<', 'v', '>', '^'), n_cols=4, title='Policy:'):
    """Print the policy as a grid, one cell per state.

    Args:
        pi: Callable mapping a state index to an action index.
        P: Transition model; ``P[s]`` maps each action to an iterable of
            ``(prob, next_state, reward, done)`` tuples.
        action_symbols: Symbol shown for each action index, in index order.
        n_cols: Number of cells printed per row.
        title: Heading printed before the grid.

    States whose every transition is flagged ``done`` are shown as blank
    cells; all other cells show the zero-padded state index and the symbol
    of the action the policy picks there.
    """
    print(title)
    symbol_for = dict(enumerate(action_symbols))
    for state in range(len(P)):
        action = pi(state)
        print("| ", end="")
        is_terminal = all(
            done
            for transitions in P[state].values()
            for _, _, _, done in transitions
        )
        if is_terminal:
            cell = "".rjust(9)
        else:
            cell = " ".join((str(state).zfill(2), symbol_for[action].rjust(6)))
        print(cell, end=" ")
        # Close the row after every n_cols cells.
        if (state + 1) % n_cols == 0:
            print("|")

In [8]:
# Show the action the computed policy picks in each non-terminal state.
print_policy(pi_best_p, P)

Policy:
| 00      < | 01      ^ | 02      ^ | 03      ^ |
| 04      < |           | 06      < |           |
| 08      ^ | 09      v | 10      < |           |
|           | 13      > | 14      v |           |


In [9]:
def print_state_value_function(V, P, n_cols=4, prec=3, title='State-value function:'):
    """Print the state values as a grid, one cell per state.

    Args:
        V: Indexable of state values, one entry per state in ``P``.
        P: Transition model; ``P[s]`` maps each action to an iterable of
            ``(prob, next_state, reward, done)`` tuples.
        n_cols: Number of cells printed per row.
        prec: Number of decimals the value is rounded to with ``np.round``.
        title: Heading printed before the grid.

    States whose every transition is flagged ``done`` are shown as blank
    cells; all other cells show the zero-padded state index and its value.
    """
    print(title)
    for state in range(len(P)):
        value = V[state]
        print("| ", end="")
        is_terminal = all(
            done
            for transitions in P[state].values()
            for _, _, _, done in transitions
        )
        if is_terminal:
            cell = "".rjust(9)
        else:
            rounded = f"{np.round(value, prec)}"
            cell = " ".join((str(state).zfill(2), rounded.rjust(6)))
        print(cell, end=" ")
        # Close the row after every n_cols cells.
        if (state + 1) % n_cols == 0:
            print("|")

In [10]:
# Show the value of each non-terminal state, rounded to 4 decimals.
print_state_value_function(V_best_p, P, prec=4)

State-value function:
| 00  0.542 | 01 0.4988 | 02 0.4707 | 03 0.4569 |
| 04 0.5585 |           | 06 0.3583 |           |
| 08 0.5918 | 09 0.6431 | 10 0.6152 |           |
|           | 13 0.7417 | 14 0.8628 |           |


We can also measure the probability of success and the mean return:

In [11]:
def evaluate(env, pi, goal_state, n_episodes=100, max_steps=200):
    """Roll out ``pi`` in ``env`` and report success rate and mean return.

    Args:
        env: Environment whose ``reset()`` returns ``(state, info)`` and
            whose ``step(action)`` returns
            ``(state, reward, terminated, truncated, info)``.
        pi: Callable mapping a state to the action to take.
        goal_state: State that marks an episode as successful when it is the
            last state reached.
        n_episodes: Number of episodes to run.
        max_steps: Upper bound on the number of steps per episode.

    Returns:
        A ``(success_percentage, mean_return)`` tuple, where the return of
        an episode is the undiscounted sum of its rewards.

    Raises:
        ZeroDivisionError: If ``n_episodes`` is 0.
    """
    successes = 0
    returns = []
    for _episode in range(n_episodes):
        state, _info = env.reset()
        episode_return = 0.0
        for _step in range(max_steps):
            state, reward, terminated, truncated, _info = env.step(pi(state))
            episode_return += reward
            # An episode is over when it terminates OR when the environment
            # truncates it (e.g. a time limit); never step past either.
            if terminated or truncated:
                break
        returns.append(episode_return)
        if state == goal_state:
            successes += 1
    return (successes / n_episodes) * 100, np.mean(returns)

In [12]:
# Roll the computed policy out in the environment and report how it fares.
probability_success, mean_return = evaluate(
    frozen_lake, pi_best_p, goal_state=goal_state
)

print(f"Reaches goal  {probability_success!s} %")
print(f"Obtains an average undiscounted return of  {mean_return!s}")

Reaches goal  77.0 %
Obtains an average undiscounted return of  0.77
