## DQN

In [None]:
!pip install git+https://github.com/ku2482/MinAtar.git@faf6d1fde3429c9e810ae2d2bfd377f7abeafb34
!pip install gym==0.23.1
!pip install numpy==1.21.5
# !pip install torch==1.11.0  # `torch` はご利用の環境に適したバージョンをインストールしてください。
!pip install tensorboard==2.8.0

In [None]:
import os
from datetime import datetime
from collections import deque

import gym
import numpy as np
import torch
from torch.utils.tensorboard import SummaryWriter

gym.logger.set_level(40)

In [None]:
class PyTorchEnv(gym.ObservationWrapper):
    def __init__(self, env):
        super(PyTorchEnv, self).__init__(env)

        # 状態空間の定義。元の環境の (縦, 横, チャンネル) から
        # (チャンネル, 縦, 横) に変更する。
        # (Box では、[low, high] の連続値で構成される dtype 型の
        # shape 次元配列を、状態として定義できる。)
        self.observation_space = gym.spaces.Box(
            low=0.0,
            high=1.0,
            shape=(
                env.observation_space.shape[2],
                env.observation_space.shape[0],
                env.observation_space.shape[1],
            ),
            dtype=np.float32,
        )

    def observation(self, observation):
        # ここで、実際に状態を修正する。
        # 引数には、元の環境の状態が渡される。
        return np.transpose(observation, (2, 0, 1))

In [None]:
class ReplayBuffer:
    def __init__(
        self,
        buffer_size,
        state_space,
        device,
    ):
        # (状態, 行動, 報酬, 終了信号, 次の状態) の torch.tensor を初期化する。
        self.state = torch.empty((buffer_size, *state_space.shape), dtype=torch.float32, device=device)
        self.action = torch.empty((buffer_size, 1), dtype=torch.int64, device=device)
        self.reward = torch.empty((buffer_size, 1), dtype=torch.float32, device=device)
        self.done = torch.empty((buffer_size, 1), dtype=torch.float32, device=device)
        self.next_state = torch.empty((buffer_size, *state_space.shape), dtype=torch.float32, device=device)

        # 最大データ数
        self.buffer_size = buffer_size
        # データ数
        self._n = 0
        # 次にデータを挿入する位置
        self._p = 0

    def append(self, state, action, reward, done, next_state):
        # データを挿入する。
        self.state[self._p] = torch.tensor(state, dtype=torch.float32)
        self.action[self._p] = action
        self.reward[self._p] = float(reward)
        self.done[self._p] = float(done)
        self.next_state[self._p] = torch.tensor(next_state, dtype=torch.float32)

        # (最大データ数を超えないように) データ数を更新する。
        self._n = min(self._n + 1, self.buffer_size)
        # 次にデータを挿入する位置を更新する。
        # (データが一杯になったら、一番古いデータから順に上書きする。)
        self._p = (self._p + 1) % self.buffer_size

    def sample(self, batch_size):
        # バッチサイズ分のデータのインデックスを、ランダムにサンプルする。
        idxes = np.random.randint(low=0, high=self._n, size=batch_size)
        # (状態, 行動, 報酬, 終了信号, 次の状態) を返す。
        return (
            self.state[idxes],
            self.action[idxes],
            self.reward[idxes],
            self.done[idxes],
            self.next_state[idxes],
        )

In [None]:
class QNetwork(torch.nn.Module):
    def __init__(self, num_actions: int):
        super(QNetwork, self).__init__()

        # Convolutional レイヤ
        self.conv_net = torch.nn.Sequential(
            torch.nn.Conv2d(4, 16, kernel_size=3, stride=1),
            torch.nn.ReLU(),
        )

        # Fully-connected レイヤ
        self.fc_net = torch.nn.Sequential(
            torch.nn.Linear(in_features=8 * 8 * 16, out_features=128),
            torch.nn.ReLU(),
            torch.nn.Linear(in_features=128, out_features=num_actions),
        )

    def forward(self, x):
        x = self.conv_net(x)  # (B, 4, 10, 10) => (B, 16, 8, 8)
        x = x.view(x.size(0), -1)  # (B, 16, 8, 8, 16) => (B, 16 * 8 * 8)
        x = self.fc_net(x)  # (B, 16 * 8 * 8) => (B, num_actions)
        return x

    def select_action(self, state: torch.tensor) -> int:
        # この部分の計算は損失関数に出てこず、勾配を計算しなくてよいので、勾配計算を無効化する。
        with torch.no_grad():
            # 状態 (torch.tensor) を受け取り、行動価値が最大となる行動を計算する。
            # (state は状態 1 つ分なので、バッチの次元を追加してから計算する。)
            action = self.forward(state.unsqueeze(0)).argmax().item()
        return action

