
### ***Implementing Policy and Value Iteration on the FrozenLake-v1 Environment***
#  
#### **Done by:**  Kishan Kanaiyalal Patel (200527734) 
---



In [1]:
# Step 1: Import necessary libraries

import numpy as np         
import gym                 # OpenAI gym for reinforcement learning environments
import warnings            # Python warnings module for handling warning messages

# Filter out warning messages to keep the output clean
warnings.filterwarnings('ignore')


In [2]:
# Step 2: Initialize the FrozenLake-v1 environment

# Build the 'FrozenLake-v1' environment with is_slippery=False.
# `env` is shared as a module-level global by the cells that follow.
env = gym.make('FrozenLake-v1', is_slippery=False)  # Create the environment object
env.reset()  # Reset the environment to the initial state (return value is not used here)

# Get the number of possible states and actions in the environment
n_states = env.observation_space.n   # Number of possible states
n_actions = env.action_space.n       # Number of possible actions

# Print the size of the action space and the number of possible states
print("Action space size: {}".format(n_actions))
print("State space size: {}".format(n_states))

Action space size: 4
State space size: 16


In [3]:
# Step 3: Policy evaluation function (Evaluates the value function for the given policy using iterative policy evaluation.)
     
def policy_evaluation(policy, gamma, smallest_change, transitions=None):
    """Evaluate a deterministic policy with in-place iterative policy evaluation.

    Every sweep overwrites ``V[s]`` with the Bellman expectation backup
    ``sum(p * (r + gamma * V[s_next]))`` taken over the outcomes of the action
    that ``policy`` selects in ``s``. Sweeps repeat until the largest change
    of any state's value within one sweep drops below ``smallest_change``.

    Parameters
    ----------
    policy : sequence of int
        ``policy[s]`` is the action taken in state ``s``.
    gamma : float
        Discount factor applied to the value of the successor state.
    smallest_change : float
        Convergence threshold on the largest per-state change in a sweep.
    transitions : mapping or sequence, optional
        Transition model indexed as ``transitions[s][a]`` and yielding
        ``(probability, next_state, reward, done)`` tuples, with states
        numbered ``0 .. len(transitions) - 1``. When omitted, the model of the
        notebook's global ``env`` is used.

    Returns
    -------
    numpy.ndarray
        One value per state.

    Notes
    -----
    The ``done`` element of each transition tuple is not consulted; successor
    values are always bootstrapped from ``V``.
    """
    if transitions is None:
        # Read the table from the base environment instead of relying on the
        # wrapper returned by gym.make() to forward the `P` attribute.
        transitions = env.unwrapped.P

    # Initialize the value function for all states to 0
    V = np.zeros(len(transitions))

    while True:

        # Largest change of any state's value seen during this sweep
        delta = 0

        for s in range(len(V)):  # Loop through all possible states

            # Store the previous value of the state
            v = V[s]

            # Bellman expectation backup for the action chosen by the policy.
            # Updates are in place, so later states in the same sweep already
            # see the refreshed values of earlier states.
            V[s] = sum(p * (r + gamma * V[s_]) for p, s_, r, _ in transitions[s][policy[s]])

            # Update the change in the value function
            delta = max(delta, abs(v - V[s]))

        if delta < smallest_change:  # If the change is below the threshold, stop sweeping
            break

    return V  # Return the resulting value function


In [4]:
# Step 4: Policy improvement function (Improves the policy based on the value function)

def policy_improvement(V, gamma, transitions=None):
    """Return the policy that is greedy with respect to ``V``.

    For every state the one-step lookahead value
    ``sum(p * (r + gamma * V[s_next]))`` is computed for each action, and the
    action with the highest value is selected. Ties go to the lowest action
    index, which is how ``np.argmax`` resolves them.

    Parameters
    ----------
    V : sequence of float
        State values, indexable by state number.
    gamma : float
        Discount factor applied to the value of the successor state.
    transitions : mapping or sequence, optional
        Transition model indexed as ``transitions[s][a]`` and yielding
        ``(probability, next_state, reward, done)`` tuples, with states
        numbered ``0 .. len(transitions) - 1`` and actions numbered
        ``0 .. len(transitions[s]) - 1``. When omitted, the model of the
        notebook's global ``env`` is used.

    Returns
    -------
    numpy.ndarray
        Integer array holding one action per state.
    """
    if transitions is None:
        # Read the table from the base environment instead of relying on the
        # wrapper returned by gym.make() to forward the `P` attribute.
        transitions = env.unwrapped.P

    n_model_states = len(transitions)

    # Create an array to hold the new policy
    policy = np.zeros(n_model_states, dtype=int)

    # Iterate over all states in the model
    for s in range(n_model_states):

        # Compute the Q-values for all actions in the current state
        q_values = [
            sum(p * (r + gamma * V[s_]) for p, s_, r, _ in transitions[s][a])
            for a in range(len(transitions[s]))
        ]

        # Choose the action with the highest Q-value as the new action for the current state
        policy[s] = np.argmax(q_values)

    # Return the new policy
    return policy


