In [1]:
import numpy as np

def policy_evaluation(environment, policy, discount_factor=1.0, theta=0.00001):
    """
    Evaluates a policy given an environment and a full description of the environment's dynamics.

    Args:
        environment: OpenAI gym environment.
        policy: A function that maps an observation to an action.
        discount_factor: Gamma discount factor.
        theta: Maximum error tolerance.

    Returns:
        A float representing the value function of the given policy.
    """
    # Start with a random (all 0) value function
    V = np.zeros(environment.nS)
    while True:
        # Stopping condition
        delta = 0
        # Update each state...
        for s in range(environment.nS):
            # Do a one-step lookahead to find the best action
            a = policy[s]
            # Calculate the value of taking the action
            v_sa = 0
            for prob, next_state, reward, done in environment.P[s][a]:
                v_sa += prob * (reward + discount_factor * V[next_state])
            # How much our value function changed (across any states)
            delta = max(delta, np.abs(v_sa - V[s]))
            V[s] = v_sa
        # Check if we're "close enough"
        if delta < theta:
            break
    return np.array(V)


def policy_improvement(environment, V, discount_factor=1.0):
    """
    Policy Improvement Algorithm. Iteratively evaluates and improves a policy
    until an optimal policy is found.

    Args:
        environment: OpenAI gym environment.
        V: A numpy array of shape [env.nS] representing value function.
        discount_factor: Gamma discount factor.

    Returns:
        A tuple ((policy, V), (policy_stable, V_stable)). Both are numpy arrays of shape [env.nS].
    """
    # Start with a random policy
    policy = np.random.choice(env.nA, size=env.nS)
    # Policy iteration
    while True:
        policy_stable = True
        # Evaluate current policy
        V = policy_evaluation(environment, policy, discount_factor)
        # Go through each state...
        for s in range(environment.nS):
            # The best action we would take under the currect policy
            chosen_a = np.argmax(policy[s])
            # Find the best action by one-step lookahead
            # Ties are resolved arbitarily
            action_values = np.zeros(environment.nA)
            for a in range(environment.nA):
                for prob, next_state, reward, done in environment.P[s][a]:
                    action_values[a] += prob * (reward + discount_factor * V[next_state])
            best_a = np.argmax(action_values)
            # Greedily update the policy
            if chosen_a != best_a:
                policy_stable = False
            policy[s] = np.eye(environment.nA)[best_a]
        # If the policy is stable we've found an optimal policy. Return it
        if policy_stable:
            return policy, V
    return policy, V