<a href="https://colab.research.google.com/github/keng-oh/colab/blob/master/DQN.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# DQN
## 用語
|用語|意味|
|---|---|
|環境（environment）|エージェントがおかれた周囲の状況|
|エージェント（agent）|	環境の中で振る舞う主体|
|状態  s （situation）|	絶えず変化する環境の一時点のようす|
|行動  a （action）|	エージェントが環境の中で行う振る舞い|
|報酬  R （return）|	ある行動の直後に得られる得点|
|収益  G （gain）|	ある行動を選択し､その後のステップすべてで最適な選択をとったとき､将来的に見込まれる報酬の累積値｡累積報酬とも｡|
|状態価値  V （value）	|ある状態から得られる収益の期待値|
|行動価値  Q （quality）	|ある状態である行動をとったときに得られる収益の期待値|
|方策  π （policy）|	エージェントが行動を決定するためのアルゴリズム|

In [0]:
# ボール

import numpy as np
import copy

class Ball:
    '''落下してくるボールを定義したクラス'''
    
    def __init__(self, col):
        self.col = col
        self.row = 0  # ボールの初期位置は上端で固定

    def update(self):
        self.row += 1 # 下に1マス移動

    def isDroped(self, n_rows): # 画面外に出たかどうか
        return True if self.row >= n_rows else False # n_rows-1が下端の位置

In [0]:
# 環境全体

class CatchBall:
    '''
    環境全体を定義したクラス
    __init__: インスタンス作成
    reset: 環境をリセットする
    execute_action: 行動を環境に入力
    observe: 状態と報酬、エピソード終了判定を返す
    draw: 現在の画像を描画(`observe`で使用)  
    '''
    
    def __init__(self, time_limit=500, size=8, p_len=3,interval=5):
        self.screen_n_rows = size  # 画面の横幅
        self.screen_n_cols = size  # 画面の縦幅
        self.player_length = p_len # 棒の長さ
        self.enable_actions = (0, 1, 2)  # 行動のタプル
        self.frame_rate = 5  # フレームレート
        self.ball_post_interval = interval  # ボールの出現間隔
        self.ball_past_time = 0  # 最後にボールが出現してからの経過時間
        self.past_time = 0  # ゲームが開始してからの経過時間
        self.balls = []  # ボールのリスト
        self.time_limit = time_limit  # ゲームを強制的に打ち切るまでの時間

        self.reset()  # 環境を初期化
    def reset(self):  # 環境の初期化
        # 棒の位置を下端のランダムな位置に初期化
        self.player_row = self.screen_n_rows - 1
        self.player_col = np.random.randint(self.screen_n_cols - self.player_length)

        # ボールを1つ生成
        self.balls = []
        self.balls.append(Ball(np.random.randint(self.screen_n_cols)))

        # その他の初期化
        self.reward = 0
        self.terminal = False
        self.past_time = 0
        self.ball_past_time = 0

    def execute_action(self, action):  # 行動を環境に入力
        '''
        action:
            0: 停止
            1: 左へ移動
            2: 右へ移動
        '''
        if action == self.enable_actions[1]:
            #  左へ1つ移動
            #  左端(0)より小さくはしない
            self.player_col = max(0, self.player_col - 1)  
        elif action == self.enable_actions[2]:
            #  右へ1つ移動
            #  右端(self.screen_n_cols - self.player_length)より大きくはしない
            self.player_col = min(self.player_col + 1, self.screen_n_cols - self.player_length)
        else:
            # その場から動かない
            pass

        # ボールを更新
        for b in self.balls:
            b.update()
            
        # ボールが出現してからの経過時間がボールの出現間隔になったら
        if self.ball_past_time == self.ball_post_interval:
            self.ball_past_time = 0
            new_pos = np.random.randint(self.screen_n_cols)  # 新しいボールの位置を乱数で生成
            
            # 既にボールがある場合は一つ前のボールから最低でも棒の長さ分横軸の離れた位置に生成する
            while len(self.balls) > 0 and abs(new_pos - self.balls[-1].col) < self.player_length:
                    new_pos = np.random.randint(self.screen_n_cols) # 近い場合は再度乱数を生成
            self.balls.append(Ball(new_pos))  # 生成したボールをリストに追加
            
        self.ball_past_time += 1

        self.reward = 0
        self.terminal = False # エピソード終了判定

        self.past_time += 1
        if  self.past_time > self.time_limit: # 強制終了時間を過ぎたら終了
            self.terminal = True
        # リストは先に生成されたものから順番に入っているので、ボールの接触判定は先頭のもののみで良い
        if self.balls[0].row == self.screen_n_rows - 1: 
            if self.player_col <= self.balls[0].col < self.player_col + self.player_length:
                # キャッチ成功
                self.reward = 1
            else:
                # 失敗
                self.reward = -1
                self.terminal = True  # エピソード終了

        new_balls = [] # 次時刻でのボールのリスト
        
        # 画面中にあるもののみを次時刻に引き継ぐ
        for b in self.balls:
            if not b.isDroped(self.screen_n_rows):
                new_balls.append(b)
        self.balls = copy.copy(new_balls)

    def observe(self):  # 現在の状態・報酬と、エピソード終了判定を観測
        self.draw()  # 環境の画像を描画
        return self.screen, self.reward, self.terminal
        
    def draw(self):  # self.screenに現在の画面を格納する
        # 黒(0)で全画面を初期化
        self.screen = np.zeros((self.screen_n_rows, self.screen_n_cols))

        # 白(1)で棒を描画
        self.screen[self.player_row, self.player_col:self.player_col + self.player_length] = 1

        # 灰色(0.5)でボールを描画
        for b in self.balls:
            self.screen[b.row, b.col] = 0.5

