In [2]:
import gym
import sys
import numpy as np

taxi_env = gym.make("Taxi-v3").env

taxi_env.render()

+---------+
|[35mR[0m: | : :[34;1mG[0m|
| : | : : |
| : : : : |
| | : | : |
|Y| : |[43mB[0m: |
+---------+



In [3]:
def value_iteration(env, gamma=0.9, theta=1e-5):
    '''
    Value Iteration Algorithm

    Inputs:
        env: environment, i.e. openai gym taxicab game
        gamma: Discount rate for future rewards.
        theta: Stopping criterion value. When change in Value function is less than theta for every state, stop.

    Helper Methods:
        calculate_action_values: Calculates the values for all actions for a given state.
                                Returns a vector action_values of length num_actions, where 
                                action_values[a] = expected value of action a.
                                The expected value is calculated according to the Bellman equation:
                                V(s) = P(s'|s,a) * ( R(s,a) + (gamma * V(s')) )
        extract_policy: Returns the optimal policy for a given value function. It is run once at the end of the algorithm
                        after the optimal V (value function) has been calculated.

    Outputs:
        A tuple (policy, V, steps) of the optimal policy, the approximated optimal value function, and the number of steps
        the algorithm took to converge.
    '''
    
    def calculate_action_values(current_state, V):
        action_values = np.zeros(env.nA)
        for a in range(env.nA):
            for prob, next_state, reward, done in env.P[current_state][a]:
                action_values[a] += prob * (reward + (gamma * V[next_state]))
        return action_values
    
    def extract_policy(V):
        policy = np.zeros([env.nS, env.nA])
        
        for s in range(env.nS):
            action_values = calculate_action_values(s, V)
            best_action = np.argmax(action_values) # returns index of action that has maximum V
            policy[s, best_action] = 1 # deterministic optimal policy, i.e. always take best_action for given state
        
        return policy
    
    V = np.zeros(env.nS) # arbitrarily initialize vector V to be all zeros
    converged = False
    steps = 0
    
    # iteratively calculate optimal V
    while not converged:
        #print('Value iteration, step ', steps, '...')
        delta = 0
        for s in range(env.nS):
            action_values = calculate_action_values(s, V)
            max_action_value = np.max(action_values)
            delta = max( delta, np.abs(max_action_value - V[s]) ) # the maximum difference between V'(s) and V(s) for all s
            V[s] = max_action_value        
        
        steps += 1
        
        #print('Delta: ', delta)
        converged = (delta < theta)
        #print(converged)
    
    # extract optimal policy after calculating optimal V
    policy = extract_policy(V)
    
    return policy, V, steps

In [4]:
policy, V, steps = value_iteration(env=taxi_env)

In [34]:
print(policy)
print('Steps: ', steps)

[[0. 0. 0. 0. 1. 0.]
 [0. 0. 0. 0. 1. 0.]
 [0. 0. 0. 0. 1. 0.]
 ...
 [0. 1. 0. 0. 0. 0.]
 [0. 1. 0. 0. 0. 0.]
 [0. 0. 0. 1. 0. 0.]]
Steps:  74


In [38]:
from IPython.display import clear_output
from time import sleep

# run the game once under the current policy
# and animate game output
def run_episode():
    gamma = 0.9
    state = taxi_env.reset() # initialize the environment to a random state
    #taxi_env.render()
    total_reward = 0
    penalties = 0
    step = 0
    frames = [] # for rendering animation of output
    done = False

    while not done:
        action = policy[state].tolist().index(1)
        state, reward, done, info = taxi_env.step(action) # move to the next state

        if reward == -10:
            penalties += 1

        total_reward += (gamma ** step * reward)
        step += 1

        frames.append(taxi_env.render(mode='ansi'))

    #print('Total reward: ', total_reward)
    for i in range(len(frames)):
        clear_output(wait=True)
        print(frames[i])
        sleep(.1)
        
    return total_reward, penalties

In [39]:
run_episode()

+---------+
|[35m[34;1m[43mR[0m[0m[0m: | : :G|
| : | : : |
| : : : : |
| | : | : |
|Y| : |B: |
+---------+
  (Dropoff)



(-3.1369622635116983, 0)

In [41]:
# run for 5 simulations and calculate average metrics

all_rewards = []
all_penalties = []

for t in range(5):
    print('Simulation # ', t)
    total_reward, penalties = run_episode()
    all_rewards.append(total_reward)
    all_penalties.append(penalties)

print('Avg total reward: ', np.mean(all_rewards))
print('Avg # penalties: ', np.mean(all_penalties))
    

+---------+
|R: | : :[35m[34;1m[43mG[0m[0m[0m|
| : | : : |
| : : : : |
| | : | : |
|Y| : |B: |
+---------+
  (Dropoff)

Avg total reward:  -0.8601362155944445
Avg # penalties:  0.0