In [None]:
class DQN:
    def __init__(
        self,
        q_net: QNetwork,
        target_net: QNetwork,
        lr: float,
        gamma: float,
        start_steps: int,
        epsilon: float,
        update_interval: int,
        target_update_interval: int,
    ):
        # ネットワーク
        self.q_net = q_net
        # ターゲットネットワーク
        self.target_net = target_net
        # ターゲットネットワークのパラメータを、ネットワークのパラメータと同期する。
        self.update_target()

        # ニューラルネットワークの最適化を行う Optimizer
        self.optimizer = torch.optim.RMSprop(self.q_net.parameters(), lr=lr)

        # 割引率
        self.gamma = gamma
        # 学習を始めるのに必要なデータの数
        self.start_steps = start_steps
        # 探索を行う確率
        self.epsilon = epsilon
        # ネットワークを更新する頻度 (今回は、行動するたびネットワークを更新するので 1 とする。)
        self.update_interval = update_interval
        # ターゲットネットワークを更新する頻度
        self.target_update_interval = target_update_interval

    def is_random(self, step):
        return step < self.start_steps or np.random.rand() < self.epsilon

    def is_update(self, step):
        return step >= self.start_steps and step % self.update_interval == 0

    def is_update_target(self, step):
        return step >= self.start_steps and step % self.target_update_interval == 0

    def calculate_loss(self, state, action, reward, done, next_state):
        # 現在の行動価値の推定値を計算する。
        output = self.q_net(state)
        curr_q = output.gather(1, action)

        # 目標値を計算する。
        with torch.no_grad():
            next_v = torch.max(self.target_net(next_state), dim=1, keepdim=True).values
            # (エピソードが終了した場合、(1 - done) は 0 となり、目標値は r_t となります。)
            target_q = reward + self.gamma * (1 - done) * next_v

        # L2 loss
        loss = 0.5 * (curr_q - target_q).pow(2).mean()
        return loss

    def update(self, batch):
        # バッチサイズ分のデータ
        state, action, reward, done, next_state = batch

        # loss を計算する。
        loss = self.calculate_loss(state, action, reward, done, next_state)

        # ネットワークを更新する。
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

        return {"loss": loss.item()}

    def update_target(self):
        self.target_net.load_state_dict(self.q_net.state_dict())

In [None]:
ENV_NAME = "MinAtar/Breakout-v1"
NUM_STEPS = 1000000
LEARNING_RATE = 0.00025
BATCH_SIZE = 32
BUFFER_SIZE = 100000
EPSILON = 0.01
GAMMA = 0.99
START_STEPS = 100000
UPDATE_INTERVAL = 1
TARGET_UPDATE_INTERVAL = 1000
LOG_DIR = os.path.join("logs", ENV_NAME, datetime.now().strftime("%Y-%m-%d-%H%M"))

# モデルを保存するディレクトリを作成する。
if not os.path.exists(os.path.join(LOG_DIR, "model")):
    os.makedirs(os.path.join(LOG_DIR, "model"))

In [None]:
# torch.tensor をのせるデバイス (GPU or CPU)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# 環境
env = PyTorchEnv(gym.make(ENV_NAME))

# リプレイバッファ
buffer = ReplayBuffer(BUFFER_SIZE, env.observation_space, device)

# ネットワーク
q_net = QNetwork(env.action_space.n).to(device)
target_net = QNetwork(env.action_space.n).to(device)