In [0]:
# 不要な警告を非表示にする
import warnings
warnings.filterwarnings('ignore')

# ライブラリを読み込み
from collections import deque

from keras.layers.core import Dense, Flatten
from keras.layers import InputLayer
from keras.models import Sequential
from keras.optimizers import RMSprop
from keras.models import model_from_config
import tensorflow as tf

In [0]:
# ε-greedyのεを時刻毎に一定値ずつ減衰させる｡
# 最低値になったらその値で固定する
INITIAL_EXPLORATION = 1.0  # εの初期値
FINAL_EXPLORATION = 0.1  # εの最低値
EXPLORATION_STEPS = 500  # 最低値になるまでの時間

def clone_model(model, custom_objects={}):
    '''ターゲットネットワークの更新を行う'''
    
    config = {
        'class_name': model.__class__.__name__,
        'config': model.get_config(),
    }
    clone = model_from_config(config, custom_objects=custom_objects)
    clone.set_weights(model.get_weights())
    return clone

def huber_loss(y_true, y_pred):
    '''Huber損失'''
    
    error = tf.abs(y_pred - y_true)  # TD誤差の絶対値
    quadratic_part = tf.clip_by_value(error, 0.0, 1.0)  # クリッピングされた誤差
    linear_part = error - quadratic_part  # クリッピングによる減少分 = huber損失の線形部分
    loss = tf.reduce_sum(0.5 * tf.square(quadratic_part) + linear_part) # huber損失の計算
    tf.summary.scalar('loss', loss)

    return loss


