In [13]:
import gym
import numpy as np


In [14]:
env = gym.make("FrozenLake-v1")
P_FrozenLake = env.env.P


In [15]:
def policy_evaluation(pi, P, gamma=1.0, theta=1e-10):
    old_V = np.zeros(len(P))
    while True:
        V = np.zeros(len(P))
        for s in range(len(P)):
            for prob, next_state, reward, done in P[s][pi(s)]:
                V[s] += prob * (reward + gamma * old_V[next_state] * (not done))
        if np.max(np.abs(V - old_V)) < theta:
            break;
        old_V = V.copy()
    return V

In [16]:
def policy_improvement(V, P, gamma=1.0):
    Q = np.zeros((len(P), len(P[0])))
    for s in range(len(P)):
        for a in range(len(P[s])):
            for prob, next_state, reward, done in P[s][a]:
                Q[s][a] += prob * (reward + gamma * V[next_state] * (not done))
    new_pi = lambda s: {s:a for s, a in enumerate(np.argmax(Q, axis=1))}[s]
    return new_pi

In [17]:
def policy_iteration(P, gamma=1.0, theta=1e-10):
    pi = lambda s: s % 4  # 随便给一个初始策略
    while True:
        old_pi_values = {s:pi(s) for s in range(len(P))}
        V = policy_evaluation(pi, P, gamma, theta)
        pi = policy_improvement(V, P, gamma)
        if old_pi_values == {s:pi(s) for s in range(len(P))}:
            break
    return V, pi

In [18]:
optimal_V, optimal_pi = policy_iteration(P_FrozenLake, gamma=1.0, theta=1e-10)

In [19]:
optimal_V

array([0.82352941, 0.82352941, 0.82352941, 0.82352941, 0.82352941,
       0.        , 0.52941176, 0.        , 0.82352941, 0.82352941,
       0.76470588, 0.        , 0.        , 0.88235294, 0.94117647,
       0.        ])

In [27]:
{s:optimal_pi(s) for s in range(len(P_FrozenLake))}

{0: 0,
 1: 3,
 2: 3,
 3: 3,
 4: 0,
 5: 0,
 6: 0,
 7: 0,
 8: 3,
 9: 1,
 10: 0,
 11: 0,
 12: 0,
 13: 2,
 14: 1,
 15: 0}

In [23]:
def value_iteration(P, gamma=1.0, theta=1e-10):
    V = np.zeros(len(P))
    while True:
        Q = np.zeros((len(P), len(P[0])))
        for s in range(len(P)):
            for a in range(len(P[s])):
                for prob, next_state, reward, done in P[s][a]:
                    Q[s][a] += prob * (reward + gamma * V[next_state] * (not done))
        new_V = np.max(Q, axis=1)
        if np.max(np.abs(new_V - V)) < theta:
            break
        V = new_V.copy()
    pi = lambda s: {s:a for s, a in enumerate(np.argmax(Q, axis=1))}[s]
    return V, pi

In [24]:
opt_V, opt_pi = value_iteration(P_FrozenLake, gamma=1.0, theta=1e-10)

In [25]:
opt_V

array([0.82352941, 0.82352941, 0.82352941, 0.82352941, 0.82352941,
       0.        , 0.52941176, 0.        , 0.82352941, 0.82352941,
       0.76470588, 0.        , 0.        , 0.88235294, 0.94117647,
       0.        ])

In [28]:
{s:opt_pi(s) for s in range(len(P_FrozenLake))}

{0: 0,
 1: 3,
 2: 3,
 3: 3,
 4: 0,
 5: 0,
 6: 0,
 7: 0,
 8: 3,
 9: 1,
 10: 0,
 11: 0,
 12: 0,
 13: 2,
 14: 1,
 15: 0}