In [None]:
# GPI - GENERAL POLICY ITERATION
# MONTE CARLO POLICY EVALUATION AND IMPROVEMENT

import gymnasium as gym
import numpy as np
from collections import defaultdict

env = gym.make("FrozenLake-v1", is_slippery=False)
n_states = env.observation_space.n
n_actions = env.action_space.n

V = np.zeros(n_states)
returns = [[] for _ in range(n_states)]
policy = np.random.choice(n_actions, size=n_states) # Q function! Q(s, a)
discount = 0.9
eps = 0.1
max_episodes = 1000
episode_counter = 0

transitions = defaultdict(list)
terminal_states = set()  # Discovered terminal states

def get_action(state):
    return np.random.choice(n_actions) if np.random.rand() < eps else policy[state]

def value_update(trajectory):
    visited = set()
    G = 0
    for s, a, r, s1 in reversed(trajectory):
        if s in terminal_states:
            continue 
        G = r + discount * G
        if s not in visited:
            returns[s].append(G)
            V[s] = np.mean(returns[s])
            visited.add(s)

def improve_policy():
    global policy
    for s in range(n_states):
        if s in terminal_states:
            continue
        action_values = np.zeros(n_actions)
        for a in range(n_actions):
            outcomes = transitions[(s, a)]
            if not outcomes:
                continue
            for s1, r in outcomes:
                action_values[a] += r + discount * V[s1]
            action_values[a] /= len(outcomes)
        best = np.flatnonzero(action_values == np.max(action_values))
        if best.size > 0:
            policy[s] = np.random.choice(best) # Q function

# Main training loop
while episode_counter < max_episodes:
    trajectory = []
    observation, _ = env.reset()
    s = observation

    while True:
        if s in terminal_states:
            break  # Don't act from terminal states
        a = get_action(s)
        s1, r, terminated, truncated, _ = env.step(a)
        trajectory.append((s, a, r, s1))
        transitions[(s, a)].append((s1, r))

        if terminated:
            terminal_states.add(s1)
            value_update(trajectory)
            break
        if truncated:
            break
        s = s1

    if episode_counter % 100 == 0:
        improve_policy()

    episode_counter += 1

print("Final policy:")
print(policy.reshape(4, 4))
print("Value function:")
print(V.reshape(4, 4))

env.close()


Final policy:
[[2 2 1 0]
 [3 0 1 0]
 [2 2 1 3]
 [3 2 2 1]]
Value function:
[[0.36135307 0.44071563 0.50923008 0.12997341]
 [0.06468798 0.         0.63016505 0.        ]
 [0.01695349 0.27320339 0.78938872 0.        ]
 [0.         0.466125   0.98722045 0.        ]]


In [5]:
# Run the environment once using the learned policy

env = gym.make("FrozenLake-v1", is_slippery=False, render_mode="human")
observation, info = env.reset()

done = False

while not done:
    action = policy[observation]
    observation, reward, terminated, truncated, info = env.step(action)
    done = terminated or truncated

env.close()