# ゼロから作る Deep Learning 4 強化学習編 勉強ノート 第8章 〜DQN〜

ここでは、 Q 学習とニューラルネットワークを使った手法である「<font color="red">**DQN**</font>」について扱う。

## OpenAI Gym

OpenAI Gym はオープンソースライブラリであり、様々な強化学習のタスク(環境)が用意されている。  
ここでは、 OpenAI Gym の基本的な使い方について学ぶ。

### OpenAI Gym の基礎知識(*コーディング*)

OpenAI Gym には様々な環境が用意されているが、ここでは「カートポール」を見てみる。

In [None]:
# ライブラリのインポート
import gym

In [None]:
# OpenAI Gym から環境を指定する
env = gym.make("CartPole-v1")

これで「カートポール」という環境が生成された。  
カートポールは、カートを右もしくは左に動かしてボールのバランスを保つ。  
終了条件は、ポールのバランスが崩れるか、もしくはカートの位置がある範囲を超えて移動するかである。

In [None]:
# 初期状態
state = env.reset()
print(f"初期状態: {state}")

# 行動の次元数
action_space = env.action_space
print(f"行動の次元数: {action_space}")

状態は頭から順に

- カートの位置
- カートの速度
- ポールの角度
- ポールの角速度

を表している。  
行動の次元数は、 `Discrete(2)` という独自のクラスのインスタンスになっている。  
これは2つの行動の候補があることを意味している。  
具体的には `0` が左向き、 `1` が右向きにカートを移動させる行動に対応する。

それでは、実際に行動を起こして時間を1つ進めてみる。

In [None]:
# 左向きの行動を1回起こす
action = 0
next_state, reward, done, info = env.step(action)
print(next_state)

上記のように、 `env.step(action)` によって行動が実行されている。  
その結果として次の4つの情報が得られる。

- 次の状態(`next_state`)
- 報酬(`reward`)
- 終了かどうかのフラグ(`done`)
- 追加の情報(`info`)

### ランダムなエージェント(*コーディング*)

続いてコードを1つにまとめて動かしてみる。

In [None]:
# ライブラリのインポート
from IPython.display import clear_output
import matplotlib.pyplot as plt
import numpy as np

In [None]:
# インスタンスの初期化
env = gym.make("CartPole-v1", render_mode="rgb_array", new_step_api=True)
state = env.reset()
done = False

# 1回分のエピソードを動かす
while not done:
    action = np.random.choice([0, 1])
    next_state, reward, done, truncated, info = env.step(action)

    done = done or truncated

    frame = env.render()
    clear_output(wait=True)
    plt.figure(figsize=(10, 6), tight_layout=True)
    plt.imshow(frame[0])
    plt.axis("off")
    plt.show()

env.close()

## DQN のコア技術

Q 学習では、推定値を使って推定値を更新する。  
まだ正確ではない推定値を使って今ある推定値を更新するため、 Q 学習は不安定になりやすい性質がある。  
そこにニューラルネットワークのような表現力の高い関数近似法が加わると結果はさらに不安定になる。  
DQN は Q 学習とニューラルネットワークを組み合わせた手法であり、その特徴は、ニューラルネットワークの学習を安定させるために<font color="red">**経験再生**</font>(Experience Replay)と<font color="red">**ターゲットネットワーク**</font>(Target Network)という技術を使っていることである。

### 経験再生

Q 学習では、エージェントが環境に対して行動を行うたびにデータが生成される。  
具体的には、ある時間 $t$ において得られた $E_t = (S_t, A_t, R_t, S_{t+1})$ を使って Q 関数を更新する。  
ここで $E_t$ を経験データと呼ぶとする。  
この経験データは時間 $t$ が進むに従って得られるが、経験データ間には強い相関がある。  
つまり Q 学習では相関の強いデータを使って学習を行なっているため、教師あり学習のようにミニバッチ化をしてデータに偏りがないように学習させるということが出来ないのである。  
これを克服するのが経験再生である。

まずはエージェントが経験したデータ $E_t = (S_t, A_t, R_t, S_{t+1})$ を一度「バッファ」に保存する。  
そして、 Q 関数を更新する際はそのバッファから経験データをランダムに取り出す。  
これにより、経験データ間の相関が弱まり、偏りの少ないデータが得られる。  
さらに、経験データを繰り返し使うことができるため、データ効率が良くなる。  
ただし、経験再生は方策オフ型のアルゴリズムでしか使えないので、そこは注意が必要である。

### 経験再生の実装(*コーディング*)

経験再生のバッファは、現実的には無限にデータを格納することは出来ない。  
そこで、最大のサイズを例えば50000個などと予め設定しておく。  
その最大サイズを超えてデータを追加したい場合には古いデータから順に削除する。  
それでは、経験再生の仕組みを `ReplayBuffer` というクラスで実装する。

