In [25]:
import numpy as np
from itertools import cycle

# !pip install git+https://github.com/mimoralea/gym-walk#egg=gym-walk
import gym, gym_walk
import random

# Print NumPy floats in fixed-point notation rather than scientific notation.
np.set_printoptions(suppress=True)
# Seed Python's and NumPy's global RNGs so the notebook's stochastic steps are repeatable.
random.seed(123)
np.random.seed(123)


In [26]:
def print_policy(pi, P, action_symbols=("<", "v", ">", "^"), n_cols=4, title="Policy:"):
    """Print a policy as a grid of ``state symbol`` cells.

    Args:
        pi: Callable mapping a state index to an action index.
        P: Transition model indexed as ``P[state][action]``; each entry is an
            iterable of ``(prob, next_state, reward, done)`` tuples.
        action_symbols: Display symbol for each action index.
        n_cols: Number of cells printed per row.
        title: Heading printed before the grid.

    States whose every transition carries a truthy ``done`` flag are shown
    as blank cells.
    """
    print(title)
    symbol_of = dict(enumerate(action_symbols))
    for state in range(len(P)):
        action = pi(state)
        print("| ", end="")
        done_flags = [
            done
            for transitions in P[state].values()
            for _, _, _, done in transitions
        ]
        if not np.all(done_flags):
            print(str(state).zfill(2), symbol_of[action].rjust(6), end=" ")
        else:
            # Terminal state: keep the cell width but show nothing.
            print("".rjust(9), end=" ")
        row_complete = (state + 1) % n_cols == 0
        if row_complete:
            print("|")


def print_state_value_function(V, P, n_cols=4, prec=3, title="State-value function:"):
    """Print a state-value function as a grid of ``state value`` cells.

    Args:
        V: Sequence of state values indexed by state.
        P: Transition model indexed as ``P[state][action]``; each entry is an
            iterable of ``(prob, next_state, reward, done)`` tuples.
        n_cols: Number of cells printed per row.
        prec: Number of decimals the values are rounded to for display.
        title: Heading printed before the grid.

    States whose every transition carries a truthy ``done`` flag are shown
    as blank cells.
    """
    print(title)
    for state in range(len(P)):
        value = V[state]
        print("| ", end="")
        done_flags = [
            done
            for transitions in P[state].values()
            for _, _, _, done in transitions
        ]
        if np.all(done_flags):
            # Terminal state: keep the cell width but show nothing.
            print("".rjust(9), end=" ")
        else:
            label = str(state).zfill(2)
            shown_value = "{}".format(np.round(value, prec)).rjust(6)
            print(label, shown_value, end=" ")
        if not (state + 1) % n_cols:
            print("|")


def print_action_value_function(
    Q, optimal_Q=None, action_symbols=("<", ">"), prec=3, title="Action-value function:"
):
    """Print an action-value table, optionally next to a reference table.

    Args:
        Q: 2-D array-like of action values, one row per state and one column
            per action.
        optimal_Q: Optional reference table with the same shape as ``Q``. When
            given, its values (header prefix ``*``) and the difference
            ``optimal_Q - Q`` (header prefix ``err``) are appended as columns.
        action_symbols: Column label for each action.
        prec: Number of decimals the values are rounded to for display.
        title: Heading printed before the table.
    """
    # Imported here because the notebook's import cell only brings in
    # `itertools.cycle` and never imports `tabulate`; without these the
    # function raises NameError on a fresh kernel.
    from itertools import product
    from tabulate import tabulate

    # Accept plain nested lists as well as arrays (needed for `optimal_Q - Q`).
    Q = np.asarray(Q)
    show_optimal = optimal_Q is not None
    vf_types = ("", "*", "err") if show_optimal else ("",)
    headers = ["s"] + [" ".join(pair) for pair in product(vf_types, action_symbols)]
    print(title)
    # Leading column holds the state index of each row.
    states = np.arange(len(Q))[..., np.newaxis]
    arr = np.hstack((states, np.round(Q, prec)))
    if show_optimal:
        optimal_Q = np.asarray(optimal_Q)
        arr = np.hstack((arr, np.round(optimal_Q, prec), np.round(optimal_Q - Q, prec)))
    print(tabulate(arr, headers, tablefmt="fancy_grid"))


