# Policy Iteration

Once a policy has been improved using its value function to yield a better policy, we can then compute a new value function and improve again to yield an even better policy. We can thus obtain a sequence of monotonically improving policies and value functions.

We need the policy evaluation and policy improvement functions:

In [None]:
import numpy as np

def policy_evaluation(pi, P, gamma=1.0, theta=1e-10):
    """Iteratively estimate the state-value function of the policy ``pi``.

    Each sweep builds a fresh value array from the previous sweep's values
    and the loop stops as soon as the largest per-state change drops below
    ``theta``.

    Args:
        pi: Callable mapping a state index to the action chosen there.
        P: Transition model indexed as ``P[state][action]``; every entry is
            an iterable of ``(prob, next_state, reward, done)`` tuples.
        gamma: Discount factor applied to the value of the next state.
        theta: Convergence threshold on the maximum absolute change.

    Returns:
        NumPy array holding one value estimate per state.
    """
    n_states = len(P)
    last_sweep = np.zeros(n_states)
    while True:
        values = np.zeros(n_states)
        for state in range(n_states):
            transitions = P[state][pi(state)]
            for prob, next_state, reward, done in transitions:
                # Multiplying by ``not done`` zeroes the bootstrapped value
                # of transitions that end the episode.
                bootstrap = gamma * last_sweep[next_state] * (not done)
                values[state] += prob * (reward + bootstrap)
        largest_change = np.abs(last_sweep - values).max()
        if largest_change < theta:
            return values
        # ``values`` is rebuilt from scratch on the next sweep, so it can be
        # kept as the reference without copying.
        last_sweep = values

In [None]:
def policy_improvement(V, P, gamma=1.0):
    """Return the policy that acts greedily with respect to ``V``.

    The action-value table ``Q`` is filled from the transition model, and
    each state is mapped to the action with the highest ``Q`` value (the
    first such action when several tie, as ``np.argmax`` does).

    Args:
        V: Sequence of state values, indexable by state.
        P: Transition model indexed as ``P[state][action]``; every entry is
            an iterable of ``(prob, next_state, reward, done)`` tuples.
            States and actions are addressed as ``0..n-1``.
        gamma: Discount factor applied to the value of the next state.

    Returns:
        Callable mapping a state index to its greedy action. Calling it
        with an unknown state raises ``KeyError``.
    """
    Q = np.zeros((len(P), len(P[0])))
    for s in range(len(P)):
        for a in range(len(P[s])):
            for prob, next_state, reward, done in P[s][a]:
                # ``not done`` drops the bootstrapped value of transitions
                # that end the episode.
                Q[s][a] += prob * (reward + gamma * V[next_state] * (not done))

    # Resolve the greedy action of every state once, instead of recomputing
    # the argmax and rebuilding the whole mapping on each policy lookup.
    greedy_actions = dict(enumerate(np.argmax(Q, axis=1)))

    def new_pi(s):
        return greedy_actions[s]

    return new_pi

In [None]:
def policy_iteration(P, gamma=1.0, theta=1e-10):
    """Alternate policy evaluation and greedy improvement until stable.

    Starts from a randomly drawn policy and repeats evaluate/improve until
    an improvement step leaves the action of every state unchanged.

    Args:
        P: Transition model indexed as ``P[state][action]``; every entry is
            an iterable of ``(prob, next_state, reward, done)`` tuples.
            States are addressed as ``0..len(P)-1`` and the available
            actions are read from ``P[0]``.
        gamma: Discount factor forwarded to evaluation and improvement.
        theta: Convergence threshold forwarded to policy evaluation.

    Returns:
        Tuple ``(V, pi)``: the state-value array computed in the last
        evaluation step and the final policy as a callable from state to
        action.
    """
    n_states = len(P)

    # Draw one random action per state and freeze the state -> action
    # mapping once, so looking up the initial policy does not rebuild it
    # on every call.
    random_actions = np.random.choice(tuple(P[0].keys()), n_states)
    initial_actions = dict(enumerate(random_actions))

    def pi(s):
        return initial_actions[s]

    while True:
        # Snapshot the current policy's actions before it is replaced.
        old_pi = {s: pi(s) for s in range(n_states)}

        # State-value function of the current policy.
        V = policy_evaluation(pi, P, gamma, theta)

        # Greedy policy with respect to that value function.
        pi = policy_improvement(V, P, gamma)

        # An unchanged policy means improvement has nothing left to gain,
        # so stop; otherwise evaluate and improve again.
        if old_pi == {s: pi(s) for s in range(n_states)}:
            break

    return V, pi

Let’s try it on the taxi environment.

In [None]:
import gym

# Build the Taxi environment and fetch its tabular transition model.
env = gym.make('Taxi-v3')
# `unwrapped` reaches the base environment directly, so the lookup does not
# depend on how many wrappers gym.make stacked on top of it, nor on those
# wrappers forwarding attribute access.
P = env.unwrapped.P
init_state, _ = env.reset()

In [None]:
# Run policy iteration on the transition model P with a discount factor of
# 0.99; keep both the resulting state values and the resulting policy.
V_best_p, pi_best_p = policy_iteration(P, gamma=0.99)

We can print the state-value function of the resulting policy:

