# 1.準備

## importなど

In [None]:
!pip install pybullet

In [None]:
import gc
import os
import time
from typing import Any, List, Tuple

import gym
import matplotlib.pyplot as plt
import numpy as np
import pybullet_envs  # PyBulletの環境をgymに登録する
import torch
from torch import nn
from torch.distributions import Normal
from torch.distributions.kl import kl_divergence
from torch.nn import functional as F
from torch.nn.utils import clip_grad_norm_
from torch.utils.tensorboard import SummaryWriter
from einops import rearrange

# 可視化のためにTensorBoardを用いるので，Colab上でTensorBoardを表示するための宣言を行う
%load_ext tensorboard

今回の学習にはGPUが必要です．

以下のコードを実行して， 結果が'cuda'でなければ「ランタイム」 →　 「ランタイムのタイプを変更」でGPUモードに変更しましょう．

In [None]:
# torch.deviceを定義．この変数は後々モデルやデータをGPUに転送する時にも使います
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(device)

きちんとGPUが使える状態になっているかチェックしておきます． ColabのGPU割り当てにはランダム性があるので人によって結果が違う場合がありますが， どれでも実行には問題ありません（学習にかかる時間に幾らかの差は出ます）．

In [None]:
!nvidia-smi

# 2.環境の設定

環境を扱いやすくするために，いくつかラッパーを挟みます．

今回はPyBulletを画像入力の環境として用います．環境を作成して，画像がどのようになっているかを見てみましょう．

In [None]:
env = gym.make("HalfCheetahBulletEnv-v0")  # 環境を読み込む．
env.reset()
image = env.render(mode="rgb_array")  # env.renderで画像を取得
plt.imshow(image)
plt.show()
env.close()

In [None]:
class GymWrapper_PyBullet(object):
    """
    PyBullet環境のためのラッパー
    """

    metadata = {"render.modes": ["human", "rgb_array"]}
    reward_range = (-np.inf, np.inf)

    # __init__でカメラ位置に関するパラメータ（ cam_dist:カメラ距離，cam_yaw：カメラの水平面での回転，cam_pitch:カメラの縦方向での回転）を受け取り，カメラの位置を調整できるようにします.
    # 　同時に画像の大きさも変更できるようにします
    def __init__(
        self,
        env: gym.Env,
        cam_dist: int = 3,
        cam_yaw: int = 0,
        cam_pitch: int = -30,
        render_width: int = 320,
        render_height: int = 240,
    ) -> None:
        """
        コンストラクタ．

        Parameters
        ----------
        env : gym.Env
            gymで提供されている環境のインスタンス．
        cam_dist : int
            カメラの距離．
        cam_yaw : int
            カメラの水平面での回転．
        cam_pitch : int
            カメラの縦方向での回転．
        render_width : int
            観測画像の幅．
        render_height : int
            観測画像の高さ．
        """
        self._env = env

        self._render_width = render_width
        self._render_height = render_height
        self._set_nested_attr(self._env, cam_dist, "_cam_dist")
        self._set_nested_attr(self._env, cam_yaw, "_cam_yaw")
        self._set_nested_attr(self._env, cam_pitch, "_cam_pitch")
        self._set_nested_attr(self._env, render_width, "_render_width")
        self._set_nested_attr(self._env, render_height, "_render_height")

    def _set_nested_attr(self, env: gym.Env, value: int, attr: str) -> None:
        """
        多重継承の属性に再帰的にアクセスして値を変更する．
        カメラの設定に利用．

        Parameters
        ----------
        value : int
            設定したい値．
        attr : str
            変更したい属性の名前．
        """
        if hasattr(env, attr):
            setattr(env, attr, value)
        else:
            self._set_nested_attr(env.env, value, attr)

    def __getattr(self, name: str) -> Any:
        """
        環境が保持している属性値を取得するメソッド．

        Parameters
        ----------
        name : str
            取得したい属性値の名前．

        Returns
        -------
        _env.name : Any
            環境が保持している属性値．
        """
        return getattr(self._env, name)

    @property
    def observation_space(self) -> gym.spaces.Box:
        """
        観測空間に関する情報を取得するメソッド．

        Returns
        -------
        space : gym.spaces.Box
            観測空間に関する情報（各画素値の最小値，各画素値の最大値，観測データの形状， データの型）．
        """
        width = self._render_width
        height = self._render_height
        state_space = self._env.observation_space
        return gym.spaces.Dict({"image": gym.spaces.Box(0, 255, (height, width, 3), dtype=np.uint8), "state": state_space})

    @property
    def action_space(self) -> gym.spaces.Box:
        """
        行動空間に関する情報を取得するメソッド．

        Returns
        -------
        space : gym.spaces.Box
            行動空間に関する情報（各行動の最小値，各行動の最大値，行動空間の次元， データの型） ．
        """
        return self._env.action_space

    # 　元の観測（低次元の状態）は今回は捨てて，env.render()で取得した画像を観測とします.
    #  画像，報酬，終了シグナルが得られます.
    def step(self, action: np.ndarray) -> (np.ndarray, float, bool, dict):
        """
        環境に行動を与え次の観測，報酬，終了フラグを取得するメソッド．

        Parameters
        ----------
        action : np.dnarray (action_dim, )
            与える行動．

        Returns
        -------
        obs : np.ndarray (height, width, 3)
            行動を与えたときの次の観測．
        reward : float
            行動を与えたときに得られる報酬．
        done : bool
            エピソードが終了したかどうか表すフラグ．
        info : dict
            その他の環境に関する情報．
        """
        next_state, reward, done, info = self._env.step(action)
        obs = {
          "image1": self._env.render(mode="rgb_array"),
          "joints": next_state,
        }
        return obs, reward, done, info

    def reset(self) -> np.ndarray:
        """
        環境をリセットするためのメソッド．

        Returns
        -------
        obs : np.ndarray (height, width, 3)
            環境をリセットしたときの初期の観測．
        """
        next_state = self._env.reset()
        obs = {
          "image1": self._env.render(mode="rgb_array"),
          "joints": next_state,
       }
        return obs

    def render(self, mode="human", **kwargs) -> np.ndarray:
        """
        観測をレンダリングするためのメソッド．

        Parameters
        ----------
        mode : str
            レンダリング方法に関するオプション． (default='human')

        Returns
        -------
        obs : np.ndarray (height, width, 3)
            観測をレンダリングした結果．
        """
        return self._env.render(mode, **kwargs)

    def close(self) -> None:
        """
        環境を閉じるためのメソッド．
        """
        self._env.close()

環境にラッパーを適用します．同時にカメラ位置に関するパラメータを与えて，カメラの位置と角度を調整します．（今回カメラのパラメータは人力で決めています．環境が変わると調整し直す必要があるかもしれません）．また，画像の大きさも同時に64x64に変更しています．

In [None]:
env = gym.make("HalfCheetahBulletEnv-v0")
# カメラのパラメータを与えてカメラの位置と角度，画像の大きさを調整
env = GymWrapper_PyBullet(
    env, cam_dist=2, cam_pitch=0, render_width=64, render_height=64
)

もう一度画像を確認してみましょう．

In [None]:
env.reset()
image = env.render(mode="rgb_array")
plt.imshow(image)
plt.show()
env.close()

カメラの位置が変わりました．これで元の観測より学習データとして扱いやすくなったと思います．

また，より環境を扱いやすくするために，同じ行動を何度か繰り返すラッパーを挟みます．

In [None]:
class RepeatAction(gym.Wrapper):
    """
    同じ行動を指定された回数自動的に繰り返すラッパー．観測は最後の行動に対応するものになる
    """

    def __init__(self, env: GymWrapper_PyBullet, skip: int = 4) -> None:
        """
        コンストラクタ．

        Parameters
        ----------
        env : GymWrapper_PyBullet
            環境のインスタンス．今回は先程定義したラッパーでラップした環境を利用する．
        skip : int
            同じ行動を繰り返す回数．
        """
        gym.Wrapper.__init__(self, env)
        self._skip = skip

    def reset(self) -> np.ndarray:
        """
        環境をリセットするためのメソッド．

        Returns
        -------
        obs : np.ndarray (width, height, 3)
            環境をリセットしたときの初期の観測．
        """
        return self.env.reset()

    def step(self, action: np.ndarray) -> (np.ndarray, float, bool, dict):
        """
        環境に行動を与え次の観測，報酬，終了フラグを取得するメソッド．
        与えられた行動をskipの回数だけ繰り返した結果を返す．

        Parameters
        ----------
        action : np.ndarray (action_dim, )
            与える行動．

        Returns
        -------
        obs : np.ndarray (width, height, 3)
            行動をskipの回数だけ繰り返したあとの観測．
        total_reawrd : float
            行動をskipの回数だけ繰り返したときの報酬和．
        done : bool
            エピソードが終了したかどうか表すフラグ．
        info : dict
            その他の環境に関する情報．
        """
        total_reward = 0.0
        for _ in range(self._skip):
            obs, reward, done, info = self.env.step(action)
            total_reward += reward
            if done:
                break
        return obs, total_reward, done, info