def probability_success(env, pi, goal_state, n_episodes=100, max_steps=200):
    """Estimate how often following ``pi`` ends an episode in ``goal_state``.

    Args:
        env: Environment exposing ``seed``, ``reset`` (returning a state) and
            ``step`` (returning ``(state, reward, done, info)``).
        pi: Callable mapping a state to an action.
        goal_state: State that counts as a success when an episode stops there.
        n_episodes: Number of rollouts to run.
        max_steps: Step cap per rollout.

    Returns:
        Fraction of rollouts whose final state equals ``goal_state``.
    """
    # Re-seed every RNG in play so repeated calls produce the same estimate.
    random.seed(123)
    np.random.seed(123)
    env.seed(123)
    reached_goal = []
    for _episode in range(n_episodes):
        state = env.reset()
        done = False
        steps_taken = 0
        while not done and steps_taken < max_steps:
            state, _reward, done, _info = env.step(pi(state))
            steps_taken += 1
        reached_goal.append(state == goal_state)
    return np.sum(reached_goal) / len(reached_goal)


def mean_return(env, pi, n_episodes=100, max_steps=200):
    """Estimate the average undiscounted return obtained by following ``pi``.

    Args:
        env: Environment exposing ``seed``, ``reset`` (returning a state) and
            ``step`` (returning ``(state, reward, done, info)``).
        pi: Callable mapping a state to an action.
        n_episodes: Number of rollouts to run.
        max_steps: Step cap per rollout.

    Returns:
        Mean over rollouts of the summed rewards.
    """
    # Re-seed every RNG in play so repeated calls produce the same estimate.
    random.seed(123)
    np.random.seed(123)
    env.seed(123)
    episode_returns = []
    for _episode in range(n_episodes):
        state = env.reset()
        done = False
        steps_taken = 0
        total_reward = 0.0
        while not done and steps_taken < max_steps:
            state, reward, done, _info = env.step(pi(state))
            total_reward += reward
            steps_taken += 1
        episode_returns.append(total_reward)
    return np.mean(episode_returns)


In [27]:
def policy_evaluation(pi, P, gamma=1.0, theta=1e-10):
    """Iteratively compute the state-value function of policy ``pi``.

    Args:
        pi: Callable mapping a state index to an action.
        P: Transition model indexed as ``P[state][action]``; each entry is an
            iterable of ``(prob, next_state, reward, done)`` tuples.
        gamma: Discount factor.
        theta: Sweeping stops once the largest per-state change between two
            consecutive sweeps falls below this value.

    Returns:
        NumPy array holding the estimated value of every state.
    """
    n_states = len(P)
    previous = np.zeros(n_states, dtype=np.float64)
    converged = False
    while not converged:
        V = np.zeros(n_states, dtype=np.float64)
        for state in range(n_states):
            for prob, next_state, reward, done in P[state][pi(state)]:
                # `(not done)` zeroes the bootstrap term on terminal transitions.
                V[state] += prob * (reward + gamma * previous[next_state] * (not done))
        converged = np.max(np.abs(V - previous)) < theta
        previous = V.copy()
    return V


In [28]:
def policy_improvement(V, P, gamma=1.0):
    """Return the policy that acts greedily with respect to ``V``.

    Args:
        V: Sequence of state values indexed by state.
        P: Transition model indexed as ``P[state][action]``; each entry is an
            iterable of ``(prob, next_state, reward, done)`` tuples. The
            action count is taken from state 0.
        gamma: Discount factor.

    Returns:
        Callable mapping a state index to the action with the highest
        one-step look-ahead value (ties resolved by ``np.argmax``, i.e. the
        lowest action index). Unknown states raise ``KeyError``.
    """
    Q = np.zeros((len(P), len(P[0])), dtype=np.float64)
    for s in range(len(P)):
        for a in range(len(P[s])):
            for prob, next_state, reward, done in P[s][a]:
                # `(not done)` zeroes the bootstrap term on terminal transitions.
                Q[s][a] += prob * (reward + gamma * V[next_state] * (not done))
    # Build the state -> action table once; the previous lambda recomputed the
    # argmax and rebuilt the whole dict on every single policy lookup.
    greedy_actions = dict(enumerate(np.argmax(Q, axis=1)))
    new_pi = lambda s: greedy_actions[s]
    return new_pi


In [38]:
def policy_iteration(P, gamma=1.0, theta=1e-10):
    """Alternate policy evaluation and greedy improvement until the policy is stable.

    Args:
        P: Transition model indexed as ``P[state][action]``; each entry is an
            iterable of ``(prob, next_state, reward, done)`` tuples. The
            action set is taken from state 0.
        gamma: Discount factor forwarded to evaluation and improvement.
        theta: Convergence threshold forwarded to ``policy_evaluation``.

    Returns:
        Tuple ``(V, pi)``: the state values computed in the final evaluation
        and the policy callable that the final improvement left unchanged.

    The starting policy is drawn from NumPy's global RNG, so seed it for
    repeatable runs.
    """
    n_states = len(P)
    random_actions = np.random.choice(tuple(P[0].keys()), n_states)
    # Build the lookup table once; the previous lambda rebuilt the whole dict
    # on every policy call.
    initial_actions = dict(enumerate(random_actions))
    pi = lambda s: initial_actions[s]

    while True:
        # Snapshot the current policy so it can be compared after improvement.
        old_pi = {s: pi(s) for s in range(n_states)}
        V = policy_evaluation(pi, P, gamma, theta)
        pi = policy_improvement(V, P, gamma)
        if old_pi == {s: pi(s) for s in range(n_states)}:
            break

    return V, pi


