In [4]:
import gym
import numpy as np
# # https://github.com/openai/gym/blob/master/gym/envs/toy_text/frozen_lake.py
env = gym.make('FrozenLake-v1')
env.reset()

obs_n = env.observation_space.n
act_n = env.action_space.n

# transition matrix
P = np.zeros((obs_n, act_n, obs_n))
# reward function
R = np.zeros((obs_n, act_n, obs_n))
for obs in range(obs_n):
    for act, results in env.P[obs].items():
        for (prob, next_obs, reward, done) in results:
            P[obs, act, next_obs] = prob
            R[obs, act, next_obs] = reward
            if obs == obs_n - 1:
                # terminal states should always have reward 1
                R[obs, act, next_obs] = 1
# Reward at terminal states?
gamma = 0.9

def calc_policy(V):
    """get a deterministic policy that will acting greedly w.r.t. value function"""
    policy = np.zeros((obs_n, act_n))
    for obs in range(obs_n):
        Q = []
        for act in range(act_n):
            # find action with highest q value
            q = 0
            for next_obs in range(obs_n):
                q += R[obs, act, next_obs] * P[obs, act, next_obs] + gamma * P[obs, act, next_obs] * V[next_obs]
            Q.append(q)
        best_act = Q.index(max(Q))
        policy[obs, best_act] = 1
    return policy

Policy Evaluation:

$$v_{\pi_{k+1}} (s)= \sum_{a \in \mathcal{A}} \pi(a|s) \left( \mathcal{R}^a_s + \gamma \sum_{s' \in S} \mathcal{P}^a_{ss'} \ v_{\pi_k}(s') \right) $$

In [5]:
def policy_eval(policy, k = 10):
    """ Bellman Expectation Equation """
    # set initial values
    V = np.zeros((obs_n))
    for _ in range(k):
        # synchronous backups
        old_value = np.copy(V)
        for obs in range(obs_n):
            for act in range(act_n):
                for next_obs in range(obs_n):
                    V[obs] += policy[obs, act] * (R[obs, act, next_obs] * P[obs, act, next_obs]\
                                            + gamma * P[obs, act, next_obs] * old_value[next_obs])
    return V

def policy_iter():
    """calculate value function of the policy and update policy, until policy unchange"""
    policy = np.zeros((obs_n, act_n))
    # initialize policy: always go downwards
    policy[:, 1] = 1
    
    count = 0
    while True:
        count += 1
        V = policy_eval(policy)
        new_policy = calc_policy(V)
        if (new_policy == policy).all():
            print(count)
            return policy
        policy = np.copy(new_policy)

policy = policy_iter()

3


Value Iteration

$$v_{k+1}(s) = \max_{a \in \mathcal{A}} \left(\mathcal{R}^a_{s} + \gamma \sum_{s \in \mathcal{S}} \mathcal{P}_{ss'}^a \ v_k(s') \right)$$

In [14]:
def update_v(V):
    """Bellman Optimality Equation"""
    # synchronous backups
    old_V = np.copy(V)
    for obs in range(obs_n):
        Qs = []
        for act in range(act_n):
            # calculate q-values for each action
            q = 0
            for next_obs in range(obs_n):
                q += P[obs, act, next_obs] * R[obs, act, next_obs] +\
                    gamma * P[obs, act, next_obs] * old_V[next_obs]
            Qs.append(q)
        V[obs] = max(Qs)
    return V

def value_iter():
    """iteratively update BOE, until the policy calculate by BOE unchange"""
    # set initial values
    # no initial policy needed
    V = np.zeros(obs_n)
    policy = calc_policy(V)
    while True:
        V = update_v(V)
        new_policy = calc_policy(V)
        if (new_policy == policy).all():
            return policy
        policy = np.copy(new_policy)

policy = value_iter()

In [13]:
def act(policy, obs):
    return np.argmax(policy[int(obs)])

import time
obs = env.reset()
done = False
while not done:
    # print(actions[act(policy, obs)])
    obs, reward, done, _ = env.step(act(policy, obs))
    env.render()
    time.sleep(1)
env.close()

感想：First try to program elementwise before you want to anything matrixwise

In [71]:
actions ={0: 'LEFT',
          1: 'DOWN',
          2: 'RIGHT',
          3: 'UP',}