以上でラッパーに関する話は終わりです．これまでに作成したラッパーをまとめて適用し，最終的に用いる環境を作成する関数を実装して，本題のアルゴリズムの実装に移りましょう．

In [None]:
def make_env() -> RepeatAction:
    """
    作成たラッパーをまとめて適用して環境を作成する関数．

    Returns
    -------
    env : RepeatAction
        ラッパーを適用した環境．
    """
    env = gym.make("HalfCheetahBulletEnv-v0")  # 環境を読み込む．今回はHalfCheetah
    # Dreamerでは観測は64x64のRGB画像
    env = GymWrapper_PyBullet(
        env, cam_dist=2, cam_pitch=0, render_width=64, render_height=64
    )
    env = RepeatAction(env, skip=2)  # DreamerではActionRepeatは2
    return env

# 3.RSSM

In [None]:
class TransitionModel(nn.Module):
    """
    状態遷移を担うクラス．このクラスは複数の要素を含んでいます．
    決定的状態遷移 （RNN) : h_t+1 = f(h_t, s_t, a_t)
    確率的状態遷移による1ステップ予測として定義される "prior" : p(s_t+1 | h_t+1)
    観測の情報を取り込んで定義される "posterior": q(s_t+1 | h_t+1, e_t+1)
    """

    def __init__(
        self,
        state_dim: int,
        action_dim: int,
        rnn_hidden_dim: int,
        ModalityInfo,
        hidden_dim: int = 200,
        min_stddev: float = 0.1,
        act: "function" = F.elu,
    ) -> None:
        """
        コンストラクタ．

        Parameters
        ----------
        state_dim : int
            確率的状態sの次元数．
        action_dim : int
            行動空間の次元数．
        rnn_hidden_dim : int
            決定的状態遷移を計算するRNNの隠れ層の次元数．
        ModalityInfo : tuple[Modality]
            使用するモダリティの情報.
        hidden_dim : int
            決定的状態hの次元数．
        min_stddev : float
            確率状態遷移の標準偏差の最小値．
        act : function
            活性化関数．
        """
        super(TransitionModel, self).__init__()
        self.state_dim = state_dim
        self.action_dim = action_dim
        self.rnn_hidden_dim = rnn_hidden_dim
        self.fc_state_action = nn.Linear(state_dim + action_dim, hidden_dim)

        self.fc_rnn_hidden = nn.Linear(rnn_hidden_dim, hidden_dim)
        self.fc_state_mean_prior = nn.Linear(hidden_dim, state_dim)
        self.fc_state_stddev_prior = nn.Linear(hidden_dim, state_dim)
        # self.fc_rnn_hidden_embedded_obs = nn.Linear(rnn_hidden_dim + 1024, hidden_dim)
        self.posteriors = nn.ModuleDict()
        for modality in ModalityInfo:
            self.posteriors[f'hidden_{modality.name}'] = nn.Linear(hidden_dim + modality.dim, hidden_dim)
            self.posteriors[f'mean_{modality.name}'] = nn.Linear(hidden_dim, state_dim)
            self.posteriors[f'stddev_{modality.name}'] = nn.Linear(hidden_dim, state_dim)
        # self.fc_state_mean_posterior = nn.Linear(hidden_dim, state_dim)
        # self.fc_state_stddev_posterior = nn.Linear(hidden_dim, state_dim)

        # next hidden stateを計算
        self.rnn = nn.GRUCell(hidden_dim, rnn_hidden_dim)
        self._min_stddev = min_stddev
        self.act = act

    def forward(
        self,
        state: torch.Tensor,
        action: torch.Tensor,
        rnn_hidden: torch.Tensor,
        embedded_next_obs: torch.Tensor,
    ) -> Tuple[torch.Tensor]:
        """
        prior p(s_t+1 | h_t+1) と posterior q(s_t+1 | h_t+1, e_t+1) を返すメソッド．
        この2つが近づくように学習する．

        Parameters
        ----------
        state : torch.Tensor (batch size, state dim)
            時刻tの状態(s_t)．
        action : torch.Tensor (batch size, action dim)
            時刻tの行動(a_t)．
        rnn_hidden : torch.Tensor (batch size, rnn hidden dim)
            RNNが保持している決定的状態(h_t)．
        embedded_next_obs : torch.Tensor (batch size, 1024)
            時刻t+1の観測をエンコードしたもの(e_t+1)．

        Returns
        -------
        next_state_prior : torch.Tensor (batch size, state dim)
            prior(p(s_t+1 | h_t+1))による次の時刻の状態の予測．
        next_state_posterior : torch.Tensor (batch size, state dim)
            posterior(q(s_t+1 | h_t+1, e_t+1))による次の時刻の状態の予測．
        rnn_hidden : torch.Tensor (batch size, rnn hidden dim)
            RNNが保持する次の決定的状態(h_t+1)．
        """
        next_state_prior, rnn_hidden = self.prior(
            self.recurrent(state, action, rnn_hidden) # h_t+1
        )
        next_state_posterior = self.posterior(rnn_hidden, embedded_next_obs)
        return next_state_prior, next_state_posterior, rnn_hidden

    def recurrent(
        self, state: torch.Tensor, action: torch.Tensor, rnn_hidden: torch.Tensor
    ) -> torch.Tensor:
        """
        決定的状態 h_t+1 = f(h_t, s_t, a_t)を計算するメソッド．

        Parameters
        ----------
        state : torch.Tensor (batch size, state dim)
            時刻tの状態(s_t)．
        action : torch.Tensor (batch size, action dim)
            時刻tの行動(a_t)．
        rnn_hidden : torch.Tensor (batch size, rnn hidden dim)
            RNNが保持している決定的状態(h_t)．

        Returns
        -------
        rnn_hidden : torch.Tensor (batch size, rnn hidden dim)
            RNNが保持する次の決定的状態(h_t+1)．
        """
        hidden = self.act(self.fc_state_action(torch.cat([state, action], dim=1)))
        # h_t+1を求める
        rnn_hidden = self.rnn(hidden, rnn_hidden)
        return rnn_hidden

    def prior(self, rnn_hidden: torch.Tensor) -> Tuple[torch.Tensor]:
        """
        prior p(s_t+1 | h_t+1) を計算するメソッド．

        Parameters
        ----------
        rnn_hidden : torch.Tensor (batch size, rnn hidden dim)
            RNNが保持している決定的状態(h_t+1)．

        Returns
        -------
        state : torch.Tensor (batch size, state dim)
            決定的状態を用いてサンプリングされた確率的な状態(s_t+1)．
            ここでは決定的状態h_t+1からガウス分布の平均，標準偏差を推定してサンプリングしています．
        rnn_hidden : torch.Tensor (batch size, rnn hidden dim)
            RNNが保持する決定的状態(h_t+1)．
            入力からのものをそのまま返しています．
        """
        #h_t+1を求める（ヒント: self.act, self.fc_rnn_hiddenを使用）
        hidden = self.act(self.fc_rnn_hidden(rnn_hidden)) # WRITE ME

        mean = self.fc_state_mean_prior(hidden)
        stddev = F.softplus(self.fc_state_stddev_prior(hidden)) + self._min_stddev
        return Normal(mean, stddev), rnn_hidden

    def posterior(
        self, rnn_hidden: torch.Tensor, modalities
    ) -> torch.Tensor:
        """
        posterior q(s_t+1 | h_t+1, e_t+1)  を計算するメソッド．

        Parameters
        ----------
        rnn_hidden : torch.Tensor (batch size, rnn hidden dim)
            RNNが保持している決定的状態(h_t+1)．
        modalities : Dict
            画像や関節角などのモダリティ.

        Returns
        -------
        state : torch.Tensor (batch size, state dim)
            決定的状態とエンコードした観測を用いてサンプリングされた確率的な状態(s_t+1)．
            ここでは決定的状態h_t+1とエンコードした観測e_t+1からガウス分布の平均，標準偏差を推定してサンプリングしています．
        """
        # h_t+1，o_t+1を結合し，q(s_t+1 | h_t+1, e_t+1) を計算する
        posteriors = []
        for modality_name, modality in modalities.items():
            x = self.act(self.posteriors[f'hidden_{modality_name}'](torch.cat([rnn_hidden, modality], dim=1)))
            mean = self.posteriors[f'mean_{modality_name}'](x) # (B, state_dim)
            stddev = F.softplus(self.posteriors[f'stddev_{modality_name}'](x)) + self._min_stddev # (B, state_dim)
            posteriors.append([mean, stddev])
        # hidden = self.act(self.fc_rnn_hidden_embedded_obs(torch.cat([rnn_hidden, embedded_obs], dim=1))) # WRITE ME
        # mean = self.fc_state_mean_posterior(hidden)
        # stddev = F.softplus(self.fc_state_stddev_posterior(hidden)) + self._min_stddev


        num_combinations = 2 ** len(posteriors)
        batch_size, state_dim = mean.shape
        PoEs = torch.zeros((num_combinations-1, batch_size, state_dim * 2), device=mean.device) # (num_comb-1, B, state_dim * 2)
        for i in range(1, num_combinations):
            means, stddevs = [], []
            for j in range(len(posteriors)):
                if (i >> j) & 1:
                    means.append(posteriors[j][0])
                    stddevs.append(posteriors[j][1])
            mu = 0
            sigma_squarred = 0
            for mean, stddev in zip(means, stddevs):
                one_over_sig_squarred = 1 / (stddev ** 2)
                sigma_squarred += one_over_sig_squarred # (B, state_dim)
                mu += mean * one_over_sig_squarred # (B, state_dim)
            sigma_squarred = 1 / sigma_squarred
            mu = sigma_squarred * mu
            sigma = torch.sqrt(sigma_squarred)
            PoEs[i-1] = torch.cat([mu, sigma], dim=1)

        # MoE
        mean, stddev = PoEs[:, :, :state_dim], PoEs[:, :, state_dim:]
        mean = mean.mean(dim=0)
        stddev = stddev.pow(2).sum(dim=0).sqrt() / (num_combinations - 1)
        return Normal(mean, stddev)

