In [1]:
import gym
import numpy as np

In [2]:
# Create the FrozenLake environment that the rest of the notebook operates on.
env = gym.make('FrozenLake-v0')

In [3]:
# Reset the environment and draw the grid so the layout is visible below.
env.reset()
env.render()


[41mS[0mFFF
FHFH
FFFH
HFFG


In [4]:
# Discount factor passed to both iterate_value and get_policy further down.
discount = 0.8

In [5]:
def iterate_value(env, discount=1.0, max_iters=10000, convergence=1e-20):
    """Compute a state-value table for a discrete environment by value iteration.

    Each sweep replaces every state's value with the best expected return over
    all actions, where an action's expected return is the sum over its listed
    transitions of ``probability * (reward + discount * V[next_state])``.
    Values are updated in place during a sweep, so states visited later in the
    same sweep already see the refreshed values of earlier states.

    Parameters
    ----------
    env : gym environment
        Must expose ``observation_space.n``, ``action_space.n`` and, on the
        unwrapped environment, a transition table ``P[state][action]`` whose
        entries are 4-tuples; the first three items are used as
        (probability, next_state, reward) and the fourth is ignored.
    discount : float, optional
        Weight applied to the value of the successor state.
    max_iters : int, optional
        Maximum number of sweeps over the state space.
    convergence : float, optional
        Stop once the summed absolute change of the table in one sweep is
        less than or equal to this threshold.

    Returns
    -------
    numpy.ndarray
        One value per state. A message stating whether the threshold was
        reached is printed before returning.
    """
    n_states = env.observation_space.n
    n_actions = env.action_space.n
    # `unwrapped` reaches the base environment no matter how many wrappers
    # (if any) surround it, unlike a fixed `env.env` hop.
    transitions = env.unwrapped.P

    # initialize reward value for each state
    V = np.zeros(n_states)

    for i in range(max_iters):
        # snapshot for the convergence comparison
        old_V = np.copy(V)

        for state in range(n_states):
            state_rewards = []
            for action in range(n_actions):
                # expected return of taking `action` in `state`
                expected_rewards = [
                    transition_p * (reward_p + discount * V[next_state])
                    for transition_p, next_state, reward_p, _ in transitions[state][action]
                ]
                state_rewards.append(np.sum(expected_rewards))

            # keep the return of the best action
            V[state] = max(state_rewards)

        # check convergence
        if np.sum(np.fabs(V - old_V)) <= convergence:
            print(f'Converged on iteration {i}')
            return V

    # Report the number of sweeps actually performed; using the loop variable
    # here would be off by one and undefined when max_iters <= 0.
    print(f'Did not converge in {max(max_iters, 0)} iterations, returning last value table')
    return V

In [6]:
def get_policy(value_array, discount=1.0, env=None):
    """Derive the greedy policy implied by a state-value table.

    For every state, the expected return of each action is accumulated over
    that action's listed transitions as
    ``probability * (reward + discount * value_array[next_state])`` and the
    index of the highest-scoring action is stored for the state.

    Parameters
    ----------
    value_array : array-like
        One value per state, indexable by state number.
    discount : float, optional
        Weight applied to the value of the successor state.
    env : gym environment, optional
        Environment to read the state/action counts and the transition table
        ``P[state][action]`` from. When omitted, the notebook-level global
        ``env`` is used, which keeps existing two-argument calls working.

    Returns
    -------
    numpy.ndarray
        Float array with one action index per state.

    Raises
    ------
    NameError
        If ``env`` is omitted and no global ``env`` exists.
    """
    if env is None:
        # Backward-compatible fallback to the notebook's global environment.
        try:
            env = globals()['env']
        except KeyError:
            raise NameError("name 'env' is not defined; pass env= explicitly") from None

    n_states = env.observation_space.n
    n_actions = env.action_space.n
    # `unwrapped` reaches the base environment regardless of wrapper depth.
    transitions = env.unwrapped.P

    # initialize policy
    policy = np.zeros(n_states)

    for state in range(n_states):
        # expected return of every action available in this state
        action_values = np.zeros(n_actions)
        for action in range(n_actions):
            for transition_p, next_state, reward_p, _ in transitions[state][action]:
                action_values[action] += transition_p * (reward_p + discount * value_array[next_state])

        # the best-scoring action becomes the policy for this state
        policy[state] = np.argmax(action_values)

    return policy

In [7]:
# Run value iteration on the environment with the notebook-level discount.
value_array =  iterate_value(env, discount=discount)

Converged on iteration 105


In [9]:
# Show the state values as a 4x4 array, matching the rendered grid layout.
value_array.reshape(4,4)

array([[0.01543434, 0.0155907 , 0.0274401 , 0.01568006],
       [0.02685373, 0.        , 0.05978021, 0.        ],
       [0.05841341, 0.13378315, 0.1967357 , 0.        ],
       [0.        , 0.2465377 , 0.54419553, 0.        ]])

In [None]:
# Derive the per-state best action from the computed values, using the same discount.
policy = get_policy(value_array, discount=discount)

In [None]:
# Display the action index chosen for each state.
policy

In [None]:
def run_policy(env, policy):
    """Play one episode by following ``policy`` and print the outcome.

    The environment is reset and rendered, then stepped with
    ``int(policy[state])`` until the step result signals the episode is over
    or a non-zero reward has been accumulated. The environment is rendered
    after every step.

    Parameters
    ----------
    env : gym environment
        Must provide ``reset()`` returning the initial state, ``render()``,
        and ``step(action)`` returning a sequence whose first three items are
        (next_state, reward, done).
    policy : sequence
        Action to take for each state, indexable by state number.

    Returns
    -------
    None
        The result is reported through a printed message only.
    """
    # Start from the state the environment actually reports rather than
    # assuming the episode begins in state 0.
    state = env.reset()
    env.render()
    reward = 0
    step = 0
    dead = False
    while not dead and reward == 0:
        # Only the first three items are needed; anything after is ignored.
        state, step_reward, dead, *_ = env.step(int(policy[state]))
        reward += step_reward
        step += 1
        env.render()

    if reward == 1:
        print(f'Gotem in {step} steps')
    else:
        print(f'RIP in {step} steps')

In [None]:
# Play a single episode with the derived policy; each step is rendered.
run_policy(env, policy)