class DQNAgent:
    '''
    DQNのエージェント｡以下のメソッドを持つ｡
    
    __init__: インスタンス作成
    Q_values: 行動価値関数の推定値を返す
    select_action: 行動をε-greedyで決定する
    update_exploration: ε-greedyのεを決定
    update_target_model: target networkを更新  
    store_experience:experience replayのための経験を保存
    experience_replay: experience replayを実行  
    '''
    
    def __init__(self, enable_actions, size=8):
        self.enable_actions = enable_actions  # とることのできる行動
        self.n_actions = len(self.enable_actions)  # 行動の総数 
        self.minibatch_size = 32  # バッチサイズ
        self.replay_memory_size = 5000  # experience replayの記憶する経験数
        self.learning_rate = 0.00025  # 学習率
        self.discount_factor = 0.9  # 割引累積報酬の累積率
        self.exploration = INITIAL_EXPLORATION  # εの初期値
        self.exploration_step = (INITIAL_EXPLORATION - FINAL_EXPLORATION) / EXPLORATION_STEPS  # εの減少量
        self.size = size # 画像の大きさ

        # experience replayのメモリ
        self.D = deque(maxlen=self.replay_memory_size)

        # lossを初期化
        self.current_loss = 0.0

        # DNNの初期化
        self.model = Sequential()
        self.model.add(InputLayer(input_shape=(self.size,self.size)))  # 画像を入力
        self.model.add(Flatten())  # 一次元配列にする
        self.model.add(Dense(64, activation='relu'))  # 中間層
        self.model.add(Dense(32, activation='relu'))  # 中間層
        self.model.add(Dense(self.n_actions, activation='linear', name='main_output'))  # 各行動についてそれぞれ出力

        self.model.compile(loss=huber_loss,
                           optimizer=RMSprop(lr=self.learning_rate),
                           metrics=['accuracy'])
        # target networkの初期化
        self.target_model = copy.copy(self.model)

    def Q_values(self, states, isTarget=False):
        '''Q値を返す'''
        model = self.target_model if isTarget else self.model  # どちらのネットワークを使うかを選ぶ
        res = model.predict(np.array([states]))
        return res[0]


    def select_action(self, states, epsilon):
        '''ε-貪欲法の実装'''
        if np.random.rand() <= epsilon:
            # ランダムに選択
            return np.random.choice(self.enable_actions)
        else:
            # greedyに選択
            return self.enable_actions[np.argmax(self.Q_values(states))]
        
    def update_exploration(self, num):
        '''εを指数関数的に減衰させる'''
        if self.exploration > FINAL_EXPLORATION:  # まだεが最低値になっていない
            self.exploration -= self.exploration_step  # 減少させる
            if self.exploration < FINAL_EXPLORATION:
                self.exploration = FINAL_EXPLORATION

        
    def update_target_model(self):
        '''target networkの更新'''
        self.target_model = clone_model(self.model)
        
        
    def store_experience(self, states, action, reward, states_1, terminal):
        '''メモリに記録'''
        self.D.append((states, action, reward, states_1, terminal))# (メモリが満杯のときは古いものから削除)
        return (len(self.D) >= self.replay_memory_size) # メモリが満杯かどうかを返す

    def experience_replay(self, step):
        '''経験再生'''
        state_minibatch = []  # 学習に使う状態
        y_minibatch = []  # 学習に使う教師データ

        # メモリからランダムにサンプリング
        minibatch_size = min(len(self.D), self.minibatch_size)
        minibatch_indexes = np.random.randint(0, len(self.D), minibatch_size)  # 取り出すデータのindex

        for j in minibatch_indexes:
            state_j, action_j, reward_j, state_j_1, terminal = self.D[j]  # データを取り出す
            action_j_index = self.enable_actions.index(action_j)  # indexを取得

            
            # 教師データの計算
            y_j = self.Q_values(state_j)  # 行動価値関数を推定
            # 行動価値関数のうち、実際にとった行動の部分を実際の報酬と1ステップ先の推定値を用いた値に更新
            if terminal:
                y_j[action_j_index] = reward_j  # 終端状態なら収益は即時報酬のみ
            else:
                v = np.max(self.Q_values(state_j_1, isTarget=True))
                y_j[action_j_index] = reward_j + self.discount_factor * v 
            # 教師データの計算終了

            #バッチに追加
            state_minibatch.append(state_j)
            y_minibatch.append(y_j)


        # 学習開始
        s = self.model.fit(np.array(state_minibatch),
                       np.array(y_minibatch),
                       batch_size=minibatch_size,
                       epochs=1,
                       verbose=0)
        # 損失関数を格納
        score = self.model.evaluate(np.array(state_minibatch), np.array(y_minibatch), batch_size=minibatch_size,verbose=0)
        self.current_loss = score[0]

