In [1]:
import numpy as np
import gym
from gym import wrappers

# Intialize P for test example
#Left =0
#Down = 1
#Right = 2
#Up= 3

P = {s : {a: [] for a in range(4)} for s in range(4)}
P[0][0] = [(0, 0, 0, False)]
P[0][1] = [(1, 2, -1, False)]
P[0][2] = [(1, 1, 0, False)]
P[0][3] = [(0, 0, 0, False)]
P[1][0] = [(1, 0, -1, False)]
P[1][1] = [(1, 3, 1, True)]
P[1][2] = [(0, 0, 0, False)]
P[1][3] = [(0, 0, 0, False)]
P[2][0] = [(0, 0, 0, False)]
P[2][1] = [(0, 0, 0, False)]
P[2][2] = [(1, 3, 1, True)]
P[2][3] = [(1, 0, 0, False)]
P[3][0] = [(0, 0, 0, True)]
P[3][1] = [(0, 0, 0, True)]
P[3][2] = [(0, 0, 0, True)]
P[3][3] = [(0, 0, 0, True)]



# Problem 1
def value_iteration(P, nS, nA, beta=1, tol=1e-8, maxiter=3000):
    """Perform Value Iteration according to the Bellman optimality principle.

    Starting from the all-zero value function, each sweep replaces V[s] by the
    best expected one-step return max_a sum(prob * (reward + beta * V[next])).
    Sweeps stop once the 2-norm of the change falls below ``tol`` or after
    ``maxiter`` sweeps, whichever comes first.

    Parameters:
        P (dict): The Markov relationship
                (P[state][action] = [(prob, nextstate, reward, is_terminal)...]).
        nS (int): The number of states.
        nA (int): The number of actions.
        beta (float): The discount rate (between 0 and 1).
        tol (float): The stopping criteria for the value iteration.
        maxiter (int): The maximum number of iterations.

    Returns:
       v (ndarray): The discrete values for the true value function.
       n (int): number of iterations performed (0 when maxiter <= 0).
    """
    V_old = np.zeros(nS)
    # Bind the return values up front so that maxiter <= 0 yields the initial
    # guess and a count of 0 instead of raising UnboundLocalError.
    V_new = V_old.copy()
    n_iters = 0

    for k in range(maxiter):              # only perform maxiter times
        n_iters = k + 1
        V_new = np.empty(nS)
        for s in range(nS):
            q_values = np.zeros(nA)       # expected return of each action from s
            for a in range(nA):
                # The is_terminal flag is not used by the update.
                for prob, next_state, reward, _is_terminal in P[s][a]:
                    q_values[a] += prob * (reward + beta * V_old[next_state])
            V_new[s] = np.max(q_values)   # act greedily
        if np.linalg.norm(V_old - V_new) < tol:   # successive iterates close enough
            break
        # V_new is freshly allocated every sweep, so rebinding needs no copy.
        V_old = V_new

    return V_new, n_iters

def test1():
    """Run value iteration on the toy MDP ``P`` and print (values, iterations)."""
    result = value_iteration(P, 4, 4)
    print(result)
test1()

(array([1., 1., 1., 0.]), 3)


In [2]:
# Problem 2
def extract_policy(P, nS, nA, v, beta = 1.0):
    """Returns the greedy policy vector for value function v.

    For every state the expected return of each action is computed as
    sum(prob * (reward + beta * v[next_state])) and the index of the largest
    one is stored (ties go to the lowest action index, as with np.argmax).

    Parameters:
        P (dict): The Markov relationship
                (P[state][action] = [(prob, nextstate, reward, is_terminal)...]).
        nS (int): The number of states.
        nA (int): The number of actions.
        v (ndarray): The value function values.
        beta (float): The discount rate (between 0 and 1).

    Returns:
        policy (ndarray): float array of length nS holding the chosen action
            index for each state.
    """
    def expected_return(transitions):
        # Accumulate in the same order as the transition list.
        total = 0.0
        for prob, next_state, reward, _is_terminal in transitions:
            total += prob * (reward + beta * v[next_state])
        return total

    policy = np.zeros(nS)
    for state in range(nS):
        action_values = [expected_return(P[state][action]) for action in range(nA)]
        policy[state] = np.argmax(action_values)   # greedy choice

    return policy

