In [1]:
import warnings

import numpy as np
import gymnasium as gym

In [12]:
def construct_dynamics(env):
    """Build a dense transition tensor from a tabular environment.

    Parameters
    ----------
    env
        Environment (possibly wrapped) whose base environment, reached via
        ``env.unwrapped``, exposes a transition table ``P`` of the form
        ``P[state][action] -> [(prob, next_state, reward, terminated), ...]``
        and whose ``observation_space.n`` / ``action_space.n`` give the
        number of states and actions.

    Returns
    -------
    P_tensor : np.ndarray, shape (n_states, n_rewards, n_states, n_actions)
        ``P_tensor[next_state, reward_idx, state, action]`` is the probability
        of landing in ``next_state`` and receiving ``rewards[reward_idx]``
        after taking ``action`` in ``state``.
    rewards : np.ndarray, shape (n_rewards,)
        Sorted distinct reward values found in the transition table.
    terminals : np.ndarray of bool, shape (n_states,)
        ``terminals[s]`` holds the ``terminated`` flag of the transitions
        that lead into state ``s``.
    """
    # `unwrapped` reaches the base environment regardless of how many
    # wrappers were stacked on top of it.
    P = env.unwrapped.P
    n_states = env.observation_space.n
    n_actions = env.action_space.n

    # First pass: collect the distinct rewards so the reward axis can be sized
    # and indexed by position instead of by the reward's numeric value.
    rewards = np.array(sorted({
        reward
        for actions in P.values()
        for outcomes in actions.values()
        for _, _, reward, _ in outcomes
    }))
    reward_index = {reward: idx for idx, reward in enumerate(rewards.tolist())}

    P_tensor = np.zeros((n_states, len(rewards), n_states, n_actions))
    terminals = np.zeros(n_states, dtype=bool)

    # Second pass: accumulate probabilities (several outcomes may share the
    # same next state and reward, hence `+=`).
    for state, actions in P.items():
        for action, outcomes in actions.items():
            for prob, next_state, reward, terminated in outcomes:
                P_tensor[next_state, reward_index[reward], state, action] += prob
                terminals[next_state] = terminated

    return P_tensor, rewards, terminals

In [13]:
# Create the 8x8 FrozenLake environment and extract its transition tensor,
# distinct reward values and terminal-state mask.
env = gym.make('FrozenLake8x8-v1')
P_tensor, rewards, terminals = construct_dynamics(env)

In [14]:
# Check that each tensor has been correctly constructed: for every
# (state, action) pair the probabilities over (next_state, reward) must sum
# to 1. Compare with a tolerance rather than `== 1`, because sums of
# floating-point probabilities are not guaranteed to be exactly 1.
np.all(np.isclose(np.sum(P_tensor, axis=(0, 1)), 1.0)), rewards, terminals

(np.True_,
 array([0., 1.]),
 array([False, False, False, False, False, False, False, False, False,
        False, False, False, False, False, False, False, False, False,
        False,  True, False, False, False, False, False, False, False,
        False, False,  True, False, False, False, False, False,  True,
        False, False, False, False, False,  True,  True, False, False,
        False,  True, False, False,  True, False, False,  True, False,
         True, False, False, False, False,  True, False, False, False,
         True]))

In [17]:
def v2q(P_tensor, rewards, terminals, v, gamma=1):
    """Convert state values into action values with one Bellman backup.

    ``P_tensor`` is indexed ``[next_state, reward_idx, state, action]``,
    ``rewards`` holds the reward value for each ``reward_idx``, ``terminals``
    flags terminal states and ``v`` is the current state-value vector.
    Returns ``q`` with shape ``(n_states, n_actions)``.
    """
    # Expected immediate reward for every (state, action): weight each
    # probability by its reward value and sum out next_state and reward.
    reward_weights = rewards[np.newaxis, :, np.newaxis, np.newaxis]
    expected_reward = np.sum(P_tensor * reward_weights, axis=(0, 1))

    # Marginal next-state distribution, shape (next_state, state, action).
    next_state_probs = np.sum(P_tensor, axis=1)

    # Terminal next states contribute no future value, so they are masked out.
    next_values = v[:, np.newaxis, np.newaxis]
    continuing = 1 - terminals[:, np.newaxis, np.newaxis]
    expected_future = np.sum(next_state_probs * next_values * continuing, axis=0)

    return expected_reward + gamma * expected_future

