## 48. 強化学習（RL : Reinforcement Learning）

### <font color=blue>**1.** </font> Q学習

#### <font color=green>**1.1.** </font> 迷路を解く　その１

In [None]:
## 出典 : https://book.mynavi.jp/manatee/detail/id=88714

In [None]:
import numpy as np
import matplotlib.pyplot as plt

In [None]:
# Draw the maze in its initial state.

# Create the figure (kept in `fig` for the animation) and grab its axes.
fig = plt.figure(figsize=(5, 5))
ax = plt.gca()

# Draw the red walls.
plt.plot([1, 1], [0, 1], color='red', linewidth=2)
plt.plot([1, 2], [2, 2], color='red', linewidth=2)
plt.plot([2, 2], [2, 1], color='red', linewidth=2)
plt.plot([2, 3], [1, 1], color='red', linewidth=2)

# Write the state labels S0-S8 plus the START / GOAL captions.
plt.text(0.5, 2.5, 'S0', size=14, ha='center')
plt.text(1.5, 2.5, 'S1', size=14, ha='center')
plt.text(2.5, 2.5, 'S2', size=14, ha='center')
plt.text(0.5, 1.5, 'S3', size=14, ha='center')
plt.text(1.5, 1.5, 'S4', size=14, ha='center')
plt.text(2.5, 1.5, 'S5', size=14, ha='center')
plt.text(0.5, 0.5, 'S6', size=14, ha='center')
plt.text(1.5, 0.5, 'S7', size=14, ha='center')
plt.text(2.5, 0.5, 'S8', size=14, ha='center')
plt.text(0.5, 2.3, 'START', ha='center')
plt.text(2.5, 0.3, 'GOAL', ha='center')

# Set the drawing range and hide every tick and tick label.
# tick_params expects booleans: the legacy 'off' strings are non-empty and
# therefore truthy, which leaves the ticks visible on current Matplotlib.
ax.set_xlim(0, 3)
ax.set_ylim(0, 3)
plt.tick_params(axis='both', which='both', bottom=False, top=False,
                labelbottom=False, right=False, left=False, labelleft=False)

# Mark the current position S0 with a green circle; `line` is reused by the
# animation callbacks defined later.
line, = ax.plot([0.5], [2.5], marker="o", color='g', markersize=60)

In [None]:
# Set the parameter theta_0 that determines the initial policy.

# Rows are states 0-7; columns are the move directions up, right, down, left.
# An entry of 1 marks a direction that can be taken from that state; np.nan
# marks a direction that cannot (a wall direction).
theta_0 = np.array([[np.nan, 1, 1, np.nan],  # s0
                    [np.nan, 1, np.nan, 1],  # s1
                    [np.nan, np.nan, 1, 1],  # s2
                    [1, 1, 1, np.nan],  # s3
                    [np.nan, np.nan, 1, 1],  # s4
                    [1, np.nan, np.nan, np.nan],  # s5
                    [1, np.nan, np.nan, np.nan],  # s6
                    [1, 1, np.nan, np.nan],  # s7
                    # NOTE: s8 is the goal, so it has no policy row
                    ])

In [None]:
# 方策パラメータtheta_0をランダム方策piに変換する関数の定義
 
def simple_convert_into_pi_from_theta(theta):
  '''Convert policy parameters into a policy by simple row normalisation.

  Each row of the 2-D array `theta` is divided by the sum of its non-NaN
  entries, and any NaN in the result is then replaced with 0.

  Returns a new float array with the same shape as `theta`.
  '''
  # Sum of the non-NaN entries per row, kept as a column for broadcasting.
  row_totals = np.nansum(theta, axis=1, keepdims=True)
  return np.nan_to_num(theta / row_totals)
 
# Compute the random action policy pi_0 from theta_0.
pi_0 = simple_convert_into_pi_from_theta(theta_0)

In [None]:
# Q学習による行動価値関数Qの更新
 
def Q_learning(s, a, r, s_next, Q, eta, gamma):
  '''Apply one Q-learning update to Q[s, a] in place and return Q.

  s, a    : state and action that were just taken
  r       : reward received for the transition
  s_next  : state reached by the transition
  Q       : action-value table indexed as Q[state, action]
  eta     : learning rate
  gamma   : discount rate
  '''
  if s_next == 8:
    # State 8 is the goal: there is no next-state value to bootstrap from.
    target = r
  else:
    # Greedy bootstrap; nanmax skips the NaN entries of the next state's row.
    target = r + gamma * np.nanmax(Q[s_next, :])

  Q[s, a] = Q[s, a] + eta * (target - Q[s, a])
  return Q

In [None]:
# Q学習で迷路を解く関数の定義、状態と行動の履歴および更新したQを出力
 
def goal_maze_ret_s_a_Q(Q, epsilon, eta, gamma, pi_0):
  '''Run one Q-learning episode from state 0 until the goal (state 8).

  Q        : action-value table, updated after every step
  epsilon  : exploration probability passed to get_action_and_s_next
  eta      : learning rate
  gamma    : discount rate
  pi_0     : random policy used for exploratory moves

  Returns [s_a_history, Q], where s_a_history is a list of [state, action]
  pairs in visiting order; the final entry is [8, nan] because no action is
  taken at the goal.
  '''
  s = 0  # start state
  s_a_history = [[0, np.nan]]  # record of the agent's moves

  while True:  # loop until the goal is reached
    a, s_next = get_action_and_s_next(s, Q, epsilon, pi_0)

    # Fill in the action for the current (last recorded) state, then record
    # the next state with its action still unknown.
    s_a_history[-1][1] = a
    s_a_history.append([s_next, np.nan])

    # Reward is given only for reaching the goal.
    reached_goal = s_next == 8
    r = 1 if reached_goal else 0

    # Update the action-value function.
    Q = Q_learning(s, a, r, s_next, Q, eta, gamma)

    if reached_goal:
      break
    s = s_next

  return [s_a_history, Q]

In [None]:
# ε-greedy法を実装
 
def get_action_and_s_next(s, Q, epsilon, pi_0):
  '''Choose an action epsilon-greedily and return [action, s_next].

  With probability epsilon a direction is drawn from the policy row
  pi_0[s, :]; otherwise the direction with the largest non-NaN value in
  Q[s, :] is taken.
  '''
  direction = ["up", "right", "down", "left"]
  # direction -> (action index, change in the state number).
  # Moving up/down changes the state number by 3, left/right by 1.
  moves = {"up": (0, -3), "right": (1, 1), "down": (2, 3), "left": (3, -1)}

  explore = np.random.rand() < epsilon
  if explore:
    # Move at random according to pi_0.
    next_direction = np.random.choice(direction, p=pi_0[s, :])
  else:
    # Take the action with the largest Q value.
    next_direction = direction[np.nanargmax(Q[s, :])]

  action, step = moves[next_direction]
  return [action, s + step]

In [None]:
# Set up the initial action-value function Q.

[a, b] = theta_0.shape  # store the number of rows and columns in a and b
Q = np.random.rand(a, b) * theta_0 * 0.1
# Multiplying element-wise by theta_0 makes Q's entries for the wall
# directions NaN.

In [None]:
# Solve the maze with Q-learning: hyper-parameters and loop state.

eta = 0.1  # learning rate
gamma = 0.9  # time discount rate
epsilon = 0.5  # initial value for the epsilon-greedy method
v = np.nanmax(Q, axis=1)  # maximum action value of each state
is_continue = True
episode = 1

In [None]:
V=[] # state values recorded per episode
V.append(np.nanmax(Q, axis=1))  # maximum action value of each state

# NOTE: is_continue is never set to False in this cell; the loop ends through
# the `break` once the episode counter exceeds 100.
while is_continue:
  print("エピソード:" + str(episode))

  # Halve epsilon every episode so exploration shrinks over time.
  epsilon = epsilon / 2

  # Run one Q-learning episode; get the move history and the updated Q.
  [s_a_history, Q] = goal_maze_ret_s_a_Q(Q, epsilon, eta, gamma, pi_0)

  # Change in the state values.
  new_v = np.nanmax(Q, axis=1)  # maximum action value of each state
  print(np.sum(np.abs(new_v - v)))  # print the total absolute change
  v = new_v
  V.append(v) # append the state values at the end of this episode

  print("迷路を解くのにかかったステップ数は" + str(len(s_a_history) - 1) + "です")

  # Repeat for 100 episodes.
  episode = episode + 1
  if episode > 100:
    break

In [None]:
# 状態価値の変化を可視化
# 参考URL http://louistiao.me/posts/notebooks/embedding-matplotlib-animations-in-jupyter-notebooks/
from matplotlib import animation
from IPython.display import HTML
import matplotlib.cm as cm  # color map

In [None]:
def init():
  '''Animation init callback: clear the data of the module-level `line`.

  Returns a one-element tuple holding that artist.
  '''
  artists = (line,)
  line.set_data([], [])
  return artists

In [None]:
def animate(i):
  '''Animation frame callback: colour every cell by its value in V[i].

  For each of the nine cells a square marker is plotted on `ax`, coloured
  with cm.jet(value). States 0-7 take their value from V[i]; the goal cell
  S8 has no entry in V and is always drawn with cm.jet(1.0).

  Returns a tuple with all artists drawn for this frame.
  '''
  artists = []
  for state in range(9):
    # Cell centre: column is state % 3, row (counted from the top) is state // 3.
    x = state % 3 + 0.5
    y = 2.5 - state // 3
    value = 1.0 if state == 8 else V[i][state]
    square, = ax.plot([x], [y], marker="s",
                      color=cm.jet(value), markersize=85)
    artists.append(square)
  return tuple(artists)

In [None]:
# Build the animation from the init callback and the per-frame callback;
# one frame per entry of V.
anim = animation.FuncAnimation(
    fig, animate, init_func=init, frames=len(V), interval=200, repeat=False)

# Render the animation as an HTML5 video for display in the notebook.
HTML(anim.to_html5_video())

#### <font color=green>**1.2.** </font> Tic Tac Toe（三目並べ）

In [None]:
##　出典 : https://qiita.com/thinking_vecta/items/f5b52311d2c0f6a56dc6

In [None]:
import matplotlib.pyplot as plt
from IPython.display import clear_output
import random
import time
from time import sleep
import numpy as np
import math

In [None]:
def get_player_input(play_area, first_inputter):
    """Ask the player for a move and apply it to the board.

    Args:
        play_area: list describing the board; free cells hold their number
            as an int, taken cells hold a marker string. Modified in place.
        first_inputter: 1 marks the chosen cell with '○', 2 with '×'; any
            other value leaves the board unchanged.

    Returns:
        Tuple (play_area, player_input) with the chosen cell number as an int.

    The prompt is repeated until the input names a free cell.
    """
    choosable_area = [str(area) for area in play_area if isinstance(area, int)]
    while True:
        player_input = input('Choose a number!>>>')
        if player_input in choosable_area:
            player_input = int(player_input)
            break
        # A space separates the message from the list of free cells.
        print('Wrong input!\nChoose a number from {}'.format(choosable_area))

    marker = {1: '○', 2: '×'}.get(first_inputter)
    if marker is not None:
        play_area[play_area.index(player_input)] = marker
    return play_area, player_input

In [None]:
def get_ai_input(play_area, first_inputter, mode=0, q_table=None, epsilon=None):
    """Choose the AI's move and apply it to the board.

    Args:
        play_area: list describing the board; free cells hold their number
            as an int. Modified in place.
        first_inputter: 1 marks the chosen cell with '×', 2 with '○'; any
            other value leaves the board unchanged.
        mode: 0 picks a free cell at random, 1 delegates to get_ql_action.
        q_table, epsilon: passed through to get_ql_action in mode 1.

    Returns:
        Tuple (play_area, ai_input) with the chosen cell number as an int.
    """
    choosable_area = [str(area) for area in play_area if type(area) is int]

    if mode == 0:
        ai_input = int(random.choice(choosable_area))
    elif mode == 1:
        ai_input = get_ql_action(play_area, choosable_area, q_table, epsilon)

    if first_inputter in (1, 2):
        marker = '×' if first_inputter == 1 else '○'
        play_area[play_area.index(ai_input)] = marker
    return play_area, ai_input

In [None]:
def show_play(play_area, inputter=0, inputted=0):
    """Draw the tic-tac-toe board.

    Args:
        play_area: list of nine cells (cell numbers, '○' or '×'), given row
            by row from the top-left.
        inputter: name of whoever moved last, or 0 for the opening screen.
        inputted: the cell that was chosen by `inputter`.

    The previous cell output is cleared before a fresh figure is shown.
    """
    clear_output()
    plt.figure(figsize=(6, 6))
    plt.plot()
    plt.xticks([0, 5, 10, 15])
    plt.yticks([0, 5, 10, 15])
    # tick_params expects booleans; the legacy 'off' strings are truthy and
    # would leave the ticks and labels visible on current Matplotlib.
    plt.tick_params(labelbottom=False, bottom=False)
    plt.tick_params(labelleft=False, left=False)
    plt.xlim(0, 15)
    plt.ylim(0, 15)

    x_pos = [2.5, 7.5, 12.5]
    y_pos = [2.5, 7.5, 12.5]

    # Mathtext markers so that digits and symbols can be used as markers.
    markers = ['$' + str(marker) + '$' for marker in play_area]
    colors = {'$○$': 'r', '$×$': 'k'}  # free (numbered) cells are drawn in blue

    # Cells are listed top row first, so walk the y positions from high to low.
    positions = [(x, y) for y in reversed(y_pos) for x in x_pos]
    for marker_count, (x, y) in enumerate(positions):
        marker = markers[marker_count]
        plt.plot(x, y, marker=marker,
                 markersize=30, color=colors.get(marker, 'b'))

    if inputter == 0:
        title = 'Play the TIC TAC TOE!!'
    else:
        title = '{} chose {}!!'.format(inputter, inputted)
    plt.title(title)
    plt.show()

