# モンテカルロ法でCartPoleを学習する

In [1]:
# ref: http://neuro-educator.com/rl1/
# ref: https://qiita.com/sugulu/items/7a14117bbd3d926eb1f2

In [2]:
import gym
import numpy as np
import pandas as pd
from collections import deque 

import utils.display as disp
import utils.learning as learn

In [3]:
class LearningHistory:
    """Bounded last-in-first-out store for the steps of a single episode.

    Entries are kept in a ``deque`` created with ``maxlen=max_size``, so once
    the store is full each new entry pushes out the oldest one.
    """

    def __init__(self, max_size=200):
        self.buffer = deque((), maxlen=max_size)

    def add(self, experience):
        """Store ``experience`` as the newest entry."""
        self.buffer.append(experience)

    def sample(self):
        """Remove and return the newest entry (raises IndexError when empty)."""
        newest = self.buffer.pop()
        return newest

    def len(self):
        """Return how many entries are currently stored."""
        stored = len(self.buffer)
        return stored

class QMontecarlo:
    """Tabular Monte Carlo agent with an epsilon-greedy policy.

    Attributes:
        q_table: array of shape (num_state, num_action) holding the action
            values, initialised uniformly at random in [-1, 1).
        alpha: learning rate used when moving a Q value toward a return.
        gamma: discount factor applied to rewards received later.
    """

    def __init__(self, num_state, num_action, alpha=0.5, gamma=0.99):
        self.q_table = np.random.uniform(low=-1, high=1, size=(num_state, num_action))
        self.alpha = alpha
        self.gamma = gamma

    def get_action(self, state, episode):
        """Pick an action for ``state`` with an epsilon-greedy rule.

        Epsilon is ``0.5 / (episode + 1)``, so exploration fades as the
        episode index grows. When exploring, the action is drawn uniformly
        from every column of the Q table rather than from a fixed ``[0, 1]``,
        so agents built with more than two actions explore all of them.
        """
        epsilon = 0.5 * (1 / (episode + 1))
        if epsilon <= np.random.uniform(0, 1):
            return np.argmax(self.q_table[state])
        return np.random.choice(self.q_table.shape[1])

    def update_q_table(self, history):
        """Update the Q table from one finished episode.

        Args:
            history: object exposing ``len()`` and ``sample()``; ``sample()``
                must hand back ``(state, action, reward)`` tuples newest
                first. The history is drained completely by this call.
        """
        discounted_return = 0
        while history.len() > 0:
            state, action, reward = history.sample()
            # Walking backwards through the episode: G_t = r_t + gamma * G_{t+1}.
            discounted_return = reward + self.gamma * discounted_return
            # Move Q(s, a) a fraction alpha of the way toward the observed return.
            self.q_table[state, action] = self.q_table[state, action] + self.alpha * (
                discounted_return - self.q_table[state, action]
            )

In [6]:
env = gym.make('CartPole-v0')
observation = env.reset()

num_max_episode = 5000  # number of episodes used for training
num_max_step = 200  # maximum number of steps per episode
# Take the action count from the environment instead of hard-coding 2, so the
# Q table always matches the environment's action space.
num_action = env.action_space.n
num_dizitized = 6  # number of bins per observation dimension when discretising
num_state = num_dizitized ** observation.shape[0]  # total number of discrete states
num_goal_avg_episode = 100  # window (in episodes) of the average used as the stop criterion
goal_avg_rewaed = 195  # average reward at which training stops (spelling kept: the training cell reads this name)