In [0]:
# 経験させるエピソード数
n_epochs =300

# 画面サイズ
size = 6

# ボールの落ちる間隔
interval = 4

# 棒の長さ
p_len = 2

# 環境、エージェントの初期化
env = CatchBall(size=size,interval=interval,p_len=p_len)
agent = DQNAgent(env.enable_actions,size=size)

win = 0  # キャッチした回数
total_frame = 0  # 総フレーム数
e = 0  # エピソード数

while e < n_epochs:
    # 初期化
    frame = 0  # このエピソードが始まってからのフレーム数
    loss = 0.0  # 損失関数の合計(後で平均をログとして表示する)
    Q_max = 0.0  # 各時刻での行動価値関数の最大値の合計(後で平均をログとして表示する)
    env.reset()
    state_t_1, reward_t, terminal = env.observe()  # 初期状態の観測
    win = 0
    while not terminal:  # エピソードが終わるまで実行
        state_t = state_t_1

        # 行動を選択
        action_t = agent.select_action(state_t, agent.exploration)
        
        # 行動を実行
        env.execute_action(action_t)

        # 環境を観測
        state_t_1, reward_t, terminal = env.observe()

        # 経験を保存
        start_replay = agent.store_experience(state_t, action_t, reward_t, state_t_1, terminal)

        # experience replay
        if start_replay:  # メモリが貯まったら
            agent.update_exploration(e)  # εを更新
            agent.experience_replay(e)

        # 200ステップ毎にtarget networkを同期
        if total_frame % 200 == 0 and start_replay:
            agent.update_target_model()

        # ログ用
        frame += 1
        total_frame += 1
        loss += agent.current_loss
        Q_max += np.max(agent.Q_values(state_t))
        if reward_t == 1:
            win += 1

    if start_replay:        
        # ログを表示
        # 実行する時はコメントアウトを外してください。
        # print("EPOCH: {:03d}/{:03d} | WIN: {:03d} | LOSS: {:.4f} | Q_MAX: {:.4f}".format(e+1, n_epochs, win, loss / frame, Q_max / frame))
        win = 0

    # experience replayが始まった時点からエピソードをカウント
    if start_replay:
        e += 1

ValueError: ignored

In [0]:
import matplotlib.animation as animation
import matplotlib.pyplot as plt
from collections import deque

env.reset()

def init():  # 初期化
    state_t_1, reward_t, terminal = env.observe()
    img.set_array(state_t_1)
    plt.axis("off")
    return img


def animate(step):  # 繰り返し実行する部分
    global state_t_1, reward_t, terminal  # 次に実行したときに引き継ぐためにglobal

    if terminal:  # エピソード終了ならリセット
        env.reset()
    else:  # エピソードが続いているなら行動を選択
        state_t = state_t_1
        action_t = agent.select_action(state_t, 0.0)  # greedy方策(ε=0.0)
        env.execute_action(action_t)

    # 環境を観測
    state_t_1, reward_t, terminal = env.observe()

    # 表示する画像を描画
    img.set_array(state_t_1)
    plt.axis("off")
    return img

In [0]:
%matplotlib nbagg

# 動画表示準備
fig = plt.figure(figsize=(env.screen_n_rows / 2, env.screen_n_cols / 2),num='CatchBallGame')
img = plt.imshow(state_t_1, interpolation="none", cmap="gray")
# 動画再生
ani = animation.FuncAnimation(fig, animate, init_func=init, interval=(1000 / env.frame_rate), blit=True)