In [1]:
import gym
import numpy as np
import time
from IPython import display

In [2]:
# Build the FrozenLake-v0 environment that the cells below operate on.
env = gym.make('FrozenLake-v0')

In [7]:
def initialization(env):
    """Create a random deterministic policy: one action index per state.

    Args:
        env: Environment exposing ``observation_space.n`` (number of states)
            and ``action_space.n`` (number of actions).

    Returns:
        1-D integer ``numpy`` array of length ``env.observation_space.n`` whose
        entries are drawn uniformly from ``[0, env.action_space.n)``.
    """
    # The builtin ``int`` replaces the ``np.int`` alias, which was deprecated in
    # NumPy 1.20 and removed in 1.24. The action count is read from the
    # environment rather than hard-coded to 4.
    policy = np.random.randint(
        env.action_space.n, size=env.observation_space.n, dtype=int
    )

    return policy

In [8]:
# Draw a random starting policy for the environment.
policy = initialization(env)

In [9]:
# Display the randomly initialised policy.
policy

array([0, 0, 2, 1, 3, 0, 0, 3, 1, 3, 3, 2, 2, 1, 0, 3])

In [10]:
def policy_evaluation(env, policy, max_iters, gamma):
    """Iteratively estimate the state values of a fixed policy.

    Each sweep recomputes every state's value from the previous sweep's values
    (a synchronous update), following only the action the policy prescribes.

    Args:
        env: Environment exposing ``observation_space.n`` and a transition
            table ``P`` where ``P[state][action]`` yields
            ``(prob, next_state, reward, done)`` tuples.
        policy: Indexable mapping from state index to action index.
        max_iters: Upper bound on the number of sweeps.
        gamma: Discount factor applied to the next state's value.

    Returns:
        ``numpy`` array with one value estimate per state.
    """
    n_states = env.observation_space.n
    v_values = np.zeros(n_states)

    for _ in range(max_iters):
        # Snapshot so every state in this sweep reads the same old estimates.
        previous = v_values.copy()

        for s in range(n_states):
            # Expected one-step return of the policy's action in state ``s``.
            expected_return = 0
            for p, s_next, r, _done in env.P[s][policy[s]]:
                expected_return += p * (r + gamma * previous[s_next])
            v_values[s] = expected_return

        # Stop as soon as a sweep leaves every estimate (numerically) unchanged.
        if np.isclose(v_values, previous).all():
            break

    return v_values

In [11]:
# Evaluate the random policy (at most 1000 sweeps, discount factor 0.9).
v_values = policy_evaluation(env, policy, max_iters=1000, gamma=0.9)

In [12]:
# Display the resulting state values.
v_values

array([0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.])

In [13]:
def policy_improvement(env, old_policy, old_v_values, gamma):
    """Derive the greedy policy with respect to a set of state values.

    Args:
        env: Environment exposing ``observation_space.n``, ``action_space.n``
            and a transition table ``P`` where ``P[state][action]`` yields
            ``(prob, next_state, reward, done)`` tuples.
        old_policy: Unused; kept so existing callers keep working.
        old_v_values: Indexable state-value estimates used for the one-step
            lookahead.
        gamma: Discount factor applied to the next state's value.

    Returns:
        1-D integer ``numpy`` array holding, for every state, the action with
        the highest one-step lookahead value (ties go to the lowest index).
    """
    # The builtin ``int`` replaces the ``np.int`` alias, which was deprecated in
    # NumPy 1.20 and removed in 1.24.
    policy = np.zeros(env.observation_space.n, dtype=int)

    for state in range(env.observation_space.n):
        # One-step lookahead value of every action available in this state.
        q_values = []
        for action in range(env.action_space.n):
            q_value = 0
            # Accumulate the expectation over each possible outcome.
            for prob, next_state, reward, done in env.P[state][action]:
                q_value += prob * (reward + gamma * old_v_values[next_state])

            q_values.append(q_value)

        # Act greedily: np.argmax returns the first index among tied maxima.
        policy[state] = np.argmax(q_values)

    return policy

In [14]:
# One greedy improvement step against those state values; the cell displays the returned policy.
policy_improvement(env, policy, v_values, gamma=0.9)

array([0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0])

In [15]:
def policy_iteration(env, max_iters, gamma, eval_max_iters=1000):
    """Alternate policy evaluation and greedy improvement until the policy is stable.

    Args:
        env: Environment passed through to ``initialization``,
            ``policy_evaluation`` and ``policy_improvement``.
        max_iters: Upper bound on the number of evaluate/improve rounds.
        gamma: Discount factor used for both evaluation and improvement.
        eval_max_iters: Upper bound on the sweeps of each evaluation phase
            (defaults to the previously hard-coded 1000).

    Returns:
        The policy from the last round. If no round left the policy unchanged
        within ``max_iters``, this is the most recent improved policy.
    """
    policy = initialization(env)
    for i in range(max_iters):
        # Use the caller's discount factor; it was previously ignored in
        # favour of a hard-coded 0.9.
        v_values = policy_evaluation(env, policy, max_iters=eval_max_iters, gamma=gamma)
        new_policy = policy_improvement(env, policy, v_values, gamma=gamma)
        if np.array_equal(policy, new_policy):
            print(f'Converged at {i}-th iteration.')
            break
        policy = new_policy.copy()
    return policy

In [16]:
# Run full policy iteration; this rebinds `policy`, replacing the random one created earlier.
policy = policy_iteration(env, max_iters=1000, gamma=0.9)

Converged at 5-th iteration.


In [17]:
# Display the policy returned by policy iteration.
policy

array([0, 3, 0, 3, 0, 0, 0, 0, 3, 1, 0, 0, 0, 2, 1, 0])

In [18]:
def play(env, policy):
    """Run one episode following ``policy`` and return the summed reward.

    Args:
        env: Environment whose ``reset()`` returns the initial state and whose
            ``step(action)`` returns ``(next_state, reward, done, info)``.
        policy: Indexable mapping from state to the action to take.

    Returns:
        Total reward collected until ``step`` reports ``done``.
    """
    state = env.reset()
    episode_return = 0
    finished = False

    # No rendering or pacing is done here; the loop only steps the environment.
    while not finished:
        state, step_reward, finished, _info = env.step(policy[state])
        episode_return += step_reward

    return episode_return

In [None]:
# Play a single episode with the current policy; the cell's value is that episode's total reward.
play(env, policy)

In [21]:
def play_multiple_times(env, policy, max_episodes):
    """Play ``max_episodes`` episodes and print how many earned a positive reward.

    Args:
        env: Environment forwarded to ``play``.
        policy: Policy forwarded to ``play``.
        max_episodes: Number of episodes to run.

    Returns:
        None. The tally is only printed.
    """
    # An episode counts as a success when its total reward is strictly positive.
    success = sum(1 for _ in range(max_episodes) if play(env, policy) > 0)

    print(f'Number of successes: {success}/{max_episodes}')

In [22]:
# Play 1000 episodes and print how many ended with a positive total reward.
play_multiple_times(env, policy, 1000)

Number of successes: 733/1000