In [30]:
def value_iteration(P, gamma=1.0, theta=1e-10):
    """Compute optimal state values and a greedy policy by value iteration.

    Args:
        P: Transition model indexed as ``P[state][action]``; each entry is an
            iterable of ``(prob, next_state, reward, done)`` tuples. The
            action count is taken from state 0.
        gamma: Discount factor.
        theta: Sweeping stops once the largest per-state change between two
            consecutive sweeps falls below this value.

    Returns:
        Tuple ``(V, pi)``: the converged state-value array and a callable
        mapping a state index to its greedy action (unknown states raise
        ``KeyError``).
    """
    n_states, n_actions = len(P), len(P[0])
    V = np.zeros(n_states, dtype=np.float64)
    while True:
        Q = np.zeros((n_states, n_actions), dtype=np.float64)
        for s in range(n_states):
            for a in range(len(P[s])):
                for prob, next_state, reward, done in P[s][a]:
                    # `(not done)` zeroes the bootstrap term on terminal transitions.
                    Q[s][a] += prob * (reward + gamma * V[next_state] * (not done))
        new_V = np.max(Q, axis=1)
        converged = np.max(np.abs(V - new_V)) < theta
        V = new_V
        if converged:
            break
    # Extract the policy and return only AFTER convergence. Previously these
    # statements sat inside the loop, so the function returned after a single
    # sweep, or returned None when the first sweep already met the threshold.
    greedy_actions = dict(enumerate(np.argmax(Q, axis=1)))
    pi = lambda s: greedy_actions[s]
    return V, pi


In [33]:
# Create the slippery-walk environment and grab its transition model, which the
# helper functions index as P[state][action] -> iterable of
# (prob, next_state, reward, done) tuples.
env = gym.make("SlipperyWalkFive-v0")
P = env.env.P
init_state = env.reset()
# State index that probability_success treats as a successful episode end.
goal_state = 6

# Hand-written baseline policy: pick LEFT in every one of the seven states.
LEFT, RIGHT = range(2)
pi = lambda s: {0: LEFT, 1: LEFT, 2: LEFT, 3: LEFT, 4: LEFT, 5: LEFT, 6: LEFT}[s]
print_policy(pi, P, action_symbols=("<", ">"), n_cols=7)
# Roll the policy out (helpers' default episode count) to estimate its
# success rate and mean return.
print(
    "Reaches goal {:.2f}%. Obtains an average undiscounted return of {:.4f}.".format(
        probability_success(env, pi, goal_state=goal_state) * 100, mean_return(env, pi)
    )
)


Policy:
|           | 01      < | 02      < | 03      < | 04      < | 05      < |           |
Reaches goal 7.00%. Obtains an average undiscounted return of 0.0700.


# Policy Evaluation

In [34]:
# State values of the always-LEFT baseline policy `pi` (default gamma and theta).
V = policy_evaluation(pi, P)
print_state_value_function(V, P, n_cols=7, prec=5)

State-value function:
|           | 01 0.00275 | 02 0.01099 | 03 0.03571 | 04 0.10989 | 05 0.33242 |           |


# Policy Improvement

In [35]:
# Derive the policy that is greedy with respect to the baseline's values V,
# then measure it with the same rollout helpers used for the baseline.
improved_pi = policy_improvement(V, P)
print_policy(improved_pi, P, action_symbols=("<", ">"), n_cols=7)
print(
    'Reaches goal {:.2f}%. Obtains an average undiscounted return of {:.4f}.'\
        .format(probability_success(env, improved_pi, goal_state=goal_state) * 100, mean_return(env, improved_pi))
)

Policy:
|           | 01      > | 02      > | 03      > | 04      > | 05      > |           |
Reaches goal 93.00%. Obtains an average undiscounted return of 0.9300.


In [36]:
# State values of the improved (greedy) policy, for comparison with V above.
improved_V = policy_evaluation(improved_pi, P)
print_state_value_function(improved_V, P, n_cols=7, prec=5)

State-value function:
|           | 01 0.66758 | 02 0.89011 | 03 0.96429 | 04 0.98901 | 05 0.99725 |           |


# Policy Iteration

