# 深層強化学習の実装
ディープラーニングを利用したQ学習を実装します。Q学習は強化学習の一種ですが、今回はニューラルネットワークを使って状態からQ値を求めます。Deep Q-Network（DQN）に近いですが、それよりも簡単な実装になります。  
今回は、強化学習により重力下で飛行する物体の制御を行います。


## 各設定
tensorflowとKerasのバージョンによっては、Kerasのコードでエラーが発生することがあります。エラーを回避するために、以下のセルでKerasのバージョンを指定してインストールします。  
以下のコードは、デフォルトのバージョンでエラーが発生しないときには必要ありません。

In [None]:
!pip install tensorflow==2.9.1
!pip install keras==2.9.0

なお、Googleの対応により上記のコードは近いうちに必要なくなるかと思います。  
今後tensorflowやKerasのバージョンアップにより同様の問題が発生する可能性がありますが、上記のようにしてバージョンを調整することによる対応が必要になります。  
上記のセルの実行後、**ランタイム→ランタイムを再起動**によりバージョンの更新が完了します。  
念のために、以下のコードによりバージョンを確認しておきましょう。

In [None]:
import tensorflow
import keras
print(tensorflow.__version__)
print(keras.__version__)

必要なモジュールのインポート、及び最適化アルゴリズムの設定を行います。

In [None]:
import numpy as np
import matplotlib.pyplot as plt 
from matplotlib import animation, rc

from keras.models import Sequential
from keras.layers import Dense, ReLU
from keras.optimizers import RMSprop

optimizer = RMSprop()

### Brainクラス
エージェントの頭脳となるクラスです。Q値を出力するニューラルネットワークを構築し、Q値が正解に近づくように訓練します。  
Q学習に用いる式は以下の通りです。  

$$ Q(s_t,a_t) \leftarrow Q(s_t,a_t) + \eta\left(R_{t+1}+\gamma \max_{a}Q(s_{t+1}, a) - Q(s_{t}, a_{t})\right) $$

ここで、$a_{t}$は行動、$s_t$は状態、$Q(s_t,a_t) $はQ値、$\eta$は学習係数、$R_{t+1}$は報酬、$\gamma$は割引率になります。  
次の状態における最大のQ値を使用するのですが、ディープラーニングの正解として用いるのは上記の式のうちの以下の部分です。  

$$R_{t+1}+\gamma \max_{a}Q(s_{t+1}, a_{t})$$

以下の`Brain`クラスにおけるtrainメソッドでは、正解として上記を用います。  
また、ある状態における行動を決定する`get_action`メソッドでは、ε-greedy法により行動が選択されます。




In [None]:
class Brain:
    def __init__(self, n_state, n_mid, n_action, gamma=0.9, r=0.99):
        self.eps = 1.0  # ε
        self.gamma = gamma  # 割引率
        self.r = r  # εの減衰率

        model = Sequential()
        model.add(Dense(n_mid, input_shape=(n_state,)))
        model.add(ReLU()) 
        model.add(Dense(n_mid))
        model.add(ReLU()) 
        model.add(Dense(n_action))
        model.compile(loss="mse", optimizer=optimizer)
        self.model = model

    def train(self, states, next_states, action, reward, terminal):
        q = self.model.predict(states)  
        next_q = self.model.predict(next_states)
        t = np.copy(q)
        if terminal:
            t[:, action] = reward  #  エピソード終了時の正解は、報酬のみ
        else:
            t[:, action] = reward + self.gamma*np.max(next_q, axis=1)
        self.model.train_on_batch(states, t)

    def get_action(self, states):
        q = self.model.predict(states)
        if np.random.rand() < self.eps:
            action = np.random.randint(q.shape[1], size=q.shape[0])
        else:
            action = np.argmax(q, axis=1)
        if self.eps > 0.1:  # εの下限
            self.eps *= self.r
        return action

### エージェントのクラス
エージェントをクラスとして実装します。  
x座標が-1から1まで、y座標が-1から1までの正方形の領域を考えますが、エージェントの初期位置は左端中央とします。  
そして、エージェントが右端に達した際は報酬として1を与え、終了とします。また、エージェントが上端もしくは下端に達した際は報酬として-1を与え、終了とします。上手く飛行できた場合は褒美を、失敗した際は罰を与えるイメージです。  
行動には、自由落下とジャンプの2種類があります。自由落下の場合は重量加速度をy速度に加えます。ジャンプの場合は、y速度を予め設定した値に変更します。