In [5]:
# Step 5: Find the optimal policy using policy iteration

gamma = 0.9  # discount factor
smallest_change = 1e-10  # convergence threshold handed to policy_evaluation
n_iterations = 1000  # upper bound on evaluate/improve rounds

# Start from the policy that picks action 0 in every state
policy = np.zeros(n_states, dtype=int)

for i in range(n_iterations):

    # evaluate the value function for the current policy
    V = policy_evaluation(policy, gamma, smallest_change)

    # improve the policy based on the value function
    new_policy = policy_improvement(V, gamma)

    # The policy is stable once the greedy step no longer changes it.
    # `i` is zero-based, so it equals the number of policy updates made so far.
    if np.array_equal(policy, new_policy):
        print("Policy iteration converged after %d iterations." % i)
        break

    policy = new_policy  # update the policy
else:
    # Reached only when the loop ran out of rounds without a `break`.
    # Reported with print() so the message is visible regardless of warning filters.
    print("Policy iteration did not converge within %d iterations." % n_iterations)


Policy iteration converged after 6 iterations.


In [6]:
# Step 6: Performing the test pass on the environment 

# Defining a function to test a given policy
def test_policy(policy):
    total_reward = 0
    state = env.reset() # Reset the environment and get the initial state

    while True:
        action = policy[state] # Select the action to take based on the policy
        state, reward, done, _ = env.step(action) # Take the action and observe the next state and reward
        total_reward += reward # Accumulate the reward

        # Print out the current state, the action taken, the reward received and the total reward accumulated so far
        print(f"State: {state}, Action: {action}, Reward: {reward}, Total reward: {total_reward}")

        if done: # If the episode is finished, break the loop
            print("Episode finished")
            break
            
    return total_reward

# Test the optimal policy obtained from policy iteration and print the total reward.
# `policy` is the module-level array left behind by the policy-iteration loop.
total_reward = test_policy(policy)
print("\nTotal reward with optimal policy: ", total_reward)


State: 4, Action: 1, Reward: 0.0, Total reward: 0.0
State: 8, Action: 1, Reward: 0.0, Total reward: 0.0
State: 9, Action: 2, Reward: 0.0, Total reward: 0.0
State: 13, Action: 1, Reward: 0.0, Total reward: 0.0
State: 14, Action: 2, Reward: 0.0, Total reward: 0.0
State: 15, Action: 2, Reward: 1.0, Total reward: 1.0
Episode finished

Total reward with optimal policy:  1.0


In [7]:
# Step 7: Taking random steps

# Define a function to test a random policy
def random_policy_test(environment=None):
    """Run one episode with actions drawn from the action space and return the summed reward.

    After every step the new state, the sampled action, the step reward and
    the running total are printed.

    Parameters
    ----------
    environment : optional
        Object exposing ``reset()``, ``step(action)`` and
        ``action_space.sample()``. When omitted, the notebook's global ``env``
        is used.

    Returns
    -------
    The sum of the rewards received during the episode.

    Notes
    -----
    Both environment API shapes are accepted: ``reset()`` returning either the
    observation or an ``(observation, info)`` tuple, and ``step()`` returning
    either ``(obs, reward, done, info)`` or
    ``(obs, reward, terminated, truncated, info)``.
    """
    if environment is None:
        environment = env

    # Initialize total_reward to 0 and reset the environment. The initial
    # observation is not needed because actions do not depend on the state.
    total_reward = 0
    environment.reset()

    # Loop until the environment reports the end of the episode
    while True:

        # Choose a random action from the action space
        action = environment.action_space.sample()

        # Take the chosen action and get the resulting state, reward, and whether the episode is finished
        step_result = environment.step(action)
        if len(step_result) == 5:
            state, reward, terminated, truncated, _ = step_result
            done = terminated or truncated
        else:
            state, reward, done, _ = step_result

        # Update the total reward with the received reward
        total_reward += reward

        # Print out the current state, the action taken, the reward received and the total reward accumulated so far
        print(f"State: {state}, Action: {action}, Reward: {reward}, Total reward: {total_reward}")

        # If the episode is finished, break the loop
        if done:
            print("Episode finished")
            break

    return total_reward

# Call the random policy test function and print the total reward received.
# Actions are sampled from the action space, so the printed trajectory can differ between runs.
total_reward = random_policy_test()
print("\nTotal reward with random policy: ", total_reward)


State: 0, Action: 3, Reward: 0.0, Total reward: 0.0
State: 4, Action: 1, Reward: 0.0, Total reward: 0.0
State: 8, Action: 1, Reward: 0.0, Total reward: 0.0
State: 12, Action: 1, Reward: 0.0, Total reward: 0.0
Episode finished

Total reward with random policy:  0.0


In [8]:
# Step 8: Perform value iteration