In [None]:
# ライブラリのインポート
from collections import deque
import random

In [None]:
# 経験再生の仕組みである ReplayBuffer クラスの実装
class ReplayBuffer:
    def __init__(self, buffer_size, batch_size):
        self.buffer = deque(maxlen=buffer_size)
        self.batch_size = batch_size

    def add(self, state, action, reward, next_state, done):
        data = (state, action, reward, next_state, done)
        self.buffer.append(data)

    def __len__(self):
        return len(self.buffer)

    def get_batch(self):
        data = random.sample(self.buffer, self.batch_size)

        state = np.stack([x[0] for x in data])
        action = np.stack([x[1] for x in data])
        reward = np.stack([x[2] for x in data])
        next_state = np.stack([x[3] for x in data])
        done = np.stack([x[4] for x in data]).astype(np.int32)

        return state, action, reward, next_state, done

In [None]:
# インスタンスの初期化
env = gym.make("CartPole-v1")
replay_buffer = ReplayBuffer(buffer_size=10000, batch_size=32)

# 10回エピソードを繰り返す
for _ in range(10):
    state = env.reset()
    done = False

    while not done:
        action = 0
        next_state, reward, done, info = env.step(action)
        replay_buffer.add(state, action, reward, next_state, done)
        state = next_state

# ミニバッチを確認する
state, action, reward, next_state, done = replay_buffer.get_batch()
print(f"state: {state.shape}")
print(f"action: {action.shape}")
print(f"reward: {reward.shape}")
print(f"next_state: {next_state.shape}")
print(f"done: {done.shape}")

これでバッチサイズのデータが `np.ndarray` のインスタンスとして取り出されていることがわかった。

### ターゲットネットワーク

Q 学習では、 $Q(S_t, A_t)$ の値が、 TD ターゲット $R_t + \gamma \max_a Q(S_{t+1}, a)$ となるように Q 関数を更新する。  
TD ターゲットの値は、 Q 関数が更新されるたびに変動するので、教師あり学習のようにラベルが学習途中で変わらないということがない。  
これはターゲットネットワークというテクニックにより TD ターゲットを固定することで克服できる。

まずは Q 関数を表すオリジナルのネットワーク(`qnet` と呼ぶことにする)を用意する。  
それとは別にもう1つ同じ構造のネットワーク(`qnet_target` と呼ぶことにする)を用意する。  
`qnet` は通常の Q 学習によって更新を行い、 `qnet_target` は定期的に `qnet` の重みと同期するようにして、それ以外は重みパラメータを固定したままにする。  
あとは `qnet_target` を使って TD ターゲットの値を計算すれば、 TD ターゲットの変動が抑えられるのでニューラルネットワークの学習が安定することが期待される。

### ターゲットネットワークの実装(*コーディング*)

`DQNAgent` というエージェントのクラスを実装する。  
まずはこの `DQNAgent` クラスに対応できるように `ReplayBuffer` クラスと `QNet` クラスを改めて用意する。

In [None]:
!pip install -U japanize-matplotlib

In [None]:
# ライブラリのインポート
import japanize_matplotlib
import torch
import torch.nn as nn
import torch.optim as optim

In [None]:
# PyTorch に対応した ReplayBuffer クラスの実装
class ReplayBuffer:
    def __init__(self, buffer_size, batch_size):
        self.buffer = deque(maxlen=buffer_size)
        self.batch_size = batch_size

    def add(self, state, action, reward, next_state, done):
        data = (state, action, reward, next_state, done)
        self.buffer.append(data)

    def __len__(self):
        return len(self.buffer)

    def get_batch(self):
        data = random.sample(self.buffer, self.batch_size)

        state = torch.tensor(np.stack([x[0] for x in data]))
        action = torch.tensor(np.array([x[1] for x in data]))
        reward = torch.tensor(np.array([x[2] for x in data]))
        next_state = torch.tensor(np.stack([x[3] for x in data]))
        done = torch.tensor(np.array([x[4] for x in data]).astype(np.int32))

        return state, action, reward, next_state, done


# Q 関数のニューラルネットワークモデル
class QNet(nn.Module):
    def __init__(self, action_size):
        super(QNet, self).__init__()
        self.linear1 = nn.Linear(4, 128)
        self.linear2 = nn.Linear(128, 128)
        self.linear3 = nn.Linear(128, action_size)
        self.relu = nn.ReLU()

    def forward(self, x):
        x = self.linear1(x)
        x = self.relu(x)
        x = self.linear2(x)
        x = self.relu(x)
        x = self.linear3(x)
        return x