In [None]:
def judge(play_area, inputter):
    """Decide whether the game is over and who won.

    Args:
        play_area: list of nine cells describing the board.
        inputter: name of whoever moved last.

    Returns:
        Tuple (winner, end_flg). winner is `inputter` when any row, column or
        diagonal holds three equal cells, otherwise 'NOBODY'. end_flg is 1
        when there is a winner or no free (int) cell is left, else 0.
    """
    winning_lines = (
        (0, 1, 2), (3, 4, 5), (6, 7, 8),   # rows
        (0, 3, 6), (1, 4, 7), (2, 5, 8),   # columns
        (0, 4, 8), (2, 4, 6),              # diagonals
    )
    has_line = any(
        play_area[a] == play_area[b] and play_area[a] == play_area[c]
        for a, b, c in winning_lines
    )
    board_full = not any(type(area) is int for area in play_area)

    winner = inputter if has_line else 'NOBODY'
    end_flg = 1 if (has_line or board_full) else 0
    return winner, end_flg

In [None]:
def player_vs_randomAI(first_inputter):
    """Play one interactive game between the player and the random AI.

    Args:
        first_inputter: who moves first (1: player, 2: AI).

    The board is redrawn after every move and the result is printed when the
    game ends.
    """
    inputter1 = 'YOU'
    inputter2 = 'AI'

    play_area = list(range(1, 10))
    show_play(play_area)

    # The turn counter starts at first_inputter: odd -> player, even -> AI.
    inputter_count = first_inputter
    while True:
        players_turn = (inputter_count % 2) == 1
        if players_turn:
            print('Your turn!')
            play_area, player_input = get_player_input(play_area, first_inputter)
            show_play(play_area, inputter1, player_input)
            winner, end_flg = judge(play_area, inputter1)
        else:
            print('AI\'s turn!\n.\n.\n.')
            play_area, ai_input = get_ai_input(play_area, first_inputter, mode=0)
            sleep(3)
            show_play(play_area, inputter2, ai_input)
            winner, end_flg = judge(play_area, inputter2)
        if end_flg:
            break
        inputter_count += 1
    print('{} win!!!'.format(winner))

In [None]:
# Try playing a game.
# Argument 1: the player moves first
# Argument 2: the player moves second

player_vs_randomAI(1)

In [None]:
# Qテーブル作成
def make_q_table():
    """Create an all-zero Q table.

    The table has 3**9 rows and 9 columns.
    """
    return np.zeros((3 ** 9, 9))

In [None]:
def q_learning(play_area, ai_input, reward, play_area_next, q_table, end_flg):
    """Apply one Q-learning update to the Q table and return it.

    Args:
        play_area: board before the AI's move (selects the row).
        ai_input: the AI's move, a cell number starting at 1 (selects the
            column as ai_input - 1).
        reward: reward for the transition.
        play_area_next: board one turn later.
        q_table: the Q table, updated in place.
        end_flg: 1 when the game has ended.

    The learning rate `eta` and discount rate `gamma` are read from
    module-level variables.
    """
    # Row numbers of the current and the following board.
    row_index = find_q_row(play_area)
    row_index_next = find_q_row(play_area_next)
    column_index = ai_input - 1

    current = q_table[row_index, column_index]
    if end_flg == 1:
        # Game over: no future value to bootstrap from.
        target = reward
    else:
        # Game still in progress: bootstrap from the best next-row value.
        target = reward + gamma * np.nanmax(q_table[row_index_next, :])

    q_table[row_index, column_index] = current + eta * (target - current)
    return q_table

In [None]:
def find_q_row(play_area):
    """Return the Q-table row number for a board.

    The board is read as a base-3 number: the cell at position k contributes
    3**k times 1 for '○', 2 for '×' and 0 for anything else.
    """
    return sum(
        (3 ** place) * (1 if cell == '○' else 2 if cell == '×' else 0)
        for place, cell in enumerate(play_area)
    )

In [None]:
def get_ql_action(play_area, choosable_area, q_table, epsilon):
    """Choose the Q-learning AI's move epsilon-greedily.

    Args:
        play_area: list describing the board.
        choosable_area: free cell numbers, given as strings.
        q_table: the Q table.
        epsilon: probability of choosing a free cell at random.

    Returns:
        The chosen cell number as an int.
    """
    # Explore: random free cell with probability epsilon.
    if np.random.rand() < epsilon:
        return int(random.choice(choosable_area))

    # Exploit: free cell with the largest Q value in this board's row.
    # max() keeps the earliest candidate on ties.
    row_index = find_q_row(play_area)
    candidates = [int(choice) for choice in choosable_area]
    return max(candidates, key=lambda cell: q_table[row_index, cell - 1])

In [None]:
def randomAI_vs_QLAI(first_inputter, q_table, epsilon=0):
    """Play one game between the random AI and the Q-learning AI.

    Args:
        first_inputter: who moves first (1: random AI, 2: Q-learning AI).
        q_table: the Q table; updated during the game.
        epsilon: exploration probability of the Q-learning AI.

    Returns:
        Tuple (winner, q_table); winner is 'Random AI', 'QL AI' or 'NOBODY'.
    """
    inputter1 = 'Random AI'
    inputter2 = 'QL AI'

    # History kept for the Q-learning update.
    ql_input_list = []
    play_area_list = []

    play_area = list(range(1, 10))
    # The turn counter starts at first_inputter: even -> QL AI, odd -> random AI.
    inputter_count = first_inputter
    end_flg = 0
    reward = 0
    while True:
        # Snapshot of the board before this turn's move.
        play_area_list.append(play_area.copy())
        # Set to 1 when a Q update should run at the end of this turn.
        ql_flg = 0
        if (inputter_count % 2) == 0:
            # Q-learning AI's turn.
            play_area, ql_ai_input = get_ai_input(play_area, first_inputter, mode=1, q_table=q_table, epsilon=epsilon)
            winner, end_flg = judge(play_area, inputter2)
            ql_input_list.append(ql_ai_input)
            # A win is learned immediately.
            if winner == inputter2:
                reward = 1
                ql_flg = 1
            # Board the QL AI saw before moving; used as the "state" of the
            # update that runs after the opponent's reply.
            play_area_before = play_area_list[-1]
        elif (inputter_count % 2) == 1:
            # Random AI's turn. get_ai_input only understands 1 and 2, so the
            # opposite marker is selected with 3 - first_inputter
            # (first_inputter + 1 is 3 when the QL AI moves first, which
            # placed no mark at all).
            play_area, random_ai_input = get_ai_input(play_area, 3 - first_inputter, mode=0)
            winner, end_flg = judge(play_area, inputter1)
            # Losing is penalised, as in player_vs_QLAI.
            if winner == inputter1:
                reward = -1
            # Learn after every reply except the random AI's opening move,
            # when the QL AI has not moved yet.
            if inputter_count != 1:
                ql_flg = 1
        # Run the Q-learning update.
        if ql_flg == 1:
            ql_ai_input_before = ql_input_list[-1]
            q_table = q_learning(play_area_before, ql_ai_input_before, reward, play_area, q_table, end_flg)
        if end_flg:
            break
        inputter_count += 1
    return winner, q_table

In [None]:
q_table = make_q_table()
eta = 0.1  # learning rate
gamma = 0.9  # time discount rate
initial_epsilon = 0.5  # initial value for the epsilon-greedy method

In [None]:
# Random AI vs Q-learning AI (training)
# Number of games to play
episode = int(5e5)  ### takes about two minutes with this value
winner_list = []
start = time.time()
for i in range(episode):
    # epsilon decays linearly from initial_epsilon towards 0 over the run.
    epsilon = initial_epsilon * (episode-i) / episode
    # The returned table is ignored; q_table is updated in place.
    winner, _ = randomAI_vs_QLAI(1, q_table, epsilon)
    winner_list.append(winner)
elapsed_time = time.time() - start
print ('elapsed_time:{0}'.format(elapsed_time) + '[sec]')

In [None]:
# Print how often each side won and the Q-learning AI's win rate.
print('勝ち回数')
print('Random AI:{}'.format(winner_list.count('Random AI')))
print('QL AI    :{}'.format(winner_list.count('QL AI')))
print('NOBODY   :{}'.format(winner_list.count('NOBODY')))
print('QLの勝率 :{}'.format(winner_list.count('QL AI') / len(winner_list)))

In [None]:
def player_vs_QLAI(first_inputter, q_table, epsilon=0):
    """Play one interactive game between the player and the Q-learning AI.

    Args:
        first_inputter: who moves first (1: player, 2: Q-learning AI).
        q_table: the Q table; updated during the game.
        epsilon: exploration probability of the Q-learning AI.

    Returns:
        Tuple (winner, q_table); winner is 'YOU', 'QL AI' or 'NOBODY'.
    """
    inputter1 = 'YOU'
    inputter2 = 'QL AI'

    # History kept for the Q-learning update
    ql_input_list = []
    play_area_list = []

    play_area = list(range(1, 10))
    show_play(play_area)
    # The turn counter starts at first_inputter: even -> QL AI, odd -> player.
    inputter_count = first_inputter
    end_flg = 0
    ql_flg = 0
    reward = 0
    while True:
        # Snapshot of the board before this turn's move
        play_area_tmp = play_area.copy()
        play_area_list.append(play_area_tmp)
        # Set to 1 when a Q update should run at the end of this turn
        ql_flg = 0
        # Q-learning AI's turn
        if (inputter_count % 2) == 0:
            # QL AI move
            play_area, ql_ai_input = get_ai_input(play_area, first_inputter, mode=1, q_table=q_table, epsilon=epsilon)
            show_play(play_area, inputter2, ql_ai_input)
            winner, end_flg = judge(play_area, inputter2)
            # Remember the move for the update
            ql_input_list.append(ql_ai_input)            
            # A win is learned immediately
            if winner == inputter2:
                reward = 1
                ql_flg = 1
            # Board the QL AI saw before moving, and the move it made
            play_area_before = play_area_list[-1]
            ql_ai_input_before = ql_input_list[-1]
        # Player's turn
        elif (inputter_count % 2) == 1:
            print('Your turn!')
            # Read the player's move
            play_area, player_input = get_player_input(play_area, first_inputter)
            show_play(play_area, inputter1, player_input)
            winner, end_flg = judge(play_area, inputter1)
            # The player won: negative reward for the AI
            if winner == inputter1:
                reward = -1
            # Learn after every reply except the player's opening move,
            # when the QL AI has not moved yet
            if inputter_count != 1:
                ql_flg = 1
        # Run the Q-learning update
        if ql_flg == 1:
            ql_ai_input_before = ql_input_list[-1]
            q_table = q_learning(play_area_before, ql_ai_input_before, reward, play_area, q_table, end_flg)
        if end_flg:
            break
        inputter_count += 1
    show_play(play_area)
    print('{} win!!!'.format(winner))
    sleep(1)
    return winner, q_table

In [None]:
# Player vs Q-learning AI
# Number of games to play
episode = 10
winner_list = []
for i in range(episode):
    # epsilon decays linearly from initial_epsilon towards 0 over the games.
    epsilon = initial_epsilon * (episode-i) / episode
    # first_inputter=2: the Q-learning AI moves first.
    winner, q_table = player_vs_QLAI(2, q_table, epsilon=epsilon)
    winner_list.append(winner)

#### <font color=green>**1.3.** </font> A Simple Python Example and a Step Closer to AI

In [None]:
# 出典 : https://amunategui.github.io/reinforcement-learning/index.html

In [None]:
## A VERY Simple Python Q-learning Example

import numpy as np
import pylab as plt

# map cell to cell, add circular cell to goal point
# Each tuple is a pair of node numbers; the pairs are used below as graph
# edges and as index pairs into the reward matrix.
points_list = [(0,1), (1,5), (5,6), (5,4), (1,2), (2,3), (2,7)]

In [None]:
# Node number of the goal.
goal = 7

# Build an undirected graph from the edge list and draw it (nodes, edges and
# labels) with a spring layout.
import networkx as nx
G=nx.Graph()
G.add_edges_from(points_list)
pos = nx.spring_layout(G)
nx.draw_networkx_nodes(G,pos)
nx.draw_networkx_edges(G,pos)
nx.draw_networkx_labels(G,pos)
plt.show()