## Decoder

In [None]:
class ObservationModel(nn.Module):
    """
    p(o_t | s_t, h_t)
    低次元の状態表現から画像を再構成するデコーダ (3, 64, 64)
    """

    def __init__(self, ModalityInfo, state_dim: int, rnn_hidden_dim: int, hidden_dim : int = 400) -> None:
        """
        コンストラクタ．

        Parameters
        ----------
        ModalityInfo : Tuple[Modality]
            使用するモダリティの情報.
        state_dim : int
            確率的状態sの次元数．
        rnn_hidden_dim : int
            決定的状態hの次元数．
        hidden_dim : int
            隠れ層の次元数.
        """
        super(ObservationModel, self).__init__()
        self.decoders = nn.ModuleDict()
        for modality in ModalityInfo:
            if modality.type_ == 'image':
                self.decoders[modality.name] = nn.Sequential(
                    nn.Linear(state_dim + rnn_hidden_dim, 1024),
                    nn.Unflatten(1, (1024, 1, 1)),
                    nn.ConvTranspose2d(1024, 128, kernel_size=5, stride=2),
                    nn.ReLU(),
                    nn.ConvTranspose2d(128, 64, kernel_size=5, stride=2),
                    nn.ReLU(),
                    nn.ConvTranspose2d(64, 32, kernel_size=6, stride=2),
                    nn.ReLU(),
                    nn.ConvTranspose2d(32, 3, kernel_size=6, stride=2),
                )
            elif modality.type_ == 'joint':
                self.decoders[modality.name] = nn.Sequential(
                    nn.Linear(state_dim + rnn_hidden_dim, hidden_dim),
                    nn.ReLU(),
                    nn.Linear(hidden_dim, hidden_dim),
                    nn.ReLU(),
                    nn.Linear(hidden_dim, hidden_dim),
                    nn.ReLU(),
                    nn.Linear(hidden_dim, modality.dim)
                )
        # self.fc = nn.Linear(state_dim + rnn_hidden_dim, 1024)
        # self.dc1 = nn.ConvTranspose2d(1024, 128, kernel_size=5, stride=2)
        # self.dc2 = nn.ConvTranspose2d(128, 64, kernel_size=5, stride=2)
        # self.dc3 = nn.ConvTranspose2d(64, 32, kernel_size=6, stride=2)
        # self.dc4 = nn.ConvTranspose2d(32, 3, kernel_size=6, stride=2)

    def forward(self, state: torch.Tensor, rnn_hidden: torch.Tensor, modalities) -> torch.Tensor:
        """
        順伝播を行うメソッド．確率的状態sと決定的状態hから観測を再構成する．

        Parameters
        ----------
        state : torch.Tensor (batch size, state dim)
            確率的状態s．
        rnn_hidden : torch.Tensor (batch size, rnn_hidden_dim)
            決定的状態h．
        modalities : Dict
            画像や関節角などのモダリティ.

        Returns
        -------
        obs : Dict
            再構成された観測（画像と関節角)．
        """
        obs = nn.ModuleDict()
        for modality_name, modality in modalities.items():
            obs[modality_name] = self.decoders[modality_name](torch.cat([state, rnn_hidden], dim=1))
        # hidden = self.fc(torch.cat([state, rnn_hidden], dim=1))
        # hidden = hidden.view(hidden.size(0), 1024, 1, 1)
        # hidden = F.relu(self.dc1(hidden))
        # hidden = F.relu(self.dc2(hidden))
        # hidden = F.relu(self.dc3(hidden))
        # obs = self.dc4(hidden)
        return obs

## Reward Model


In [None]:
class RewardModel(nn.Module):
    """
    p(r_t | s_t, h_t)
    低次元の状態表現から報酬を予測する．
    """

    def __init__(
        self,
        state_dim: int,
        rnn_hidden_dim: int,
        hidden_dim: int = 400,
        act: "function" = F.elu,
    ) -> None:
        """
        コンストラクタ．

        Parameters
        ----------
        state_dim : int
            確率的状態sの次元数．
        rnn_hidden_dim : int
            決定的状態hの次元数．
        hidden_dim : int
            報酬モデルの隠れ層の次元数． (default=400)
        act : function
            報酬モデルに利用される活性化関数． (default=torch.nn.functional.elu)
        """
        super(RewardModel, self).__init__()
        self.fc1 = nn.Linear(state_dim + rnn_hidden_dim, hidden_dim)
        self.fc2 = nn.Linear(hidden_dim, hidden_dim)
        self.fc3 = nn.Linear(hidden_dim, hidden_dim)
        self.fc4 = nn.Linear(hidden_dim, 1)
        self.act = act

    def forward(self, state: torch.Tensor, rnn_hidden: torch.Tensor) -> torch.Tensor:
        """
        順伝播を行うメソッド．確率的状態sと決定的状態hから報酬rを推定する．

        Parameters
        ----------
        state : torch.Tensor (batch size, state dim)
            確率的状態s．
        rnn_hidden : torch.Tensor (batch size, rnn_hidden_dim)
            決定的状態h．

        Returns
        -------
        reward : torch.Tensor (batch size, 1)
            確率的状態s，決定的状態hに対する報酬r．
        """
        hidden = self.act(self.fc1(torch.cat([state, rnn_hidden], dim=1)))
        hidden = self.act(self.fc2(hidden))
        hidden = self.act(self.fc3(hidden))
        reward = self.fc4(hidden)
        return reward

## rssm

In [None]:
class RSSM:
    """
    TransitionModel, ObservationModel, RewardModelの3つをまとめたRSSMクラス．
    """

    def __init__(
        self,
        state_dim: int,
        action_dim: int,
        rnn_hidden_dim: int,
        ModalityInfo,
    ) -> None:
        """
        コンストラクタ．

        Parameters
        ----------
        state_dim : int
            確率的状態sの次元数．
        action_dim : int
            行動空間の次元数．
        rnn_hidden_dim : int
            決定的状態hの次元数．
        """
        self.transition = TransitionModel(state_dim, action_dim, rnn_hidden_dim, ModalityInfo).to(
            device
        )
        self.observation = ObservationModel(
            ModalityInfo,
            state_dim,
            rnn_hidden_dim,
        ).to(device)
        self.reward = RewardModel(
            state_dim,
            rnn_hidden_dim,
        ).to(device)

# 4.補助機能の実装



## ReplayBuffer