[33mWARN: gym.spaces.Box autodetected dtype as <class 'numpy.float32'>. Please provide explicit dtype.[0m


4

## 訓練

In [7]:
learning = QMontecarlo(num_state, num_action)

history = LearningHistory(num_max_step)
rewards = []
rewards_goal_eval = np.zeros(num_goal_avg_episode)

for episode in range(num_max_episode):
    total_reward = 0
    observation = env.reset()
    state = learn.digitize_state(env, observation, num_dizitized)
    action = learning.get_action(state, episode)

    for step in range(num_max_step):
        next_observation, reward, done, info = env.step(action)
        # Reward shaping: the environment's reward is discarded. An episode that
        # terminates before step 195 counts as a failure (-200); every other
        # step, including a late termination, is worth +1.
        reward = -200 if (done and step < 195) else 1

        history.add((state, action, reward))
        total_reward += reward
        if done:
            # No action is needed for a terminal state, so none is sampled.
            break

        state = learn.digitize_state(env, next_observation, num_dizitized)
        action = learning.get_action(state, episode)

    # Flush the episode unconditionally. If the step limit is reached without
    # `done`, the history must still be consumed here; otherwise its steps would
    # leak into the next episode's returns and this episode would go unrecorded.
    learning.update_q_table(history)
    rewards.append(total_reward)
    rewards_goal_eval = np.hstack((rewards_goal_eval[1:], total_reward))

    # Stop training once the moving average reaches the goal.
    reward_avg = rewards_goal_eval.mean()
    if episode % 100 == 0:
        print('episode: {}, episode_reward: {}'.format(episode, reward_avg))
    if reward_avg >= goal_avg_rewaed:
        print('learning finished: {}'.format(episode))
        break

episode: 0, episode_reward: -1.9
episode: 100, episode_reward: -14.95
episode: 200, episode_reward: 19.97
episode: 300, episode_reward: 19.5
episode: 400, episode_reward: 99.49
episode: 500, episode_reward: 107.29
episode: 600, episode_reward: 63.55
episode: 700, episode_reward: 63.18
episode: 800, episode_reward: 65.49
episode: 900, episode_reward: 97.35
episode: 1000, episode_reward: 115.54
episode: 1100, episode_reward: 45.63
episode: 1200, episode_reward: 7.38
episode: 1300, episode_reward: 34.06
episode: 1400, episode_reward: 67.79
episode: 1500, episode_reward: 100.82
episode: 1600, episode_reward: 111.81
episode: 1700, episode_reward: 96.04
episode: 1800, episode_reward: 75.77
episode: 1900, episode_reward: 62.78
episode: 2000, episode_reward: 103.39
episode: 2100, episode_reward: 89.61
episode: 2200, episode_reward: 98.05
episode: 2300, episode_reward: 103.06
episode: 2400, episode_reward: 85.18
episode: 2500, episode_reward: 69.53
episode: 2600, episode_reward: 86.42
episode: 

### 学習後のQ値でシミュレーション

In [None]:
observation = env.reset()

frames = []
for _ in range(num_max_step):
    state = learn.digitize_state(env, observation, num_dizitized)
    # Evaluate the learned table greedily: no exploration noise, and no
    # dependence on whatever value `episode` was left at by the training cell.
    action = np.argmax(learning.q_table[state])
    observation, reward, done, info = env.step(action)
    frames.append(env.render(mode='rgb_array'))
    if done:
        break

disp.display_frames_as_gif(frames)

### 学習曲線

In [None]:
 # エピソード毎の獲得報酬を移動平均線で表示
# Build the per-episode reward frame together with its 100-episode rolling mean.
df_rewards = pd.DataFrame(
    {'num': range(len(rewards)), 'reward': rewards}
).assign(
    reward_rolling_mean_100=lambda frame: frame['reward'].rolling(window=100, center=False).mean()
)
df_rewards.plot(kind='line', x='num', y=['reward_rolling_mean_100'])

In [None]:
# coding:utf-8
# [0]ライブラリのインポート
import gym  # 倒立振子(cartpole)の実行環境
from gym import wrappers  #gymの画像保存
import numpy as np
import time
from collections import deque



# [1]Q関数を離散化して定義する関数　------------
# 観測した状態を離散値にデジタル変換する
def bins(clip_min, clip_max, num):
    """Return the interior edges that split [clip_min, clip_max] into ``num`` equal bins.

    The two outer edges (``clip_min`` and ``clip_max``) are dropped, so values
    outside the range fall into the first or last bin when digitised.
    """
    edges = np.linspace(clip_min, clip_max, num + 1)
    interior_edges = edges[1:-1]
    return interior_edges


# 各値を離散値に変換
def digitize_state(observation):
    """Map a 4-element observation to a single discrete state index.

    Each component is binned into ``num_dizitized`` bins over a fixed range,
    and the four bin numbers are combined as digits of a base-``num_dizitized``
    number (the first component is the least significant digit).
    """
    cart_pos, cart_v, pole_angle, pole_v = observation
    # (value, lower clip, upper clip) for each observation component, in order.
    components = (
        (cart_pos, -2.4, 2.4),
        (cart_v, -3.0, 3.0),
        (pole_angle, -0.5, 0.5),
        (pole_v, -2.0, 2.0),
    )
    state_index = 0
    for position, (value, low, high) in enumerate(components):
        digit = np.digitize(value, bins=bins(low, high, num_dizitized))
        state_index = state_index + digit * (num_dizitized**position)
    return state_index


# [2]行動a(t)を求める関数 -------------------------------------
def get_action(next_state, episode):
    """Choose an action for ``next_state`` with an epsilon-greedy rule.

    Epsilon is ``0.5 / (episode + 1)``, so the policy becomes increasingly
    greedy as the episode index grows. Reads the module-level ``q_table``.
    When exploring, the action is drawn uniformly from all columns of
    ``q_table`` instead of a fixed ``[0, 1]``, so the function stays correct
    if the table is built for a different number of actions.
    """
    epsilon = 0.5 * (1 / (episode + 1))
    if epsilon <= np.random.uniform(0, 1):
        next_action = np.argmax(q_table[next_state])
    else:
        next_action = np.random.choice(q_table.shape[1])
    return next_action


# [3]1試行の各ステップの行動を保存しておくメモリクラス
class Memory:
    """Bounded buffer that records the steps taken during one episode.

    Uses a ``deque`` with ``maxlen=max_size``; when full, adding a new entry
    discards the oldest one.
    """

    def __init__(self, max_size=200):
        self.buffer = deque((), maxlen=max_size)

    def add(self, experience):
        """Record ``experience`` as the most recent entry."""
        self.buffer.append(experience)

    def sample(self):
        """Remove and return the most recent entry (the tail of the buffer)."""
        latest = self.buffer.pop()
        return latest

    def len(self):
        """Return the number of entries currently held."""
        count = len(self.buffer)
        return count


# [4]Qテーブルを更新する(モンテカルロ法) ＊Qlearningと異なる＊ -------------------------------------
def update_Qtable_montecarlo(q_table, memory, gamma=0.99, alpha=0.5):
    """Update ``q_table`` in place from one finished episode (Monte Carlo, not Q-learning).

    Args:
        q_table: 2-D array indexed as ``q_table[state, action]``.
        memory: object exposing ``len()`` and ``sample()``; ``sample()`` must
            hand back ``(state, action, reward)`` tuples newest first. It is
            drained completely by this call.
        gamma: discount factor applied to later rewards (default 0.99, the
            value that was previously hard-coded).
        alpha: learning rate (default 0.5, the value that was previously
            hard-coded).

    Returns:
        The same ``q_table`` object, after the update.
    """
    discounted_return = 0

    while memory.len() > 0:
        state, action, reward = memory.sample()
        # Walking backwards through the episode: G_t = r_t + gamma * G_{t+1}.
        discounted_return = reward + gamma * discounted_return
        # Move Q(s, a) a fraction alpha of the way toward the observed return.
        q_table[state, action] = q_table[state, action] + alpha * (
            discounted_return - q_table[state, action]
        )

    return q_table


# [5]. メイン関数開始 パラメータ設定--------------------------------------------------------
# --- Episode and goal settings ---
max_number_of_steps = 200  # steps per episode
num_episodes = 2000  # total number of episodes to run
num_consecutive_iterations = 100  # episodes averaged when judging completion
goal_average_reward = 195  # training ends once the average reaches this (no centring control)

# --- Discretisation: each of the 4 observation variables is split into 6 bins ---
num_dizitized = 6

# --- Environment, episode buffer and Q table ---
env = gym.make('CartPole-v0')
memory_size = max_number_of_steps  # the buffer holds one full episode
memory = Memory(memory_size)
q_table = np.random.uniform(-1, 1, (num_dizitized**4, env.action_space.n))

# --- Bookkeeping ---
total_reward_vec = np.zeros(num_consecutive_iterations)  # rewards of the most recent episodes
final_x = np.zeros(shape=(num_episodes, 1))  # cart x position at the end of each post-training episode
islearned = 0  # set to 1 once training has finished
isrender = 0  # rendering flag


# [5] メインルーチン--------------------------------------------------
for episode in range(num_episodes):  # repeat for the configured number of episodes
    # Reset the environment and pick the first action greedily from the Q table
    observation = env.reset()
    state = digitize_state(observation)
    action = np.argmax(q_table[state])
    episode_reward = 0

    for t in range(max_number_of_steps):  # loop over the steps of one episode
        if islearned == 1:  # once training has finished, render the cart-pole
            env.render()
            time.sleep(0.1)
            print (observation[0])  # print the cart's x position


        # Execute action a_t to obtain s_{t+1}, r_t, etc.
        observation, reward, done, info = env.step(action)

        # Assign the shaped reward (the environment's own reward is overwritten)
        if done:
            if t < 195:
                reward = -200  # penalty: the episode ended before step 195
            else:
                reward = 1  # episode ended late: no penalty

        else:
            reward = 1  # +1 for every step the episode continues


        # Record the current state, the action taken and the reward received
        memory.add((state, action, reward))

        # Advance state and action to the next step
        next_state = digitize_state(observation)  # discretise the observation at t+1
        next_action = get_action(next_state, episode)  # choose the next action a_{t+1}
        action = next_action  # a_{t+1}
        state = next_state  # s_{t+1}

        episode_reward += reward  # accumulate the reward

        # End-of-episode processing
        if done:
            # Update the Q table from the recorded steps and the final outcome
            q_table = update_Qtable_montecarlo(q_table, memory)

            # NOTE(review): the mean printed here does not yet include this
            # episode's reward; total_reward_vec is updated just below.
            print('%d Episode finished after %f time steps / mean %f' %
                  (episode, t + 1, total_reward_vec.mean()))
            total_reward_vec = np.hstack((total_reward_vec[1:],
                                          episode_reward))  # record the reward
            if islearned == 1:  # after training, store the final x coordinate
                final_x[episode, 0] = observation[0]
            break

    if (total_reward_vec.mean() >=
            goal_average_reward):  # success when the mean over the recent episodes reaches the goal
        print('Episode %d train agent successfuly!' % episode)
        islearned = 1
        #np.savetxt('learned_Q_table.csv',q_table, delimiter=",") # uncomment to save the Q table
        if isrender == 0:
            # env = wrappers.Monitor(env, './movie/cartpole-experiment-1') # uncomment to record a video
            isrender = 1
    # To see how the agent behaves after only 10 episodes, uncomment the lines below
    #if episode>10:
    #    if isrender == 0:
    #        env = wrappers.Monitor(env, './movie/cartpole-experiment-1') # uncomment to record a video
    #        isrender = 1
    #    islearned=1;

# Save the recorded final x positions only if training finished
if islearned:
    np.savetxt('final_x.csv', final_x, delimiter=",")