In [None]:
# how many points in graph? x points
MATRIX_SIZE = 8

# create matrix x*y
# R starts with every entry at -1; entries for real edges are overwritten
# with values >= 0 in the next cell.
R = np.matrix(np.ones(shape=(MATRIX_SIZE, MATRIX_SIZE)))
R *= -1

In [None]:
# assign zeros to paths and 100 to goal-reaching point
for point in points_list:
  print(point)
  # Forward direction: R[point] addresses row point[0], column point[1].
  if point[1] == goal:
    R[point] = 100
  else:
    R[point] = 0

  # Reverse direction of the same edge (row point[1], column point[0]).
  if point[0] == goal:
    R[point[::-1]] = 100
  else:
    # reverse of point
    R[point[::-1]]= 0

In [None]:
# add goal point round trip
# Staying at the goal (goal -> goal) is rewarded with 100 as well.
R[goal,goal]= 100
R

In [None]:
# Q matrix, all zeros, same size as R.
Q = np.matrix(np.zeros([MATRIX_SIZE,MATRIX_SIZE]))

# learning parameter
gamma = 0.8

# State used for the single warm-up update below.
initial_state = 1

In [None]:
def available_actions(state):
  '''Return the column indices of row `state` of R whose entry is >= 0.

  R is the module-level reward matrix; the result is a NumPy index array.
  '''
  # R is an np.matrix, so R[state,] is 1 x N and np.where yields
  # (row indices, column indices); the columns are the actions.
  return np.where(R[state,] >= 0)[1]

In [None]:
def sample_next_action(available_actions_range):
  '''Pick one action uniformly at random from the given candidates.

  available_actions_range : 1-D sequence/array of candidate actions.

  Returns the chosen action as a Python int.
  '''
  # Use the argument instead of the global `available_act`, and draw a scalar
  # (no size=1) so that int() is not applied to a one-element array.
  next_action = int(np.random.choice(available_actions_range))
  return next_action

In [None]:
def update(current_state, action, gamma):
  '''Update Q[current_state, action] and return a score for the Q matrix.

  The new entry is R[current_state, action] + gamma * (largest value in row
  `action` of Q). Q and R are the module-level matrices.

  Returns sum(Q / max(Q) * 100), or 0 while Q has no positive entry.
  '''
  # Columns of row `action` that hold the row maximum.
  max_index = np.where(Q[action,] == np.max(Q[action,]))[1]

  # Break ties at random. Scalars are extracted explicitly because int() on
  # a one-element array is deprecated in recent NumPy.
  if max_index.shape[0] > 1:
    max_index = int(np.random.choice(max_index))
  else:
    max_index = int(max_index[0])
  max_value = Q[action, max_index]

  Q[current_state, action] = R[current_state, action] + gamma * max_value

  if np.max(Q) > 0:
    return np.sum(Q / np.max(Q) * 100)
  return 0

In [None]:
# Single warm-up step from initial_state: list its actions, sample one and
# apply one Q update.
available_act = available_actions(initial_state) 

action = sample_next_action(available_act)

update(initial_state, action, gamma)

In [None]:
# Training
# 700 updates, each from a uniformly random state with a random available
# action; the score returned by update() is recorded per step.
scores = []
for i in range(700):
  current_state = np.random.randint(0, int(Q.shape[0]))
  available_act = available_actions(current_state)
  action = sample_next_action(available_act)
  score = update(current_state,action,gamma)
  scores.append(score)

# Show Q scaled so that its largest entry is 100.
print("Trained Q matrix:")
print(Q/np.max(Q)*100)

In [None]:
# Testing
# Follow the largest Q value from state 0 until the goal node is reached.
current_state = 0
steps = [current_state]

# Compare against `goal` rather than a hard-coded node number.
while current_state != goal:
  # Columns of the current row that hold the row maximum.
  next_step_index = np.where(Q[current_state,] == np.max(Q[current_state,]))[1]
  # Break ties at random. Scalars are extracted explicitly because int() on
  # a one-element array is deprecated in recent NumPy.
  if next_step_index.shape[0] > 1:
    next_step_index = int(np.random.choice(next_step_index))
  else:
    next_step_index = int(next_step_index[0])
  steps.append(next_step_index)
  current_state = next_step_index


print("Most efficient path:")
print(steps)

In [None]:
# Plot the score recorded at every training step.
plt.plot(scores)
plt.show()

In [None]:
## Version 2.0, with Environmental Details

# Node numbers where bees / smoke are found (see the node labels below).
bees = [2]
smoke = [4,5,6]

In [None]:
# Rebuild the graph from the edge list.
G = nx.Graph()
G.add_edges_from(points_list)

# Descriptive label for each node number.
mapping = {
    0:'Start', 
    1:'1', 
    2:'2 - Bees', 
    3:'3', 
    4:'4 - Smoke', 
    5:'5 - Smoke', 
    6:'6 - Smoke', 
    7:'7 - Beehive'
    }

# Draw the relabelled graph with a spring layout.
H = nx.relabel_nodes(G, mapping) 
pos = nx.spring_layout(H)
nx.draw_networkx_nodes(H, pos, 
                       node_size = [200,200,200,200,
                                    200,200,200,200]
                       )

nx.draw_networkx_edges(H, pos)
nx.draw_networkx_labels(H, pos)
plt.show()

In [None]:
# re-initialize the matrices for new run
Q = np.matrix(np.zeros([MATRIX_SIZE,MATRIX_SIZE]))

# Counters of bee / smoke sightings, indexed [current_state, action] by update().
enviro_bees = np.matrix(np.zeros([MATRIX_SIZE,MATRIX_SIZE]))
enviro_smoke = np.matrix(np.zeros([MATRIX_SIZE,MATRIX_SIZE]))

initial_state = 1

In [None]:
def available_actions(state):
  '''Return the column indices of row `state` of R whose entry is >= 0.

  R is the module-level reward matrix; the result is a NumPy index array.
  '''
  # R is an np.matrix, so R[state,] is 1 x N and np.where yields
  # (row indices, column indices); the columns are the actions.
  return np.where(R[state,] >= 0)[1]

In [None]:
def sample_next_action(available_actions_range):
  '''Pick one action uniformly at random from the given candidates.

  available_actions_range : 1-D sequence/array of candidate actions.

  Returns the chosen action as a Python int.
  '''
  # Use the argument instead of the global `available_act`, and draw a scalar
  # (no size=1) so that int() is not applied to a one-element array.
  next_action = int(np.random.choice(available_actions_range))
  return next_action

In [None]:
def collect_environmental_data(action):
  '''Report what is found at node `action`.

  Returns a list holding 'b' if `action` is in the module-level `bees` list
  and 's' if it is in `smoke` (in that order); empty when neither applies.
  '''
  found = []
  for code, nodes in (('b', bees), ('s', smoke)):
    if action in nodes:
      found.append(code)
  return found

In [None]:
def update(current_state, action, gamma):
  '''Update Q[current_state, action], record sightings, return a score.

  The new Q entry is R[current_state, action] + gamma * (largest value in
  row `action` of Q). If node `action` has bees or smoke, the matching
  counter in enviro_bees / enviro_smoke is incremented at
  [current_state, action]. Q, R and the counters are module-level matrices.

  Returns sum(Q / max(Q) * 100), or 0 while Q has no positive entry.
  '''
  # Columns of row `action` that hold the row maximum.
  max_index = np.where(Q[action,] == np.max(Q[action,]))[1]

  # Break ties at random. Scalars are extracted explicitly because int() on
  # a one-element array is deprecated in recent NumPy.
  if max_index.shape[0] > 1:
    max_index = int(np.random.choice(max_index))
  else:
    max_index = int(max_index[0])
  max_value = Q[action, max_index]

  Q[current_state, action] = R[current_state, action] + gamma * max_value

  environment = collect_environmental_data(action)
  if 'b' in environment:
    enviro_bees[current_state, action] += 1
  if 's' in environment:
    enviro_smoke[current_state, action] += 1

  if np.max(Q) > 0:
    return np.sum(Q / np.max(Q) * 100)
  return 0

In [None]:
# Single warm-up step from initial_state: list its actions, sample one and
# apply one Q update.
available_act = available_actions(initial_state) 
action = sample_next_action(available_act)
update(initial_state, action, gamma)

In [None]:
# 700 updates from random states; this run is used to fill the
# enviro_bees / enviro_smoke counters.
# NOTE(review): `scores` is created but `score` is never appended to it in
# this cell - confirm that is intended.
scores = []
for i in range(700):
  current_state = np.random.randint(0, int(Q.shape[0]))
  available_act = available_actions(current_state)
  action = sample_next_action(available_act)
  score = update(current_state,action,gamma)

In [None]:
# print environmental matrices
# Number of bee sightings, indexed [current_state, action].
print('Bees Found')
print(enviro_bees)

In [None]:
# Number of smoke sightings, indexed [current_state, action].
print('Smoke Found')
print(enviro_smoke)

In [None]:
# Start again from an all-zero Q matrix.
Q = np.matrix(np.zeros([MATRIX_SIZE, MATRIX_SIZE]))

# subtract bees with smoke, this gives smoke a negative effect
enviro_matrix = enviro_bees - enviro_smoke

# Get available actions in the current state
available_act = available_actions(initial_state) 

# Sample next action to be performed
action = sample_next_action(available_act)

In [None]:
# This function updates the Q matrix according to the path selected and the Q 
# learning algorithm
def update(current_state, action, gamma):
  '''Update Q[current_state, action], adjust enviro_matrix, return a score.

  The new Q entry is R[current_state, action] + gamma * (largest value in
  row `action` of Q). enviro_matrix[current_state, action] is incremented
  when node `action` has bees and decremented when it has smoke. Q, R and
  enviro_matrix are module-level matrices.

  Returns sum(Q / max(Q) * 100), or 0 while Q has no positive entry.
  '''
  # Columns of row `action` that hold the row maximum.
  max_index = np.where(Q[action,] == np.max(Q[action,]))[1]

  # Break ties at random. Scalars are extracted explicitly because int() on
  # a one-element array is deprecated in recent NumPy.
  if max_index.shape[0] > 1:
    max_index = int(np.random.choice(max_index))
  else:
    max_index = int(max_index[0])
  max_value = Q[action, max_index]

  Q[current_state, action] = R[current_state, action] + gamma * max_value

  environment = collect_environmental_data(action)
  if 'b' in environment:
    enviro_matrix[current_state, action] += 1
  if 's' in environment:
    enviro_matrix[current_state, action] -= 1

  # Same guard as the earlier versions of update(): while Q is still all
  # zeros the normalisation would divide by zero and yield NaN.
  if np.max(Q) > 0:
    return np.sum(Q / np.max(Q) * 100)
  return 0

In [None]:
def available_actions_with_enviro_help(state):
  '''List the actions for `state`, avoiding negatively-scored ones if possible.

  Starts from the columns of row `state` of R with an entry >= 0. If any of
  them has a negative entry in enviro_matrix_snap, those are dropped -
  unless that would leave no action, in which case the full list is kept.
  R and enviro_matrix_snap are module-level matrices.
  '''
  av_act = np.where(R[state,] >= 0)[1]

  # Environmental scores of the candidate actions (a 1 x k matrix).
  env_pos_row = enviro_matrix_snap[state, av_act]
  if not np.sum(env_pos_row < 0):
    return av_act

  # Keep only the candidates whose score is not negative.
  temp_av_act = av_act[np.array(env_pos_row)[0]>=0]
  return temp_av_act if len(temp_av_act) > 0 else av_act

In [None]:
# One warm-up update, then freeze a copy of the environment matrix; the
# action filter reads this snapshot.
update(initial_state,action,gamma)
enviro_matrix_snap = enviro_matrix.copy()

# Training
# 700 updates from random states, choosing among the environment-filtered
# actions; the score returned by update() is recorded per step.
scores = []
for i in range(700):
  current_state = np.random.randint(0, int(Q.shape[0]))
  available_act = available_actions_with_enviro_help(current_state)
  action = sample_next_action(available_act)
  score = update(current_state,action,gamma)
  scores.append(score)

plt.plot(scores)
plt.show()

### <font color=blue>**2.** </font> SARSA

#### <font color=green>**2.1.** </font> 迷路を解く　その２

In [None]:
## 出典 : https://book.mynavi.jp/manatee/detail/id=88534

In [None]:
import numpy as np
import matplotlib.pyplot as plt

In [None]:
# Draw the maze in its initial state.

# Create the figure (kept in `fig` for the animation) and grab its axes.
fig = plt.figure(figsize=(5, 5))
ax = plt.gca()

# Draw the red walls.
plt.plot([1, 1], [0, 1], color='red', linewidth=2)
plt.plot([1, 2], [2, 2], color='red', linewidth=2)
plt.plot([2, 2], [2, 1], color='red', linewidth=2)
plt.plot([2, 3], [1, 1], color='red', linewidth=2)