# DQNAgent クラスの実装
class DQNAgent:
    def __init__(self):
        self.gamma = 0.98
        self.lr = 0.0005
        self.eps = 0.1
        self.buffer_size = 10000
        self.batch_size = 32
        self.action_size = 2

        self.replay_buffer = ReplayBuffer(self.buffer_size, self.batch_size)
        self.qnet = QNet(self.action_size)
        self.qnet_target = QNet(self.action_size)
        self.optimizer = optim.Adam(self.qnet.parameters(), lr=self.lr)
        self.criterion = nn.MSELoss()

    def get_action(self, state):
        if np.random.rand() < self.eps:
            return np.random.choice(self.action_size)
        else:
            qs = self.qnet(state)
            return torch.argmax(qs).item()

    def update(self, state, action, reward, next_state, done):
        self.replay_buffer.add(state, action, reward, next_state, done)
        if len(self.replay_buffer) < self.batch_size:
            return

        state, action, reward, next_state, done = self.replay_buffer.get_batch()
        qs = self.qnet(state)
        q = qs[np.arange(len(action)), action]

        with torch.no_grad():
            next_qs = self.qnet_target(next_state)
            next_q = next_qs.max(1)[0]
            target = reward + (1 - done) * self.gamma * next_q

        target = torch.tensor(target, dtype=torch.float32)
        loss = self.criterion(q, target)

        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

    def sync_qnet(self):
        self.qnet_target.load_state_dict(self.qnet.state_dict())

In [None]:
# 報酬をプロットする関数を定義
def reward_show(history: list) -> None:
    x = [i for i in range(1, len(history) + 1)]

    plt.figure(figsize=(8, 6), tight_layout=True)
    plt.title("報酬の遷移", size=15, color="red")
    plt.grid()
    plt.plot(x, history)

In [None]:
# 各クラスのインスタンスを生成
env = gym.make("CartPole-v1")
agent = DQNAgent()

# 300回のエピソードで実行
episodes = 300
sync_interval = 20
reward_history = []
for episode in range(episodes):
    state = env.reset()
    done = False
    total_reward = 0

    while not done:
        state = torch.tensor(state, dtype=torch.float32)
        action = agent.get_action(state)
        next_state, reward, done, info = env.step(action)

        agent.update(state, action, reward, next_state, done)
        state = next_state
        total_reward += reward

    if episode % sync_interval == 0:
        agent.sync_qnet()

    reward_history.append(total_reward)
    if episode % 10 == 0:
        print(f"episode: {episode}\t\ttotal reward: {total_reward}")

# 報酬の遷移画像を描画
reward_show(reward_history)

最初はすぐにバランスを崩すが、ある程度エピソードを繰り返すことで「コツ」を掴み始める。  
その後は振れ幅が大きいが、概ね良い方向に学習ができていることがわかる。

## DQN と Atari

DQN は "[Playing Atari with Deep Reinforcement Learning](https://arxiv.org/abs/1312.5602)" において提案された手法であり、 Atari はコンピュータゲームを作成する会社の名前である。  
強化学習の分野では Atari が制作した昔のゲームソフトを指して "Atari" と呼んでいる。

### Atari のゲーム環境

OpenAI Gym には Atari の様々なゲーム環境が用意されている。  
「Pong」というゲームは、敵とユーザーがボードを動かしてボールを打ち合うゲームである。  
ボールが相手の陣地を越えれば得点が入る。  
エージェントには、状態としてゲームの画像が与えられる。

### 前処理

これまで出てきた強化学習の理論は、 MDP を前提としていた。  
MDP では、最適な行動を決めるにあたって必要な情報が「現在の状態」に含まれる。  
しかし、上記の「Pong」の場合、この要件は満たさない。  
この問題は <font color="red">**部分観測マルコフ決定過程**</font>(Partially Observable Markov Decision Process)という。  
「Pong」のようなテレビゲームの場合、 POMDP を MDP に変換するのは簡単であり、4フレームの連続する画像を重ね合わせ、それを1つの「状態」として扱う。  
また、 DQN の論文では、フレームを重ねる前に固定の前処理を行っている。

- 画像の周囲をトリミング
- グレイスケールへの変換
- 画像のリサイズ
- 正規化

### CNN

前節のカートポールでは、全結合層からなるニューラルネットワークを使ったが、 Atari のような画像データを扱う場合は <font color="red">**畳み込みニューラルネットワーク**</font>(Convolutional Neural Network)が有効である。  
DQN で使われる CNN では、入力に近い層では畳み込み層を使用し、出力に近い層では全結合層を使い、最後の出力層はタスクに応じて行動の候補の数だけ出力する。  
活性化関数には ReLU 関数が使われている。