In [None]:
class Agent:
    def __init__(self, v_x, v_y_sigma, v_jump, brain):
        self.v_x = v_x  # x速度
        self.v_y_sigma = v_y_sigma  # y速度、初期値の標準偏差
        self.v_jump = v_jump  # ジャンプ速度
        self.brain = brain
        self.reset()

    def reset(self):
        self.x = -1  # 初期x座標
        self.y = 0  # 初期y座標
        self.v_y = self.v_y_sigma * np.random.randn()  # 初期y速度

    def step(self, g):  # 時間を1つ進める g:重力加速度
        states = np.array([[self.y, self.v_y]])
        self.x += self.v_x
        self.y += self.v_y

        reward = 0  # 報酬
        terminal = False  # 終了判定
        if self.x>1.0:
            reward = 1
            terminal = True
        elif self.y<-1.0 or self.y>1.0:
            reward = -1
            terminal = True
        reward = np.array([reward])

        action = self.brain.get_action(states)
        if action[0] == 0:
            self.v_y -= g   # 自由落下
        else:
            self.v_y = self.v_jump  # ジャンプ
        next_states = np.array([[self.y, self.v_y]])
        self.brain.train(states, next_states, action, reward, terminal)

        if terminal:
            self.reset()

## 環境のクラス
環境をクラスとして実装します。このクラスの役割は、重力加速度を設定し、時間を前に進めるのみです。

In [None]:
class Environment:
    def __init__(self, agent, g):
        self.agent = agent
        self.g = g

    def step(self):
        self.agent.step(self.g)
        return (self.agent.x, self.agent.y)

## アニメーション
今回は、matplotlibを使って物体の飛行をアニメーションで表します。  
アニメーションには、matplotlib.animationのFuncAnimation関数を使用します。  

In [None]:
def animate(environment, interval, frames):
    fig, ax = plt.subplots()
    plt.close()
    ax.set_xlim(( -1, 1))
    ax.set_ylim((-1, 1))
    sc = ax.scatter([], [])

    def plot(data):
        x, y = environment.step()
        sc.set_offsets(np.array([[x, y]]))
        return (sc,)

    return animation.FuncAnimation(fig, plot, interval=interval, frames=frames, blit=True)

## ランダムな行動
まずは、エージェントがランダムに行動する例をみていきましょう。`r`の値を1に設定し、εが減衰しないようにすることで、エージェントは完全にランダムな行動を選択するようになります。

In [None]:
n_state = 2
n_mid = 32
n_action = 2
brain = Brain(n_state, n_mid, n_action, r=1.0)  # εが減衰しない

v_x = 0.05
v_y_sigma = 0.1
v_jump = 0.2
agent = Agent(v_x, v_y_sigma, v_jump, brain)

g = 0.2
environment = Environment(agent, g)

anim = animate(environment, 50, 1024)
rc('animation', html='jshtml')
anim

運良く右端に到達することもありますが、大抵は上端もしくは下端にぶつかってしまいます。

## Q学習の導入
`r`の値を0.99に設定し、εが減衰するようにします。これにより、次第にQ学習が行われるようになります。

In [None]:
n_state = 2
n_mid = 32
n_action = 2
brain = Brain(n_state, n_mid, n_action, r=0.99)  # εが減衰する

v_x = 0.05
v_y_sigma = 0.1
v_jump = 0.2
agent = Agent(v_x, v_y_sigma, v_jump, brain)

g = 0.2
environment = Environment(agent, g)

anim = animate(environment, 50, 1024)
rc('animation', html='jshtml')
anim

学習が進むと、上下の端にぶつらずに飛べるようになります。  
なお、初期条件によっては学習に失敗することもあります。  

## DQNのテクニック
Deep Q-Network（DQN）では、一般的に以下のテクニックが使われます。

### Experience Replay
各ステップの内容をメモリに保存しておき、メモリからランダムに記録を取り出して学習を行います。  
これにより、ミニバッチ法を使用することが可能になります。  
また、ミニバッチに時間的にばらけた記録が入ることになるので、学習が安定します。  

### Fixed Target Q-Network
行動を決定するのに用いるmain-networkと、誤差の計算時に正解を求めるのに用いるtarget-networkを用意します。  
target-networkのパラメータは一定時間固定されますが、main-networkのパラメータはミニバッチごとに更新されます。そして、定期的にmain-networkのパラメータがtarget-networkに上書きされます。これにより、正解が短い時間で揺れ動く問題が低減され学習が安定すると考えられています。  