In [20]:
def iterate_value(P_tensor, rewards, terminals, gamma=1, tolerance=1e-20,
                  max_iterations=100_000):
    """Run value iteration and return the greedy policy and state values.

    Parameters
    ----------
    P_tensor : np.ndarray, shape (n_states, n_rewards, n_states, n_actions)
        Transition tensor indexed ``[next_state, reward_idx, state, action]``.
    rewards : np.ndarray
        Reward value for each ``reward_idx``.
    terminals : np.ndarray of bool
        Terminal-state mask, passed through to ``v2q``.
    gamma : float, optional
        Discount factor.
    tolerance : float, optional
        Iteration stops once the largest absolute change in ``v`` between two
        sweeps is below this value. The default is far below float64
        resolution for values near 1, so in practice the loop runs until the
        iterates stop changing.
    max_iterations : int or None, optional
        Upper bound on the number of sweeps. Without a bound the loop never
        ends if the iterates keep oscillating in the last bit or the change
        becomes NaN. Pass ``None`` to iterate without a bound.

    Returns
    -------
    policy : np.ndarray, shape (n_states, n_actions)
        One-hot greedy policy with respect to the final values.
    v : np.ndarray, shape (n_states,)
        Final state-value estimates.
    """
    n_states = P_tensor.shape[0]
    n_actions = P_tensor.shape[3]
    v = np.zeros(n_states)

    sweeps = 0
    while True:
        v_next = np.max(v2q(P_tensor, rewards, terminals, v, gamma), axis=1)
        delta = np.max(np.abs(v_next - v))
        v = v_next
        sweeps += 1
        if delta < tolerance:
            break
        if max_iterations is not None and sweeps >= max_iterations:
            warnings.warn(
                f"iterate_value stopped after {sweeps} sweeps without reaching "
                f"tolerance {tolerance} (last change: {delta})",
                RuntimeWarning,
                stacklevel=2,
            )
            break

    # Calculate the optimal (greedy) policy from the final values.
    policy = np.zeros((n_states, n_actions))
    q = v2q(P_tensor, rewards, terminals, v, gamma)
    actions = np.argmax(q, axis=1)
    policy[np.arange(n_states), actions] = 1.0
    return policy, v

In [21]:
# Solve the MDP by value iteration using the default discount and tolerance.
policy, v = iterate_value(P_tensor, rewards, terminals)

In [23]:
# Greedy action index for each state, laid out on the 8x8 grid.
# NOTE(review): presumably 0=left, 1=down, 2=right, 3=up (FrozenLake's action
# encoding) — confirm against the Gymnasium documentation.
np.argmax(policy, axis=1).reshape(8,8)

array([[1, 1, 2, 2, 2, 2, 1, 2],
       [3, 3, 3, 3, 3, 3, 3, 1],
       [0, 0, 0, 0, 2, 3, 3, 2],
       [0, 0, 0, 1, 0, 0, 2, 2],
       [0, 3, 0, 0, 2, 1, 3, 2],
       [0, 0, 0, 1, 3, 0, 0, 2],
       [0, 0, 1, 0, 0, 0, 0, 2],
       [0, 1, 0, 0, 1, 2, 1, 0]])

In [24]:
# State values returned by value iteration, laid out on the 8x8 grid.
v.reshape(8,8)

array([[1.        , 1.        , 1.        , 1.        , 1.        ,
        1.        , 1.        , 1.        ],
       [1.        , 1.        , 1.        , 1.        , 1.        ,
        1.        , 1.        , 1.        ],
       [1.        , 0.97820163, 0.92643052, 0.        , 0.85661768,
        0.94623163, 0.98207721, 1.        ],
       [1.        , 0.9346049 , 0.80108992, 0.47490377, 0.6236214 ,
        0.        , 0.94467761, 1.        ],
       [1.        , 0.82561308, 0.54223433, 0.        , 0.53934275,
        0.61118923, 0.85195561, 1.        ],
       [1.        , 0.        , 0.        , 0.16804079, 0.38321763,
        0.44226934, 0.        , 1.        ],
       [1.        , 0.        , 0.19467347, 0.12090475, 0.        ,
        0.33240114, 0.        , 1.        ],
       [1.        , 0.73155782, 0.46311564, 0.        , 0.27746705,
        0.5549341 , 0.77746705, 0.        ]])

In [25]:
def test_policy(policy, n=200, env_id="FrozenLake8x8-v1", seed=None):
    """Roll out a stochastic policy and return the total reward of each episode.

    Parameters
    ----------
    policy : np.ndarray, shape (n_states, n_actions)
        ``policy[state]`` is the action-probability vector used in ``state``.
    n : int, optional
        Number of episodes to run.
    env_id : str, optional
        Environment id passed to ``gym.make``.
    seed : int or None, optional
        When given, seeds both the action sampling and the environment (on
        its first reset) so the rollouts are reproducible. When ``None``,
        actions are drawn from NumPy's global random state, as before.

    Returns
    -------
    list
        Total (undiscounted) reward of each episode, in order.
    """
    # Fall back to the global NumPy random state when no seed is requested so
    # callers relying on `np.random.seed(...)` keep their behaviour.
    rng = np.random.default_rng(seed) if seed is not None else np.random
    env = gym.make(env_id)
    rewards = []
    try:
        for episode in range(n):
            episode_reward = 0
            # Seed the environment once; later resets continue its RNG stream.
            state, _ = env.reset(seed=seed if episode == 0 else None)
            while True:
                action = int(rng.choice(env.action_space.n, p=policy[state]))
                state, reward, terminated, truncated, _ = env.step(action)
                episode_reward += reward
                if terminated or truncated:
                    break
            rewards.append(episode_reward)
    finally:
        # Release the environment's resources even if a rollout raises.
        env.close()
    return rewards


In [27]:
# Evaluate the policy empirically: mean and standard deviation of the
# per-episode total reward.
# NOTE(review): this rebinds `rewards`, which earlier held the reward-value
# array returned by construct_dynamics; re-running the value-iteration cell
# after this one would pass the wrong object. Consider a distinct name such
# as `episode_rewards`.
# NOTE(review): the empirical mean is below the computed start-state value;
# presumably episodes are being truncated by a step limit — confirm.
rewards = test_policy(policy)
np.mean(rewards), np.std(rewards)

(np.float64(0.775), np.float64(0.41758232721225164))