In [None]:
# 　今回のReplayBuffer
class ReplayBuffer(object):
    """
    RNNを用いて訓練するのに適したリプレイバッファ．
    """

    def __init__(
        self, capacity: int, ModalityInfo, action_dim: int
    ) -> None:
        """
        コンストラクタ．

        Parameters
        ----------
        capacity : int
            リプレイバッファにためておくことができる経験の上限．
        ModalityInfo : Tuple[Modality]
            モダリティの情報．
        action_dim : int
            行動空間の次元数．
        """
        self.capacity = capacity

        self.observations = {}
        for modality in ModalityInfo:
            if modality.type_ == 'image':
                self.observations[modality.name] = np.zeros(
                    (capacity, *modality.image_shape), dtype=np.uint8
                ) # (capacity, H, W, C)
            elif modality.type_ == 'joint':
                self.observations[modality.name] = np.zeros(
                    (capacity, modality.dim), dtype=np.float32
                ) # (capacity, joint_dim)
        # self.observations = np.zeros((capacity, *observation_shape), dtype=np.uint8) # (capacity, H, W, C)
        self.actions = np.zeros((capacity, action_dim), dtype=np.float32) # (capacity, action_dim)
        self.rewards = np.zeros((capacity, 1), dtype=np.float32) # (capacity, 1)
        self.done = np.zeros((capacity, 1), dtype=bool) # (capacity, 1)
        # self.done = np.zeros((capacity, 1), dtype=np.bool)

        self.index = 0
        self.is_filled = False

    def push(
        self, modalities, action: np.ndarray, reward: float, done: bool
    ) -> None:
        """
        リプレイバッファに経験を追加するメソッド．

        Parameters
        ----------
        modalities : Dict
            画像や関節角などのモダリティ.
        action : np.ndarray (action_dim, )
            エージェントがとった（もしくは経験を貯める際のランダムな）行動．
        reward : float
            観測に対して行動をとったときに得られる報酬．
        done : bool
            エピソードが終了するかどうかのフラグ．
        """
        for modality_name, modality in modalities.items():
            self.observations[modality_name][self.index] = modality
        # self.observations[self.index] = observation
        self.actions[self.index] = action
        self.rewards[self.index] = reward
        self.done[self.index] = done

        # indexは巡回し，最も古い経験を上書きする
        if self.index == self.capacity - 1:
            self.is_filled = True
        self.index = (self.index + 1) % self.capacity

    def sample(self, batch_size: int, chunk_length: int):
        """
        経験をリプレイバッファからサンプルします．（ほぼ）一様なサンプルです．
        結果として返ってくるのは観測（画像），行動，報酬，終了シグナルについての(batch_size, chunk_length, 各要素の次元)の配列です．
        各バッチは連続した経験になっています．
        注意: chunk_lengthをあまり大きな値にすると問題が発生する場合があります．

        Parameters
        ----------
        batch_size : int
            バッチサイズ．
        chunk_length : int
            バッチあたりの系列長．


        Returns
        -------
        sampled_observations : Dict
            バッファからサンプリングされたモダリティ.
        sampled_actions : np.ndarray (batch size, chunk length, action dim)
            バッファからサンプリングされた行動．
        sampled_rewards : np.ndarray (batch size, chunk length, 1)
            バッファからサンプリングされた報酬．
        sampled_rewards : np.ndarray (batch size, chunk length, 1)
            バッファからサンプリングされたエピソードの終了フラグ．
        """
        episode_borders = np.where(self.done)[0]
        sampled_indexes = []
        for _ in range(batch_size):
            cross_border = True
            while cross_border:
                initial_index = np.random.randint(len(self) - chunk_length + 1)
                final_index = initial_index + chunk_length - 1
                cross_border = np.logical_and(
                    initial_index <= episode_borders, episode_borders < final_index
                ).any()  # 論理積
            sampled_indexes += list(range(initial_index, final_index + 1))

        sampled_observations = {}
        for modality_name, modality in self.observations.items():
            sampled_observations[modality_name] = self.observations[modality_name][sampled_indexes].reshape(
                batch_size, chunk_length, *self.observations[modality_name].shape[1:] # (len(sampled_idnex), H, W, C) -> (B, chunk_length, H, W, C)
            )

        # sampled_observations = self.observations[sampled_indexes].reshape(
        #     batch_size, chunk_length, *self.observations.shape[1:] # (len(sampled_idnex), H, W, C) -> (B, chunk_length, H, W, C)
        # )

        sampled_actions = self.actions[sampled_indexes].reshape(
            batch_size, chunk_length, self.actions.shape[1]
        )
        sampled_rewards = self.rewards[sampled_indexes].reshape(
            batch_size, chunk_length, 1
        )
        sampled_done = self.done[sampled_indexes].reshape(batch_size, chunk_length, 1)
        return sampled_observations, sampled_actions, sampled_rewards, sampled_done

    def __len__(self) -> int:
        """
        バッファに貯められている経験の数を返すメソッド．

        Returns
        -------
        length : int
            バッファに貯められている経験の数．
        """
        return self.capacity if self.is_filled else self.index

In [None]:
def preprocess_obs(obs: np.ndarray) -> np.ndarray:
    """
    画像を正規化する．[0, 255] -> [-0.5, 0.5]．

    Parameters
    ----------
    obs : np.ndarray (64, 64, 3) or (chank length, batch size, 64, 64, 3)
        環境から得られた観測．画素値は[0, 255]．

    Returns
    -------
    normalized_obs : np.ndarray (64, 64, 3) or (chank length, batch size, 64, 64, 3)
        画素値を[-0.5, 0.5]で正規化した観測．
    """
    obs = obs.astype(np.float32)
    normalized_obs = obs / 255.0 - 0.5
    return normalized_obs

Dreamerでは価値関数の学習を行いますが，このために通常のTD誤差ではなく，**TD(λ)をベースにしたλ-return**としてターゲット価値を計算し，それと現在の予測価値の誤差を用います．そのためにλ-returnを計算する関数をここで実装しておきます．

In [None]:
def lambda_target(
    rewards: torch.Tensor, values: torch.Tensor, gamma: float, lambda_: float
) -> torch.Tensor:
    """
    価値関数の学習のためのλ-returnを計算する関数．

    Parameters
    ----------
    rewards : torch.Tensor (imagination_horizon, batch size * (chank length - 1))
        報酬モデルによる報酬の推定値．
    values : torch.Tensor (imagination_horizon, batch size * (chank length - 1))
        価値関数を近似するValueモデルによる状態価値観数の推定値．
    gamma : float
        割引率．
    lambda_ : float
        λ-returnのパラメータλ．

    V_lambda : torch.Tensor (imagination_horizon, batch size * (chank length - 1))
        各状態に対するλ-returnの値．
    """
    V_lambda = torch.zeros_like(rewards, device=rewards.device)
    H = rewards.shape[0] - 1
    V_n = torch.zeros_like(rewards, device=rewards.device)
    V_n[H] = values[H]
    for n in range(1, H + 1):
        # まずn-step returnを計算します
        # 注意: 系列が途中で終わってしまったら，可能な中で最大のnを用いたn-stepを使います
        V_n[:-n] = (gamma**n) * values[n:]
        for k in range(1, n + 1):
            if k == n:
                V_n[:-n] += (gamma ** (n - 1)) * rewards[k:]
            else:
                V_n[:-n] += (gamma ** (k - 1)) * rewards[k : -n + k]

        # lambda_でn-step returnを重みづけてλ-returnを計算します
        if n == H:
            V_lambda += (lambda_ ** (H - 1)) * V_n
        else:
            V_lambda += (1 - lambda_) * (lambda_ ** (n - 1)) * V_n

    return V_lambda

# 6.Dreamerの実装




## Encoder

In [None]:
class Encoder(nn.Module):
    """
    (3, 64, 64)の画像を(1024,)のベクトルに変換するエンコーダクラス．
    """

    def __init__(self) -> None:
        """
        コンストラクタ．
        層の定義のみを行う．
        """
        super(Encoder, self).__init__()
        self.cv1 = nn.Conv2d(3, 32, kernel_size=4, stride=2)
        self.cv2 = nn.Conv2d(32, 64, kernel_size=4, stride=2)
        self.cv3 = nn.Conv2d(64, 128, kernel_size=4, stride=2)
        self.cv4 = nn.Conv2d(128, 256, kernel_size=4, stride=2)

    def forward(self, obs: torch.Tensor) -> torch.Tensor:
        """
        順伝播を行うメソッド．観測画像をベクトルに埋め込む．

        Parameters
        ----------
        obs : torch.Tensor (batch size, 3, 64, 64)
            環境から得られた観測画像．

        Returns
        -------
        embedded_obs : torch.Tensor (batch size, 1024)
            観測を1024次元のベクトルに埋め込んだもの．
        """
        hidden = F.relu(self.cv1(obs))
        hidden = F.relu(self.cv2(hidden))
        hidden = F.relu(self.cv3(hidden))
        embedded_obs = F.relu(self.cv4(hidden)).reshape(hidden.size(0), -1)
        return embedded_obs

ここからがDreamerの中核となる部分で，RSSMの学習を通して獲得された低次元の状態表現の上でActor-Criticを行います.


以下で，価値関数を近似するValueモデル $v_{\phi}(s_{\tau})　\approx E_{q(.|s_{\tau})}(\sum_{\tau=t}^{t+H}(\gamma^{\tau-t}r_{\tau}) )$ を実装します．Q学習などで用いられる状態行動価値関数Q(s, a)ではなく，状態価値関数V(s)であることに多少の注意が必要です．

In [None]:
class ValueModel(nn.Module):
    """
    低次元の状態表現(state_dim + rnn_hidden_dim)から状態価値を出力するクラス．
    """

    def __init__(
        self,
        state_dim: int,
        rnn_hidden_dim: int,
        hidden_dim: int = 400,
        act: "function" = F.elu,
    ) -> None:
        """
        コンストラクタ．

        Parameters
        ----------
        state_dim : int
            確率的状態sの次元数．
        rnn_hidden_dim : int
            決定的状態hの次元数．
        hidden_dim : int
            モデルの隠れ層の次元数． (default=400)
        act : function
            モデルの活性化関数． (default=torch.nn.functional.elu)
        """
        super(ValueModel, self).__init__()
        self.fc1 = nn.Linear(state_dim + rnn_hidden_dim, hidden_dim)
        self.fc2 = nn.Linear(hidden_dim, hidden_dim)
        self.fc3 = nn.Linear(hidden_dim, hidden_dim)
        self.fc4 = nn.Linear(hidden_dim, 1)
        self.act = act

    def forward(self, state: torch.Tensor, rnn_hidden: torch.Tensor) -> torch.Tensor:
        """
        順伝播を行うメソッド．低次元の状態表現から状態価値を推定する．

        Parameters
        ----------
        state : torch.Tensor (batch size, state dim)
            確率的状態s．
        rnn_hidden : torch.Tensor (batch size, rnn_hidden_dim)
            決定的状態h．

        Returns
        -------
        state_value : torch.Tensor (batch size, 1)
            入力された状態に対する状態価値の推定値．
        """
        hidden = self.act(self.fc1(torch.cat([state, rnn_hidden], dim=1)))
        hidden = self.act(self.fc2(hidden))
        hidden = self.act(self.fc3(hidden))
        state_value = self.fc4(hidden)
        return state_value

