<b>What is this notebook?</b><br>
This notebook includes code for the policy iteration algorithm. This algorithm takes as input the Markov Decision process (shown below) and returns the number of iterations required for the algorithm to terminate.

In [32]:
import numpy as np

In [4]:
# example of MDP
input = {
"gamma":0.75,
"states": [
{
"id": 0,
"actions": [
{
"id": 0,
"transitions": [
{
"id": 0,
"probability": 0.5,
"reward": 0,
"to": 0
},
{
"id": 1,
"probability": 0.5,
"reward": 0,
"to": 1
}
]
}
]
},
{
"id": 1,
"actions": [
{
"id": 0,
"transitions": [
{
"id": 0,
"probability": 1,
"reward": 1,
"to": 1
}
]
}
]
}
]
}

In [22]:
states = input['states']
initial_policy = [state['actions'][0] for state in states]
initial_policy

[{'id': 0,
  'transitions': [{'id': 0, 'probability': 0.5, 'reward': 0, 'to': 0},
   {'id': 1, 'probability': 0.5, 'reward': 0, 'to': 1}]},
 {'id': 0, 'transitions': [{'id': 0, 'probability': 1, 'reward': 1, 'to': 1}]}]

In [27]:
states[0]['actions']

[{'id': 0,
  'transitions': [{'id': 0, 'probability': 0.5, 'reward': 0, 'to': 0},
   {'id': 1, 'probability': 0.5, 'reward': 0, 'to': 1}]}]

{'transitions': [{'to': 0, 'reward': 0, 'id': 0, 'probability': 0.5}, {'to': 1, 'reward': 0, 'id': 1, 'probability': 0.5}], 'id': 0}

In [38]:
def get_value(V, action, gamma):
    '''
    Calculates value of given action.
    INPUT 
        V: numpy vector. Vector of values.
        action: action item in MDP dictionary. Includes 'transitions' key.
        gamma: float. Discount factor.
    OUTPUT
        Value: float.
    '''
    Value = np.sum([
        transition['probability'] * transition['reward'] + (gamma * V[transition['to']])
        for transition in action['transitions']
    ])
    return Value

In [28]:
def perform_policy_evaluation(V, max_delta, theta, states, current_policy, gamma):
    '''
    Performs evaluation step to product Value vector.
    INPUT 
        V: vector of length num_states. 
        max_delta: float. biggest delta value.
        theta: float. Cutoff value. Once max_delta < theta, we cut off this evaluation.
        states: dictionary. Item in MDP dict.
        current_policy: dictionary. Created in policy iteration function.
    '''
    # Policy Evaluation
    while max_delta > theta:
        max_delta = 0
        for idx, state in enumerate(states):
            # get value of each state
            v = V[idx]
            V[idx] = get_value(V, current_policy[idx], gamma)
            delta = np.abs(v - V[idx])

            if delta > max_delta:
                max_delta = delta
                
    return V

In [29]:
def policy_improvement(states, current_policy, V, gamma):
    '''
    Performs policy improvement component of policy iteration.
    INPUT 
        states: dictionary. MDP item.
        current_policy: dictionary. Created in policy iteration function.
        V: vector of state-action values.
        gamma: float. 
    OUTPUT
        policy_stable: boolean. Whether we've "converged" or not.
        current_policy: dictionary. Updated (or not) policy.
    '''
    policy_stable = True
    for idx, state in enumerate(states):
        b = current_policy[idx]
        policy_value = V[idx]
        for action in state['actions']:
            action_value =  get_value(V, action, gamma)
            if action_value > policy_value:
                current_policy[idx] = action
        if current_policy[idx] != b:
            policy_stable = False
            
    return policy_stable, current_policy

In [30]:
def policy_iteration(mdp, theta=.0000000001):
    '''
    Performs policy iteration on MDP and returns number of iterations required for algorithm
    to terminate.
    INPUT 
        mdp: dictionary, see cell above for example.
        theta: minimum update threshold between iterations.
    OUTPUT:
        num_iterations: integer. Number of PI iterations required, given MDP.
    '''
    gamma = mdp['gamma']
    states = mdp['states']
    
    # Initialize all states' Q values to 0.
    num_states = len(states)
    Q = np.zeros(num_states)
    V = np.zeros(num_states)
    #TODO: build initial policy that's assured to be coherent
    current_policy = [state['actions'][0] for state in states]
    policy_outcomes = []
    max_delta = 1
    
    V = perform_policy_evaluation(V, max_delta, theta, states, current_policy, gamma)
    
    # Policy Improvement
    num_iterations = 0
    policy_stable = False
    while policy_stable == False:
        policy_stable, current_policy = policy_improvement(states, current_policy, V, gamma)
        num_iterations += 1
    
    return num_iterations

In [39]:
policy_iteration(input, theta=.0000000001)

1