** Dynamic Programming Lab **  

This is the Lab for the Dynamic Programming module of the edX "Reinforcement Learning Explained" course.  The lab consists of 4 exercises:

   - implement Policy Evaluation using the 2 array approach
   - implement Policy Evaluation using the in-place approach
   - implement Policy Iteration 
   - implement Value Iteration
   
In each of the 4 code cells below (one per exercise), do not change the signature of the primary function you are implementing or the call to the tester code that verifies its correctness.

When you finish your implementation of each function, execute the code cell and verify that the code passes.  If it does, save the printed "passcode" value for when you later submit your results on the course webpage for the lab.  If it doesn't pass, correct your code and try again.

** Exercise 1: Policy Evaluation - 2 arrays **  

Policy Evaluation calculates the value function for a policy, given the policy and the full definition of the associated Markov Decision Process.  The full definition of an MDP is the set of states, the set of available actions for each state, the set of rewards, the discount factor, and the state/reward transition function.
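
For reference, the value function being computed can be written as the Bellman expectation equation.  The notation below follows a common textbook convention and is an assumption, since this lab does not fix its own symbols: $\pi(a \mid s)$ corresponds to the probabilities returned by `get_policy_actions`, and $p(s', r \mid s, a)$ to the probabilities returned by `get_transitions`.

$$
v_\pi(s) = \sum_{a} \pi(a \mid s) \sum_{s', r} p(s', r \mid s, a)\,\bigl[r + \gamma\, v_\pi(s')\bigr]
$$

Iterative Policy Evaluation turns this equation into an update rule and applies it to every state until the largest change in a sweep falls below `theta`.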

Implement the algorithm for Iterative Policy Evaluation using the 2 array approach in the code cell below.  In the 2 array approach, one array holds the value estimates for each state computed on the previous iteration, and a second array holds the value estimates for the states computed in the current iteration.
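
As a rough illustration of a 2 array sweep, here is a minimal sketch on a made-up 3-state chain.  The MDP, the constant `GAMMA`, and the helper `sweep_two_arrays` are hypothetical and are not part of `tester`; only the tuple formats mirror the ones described in this lab.

```python
# Hypothetical 3-state chain used only for illustration (not the tester's MDP).
# State 0 is terminal; every move from a non-terminal state has reward -1.
GAMMA = 0.9

def get_policy_actions(state):
    # Equiprobable policy over two actions, in (action, probability) form.
    return [('l', 0.5), ('r', 0.5)]

def get_transitions(state, action):
    # Deterministic moves, in (next_state, reward, probability) form.
    if state == 0:
        return [(0, 0, 1.0)]                 # terminal state: stay, no reward
    next_state = state - 1 if action == 'l' else min(state + 1, 2)
    return [(next_state, -1, 1.0)]

def sweep_two_arrays(V_old):
    """Return a new list of estimates computed only from V_old."""
    V_new = len(V_old) * [0.0]
    for s in range(len(V_old)):
        for action, action_prob in get_policy_actions(s):
            for next_state, reward, prob in get_transitions(s, action):
                V_new[s] += action_prob * prob * (reward + GAMMA * V_old[next_state])
    return V_new

V0 = [0.0, 0.0, 0.0]
V1 = sweep_two_arrays(V0)    # uses only V0
V2 = sweep_two_arrays(V1)    # uses only V1
print(V1)
print(V2)
```

Because each sweep reads only the previous array, the order in which states are visited has no effect on the result of that sweep.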

In [2]:
import tester       # required for testing and grading your code

def policy_eval_two_arrays(state_count, gamma, theta, get_policy_actions, get_transitions):
    """
    This function uses the two-array approach to evaluate the specified policy for the specified MDP:
    'state_count' is the total number of states in the MDP. States are represented as 0-relative numbers.
    'gamma' is the MDP discount factor for rewards.
    'theta' is the small number threshold to signal convergence of the value function (see Iterative Policy Evaluation algorithm).
    'get_policy_actions' is the stochastic policy function - it takes a state parameter and returns a list of tuples,
        where each tuple is of the form: (action, probability).  It represents the policy being evaluated.
    'get_transitions' is the state/reward transition function.  It accepts two parameters, state and action, and returns
        a list of tuples, where each tuple is of the form: (next_state, reward, probability).
    """
    V = state_count*[0]
    # insert code here to evaluate the policy using the 2 array approach 
    return V

tester.policy_eval_two_arrays_test(policy_eval_two_arrays)   


Testing: Policy Evaluation (two-arrays)
passed test: return value is list
passed test: length of list = 15
ERROR: list elements don't match expected values: # of mismatches=14


In [8]:
#--- Solutions below - remove all below code cells on the student version of the labs ---
import tester

def eval_formula2(state, action, get_transitions, gamma, V):
    trans_sum = 0
    trans_tuples = get_transitions(state, action)
    for tt in trans_tuples:
        next_state = tt[0]
        reward = tt[1]
        trans_prob = tt[2]
        trans_sum += trans_prob * (reward + gamma * V[next_state])
    return trans_sum

def eval_formula(state, state_action_tuples, get_transitions, gamma, V):
    action_sum = 0
    for at in state_action_tuples:
        action = at[0]
        action_prob = at[1]
        action_sum += action_prob * eval_formula2(state, action, get_transitions, gamma, V)
    return action_sum

def policy_eval_two_arrays(state_count, gamma, theta, get_policy_actions, get_transitions):
    V = state_count*[0]
    V_last = state_count*[0]
    k = 0

    while True:
        delta = 0
        #print("k=", k, "V=", V)

        for s in range(state_count):
            v = V_last[s]
            state_action_tuples = get_policy_actions(s)
            
            V[s] = eval_formula(s, state_action_tuples, get_transitions, gamma, V_last)

            delta = max(delta, abs(v-V[s]))
        k += 1
        if (delta < theta):
            break
        V_last = list(V)

    print("FINAL k=", k)
    #print("FINAL V=", V)
    return V

tester.policy_eval_two_arrays_test(policy_eval_two_arrays)



Testing: Policy Evaluation (two-arrays)
FINAL k= 173
passed test: return value is list
passed test: length of list = 15
passed test: values of list elements
PASSED: Policy Evaluation (two-arrays) passcode = 9986-145


In [15]:
tester.get_equiprobable_policy_actions(0)

[('l', 0.25), ('u', 0.25), ('r', 0.25), ('d', 0.25)]

In [14]:
tester.get_transitions(2,'l')

[(1, -1, 1)]

In [18]:
for action, action_prob in tester.get_equiprobable_policy_actions(0):
    print(action, action_prob)

l 0.25
u 0.25
r 0.25
d 0.25


In [23]:
import numpy as np

In [30]:
def policy_eval_two_arrays(state_count, gamma, theta, get_policy_actions, get_transitions):
    #V = np.zeros(env.nS)
    V = state_count*[0]
    V_last = state_count*[0]
    while True:
        delta = 0
        # For each state, perform a "full backup"
        for s in range(state_count):
            v = 0
            # Look at the possible next actions
            for a, action_prob in get_policy_actions(s):
                # For each action, look at the possible next states...
                for  next_state, reward, prob in get_transitions(s,a):
                    # Calculate the expected value
                    v += action_prob * prob * (reward + gamma * V_last[next_state])
            # How much our value function changed (across any states)
            delta = max(delta, np.abs(v - V[s]))
            V[s] = v
        # Stop evaluating once our value function change is below a threshold
        if delta < theta:
            break
        V_last = list(V)
    return V
tester.policy_eval_two_arrays_test(policy_eval_two_arrays)


Testing: Policy Evaluation (two-arrays)
passed test: return value is list
passed test: length of list = 15
passed test: values of list elements
PASSED: Policy Evaluation (two-arrays) passcode = 9986-145


** Exercise 2: Policy Evaluation - in-place method **  

Implement the algorithm for Iterative Policy Evaluation using the in-place approach in the code cell below.  In the in-place approach, a single array holds the value estimates.  Each state's entry is overwritten as soon as it is computed, so later updates in the same sweep already read the new values.
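
To make the effect of overwriting concrete, here is a minimal sketch of one in-place sweep on a made-up 3-state chain.  The MDP, the constant `GAMMA`, and the helper `sweep_in_place` are hypothetical and are not part of `tester`.

```python
# Hypothetical 3-state chain used only for illustration (not the tester's MDP).
# State 0 is terminal; every move from a non-terminal state has reward -1.
GAMMA = 0.9

def get_policy_actions(state):
    # Equiprobable policy over two actions, in (action, probability) form.
    return [('l', 0.5), ('r', 0.5)]

def get_transitions(state, action):
    # Deterministic moves, in (next_state, reward, probability) form.
    if state == 0:
        return [(0, 0, 1.0)]                 # terminal state: stay, no reward
    next_state = state - 1 if action == 'l' else min(state + 1, 2)
    return [(next_state, -1, 1.0)]

def sweep_in_place(V):
    """Update V in place; return the largest change seen in this sweep."""
    delta = 0.0
    for s in range(len(V)):
        v = 0.0
        for action, action_prob in get_policy_actions(s):
            for next_state, reward, prob in get_transitions(s, action):
                v += action_prob * prob * (reward + GAMMA * V[next_state])
        delta = max(delta, abs(v - V[s]))
        V[s] = v                             # overwrite immediately
    return delta

V = [0.0, 0.0, 0.0]
delta = sweep_in_place(V)
print(V, delta)
```

In this sketch, state 2 is updated after state 1 and therefore already sees the new estimate for state 1, so its value after a single sweep is about -1.45.  A sweep that read only the initial zeros would give -1.0 for the same state.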


In [4]:
import tester       # required for testing and grading your code

def policy_eval_in_place(state_count, gamma, theta, get_policy_actions, get_transitions):
    """
    This function uses the in-place approach to evaluate the specified policy for the specified MDP:
    'state_count' is the total number of states in the MDP. States are represented as 0-relative numbers.
    'gamma' is the MDP discount factor for rewards.
    'theta' is the small number threshold to signal convergence of the value function (see Iterative Policy Evaluation algorithm).
    'get_policy_actions' is the stochastic policy function - it takes a state parameter and returns a list of tuples,
        where each tuple is of the form: (action, probability).  It represents the policy being evaluated.
    'get_transitions' is the state/reward transition function.  It accepts two parameters, state and action, and returns
        a list of tuples, where each tuple is of the form: (next_state, reward, probability).
    """
    V = state_count*[0]
    # insert code here to evaluate the policy using the in-place approach 
    return V

tester.policy_eval_in_place_test(policy_eval_in_place)   


Testing: Policy Evaluation (in-place)
passed test: return value is list
passed test: length of list = 15
ERROR: list elements don't match expected values: # of mismatches=14


In [5]:
#--- Solutions below - remove all below code cells on the student version of the labs ---
import tester

def eval_formula2(state, action, get_transitions, gamma, V):
    trans_sum = 0
    trans_tuples = get_transitions(state, action)
    for tt in trans_tuples:
        next_state = tt[0]
        reward = tt[1]
        trans_prob = tt[2]
        trans_sum += trans_prob * (reward + gamma * V[next_state])
    return trans_sum

def eval_formula(state, state_action_tuples, get_transitions, gamma, V):
    action_sum = 0
    for at in state_action_tuples:
        action = at[0]
        action_prob = at[1]
        action_sum += action_prob * eval_formula2(state, action, get_transitions, gamma, V)
    return action_sum

def policy_eval_in_place(state_count, gamma, theta, get_policy_actions, get_transitions):
    V = state_count*[0]
    k = 0

    while True:
        delta = 0
        #print("k=", k, "V=", V)

        for s in range(state_count):
            v = V[s]
            state_action_tuples = get_policy_actions(s)
            
            V[s] = eval_formula(s, state_action_tuples, get_transitions, gamma, V)

            delta = max(delta, abs(v-V[s]))
        k += 1
        if (delta < theta):
            break

    print("FINAL k=", k)
    #print("FINAL V=", V)
    return V

tester.policy_eval_in_place_test(policy_eval_in_place)



Testing: Policy Evaluation (in-place)
FINAL k= 114
passed test: return value is list
passed test: length of list = 15
passed test: values of list elements
PASSED: Policy Evaluation (in-place) passcode = 9991-562


In [29]:
def policy_eval_in_place(state_count, gamma, theta, get_policy_actions, get_transitions):
    #V = np.zeros(env.nS)
    V = state_count*[0]
    while True:
        delta = 0
        # For each state, perform a "full backup"
        for s in range(state_count):
            v = 0
            # Look at the possible next actions
            for a, action_prob in get_policy_actions(s):
                # For each action, look at the possible next states...
                for  next_state, reward, prob in get_transitions(s,a):
                    # Calculate the expected value
                    v += action_prob * prob * (reward + gamma * V[next_state])
            # How much our value function changed (across any states)
            delta = max(delta, np.abs(v - V[s]))
            V[s] = v
        # Stop evaluating once our value function change is below a threshold
        if delta < theta:
            break
    return V
tester.policy_eval_in_place_test(policy_eval_in_place)


Testing: Policy Evaluation (in-place)
passed test: return value is list
passed test: length of list = 15
passed test: values of list elements
PASSED: Policy Evaluation (in-place) passcode = 9991-562


** Exercise 3: Policy Iteration **  

Implement the algorithm for Policy Iteration in the code cell below.  
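
As a reminder, Policy Iteration alternates between evaluating the current deterministic policy and making the policy greedy with respect to the resulting value function.  In a common textbook notation (an assumption, since this lab does not fix its own symbols), the improvement step is:

$$
\pi'(s) = \arg\max_{a} \sum_{s', r} p(s', r \mid s, a)\,\bigl[r + \gamma\, V(s')\bigr]
$$

The loop ends when an improvement step leaves the action chosen for every state unchanged.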

** Can I just call "policy_eval_in_place()" for the Policy Evaluation step of this algorithm? **  

Note that there is a subtle difference between the algorithm for Policy Evaluation, which assumes the policy is stochastic, and the Policy Evaluation step for the Policy Iteration algorithm, which assumes the policy is deterministic.  This means that you cannot directly call your previous code, but you can reuse large pieces of it for the Policy Evaluation step.
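
One way to see the difference in representation: a deterministic policy is the special case in which a single action has probability 1.  The sketch below uses a hypothetical helper, `as_stochastic`, which is not part of `tester`, and assumes the deterministic policy is stored as a list indexed by state.

```python
def as_stochastic(pi):
    """Wrap a deterministic policy (a list mapping state -> action) so that it
    returns [(action, probability)] like a stochastic policy function."""
    def get_policy_actions(state):
        return [(pi[state], 1.0)]
    return get_policy_actions

pi = ['l', 'r', 'l']                      # made-up policy for three states
get_policy_actions = as_stochastic(pi)
print(get_policy_actions(1))
```

Whether you adapt the policy this way or write a separate evaluation routine that reads `pi[s]` directly is a design choice; both evaluate the same backup.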


In [3]:
import tester       # required for testing and grading your code

def policy_iteration(state_count, gamma, theta, get_available_actions, get_transitions):
    """
    This function computes the optimal value function and policy for the specified MDP, using the Policy Iteration algorithm.
    'state_count' is the total number of states in the MDP. States are represented as 0-relative numbers.
    'gamma' is the MDP discount factor for rewards.
    'theta' is the small number threshold to signal convergence of the value function (see Iterative Policy Evaluation algorithm).
    'get_available_actions' returns a list of the MDP available actions for the specified state parameter.
    'get_transitions' is the MDP state / reward transition function.  It accepts two parameters, state and action, and returns
        a list of tuples, where each tuple is of the form: (next_state, reward, probability).
    """
    V = state_count*[0]                # init all state value estimates to 0
    pi = state_count*[0]
    
    # init with a policy with first avail action for each state
    for s in range(state_count):
        avail_actions = get_available_actions(s)
        pi[s] = avail_actions[0][0]
        
    # insert code here to iterate using policy evaluation and policy improvement (see Policy Iteration algorithm)
    return (V, pi)        # return both the final value function and the final policy

tester.policy_iteration_test(policy_iteration)  


Testing: Policy Iteration
passed test: return value is tuple
passed test: length of tuple = 2
passed test: v is list of length=15
ERROR: v elements don't match expected values: # of mismatches=14


** Exercise 4: Value Iteration **  

Implement the algorithm for Value Iteration in the code cell below.  
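
For comparison with Policy Evaluation, Value Iteration replaces the average over the policy's actions with a maximum over the available actions.  In a common textbook notation (an assumption, since this lab does not fix its own symbols), each sweep applies:

$$
V(s) \leftarrow \max_{a} \sum_{s', r} p(s', r \mid s, a)\,\bigl[r + \gamma\, V(s')\bigr]
$$

Once the largest change in a sweep falls below `theta`, a policy can be read off by choosing, in each state, an action that attains this maximum.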

In [4]:
import tester       # required for testing and grading your code

def value_iteration(state_count, gamma, theta, get_available_actions, get_transitions):
    """
    This function computes the optimal value function and policy for the specified MDP, using the Value Iteration algorithm.
    'state_count' is the total number of states in the MDP. States are represented as 0-relative numbers.
    'gamma' is the MDP discount factor for rewards.
    'theta' is the small number threshold to signal convergence of the value function (see Iterative Policy Evaluation algorithm).
    'get_available_actions' returns a list of the MDP available actions for the specified state parameter.
    'get_transitions' is the MDP state / reward transition function.  It accepts two parameters, state and action, and returns
        a list of tuples, where each tuple is of the form: (next_state, reward, probability).
    """
    V = state_count*[0]                # init all state value estimates to 0
    pi = state_count*[0]
    
    # (this section of code can be removed when actual implementation is added)
    # init with a policy with first avail action for each state
    for s in range(state_count):
        avail_actions = get_available_actions(s)
        pi[s] = avail_actions[0][0]
        
    # insert code here to iterate using the max-over-actions update (see Value Iteration algorithm)
    return (V, pi)        # return both the final value function and the final policy

tester.value_iteration_test(value_iteration)  


Testing: Value Iteration
passed test: return value is tuple
passed test: length of tuple = 2
passed test: v is list of length=15
ERROR: v elements don't match expected values: # of mismatches=14


In [7]:
#--- Solutions below - remove all below code cells on the student version of the labs ---
import tester

def eval_formula2(state, action, get_transitions, gamma, V):
    trans_sum = 0
    trans_tuples = get_transitions(state, action)
    for tt in trans_tuples:
        next_state = tt[0]
        reward = tt[1]
        trans_prob = tt[2]
        trans_sum += trans_prob * (reward + gamma * V[next_state])
    return trans_sum

def eval_formula(state, state_action_tuples, get_transitions, gamma, V):
    action_sum = 0
    for at in state_action_tuples:
        action = at[0]
        action_prob = at[1]
        action_sum += action_prob * eval_formula2(state, action, get_transitions, gamma, V)
    return action_sum

def policy_eval_in_place(state_count, gamma, theta, get_policy_actions, get_transitions):
    V = state_count*[0]
    k = 0

    while True:
        delta = 0
        #print("k=", k, "V=", V)

        for s in range(state_count):
            v = V[s]
            state_action_tuples = get_policy_actions(s)
            
            V[s] = eval_formula(s, state_action_tuples, get_transitions, gamma, V)

            delta = max(delta, abs(v-V[s]))
        k += 1
        if (delta < theta):
            break

    print("FINAL k=", k)
    #print("FINAL V=", V)
    return V

def policy_eval_two_arrays(state_count, gamma, theta, get_policy_actions, get_transitions):
    V = state_count*[0]
    V_last = state_count*[0]
    k = 0

    while True:
        delta = 0
        #print("k=", k, "V=", V)

        for s in range(state_count):
            v = V_last[s]
            state_action_tuples = get_policy_actions(s)
            
            V[s] = eval_formula(s, state_action_tuples, get_transitions, gamma, V_last)

            delta = max(delta, abs(v-V[s]))
        k += 1
        if (delta < theta):
            break
        V_last = list(V)

    print("FINAL k=", k)
    #print("FINAL V=", V)
    return V

tester.policy_eval_two_arrays_test(policy_eval_two_arrays)
tester.policy_eval_in_place_test(policy_eval_in_place)



Testing: Policy Evaluation (two-arrays)
FINAL k= 173
passed test: return value is list
passed test: length of list = 15
passed test: values of list elements
PASSED: Policy Evaluation (two-arrays) passcode = 9986-145

Testing: Policy Evaluation (in-place)
FINAL k= 114
passed test: return value is list
passed test: length of list = 15
passed test: values of list elements
PASSED: Policy Evaluation (in-place) passcode = 9991-562


In [9]:
import tester       # required for testing and grading your code

def calc_max_action(state, avail_actions, get_transitions, gamma, V):
    max_action = avail_actions[0]
    max_value = float('-inf')    # any real action value will compare >= this
    
    for action in avail_actions:
        value = eval_formula3(state, action, get_transitions, gamma, V)
        if (value >= max_value):
            max_value = value
            max_action = action
            
    #print("avail_actions=", avail_actions, ", max_action=", max_action, ", max_value=", max_value)
    return max_action

def eval_formula3(state, action, get_transitions, gamma, V):
    trans_sum = 0
    trans_tuples = get_transitions(state, action)
    for tt in trans_tuples:
        next_state = tt[0]
        reward = tt[1]
        trans_prob = tt[2]
        trans_sum += trans_prob * (reward + gamma * V[next_state])
    return trans_sum

def deterministic_policy_eval(state_count, gamma, theta, pi, get_transitions):
    V = state_count*[0]
    k = 0
    #print("deterministic_policy_eval: theta=", theta)
    
    while True:
        delta = 0
        #print("k=", k, "V=", V)

        for s in range(state_count):
            v = V[s]
            at = pi[s]
            action = at[0]
            V[s] = eval_formula3(s, action, get_transitions, gamma, V)
            delta = max(delta, abs(v-V[s]))
        k += 1
        #print("k=", k, "delta=", delta)
        if (delta < theta):
            break

    #print("  Policy Eval step completed: k=", k)
    return V


def policy_iteration(state_count, gamma, theta, get_available_actions, get_transitions):
    # step 1 - initialization
    V = state_count * [0]                # init all state value estimates to 0
    pi = state_count * [0]           
    
    # init with a policy with first avail action for each state
    for s in range(state_count):
        avail_actions = get_available_actions(s)
        pi[s] = avail_actions[0][0]
    
    iteration = 1
    while (True):
        print("Iteration: " + str(iteration))
        
        # step 2 - Policy Evaluation
        V = deterministic_policy_eval(state_count, gamma, theta, pi, get_transitions)

        # step 3 - Policy Improvement
        policy_stable = True
        for s in range(state_count):
            old_action = pi[s]
            avail_actions = get_available_actions(s)
            pi[s] = calc_max_action(s, avail_actions, get_transitions, gamma, V)
            if (old_action != pi[s]):
                policy_stable = False
        #print("  Policy Improvement step completed")

        if policy_stable:
            V = deterministic_policy_eval(state_count, gamma, theta, pi, get_transitions)
            break
        
        iteration += 1
        
    print("final V=", V)
    print("final pi=", pi)
    return (V, pi)        # return both the final value function and the final policy

tester.policy_iteration_test(policy_iteration)  


Testing: Policy Iteration
Iteration: 1
Iteration: 2
Iteration: 3
Iteration: 4
final V= [0.0, -1.0, -1.999, -2.997001, -1.0, -1.999, -2.997001, -1.999, -1.999, -2.997001, -1.999, -1.0, -2.997001, -1.999, -1.0]
final pi= ['d', 'l', 'l', 'd', 'u', 'u', 'd', 'd', 'u', 'd', 'd', 'd', 'r', 'r', 'r']
passed test: return value is tuple
passed test: length of tuple = 2
passed test: v is list of length=15
passed test: values of v elements
passed test: pi is list of length=15
passed test: values of pi elements
PASSED: Policy Iteration passcode = 9970-010


In [10]:
import tester       # required for testing and grading your code

def eval_formula3(state, action, get_transitions, gamma, V):
    trans_sum = 0
    trans_tuples = get_transitions(state, action)
    for tt in trans_tuples:
        next_state = tt[0]
        reward = tt[1]
        trans_prob = tt[2]
        trans_sum += trans_prob * (reward + gamma * V[next_state])
    return trans_sum

def calc_max_action_value(state, avail_actions, get_transitions, gamma, V):
    max_action = avail_actions[0]
    max_value = float('-inf')    # any real action value will compare >= this
    
    for action in avail_actions:
        value = eval_formula3(state, action, get_transitions, gamma, V)
        if (value >= max_value):
            max_value = value
            max_action = action
            
    #print("avail_actions=", avail_actions, ", max_action=", max_action, ", max_value=", max_value)
    return (max_action, max_value)

def value_iteration(state_count, gamma, theta, get_available_actions, get_transitions):
    V = state_count * [0]                # init all state value estimates to 0
    iteration = 1

    while (True):
        print("Iteration: " + str(iteration))
        delta = 0
        for s in range(state_count):
            v = V[s]
            avail_actions = get_available_actions(s)
            _, V[s] = calc_max_action_value(s, avail_actions, get_transitions, gamma, V)
            delta = max(delta, abs(v - V[s]))
        if (delta < theta):
            break
        iteration += 1

    # finally, calculate the optimal policy from the optimal value function V
    pi = state_count * [0]           
    for s in range(state_count):
        avail_actions = get_available_actions(s)
        pi[s], _ = calc_max_action_value(s, avail_actions, get_transitions, gamma, V)

    print("final V=", V)
    print("final pi=", pi)
    return (V, pi)        # return both the final value function and the final policy

tester.value_iteration_test(value_iteration)  


Testing: Value Iteration
Iteration: 1
Iteration: 2
Iteration: 3
Iteration: 4
final V= [0.0, -1.0, -1.999, -2.997001, -1.0, -1.999, -2.997001, -1.999, -1.999, -2.997001, -1.999, -1.0, -2.997001, -1.999, -1.0]
final pi= ['d', 'l', 'l', 'd', 'u', 'u', 'd', 'd', 'u', 'd', 'd', 'd', 'r', 'r', 'r']
passed test: return value is tuple
passed test: length of tuple = 2
passed test: v is list of length=15
passed test: values of v elements
passed test: pi is list of length=15
passed test: values of pi elements
PASSED: Value Iteration passcode = 9990-000