最後です．実際に行動を出力するActionモデル $a_{\tau} \sim q_{\delta}(a_{\tau}|s_{\tau})$ を実装します．

Actionモデルは価値の見積もりを最大化することを目的とします．



In [None]:
class ActionModel(nn.Module):
    """
    低次元の状態表現(state_dim + rnn_hidden_dim)から行動を計算するクラス．
    """

    def __init__(
        self,
        state_dim: int,
        rnn_hidden_dim: int,
        action_dim: int,
        hidden_dim: int = 400,
        act: "function" = F.elu,
        min_stddev: float = 1e-4,
        init_stddev: float = 5.0,
    ) -> None:
        """
        コンストラクタ．

        Parameters
        ----------
        state_dim : int
            確率的状態sの次元数．
        rnn_hidden_dim : int
            決定的状態hの次元数．
        action_dim : int
            行動空間の次元数．
        hidden_dim : int
            モデルの隠れ層の次元数． (default=400)
        act : function
            モデルの活性化関数． (default=torch.nn.functional.elu)
        min_stddev : float
            行動をサンプリングする分布の標準偏差の最小値． (default=1e-4)
        init_stddev : float
            行動をサンプリングする分布の標準偏差の初期値． (default=5.0)
        """
        super(ActionModel, self).__init__()
        self.fc1 = nn.Linear(state_dim + rnn_hidden_dim, hidden_dim)
        self.fc2 = nn.Linear(hidden_dim, hidden_dim)
        self.fc3 = nn.Linear(hidden_dim, hidden_dim)
        self.fc4 = nn.Linear(hidden_dim, hidden_dim)
        self.fc_mean = nn.Linear(hidden_dim, action_dim)
        self.fc_stddev = nn.Linear(hidden_dim, action_dim)
        self.act = act
        self.min_stddev = min_stddev
        self.init_stddev = np.log(np.exp(init_stddev) - 1)

    def forward(
        self, state: torch.Tensor, rnn_hidden: torch.Tensor, training: bool = True
    ) -> None:
        """
        順伝播を行うメソッド．入力された状態に対する行動を出力する．
        training=Trueなら，NNのパラメータに関して微分可能な形の行動のサンプル（Reparametrizationによる）を返す．
        training=Falseなら，行動の確率分布の平均値を返す．

        Parameters
        ----------
        staet : torch.Tensor (batch size, state dim)
            確率的状態s．
        rnn_hidden : torch.Tensor (batch size, rnn_hidden_dim)
            決定的状態h．
        training : bool
            訓練か推論かを示すフラグ． (default=True)

        Returns
        -------
        action : torch.Tensor (batch size, action dim)
            入力された状態に対する行動．
            training=Trueでは微分可能な形の行動をサンプリングした値，
            training=Falseでは行動の確率分布の平均値を返す．
        """
        hidden = self.act(self.fc1(torch.cat([state, rnn_hidden], dim=1)))
        hidden = self.act(self.fc2(hidden))
        hidden = self.act(self.fc3(hidden))
        hidden = self.act(self.fc4(hidden))

        # Dreamerの実装に合わせて少し平均と分散に対する簡単な変換が入っています
        mean = self.fc_mean(hidden)
        mean = 5.0 * torch.tanh(mean / 5.0)
        stddev = self.fc_stddev(hidden)
        stddev = F.softplus(stddev + self.init_stddev) + self.min_stddev

        if training:
            action = torch.tanh(Normal(mean, stddev).rsample())  # 微分可能にするためrsample()
        else:
            action = torch.tanh(mean)
        return action

実装の詳細まで掴みきれなくとも，個々のクラスが担っている役割が大雑把にでもわかっていただければ幸いです.

# 7.エージェントの実装

Dreamerでは行動を計算するために低次元の状態表現が必要で，この状態表現はRSSMを用いて計算されるため，テスト時もこの状態表現のためにRSSMによる推論を行い続ける必要があります．

そのため，先ほど実装したActionModelをそのまま使っても簡単には行動を決定できません．

ここを扱いやすくするために，RSSMを使って低次元の状態表現を計算しつつ，行動を決定するAgentクラスを実装します．

In [None]:
class Agent:
    """
    ActionModelに基づき行動を決定する．そのためにRSSMを用いて状態表現をリアルタイムで推論して維持するクラス．
    """

    def __init__(self, encoder: Encoder, rssm: RSSM, action_model: ActionModel) -> None:
        """
        コンストラクタ．

        Parameters
        ----------
        encoder : Encoder
            上で定義したEncoderクラスのインスタンス．
            観測画像を1024次元のベクトルに埋め込む ．
        rssm : RSSM
            上で定義したRSSMクラスのインスタンス．
            遷移モデル，1024次元のベクトルを観測画像にするデコーダ，報酬を予測するモデルを持つ．
        action_model : ActionModel
            上で定義したActionModelのインスタンス．
            低次元の状態表現から行動を予測する．
        """
        self.encoder = encoder
        self.rssm = rssm
        self.action_model = action_model

        self.device = next(self.action_model.parameters()).device
        self.rnn_hidden = torch.zeros(1, rssm.rnn_hidden_dim, device=self.device)

    def __call__(self, modalities, training=True) -> np.ndarray:
        """
        特殊メソッド．
        インスタンスに直接引数を渡すことで実行される．
        （例）agent = Agent(*args)
             action = agent(obs)  # このときに__call__メソッドが呼び出される．

        Parameters
        ----------
        modalities : Dict # np.ndarray (batch size, 3, 64, 64)
            環境から得られた観測画像．
        training : bool
            訓練か推論かを示すフラグ． (default=True)

        Returns
        -------
        action : np.ndarray (batch size, action dim)
            入力された観測に対する行動の予測．
        """
        # preprocessを適用，PyTorchのためにChannel-Firstに変換
        for modality_name, modality in modalities.items():
            if Modality.get_type(modality_name) == 'image':
                modality = preprocess_obs(modality)
                modality = np.transpose(modality, (2, 0, 1)) # (1, C, H, W)
            modality = np.expand_dims(modality, axis=0)
            modalities[modality_name] = torch.as_tensor(modality, device=self.device)


        # obs = preprocess_obs(obs)
        # obs = torch.as_tensor(obs, device=self.device)
        # obs = obs.transpose(1, 2).transpose(0, 1).unsqueeze(0) # (1, C, H, W)

        with torch.no_grad():
            # 観測を低次元の表現に変換し，posteriorからのサンプルをActionModelに入力して行動を決定する
            for modality_name, modality in modalities.items():
                if Modality.get_type(modality_name) == 'image':
                    modalities[modality_name] = self.encoder(modality)

            state_posterior = self.rssm.posterior(self.rnn_hidden, modalities)
            state = state_posterior.sample()
            # embedded_obs = self.encoder(obs)
            # state_posterior = self.rssm.posterior(self.rnn_hidden, embedded_obs)
            # state = state_posterior.sample()
            action = self.action_model(state, self.rnn_hidden, training=training) # (1, action_dim)

            # 次のステップのためにRNNの隠れ状態を更新しておく
            _, self.rnn_hidden = self.rssm.prior(
                self.rssm.recurrent(state, action, self.rnn_hidden)
            )

        return action.squeeze().cpu().numpy() # (action_dim, )

    def reset(self) -> None:
        """
        RNNの隠れ状態（=決定的状態）をリセットする．
        """
        self.rnn_hidden = torch.zeros(1, self.rssm.rnn_hidden_dim, device=self.device)

# 8.ハイパーパラメータの設定と学習の準備

ここまででDreamerの基本的な構成要素は実装が終わりました．あとはハイパーパラメータを設定し，モデルやリプレイバッファを宣言して学習の準備を整えます．

In [None]:
class Modality:
    num2type = {}

    def __init__(self, name, dim, type_):
        self.name = name # モダリティの識別番号
        self.dim = dim # 画像の場合はembedされた後のdim, 関節角の場合はそのまま
        self.type_ = type_ # 'image' or 'joint'
        if type_ == 'image':
            self.image_shape = (64, 64, 3)
        Modality.num2type[name] = type_

    @classmethod
    def get_type(cls, number):
        return cls.num2type[number]

ModalityInfo = (
    Modality('image1', 1024, 'image'),
    Modality('joints', 26, 'joint'),
)

In [None]:
# リプレイバッファの宣言
buffer_capacity = 200000  # Colabのメモリの都合上，元の実装より小さめにとっています
replay_buffer = ReplayBuffer(
    capacity=buffer_capacity,
    ModalityInfo=ModalityInfo,
    action_dim=env.action_space.shape[0],
)

# モデルの宣言
state_dim = 30  # 確率的状態の次元
rnn_hidden_dim = 200  # 決定的状態（RNNの隠れ状態）の次元
# 確率的状態の次元と決定的状態（RNNの隠れ状態）の次元は一致しなくて良い
encoder = Encoder().to(device)
rssm = RSSM(
    state_dim,
    env.action_space.shape[0], # shapeが(6, )なのでshape[0]は6
    rnn_hidden_dim,
    ModalityInfo,
)
value_model = ValueModel(state_dim, rnn_hidden_dim).to(device)
action_model = ActionModel(state_dim, rnn_hidden_dim, env.action_space.shape[0]).to(
    device
)