def test2():
    """Print the greedy policy for the value function of the toy MDP ``P``."""
    values, _ = value_iteration(P, 4, 4)
    greedy = extract_policy(P, 4, 4, values, beta=1.0)
    print(greedy)
test2()

[2. 1. 2. 0.]


In [3]:
# Problem 3
def compute_policy_v(P, nS, nA, policy, beta=1.0, tol=1e-8, maxiter=10000):
    """Computes the value function for a policy using policy evaluation.

    Starting from the all-zero value function, repeatedly applies
    V[s] <- sum(prob * (reward + beta * V[next_state])) over the transitions of
    the action the policy picks in s, until successive iterates differ by less
    than ``tol`` in the 2-norm.

    Parameters:
        P (dict): The Markov relationship
                (P[state][action] = [(prob, nextstate, reward, is_terminal)...]).
        nS (int): The number of states.
        nA (int): The number of actions (not used by the evaluation itself).
        policy (ndarray): The policy to estimate the value function.
        beta (float): The discount rate (between 0 and 1).
        tol (float): The stopping criteria for the value iteration.
        maxiter (int): Upper bound on the number of sweeps. Without a bound the
            loop never ends for a policy whose value diverges (for example an
            undiscounted cycle with non-zero reward).

    Returns:
        v (ndarray): The discrete values for the true value function. If the
            sweep limit is hit first, the last iterate is returned and a
            RuntimeWarning is issued.
    """
    import warnings

    V = np.zeros(nS)
    V_next = V.copy()       # returned as-is when maxiter <= 0
    for _sweep in range(maxiter):
        V_next = np.zeros(nS)
        for s in range(nS):          # iterate for each possible state
            # The is_terminal flag is not used by the update.
            for prob, next_state, reward, _is_terminal in P[s][policy[s]]:
                V_next[s] += prob * (reward + beta * V[next_state])
        if np.linalg.norm(V_next - V) < tol:      # end if we are sufficiently close
            return V_next
        V = V_next

    warnings.warn(
        "compute_policy_v did not converge within %d iterations" % maxiter,
        RuntimeWarning,
    )
    return V_next

def test3():
    """Evaluate the greedy policy of the toy MDP ``P`` and print its values."""
    values, _ = value_iteration(P, 4, 4)
    greedy = extract_policy(P, 4, 4, values, beta=1.0)
    policy_values = compute_policy_v(P, 4, 4, greedy)
    print(policy_values)
test3()

[1. 1. 1. 0.]


In [4]:
# Problem 4
def policy_iteration(P, nS, nA, beta=1, tol=1e-8, maxiter=200):
    """Perform Policy Iteration according to the Bellman optimality principle.

    Starts from a randomly drawn policy and alternates policy evaluation
    (compute_policy_v) with greedy improvement (extract_policy) until the
    policy stops changing or ``maxiter`` rounds have been performed.

    Parameters:
        P (dict): The Markov relationship
                (P[state][action] = [(prob, nextstate, reward, is_terminal)...]).
        nS (int): The number of states.
        nA (int): The number of actions.
        beta (float): The discount rate (between 0 and 1).
        tol (float): The stopping criteria for the value iteration; it is also
            the threshold on the 2-norm of the difference between two
            successive policies.
        maxiter (int): The maximum number of iterations.

    Returns:
        v (ndarray): The discrete values for the true value function
        policy (ndarray): which direction to move in each square.
        n (int): number of evaluation/improvement rounds performed, counting
            the final round that confirmed the policy (0 when maxiter <= 0).
    """
    V = np.zeros(nS)
    pi0 = np.random.choice(nA, nS)      # initialize random policy vector
    # Bind the return values up front so that maxiter <= 0 returns the initial
    # guess instead of raising UnboundLocalError.
    pi1 = pi0
    n_iters = 0

    for k in range(maxiter):
        # Count rounds the same way value_iteration does (k + 1, not k).
        n_iters = k + 1
        V = compute_policy_v(P, nS, nA, pi0, beta, tol)    # evaluate current policy
        pi1 = extract_policy(P, nS, nA, V, beta)           # greedy improvement

        if np.linalg.norm(pi1 - pi0) < tol:    # policy unchanged -> done
            break

        pi0 = pi1

    return V, pi1, n_iters

def test4():
    """Run policy iteration on the toy MDP ``P`` and print the returned tuple."""
    result = policy_iteration(P, 4, 4)
    print(result)