# アルゴリズム
algo = DQN(
    q_net=q_net,
    target_net=target_net,
    gamma=GAMMA,
    start_steps=START_STEPS,
    epsilon=EPSILON,
    update_interval=UPDATE_INTERVAL,
    target_update_interval=TARGET_UPDATE_INTERVAL,
    lr=LEARNING_RATE,
)

# ログを記録するための Writer
writer = SummaryWriter(log_dir=os.path.join(LOG_DIR, "summary"))

In [None]:
%%time

# ログ用の統計
episode = 1
episode_reward = 0.0
episode_reward_stats = deque(maxlen=400)

# 環境を初期化して、初期状態を取得する。
state = env.reset()

for step in range(1, NUM_STEPS + 1):

    # 探索するかどうか
    if algo.is_random(step):
        # ランダムに探索する。
        action = np.random.randint(env.action_space.n)
    else:
        # 現時点で最適な行動を選択する。
        action = algo.q_net.select_action(
            torch.tensor(state, dtype=torch.float32, device=device)
        )

    # 環境を 1 時刻進める。
    next_state, reward, done, _ = env.step(action)
    # リプレイバッファにデータを保存する。
    buffer.append(state, action, reward, done, next_state)
    # 状態を更新する。
    state = next_state
    # ログ用に、エピソード全体の累積報酬を計算する。
    episode_reward += reward

    # エピソードが終了した場合
    if done:
        # 環境を初期化する。
        state = env.reset()
        # ログを記録する。
        episode_reward_stats.append(episode_reward)
        episode += 1
        episode_reward = 0.0

        # 定期的にログを書き出す。
        if episode % 400 == 0:
            episode_reward = np.mean(episode_reward_stats)
            writer.add_scalar("reward/episode_reward", episode_reward, step)
            print(f"Step {step} / Episode reward {episode_reward:.3f}")

    # ネットワークを更新する場合
    if algo.is_update(step):
        # バッチサイズ分のデータをサンプルする。
        batch = buffer.sample(BATCH_SIZE)
        # ネットワークを更新し、情報を受け取る。
        stats = algo.update(batch)

        # 定期的にログを書き出す。
        if step % 1000 == 0:
            writer.add_scalar("loss/q", stats["loss"], step)

        # 定期的にモデルを保存する。
        if step % 50000 == 0:
            torch.save(
                algo.q_net.state_dict(),
                os.path.join(LOG_DIR, "model", f"step{step}.pth")
            )

    # 定期的にターゲットネットワークを更新する。
    if algo.is_update_target(step):
        algo.update_target()

In [None]:
%load_ext tensorboard
%tensorboard  --logdir {os.path.join(LOG_DIR, "summary")}

In [None]:
# 検証用に動かすエピソード数
NUM_EPISODES = 5

# テスト用の環境を、mp4 保存するようにラップする。
env = gym.wrappers.RecordVideo(
    PyTorchEnv(gym.make(ENV_NAME)),
    os.path.join(LOG_DIR, "video"),
    episode_trigger=lambda x: True
)


state = env.reset()
episode = 0
episode_reward = 0.0

while episode < NUM_EPISODES:
    action = q_net.select_action(torch.tensor(state, dtype=torch.float32, device=device))
    next_state, reward, done, _ = env.step(action)
    state = next_state
    episode_reward += reward

    if done:
        print(f"Episode reward {episode_reward:.3f}")
        state = env.reset()
        episode_reward = 0.0
        episode += 1

env.close()

In [None]:
from base64 import b64encode
from IPython.display import HTML

def play_mp4(path):
    # path にある mp4 を再生する。
    mp4 = open(path, 'rb').read()
    url = "data:video/mp4;base64," + b64encode(mp4).decode()
    return HTML("""<video width=400 controls><source src="%s" type="video/mp4"></video>""" % url)

In [None]:
# 下のセルでエラーが出る場合には、`imageio-ffmpeg` をインストールしてください。
# !pip install imageio-ffmpeg==0.4.7

In [None]:
# 動画を再生する例 (ファイル名は、適宜変更してください。)
play_mp4(os.path.join(LOG_DIR, "video", "rl-video-episode-0.mp4"))