# オプティマイザの宣言
model_lr = 6e-4  # encoder, rssm, obs_model, reward_modelの学習率
value_lr = 8e-5
action_lr = 8e-5
eps = 1e-4
model_params = (
    list(encoder.parameters())
    + list(rssm.transition.parameters())
    + list(rssm.observation.parameters())
    + list(rssm.reward.parameters())
)
model_optimizer = torch.optim.Adam(model_params, lr=model_lr, eps=eps)
value_optimizer = torch.optim.Adam(value_model.parameters(), lr=value_lr, eps=eps)
action_optimizer = torch.optim.Adam(action_model.parameters(), lr=action_lr, eps=eps)

# その他ハイパーパラメータ
seed_episodes = 5  # 最初にランダム行動で探索するエピソード数
all_episodes = 100  # 学習全体のエピソード数（300ほどで，ある程度収束します）
test_interval = 10  # 何エピソードごとに探索ノイズなしのテストを行うか
model_save_interval = 20  # NNの重みを何エピソードごとに保存するか
collect_interval = 100  # 何回のNNの更新ごとに経験を集めるか（＝1エピソード経験を集めるごとに何回更新するか）

action_noise_var = 0.3  # 探索ノイズの強さ

batch_size = 50
chunk_length = 50  # 1回の更新で用いる系列の長さ
imagination_horizon = 15  # Actor-Criticの更新のために，Dreamerで何ステップ先までの想像上の軌道を生成するか


gamma = 0.9  # 割引率
lambda_ = 0.95  # λ-returnのパラメータ
clip_grad_norm = 100  # gradient clippingの値
free_nats = 3  # KL誤差（RSSMのTransitionModelにおけるpriorとposteriorの間の誤差）がこの値以下の場合，無視する

# 9.学習
まず，最初の数エピソードはランダムに行動して経験をリプレイバッファに貯めます．

In [None]:
env = make_env() # Wrapされたenv
for episode in range(seed_episodes):
    obs = env.reset()
    done = False
    while not done:
        action = env.action_space.sample()
        next_obs, reward, done, _ = env.step(action)
        replay_buffer.push(obs, action, reward, done)
        obs = next_obs

学習結果を確認するために，TensorBoardを立ち上げておきます．

In [None]:
log_dir = "./logs"
writer = SummaryWriter(log_dir)
%tensorboard --logdir './logs'

以下がメインの学習ループです．それぞれのコメントを見て，実装の内容を追ってください.

学習にはColab Proで3時間半ぐらいの時間がかかります.

## トレーニング回す

