In [1]:
!pip install gymnasium



In [2]:
# 1. Policy Evaluation

# Goal: Calculate the value of each state (V(s)) under the current policy.

# How:
# For each state, update its value to be the expected sum of rewards the agent would get by following the current policy starting from that state.
# What you do:
# For every state, use the current policy to decide the action.
# Calculate the expected value (reward + discounted future value) for that action.
# Repeat for all states, updating their values until they stop changing significantly (convergence).

# 2. Policy Improvement

# Goal: Improve the policy by acting greedily with respect to the current value function.
# How:
# For each state, look at all possible actions and choose the action that yields the highest expected value (according to the value function you just computed).
# What you do:
# For every state, consider all actions.
# Pick the action that gives the highest expected value (reward + discounted value of next state).
# Update the policy to always pick this best action in that state.


# 3. Repeat Until Policy Stable

# If the policy didn't change in the last policy improvement step, you're done!
# Otherwise, go back to Policy Evaluation and repeat.





# Initialize: policy arbitrarily, V(s) = 0 for all states

# Repeat:
#     1. Policy Evaluation:
#         Repeat:
#             For each state s:
#                 V(s) = expected return from following policy in state s
#         Until V(s) converges

#     2. Policy Improvement:
#         For each state s:
#             policy(s) = argmax_a expected return for taking action a in s

# Until policy is stable (no change)


In [3]:
import numpy as np
import gymnasium as gym

In [4]:
# 1. Build the 4x4 FrozenLake environment. is_slippery=False makes every
#    move deterministic, which keeps the example easy to follow.
env = gym.make(
    "FrozenLake-v1",
    map_name="4x4",
    is_slippery=False,
)

# Sizes of the discrete spaces: 16 states for the 4x4 grid and
# 4 actions (Left, Down, Right, Up).
n_states, n_actions = env.observation_space.n, env.action_space.n


In [5]:
# 2. Initialize the value function and the policy
V = np.zeros(n_states, dtype=float)        # V(s) = 0 for every state
policy = np.zeros_like(V, dtype=int)       # action 0 ("Left") in every state


In [6]:
# 3. Algorithm hyperparameters
gamma = 0.99       # discount factor applied to the value of the next state
theta = 1e-8       # policy evaluation stops once the largest update is below this

In [7]:
def one_step_lookahead(state, V):
    """
    Compute the expected one-step return of every action from `state`.

    For each action, sums prob * (reward + gamma * V[next_state]) over the
    transitions listed in env.unwrapped.P[state][action].

    Reads the module-level names `env`, `n_actions` and `gamma`.

    Args:
        state: index of the state to evaluate.
        V: current state-value estimates, indexable by state.

    Returns:
        NumPy array of length n_actions holding one value per action.
    """
    q_values = np.zeros(n_actions)
    for a in range(n_actions):
        outcomes = env.unwrapped.P[state][a]
        # The termination flag of each transition is not used here.
        for p, s_next, r, _done in outcomes:
            q_values[a] += p * (r + gamma * V[s_next])
    return q_values

In [8]:
# 4. Policy Iteration Algorithm
#
# Alternates between evaluating the current policy (updating V in place until
# the largest change in a sweep drops below theta) and making the policy
# greedy with respect to V. Stops after the first improvement step that
# leaves every state's action unchanged.

iteration = 0
is_policy_stable = False

while not is_policy_stable:
    iteration += 1

    # --- POLICY EVALUATION ---
    converged = False
    while not converged:
        delta = 0
        for state in range(n_states):
            previous_value = V[state]
            transitions = env.unwrapped.P[state][policy[state]]
            # Expected return of the action the current policy picks here.
            updated_value = sum(
                p * (r + gamma * V[s_next]) for p, s_next, r, _done in transitions
            )
            V[state] = updated_value
            delta = max(delta, abs(previous_value - updated_value))
        converged = delta < theta

    # --- POLICY IMPROVEMENT ---
    is_policy_stable = True
    for state in range(n_states):
        greedy_action = np.argmax(one_step_lookahead(state, V))
        if greedy_action != policy[state]:
            # At least one state changed its action, so evaluate again.
            is_policy_stable = False
        policy[state] = greedy_action

    print(f"Iteration {iteration}: Policy Stable = {is_policy_stable}")

Iteration 1: Policy Stable = False
Iteration 2: Policy Stable = False
Iteration 3: Policy Stable = False
Iteration 4: Policy Stable = False
Iteration 5: Policy Stable = False
Iteration 6: Policy Stable = False
Iteration 7: Policy Stable = True


In [9]:
# 5. Show results: the final policy and value function laid out as the 4x4 grid
print("\nOptimal Policy (0=Left, 1=Down, 2=Right, 3=Up):", policy.reshape(4, 4), sep="\n")
print("\nOptimal State Value Function:", V.reshape(4, 4), sep="\n")


Optimal Policy (0=Left, 1=Down, 2=Right, 3=Up):
[[1 2 1 0]
 [1 0 1 0]
 [2 1 1 0]
 [0 2 2 0]]

Optimal State Value Function:
[[0.95099005 0.96059601 0.970299   0.96059601]
 [0.96059601 0.         0.9801     0.        ]
 [0.970299   0.9801     0.99       0.        ]
 [0.         0.99       1.         0.        ]]


In [12]:
# 6. Evaluate the learned policy
def run_episode(env, policy, render=False, close_env=False):
    """
    Run one episode in `env`, always taking the action `policy[state]`.

    The environment is left open by default so the same instance can be
    reused for further episodes; the caller that created it is responsible
    for closing it.

    Args:
        env: environment exposing reset(), step(action), render() and close(),
            where reset() returns (state, info) and step() returns
            (next_state, reward, terminated, truncated, info).
        policy: indexable mapping from state to the action to take.
        render: if True, call env.render() before every step.
        close_env: if True, call env.close() when the episode ends (also when
            it ends with an exception). Defaults to False.

    Returns:
        (total_reward, steps): the sum of rewards collected and the number of
        steps taken before the episode terminated or was truncated.
    """
    total_reward = 0
    steps = 0
    try:
        state, _ = env.reset()
        done = False
        while not done:
            if render:
                env.render()
            state, reward, terminated, truncated, _info = env.step(policy[state])
            total_reward += reward
            steps += 1
            done = terminated or truncated
    finally:
        # Closing is opt-in: closing here unconditionally would hand the
        # caller back a closed environment after the first episode.
        if close_env:
            env.close()

    return total_reward, steps



In [None]:
# Create the FrozenLake environment used for evaluation.
# The episodes below are run with render=False, so no render mode is requested:
# in Gymnasium, "human" mode renders during reset()/step() regardless of that
# flag, which needs a display and slows down a 100-episode run.
# Set render_mode = "human" to watch the agent instead.
render_mode = None
env = gym.make("FrozenLake-v1", is_slippery=False, render_mode=render_mode, map_name="4x4")


n_episodes = 100
total_rewards = []
try:
    for _ in range(n_episodes):
        reward, steps = run_episode(env, policy, render=False)
        total_rewards.append(reward)
finally:
    # This cell created the environment, so it closes it once, after all
    # episodes have run (or if one of them raised).
    env.close()

print(f"\nAverage reward over {n_episodes} episodes: {np.mean(total_rewards)}")