In [1]:
import numpy as np
import gym
import time
from gym import wrappers
from gym.envs.registration import register
from IPython.display import display, clear_output

In [2]:
def run_the_game():
    """to show how to run a env"""
    done = False
    env.reset()
    display(env.render())
    while not done:
        random_action = env.action_space.sample()
        """
        observation：进入的新状态
        reward：采取这个行动得到的奖励
        done：当前游戏是否结束
        info：其他一些信息，如性能表现，延迟等等，可用于调优
        """
        new_state, reward, done, info = env.step(
            random_action
        )
        clear_output(wait=True)
        display(env.render())
        time.sleep(1)

> Discounted sum of rewards from time step t to horizon<br>
    $G_t = R_{t+1} + γR_{t+2} + γ^2R_{t+3} + γ^3R_{t+4} + ... + γ^{T−t−1}R_T$

In [10]:
def run_episode(env, policy, gamma=1.0, render=False):
    """
    run this game
    0: left
    1: down
    2: right
    3: up
    """
    obs = env.reset() # state initialization
    total_reward = 0
    step_idx = 0
    
    while True:
        if render:
            env.render()
        
        obs, reward, done, _ = env.step(int( policy[obs] ))
        
        # MDP algorithms
        total_reward += (gamma ** step_idx * reward)
        step_idx += 1
        if done:
            break
    return total_reward

In [4]:
def evaluate_policy(env, policy, gamma = 1.0, n=100):
    """take the average of 100 samples"""
    scores =[ run_episode(env, policy, gamma, False) for _ in range(n)]
    return np.mean(scores)

In [5]:
def extract_policy(v, gamma=1.0):
    """Extract the policy given a value-function"""
    policy = np.zeros(env.env.nS) # 16
    
    for s in range(env.env.nS):
        q_sa = np.zeros(env.env.nA)
        
        for a in range(env.env.nA):
            q_sa[a] = sum( [p*(r + gamma*v[s_]) for p, s_, r, _ in env.env.P[s][a]] )
        
        policy[s] = np.argmax(q_sa)
    
    return policy

In [6]:
def compute_policy_v(env, policy, gamma=1.0):
    v = np.zeros(env.env.nS)
    eps = 1e-10 # greedy parameters
    while True:
        prev_v = np.copy(v)
        for s in range(env.env.nS):
            policy_a = policy[s]
            v[s] = sum( [p*(r+gamma*prev_v[s_]) for p, s_, r, _ in env.env.P[s][policy_a]] )
        if (np.sum(np.fabs(prev_v - v)) <= eps):
            break
    return v

In [7]:
def policy_iteration(env, gamma=1.0):
    """policy-Iteration algorithm"""
    policy = np.random.choice(env.env.nA, size=(env.env.nS)) # initialize a random policy 随机生成策略作为初始值
    max_iteration = 2*10^5
    
    for i in range(max_iteration):
        old_policy_v = compute_policy_v(env, policy, gamma)
        new_policy = extract_policy(old_policy_v, gamma)
        if (np.all(policy == new_policy)):
            print('Policy-Iteration converged at step %d.' % (i+1))
            break
        policy = new_policy
    return policy

In [8]:
if __name__ =='__main__':
    env = gym.make('FrozenLake-v0')
    optimal_policy = policy_iteration(env, gamma=1.0)
    scores = evaluate_policy(env, optimal_policy, gamma=1.0)
    print('Average scores = ', scores)

Policy-Iteration converged at step 6.
Average scores =  0.74


In [9]:
optimal_policy

array([0., 3., 3., 3., 0., 0., 0., 0., 3., 1., 0., 0., 0., 2., 1., 0.])

In [11]:
run_episode(env, policy=optimal_policy, render=True)


[41mS[0mFFF
FHFH
FFFH
HFFG
  (Left)
[41mS[0mFFF
FHFH
FFFH
HFFG
  (Left)
SFFF
[41mF[0mHFH
FFFH
HFFG
  (Left)
SFFF
FHFH
[41mF[0mFFH
HFFG
  (Up)
SFFF
FHFH
F[41mF[0mFH
HFFG
  (Down)
SFFF
FHFH
FF[41mF[0mH
HFFG
  (Left)
SFFF
FHFH
FFFH
HF[41mF[0mG
  (Down)
SFFF
FHFH
FFFH
H[41mF[0mFG
  (Right)
SFFF
FHFH
FFFH
HF[41mF[0mG


1.0