In [None]:
for episode in range(seed_episodes, 20):
    # -----------------------------
    # 各エピソードから経験を集める
    # -----------------------------
    start = time.time()
    # 行動を決定するためのエージェントを宣言
    policy = Agent(encoder, rssm.transition, action_model) # rssm.transitionはTransitionクラス

    env = make_env()
    obs = env.reset()
    done = False
    total_reward = 0
    while not done:
        action = policy(obs) # __call__を呼び出し。 (action_dim, )
        # 探索のためにガウス分布に従うノイズを加える(explaration noise)
        action += np.random.normal(0, np.sqrt(action_noise_var),
                                     env.action_space.shape[0])
        next_obs, reward, done, _ = env.step(action)

        #リプレイバッファに観測，行動，報酬，doneを格納
        replay_buffer.push(obs, action, reward, done)

        obs = next_obs
        total_reward += reward

    # 訓練時の報酬と経過時間をログとして表示
    writer.add_scalar('total reward at train', total_reward, episode)
    print('episode [%4d/%4d] is collected. Total reward is %f' %
            (episode+1, all_episodes, total_reward))
    print('elasped time for interaction: %.2fs' % (time.time() - start))

    # NNのパラメータを更新する
    start = time.time()
    for update_step in range(collect_interval): # 1エピソード集めたときcollect_interval回NN更新する
        # -------------------------------------------------------------------------------------
        #  RSSM(trainsition_model, obs_model, reward_model)の更新 - Dynamics learning
        #  事前分布と事後分布のKL距離、再構成誤差、報酬モデルの３つの損失関数を用いてDynamicsを学習する
        # -------------------------------------------------------------------------------------
        observations, actions, rewards, _ = \
            replay_buffer.sample(batch_size, chunk_length) # (B, chunk_length, H, W, C), (B, chunk_length, action_dim), ...

        # 観測を前処理し，RNNを用いたPyTorchでの学習のためにTensorの次元を調整
        for modality_name, modality in observations.items():
            if Modality.get_type(modality_name) == 'image':
                modality = preprocess_obs(modality)
                modality = rearrange(modality, "b t h w c -> t b c h w")  # (T, B, C, H, W)
            elif Modality.get_type(modality_name) == 'joint':
                modality = rearrange(modality, "b t d -> t b d") # (T, B, joint_dim)
            observations[modality_name] = torch.as_tensor(modality, device=device)  # (T, B, *)

        # observations = preprocess_obs(observations)
        # observations = torch.as_tensor(observations, device=device)  # (B, T, H, W, C)
        actions = torch.as_tensor(actions, device=device)  # (B, T, action dim)
        rewards = torch.as_tensor(rewards, device=device)  # (B, T, 1)

        # observations = rearrange(observations, "b t h w c -> t b c h w")  # (T, B, C, H, W)
        actions = rearrange(actions, "b t d -> t b d")  # (T, B, action dim)
        rewards = rearrange(rewards, "b t d -> t b d")  # (T, B, 1)

        # 観測をエンコーダで低次元のベクトルに変換
        embedded_observations = encoder(
            observations.reshape(-1, 3, 64, 64)).view(chunk_length, batch_size, -1)  # (T, B, 1024)

        # 低次元の状態表現を保持しておくためのTensorを定義
        states = torch.zeros(chunk_length, batch_size, state_dim, device=device)  # (T, B, state dim)
        rnn_hiddens = torch.zeros(chunk_length, batch_size, rnn_hidden_dim, device=device)  # (T, B, rnn hidden dim)

        # 低次元の状態表現は最初はゼロ初期化（timestep１つ分）
        state = torch.zeros(batch_size, state_dim, device=device)
        rnn_hidden = torch.zeros(batch_size, rnn_hidden_dim, device=device)

        #KL loss#################################################################################
        # 状態s_tの予測を行ってそのロスを計算する（priorとposteriorの間のKLダイバージェンス）
        kl_loss = 0
        for l in range(chunk_length-1):
            next_state_prior, next_state_posterior, rnn_hidden = \
                rssm.transition(state, actions[l], rnn_hidden, embedded_observations[l+1])  # (B, state_dim)
            state = next_state_posterior.rsample()
            states[l+1] = state
            rnn_hiddens[l+1] = rnn_hidden
            kl = kl_divergence(next_state_prior, next_state_posterior).sum(dim=1) # WRITE ME （ヒント: kl_divergence()を使用）多変量正規分布の各次元を足す
            kl_loss += kl.clamp(min=free_nats).mean()  # 原論文通り，KL誤差がfree_nats以下の時は無視
        kl_loss /= (chunk_length - 1)
        ##########################################################################################

        # states[0] and rnn_hiddens[0]はゼロ初期化なので以降では使わない
        # states，rnn_hiddensは低次元の状態表現
        states = states[1:]  # (T-1, B, state dim)
        rnn_hiddens = rnn_hiddens[1:]  # (T-1, B, rnn hidden dim)

        #obs lossとrecon loss#####################################################################
        # 観測を再構成，また，報酬を予測
        flatten_states = states.view(-1, state_dim)  # ((T-1) x B, state dim)
        flatten_rnn_hiddens = rnn_hiddens.view(-1, rnn_hidden_dim)  # ((T-1) x B, rnn hidden dim)
        recon_observations = rssm.observation(flatten_states, flatten_rnn_hiddens)
        for modality_name, modality in recon_observations.items():
            if Modality.get_type(modality_name) == 'image':
                recon_observations[modality_name] = observations.view(chunk_length-1, batch_size, 3, 64, 64)
            elif Modality.get_type(modality_name) == 'joint':
                recon_observations[modality_name] = observations.view(chunk_length-1, batch_size, -1)
        # recon_observations = rssm.observation(flatten_states, flatten_rnn_hiddens).view(chunk_length-1, batch_size, 3, 64, 64)  # (T-1, B, C, H, W)
        predicted_rewards = rssm.reward(flatten_states, flatten_rnn_hiddens).view(chunk_length-1, batch_size, 1)  # (T-1, B, 1)

        # 観測と報酬の予測誤差を計算
        obs_loss = 0
        for modality_name, recon_modality in recon_observations.items():
            if Modality.get_type(modality_name) == 'image':
                obs_loss += 0.5 * F.mse_loss(recon_modality, observations[modality_name][1:], reduction='none').mean([0, 1]).sum()
            if Modality.get_type(modality_name) == 'joint':
                obs_loss += 0.5 * F.mse_loss(recon_modality, observations[modality_name][1:])
        # obs_loss = 0.5 * F.mse_loss(recon_observations, observations[1:], reduction='none').mean([0, 1]).sum() #(T-1, B, C, H, W) -> (C, H, W) -> (1, )
        reward_loss = 0.5 * F.mse_loss(predicted_rewards, rewards[:-1])
        ###########################################################################################


        # 以上のロスを合わせて勾配降下で更新する
        model_loss = kl_loss + obs_loss + reward_loss
        model_optimizer.zero_grad()
        model_loss.backward()
        clip_grad_norm_(model_params, clip_grad_norm)
        model_optimizer.step()

        # --------------------------------------------------
        #  Action Model, Value　Modelの更新　- Behavior learning
        # --------------------------------------------------
        # Actor-Criticのロスで他のモデルを更新することはないので勾配の流れを一度遮断
        # flatten_states, flatten_rnn_hiddensは RSSMから得られた低次元の状態表現を平坦化した値
        flatten_states = flatten_states.detach()
        flatten_rnn_hiddens = flatten_rnn_hiddens.detach()

        # DreamerにおけるActor-Criticの更新のために，現在のモデルを用いた
        # 数ステップ先の未来の状態予測を保持するためのTensorを用意
        imagined_states = torch.zeros(imagination_horizon + 1,
                                         *flatten_states.shape, # ((T-1) x B, state dim)
                                          device=flatten_states.device)  # (horizon + 1, (T-1) x B, state dim)
        imagined_rnn_hiddens = torch.zeros(imagination_horizon + 1,
                                                *flatten_rnn_hiddens.shape,
                                                device=flatten_rnn_hiddens.device)  # (horizon + 1, (T-1) x B, rnn hidden dim)

        #　未来予測をして想像上の軌道を作る前に，最初の状態としては先ほどモデルの更新で使っていた
        # リプレイバッファからサンプルされた観測データを取り込んだ上で推論した状態表現を使う
        imagined_states[0] = flatten_states
        imagined_rnn_hiddens[0] = flatten_rnn_hiddens

        # open-loopで未来の状態予測を使い，想像上の軌道を作る
        for h in range(1, imagination_horizon + 1):
            # 行動はActionModelで決定．この行動はモデルのパラメータに対して微分可能で,
            #　これを介してActionModelは更新される
            actions = action_model(flatten_states, flatten_rnn_hiddens)  # ((T-1) x B, action dim)
            flatten_states_prior, flatten_rnn_hiddens = rssm.transition.prior(rssm.transition.recurrent(flatten_states,
                                                                   actions,
                                                                   flatten_rnn_hiddens))
            flatten_states = flatten_states_prior.rsample()
            imagined_states[h] = flatten_states  # ((T-1) x B, state dim)
            imagined_rnn_hiddens[h] = flatten_rnn_hiddens  # ((T-1) x B, rnn hidden dim)

        # RSSMのreward_modelにより予測された架空の軌道に対する報酬を計算
        flatten_imagined_states = imagined_states.view(-1, state_dim)  # ((hotizon+1)x(T-1)xB, state dim)
        flatten_imagined_rnn_hiddens = imagined_rnn_hiddens.view(-1, rnn_hidden_dim)  # ((horizon+1)x(T-1)xB, rnn hidden dim)
        imagined_rewards = \
            rssm.reward(flatten_imagined_states,
                        flatten_imagined_rnn_hiddens).view(imagination_horizon + 1, -1)  # ((horizon+1), (T-1)xB, state dim)
        imagined_values = \
            value_model(flatten_imagined_states,
                        flatten_imagined_rnn_hiddens).view(imagination_horizon + 1, -1)  # ((horizon+1), (T-1)xB, rnn hidden dim)

        # λ-returnのターゲットを計算(V_{\lambda}(s_{\tau})
        lambda_target_values = lambda_target(imagined_rewards, imagined_values, gamma, lambda_) # WRITE ME （ヒント: lambda_target()を利用）  # ((horizon+1), (T-1)xB, 1)

        # 価値関数の予測した価値が大きくなるようにActionModelを更新
        # PyTorchの基本は勾配降下だが，今回は大きくしたいので-1をかける
        action_loss = -lambda_target_values.mean()
        action_optimizer.zero_grad()
        action_loss.backward()
        clip_grad_norm_(action_model.parameters(), clip_grad_norm)
        action_optimizer.step()

        # TD(λ)ベースの目的関数で価値関数を更新（価値関数のみを学習するため，学習しない変数のグラフは切っている．)
        imagined_values = value_model(flatten_imagined_states.detach(), flatten_imagined_rnn_hiddens.detach()).view(imagination_horizon + 1, -1)# ((horizon+1), (T-1)xB, 1)
        value_loss = 0.5 * F.mse_loss(imagined_values, lambda_target_values.detach()) # WRITE ME （ヒント: 0.5 * F.mse_loss()を使用）
        value_optimizer.zero_grad()
        value_loss.backward()
        clip_grad_norm_(value_model.parameters(), clip_grad_norm)
        value_optimizer.step()

        # ログをTensorBoardに出力
        print('update_step: %3d model loss: %.5f, kl_loss: %.5f, '
             'obs_loss: %.5f, reward_loss: %.5f, '
             'value_loss: %.5f action_loss: %.5f'
                % (update_step + 1, model_loss.item(), kl_loss.item(),
                    obs_loss.item(), reward_loss.item(),
                    value_loss.item(), action_loss.item()))
        total_update_step = episode * collect_interval + update_step
        writer.add_scalar('model loss', model_loss.item(), total_update_step)
        writer.add_scalar('kl loss', kl_loss.item(), total_update_step)
        writer.add_scalar('obs loss', obs_loss.item(), total_update_step)
        writer.add_scalar('reward loss', reward_loss.item(), total_update_step)
        writer.add_scalar('value loss', value_loss.item(), total_update_step)
        writer.add_scalar('action loss', action_loss.item(), total_update_step)

    print('elasped time for update: %.2fs' % (time.time() - start))

    # --------------------------------------------------------------
    #    テストフェーズ．探索ノイズなしでの性能を評価する
    # --------------------------------------------------------------
    if (episode + 1) % test_interval == 0:
        policy = Agent(encoder, rssm.transition, action_model)
        start = time.time()
        obs = env.reset()
        done = False
        total_reward = 0
        while not done:
            action = policy(obs, training=False)
            obs, reward, done, _ = env.step(action)
            total_reward += reward

        writer.add_scalar('total reward at test', total_reward, episode)
        print('Total test reward at episode [%4d/%4d] is %f' %
                (episode+1, all_episodes, total_reward))
        print('elasped time for test: %.2fs' % (time.time() - start))

    if (episode + 1) % model_save_interval == 0:
        # 定期的に学習済みモデルのパラメータを保存する
        model_log_dir = os.path.join(log_dir, 'episode_%04d' % (episode + 1))
        os.makedirs(model_log_dir, exist_ok=True)
        torch.save(encoder.state_dict(), os.path.join(model_log_dir, 'encoder.pth'))
        torch.save(rssm.transition.state_dict(), os.path.join(model_log_dir, 'rssm.pth'))
        torch.save(rssm.observation.state_dict(), os.path.join(model_log_dir, 'obs_model.pth'))
        torch.save(rssm.reward.state_dict(), os.path.join(model_log_dir, 'reward_model.pth'))
        torch.save(value_model.state_dict(), os.path.join(model_log_dir, 'value_model.pth'))
        torch.save(action_model.state_dict(), os.path.join(model_log_dir, 'action_model.pth'))
    del env
    gc.collect()

writer.close()

TensorBoardで学習結果を確認してみます．

In [None]:
%tensorboard --logdir='./logs'

##  10.結果の確認
保存された学習済み重みを用いて，動作を確認してみましょう.

学習にはかなりの時間がかかるので，ここでは事前に学習しておいた重みを読み込むことにします．時間のある方は，上記のコードで実際に学習した重みを使って同様に試してみてください.

In [None]:
# # 事前にGoogle Driveにあげておいた学習済み重みをダウンロードします
# from google_drive_downloader import GoogleDriveDownloader as gdd
# file_id_dreamer = "1EE6okFLo33RlUAmLdwsCvkXql3hqbjVu"  # Google Driveにあげた学習済重みのfile idを取得してここにコピペしてください
# file_id_mopoedreamer = "1r0YXULm4xBoSql0sxzE1GhWuTQyMQkeM"
# gdd.download_file_from_google_drive(
#     file_id=file_id_dreamer, dest_path="./Dreamer", unzip=True, overwrite=True
# )
# gdd.download_file_from_google_drive(
#     file_id=file_id_mopoedreamer, dest_path="./MoPoE-Dreamer", unzip=True, overwrite=True
# )

# # import zipfile

# # def unzip_file(zip_file_path, extract_to_path):
# #   """
# #   zipファイルを指定されたディレクトリに解凍する。

# #   Args:
# #     zip_file_path: 解凍するzipファイルのパス。
# #     extract_to_path: 解凍先のディレクトリパス。
# #   """
# #   with zipfile.ZipFile(zip_file_path, 'r') as zip_ref:
# #     zip_ref.extractall(extract_to_path)