# Write the state labels S0-S8 plus the START / GOAL captions.
plt.text(0.5, 2.5, 'S0', size=14, ha='center')
plt.text(1.5, 2.5, 'S1', size=14, ha='center')
plt.text(2.5, 2.5, 'S2', size=14, ha='center')
plt.text(0.5, 1.5, 'S3', size=14, ha='center')
plt.text(1.5, 1.5, 'S4', size=14, ha='center')
plt.text(2.5, 1.5, 'S5', size=14, ha='center')
plt.text(0.5, 0.5, 'S6', size=14, ha='center')
plt.text(1.5, 0.5, 'S7', size=14, ha='center')
plt.text(2.5, 0.5, 'S8', size=14, ha='center')
plt.text(0.5, 2.3, 'START', ha='center')
plt.text(2.5, 0.3, 'GOAL', ha='center')

# Set the drawing range and hide every tick and tick label.
# tick_params expects booleans: the legacy 'off' strings are non-empty and
# therefore truthy, which leaves the ticks visible on current Matplotlib.
ax.set_xlim(0, 3)
ax.set_ylim(0, 3)
plt.tick_params(axis='both', which='both', bottom=False, top=False,
                labelbottom=False, right=False, left=False, labelleft=False)

# Mark the current position S0 with a green circle; `line` is moved by the
# animation callbacks defined later.
line, = ax.plot([0.5], [2.5], marker="o", color='g', markersize=60)

In [None]:
# Set the parameter theta_0 that determines the initial policy.

# Rows are states 0-7; columns are the move directions up, right, down, left.
# An entry of 1 marks a direction that can be taken from that state; np.nan
# marks a direction that cannot (a wall direction).
theta_0 = np.array([[np.nan, 1, 1, np.nan],  # s0
                    [np.nan, 1, np.nan, 1],  # s1
                    [np.nan, np.nan, 1, 1],  # s2
                    [1, 1, 1, np.nan],  # s3
                    [np.nan, np.nan, 1, 1],  # s4
                    [1, np.nan, np.nan, np.nan],  # s5
                    [1, np.nan, np.nan, np.nan],  # s6
                    [1, 1, np.nan, np.nan],  # s7; s8 is the goal, so it has no policy row
                    ])

In [None]:
# Set up the initial action-value function Q.

[a, b] = theta_0.shape  # store the number of rows and columns in a and b
Q = np.random.rand(a, b) * theta_0
# Multiplying element-wise by theta_0 makes the wall directions NaN.

In [None]:
# 方策パラメータtheta_0をランダム方策piに変換する関数の定義
 
def simple_convert_into_pi_from_theta(theta):
  '''Convert policy parameters into a policy by simple row normalisation.

  Each row of the 2-D array `theta` is divided by the sum of its non-NaN
  entries, and any NaN in the result is then replaced with 0.

  Returns a new float array with the same shape as `theta`.
  '''
  # Sum of the non-NaN entries per row, kept as a column for broadcasting.
  row_totals = np.nansum(theta, axis=1, keepdims=True)
  return np.nan_to_num(theta / row_totals)
 
# Compute the random action policy pi_0 from theta_0.
pi_0 = simple_convert_into_pi_from_theta(theta_0)

In [None]:
# ε-greedy法を実装
 
def get_action_and_s_next(s, Q, epsilon, pi_0):
  '''Choose an action epsilon-greedily and return [action, s_next].

  With probability epsilon a direction is drawn from the policy row
  pi_0[s, :]; otherwise the direction with the largest non-NaN value in
  Q[s, :] is taken.
  '''
  direction = ["up", "right", "down", "left"]
  # direction -> (action index, change in the state number).
  # Moving up/down changes the state number by 3, left/right by 1.
  moves = {"up": (0, -3), "right": (1, 1), "down": (2, 3), "left": (3, -1)}

  explore = np.random.rand() < epsilon
  if explore:
    # Move at random according to pi_0.
    next_direction = np.random.choice(direction, p=pi_0[s, :])
  else:
    # Take the action with the largest Q value.
    next_direction = direction[np.nanargmax(Q[s, :])]

  action, step = moves[next_direction]
  return [action, s + step]

In [None]:
# Sarsaによる行動価値関数Qの更新
 
def Sarsa(s, a, r, s_next, a_next, Q, eta, gamma):
  '''Apply one Sarsa update to Q[s, a] in place and return Q.

  s, a            : state and action that were just taken
  r               : reward received for the transition
  s_next, a_next  : next state and the action chosen there
  Q               : action-value table indexed as Q[state, action]
  eta             : learning rate
  gamma           : discount rate
  '''
  if s_next == 8:
    # State 8 is the goal: a_next is not used and nothing is bootstrapped.
    td_target = r
  else:
    td_target = r + gamma * Q[s_next, a_next]

  Q[s, a] = Q[s, a] + eta * (td_target - Q[s, a])
  return Q

In [None]:
# Sarsaで迷路を解く関数の定義、状態と行動の履歴および更新したQを出力
 
def goal_maze_ret_s_a_Q(Q, epsilon, eta, gamma, pi_0):
  '''Run one Sarsa episode from state 0 until the goal (state 8).

  Q        : action-value table, updated after every step
  epsilon  : exploration probability passed to get_action_and_s_next
  eta      : learning rate
  gamma    : discount rate
  pi_0     : random policy used for exploratory moves

  Returns [s_a_history, Q], where s_a_history is a list of [state, action]
  pairs in visiting order; the final entry is [8, nan] because no action is
  taken at the goal.
  '''
  s = 0  # start state
  s_a_history = [[0, np.nan]]  # record of the agent's moves

  # Choose the first action before entering the loop; afterwards the action
  # taken at each state is the a_next that was used in the preceding update.
  a, s_next = get_action_and_s_next(s, Q, epsilon, pi_0)

  while True:  # loop until the goal is reached
    # Fill in the action for the current (last recorded) state, then record
    # the next state with its action still unknown.
    s_a_history[-1][1] = a
    s_a_history.append([s_next, np.nan])

    if s_next == 8:
      r = 1  # reward for reaching the goal
      a_next = np.nan
    else:
      r = 0
      # Choose the action for s_next now; it is used in the update below AND
      # is the action actually taken on the next iteration (on-policy).
      a_next, s_after_next = get_action_and_s_next(s_next, Q, epsilon, pi_0)

    # Update the action-value function.
    Q = Sarsa(s, a, r, s_next, a_next, Q, eta, gamma)

    if s_next == 8:  # reached the goal
      break
    s, a, s_next = s_next, a_next, s_after_next

  return [s_a_history, Q]

In [None]:
# Solve the maze with Sarsa: hyper-parameters and loop state.
eta = 0.1  # learning rate
gamma = 0.9  # time discount rate
epsilon = 0.5  # initial value for the epsilon-greedy method
v = np.nanmax(Q, axis=1)  # maximum action value of each state
is_continue = True
episode = 1

In [None]:
# NOTE: is_continue is never set to False in this cell; the loop ends through
# the `break` once the episode counter exceeds 10.
while is_continue:
  print("エピソード:" + str(episode))
  # Halve epsilon every episode so exploration shrinks over time.
  epsilon = epsilon / 2

  # Run one Sarsa episode; get the move history and the updated Q.
  [s_a_history, Q] = goal_maze_ret_s_a_Q(Q, epsilon, eta, gamma, pi_0)

  # Change in the state values.
  new_v = np.nanmax(Q, axis=1)  # maximum action value of each state
  print(np.sum(np.abs(new_v - v)))  # print the total absolute change
  v = new_v

  print("迷路を解くのにかかったステップ数は" + str(len(s_a_history) - 1) + "です")

  # Repeat for 10 episodes (reduced from 100).
  episode = episode + 1
  if episode > 10:  # 100 -> 10
    break

In [None]:
# エージェントの移動の様子を可視化
# 参考URL http://louistiao.me/posts/notebooks/embedding-matplotlib-animations-in-jupyter-notebooks/
from matplotlib import animation
from IPython.display import HTML

In [None]:
def init():
  '''Animation init callback: clear the data of the module-level `line`.

  Returns a one-element tuple holding that artist.
  '''
  artists = (line,)
  line.set_data([], [])
  return artists

In [None]:
def animate(i):
  '''Animation frame callback: move the marker to the i-th visited state.

  Reads the state from s_a_history[i][0] and places the module-level `line`
  marker at the centre of that cell. Returns a one-element tuple holding
  the artist.
  '''
  state = s_a_history[i][0]  # state occupied at step i
  x = (state % 3) + 0.5  # column: remainder of division by 3, plus 0.5
  y = 2.5 - state // 3  # row: quotient of division by 3, subtracted from 2.5
  # set_data expects sequences; newer Matplotlib rejects bare scalars.
  line.set_data([x], [y])
  return (line,)

In [None]:
# Build the animation from the init callback and the per-frame callback;
# one frame per entry of s_a_history (the history of the last episode run).
anim = animation.FuncAnimation(
    fig, 
    animate, 
    init_func=init, 
    frames=len(s_a_history), 
    interval=200, 
    repeat=False
    )

# Render the animation as an HTML5 video for display in the notebook.
HTML(anim.to_html5_video())

#### <font color=green>**2.2.** </font> Gambler’s Problem from Sutton's book.

In [None]:
'''A gambler has the opportunity to make bets on the outcomes of a sequence of coin flips. 
If the coin comes up heads, he wins as many dollars as he has staked on that flip; 
if it is tails, he loses his stake. The game ends when the gambler wins by reaching his goal of $100, or loses by running out of money. 

On each flip, the gambler must decide what portion of his capital to stake, in integer numbers of dollars. 
This problem can be formulated as an undiscounted, episodic, finite MDP. 

The state is the gambler’s capital, s ∈ {1, 2, . . . , 99}.
The actions are stakes, a ∈ {0, 1, . . . , min(s, 100 − s)}. 
The reward is zero on all transitions except those on which the gambler reaches his goal, when it is +1.

The state-value function then gives the probability of winning from each state. 
A policy is a mapping from levels of capital to stakes. 
The optimal policy maximizes the probability of reaching the goal. Let p_h denote the probability of the coin coming up heads. 
If p_h is known, then the entire problem is known and it can be solved, for instance, by value iteration.
'''

In [None]:
import numpy as np
import sys
import matplotlib.pyplot as plt
if "../" not in sys.path:
  sys.path.append("../") 

In [None]:
# Exercise 4.9 (programming)
# Implement value iteration for the gambler’s problem and solve it for p_h = 0.25 and p_h = 0.55.

In [None]:
def value_iteration_for_gamblers(p_h, theta=0.0001, discount_factor=1.0, goal=100):
    """Solve the gambler's problem with in-place value iteration.

    Args:
        p_h: Probability of the coin coming up heads.
        theta: Convergence threshold. Sweeps stop once the largest change of
            any state value within one sweep is below this number.
        discount_factor: Multiplier applied to the value of the next state.
        goal: Capital at which the gambler wins. Defaults to 100, which
            reproduces the original hard-coded problem size.

    Returns:
        A tuple ``(policy, V)``. ``policy`` has length ``goal`` and
        ``policy[s]`` is the stake chosen at capital ``s`` (index 0 is unused
        and stays 0). ``V`` has length ``goal + 1`` and holds the state values;
        the entries for capital 0 and ``goal`` are never updated and stay 0.
    """
    # The reward is zero on all transitions except those on which the gambler
    # reaches his goal, when it is +1.
    rewards = np.zeros(goal + 1)
    rewards[goal] = 1

    # Two dummy states (capital 0 and capital `goal`) represent termination.
    V = np.zeros(goal + 1)

    def one_step_lookahead(s, V, rewards):
        """Compute the expected value of every stake available at capital ``s``.

        Args:
            s: The gambler's capital. Integer.
            V: The vector that contains the value of each state.
            rewards: The reward vector.

        Returns:
            A vector of length ``goal + 1`` indexed by stake. Entries for
            stakes that are not allowed at ``s`` (including stake 0) are 0.
        """
        A = np.zeros(goal + 1)
        # The minimum bet is 1, the maximum bet is min(s, goal - s).
        stakes = range(1, min(s, goal - s) + 1)
        for a in stakes:
            # Bellman backup: immediate reward plus the (discounted) value of
            # the next state, weighted by the win / lose probabilities.
            A[a] = p_h * (rewards[s+a] + V[s+a]*discount_factor) + (1-p_h) * (rewards[s-a] + V[s-a]*discount_factor)
        return A

    while True:
        # Largest value change seen during this sweep.
        delta = 0
        for s in range(1, goal):
            A = one_step_lookahead(s, V, rewards)
            best_action_value = np.max(A)
            delta = max(delta, np.abs(best_action_value - V[s]))
            # In-place update (Sutton & Barto eq. 4.10): later states in the
            # same sweep already see this new value.
            V[s] = best_action_value
        if delta < theta:
            break

    # Create a deterministic policy from the converged value function.
    policy = np.zeros(goal)
    for s in range(1, goal):
        A = one_step_lookahead(s, V, rewards)
        # argmax returns the smallest stake among ties.
        policy[s] = np.argmax(A)

    return policy, V

