In [4]:
from IPython.core.debugger import set_trace
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np
import pprint
from gridworld import GridworldEnv

## Policy Evaluation

In [5]:
# Gridworld environment shared by the cells that follow.
env = GridworldEnv()
# Pretty-printer configured with a two-space indent.
pp = pprint.PrettyPrinter(indent=2)

In [10]:
def policy_eval(policy, env, discount_factor=1.0, theta=0.00001):
    """
    Estimate the state-value function of a fixed policy by repeated sweeps.

    policy: [S, A] table; policy[s][a] is the probability of action a in state s
    env.P[s][a]: iterable of (prob, next_state, reward, done)
    env.nS: # of states
    discount_factor: gamma applied to the value of the next state
    theta: sweeps stop once the largest per-state change falls below this

    Returns an array of length env.nS holding the value estimate.
    """
    values = np.zeros(env.nS)

    def expected_return(state):
        # Weight each transition by the policy's action probability and the
        # transition probability; reads `values` as it stands mid-sweep.
        total = 0
        for action, action_prob in enumerate(policy[state]):
            for prob, next_state, reward, _done in env.P[state][action]:
                total += action_prob * prob * (reward + discount_factor * values[next_state])
        return total

    converged = False
    while not converged:
        largest_change = 0
        for state in range(env.nS):
            updated = expected_return(state)
            largest_change = max(largest_change, np.abs(updated - values[state]))
            # In-place write: states later in this sweep already see the new value.
            values[state] = updated
        converged = largest_change < theta
    return np.array(values)

In [11]:
# Uniform random test policy: every action gets probability 1/nA in every state.
random_policy = np.full((env.nS, env.nA), 1.0 / env.nA)
v = policy_eval(random_policy, env)

In [12]:
# Show the state values estimated for the uniform random policy.
print(v)

[  0.         -13.99993529 -19.99990698 -21.99989761 -13.99993529
 -17.9999206  -19.99991379 -19.99991477 -19.99990698 -19.99991379
 -17.99992725 -13.99994569 -21.99989761 -19.99991477 -13.99994569
   0.        ]


In [6]:
def one_step_lookahead(state, V, discount_factor=1.0, env=None):
    """
    Compute the expected value of every action available in `state`.

    Args:
        state: index of the state to evaluate.
        V: current value estimate, indexable by state.
        discount_factor: gamma applied to the value of the next state.
        env: environment exposing `env.nA` and `env.P[s][a]` (an iterable of
            (prob, next_state, reward, done)). When omitted, the notebook-level
            `env` is used, which keeps existing three-argument calls working.

    Returns:
        Array of length env.nA; entry a is the expected value of taking action a.

    Raises:
        NameError: if `env` is omitted and no notebook-level `env` exists.
    """
    if env is None:
        # Backward compatibility: earlier callers relied on the global `env`.
        env = globals().get("env")
        if env is None:
            raise NameError("name 'env' is not defined")

    A = np.zeros(env.nA)
    for a in range(env.nA):
        # `done` is part of the transition tuple but is not needed here.
        for prob, next_state, reward, _done in env.P[state][a]:
            A[a] += prob * (reward + discount_factor * V[next_state])
    return A


## Policy Iteration

In [13]:
def policy_improvement(env, policy_eval_fn=policy_eval, discount_factor=1.0):
    """
    Policy iteration: alternate policy evaluation and greedy policy improvement
    until the greedy action stops changing in every state.

    Args:
        env: environment exposing `env.nS` (# of states), `env.nA` (# of actions)
            and `env.P[s][a]`, an iterable of (prob, next_state, reward, done).
        policy_eval_fn: callable invoked as `policy_eval_fn(policy, env, discount_factor)`
            that returns the value estimate for `policy`; defaults to `policy_eval`.
        discount_factor: gamma applied to the value of the next state.

    Returns:
        (policy, V): `policy` is an [env.nS, env.nA] array with a single 1 per row
        marking the greedy action; `V` is the value estimate produced by the final
        call to `policy_eval_fn`.
    """
    def action_values(state, V):
        # Expected value of each action in `state`, read from the `env` argument
        # (not a notebook-level global) so any environment passed in is honoured.
        values = np.zeros(env.nA)
        for a in range(env.nA):
            for prob, next_state, reward, _done in env.P[state][a]:
                values[a] += prob * (reward + discount_factor * V[next_state])
        return values

    # Row i of the identity matrix is the one-hot distribution for action i;
    # built once instead of on every state update.
    one_hot = np.eye(env.nA)
    # Start from the uniform random policy.
    policy = np.ones([env.nS, env.nA]) / env.nA

    while True:
        # 1. policy evaluation
        V = policy_eval_fn(policy, env, discount_factor)

        # 2. policy improvement
        policy_stable = True
        for s in range(env.nS):
            # Action the current policy favours (index of the largest probability).
            chosen_a = np.argmax(policy[s])
            # Action with the best one-step lookahead value under V.
            best_a = np.argmax(action_values(s, V))

            if chosen_a != best_a:
                policy_stable = False

            # Put all probability on the best action for this state.
            policy[s] = one_hot[best_a]

        if policy_stable:
            return policy, V
    