In [39]:
# Run full policy iteration on the slippery-walk model; the starting policy is
# drawn from NumPy's global RNG inside policy_iteration.
optimal_V, optimal_pi = policy_iteration(P)
print('Optimal policy and state-value function (PI):')
print_policy(optimal_pi, P, action_symbols=('<', '>'), n_cols=7)
# Rollout-based estimate of how the resulting policy performs in the environment.
print('Reaches goal {:.2f}%. Obtains an average undiscounted return of {:.4f}.'.format(
    probability_success(env, optimal_pi, goal_state=goal_state)*100, 
    mean_return(env, optimal_pi)))
print()
print_state_value_function(optimal_V, P, n_cols=7, prec=5)

Optimal policy and state-value function (PI):
Policy:
|           | 01      > | 02      > | 03      > | 04      > | 05      > |           |
Reaches goal 93.00%. Obtains an average undiscounted return of 0.9300.

State-value function:
|           | 01 0.66758 | 02 0.89011 | 03 0.96429 | 04 0.98901 | 05 0.99725 |           |


# Frozen Lake MDP

In [45]:
# Switch to the FrozenLake environment. NOTE: this rebinds the notebook-level
# names env, P, init_state and goal_state used by the earlier cells.
env = gym.make('FrozenLake-v1')
P = env.env.P
init_state = env.reset()
# State index that probability_success treats as a successful episode end.
goal_state = 15

# Action indices in the same order as print_policy's default symbols ("<", "v", ">", "^").
LEFT, DOWN, RIGHT, UP = range(4)
# Despite the name, this is a fixed, hand-specified state -> action table,
# not a policy sampled at run time.
random_pi = lambda s: {
    0:RIGHT, 1:LEFT, 2:DOWN, 3:UP,
    4:LEFT, 5:LEFT, 6:RIGHT, 7:LEFT,
    8:UP, 9:DOWN, 10:UP, 11:LEFT,
    12:LEFT, 13:RIGHT, 14:DOWN, 15:LEFT
}[s]
print_policy(random_pi, P)
# Prints the raw success percentage and mean return (no formatted sentence here).
print(
    probability_success(env, random_pi, goal_state=goal_state) * 100,
    mean_return(env, random_pi)
)

Policy:
| 00      > | 01      < | 02      v | 03      ^ |
| 04      < |           | 06      > |           |
| 08      ^ | 09      v | 10      ^ |           |
|           | 13      > | 14      v |           |
12.0 0.12


In [46]:
# NOTE: despite its name, `improved_state` holds the state-value function of
# `random_pi`; it is not an improved policy or state.
improved_state = policy_evaluation(random_pi, P)
print_state_value_function(improved_state, P)

State-value function:
| 00  0.118 | 01  0.059 | 02  0.059 | 03  0.059 |
| 04  0.176 |           | 06  0.059 |           |
| 08  0.235 | 09  0.294 | 10  0.118 |           |
|           | 13  0.529 | 14  0.765 |           |


In [47]:
# One greedy improvement step over the values of `random_pi`.
# NOTE: this rebinds `improved_pi` from the slippery-walk section.
improved_pi = policy_improvement(improved_state, P)
print_policy(improved_pi, P)

Policy:
| 00      < | 01      ^ | 02      < | 03      ^ |
| 04      < |           | 06      < |           |
| 08      ^ | 09      v | 10      < |           |
|           | 13      > | 14      v |           |


In [51]:
# Run full policy iteration on the FrozenLake model; the starting policy is
# drawn from NumPy's global RNG inside policy_iteration.
# NOTE: this rebinds optimal_V / optimal_pi from the slippery-walk section.
optimal_V, optimal_pi = policy_iteration(P)
print('Optimal policy and state-value function (PI):')
print_policy(optimal_pi, P)
# Rollout-based estimate of how the resulting policy performs in the environment.
print('Reaches goal {:.2f}%. Obtains an average undiscounted return of {:.4f}.'.format(
    probability_success(env, optimal_pi, goal_state=goal_state)*100, 
    mean_return(env, optimal_pi)))
print()
print_state_value_function(optimal_V, P)

Optimal policy and state-value function (PI):
Policy:
| 00      < | 01      ^ | 02      ^ | 03      ^ |
| 04      < |           | 06      < |           |
| 08      ^ | 09      v | 10      < |           |
|           | 13      > | 14      v |           |
Reaches goal 74.00%. Obtains an average undiscounted return of 0.7400.

State-value function:
| 00  0.824 | 01  0.824 | 02  0.824 | 03  0.824 |
| 04  0.824 |           | 06  0.529 |           |
| 08  0.824 | 09  0.824 | 10  0.765 |           |
|           | 13  0.882 | 14  0.941 |           |
