In [None]:
import sys
import gym
import numpy as np

In [None]:
env = gym.make('FrozenLake-v1',is_slippery=True)

In [None]:

def calculate_state_value(env, current_state, value_matrix, discount_factor):

    num_actions = env.action_space.n
    action_values = np.zeros(shape=num_actions)
    for action in range(num_actions):
        for transition_prob, next_state, reward, done in env.env.P[current_state][action]:
            action_values[action] += transition_prob * (reward + discount_factor * value_matrix[next_state])
    return action_values


In [None]:
def evaluate_policy(policy_matrix, environment, discount_factor=1.0, convergence_threshold=1e-9, max_iterations=1000):
    num_states = environment.observation_space.n
    evaluation_iterations = 1
    state_values = np.zeros(shape=num_states)

    for iteration in range(int(max_iterations)):
        delta = 0
        for current_state in range(num_states):
            new_state_value = 0
            for action, action_probability in enumerate(policy_matrix[current_state]):
                for state_probability, next_state, reward, done in environment.P[current_state][action]:
                    new_state_value += action_probability * state_probability * (reward + discount_factor * state_values[next_state])
            delta = max(delta, np.abs(state_values[current_state] - new_state_value))
            state_values[current_state] = new_state_value

        evaluation_iterations += 1

        if delta < convergence_threshold:
            print(f'Policy evaluation terminated after {evaluation_iterations} iterations.\n')
            return state_values


In [None]:
def policy_iteration_algorithm(environment, discount_factor=1.0, max_iterations=1000):
    num_states = environment.observation_space.n
    num_actions = environment.action_space.n
    policy_matrix = np.ones(shape=[num_states, num_actions]) / num_actions
    evaluated_policies_count = 1

    for iteration in range(int(max_iterations)):
        stable_policy = False
        value_function = evaluate_policy(policy_matrix, environment, discount_factor)

        for current_state in range(num_states):
            current_action = np.argmax(policy_matrix[current_state])
            action_values = calculate_state_value(environment, current_state, value_function, discount_factor)
            best_action = np.argmax(action_values)

            if current_action != best_action:
                stable_policy = True

            policy_matrix[current_state] = np.eye(num_actions)[best_action]

        evaluated_policies_count += 1

        if stable_policy:
            print(f'Found a stable policy after {evaluated_policies_count:,} evaluations.\n')
            return policy_matrix, value_function


In [None]:
def value_iteration_algorithm(environment, discount_factor=1e-1, convergence_threshold=1e-9, max_iterations=1e4):
    state_values = np.zeros(environment.observation_space.n)

    for iteration in range(int(max_iterations)):
        delta = 0

        for current_state in range(environment.observation_space.n):
            action_values = calculate_state_value(environment, current_state, state_values, discount_factor)
            best_action_value = np.max(action_values)
            delta = max(delta, np.abs(state_values[current_state] - best_action_value))
            state_values[current_state] = best_action_value

        if delta < convergence_threshold:
            print(f'\nValue iteration converged at iteration #{iteration+1:,}')
            break

    policy_matrix = np.zeros(shape=[environment.observation_space.n, environment.action_space.n])

    for current_state in range(environment.observation_space.n):
        action_values = calculate_state_value(environment, current_state, state_values, discount_factor)
        best_action = np.argmax(action_values)
        policy_matrix[current_state, best_action] = 1.0

    return policy_matrix, state_values


In [None]:
def play_episodes_and_evaluate(env, num_episodes, policy_matrix, max_actions=100, render=False):

    total_wins = 0
    total_rewards, total_actions = 0, 0

    for episode in range(num_episodes):
        current_state = env.reset()
        episode_done, actions_taken = False, 0

        while actions_taken < max_actions:
            selected_action = np.argmax(policy_matrix[current_state])
            next_state, reward, episode_done, _ = env.step(selected_action)

            if render:
                env.render()

            actions_taken += 1
            total_rewards += reward
            current_state = next_state

            if episode_done:
                total_wins += 1
                break

        total_actions += actions_taken

    print(f'Total rewards: {total_rewards:,}\tMax actions: {actions_taken:,}')

    average_reward = total_rewards / num_episodes
    average_actions = total_actions / num_episodes

    print('')
    return total_wins, total_rewards, average_reward, average_actions


In [None]:
num_episodes = 1000

def agent_and_evaluate(env):

    total_rewards_list = []

    action_mapping = {
        0: '\u2191',  # up
        1: '\u2192',  # right
        2: '\u2193',  # down
        3: '\u2190'   # left
    }

    iteration_methods = [
        ('Policy Iteration', policy_iteration_algorithm),
        ('Value Iteration', value_iteration_algorithm)
    ]

    for method_name, method_func in iteration_methods:
        policy_matrix, value_function = method_func(env)

        print(f'Final policy using {method_name}:')
        print(' '.join([action_mapping[action] for action in np.argmax(policy_matrix, axis=1)]))

        total_wins, total_rewards, avg_reward, avg_actions = play_episodes_and_evaluate(env, num_episodes, policy_matrix)
        total_rewards_list.append(total_rewards)

        print(f'Number of wins = {total_wins:,}')
        print(f'Average reward = {avg_reward:.2f}')
        print(f'Average actions = {avg_actions:.2f}')

    return total_rewards_list


In [None]:
rewards = agent_and_evaluate(env)

Policy evaluation terminated after 66 iterations.

Found a stable policy after 2 evaluations.

Final policy using Policy Iteration:
↑ ← ↑ ← ↑ ↑ ↑ ↑ ← → ↑ ↑ ↑ ↓ → ↑


  if not isinstance(terminated, (bool, np.bool8)):


Total rewards: 735.0	Max actions: 45

Number of wins = 1,000
Average reward = 0.73
Average actions = 41.14

Value iteration converged at iteration #8
Final policy using Value Iteration:
→ ← ↓ ← ↑ ↑ ↑ ↑ ← → ↑ ↑ ↑ ↓ → ↑
Total rewards: 438.0	Max actions: 24

Number of wins = 1,000
Average reward = 0.44
Average actions = 27.64