In [None]:
def print_state_value_function(V, P, n_cols=5, prec=3, title='State-value function:'):
    """Print the state values as a grid of ``|``-separated cells.

    A cell shows the zero-padded state index followed by the value rounded
    to ``prec`` decimals. States whose every transition is flagged ``done``
    are printed as blank cells. A row is closed after ``n_cols`` cells.

    Args:
        V: Sequence of state values, indexable by state.
        P: Transition model indexed as ``P[state][action]``; every entry is
            an iterable of ``(prob, next_state, reward, done)`` tuples.
        n_cols: Number of cells printed per row.
        prec: Number of decimals the value is rounded to.
        title: Heading printed before the grid.
    """
    print(title)
    for state in range(len(P)):
        value = V[state]
        print("| ", end="")
        only_done_transitions = np.all([
            done
            for transitions in P[state].values()
            for _, _, _, done in transitions
        ])
        if only_done_transitions:
            cell = "".rjust(9)
        else:
            label = str(state).zfill(3)
            rounded = '{}'.format(np.round(value, prec)).rjust(6)
            cell = " ".join((label, rounded))
        print(cell, end=" ")
        if (state + 1) % n_cols == 0:
            print("|")

In [None]:
# Display the state values returned by policy iteration, rounded to 4 decimals.
print_state_value_function(V_best_p, P, prec=4)

We can also print the policy, the probability of success and the mean return:

In [None]:
def print_policy(pi, P, action_symbols=('<', 'v', '>', '^', 'P', 'D'), n_cols=5, title='Policy:'):
    """Print the action chosen in each state as a grid of ``|``-separated cells.

    A cell shows the zero-padded state index followed by the symbol of the
    action ``pi`` picks there. States whose every transition is flagged
    ``done`` are printed as blank cells. A row is closed after ``n_cols``
    cells.

    Args:
        pi: Callable mapping a state index to an action index.
        P: Transition model indexed as ``P[state][action]``; every entry is
            an iterable of ``(prob, next_state, reward, done)`` tuples.
        action_symbols: Symbols to display, positioned by action index.
        n_cols: Number of cells printed per row.
        title: Heading printed before the grid.
    """
    print(title)
    symbol_of = dict(enumerate(action_symbols))
    for state in range(len(P)):
        action = pi(state)
        print("| ", end="")
        only_done_transitions = np.all([
            done
            for transitions in P[state].values()
            for _, _, _, done in transitions
        ])
        if only_done_transitions:
            cell = "".rjust(9)
        else:
            label = str(state).zfill(3)
            cell = " ".join((label, symbol_of[action].rjust(6)))
        print(cell, end=" ")
        if (state + 1) % n_cols == 0:
            print("|")

In [None]:
import random

def probability_success(env, pi, n_episodes=100, max_steps=200):
    """Estimate how often following ``pi`` ends an episode by termination.

    Runs ``n_episodes`` episodes. An episode stops when the environment
    reports termination or truncation, or after ``max_steps`` steps. Only
    episodes that end with the termination flag set count as successes.

    Args:
        env: Environment whose ``reset()`` returns ``(state, info)`` and
            whose ``step(action)`` returns
            ``(state, reward, terminated, truncated, info)``.
        pi: Callable mapping a state to the action to take.
        n_episodes: Number of episodes to run.
        max_steps: Upper bound on the number of steps per episode.

    Returns:
        Fraction of episodes that ended with the termination flag set.
    """
    # NOTE: only the `random` and NumPy global generators are seeded here;
    # the environment itself is reset without a seed.
    random.seed(123)
    np.random.seed(123)
    results = []
    for _ in range(n_episodes):
        state, _ = env.reset()
        terminated = truncated = False
        steps = 0
        # A truncated episode is over as well: the environment must be
        # reset before it is stepped again, so stop instead of continuing.
        while not (terminated or truncated) and steps < max_steps:
            state, _, terminated, truncated, _ = env.step(pi(state))
            steps += 1
        results.append(terminated)
    return np.sum(results) / len(results)

In [None]:
def mean_return(env, pi, n_episodes=100, max_steps=200):
    """Estimate the average undiscounted return obtained by following ``pi``.

    Runs ``n_episodes`` episodes and sums the rewards of each one. An
    episode stops when the environment reports termination or truncation,
    or after ``max_steps`` steps.

    Args:
        env: Environment whose ``reset()`` returns ``(state, info)`` and
            whose ``step(action)`` returns
            ``(state, reward, terminated, truncated, info)``.
        pi: Callable mapping a state to the action to take.
        n_episodes: Number of episodes to run.
        max_steps: Upper bound on the number of steps per episode.

    Returns:
        Mean of the per-episode reward sums.
    """
    # NOTE: only the `random` and NumPy global generators are seeded here;
    # the environment itself is reset without a seed.
    random.seed(123)
    np.random.seed(123)
    returns = []
    for _ in range(n_episodes):
        state, _ = env.reset()
        terminated = truncated = False
        steps, episode_return = 0, 0.0
        # A truncated episode is over as well: stop so that rewards from
        # stepping an already finished episode are not added to the return.
        while not (terminated or truncated) and steps < max_steps:
            state, reward, terminated, truncated, _ = env.step(pi(state))
            episode_return += reward
            steps += 1
        returns.append(episode_return)
    return np.mean(returns)

In [None]:
# Show the action the computed policy picks in every state.
print_policy(pi_best_p, P)

# Evaluate the policy by running it in the environment: success rate as a
# percentage, and the mean of the per-episode reward sums.
ps = probability_success(env, pi_best_p)*100
mr = mean_return(env, pi_best_p)

print('Reaches goal {:.2f}%. Obtains an average undiscounted return of {:.4f}.'.format(ps,mr))