In [None]:
policy, v = value_iteration_for_gamblers(0.25)

print("Optimized Policy:")
print(policy)
print("")

print("Optimized Value Function:")
print(v)
print("")

In [None]:
### Show your results graphically
# Plotting Value Estimates vs State (Capital)

# x axis values
x = range(100)
# corresponding y axis values: state values for capital 0..99
y = v[:100]

# plotting the points
plt.plot(x, y)

# naming the x axis
plt.xlabel('Capital')
# naming the y axis
plt.ylabel('Value Estimates')

# giving a title to the graph; this figure shows the value function, the
# policy is plotted separately
plt.title('Value Estimates vs State (Capital)')

# function to show the plot
plt.show()

In [None]:
# Plotting Capital vs Final Policy

# x axis values
x = range(100)
# corresponding y axis values
y = policy
 
# plotting the bars
plt.figure(figsize=(16,8))  ##
plt.bar(x, y, align='center', alpha=0.5)
plt.grid(axis='y')  ###
 
# naming the x axis
plt.xlabel('Capital')
# naming the y axis
plt.ylabel('Final policy (stake)')
 
# giving a title to the graph
plt.title('Capital vs Final Policy')
 
# function to show the plot
plt.show()


In [None]:
policy_2, v_2 = value_iteration_for_gamblers(0.55)

print("Optimized Policy:")
print(policy_2)
print("")

print("Optimized Value Function:")
print(v_2)
print("")

In [None]:
### Show your results graphically
# Plotting Value Estimates vs State (Capital)

# x axis values
x = range(100)
# corresponding y axis values: state values for capital 0..99
y = v_2[:100]

# plotting the points
plt.plot(x, y)

# naming the x axis
plt.xlabel('Capital')
# naming the y axis
plt.ylabel('Value Estimates')

# giving a title to the graph; this figure shows the value function, the
# policy is plotted separately
plt.title('Value Estimates vs State (Capital)')

# function to show the plot
plt.show()

In [None]:
# Plotting Capital vs Final Policy

# x axis values
x = range(100)
# corresponding y axis values
y = policy_2
 
# plotting the bars
plt.figure(figsize=(16,8))  ##
plt.bar(x, y, align='center', alpha=0.5)
plt.grid(axis='y')  ###
 
# naming the x axis
plt.xlabel('Capital')
# naming the y axis
plt.ylabel('Final policy (stake)')
 
# giving a title to the graph
plt.title('Capital vs Final Policy')
 
# function to show the plot
plt.show()


In [None]:
policy_3, v_3 = value_iteration_for_gamblers(0.75)

print("Optimized Policy:")
print(policy_3)
print("")

print("Optimized Value Function:")
print(v_3)
print("")

In [None]:
### Show your results graphically
# Plotting Value Estimates vs State (Capital)

# x axis values
x = range(100)
# corresponding y axis values: state values for capital 0..99
y = v_3[:100]

# plotting the points
plt.plot(x, y)

# naming the x axis
plt.xlabel('Capital')
# naming the y axis
plt.ylabel('Value Estimates')

# giving a title to the graph; this figure shows the value function, the
# policy is plotted separately
plt.title('Value Estimates vs State (Capital)')

# function to show the plot
plt.show()

In [None]:
# Plotting Capital vs Final Policy

# x axis values
x = range(100)
# corresponding y axis values
y = policy_3
 
# plotting the bars
plt.figure(figsize=(16,8))  ##
plt.bar(x, y, align='center', alpha=0.5)
plt.grid(axis='y')  ###
 
# naming the x axis
plt.xlabel('Capital')
# naming the y axis
plt.ylabel('Final policy (stake)')
 
# giving a title to the graph
plt.title('Capital vs Final Policy')
 
# function to show the plot
plt.show()


#### <font color=green>**2.3.** </font> CartPole NumPyで実装 その１

In [None]:
## 出典 : https://deepage.net/machine_learning/2017/08/10/reinforcement-learning.html

In [None]:
import gym
import numpy as np

env = gym.make('CartPole-v0')

goal_average_steps = 195
max_number_of_steps = 200
num_consecutive_iterations = 100
num_episodes = 5000
last_time_steps = np.zeros(num_consecutive_iterations)

for episode in range(num_episodes):
    # 環境の初期化
    observation = env.reset()

    episode_reward = 0
    for t in range(max_number_of_steps):
        # CartPoleの描画
        #env.render()

        # ランダムで行動の選択
        action = np.random.choice([0, 1])

        # 行動の実行とフィードバックの取得
        observation, reward, done, info = env.step(action)
        episode_reward += reward

        if done:
            print('%d Episode finished after %d time steps / mean %f' % (episode, t + 1,
                last_time_steps.mean()))
            last_time_steps = np.hstack((last_time_steps[1:], [episode_reward]))
            break

    if (last_time_steps.mean() >= goal_average_steps): # 直近の100エピソードが195以上であれば成功
        print('Episode %d train agent successfuly!' % episode)
        break

In [None]:
q_table = np.random.uniform(low=-1, high=1, size=(4 ** 4, env.action_space.n))

def bins(clip_min, clip_max, num):
    """Return the interior edges that split [clip_min, clip_max] into ``num`` equal bins.

    Args:
        clip_min: Lower end of the range.
        clip_max: Upper end of the range.
        num: Number of equal-width bins.

    Returns:
        A NumPy array with the ``num - 1`` edges between the bins; the two
        outer edges (``clip_min`` and ``clip_max``) are dropped.
    """
    edges = np.linspace(clip_min, clip_max, num + 1)
    interior = edges[1:-1]
    return interior

def digitize_state(observation):
    """Convert a 4-component observation into a single discrete state index.

    Each component is bucketed into one of 4 levels with ``np.digitize`` and
    the four levels are combined as the digits of a base-4 number, giving an
    index in the range 0..255.

    Args:
        observation: Sequence unpacked as
            (cart_pos, cart_v, pole_angle, pole_v).

    Returns:
        The combined integer state index.
    """
    cart_pos, cart_v, pole_angle, pole_v = observation
    # (value, lower clip, upper clip) for each component, least significant
    # base-4 digit first.
    components = (
        (cart_pos, -2.4, 2.4),
        (cart_v, -3.0, 3.0),
        (pole_angle, -0.5, 0.5),
        (pole_v, -2.0, 2.0),
    )
    index = 0
    for position, (value, low, high) in enumerate(components):
        level = np.digitize(value, bins=bins(low, high, 4))
        index += level * (4 ** position)
    return index

In [None]:
def get_action(state, action, observation, reward):
    """Update the Q-table for the last transition and pick the next action greedily.

    Args:
        state: Discrete index of the state the action was taken in.
        action: Action that was taken in ``state``.
        observation: Observation received after the action.
        reward: Reward received for the action.

    Returns:
        A tuple ``(next_action, next_state)``: the highest-valued action in the
        new state and the discrete index of that state.
    """
    next_state = digitize_state(observation)
    next_action = np.argmax(q_table[next_state])

    # Blend the old estimate with the bootstrapped target.
    alpha = 0.2  # learning rate
    gamma = 0.99  # discount factor
    target = reward + gamma * q_table[next_state, next_action]
    q_table[state, action] = (1 - alpha) * q_table[state, action] + alpha * target

    return next_action, next_state

last_time_steps = np.zeros(num_consecutive_iterations)
for episode in range(num_episodes):
    # 環境の初期化
    observation = env.reset()

    state = digitize_state(observation)
    action = np.argmax(q_table[state])

    episode_reward = 0
    for t in range(max_number_of_steps):
        # CartPoleの描画
        #env.render()

        # 行動の実行とフィードバックの取得
        observation, reward, done, info = env.step(action)

        # 行動の選択
        action, state = get_action(state, action, observation, reward)
        episode_reward += reward

        if done:
            print('%d Episode finished after %d time steps / mean %f' % (episode, t + 1,
                last_time_steps.mean()))
            last_time_steps = np.hstack((last_time_steps[1:], [episode_reward]))
            break

    if (last_time_steps.mean() >= goal_average_steps): # 直近の100エピソードが195以上であれば成功
        print('Episode %d train agent successfuly!' % episode)
        break

In [None]:
def get_action_2(state, action, observation, reward):
    """Update the Q-table and pick the next action with a fixed epsilon-greedy rule.

    With probability ``epsilon`` (0.2) a random action is chosen, otherwise the
    highest-valued action in the new state. The Q-table update bootstraps from
    the action that was actually selected.

    Args:
        state: Discrete index of the state the action was taken in.
        action: Action that was taken in ``state``.
        observation: Observation received after the action.
        reward: Reward received for the action.

    Returns:
        A tuple ``(next_action, next_state)``.
    """
    next_state = digitize_state(observation)

    epsilon = 0.2
    draw = np.random.uniform(0, 1)
    if draw < epsilon:
        next_action = np.random.choice([0, 1])  # explore
    else:
        next_action = np.argmax(q_table[next_state])  # exploit

    # Blend the old estimate with the bootstrapped target.
    alpha = 0.2  # learning rate
    gamma = 0.99  # discount factor
    target = reward + gamma * q_table[next_state, next_action]
    q_table[state, action] = (1 - alpha) * q_table[state, action] + alpha * target

    return next_action, next_state

last_time_steps = np.zeros(num_consecutive_iterations)
for episode in range(num_episodes):
    # 環境の初期化
    observation = env.reset()

    state = digitize_state(observation)
    action = np.argmax(q_table[state])

    episode_reward = 0
    for t in range(max_number_of_steps):
        # CartPoleの描画
        #env.render()

        # 行動の実行とフィードバックの取得
        observation, reward, done, info = env.step(action)

        # 行動の選択
        action, state = get_action_2(state, action, observation, reward)
        episode_reward += reward

        if done:
            print('%d Episode finished after %d time steps / mean %f' % (episode, t + 1,
                last_time_steps.mean()))
            last_time_steps = np.hstack((last_time_steps[1:], [episode_reward]))
            break

    if (last_time_steps.mean() >= goal_average_steps): # 直近の100エピソードが195以上であれば成功
        print('Episode %d train agent successfuly!' % episode)
        break

In [None]:
def get_action_3(state, action, observation, reward, episode):
    """Update the Q-table and pick the next action with a decaying epsilon-greedy rule.

    The exploration rate is ``0.5 * 0.99 ** episode``, so random actions become
    rarer as the episode number grows. The Q-table update bootstraps from the
    action that was actually selected.

    Args:
        state: Discrete index of the state the action was taken in.
        action: Action that was taken in ``state``.
        observation: Observation received after the action.
        reward: Reward received for the action.
        episode: Current episode number, used to decay epsilon.

    Returns:
        A tuple ``(next_action, next_state)``.
    """
    next_state = digitize_state(observation)

    epsilon = 0.5 * (0.99 ** episode)
    draw = np.random.uniform(0, 1)
    if draw < epsilon:
        next_action = np.random.choice([0, 1])  # explore
    else:
        next_action = np.argmax(q_table[next_state])  # exploit

    # Blend the old estimate with the bootstrapped target.
    alpha = 0.2  # learning rate
    gamma = 0.99  # discount factor
    target = reward + gamma * q_table[next_state, next_action]
    q_table[state, action] = (1 - alpha) * q_table[state, action] + alpha * target

    return next_action, next_state

last_time_steps = np.zeros(num_consecutive_iterations)
for episode in range(num_episodes):
    # 環境の初期化
    observation = env.reset()

    state = digitize_state(observation)
    action = np.argmax(q_table[state])

    episode_reward = 0
    for t in range(max_number_of_steps):
        # CartPoleの描画
        #env.render()

        # 行動の実行とフィードバックの取得
        observation, reward, done, info = env.step(action)

        # 行動の選択
        action, state = get_action_3(state, action, observation, reward, episode)
        episode_reward += reward

        if done:
            print('%d Episode finished after %d time steps / mean %f' % (episode, t + 1,
                last_time_steps.mean()))
            last_time_steps = np.hstack((last_time_steps[1:], [episode_reward]))
            break

    if (last_time_steps.mean() >= goal_average_steps): # 直近の100エピソードが195以上であれば成功
        print('Episode %d train agent successfuly!' % episode)
        break

In [None]:
last_time_steps = np.zeros(num_consecutive_iterations)
for episode in range(num_episodes):
    # 環境の初期化
    observation = env.reset()

    state = digitize_state(observation)
    action = np.argmax(q_table[state])

    episode_reward = 0
    for t in range(max_number_of_steps):
        # CartPoleの描画
        #env.render()

        # 行動の実行とフィードバックの取得
        observation, reward, done, info = env.step(action)

        # 罰則の追加
        if done:
            reward = -200

        # 行動の選択
        action, state = get_action_3(state, action, observation, reward, episode)

        if done:
            print('%d Episode finished after %f time steps / mean %f' % (episode, t + 1,
                last_time_steps.mean()))
            last_time_steps = np.hstack((last_time_steps[1:], [t + 1]))
            break

    if (last_time_steps.mean() >= goal_average_steps): # 直近の100エピソードが195以上であれば成功
        print('Episode %d train agent successfuly!' % episode)
        break