# # # 使用例
# # zip_file_path_dreamer = './Dreamer/episode_0300.zip'  # ダウンロードしたzipファイルのパス
# # extract_to_path_dreamer = './Dreamer/'  # 解凍先のディレクトリ (カレントディレクトリに解凍)
# # zip_file_path_mopoedreamer = './MoPoE-Dreamer/episode_0300.zip'  # ダウンロードしたzipファイルのパス
# # extract_to_path_mopoedreamer = './MoPoE-Dreamer/'

# # unzip_file(zip_file_path_dreamer, extract_to_path_dreamer)
# # unzip_file(zip_file_path_mopoedreamer, extract_to_path_mopoedreamer)

In [None]:
from google.colab import drive
import shutil
import os

# Googleドライブをマウント
drive.mount('/content/drive')

In [None]:
# dir = '/content/drive/MyDrive/世界モデル/最終課題/Parameters/DreamerV1'
dir = '/content/drive/MyDrive/世界モデル/最終課題/Parameters/MoPoE-DreamerV1'

In [None]:
encoder = Encoder().to(device)
rssm = RSSM(
    state_dim,
    env.action_space.shape[0], # shapeが(6, )なのでshape[0]は6
    rnn_hidden_dim,
    ModalityInfo,
)
value_model = ValueModel(state_dim, rnn_hidden_dim).to(device)
action_model = ActionModel(state_dim, rnn_hidden_dim, env.action_space.shape[0]).to(
    device
)

In [None]:
encoder.load_state_dict(torch.load(os.path.join(dir, 'episode_0300/encoder.pth'), map_location=device))
rssm.transition.load_state_dict(torch.load(os.path.join(dir, "episode_0300/rssm.pth"), map_location=device))
rssm.observation.load_state_dict(torch.load(os.path.join(dir, "episode_0300/obs_model.pth"), map_location=device))
action_model.load_state_dict(torch.load(os.path.join(dir, "episode_0300/action_model.pth"), map_location=device))

In [None]:
# # 学習済み重みを用いず，このcolab上で学習したモデルを使うなら，このセルを実行してください．
# # あるいは，定期的に保存されているモデルを読み込むこともできます
# encoder.load_state_dict(torch.load(os.path.join(model_log_dir, "encoder.pth")))
# rssm.transition.load_state_dict(torch.load(os.path.join(model_log_dir, "rssm.pth")))
# rssm.observation.load_state_dict(
#     torch.load(os.path.join(model_log_dir, "obs_model.pth"))
# )
# action_model.load_state_dict(
#     torch.load(os.path.join(model_log_dir, "action_model.pth"))
# )

動作の様子を動画で観てみることにします．

In [None]:
env = make_env()
policy = Agent(encoder, rssm.transition, action_model)
obs = env.reset()
done = False
total_reward = 0
frames = [obs['image1']]
while not done:
    action = policy(obs, training=False)
    obs, reward, done, _ = env.step(action)
    total_reward += reward
    frames.append(obs['image1'])

print("Total Reward:", total_reward)

In [None]:
# 結果を動画で観てみるための関数
import matplotlib.pyplot as plt
from IPython.display import HTML
from matplotlib import animation


def display_video(frames: List[np.ndarray]) -> None:
    """
    結果を動画にするための関数．

    frames : List[np.ndarray]
        観測画像をリスト化したもの．
    """
    plt.figure(figsize=(8, 8), dpi=50)
    patch = plt.imshow(frames[0])
    plt.axis("off")

    def animate(i):
        patch.set_data(frames[i])
        plt.title("Step %d" % (i))

    anim = animation.FuncAnimation(plt.gcf(), animate, frames=len(frames), interval=50)
    display(HTML(anim.to_jshtml(default_mode="once")))
    plt.close()

In [None]:
# full
import statistics
reward_list = []
env = make_env()
policy = Agent(encoder, rssm.transition, action_model)
obs = env.reset()
done = False
total_reward = 0
frames = [obs['image1']]
sum_total_reward = 0
iter = 10
for _ in range(iter):
    while not done:
        action = policy(obs, training=False)
        obs, reward, done, _ = env.step(action)
        total_reward += reward
        frames.append(obs['image1'])

    print("Total Reward:", total_reward)
    reward_list.append(total_reward)
    obs = env.reset()
    done = False
    total_reward = 0

print(f'average reward: {statistics.mean(reward_list)} \n std: {statistics.stdev(reward_list)}')

In [None]:
display_video(frames)

In [None]:
# only img
reward_list = []
def extract_image(obs):
    new_obs = {}
    for modality_name, modality in obs.items():
        if modality_name == 'image1':
            new_obs[modality_name] = modality
    return new_obs

env = make_env()
policy = Agent(encoder, rssm.transition, action_model)
obs = env.reset()
obs = extract_image(obs)
done = False
total_reward = 0
frames = [obs['image1']]
iter = 10
for _ in range(iter):
    while not done:
        action = policy(obs, training=False)
        obs, reward, done, _ = env.step(action)
        obs = extract_image(obs)
        total_reward += reward
        frames.append(obs['image1'])

    print("Total Reward:", total_reward)
    reward_list.append(total_reward)
    obs = env.reset()
    done = False
    total_reward = 0

print(f'average reward: {statistics.mean(reward_list)} \n std: {statistics.stdev(reward_list)}')

In [None]:
display_video(frames)

In [None]:
# only join
reward_list = []
def extract_joints(obs):
    new_obs = {}
    for modality_name, modality in obs.items():
        if modality_name == 'joints':
            new_obs[modality_name] = modality
    return new_obs

env = make_env()
policy = Agent(encoder, rssm.transition, action_model)
obs = env.reset()
new_obs = extract_joints(obs)
done = False
total_reward = 0
frames = [obs['image1']]
iter = 10
for _ in range(iter):
    while not done:
        action = policy(new_obs, training=False)
        obs, reward, done, _ = env.step(action)
        new_obs = extract_joints(obs)
        total_reward += reward
        frames.append(obs['image1'])

    print("Total Reward:", total_reward)
    reward_list.append(total_reward)
    obs = env.reset()
    done = False
    total_reward = 0

print(f'average reward: {statistics.mean(reward_list)} \n std: {statistics.stdev(reward_list)}')

In [None]:
display_video(frames)

ある時点の適当な観測から，世界モデルで**open-loop**に未来予測を行わせ，観測を再構成して視覚的に観てみましょう．

In [None]:
policy = Agent(encoder, rssm.transition, action_model)
obs = env.reset()
# 最初に適当な回数行動します．この間にrnn_hiddenに観測の系列に関する情報が蓄積されます
for _ in range(np.random.randint(5, 100)):
    action = policy(obs, training=False)
    obs, _, _, _ = env.step(action)

# 現在の観測をベクトルに変換し，それを元にposteriorを計算します．
preprocessed_obs = preprocess_obs(obs)
preprocessed_obs = torch.as_tensor(preprocessed_obs, device=device)
preprocessed_obs = preprocessed_obs.transpose(1, 2).transpose(0, 1).unsqueeze(0)
with torch.no_grad():
    embedded_obs = encoder(preprocessed_obs)

# posteriorからのサンプルとして得られたstateと，policyから取得したrnn_hiddenが低次元の状態表現です．
# open-loopの予測なので，これ以降この2つの変数は状態遷移を表すpriorでしか更新しません．
# （policyの中では，行動を決定するために観測をリアルタイムで反映してposteriorで更新しています）
rnn_hidden = policy.rnn_hidden
state = rssm.transition.posterior(rnn_hidden, embedded_obs).sample()
frame = np.zeros((64, 128, 3))
frames = []

prediction_length = 100
for _ in range(prediction_length):
    action = policy(obs)
    obs, _, _, _ = env.step(action)

    action = torch.as_tensor(action, device=device).unsqueeze(0)
    with torch.no_grad():
        state_prior, rnn_hidden = rssm.transition.prior(
            rssm.transition.recurrent(state, action, rnn_hidden)
        )
        state = state_prior.sample()
        predicted_obs = rssm.observation(
            state, rnn_hidden
        )  # obs_model(state, rnn_hidden)

    frame[:, :64, :] = preprocess_obs(obs)
    frame[:, 64:, :] = (
        predicted_obs.squeeze().transpose(0, 1).transpose(1, 2).cpu().numpy()
    )
    frames.append((frame + 0.5).clip(0.0, 1.0))

open-loopの動画予測の結果を，左側に真のフレーム，右側に予測されたフレームと並べてみてみましょう．

In [None]:
display_video(frames)

以上で演習は終わりです．お疲れ様でした！

## 11.参考文献
[[1]](https://arxiv.org/pdf/1811.04551.pdf) Danijar Hafner, Timothy Lillicrap, Ian Fischer, Ruben Villegas, David Ha, "Learning Latent Dynamics for Planning from Pixels", arXiv, 2019

[[2]](https://arxiv.org/abs/1912.01603) Danijar Hafner, Timothy Lillicrap, Jimmy Ba, Mohammad Norouzi,
"Dream to Control: Learning Behaviors by Latent Imagination", ICLR2020