test4()

(array([1., 1., 1., 0.]), array([2., 1., 2., 0.]), 2)


In [7]:
# Problem 5 and 6
def frozen_lake(basic_case=True, M=1000, render=False):
    """ Finds the optimal policy to solve the FrozenLake problem

    The transition table of the chosen environment is solved once with value
    iteration and once with policy iteration; each resulting policy is then
    simulated M times with run_simulation and the rewards are averaged.

    Parameters:
    basic_case (boolean): True for 4x4 and False for 8x8 environments.
    M (int): The number of times to run the simulation using problem 6.
    render (boolean): Whether to draw the environment.

    Returns:
    vi_policy (ndarray): The optimal policy for value iteration.
    vi_total_rewards (float): The mean expected value for following the value iteration optimal policy.
    pi_value_func (ndarray): The maximum value function for the optimal policy from policy iteration.
    pi_policy (ndarray): The optimal policy for policy iteration.
    pi_total_rewards (float): The mean expected value for following the policy iteration optimal policy.

    Raises:
    ValueError: If M is smaller than 1 (a mean over zero runs is undefined).
    """
    # Fail fast instead of hitting a ZeroDivisionError after all the solving.
    if M < 1:
        raise ValueError("M must be at least 1 to average simulation rewards")

    # Only the environment id differs between the two cases.
    env_name = 'FrozenLake-v1' if basic_case is True else 'FrozenLake8x8-v1'
    env = gym.make(env_name).env

    try:
        # Find number of states and actions
        number_of_states = env.observation_space.n
        number_of_actions = env.action_space.n
        # Get the dictionary with all the states and actions
        dictionary_P = env.P

        # Value iteration: value function, then the greedy policy for it.
        vi_value_func, _vi_iters = value_iteration(
            dictionary_P, number_of_states, number_of_actions,
            beta=1.0, tol=1e-8, maxiter=3000)
        vi_policy = extract_policy(
            dictionary_P, number_of_states, number_of_actions, vi_value_func, beta=1.0)

        # Mean simulated reward under the value-iteration policy.
        vi_total_rewards = sum(
            run_simulation(env_name, vi_policy, render) for _ in range(M)) / M

        # Policy iteration, followed by the same simulation protocol.
        pi_value_func, pi_policy, _ = policy_iteration(
            dictionary_P, number_of_states, number_of_actions,
            beta=1, tol=1e-8, maxiter=200)
        pi_total_rewards = sum(
            run_simulation(env_name, pi_policy, render) for _ in range(M)) / M
    finally:
        # Release the environment even if solving or simulating raised.
        env.close()

    return vi_policy, vi_total_rewards, pi_value_func, pi_policy, pi_total_rewards

    


# Problem 6
def run_simulation(env, policy, render=False, beta = 1.0):
    """ Evaluates policy by using it to run a simulation and calculate the reward.

    Parameters:
    env (str or gym environment): Either an environment id, in which case a
        fresh environment is created with gym.make(env).env and closed again
        before returning, or an already constructed environment object, which
        is used as-is and left open for the caller.
    policy (ndarray): The policy used to simulate.
    beta float: The discount factor.
    render (boolean): Whether to draw the environment.

    Returns:
    total reward (float): Value of the total reward received under policy.
    """
    # Only close environments that were created here.
    owns_env = isinstance(env, str)
    sim_env = gym.make(env).env if owns_env else env

    try:
        # Put environment in starting state
        obs = sim_env.reset()
        totreward = 0
        k = 0
        done = False

        # Truthiness test rather than `is False`: an environment may report
        # the flag as a non-`bool` falsy value such as numpy.bool_, which would
        # otherwise end the episode after a single step.
        while not done:          # continue until simulation ends
            # take the step directed by our policy
            obs, reward, done, _ = sim_env.step(int(policy[obs]))
            if render is True:
                sim_env.render(mode = 'human')
            totreward += (beta**k) * reward   # discount the k-th reward by beta**k
            k += 1
    finally:
        # Previously every call left its environment open.
        if owns_env:
            sim_env.close()

    return totreward

def test6():
    """Solve the 4x4 FrozenLake and print the reward of one rendered episode."""
    vi_policy = frozen_lake()[0]
    episode_reward = run_simulation('FrozenLake-v1', vi_policy, render=True)
    print(episode_reward)
test6()

0.0