## DQN の拡張

DQN は深層強化学習において最も有名なアルゴリズムの1つであり、これを発展させた手法は数多く提案されている。

### Double DQN

DQN では「ターゲットネットワーク」というテクニックが使われた。  
これはメインとなるネットワークの他に、パラメータが異なるネットワーク(ターゲットネットワーク)を使う手法である。  
2つのネットワークのパラメータをそれぞれ $\theta$ と $\theta'$ で表すことにして、その2つのネットワークによって表現される Q 関数を $Q_\theta(s, a), Q_{\theta'}(s, a)$ で表すとする。  
このとき、 Q 関数の更新で用いるターゲットは

$$R_t + \gamma \max_a Q_{\theta'}(S_{t+1}, a)$$

で表される。  
DQN では、 $Q_{\theta}(s, a)$ の値を上式の値(TD ターゲット)に近づけるように学習する。  
ここで問題なのが $\max_a Q_{\theta'}(S_{t+1}, a)$ で、誤差が含まれる推定値 $Q_{\theta'}$ に対して $\max$ 演算子を使うと真の Q 関数を使って計算する場合に比べて過大に評価されてしまう。  
Double DQN は、次の式を TD ターゲットにしてこの問題を解決した。

\begin{eqnarray*}
\newcommand{\argmax}{\mathop{\rm arg~max}\limits}
R_t + \gamma Q_{\theta'}(S_{t+1}, \argmax_a Q_{\theta}(S_{t+1}, a))
\end{eqnarray*}

ポイントとしては、 $Q_\theta(s, a)$ を使って最大となる行動を選び、実際の値は $Q_{\theta'}(s, a)$ から取得することである。  
このように2つの Q 関数を使い分けることで、過大評価が解消され、学習がより安定する。

### 優先度付き経験再生

DQN で使われる経験再生では、経験 $E_t = (S_t, A_t, R_t, S_{t+1})$ をバッファに保存し、学習時にはバッファから経験データをランダムに取り出して使う。  
これをさらに進化させたのが<font color="red">**優先度付き経験再生**</font>(Prioritized Experience Replay)である。  
経験データの優先度を決める上で、自然に考えられるのは次の式である。

$$\delta_t = \left| R_t + \gamma \max_a Q_{\theta'}(S_{t+1}, a) - Q_\theta(S_t, A_t) \right|$$

この値が大きければそれだけ修正すべきこと、すなわち学ぶべきことが大きいということである。  
優先度付き経験再生では、経験データをバッファに保存するときに $\delta_t$ も計算する。  
そして、これを含めた $(S_t, A_t, R_t, S_{t+1}, \delta_t)$ をバッファに追加する。  
バッファから経験データを取り出すときは、 $\delta_t$ を使って各経験データが選ばれる確率を計算する。  
$N$ 個の経験データがバッファに含まれる場合、 $i$ 番目の経験データが選ばれる確率は

$$p_i = \frac{\delta_i}{\sum_{k=0}^N \delta_k}$$

で表される。  
優先度付き経験再生を使うことで、学ぶべきことが多いデータほど優先して使用されるため、学習がより早く進むことが期待される。

### Dueling DQN

Dueling DQN は、ニューラルネットワークの構造を工夫した手法であり、キーとなるのが<font color="red">**アドバンテージ関数**</font>(Advantage function)である。  
アドバンテージ関数は、 Q 関数と価値関数の差分で、

$$A_\pi(s, a) = Q_\pi(s, a) - V_\pi(s)$$

のように定義され、 $a$ という行動が方策 $\pi$ に比べてどれだけ良いかを表す。  
$Q_\pi(s, a)$ と $V_\pi(s)$ の違いは、状態 $s$ において行動 $a$ を行うか、それとも方策 $\pi$ に従って行動を選ぶかであるため、アドバンテージ関数は、「$a$ という行動」が「方策 $\pi$ で選ばれる行動」に比べてどれだけ良いかを示す指標として解釈できる。  
また、上の式を変形すると、

$$Q_\pi(s, a) = A_\pi(s, a) + V_\pi(s)$$

であるため、アドバンテージ関数をもとに Q 関数を求めることもできる。  
DQN の場合、ある状態 $s$ で実際に行った行動 $a$ に対して $Q(s, a)$ を学習する。  
もし、行動に関わらず結果が決まっている状態であっても、全ての行動を試さなければ DQN では $Q(s, a)$ は学習されない。  
一方で、 Dueling DQN は、価値関数 $V(s)$ を経由する($V(s)$ が学習される)ので、他の行動を試さなくても $Q(s, a)$ の近似性能が改善する。