In [14]:
# Run policy iteration on the gridworld and show its results.
policy_pi, v_pi = policy_improvement(env)
print(policy_pi)
# Index of the selected action per state, reshaped to env.shape.
print(np.argmax(policy_pi, axis=1).reshape(env.shape))
print(v_pi)

[[1. 0. 0. 0.]
 [0. 0. 0. 1.]
 [0. 0. 0. 1.]
 [0. 0. 1. 0.]
 [1. 0. 0. 0.]
 [1. 0. 0. 0.]
 [1. 0. 0. 0.]
 [0. 0. 1. 0.]
 [1. 0. 0. 0.]
 [1. 0. 0. 0.]
 [0. 1. 0. 0.]
 [0. 0. 1. 0.]
 [1. 0. 0. 0.]
 [0. 1. 0. 0.]
 [0. 1. 0. 0.]
 [1. 0. 0. 0.]]
[[0 3 3 2]
 [0 0 0 2]
 [0 0 1 2]
 [0 1 1 0]]
[ 0. -1. -2. -3. -1. -2. -3. -2. -2. -3. -2. -1. -3. -2. -1.  0.]


## Value Iteration

In [7]:
def value_iteration(env, theta=0.0001, discount_factor=1.0):
    """
    Value iteration: repeatedly back up each state with its best one-step
    lookahead value, then read off the greedy policy.

    Args:
        env: environment exposing `env.nS` (# of states), `env.nA` (# of actions)
            and `env.P[s][a]`, an iterable of (prob, next_state, reward, done).
        theta: sweeps stop once the largest per-state change falls below this.
        discount_factor: gamma applied to the value of the next state.

    Returns:
        (policy, V): `policy` is an [env.nS, env.nA] array with a single 1 per row
        marking the greedy action under `V`; `V` is the value estimate of
        length env.nS.
    """
    def action_values(state, V):
        # Expected value of each action in `state`, read from the `env` argument
        # (not a notebook-level global) so any environment passed in is honoured.
        values = np.zeros(env.nA)
        for a in range(env.nA):
            for prob, next_state, reward, _done in env.P[state][a]:
                values[a] += prob * (reward + discount_factor * V[next_state])
        return values

    V = np.zeros(env.nS)
    while True:
        delta = 0
        for s in range(env.nS):
            best_action_value = np.max(action_values(s, V))
            # Track the largest change seen in this sweep.
            delta = max(delta, np.abs(best_action_value - V[s]))
            # In-place update: later states in the sweep see the new value.
            V[s] = best_action_value
        if delta < theta:
            break

    # Deterministic policy: all probability on the best action in each state.
    policy = np.zeros([env.nS, env.nA])
    for s in range(env.nS):
        policy[s, np.argmax(action_values(s, V))] = 1.0

    return policy, V
   

In [8]:
# Run value iteration on the gridworld and show its results.
policy_vi, v_vi = value_iteration(env)
print(policy_vi)
# Index of the selected action per state, reshaped to env.shape.
print(np.argmax(policy_vi, axis=1).reshape(env.shape))
print(v_vi)

[[1. 0. 0. 0.]
 [0. 0. 0. 1.]
 [0. 0. 0. 1.]
 [0. 0. 1. 0.]
 [1. 0. 0. 0.]
 [1. 0. 0. 0.]
 [1. 0. 0. 0.]
 [0. 0. 1. 0.]
 [1. 0. 0. 0.]
 [1. 0. 0. 0.]
 [0. 1. 0. 0.]
 [0. 0. 1. 0.]
 [1. 0. 0. 0.]
 [0. 1. 0. 0.]
 [0. 1. 0. 0.]
 [1. 0. 0. 0.]]
[[0 3 3 2]
 [0 0 0 2]
 [0 0 1 2]
 [0 1 1 0]]
[ 0. -1. -2. -3. -1. -2. -3. -2. -2. -3. -2. -1. -3. -2. -1.  0.]


In [15]:
# Sanity check: the values from policy iteration and value iteration must agree
# to 2 decimal places.
np.testing.assert_array_almost_equal(v_pi, v_vi, decimal=2)