In [1]:
import numpy as np

def value_iteration(env, theta=0.0001, gamma=1.0):
    """
    Value Iteration Algorithm.
    Args:
        env: OpenAI env. env.P represents the transition probabilities of the environment.
            env.P[s][a] is a list of transition tuples (prob, next_state, reward, done).
            env.nS is a number of states in the environment.
            env.nA is a number of actions in the environment.
        theta: We stop evaluation once our value function change is less than delta for all states.
        gamma: Gamma discount factor.
    Returns:
        A tuple (policy, V, iterations) of the optimal policy, the optimal value function, and iterations.
    """

    def one_step_lookahead(state, V):
        """
        Helper function to calculate the value for all action in a given state.
        Args:
            state: The state to consider (int)
            V: The value to use as an estimator, Vector of length env.nS
        Returns:
            A vector of length env.nA containing the expected value of each action.
        """
        A = np.zeros(env.nA)
        for a in range(env.nA):
            for prob, next_state, reward, done in env.P[state][a]:
                A[a] += prob * (reward + gamma * V[next_state])
        return A

    V = np.zeros(env.nS)
    iterations = 0
    while True:
        iterations += 1
        delta = 0
        for state in range(env.nS):
            A = one_step_lookahead(state, V)  
            best_act_value = np.max(A)
            delta = max(delta, np.abs(best_act_value - V[state]))
            V[state] = best_act_value  # update value to best action value
        if delta < theta:  # if delta improvement is less than threshold
            break
            
    policy = np.zeros([env.nS, env.nA])
    for state in range(env.nS): 
        A = one_step_lookahead(state, V)
        best_action = np.argmax(A)
        policy[state][best_action] = 1.0
        
    return policy, V, iterations

def policy_evaluation(policy, env, theta=0.00001, gamma=1.0):
    """
    Evaluate a policy given an environment and a full description of the environment's dynamics.
    Args:
        policy: [S, A] shaped matrix representing the policy.
        env: OpenAI env. env.P represents the transition probabilities of the environment.
            env.P[s][a] is a list of transition tuples (prob, next_state, reward, done).
            env.nS is a number of states in the environment.
            env.nA is a number of actions in the environment.
        theta: We stop evaluation once our value function change is less than theta for all states.
        gamma: Gamma discount factor.
    Returns:
        Vector of length env.nS representing the value function.
    """
    # Start with a random (all 0) value function
    V = np.zeros(env.nS)
    while True:
        delta = 0
        for state in range(env.nS):
            val = 0  # initiate value as 0
            for action, act_prob in enumerate(policy[state]):  # for all actions/action probabilities
                for prob, next_state, reward, done in env.P[state][action]:
                    val += act_prob * prob * (reward + gamma * V[next_state])
            delta = max(delta, np.abs(val - V[state]))
            V[state] = val
        if delta < theta:  # if delta improvement is less than threshold
            break
    return np.array(V)


def policy_iteration(env, policy_eval_function=policy_evaluation, gamma=1.0):
    """
    Policy Improvement Algorithm. Iteratively evaluates and improves a policy
    until an optimal policy is found.
    Args:
        env: The OpenAI envrionment.
        policy_eval_function: Policy Evaluation function that takes 3 arguments:
            policy, env, gamma.
        discount_factor: gamma discount factor.
    Returns:
        A tuple (policy, V).
        policy is the optimal policy, a matrix of shape [S, A] where each state s
        contains a valid probability distribution over actions.
        V is the value function for the optimal policy.
    """

    def one_step_lookahead(state, V):
        """
        Helper function to calculate the value for all action in a given state.
        Args:
            state: The state to consider (int)
            V: The value to use as an estimator, Vector of length env.nS
        Returns:
            A vector of length env.nA containing the expected value of each action.
        """
        A = np.zeros(env.nA)
        for a in range(env.nA):
            for prob, next_state, reward, done in env.P[state][a]:
                A[a] += prob * (reward + gamma * V[next_state])
        return A
    
    policy = np.ones([env.nS, env.nA]) / env.nA # Start with a random policy
    iterations = 0
    while True:
        iterations += 1
        curr_pol_value = policy_eval_function(policy, env, gamma)  # eval current policy
        policy_stable = True  # Check if policy did improve
        for state in range(env.nS): 
            chosen_act = np.argmax(policy[state])  # best action base on current policy
            act_values = one_step_lookahead(state, curr_pol_value)
            best_act = np.argmax(act_values)  # find best action
            if chosen_act != best_act:
                policy_stable = False  # Greedily find best action
            policy[state] = np.eye(env.nA)[best_act]  # update
        if policy_stable:
            return policy, curr_pol_value, iterations

    return policy, np.zeros(env.nS)

In [2]:
import gym
from timeit import default_timer as timer
env_name  = 'Taxi-v3'
env = gym.make(env_name)
start = timer()
policy, V, iterations = value_iteration(env, gamma=0.99)
end = timer()
print("Value Iteration: {}s in {} iters".format(end - start, iterations))
print(V)


Value Iteration: 7.247082096999975s in 610 iters