# Define a function to perform value iteration
def value_iteration(gamma, theta, transitions=None):
    """Compute a greedy policy by in-place value iteration.

    Every sweep overwrites ``V[s]`` with the largest one-step lookahead value
    ``max_a sum(p * (r + gamma * V[s_next]))``. Sweeps repeat until the largest
    change of any state's value within one sweep drops below ``theta``; the
    policy that is greedy with respect to the final values is then returned.

    Parameters
    ----------
    gamma : float
        Discount factor applied to the value of the successor state.
    theta : float
        Convergence threshold on the largest per-state change in a sweep.
    transitions : mapping or sequence, optional
        Transition model indexed as ``transitions[s][a]`` and yielding
        ``(probability, next_state, reward, done)`` tuples, with states
        numbered ``0 .. len(transitions) - 1`` and actions numbered
        ``0 .. len(transitions[s]) - 1``. When omitted, the model of the
        notebook's global ``env`` is used.

    Returns
    -------
    numpy.ndarray
        Integer array holding one action per state. The value array itself is
        not returned.
    """
    if transitions is None:
        # Read the table from the base environment instead of relying on the
        # wrapper returned by gym.make() to forward the `P` attribute.
        transitions = env.unwrapped.P

    n_model_states = len(transitions)

    # Initialize the state value function to 0 for all states
    V = np.zeros(n_model_states)

    def action_values(s):
        # One-step lookahead value of every action in state `s` under the current V
        return [
            sum(p * (r + gamma * V[s_]) for p, s_, r, _ in transitions[s][a])
            for a in range(len(transitions[s]))
        ]

    # Sweep until convergence
    while True:

        # Largest change of any state's value seen during this sweep
        delta = 0

        # Update the value of each state
        for s in range(n_model_states):

            # Keep track of the old value of the state
            v = V[s]

            # Set the value of the state to the maximum Q-value
            V[s] = max(action_values(s))

            # Update the change in state values
            delta = max(delta, abs(v - V[s]))

        # If the change in state values is less than the threshold, stop iterating
        if delta < theta:
            break

    # Extract the greedy policy from the converged values
    # (ties go to the lowest action index, as np.argmax resolves them).
    policy = np.zeros(n_model_states, dtype=int)
    for s in range(n_model_states):
        policy[s] = np.argmax(action_values(s))

    # Return the resulting policy
    return policy

# Set the convergence threshold for value iteration
theta = 1e-8

# Perform value iteration to obtain the optimal policy.
# `gamma` is the module-level discount factor set in the policy-iteration cell.
optimal_policy = value_iteration(gamma, theta)


In [9]:
# Test the policy returned by value iteration (`optimal_policy`)
total_reward = test_policy(optimal_policy)
print("\nTotal reward with optimal policy: ", total_reward)
print('\n')

# Run another random-action episode for comparison
total_reward = random_policy_test()
print("\nTotal reward with random policy: ", total_reward)

State: 4, Action: 1, Reward: 0.0, Total reward: 0.0
State: 8, Action: 1, Reward: 0.0, Total reward: 0.0
State: 9, Action: 2, Reward: 0.0, Total reward: 0.0
State: 13, Action: 1, Reward: 0.0, Total reward: 0.0
State: 14, Action: 2, Reward: 0.0, Total reward: 0.0
State: 15, Action: 2, Reward: 1.0, Total reward: 1.0
Episode finished

Total reward with optimal policy:  1.0


State: 4, Action: 1, Reward: 0.0, Total reward: 0.0
State: 0, Action: 3, Reward: 0.0, Total reward: 0.0
State: 0, Action: 0, Reward: 0.0, Total reward: 0.0
State: 1, Action: 2, Reward: 0.0, Total reward: 0.0
State: 0, Action: 0, Reward: 0.0, Total reward: 0.0
State: 1, Action: 2, Reward: 0.0, Total reward: 0.0
State: 1, Action: 3, Reward: 0.0, Total reward: 0.0
State: 0, Action: 0, Reward: 0.0, Total reward: 0.0
State: 0, Action: 3, Reward: 0.0, Total reward: 0.0
State: 4, Action: 1, Reward: 0.0, Total reward: 0.0
State: 5, Action: 2, Reward: 0.0, Total reward: 0.0
Episode finished

Total reward with random policy:  0.0

In [10]:
# NOTE: `V` is the module-level array assigned inside the policy-iteration
# loop (the value function of the policy evaluated in its last round).
# value_iteration() keeps its own value array local and returns only a
# policy, so the array printed here is not the one it computed.
print("Optimal value function:")
print(V, '\n')

# Policy returned by value_iteration()
print("Optimal policy:")
print(optimal_policy)

Optimal value function:
[0.59049 0.6561  0.729   0.6561  0.6561  0.      0.81    0.      0.729
 0.81    0.9     0.      0.      0.9     1.      0.     ] 

Optimal policy:
[1 2 1 0 1 0 1 0 2 1 1 0 0 2 2 0]