### <font color=blue>**3.** </font> 方策勾配法

#### <font color=green>**3.1.** </font> 迷路を解く　その３

In [None]:
## 出典 : https://book.mynavi.jp/manatee/detail/id=88297

In [None]:
import numpy as np
import matplotlib.pyplot as plt

In [None]:
# Draw the maze with the agent at its initial position

# Create the figure and keep handles to the figure and its axes
fig = plt.figure(figsize=(5, 5))
ax = plt.gca()

# Draw the red walls; each entry is (x coordinates, y coordinates) of a segment
for wall_x, wall_y in (([1, 1], [0, 1]),
                       ([1, 2], [2, 2]),
                       ([2, 2], [2, 1]),
                       ([2, 3], [1, 1])):
    plt.plot(wall_x, wall_y, color='red', linewidth=2)

# Label the cells S0..S8, row by row starting from the top-left corner
for k in range(9):
    plt.text((k % 3) + 0.5, 2.5 - (k // 3), 'S' + str(k), size=14, ha='center')
plt.text(0.5, 2.3, 'START', ha='center')
plt.text(2.5, 0.3, 'GOAL', ha='center')

# Set the drawing range and hide ticks and tick labels.
# The flags must be booleans: the strings 'off' are truthy in current
# matplotlib and would leave the ticks visible.
ax.set_xlim(0, 3)
ax.set_ylim(0, 3)
plt.tick_params(axis='both', which='both', bottom=False, top=False,
                labelbottom=False, right=False, left=False, labelleft=False)

# Draw a green circle at the current position S0
line, = ax.plot([0.5], [2.5], marker="o", color='g', markersize=60)

In [None]:
# 初期の方策を決定するパラメータtheta_0を設定
 
# 行は状態0～7、列は移動方向で↑、→、↓、←を表す
theta_0 = np.array([[np.nan, 1, 1, np.nan],  # s0
                    [np.nan, 1, np.nan, 1],  # s1
                    [np.nan, np.nan, 1, 1],  # s2
                    [1, 1, 1, np.nan],  # s3
                    [np.nan, np.nan, 1, 1],  # s4
                    [1, np.nan, np.nan, np.nan],  # s5
                    [1, np.nan, np.nan, np.nan],  # s6
                    [1, 1, np.nan, np.nan],  # s7、※s8はゴールなので、方策はなし
                    ])

In [None]:
# 方策パラメータthetaを行動方策piにソフトマックス関数で変換する手法の定義
 
def softmax_convert_into_pi_from_theta(theta):
    '''Convert policy parameters theta into action probabilities with a softmax.

    Each row of theta is exponentiated (scaled by beta = 1.0) and divided by
    the row sum, ignoring NaN entries. NaN entries end up with probability 0.
    '''

    beta = 1.0
    n_states, n_actions = theta.shape
    pi = np.zeros((n_states, n_actions))

    weights = np.exp(beta * theta)  # NaN stays NaN

    for state in range(n_states):
        row = weights[state, :]
        pi[state, :] = row / np.nansum(row)  # softmax over the non-NaN actions

    return np.nan_to_num(pi)  # replace NaN with 0

In [None]:
# 初期の方策pi_0を求める
 
pi_0 = softmax_convert_into_pi_from_theta(theta_0)
print(pi_0)

In [None]:
# 行動と1step移動後の状態sとを求める関数を定義
 
def get_action_and_next_s(pi, s):
    """Sample a move from policy ``pi`` in state ``s`` and return it with the next state.

    Args:
        pi: 2-D array; row ``s`` holds the probabilities of moving
            up, right, down and left (in that column order).
        s: Current state index on the 3x3 grid.

    Returns:
        A list ``[action, s_next]`` where ``action`` is 0 (up), 1 (right),
        2 (down) or 3 (left) and ``s_next`` is the resulting state index.
    """
    direction = ["up", "right", "down", "left"]
    # (action index, change of the state number) for each direction:
    # one row up/down changes the state by 3, one column left/right by 1.
    moves = {
        "up": (0, -3),
        "right": (1, 1),
        "down": (2, 3),
        "left": (3, -1),
    }
    # A direction is drawn according to the probabilities in pi[s, :]
    next_direction = np.random.choice(direction, p=pi[s, :])
    action, offset = moves[next_direction]
    s_next = s + offset

    return [action, s_next]

In [None]:
# 迷路を解く関数の定義、状態と行動の履歴を出力
 
def goal_maze_ret_s_a(pi):
    """Walk the maze under policy ``pi`` until the goal and record the path.

    Args:
        pi: Policy passed through to ``get_action_and_next_s``.

    Returns:
        A list of ``[state, action]`` pairs starting at state 0. The final
        entry is the goal state 8 with ``np.nan`` as its action.
    """
    s = 0  # start cell
    s_a_history = [[0, np.nan]]  # action of the newest entry is filled in later

    while True:
        action, next_s = get_action_and_next_s(pi, s)
        # The last entry is the current state: record the action taken there.
        s_a_history[-1][1] = action
        # Append the new state; its action is not known yet.
        s_a_history.append([next_s, np.nan])

        if next_s == 8:  # reached the goal
            return s_a_history
        s = next_s

In [None]:
s_a_history = goal_maze_ret_s_a(pi_0)
print(s_a_history)
print("迷路を解くのにかかったステップ数は"+str(len(s_a_history)-1)+"です")

In [None]:
# thetaの更新関数を定義します
 
def update_theta(theta, pi, s_a_history):
    """Return policy parameters updated by one policy-gradient step.

    For every non-NaN entry the update is

        delta_theta[i, j] = (N_ij - pi[i, j] * N_i) / T

    where ``N_i`` is the number of history entries in state ``i``, ``N_ij``
    the number of entries where action ``j`` was taken in state ``i`` and
    ``T`` the number of steps taken. The minus sign comes from the gradient
    of the log of a softmax policy (indicator of the taken action minus the
    action probability); with a plus sign every visited action would be
    pushed up regardless of whether it was the one taken.

    Args:
        theta: 2-D array of policy parameters; NaN marks impossible moves.
        pi: 2-D array of action probabilities with the same shape as theta.
        s_a_history: List of ``[state, action]`` pairs for one episode.

    Returns:
        A new array ``theta + eta * delta_theta``. NaN entries of ``theta``
        stay NaN. The input ``theta`` is not modified.
    """
    eta = 0.1  # learning rate
    T = len(s_a_history) - 1  # total number of steps to the goal

    m, n = theta.shape
    # Start from a copy so that NaN entries are carried over unchanged.
    delta_theta = theta.copy()

    for i in range(m):
        # Number of history entries in state i (does not depend on j).
        N_i = sum(1 for SA in s_a_history if SA[0] == i)
        for j in range(n):
            if np.isnan(theta[i, j]):
                continue
            # Number of times action j was taken in state i.
            N_ij = sum(1 for SA in s_a_history if SA == [i, j])
            delta_theta[i, j] = (N_ij - pi[i, j] * N_i) / T

    new_theta = theta + eta * delta_theta

    return new_theta

In [None]:
new_theta = update_theta(theta_0, pi_0, s_a_history)
pi = softmax_convert_into_pi_from_theta(new_theta)
print(pi)

In [None]:
# 方策勾配法で迷路を解く
 
stop_epsilon = 10**-8  # 10^-8よりも方策に変化が少なくなったら学習終了とする
 
theta = theta_0
pi = pi_0
 
is_continue = True
count = 1
while is_continue:  # is_continueがFalseになるまで繰り返す
    s_a_history = goal_maze_ret_s_a(pi)  # 方策πで迷路内を探索した履歴を求める
    new_theta = update_theta(theta, pi, s_a_history)  # パラメータΘを更新
    new_pi = softmax_convert_into_pi_from_theta(new_theta)  # 方策πの更新
 
    print(np.sum(np.abs(new_pi - pi)))  # 方策の変化を出力
    print("迷路を解くのにかかったステップ数は" + str(len(s_a_history) - 1) + "です")
 
    if np.sum(np.abs(new_pi - pi)) < stop_epsilon:
        is_continue = False
    else:
        theta = new_theta
        pi = new_pi

In [None]:
np.set_printoptions(precision=3, suppress=True) #有効桁数3、指数表示しないという設定
print(pi)

In [None]:
# エージェントの移動の様子を可視化
# 参考URL http://louistiao.me/posts/notebooks/embedding-matplotlib-animations-in-jupyter-notebooks/
from matplotlib import animation
from IPython.display import HTML
 
 
def init():
    """Clear the agent marker so the animation starts from an empty frame.

    Returns:
        A one-element tuple holding the module-level marker artist ``line``.
    """
    no_xs, no_ys = [], []
    line.set_data(no_xs, no_ys)
    return (line,)
 
 
def animate(i):
    """Move the agent marker to the state recorded for frame ``i``.

    Args:
        i: Frame index into the module-level ``s_a_history`` list.

    Returns:
        A one-element tuple holding the updated marker artist ``line``.
    """
    state = s_a_history[i][0]  # each history entry is [state, action]
    x = (state % 3) + 0.5  # column in the 3x3 grid, shifted to the cell centre
    y = 2.5 - (state // 3)  # row 0 is drawn at the top of the figure
    # Line2D.set_data expects sequences; newer matplotlib rejects bare scalars.
    line.set_data([x], [y])
    return (line,)
 
 
#　初期化関数とフレームごとの描画関数を用いて動画を作成
anim = animation.FuncAnimation(fig, animate, init_func=init, frames=len(
    s_a_history), interval=200, repeat=False)
 
HTML(anim.to_html5_video())

#### <font color=green>**3.2.** </font> CartPole NumPyで実装 その２

In [None]:
## 出典 : https://deepage.net/features/numpy-rl.html

In [None]:
import gym
env = gym.make("CartPole-v0")

observation = env.reset()

In [None]:
# とりあえず右に押して見る
action = 1

# stepを実行すると行動を起こした直後の状態、報酬、ゲームが終了したかどうか、情報の４つの変数が返される
observation, reward, done, info = env.step(action)

#env.render()

In [None]:
import numpy as np
observation = env.reset()

for k in range(100):
    #env.render()
    # Step with a random action, 0 or 1. The upper bound of randint is
    # exclusive, so randint(2) is required; randint(1) always returns 0.
    observation, reward, done, info = env.step(np.random.randint(2))
    # Start a new episode only once the current one has ended, instead of
    # throwing the state away after every single step.
    if done:
        observation = env.reset()

env.close() # env.close() must be called when finished.

In [None]:
##########
# Q-learning
##########

In [None]:
# gymとNumPyのインポート。
import gym
import numpy as np

In [None]:
env = gym.make('CartPole-v0') # 環境に相当するオブジェクトをenvとおく。

goal_average_steps = 195 # 195ステップ連続でポールが倒れないことを目指す
max_number_of_steps = 200 # 最大ステップ数
num_consecutive_iterations = 100 # 評価の範囲のエピソード数
num_episodes = 5000
last_time_steps = np.zeros(num_consecutive_iterations)

# 価値関数の値を保存するテーブルを作成する。
# np.random.uniformは指定された範囲での一様乱数を返す。
q_table = np.random.uniform(low=-1, high=1, size=(4**4, env.action_space.n))

In [None]:
def bins(clip_min, clip_max, num):
    """Return the interior edges that split [clip_min, clip_max] into ``num`` equal bins.

    ``np.linspace`` produces ``num + 1`` evenly spaced edges over the range;
    the first and last edge are dropped so that ``num - 1`` edges remain.
    """
    all_edges = np.linspace(clip_min, clip_max, num + 1)
    inner_edges = all_edges[1:-1]
    return inner_edges

In [None]:
def digitize_state(observation):
    """Convert a 4-component observation into a single discrete state index.

    ``np.digitize`` returns the index of the bin each value falls into. Every
    component is bucketed into 4 levels and the four levels are combined as
    the digits of a base-4 number, giving an index in the range 0..255.

    Args:
        observation: Sequence unpacked as
            (cart_pos, cart_v, pole_angle, pole_v).

    Returns:
        The combined integer state index.
    """
    cart_pos, cart_v, pole_angle, pole_v = observation
    values = (cart_pos, cart_v, pole_angle, pole_v)
    # Clip range used for each component, in the same order as `values`.
    ranges = ((-2.4, 2.4), (-3.0, 3.0), (-0.5, 0.5), (-2.0, 2.0))

    index = 0
    for position, (value, (low, high)) in enumerate(zip(values, ranges)):
        level = np.digitize(value, bins=bins(low, high, 4))
        index += level * (4**position)
    return index

In [None]:
def get_action(state, action, observation, reward, episode):
    """Update the Q-table and pick the next action with a decaying epsilon-greedy rule.

    The exploration rate is ``0.5 * 0.99 ** episode``: a random action is
    taken with that probability, otherwise the highest-valued action in the
    new state. The Q-table update bootstraps from the action that was
    actually selected.

    Args:
        state: Discrete index of the state the action was taken in.
        action: Action that was taken in ``state``.
        observation: Observation received after the action.
        reward: Reward received for the action.
        episode: Current episode number, used to decay epsilon.

    Returns:
        A tuple ``(next_action, next_state)``.
    """
    next_state = digitize_state(observation)

    epsilon = 0.5 * (0.99** episode)
    draw = np.random.uniform(0, 1)
    if draw < epsilon:
        # Explore: the uniform draw fell below epsilon.
        next_action = np.random.choice([0, 1])
    else:
        # Exploit: take the action with the highest value in the new state.
        next_action = np.argmax(q_table[next_state])

    # Blend the old estimate with the bootstrapped target.
    alpha = 0.2  # learning rate
    gamma = 0.99  # discount factor
    target = reward + gamma * q_table[next_state, next_action]
    q_table[state, action] = (1 - alpha) * q_table[state, action] + alpha * target
    return next_action, next_state

In [None]:
step_list = []
for episode in range(num_episodes):
    # 環境の初期化
    observation = env.reset()

    state = digitize_state(observation)
    action = np.argmax(q_table[state])

    episode_reward = 0
    for t in range(max_number_of_steps):
        #env.render() # CartPoleの描画

        observation, reward, done, info = env.step(action) # actionを取ったときの環境、報酬、状態が終わったかどうか、デバッグに有益な情報

        if done: # 倒れた時罰則を追加する
            reward -= 200
        # 行動の選択
        action, state = get_action(state, action, observation, reward, episode)
        episode_reward += reward


        if done:
            print('%d Episode finished after %f time steps / mean %f' %
                (episode, t + 1, last_time_steps.mean()))
            last_time_steps = np.hstack((last_time_steps[1:], [t+1]))
            # 継続したステップ数をステップのリストの最後に加える。np.hstack関数は配列をつなげる関数。
            step_list.append(t+1)
            break

    if (last_time_steps.mean() >= goal_average_steps): # 直近の100エピソードの平均が195以上であれば成功
        print('Episode %d train agent successfully!' % episode)
        break

In [None]:
# 以下のコードでグラフを表示
import matplotlib.pyplot as plt
plt.plot(np.arange(len(step_list)), step_list)
plt.xlabel('episode')
plt.ylabel('max_step')

In [None]:
##########
# 方策勾配法
##########

In [None]:
import gym
import numpy as np
import matplotlib.pyplot as plt

In [None]:
def do_episode(w, env):
    """Run one episode with the linear policy ``w`` and score it.

    Args:
        w: Weight vector handed to ``take_action`` together with the
            current observation.
        env: Environment providing ``reset()`` and ``step(action)``.

    Returns:
        A tuple ``(step_val, num_steps)``. ``num_steps`` is the number of
        steps taken. ``step_val`` is the score: -1 when the episode lasted
        ``max_number_of_steps``, otherwise ``num_steps - max_number_of_steps``
        (more negative the earlier the episode ended).
    """
    done = False
    observation = env.reset()
    num_steps = 0

    # Strict '<' so that at most max_number_of_steps steps are taken;
    # '<=' allowed one step more than the stated maximum.
    while not done and num_steps < max_number_of_steps:
        action = take_action(observation, w)
        observation, _, done, _ = env.step(action)
        num_steps += 1
    # Score the episode: basically (steps survived) - (maximum steps).
    step_val = -1 if num_steps >= max_number_of_steps else num_steps - max_number_of_steps
    return step_val, num_steps

In [None]:
def take_action(X, w):
    """Return action 1 when ``calculate(X, w)`` is greater than 0, otherwise 0."""
    if calculate(X, w) > 0.0:
        return 1
    return 0

In [None]:
def calculate(X, w):
    """Return the dot product of ``X`` and ``w``.

    For two 1-D inputs of equal length the result is a single value rather
    than an array.
    """
    return np.dot(X, w)

In [None]:
env = gym.make('CartPole-v0')

# env.render()
# ゲームの様子を見たいときは env.render()を実行すれば良い

eta = 0.2
sigma = 0.05 # パラメーターを変動させる値の標準偏差

max_episodes = 5000 # 学習を行う最大エピソード数
max_number_of_steps = 200
n_states = 4 # 入力のパラメーター数
num_batch = 10
num_consecutive_iterations = 100 # 評価の範囲のエピソード数

In [None]:
w = np.random.randn(n_states)
reward_list = np.zeros(num_batch)
last_time_steps = np.zeros(num_consecutive_iterations)
mean_list = [] # 学習の進行具合を過去100エピソードのステップ数の平均で記録する

In [None]:
# Train the linear policy w: each outer iteration evaluates num_batch randomly
# perturbed copies of w and moves w towards the perturbations that scored well.
for episode in range(max_episodes//num_batch):
    N = np.random.normal(scale=sigma,size=(num_batch, w.shape[0]))
    # Random perturbations of the parameters, one row per trial; drawn from a
    # normal distribution with standard deviation sigma.

    for i in range(num_batch):
        w_try = w + N[i]
        reward, steps = do_episode(w_try, env)
        if i == num_batch-1:
            print('%d Episode finished after %d steps / mean %f' %(episode*num_batch, steps, last_time_steps.mean()))
        # Slide the window of recent step counts: drop the oldest, append the newest.
        last_time_steps = np.hstack((last_time_steps[1:], [steps]))
        reward_list[i] = reward
        mean_list.append(last_time_steps.mean())
    if last_time_steps.mean() >= 195: break # stop training once the windowed mean reaches 195

    std = np.std(reward_list)
    if std == 0: std = 1  # avoid dividing by zero when all rewards are equal
    # Normalise the rewards of this batch
    A = (reward_list - np.mean(reward_list))/std
    # Parameter update: perturbations weighted by their normalised reward
    w_delta = eta /(num_batch*sigma) * np.dot(N.T, A)
    # The step is scaled by 1 / (num_batch * sigma), i.e. it is divided by
    # sigma (not multiplied).
    w += w_delta

env.close()

In [None]:
# グラフの表示
plt.plot(mean_list)
plt.show()

### <font color=blue>**4.** </font> DQN（Deep Q-Network）

#### <font color=green>**4.1.** </font> CartPole kerasで実装

In [None]:
## 出典 : https://qiita.com/sugulu_Ogawa_ISID/items/bc7c70e6658f204f85f9

In [None]:
# coding:utf-8
# [0]必要なライブラリのインポート
import gym  # 倒立振子(cartpole)の実行環境
import numpy as np
import time
from keras.models import Sequential
from keras.layers import Dense
from keras.optimizers import Adam
from keras.utils import plot_model
from collections import deque
from gym import wrappers  # gymの画像保存
from keras import backend as K
import tensorflow as tf

In [None]:
# [1]損失関数の定義
# 損失関数にhuber関数を使用します 参考https://github.com/jaara/AI-blog/blob/master/CartPole-DQN.py
def huberloss(y_true, y_pred):
    """Huber loss with threshold 1.0, averaged over all elements.

    Errors with absolute value below 1.0 contribute ``0.5 * err**2``; larger
    errors contribute ``|err| - 0.5``.
    """
    err = y_true - y_pred
    abs_err = K.abs(err)
    quadratic = 0.5 * K.square(err)
    linear = abs_err - 0.5
    # tf.where selects element-wise between the two branches.
    loss = tf.where(abs_err < 1.0, quadratic, linear)
    return K.mean(loss)

In [None]:
# [2]Q関数をディープラーニングのネットワークをクラスとして定義
class QNetwork:
    """Q-function approximated by a small fully connected Keras network.

    The network has two hidden ReLU layers of ``hidden_size`` units and a
    linear output layer with one value per action. It is compiled with the
    Huber loss (``huberloss``) and the Adam optimizer.
    """

    def __init__(self, learning_rate=0.01, state_size=4, action_size=2, hidden_size=10):
        """Build and compile the network.

        Args:
            learning_rate: Learning rate passed to Adam.
            state_size: Number of input features.
            action_size: Number of actions (network outputs).
            hidden_size: Number of units in each hidden layer.
        """
        # Remember the dimensions so that replay() builds batches of the
        # right shape instead of assuming 4 inputs and 2 actions.
        self.state_size = state_size
        self.action_size = action_size
        self.model = Sequential()
        self.model.add(Dense(hidden_size, activation='relu', input_dim=state_size))
        self.model.add(Dense(hidden_size, activation='relu'))
        self.model.add(Dense(action_size, activation='linear'))
        self.optimizer = Adam(lr=learning_rate)
        self.model.compile(loss=huberloss, optimizer=self.optimizer)

    def replay(self, memory, batch_size, gamma, targetQN):
        """Train the network on one mini-batch sampled from ``memory``.

        Args:
            memory: Object whose ``sample(batch_size)`` returns a list of
                ``(state, action, reward, next_state)`` tuples.
            batch_size: Number of experiences to sample.
            gamma: Discount factor.
            targetQN: Network used to evaluate the value of the next state.
        """
        inputs = np.zeros((batch_size, self.state_size))
        targets = np.zeros((batch_size, self.action_size))
        mini_batch = memory.sample(batch_size)

        for i, (state_b, action_b, reward_b, next_state_b) in enumerate(mini_batch):
            inputs[i:i + 1] = state_b
            target = reward_b

            # An all-zero next state marks the end of an episode: in that
            # case the target is just the reward.
            if not (next_state_b == np.zeros(state_b.shape)).all(axis=1):
                # The action is chosen by this network and evaluated by
                # targetQN; keeping the two separate also supports DDQN.
                retmainQs = self.model.predict(next_state_b)[0]
                next_action = np.argmax(retmainQs)
                target = reward_b + gamma * targetQN.model.predict(next_state_b)[0][next_action]

            targets[i] = self.model.predict(state_b)    # current network output
            targets[i][action_b] = target               # training target for the taken action

        # Fit once per mini-batch, outside the loop. epochs=1 is a single
        # pass over the batch; verbose=0 suppresses progress output.
        self.model.fit(inputs, targets, epochs=1, verbose=0)

In [None]:
# [3]Experience ReplayとFixed Target Q-Networkを実現するメモリクラス
class Memory:
    """Bounded FIFO store of experiences with uniform random sampling."""

    def __init__(self, max_size=1000):
        """Create an empty buffer that keeps at most ``max_size`` items."""
        # A deque with maxlen discards the oldest item once it is full.
        self.buffer = deque(maxlen=max_size)

    def add(self, experience):
        """Append one experience to the buffer."""
        self.buffer.append(experience)

    def sample(self, batch_size):
        """Return ``batch_size`` distinct stored experiences chosen at random."""
        positions = np.arange(len(self.buffer))
        chosen = np.random.choice(positions, size=batch_size, replace=False)
        picked = []
        for pos in chosen:
            picked.append(self.buffer[pos])
        return picked

    def len(self):
        """Return the number of experiences currently stored."""
        return len(self.buffer)

In [None]:
# [4]カートの状態に応じて、行動を決定するクラス
# アドバイスいただき、引数にtargetQNを使用していたのをmainQNに修正しました
class Actor:
    """Chooses actions with an epsilon-greedy rule based on a Q-network."""

    def get_action(self, state, episode, mainQN):
        """Return the action to take in ``state``.

        The exploration rate is ``0.001 + 0.9 / (1.0 + episode)``, so the
        choice becomes increasingly greedy as ``episode`` grows.

        Args:
            state: Input handed to ``mainQN.model.predict``.
            episode: Current episode number.
            mainQN: Object whose ``model.predict(state)[0]`` yields one value
                per action.

        Returns:
            The index of the highest-valued action, or a random choice
            between 0 and 1 when exploring.
        """
        epsilon = 0.001 + 0.9 / (1.0+episode)

        draw = np.random.uniform(0, 1)
        if draw < epsilon:
            # Explore: act randomly.
            return np.random.choice([0, 1])

        # Exploit: pick the action with the highest predicted value.
        q_values = mainQN.model.predict(state)[0]
        return np.argmax(q_values)

In [None]:
# [5] メイン関数開始----------------------------------------------------
# [5.1] 初期設定--------------------------------------------------------
DQN_MODE = 1    # 1がDQN、0がDDQNです
LENDER_MODE = 0 # 0は学習後も描画なし、1は学習終了後に描画する

env = gym.make('CartPole-v0')
num_episodes = 5  # 総試行回数   時間かかりすぎるので、299 -> 5
max_number_of_steps = 200  # 1試行のstep数
goal_average_reward = 195  # この報酬を超えると学習終了
num_consecutive_iterations = 10  # 学習完了評価の平均計算を行う試行回数
total_reward_vec = np.zeros(num_consecutive_iterations)  # 各試行の報酬を格納
gamma = 0.99    # 割引係数
islearned = 0  # 学習が終わったフラグ
isrender = 0  # 描画フラグ
# ---
hidden_size = 16               # Q-networkの隠れ層のニューロンの数
learning_rate = 0.00001         # Q-networkの学習係数
memory_size = 10000            # バッファーメモリの大きさ
batch_size = 32                # Q-networkを更新するバッチの大記載

In [None]:
# [5.2]Qネットワークとメモリ、Actorの生成--------------------------------------------------------
mainQN = QNetwork(hidden_size=hidden_size, learning_rate=learning_rate)     # メインのQネットワーク
targetQN = QNetwork(hidden_size=hidden_size, learning_rate=learning_rate)   # 価値を計算するQネットワーク
# plot_model(mainQN.model, to_file='Qnetwork.png', show_shapes=True)        # Qネットワークの可視化
memory = Memory(max_size=memory_size)
actor = Actor()

In [None]:
# [5.3]メインルーチン--------------------------------------------------------
for episode in range(num_episodes):  # 試行数分繰り返す
    env.reset()  # cartPoleの環境初期化
    state, reward, done, _ = env.step(env.action_space.sample())  # 1step目は適当な行動をとる
    state = np.reshape(state, [1, 4])   # list型のstateを、1行4列の行列に変換
    episode_reward = 0

    # 2018.05.16
    # skanmeraさんより間違いを修正いただきました
    # targetQN = mainQN   # 行動決定と価値計算のQネットワークをおなじにする
    # ↓
    targetQN.model.set_weights(mainQN.model.get_weights())

    for t in range(max_number_of_steps + 1):  # 1試行のループ
        if (islearned == 1) and LENDER_MODE:  # 学習終了したらcartPoleを描画する
            env.render()
            time.sleep(0.1)
            print(state[0, 0])  # カートのx位置を出力するならコメントはずす

        action = actor.get_action(state, episode, mainQN)   # 時刻tでの行動を決定する
        next_state, reward, done, info = env.step(action)   # 行動a_tの実行による、s_{t+1}, _R{t}を計算する
        next_state = np.reshape(next_state, [1, 4])     # list型のstateを、1行4列の行列に変換

        # 報酬を設定し、与える
        if done:
            next_state = np.zeros(state.shape)  # 次の状態s_{t+1}はない
            if t < 195:
                reward = -1  # 報酬クリッピング、報酬は1, 0, -1に固定
            else:
                reward = 1  # 立ったまま195step超えて終了時は報酬
        else:
            reward = 0  # 各ステップで立ってたら報酬追加（はじめからrewardに1が入っているが、明示的に表す）

        episode_reward += 1 # reward  # 合計報酬を更新

        memory.add((state, action, reward, next_state))     # メモリの更新する
        state = next_state  # 状態更新


        # Qネットワークの重みを学習・更新する replay
        if (memory.len() > batch_size) and not islearned:
            mainQN.replay(memory, batch_size, gamma, targetQN)

        if DQN_MODE:
        # 2018.06.12
        # shiglayさんさんより間違いを修正いただきました
        # targetQN = mainQN   # 行動決定と価値計算のQネットワークをおなじにする
        # ↓
            targetQN.model.set_weights(mainQN.model.get_weights())

        # 1施行終了時の処理
        if done:
            total_reward_vec = np.hstack((total_reward_vec[1:], episode_reward))  # 報酬を記録
            print('%d Episode finished after %f time steps / mean %f' % (episode, t + 1, total_reward_vec.mean()))
            break

    # 複数施行の平均報酬で終了を判断
    if total_reward_vec.mean() >= goal_average_reward:
        print('Episode %d train agent successfuly!' % episode)
        islearned = 1
        if isrender == 0:   # 学習済みフラグを更新
            isrender = 1

            # env = wrappers.Monitor(env, './movie/cartpoleDDQN')  # 動画保存する場合
            # 10エピソードだけでどんな挙動になるのか見たかったら、以下のコメントを外す
            # if episode>10:
            #    if isrender == 0:
            #        env = wrappers.Monitor(env, './movie/cartpole-experiment-1') #動画保存する場合
            #        isrender = 1
            #    islearned=1;

#### <font color=green>**4.2.** </font> CartPole DQN keras-rl2で実装

In [None]:
!apt-get -qq -y install libcusparse8.0 libnvrtc8.0 libnvtoolsext1 >/dev/null
!ln -snf /usr/lib/x86_64-linux-gnu/libnvrtc-builtins.so.8.0 /usr/lib/x86_64-linux-gnu/libnvrtc-builtins.so 
!apt-get -qq -y install xvfb freeglut3-dev ffmpeg >/dev/null
!pip -q install gym
!pip -q install pyglet
!pip -q install pyopengl
!pip -q install pyvirtualdisplay
!pip install keras-rl2

In [None]:
from pyvirtualdisplay import Display
# Start a 1024x768 virtual display that is not shown on screen; presumably
# this gives the visualize=True calls further down something to render to.
display = Display(size=(1024, 768), visible=0)
display.start()

In [None]:
#####
# DQN
#####

In [None]:
import numpy
import gym
import tensorflow as tf

from tensorflow.keras import Sequential
from tensorflow.keras.layers import Dense, Activation, Flatten
from tensorflow.keras.optimizers import Adam

from rl.agents.dqn import DQNAgent
from rl.policy import BoltzmannQPolicy
from rl.memory import SequentialMemory

In [None]:
if __name__ == "__main__":
  # Build the reinforcement-learning environment, then seed NumPy and the
  # environment with the same fixed value.
  ENV_NAME = 'CartPole-v0'
  env = gym.make(ENV_NAME)
  numpy.random.seed(123)
  env.seed(123)
  nb_actions = env.action_space.n

  # Q-network: flatten the observation, pass it through three 16-unit ReLU
  # layers and finish with one linear output per action.
  # The leading 1 in input_shape presumably corresponds to window_length=1
  # of the replay memory below.
  model = Sequential()
  model.add(Flatten(input_shape=(1,) + env.observation_space.shape))
  model.add(Dense(16))
  model.add(Activation('relu'))
  model.add(Dense(16))
  model.add(Activation('relu'))
  model.add(Dense(16))
  model.add(Activation('relu'))
  model.add(Dense(nb_actions))
  model.add(Activation('linear'))
  # summary() prints the table itself and returns None, so it is called
  # directly; wrapping it in print() emitted an extra "None" line.
  model.summary()

  # Replay memory, action-selection policy and the DQN agent itself.
  memory = SequentialMemory(limit=10000,  # reduced from 50000
                            window_length=1)
  policy = BoltzmannQPolicy()
  dqn = DQNAgent(model=model, nb_actions=nb_actions, memory=memory,
                 nb_steps_warmup=10, target_model_update=1e-2, policy=policy)
  # `learning_rate` is the supported keyword; the old `lr` alias is deprecated
  # in tf.keras and rejected by newer Keras releases.
  dqn.compile(Adam(learning_rate=1e-3), metrics=['mae'])

  # Training. The returned history object is kept because the following cell
  # plots history.history['episode_reward'].
  history = dqn.fit(env, nb_steps=10000,  # reduced from 50000
                    visualize=True, verbose=2)

  # Save the trained weights.
  dqn.save_weights('dqn_{}_weights.h5f'.format(ENV_NAME), overwrite=True)

  # Run the trained agent for five test episodes.
  dqn.test(env, nb_episodes=5, visualize=True)

In [None]:
import matplotlib.pyplot as plt
# Plot the 'episode_reward' series stored in the history returned by dqn.fit().
reward_curve = history.history['episode_reward']
plt.figure(figsize=(16, 8))
plt.plot(reward_curve)
plt.show()

### <font color=blue>**5.** </font> A2C（Advantage Actor Critic）

#### <font color=green>**5.1.** </font> ライブラリ使用例 : Stable-Baselines3

In [None]:
!pip install stable-baselines3[extra]

In [None]:
!apt install swig
!pip install box2d box2d-kengz

In [None]:
import gym

from stable_baselines3 import A2C
from stable_baselines3.common.env_util import make_vec_env
from stable_baselines3.common.evaluation import evaluate_policy

In [None]:
# A2C on CartPole-v1: score the untrained policy, train, then score again.
env_id = "CartPole-v1"
eval_settings = dict(n_eval_episodes=10, deterministic=True)

# Single environment used only for evaluation.
eval_env = gym.make(env_id)

# Four parallel environments for training.
env = make_vec_env(env_id, n_envs=4)
model = A2C("MlpPolicy", env, verbose=1)

# Evaluation before training (the "_1" results) serves as the baseline.
mean_reward_1, std_reward_1 = evaluate_policy(model, eval_env, **eval_settings)
model.learn(total_timesteps=25000)
mean_reward, std_reward = evaluate_policy(model, eval_env, **eval_settings)

# First line: before training. Second line: after training.
print(f"mean_reward={mean_reward_1:.2f} +/- {std_reward_1}")
print(f"mean_reward={mean_reward:.2f} +/- {std_reward}")

In [None]:
from stable_baselines3 import DQN

# DQN on CartPole-v1: score the untrained policy, train, then score again.
env_id = "CartPole-v1"
eval_settings = dict(n_eval_episodes=10, deterministic=True)

# Single environment used only for evaluation.
eval_env = gym.make(env_id)

# The model receives the environment id string rather than an env object.
model = DQN(
    "MlpPolicy",
    env_id,
    verbose=1,
    exploration_final_eps=0.1,
    target_update_interval=250,
)

# Evaluation before training (the "_1" results) serves as the baseline.
mean_reward_1, std_reward_1 = evaluate_policy(model, eval_env, **eval_settings)
model.learn(total_timesteps=25000)
mean_reward, std_reward = evaluate_policy(model, eval_env, **eval_settings)

# First line: before training. Second line: after training.
print(f"mean_reward={mean_reward_1:.2f} +/- {std_reward_1}")
print(f"mean_reward={mean_reward:.2f} +/- {std_reward}")

In [None]:
from stable_baselines3 import PPO

# PPO on CartPole-v1: score the untrained policy, train, then score again.
env_id = "CartPole-v1"
eval_settings = dict(n_eval_episodes=10, deterministic=True)

# Single environment used only for evaluation.
eval_env = gym.make(env_id)

# The model receives the environment id string rather than an env object.
model = PPO("MlpPolicy", env_id, verbose=1)

# Evaluation before training (the "_1" results) serves as the baseline.
mean_reward_1, std_reward_1 = evaluate_policy(model, eval_env, **eval_settings)
model.learn(total_timesteps=25000)
mean_reward, std_reward = evaluate_policy(model, eval_env, **eval_settings)

# First line: before training. Second line: after training.
print(f"mean_reward={mean_reward_1:.2f} +/- {std_reward_1}")
print(f"mean_reward={mean_reward:.2f} +/- {std_reward}")

In [None]:
## 別のゲームで比較

In [None]:
# A2C on LunarLander-v2: score the untrained policy, train, then score again.
env_id = 'LunarLander-v2'
eval_settings = dict(n_eval_episodes=10, deterministic=True)

# Single environment used only for evaluation.
eval_env = gym.make(env_id)

# Four parallel environments for training.
env = make_vec_env(env_id, n_envs=4)
model = A2C("MlpPolicy", env, verbose=1)

# Evaluation before training (the "_1" results) serves as the baseline.
mean_reward_1, std_reward_1 = evaluate_policy(model, eval_env, **eval_settings)
model.learn(total_timesteps=25000)
mean_reward, std_reward = evaluate_policy(model, eval_env, **eval_settings)

# First line: before training. Second line: after training.
print(f"mean_reward={mean_reward_1:.2f} +/- {std_reward_1}")
print(f"mean_reward={mean_reward:.2f} +/- {std_reward}")

In [None]:
# DQN on LunarLander-v2: score the untrained policy, train, then score again.
env_id = 'LunarLander-v2'
eval_settings = dict(n_eval_episodes=10, deterministic=True)

# Single environment used only for evaluation.
eval_env = gym.make(env_id)

# The model receives the environment id string rather than an env object.
model = DQN(
    "MlpPolicy",
    env_id,
    verbose=1,
    exploration_final_eps=0.1,
    target_update_interval=250,
)

# Evaluation before training (the "_1" results) serves as the baseline.
mean_reward_1, std_reward_1 = evaluate_policy(model, eval_env, **eval_settings)
model.learn(total_timesteps=25000)
mean_reward, std_reward = evaluate_policy(model, eval_env, **eval_settings)

# First line: before training. Second line: after training.
print(f"mean_reward={mean_reward_1:.2f} +/- {std_reward_1}")
print(f"mean_reward={mean_reward:.2f} +/- {std_reward}")

In [None]:
# PPO on LunarLander-v2: score the untrained policy, train, then score again.
env_id = 'LunarLander-v2'
eval_settings = dict(n_eval_episodes=10, deterministic=True)

# Single environment used only for evaluation.
eval_env = gym.make(env_id)

# The model receives the environment id string rather than an env object.
model = PPO("MlpPolicy", env_id, verbose=1)

# Evaluation before training (the "_1" results) serves as the baseline.
mean_reward_1, std_reward_1 = evaluate_policy(model, eval_env, **eval_settings)
model.learn(total_timesteps=25000)
mean_reward, std_reward = evaluate_policy(model, eval_env, **eval_settings)

# First line: before training. Second line: after training.
print(f"mean_reward={mean_reward_1:.2f} +/- {std_reward_1}")
print(f"mean_reward={mean_reward:.2f} +/- {std_reward}")