# 以 Monte Carlo 求解 Cliff Walking

## 載入套件

In [1]:
import numpy as np
import random
import sys
import gymnasium as gym

## 載入遊戲

In [4]:
# Build the environment registered under the id 'CliffWalking-v0'.
env = gym.make('CliffWalking-v0')

In [5]:
# Random (uniform) policy
def create_random_policy(env):
    """Build a policy that picks every action with equal probability.

    Args:
        env: Environment whose ``observation_space.n`` and ``action_space.n``
            give the number of discrete states and actions.

    Returns:
        dict: ``{state: {action: probability}}`` covering states
        ``0 .. observation_space.n - 1`` and actions
        ``0 .. action_space.n - 1``; each probability is ``1 / action_space.n``.
        Every state owns its own inner dict, so states can be updated
        independently.
    """
    n_states = env.observation_space.n
    n_actions = env.action_space.n
    # The division stays inside the comprehension so an environment with
    # zero actions yields empty inner dicts instead of dividing by zero.
    return {
        state: {action: 1 / n_actions for action in range(n_actions)}
        for state in range(n_states)
    }
    
# State/action value table
def create_state_action_dictionary(env, policy):
    """Create a zero-initialised action-value table for the policy's states.

    Args:
        env: Environment whose ``action_space.n`` gives the number of actions.
        policy: Mapping whose keys are the states to include in the table.

    Returns:
        dict: ``{state: {action: 0.0}}`` with one independent inner dict per
        state, keyed by actions ``0 .. action_space.n - 1``.
    """
    # dict.fromkeys with an immutable default is safe: each state still gets
    # a fresh dict object.
    return {
        state: dict.fromkeys(range(env.action_space.n), 0.0)
        for state in policy
    }

In [6]:
def run_game(env, policy, max_steps=None):
    """Play one episode by sampling actions from ``policy``.

    Args:
        env: Environment where ``reset()`` returns ``(state, info)`` and
            ``step(action)`` returns
            ``(state, reward, terminated, truncated, info)``.
        policy: ``{state: {action: weight}}``. Actions are drawn in
            proportion to their weights, which do not have to sum to 1.
            At least one weight per visited state must be positive.
        max_steps: Optional cap on the number of steps recorded. ``None``
            (the default) plays until the environment reports
            ``terminated`` or ``truncated``.

    Returns:
        list: One ``[state, action, reward]`` entry per step, in order,
        where ``state`` is the state the action was taken from.
    """
    s, _ = env.reset()
    episode = []
    done = False

    while not done and (max_steps is None or len(episode) < max_steps):
        # random.choices always returns an action. The earlier hand-rolled
        # cumulative scan could finish without selecting one, leaving the
        # action unset or carrying over the previous step's action.
        actions = list(policy[s])
        weights = list(policy[s].values())
        action = random.choices(actions, weights=weights)[0]

        next_s, reward, terminated, truncated, _ = env.step(action)
        episode.append([s, action, reward])
        s = next_s
        done = terminated or truncated
    return episode

In [7]:
def monte_carlo(env, episodes=100, policy=None, epsilon=0.01):
    """Improve a policy with first-visit Monte Carlo control (epsilon-greedy).

    Each episode is generated with the current policy via ``run_game``.
    Walking the episode backwards, the undiscounted return ``G`` is
    accumulated, and for the first occurrence of every (state, action) pair
    the action value is set to the mean of all returns seen for that pair.
    The policy for that state is then made epsilon-greedy with respect to
    the updated values.

    Args:
        env: Environment passed to ``run_game`` and to the table builders.
        episodes: Number of episodes to play.
        policy: Optional starting policy ``{state: {action: probability}}``.
            A falsy value (``None`` or an empty dict) is replaced by a
            uniform random policy. A supplied policy is updated in place.
        epsilon: Exploration rate. Every action keeps probability
            ``epsilon / n_actions``; the greedy action receives the
            remaining ``1 - epsilon`` on top of that.

    Returns:
        dict: The updated policy.
    """
    # Start from a uniform random policy when none is supplied.
    if not policy:
        policy = create_random_policy(env)
    # Action-value table, initialised to zero for every state in the policy.
    Q = create_state_action_dictionary(env, policy)
    # Running sum and count of first-visit returns per (state, action) pair.
    # The mean is sum / count, so no per-pair list has to be re-summed.
    return_sums = {}
    return_counts = {}

    for i_episode in range(episodes):
        # Progress message every 1000 episodes, rewritten on the same line.
        if (i_episode + 1) % 1000 == 0:
            print(f"\r {(i_episode+1)}/{episodes}回合.", end="")
            sys.stdout.flush()  # push the message out immediately

        # Generate one episode with the current policy.
        episode = run_game(env=env, policy=policy)

        # Index of the first occurrence of each (state, action) pair, found
        # in one forward pass instead of rescanning the prefix at every step.
        first_visit = {}
        for t, (s_t, a_t, _) in enumerate(episode):
            first_visit.setdefault((s_t, a_t), t)

        # Walk backwards so G is the return that follows each step.
        G = 0
        for t in reversed(range(len(episode))):
            s_t, a_t, r_t = episode[t]
            G += r_t
            state_action = (s_t, a_t)

            # First-visit rule: only the earliest occurrence contributes.
            if first_visit[state_action] != t:
                continue

            return_sums[state_action] = return_sums.get(state_action, 0) + G
            return_counts[state_action] = return_counts.get(state_action, 0) + 1
            # The action value is the mean of the observed returns.
            Q[s_t][a_t] = return_sums[state_action] / return_counts[state_action]

            # Greedy action, with ties broken at random.
            best_value = max(Q[s_t].values())
            best_actions = [a for a, q in Q[s_t].items() if q == best_value]
            A_star = random.choice(best_actions)

            # Epsilon-greedy update. The divisor is the number of actions.
            # Dividing by the sum of the probabilities, which changes while
            # they are being overwritten, does not give epsilon / n and
            # leaves the distribution unnormalised.
            n_actions = len(policy[s_t])
            for a in policy[s_t]:
                if a == A_star:
                    policy[s_t][a] = 1 - epsilon + epsilon / n_actions
                else:
                    policy[s_t][a] = epsilon / n_actions

    return policy

In [8]:
# Train for 20000 episodes; monte_carlo returns the resulting policy dict.
policy = monte_carlo(env, episodes=20000)

KeyboardInterrupt: 

In [None]:
def test_policy(policy, env, episodes=1000, win_reward=1):
    """Estimate how often ``policy`` finishes an episode with a winning reward.

    Args:
        policy: ``{state: {action: probability}}`` passed to ``run_game``.
        env: Environment passed to ``run_game``.
        episodes: Number of evaluation games to play (default 1000).
        win_reward: An episode counts as a win when the reward of its final
            step equals this value (default 1).

    Returns:
        float: Fraction of the played games that counted as wins.

    Raises:
        ValueError: If ``episodes`` is not positive.
    """
    if episodes <= 0:
        raise ValueError("episodes must be a positive integer")

    # NOTE(review): the default criterion (final reward == 1) looks inherited
    # from an environment that pays 1 on success. If this environment never
    # emits a reward of 1, the result is always 0.0 -- confirm the intended
    # success criterion, or pass a suitable win_reward.
    wins = sum(
        1
        for _ in range(episodes)
        if run_game(env, policy)[-1][-1] == win_reward
    )
    return wins / episodes
    
# Evaluate the trained policy; the call returns the fraction of games counted as wins.
test_policy(policy, env)