# 2022年 世界モデル第5回演習

本演習では, いわゆる世界モデルを用いた強化学習アルゴリズムである **RSSM (Recurrent State Space Model)**[[1]](#scrollTo=J_PCEOJ69prx) と**Dreamer**[[2]](#scrollTo=J_PCEOJ69prx)を実装します.

1. [準備](#scrollTo=yDN8Ohc0Mh8C)
2. [環境の設定](#scrollTo=_eayBK1HPV5H)
3. [RSSM](#scrollTo=B3rWEGtkA3e_&line=1&uniqifier=1)
4. [RSSMの実装](#scrollTo=xJfU7ygCMeJn)
5. [補助機能の実装](#scrollTo=WhlVvU3aZ5FC)
6. [Dreamerの実装](#scrollTo=keAW0J-qS24Q)
7. [エージェントの実装](#scrollTo=GdKZ1S2_boOd)
8. [ハイパーパラメータの設定と学習の準備](#scrollTo=QUPWAkpyc9Z-)
9. [学習](#scrollTo=yUoI0l9Ee0Tp)
10. [結果の確認](#scrollTo=E3F2tAU0kepm)
11. [参考文献](##scrollTo=J_PCEOJ69prx&line=4&uniqifier=1)

##1.準備
まず, 演習を行うために必要な準備を行います.

この演習では, 環境として[PyBullet](https://github.com/bulletphysics/bullet3)を用います. PyBulletは`pip`を用いてインストールします.

In [None]:
!pip install pybullet



必要なライブラリをインポートします. 今回使うライブラリは, PyBullet以外はColabでは最初から入っていますが, 手元の環境で行う場合はこれらのライブラリも`pip`等で事前にインストールする必要があります.

In [None]:
import time
import os

import gym
import pybullet_envs  # PyBulletの環境をgymに登録する

import numpy as np
import matplotlib.pyplot as plt

import torch
from torch.distributions import Normal
from torch.distributions.kl import kl_divergence
from torch import nn
from torch.nn import functional as F
from torch.nn.utils import clip_grad_norm_
from torch.utils.tensorboard import SummaryWriter

# 可視化のためにTensorBoardを用いるので, Colab上でTensorBoardを表示するための宣言を行う
%load_ext tensorboard
torch.set_default_tensor_type('torch.cuda.FloatTensor')

The tensorboard extension is already loaded. To reload it, use:
  %reload_ext tensorboard


今回の学習にはGPUが必要です． 

以下のコードを実行して， 結果が'cuda'でなければ「ランタイム」 →　 「ランタイムのタイプを変更」でGPUモードに変更しましょう．

In [None]:
# torch.deviceを定義. この変数は後々モデルやデータをGPUに転送する時にも使います
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(device)

cuda


きちんとGPUが使える状態になっているかチェックしておきます． ColabのGPU割り当てにはランダム性があるので人によって結果が違う場合がありますが， どれでも実行には問題ありません（学習にかかる時間に幾らかの差は出ます）.

In [None]:
!nvidia-smi

Sun Apr  3 09:40:37 2022       
+-----------------------------------------------------------------------------+
| NVIDIA-SMI 460.32.03    Driver Version: 460.32.03    CUDA Version: 11.2     |
|-------------------------------+----------------------+----------------------+
| GPU  Name        Persistence-M| Bus-Id        Disp.A | Volatile Uncorr. ECC |
| Fan  Temp  Perf  Pwr:Usage/Cap|         Memory-Usage | GPU-Util  Compute M. |
|                               |                      |               MIG M. |
|   0  Tesla T4            Off  | 00000000:00:04.0 Off |                    0 |
| N/A   43C    P8    11W /  70W |      3MiB / 15109MiB |      0%      Default |
|                               |                      |                  N/A |
+-------------------------------+----------------------+----------------------+
                                                                               
+-----------------------------------------------------------------------------+
| Proces

## 2.環境の設定

###1.ゲーム基幹部分

#### import

In [None]:
import cv2
import copy
from google.colab.patches import cv2_imshow
from PIL import Image
from gym.spaces import *
import random

In [None]:
# ToDO:rootフォルダを確認
path = "/content/drive/MyDrive/Colab Notebooks/Adversarial_worldModel_BomberMan"

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


#### ゲームルール


各マスのid割り当てとその意味

| id |意味|
|:---|:---|
|0|空白|
|1|破壊不可オブジェクト|
|2|破壊可能オブジェクト(レンガ)|
|1X|キャラ(X = id)|
|10X|爆弾(X = 置いた人のプレイヤーid)|

アクション 

| id |意味| 
|:---|:---|
|0|何もしない|
|1|上に移動|
|2|下に移動|
|3|右に移動|
|4|左に移動|
|5|爆弾を設置|

x軸が縦、y軸が横

学習時renderには連番で投げる  

#### ゲーム情報定義

In [None]:
# フィールドのサイズ定義　両方奇数である前提で行っている
boardXsize = 13
boardYsize = 11
# フィールドにおける爆弾の合計の上限数
MAXBOMBS = 20
ITEMS = 3
# プレイヤー人数　現状2である前提
player_count = 2
# 行動可能なアクションの数
n_actions = 6
# 最初においてあるレンガの数
brickCount = 40# boardXsize * boardYsize - 58
# ゲームクラスのデバッグ用
logDisp = False
# 各modelの可視化用
dummyCheck = True

In [None]:
assert boardXsize % 2 == 1 and boardYsize % 2 == 1
assert player_count == 2

##### 報酬設定(ゲーム内で表示されるスコアの定義)

In [None]:
# 爆弾が相手に当たったときの報酬
ONBOMHIT = 1
# 相手からの爆弾が当たったときの報酬
ONBOMHITTED = -0.1
# 自分の爆弾が当たったときの報酬
ONBOMSELF = -1
# レンガを壊した時に得られる報酬
ONBREAKBRICK = 0
# 接近ボーナス
CLOSEBONUS = 0.2
# 勝利ボーナス
WIN = 0.1
# 敗北ボーナス
LOSE = -0.1
# 壁にぶつかる、爆弾の設置数が上限に達している行動をしたときのペナルティー
INVALID_ACTION_PENALTY = -0.008
# 生存ごとに毎step得る報酬
ALLIVE = 0
# ステップ上限に達したときのペナルティー
OVERTIME_PENALTY = -10

#####パラメータ、便利関数

In [None]:
def pos2v(val):
    return val[0], val[1]

def nearInt(val):
    return int(val + 0.5)

def isInBoard(p):
    if 0 <= p[0] and p[0] < boardXsize and 0 <= p[1] and p[1] < boardYsize:
        return True
    else:
        return False

def rand_ints_nodup(a, b, k):
    ns = []
    if k > b : k = b
    while len(ns) < k:
        n = np.random.randint(a, b)
        if not n in ns:
            ns.append(n)
    return ns

V = np.array([[0, -1], [0, 1], [1, 0], [-1, 0]])

##### マップ用画像読み込み

In [None]:
pix_size = 1
windowXsize = boardXsize * pix_size
windowYsize = boardYsize * pix_size
# 人間の目に優しいように
chip_none = cv2.imread(path + "/BomberMan/chips/none.jpg")
chip_block = cv2.imread(path + "/BomberMan/chips/block.jpg")
chip_bomb = cv2.imread(path + "/BomberMan/chips/bom.jpg")
chip_brick = cv2.imread(path + "/BomberMan/chips/brick.jpg")
chip_enemy = cv2.imread(path + "/BomberMan/chips/enemy.jpg")
chip_player = cv2.imread(path + "/BomberMan/chips/player.jpg")
chip_bomb_on_me = cv2.imread(path + "/BomberMan/chips/bomonme.jpg")
chip_bomb_on_enemy = cv2.imread(path + "/BomberMan/chips/bomonenemy.jpg")

### ゲームクラス

基本の流れ  
対戦型のため普通のgymのように使えないため ToDO:Read it  
addActionで各プレイヤーの行動を設定、stepで行動を実行  
また報酬なども配列で返す

Player

|クラス内変数|説明|
|:---|:---|
|id|プレイヤーid(0～)|
|life|残機|
|pos|位置|
|bombMax|プレイヤーが置ける上限爆弾数|
|setBomb|プレイヤーが置いた爆弾数|
|bombLen|このプレイヤーの置いた爆弾が燃え広がる長さ|
|score|得点|

Bomb

|クラス内変数|説明|
|:---|:---|
|isActive|この爆弾idが今使われているか|
|timer|起爆するまでのtick|
|who|爆弾を置いたひとのid|
|len|この爆弾の燃え広がる長さ|
|pos|爆弾の位置|

Env

|クラス内変数|説明|
|:---|:---|
|actions|各プレイヤーがそのtickにとった行動|
|playableId|playableId[pid]が1のときpidが生存していることを示す|
|players|Player()のインスタンス配列|
|bombs|Bomb()のインスタンス配列|
|boardHistory|ゲーム内用の盤面の履歴|
|isFin|doneと同義|


|関数|説明|返り値|
|:---|:---|:---|
|addAction(pid, action)|pidがactionを行う予定を追加||
|resetBoard()|Boardの初期化||
|setBrick(cnt)|cntの数だけおける場所にレンガを置く||
|render(pid)|pidから見た学習用の盤面を返す|np.array([boardXsize, boardYsize])|
|renderForHuman(self, pid, board = None)|pidから見た人間用の盤面を返す|cv2|
|renderToGif(pid, path)|"root+path.gif"に今の環境のプレイ履歴をgifで保存する||
|rewards()|報酬の配列を獲得する|[pid, value]|
|observations()|各プレイヤーからみたboardを返す|[pid, np.array([1, boardXsize, boardYsize])]|

render(pid)は各値が[0, ITEMS+4]の範囲にあるので必要に応じてクリッピングしている  
本来は画像でやりとりするため現状のrender(pid)は必要ない関数ではある...(renderForHumanでやるべき)  
renderForHuman()のboardにNoneをいれると現在の盤面を返し、任意のゲーム内用配列を入れるとそれを出力する

##### class

In [None]:
class Player:
    def __init__(self, id = 0):
        self.id = id
        self.life = 10
        self.pos = np.array([0, 0])
        self.bombMax = 2
        self.setBomb = 0
        self.bombLen = 15
        self.score = 0.0
    
    def _changePos(self, pos):
        self.pos = pos
    
    def scoring(self, score):
        self.score += score

    def dmg(self):
        self.life -= 1
        if logDisp : print(f"{self.id}の残機減少：残り{self.life}")
    
    def isSurvive(self):
        if self.life > 0 : return True
        else : return False 

class Bomb:
    def __init__(self):
        self.isActive = False
        self.timer = 8
        self.who = -1
        self.len = -1
        self.pos = [-1,-1]
    
    def activate(self, pos, id, len):
        self.pos = pos
        self.who = id
        self.len = len
        self.isActive = True

class Env(gym.Env):
    def __init__(self, isRenderImg = False, isOutdataSingle = False):
        self.isRenderImg = isRenderImg
        self.isOutdataSingle = isOutdataSingle
        self.reset()
        # 以下gym用
        self.action_space = gym.spaces.Discrete(6)
        if isRenderImg:
            LOW = np.array([0 for i in range(3 * 64 * 64)]).reshape(64, 64, 3)
            HIGH = np.array([1 for i in range(3 * 64 * 64)]).reshape(64, 64, 3)
        else:
            LOW = np.array([0 for i in range(boardXsize * boardYsize)]).reshape(1, boardXsize, boardYsize)
            HIGH = np.array([1 for i in range(boardXsize * boardYsize)]).reshape(1, boardXsize, boardYsize)
        self.observation_space = gym.spaces.Box(low=LOW, high=HIGH)

    def reset(self):
        self.isFin = False
        self.actions = [-1 for i in range(player_count)]
        self.playableId = [1 for i in range(player_count)]
        self.players = [Player(i) for i in range(player_count)]
        self.bombs = [Bomb() for i in range(MAXBOMBS)]
        self.distance_t0 = 999.0
        self.boardHistory = []
        self.scoreHistory = []
        self.actionHistory = []
        self.posHistory = []
        self.resetBoard()
        self.addHistory()
        return self.observations()

    # 盤面リセット
    def resetBoard(self):
        # 盤面初期化
        self.board = np.zeros([boardXsize, boardYsize])

        # 破壊不可オブジェクト配置
        for x in range(1, boardXsize, 2):
            for y in range(1, boardYsize, 2):
                self.board[x, y] = 1
        
        # プレイヤー初期位置定義
        init_pos = np.array([[0, 0], [boardXsize - 1, boardYsize - 1], [0, boardYsize - 1], [boardXsize - 1, 0]])
        for i in range(player_count):
            self.board[pos2v(init_pos[i])] = 10 + i
            self.players[i]._changePos(pos = init_pos[i])
        
        self.setBrick(brickCount)

    # レンガ配置
    # プレイヤーの上下左右1マスは設置不可
    def setBrick(self, cnt):
        unsettable = []
        for player in self.players:
            for v in V:
                p = np.add(player.pos, v)
                if isInBoard(p) and self.board[pos2v(p)] == 0:
                    unsettable.append(p)
        
        settable = []
        for x, board_x in enumerate(self.board):
            for y, board_elm in enumerate(board_x):
                if board_elm == 0 and not any([np.allclose(p, [x, y]) for p in unsettable]):
                    settable.append([x, y])
        
        bricks_pos = rand_ints_nodup(0, len(settable), cnt)
        for b in bricks_pos:
            self.board[pos2v(settable[b])] = 2

    def addHistory(self):
        self.boardHistory.append(copy.deepcopy(self.board))
        self.scoreHistory.append([self.players[i].score for i in range(player_count)])
        self.actionHistory.append(copy.deepcopy(self.actions))
        self.posHistory.append([self.players[i].pos for i in range(player_count)])

    # キャラの移動処理
    def move(self, pid, direction):
        p = np.add(self.players[pid].pos, V[direction])
        if logDisp : print(f"{pid}は{p}に移動")
        if isInBoard(p):
            # 空白の時のみ移動可能
            if self.board[pos2v(p)] == 0:
                self.board[pos2v(self.players[pid].pos)] -= pid + 10
                self.board[pos2v(p)] += pid + 10
                self.players[pid]._changePos(p)
            else:
                self.onCollision(pid)
        else:
            self.onCollision(pid)

    # 壁など衝突 ToDo:減点処理...?
    def onCollision(self, pid):
        self.players[pid].scoring(INVALID_ACTION_PENALTY)

    # 爆弾セット
    def bombSet(self, pid, pos):
        # 爆弾がすでにあるかおける上限を超えたとき処理をスキップ
        if self.board[pos2v(pos)] >= 100 or self.players[pid].setBomb >= self.players[pid].bombMax:
            if logDisp : print(f"pid:{pid}は爆弾が置けない")
            self.players[pid].scoring(INVALID_ACTION_PENALTY)
            return
        for i in range(MAXBOMBS):
            if not self.bombs[i].isActive:
                self.bombs[i].activate(pos, pid, self.players[pid].bombLen)
                self.board[pos2v(pos)] += 100
                self.players[pid].setBomb += 1
                break

    # 爆弾tick
    def bombTick(self):
        for i in range(MAXBOMBS):
            if self.bombs[i].isActive:
                  self.bombs[i].timer -= 1
                  if self.bombs[i].timer <= 0:
                      self.bombDmg(i)
                      self.bombs[i].__init__()    

    # 爆弾爆発処理
    def bombDmg(self, bomId):
        pos = self.bombs[bomId].pos
        if logDisp : print(f"{pos}で{self.bombs[bomId].who}の爆弾が長さ{self.bombs[bomId].len}で爆発")
        # 爆弾を置いた人が置けるようにする
        self.players[self.bombs[bomId].who].setBomb -= 1
        # 爆発跡地
        self.board[pos2v(pos)] -= 100
        self.exploseCheck(pos, bomId)
        for v in V:
            for i in range(1, self.bombs[bomId].len):
                p = np.add(pos, v * i)
                if isInBoard(p):
                    if self.exploseCheck(p, bomId) == 1:
                        break
                else:
                    break      

    # 爆発後の地形変化 1を返すとき爆発がそこで止まることを示している
    def exploseCheck(self, p, bomId):
        whosBom = self.bombs[bomId].who
        if logDisp : print(f"{p}は{self.board[pos2v(p)]}")
        # 破壊不能の場合終了
        if int(self.board[pos2v(p)]) == 1:
            return 1
        # レンガの場合レンガを破壊して終了
        elif int(self.board[pos2v(p)]) == 2:
            self.board[pos2v(p)] = 0
            self.players[whosBom].scoring(ONBREAKBRICK)
            return 1
        # 爆弾の場合次のtickに連鎖爆発
        elif int(self.board[pos2v(p)]) >= 100:
            for i in range(MAXBOMBS):
              if np.allclose(self.bombs[i].pos, p):
                  if self.bombs[i].timer >= 2:
                      self.bombs[i].timer = 1
                  return 0
        # プレイヤーの場合ダメージ処理
        elif int(self.board[pos2v(p)]) >= 10:
            damagedPid = int(self.board[pos2v(p)]) % 10
            self.players[damagedPid].dmg()
            if logDisp : print(f"{damagedPid}が被弾")

            # 爆弾を当てたrewardと被弾reward
            if whosBom != damagedPid:
                self.players[whosBom].scoring(ONBOMHIT)
                self.players[damagedPid].scoring(ONBOMHITTED)
            else:
                self.players[damagedPid].scoring(ONBOMSELF)
            
            if not self.players[damagedPid].isSurvive():
                self.playableId[damagedPid] = 0
                self.players[damagedPid].scoring(LOSE)
                if whosBom != damagedPid:
                    self.players[whosBom].scoring(WIN)
            
            return 0

    # 相手に向かっているときボーナス
    def closeBonus(self):
        distance_t1 = np.linalg.norm(self.players[0].pos - self.players[1].pos)
        if self.distance_t0 > distance_t1:
            for pid in range(player_count):
                # 待機じゃなければ/今は待機でもスコアリングできるようにしてある
                #if 1 <= self.actions[pid] and self.actions[pid] <= 4:
                    self.players[pid].scoring(CLOSEBONUS)
            self.distance_t0 = distance_t1
    
    # 出力
    def render(self, pid):
        if self.isRenderImg:
            return self._renderToImg(pid)
        else:
            board = copy.deepcopy(self.board)
            for x in range(boardXsize):
                for y in range(boardYsize):
                    chip = int(board[x, y])
                    if 10 <= chip and chip < 100:
                        if chip == pid + 10:
                            board[x, y] = ITEMS
                        # 敵性プレイヤー
                        else:
                            board[x, y] = ITEMS + 1
                    # 爆弾
                    elif chip== 100:
                        board[x, y] = ITEMS + 2
                    # 爆弾onMe
                    elif chip == 110 + pid:
                        board[x, y] = ITEMS + 3
                    # 爆弾onEnemy
                    elif not chip < ITEMS:
                        board[x, y] = ITEMS + 4
                        
            return board

    def renderForHuman(self, pid, board = None):
        b = self.board
        if type(board) != type(None):
            b = board
        img = []
        for x in range(boardXsize):
            img_x = []
            for y in range(boardYsize):
                if b[x, y] == 0:
                    img_x.append(chip_none)
                elif b[x, y] == 1:
                    img_x.append(chip_block)
                elif b[x, y] == 2:
                    img_x.append(chip_brick)
                elif b[x, y] == 10 + pid:
                    img_x.append(chip_player)
                elif 10 <= b[x, y] and b[x, y] < 100:
                    img_x.append(chip_enemy)
                elif b[x, y] == 100:
                    img_x.append(chip_bomb)
                elif b[x, y] == 110 + pid:
                    img_x.append(chip_bomb_on_me)
                else:
                    img_x.append(chip_bomb_on_enemy)
            img_x = cv2.vconcat(img_x)
            img.append(img_x)
        img = cv2.hconcat(img)
        return img

    def _renderToImg(self, pid = 0, pixSize = [64, 64]):
        img = self.renderForHuman(pid)
        img = cv2.pyrUp(img)
        img = cv2.resize(img, dsize = (64, 64))
        return img

    def renderToGif(self, pid = 0, path = "play"):
        images = []
        for i, board in enumerate(self.boardHistory):
            img = cv2.pyrUp(self.renderForHuman(pid = pid, board = board))

            # スコア表示
            score = np.zeros((8 * 2, windowXsize * 8 * 2, 3), np.uint8)
            score = cv2.putText(score,
                text = "step:" + str('{:03}'.format(i)) + " | " +\
                   str('{:.5f}'.format(self.scoreHistory[i][0])) + " | " + str('{:.5f}'.format(self.scoreHistory[i][1])),
                org=(0, 10),
                fontFace=cv2.FONT_HERSHEY_SIMPLEX,
                fontScale=0.2,
                color=(255, 255, 255),
                thickness=1,
                lineType=cv2.LINE_4)

            # 行動表示
            for j in range(player_count):
                if 1 <= self.actionHistory[i][j]:
                    text = ["^", "v", ">", "<", "b"][self.actionHistory[i][j] - 1]
                    img = cv2.putText(img, text, (self.posHistory[i][j][0] * 16 + 8, self.posHistory[i][j][1] * 16 + 8), cv2.FONT_HERSHEY_SIMPLEX , 0.2, (255, 255, 255), 1)

            img = cv2.vconcat([img, score])
            images.append(Image.fromarray(img))

        images[0].save(path + ".gif", save_all = True, append_images = images[1:], duration=300, loop=0)

    # 以下gym用の関数

    # リワード設定
    def rewards(self):
        if self.isOutdataSingle:
            return self.players[0].score
        else:
            return [self.players[i].score for i in range(player_count)]
    
    # 観測
    def observations(self):
        if self.isOutdataSingle:
            if self.isRenderImg:
                return self.render(pid = 0)
            else:
                return torch.Tensor(self.render(pid = 0).reshape(1, windowXsize, windowYsize))
        else:
            rets = []
            for pid in range(player_count):
                if self.isRenderImg:
                    rets.append(self.render(pid = pid))
                else:
                    rets.append(torch.Tensor(self.render(pid = pid).reshape(1, windowXsize, windowYsize)))
            return rets

    # アクション登録
    def addAction(self, pid, action):
        if isinstance(action, np.ndarray):
            action = np.argmax(action)
        
        if pid < player_count and action <= n_actions:
                self.actions[pid] = action

    
    # 時間経過
    def step(self):
        for pid in range(player_count):
            action = self.actions[pid]
            if 1 <= action and action <= 4:
                self.move(pid, action - 1)
            elif action == 5:
                self.bombSet(pid, self.players[pid].pos)

        if len(self.boardHistory) >= 800:
            self.isFin = True
            for pid in range(player_count):
                self.players[pid].scoring(OVERTIME_PENALTY)

        if not self.isFin:
            self.bombTick()
            self.closeBonus()

            if sum(self.playableId) <= 1:
                self.isFin = True

        # 生存スコア計算
        for pid in range(player_count):
            if self.actions[pid] != 0:
                self.players[pid].scoring(ALLIVE)
            else:
                self.players[pid].scoring(INVALID_ACTION_PENALTY)
        
        self.addHistory()

        return self.observations(), self.rewards(), self.isFin, {}

In [None]:
# renderForHumanと同義
def matToImg(mat, fromFloat = False, pid = 0):
    mat = np.array(copy.deepcopy(mat).reshape(windowXsize, windowYsize))
    # 学習用に[0,1]の範囲にされたものをゲーム処理用に直す
    if fromFloat:
        mat *= (ITEMS + 4)
        for x in range(boardXsize):
            for y in range(boardYsize):
                chip = nearInt(mat[x, y])
                if chip == ITEMS:
                    mat[x, y] = 10
                elif chip == ITEMS + 1:
                    mat[x, y] = 11
                elif chip == ITEMS + 2:
                    mat[x, y] = 100
                elif chip == ITEMS + 3:
                    mat[x, y] = 110
                elif not chip < ITEMS:
                    mat[x, y] = 111
                else:
                    mat[x, y] = chip
    img = []
    # ゲーム処理用の配列をEnv()のインスタンスがないためここで人間用の画像に処理
    for x in range(boardXsize):
        img_x = []
        for y in range(boardYsize):
            if mat[x, y] == 0:
                img_x.append(chip_none)
            elif mat[x, y] == 1:
                img_x.append(chip_block)
            elif mat[x, y] == 2:
                img_x.append(chip_brick)
            elif mat[x, y] == 10 + pid:
                img_x.append(chip_player)
            elif 10 <= mat[x, y] and mat[x, y] < 100:
                img_x.append(chip_enemy)
            elif mat[x, y] == 100:
                img_x.append(chip_bomb)
            elif mat[x, y] == 110 + pid:
                img_x.append(chip_bomb_on_me)
            else:
                img_x.append(chip_bomb_on_enemy)
        img_x = cv2.vconcat(img_x)
        img.append(img_x)
    img = cv2.hconcat(img)
    return img

### 環境テスト

In [None]:
def make_env():
    env = Env(isRenderImg = True, isOutdataSingle = True)
    return env

In [None]:
env = make_env()
pid = 0
img = env._renderToImg(pid = pid)
for i in range(10):
    env.addAction(0, random.randint(0, 5))
    env.addAction(1, random.randint(0, 5))
    observations, rewards, isFin, _ = env.step()
env.renderToGif(pid = 0, path = path + "/result/test")
print(env.rewards())

-0.64




In [None]:
def print_spaces(space, label = " "):
   # 空間の出力
   print(label, space)

   # Box/Discreteの場合は最大値と最小値も表示
   if isinstance(space, Box):
       print('    最小値: ', space.low)
       print('    最大値: ', space.high)
   if isinstance(space, Discrete):
       print('    最小値: ', 0)
       print('    最大値: ', space.n-1)

In [None]:
print_spaces(env.action_space)
env.action_space.shape

  Discrete(6)
    最小値:  0
    最大値:  5


()

## 3.RSSM
RSSMは以下のパーツに分けることができます.

**Deterministic state model:** 
> $h_{t}=f(h_{t-1}, s_{t-1}, a_{t-1})$

**Stochastic state model:** 
> $s_{t}$ ~ $p(s_{t} | h_{t})$

**Observation model:** 
> $o_{t}$ ~ $p(o_{t} | h_{t}, s_{t})$

**Reward model:**
> $r_{t}$~ $p(r_{t} | h_{t}, s_{t})$

[以下の画像](https://towardsai.net/p/machine-learning/dreamer-8b5a42acebbf)は1つの時間ステップでのRSSMの挙動を示しています. 

![1stepあたりの模式図](https://cdn-images-1.medium.com/max/1024/1*4b2l7Fr4eavQV5ZMouCT-g.png)



Deterministic state modelで, RNNは, 前の時間ステップの行動$a_{t-1}$と事後確率状態$s'_{t-1}$, 決定論的状態$h_{t-1}$を入力とし, 決定論的状態$h_t$を出力します. 

$h_{t}$は (1)事前状態確率$s_{t}$を計算するために, 1つの隠れ層を持つMLPに供給され (本実装では線形変換）, (2)事後確率状態$s'_{t}$を計算するために, 画像埋め込み$e_t$(本実装では`embedded_obs`と表現される）と連結され, 別の単層MLP(線形変換)に供給されます. 

その後, $h_{t}$と$s'_{t}$を連結したものを用いて, 画像や報酬などを再構成します.








## 4.RSSMの実装


上記をもとにRSSMの実装を行います. 実装では, 以下の3つのクラスに分けて実装します.

* 状態遷移を担うクラス（TransitionModel） 
* 観測を復元するデコーダクラス（ObservationModel）
* 報酬を予測するクラス（RewardModel）

※TransitionModelは上記Deterministic state model, Stochastic state modelを合体させています.


まず実装するのがTransitionModelです. 

1ステップ先の未来の状態表現を予測する機能を担います.

学習の方針として, 状態遷移を用いた1ステップ先の未来の状態表現の分布である"prior"と, 1ステップ先の観測の情報を取り込んで計算した状態表現の分布である"posterior"が一致するように学習します.

"posterior"は後に観測の再構成誤差と報酬の予測誤差によって学習されるので, "posterior"は上記の2つの誤差によって妥当な状態表現になるように学習され, 同時に"prior"をそれに近づけることで, 状態表現の空間で未来の予測が可能になるように学習する, というイメージです. よって, これを踏まえたクラスの実装になっています.

状態表現として, 決定的な状態である"rnn_hidden"と, 確率的な状態である"state"の両方を持っています.

In [None]:
class TransitionModel(nn.Module):
    """
    このクラスは複数の要素を含んでいます.
    決定的状態遷移 （RNN) : h_t+1 = f(h_t, s_t, a_t)
    確率的状態遷移による1ステップ予測として定義される "prior" : p(s_t+1 | h_t+1)
    観測の情報を取り込んで定義される "posterior": q(s_t+1 | h_t+1, e_t+1)
    """
    def __init__(self, state_dim, action_dim, rnn_hidden_dim,
                 hidden_dim=200, min_stddev=0.1, act=F.elu):
        super(TransitionModel, self).__init__()
        self.state_dim = state_dim
        self.action_dim = action_dim
        self.rnn_hidden_dim = rnn_hidden_dim
        self.fc_state_action = nn.Linear(state_dim + action_dim, hidden_dim)
        self.fc_rnn_hidden = nn.Linear(rnn_hidden_dim, hidden_dim)
        self.fc_state_mean_prior = nn.Linear(hidden_dim, state_dim)
        self.fc_state_stddev_prior = nn.Linear(hidden_dim, state_dim)
        self.fc_rnn_hidden_embedded_obs = nn.Linear(rnn_hidden_dim + 1024, hidden_dim)
        self.fc_state_mean_posterior = nn.Linear(hidden_dim, state_dim)
        self.fc_state_stddev_posterior = nn.Linear(hidden_dim, state_dim)

        #next hidden stateを計算
        self.rnn = nn.GRUCell(hidden_dim, rnn_hidden_dim)
        self._min_stddev = min_stddev
        self.act = act
  

    def forward(self, state, action, rnn_hidden, embedded_next_obs):
        """
        h_t+1 = f(h_t, s_t, a_t)
        prior p(s_t+1 | h_t+1) と posterior q(s_t+1 | h_t+1, e_t+1) を返す
        この2つが近づくように学習する
        """
        next_state_prior, rnn_hidden = self.prior(self.reccurent(state, action, rnn_hidden))
        next_state_posterior = self.posterior(rnn_hidden, embedded_next_obs)
        return next_state_prior, next_state_posterior, rnn_hidden
      
    def reccurent(self, state, action, rnn_hidden):
        """
        h_t+1 = f(h_t, s_t, a_t)を計算する
        """
        hidden = self.act(self.fc_state_action(torch.cat([state, action], dim=1)))
        #h_t+1を求める
        rnn_hidden = self.rnn(hidden, rnn_hidden)
        return rnn_hidden

    def prior(self, rnn_hidden):
        """
        prior p(s_t+1 | h_t+1) を計算する
        """
        #h_t+1を求める
        hidden = self.act(self.fc_rnn_hidden(rnn_hidden))

        mean = self.fc_state_mean_prior(hidden)
        stddev = F.softplus(self.fc_state_stddev_prior(hidden)) + self._min_stddev
        return Normal(mean, stddev), rnn_hidden

    def posterior(self, rnn_hidden, embedded_obs):
        """
        posterior q(s_t+1 | h_t+1, e_t+1)  を計算する
        """
        # h_t+1, o_t+1を結合し, q(s_t+1 | h_t+1, e_t+1) を計算する
        hidden = self.act(self.fc_rnn_hidden_embedded_obs(
            torch.cat([rnn_hidden, embedded_obs], dim=1)))
        mean = self.fc_state_mean_posterior(hidden)
        stddev = F.softplus(self.fc_state_stddev_posterior(hidden)) + self._min_stddev
        return Normal(mean, stddev)

次に, 観測を再構成するデコーダを実装します. 

In [None]:
class ObservationModel(nn.Module):
    """
    p(o_t | s_t, h_t)
    低次元の状態表現から画像を再構成するデコーダ (3, 64, 64)
    """
    def __init__(self, state_dim, rnn_hidden_dim):
        super(ObservationModel, self).__init__()
        self.fc = nn.Linear(state_dim + rnn_hidden_dim, 1024)
        self.dc1 = nn.ConvTranspose2d(1024, 128, kernel_size=5, stride=2)
        self.dc2 = nn.ConvTranspose2d(128, 64, kernel_size=5, stride=2)
        self.dc3 = nn.ConvTranspose2d(64, 32, kernel_size=6, stride=2)
        self.dc4 = nn.ConvTranspose2d(32, 3, kernel_size=6, stride=2)


    def forward(self, state, rnn_hidden):
        hidden = self.fc(torch.cat([state, rnn_hidden], dim=1))
        hidden = hidden.view(hidden.size(0), 1024, 1, 1)
        hidden = F.relu(self.dc1(hidden))
        hidden = F.relu(self.dc2(hidden))
        hidden = F.relu(self.dc3(hidden))
        obs = self.dc4(hidden)
        return obs

次に, 報酬を予測するRewardモデルを実装します.

In [None]:
class RewardModel(nn.Module):
    """
    p(r_t | s_t, h_t)
    低次元の状態表現から報酬を予測する
    """
    def __init__(self, state_dim, rnn_hidden_dim, hidden_dim=400, act=F.elu):
        super(RewardModel, self).__init__()
        self.fc1 = nn.Linear(state_dim + rnn_hidden_dim, hidden_dim)
        self.fc2 = nn.Linear(hidden_dim, hidden_dim)
        self.fc3 = nn.Linear(hidden_dim, hidden_dim)
        self.fc4 = nn.Linear(hidden_dim, 1)
        self.act = act
 

    def forward(self, state, rnn_hidden):
        hidden = self.act(self.fc1(torch.cat([state, rnn_hidden], dim=1)))
        hidden = self.act(self.fc2(hidden))
        hidden = self.act(self.fc3(hidden))
        reward = self.fc4(hidden)
        return reward

最後に, 上記で定義された３つのモデルを`RSSM`クラスとしてまとめます.




In [None]:
class RSSM:
    def __init__(self, state_dim, action_dim, rnn_hidden_dim, ):
        self.transition = TransitionModel(state_dim, action_dim, rnn_hidden_dim).to(device)
        self.observation = ObservationModel(state_dim, rnn_hidden_dim,).to(device)
        self.reward = RewardModel(state_dim, rnn_hidden_dim,).to(device)

## 5.補助機能の実装

モデルの実装の前に, いくつか学習上必要になる補助機能を実装しておきましょう. 具体的にはリプレイバッファ, 観測の前処理を行う関数, λ-returnを計算する関数 の3つです.

まずはリプレイバッファを実装します. ただし, RNNを使う関係上一連の系列として経験をサンプルしてくる必要があるため, DQNの時よりは少し実装に工夫が必要です.

In [None]:
# こちらはDQNのReplayBufferの定義
class ReplayBuffer:
    def __init__(self, memory_size):
        self.memory_size = memory_size
        self.memory = deque([], maxlen = memory_size)
    
    def append(self, transition):
        self.memory.append(transition)
    
    def sample(self, batch_size):
        batch_indexes = np.random.randint(0, len(self.memory), size=batch_size)
        states      = np.array([self.memory[index]['state'] for index in batch_indexes])
        next_states = np.array([self.memory[index]['next_state'] for index in batch_indexes])
        rewards     = np.array([self.memory[index]['reward'] for index in batch_indexes])
        actions     = np.array([self.memory[index]['action'] for index in batch_indexes])
        dones   = np.array([self.memory[index]['done'] for index in batch_indexes])
        return {'states': states, 'next_states': next_states, 'rewards': rewards, 'actions': actions, 'dones': dones}

In [None]:
# 今回のReplayBuffer
class ReplayBuffer(object):
    """
    RNNを用いて訓練するのに適したリプレイバッファ
    """
    def __init__(self, capacity, observation_shape, action_dim):
        self.capacity = capacity

        self.observations = np.zeros((capacity, *observation_shape), dtype=np.uint8)
        self.actions = np.zeros((capacity, action_dim), dtype=np.float32)
        self.rewards = np.zeros((capacity, 1), dtype=np.float32)
        self.done = np.zeros((capacity, 1), dtype=np.bool)

        self.index = 0
        self.is_filled = False

    def push(self, observation, action, reward, done):
        """
        リプレイバッファに経験を追加する
        """
        self.observations[self.index] = observation
        self.actions[self.index] = action
        self.rewards[self.index] = reward
        self.done[self.index] = done

        # indexは巡回し, 最も古い経験を上書きする
        if self.index == self.capacity - 1:
            self.is_filled = True
        self.index = (self.index + 1) % self.capacity

    def sample(self, batch_size, chunk_length):
        """
        経験をリプレイバッファからサンプルします. （ほぼ）一様なサンプルです
        結果として返ってくるのは観測(画像), 行動, 報酬, 終了シグナルについての(batch_size, chunk_length, 各要素の次元)の配列です
        各バッチは連続した経験になっています
        注意: chunk_lengthをあまり大きな値にすると問題が発生する場合があります
        """
        episode_borders = np.where(self.done)[0]
        sampled_indexes = []
        for _ in range(batch_size):
            cross_border = True
            while cross_border:
                initial_index = np.random.randint(len(self) - chunk_length + 1)
                final_index = initial_index + chunk_length - 1
                cross_border = np.logical_and(initial_index <= episode_borders,
                                              episode_borders < final_index).any()#論理積
            sampled_indexes += list(range(initial_index, final_index + 1))

        sampled_observations = self.observations[sampled_indexes].reshape(
            batch_size, chunk_length, *self.observations.shape[1:])
        sampled_actions = self.actions[sampled_indexes].reshape(
            batch_size, chunk_length, self.actions.shape[1])
        sampled_rewards = self.rewards[sampled_indexes].reshape(
            batch_size, chunk_length, 1)
        sampled_done = self.done[sampled_indexes].reshape(
            batch_size, chunk_length, 1)
        return sampled_observations, sampled_actions, sampled_rewards, sampled_done

    def __len__(self):
        return self.capacity if self.is_filled else self.index

次に観測の前処理を行う関数を実装します. これは簡単です. ちなみに, これもラッパーとして最初から適用してしまわないのは, リプレイバッファにはより容量の小さなnp.uint8の形式で保存しておきたいためです.

In [None]:
def preprocess_obs(obs):
    """
    画像の変換. [0, 255] -> [-0.5, 0.5]
    """
    obs = obs.astype(np.float32)
    normalized_obs = obs / 255.0 - 0.5
    return normalized_obs

Dreamerでは価値関数の学習を行いますが, このために通常のTD誤差ではなく, **TD(λ)をベースにしたλ-return**としてターゲット価値を計算し, それと現在の予測価値の誤差を用います. そのためにλ-returnを計算する関数をここで実装しておきます.

In [None]:
def lambda_target(rewards, values, gamma, lambda_):
    """
    価値関数の学習のためのλ-returnを計算します
    """
    V_lambda = torch.zeros_like(rewards, device=rewards.device)

    H = rewards.shape[0] - 1
    V_n = torch.zeros_like(rewards, device=rewards.device)
    V_n[H] = values[H]
    for n in range(1, H+1):
        # まずn-step returnを計算します
        # 注意: 系列が途中で終わってしまったら, 可能な中で最大のnを用いたn-stepを使います
        V_n[:-n] = (gamma ** n) * values[n:]
        for k in range(1, n+1):
            if k == n:
                V_n[:-n] += (gamma ** (n-1)) * rewards[k:]
            else:
                V_n[:-n] += (gamma ** (k-1)) * rewards[k:-n+k]

        # lambda_でn-step returnを重みづけてλ-returnを計算します
        if n == H:
            V_lambda += (lambda_ ** (H-1)) * V_n
        else:
            V_lambda += (1 - lambda_) * (lambda_ ** (n-1)) * V_n

    return V_lambda

## 6.Dreamerの実装

続いてDreamerの実装です.Dreamerの実装では以下の3クラスは上で定義したRSSMの各クラスと同義ですので, 
改めて定義はしておらず`RSSM`クラスを呼び出して使用することとします.
* 状態遷移を担うクラス(TransitionModel）
* 観測を復元するデコーダクラス（ObservationModel）
* 報酬を予測するクラス（RewardModel）


そのため,新たに定義するモデルは, 以下の３つのみになります.
* 観測の画像をベクトルに変換するエンコーダクラス（Encoder）
* 価値関数を計算するクラス（ValueModel）
* 実際に行動を決定するクラス（ActionModel）


まずエンコーダを実装します. ここでは, CNN（Convolutional Neural Network）を用いて, 観測の画像をベクトルに変換します.

In [None]:
class Encoder(nn.Module):
    """
    (3, 64, 64)の画像を(1024,)のベクトルに変換するエンコーダ
    """
    def __init__(self):
        super(Encoder, self).__init__()
        self.cv1 = nn.Conv2d(3, 32, kernel_size=4, stride=2)
        self.cv2 = nn.Conv2d(32, 64, kernel_size=4, stride=2)
        self.cv3 = nn.Conv2d(64, 128, kernel_size=4, stride=2)
        self.cv4 = nn.Conv2d(128, 256, kernel_size=4, stride=2)

    def forward(self, obs):
        hidden = F.relu(self.cv1(obs))
        hidden = F.relu(self.cv2(hidden))
        hidden = F.relu(self.cv3(hidden))
        embedded_obs = F.relu(self.cv4(hidden)).reshape(hidden.size(0), -1)
        return embedded_obs

ここからがDreamerの中核となる部分で, RSSMの学習を通して獲得された低次元の状態表現の上でActor-Criticを行います.


以下で, 価値関数を近似するValueモデル $v_{\phi}(s_{\tau})　\approx E_{q(.|s_{\tau})}(\sum_{\tau=t}^{t+H}(\gamma^{\tau-t}r_{\tau}) )$ を実装します. Q学習などで用いられる状態行動価値関数Q(s, a)ではなく, 状態価値関数V(s)であることに多少の注意が必要です.

In [None]:
class ValueModel(nn.Module):
    """
    低次元の状態表現(state_dim + rnn_hidden_dim)から状態価値を出力する
    """
    def __init__(self, state_dim, rnn_hidden_dim, hidden_dim=400, act=F.elu):
        super(ValueModel, self).__init__()
        self.fc1 = nn.Linear(state_dim + rnn_hidden_dim, hidden_dim)
        self.fc2 = nn.Linear(hidden_dim, hidden_dim)
        self.fc3 = nn.Linear(hidden_dim, hidden_dim)
        self.fc4 = nn.Linear(hidden_dim, 1)
        self.act = act

    def forward(self, state, rnn_hidden):
        hidden = self.act(self.fc1(torch.cat([state, rnn_hidden], dim=1)))
        hidden = self.act(self.fc2(hidden))
        hidden = self.act(self.fc3(hidden))
        state_value = self.fc4(hidden)
        return state_value

最後です. 実際に行動を出力するActionモデル $a_{\tau} \sim q_{\delta}(a_{\tau}|s_{\tau})$ を実装します.

Actionモデルは価値の見積もりを最大化することを目的とします.



In [None]:
class ActionModel(nn.Module):
    """
    低次元の状態表現(state_dim + rnn_hidden_dim)から行動を計算するクラス
    """
    def __init__(self, state_dim, rnn_hidden_dim, action_dim,
                 hidden_dim=400, act=F.elu, min_stddev=1e-4, init_stddev=5.0):
        super(ActionModel, self).__init__()
        self.fc1 = nn.Linear(state_dim + rnn_hidden_dim, hidden_dim)
        self.fc2 = nn.Linear(hidden_dim, hidden_dim)
        self.fc3 = nn.Linear(hidden_dim, hidden_dim)
        self.fc4 = nn.Linear(hidden_dim, hidden_dim)
        self.fc_mean = nn.Linear(hidden_dim, action_dim)
        self.fc_stddev = nn.Linear(hidden_dim, action_dim)
        self.act = act
        self.min_stddev = min_stddev
        self.init_stddev = np.log(np.exp(init_stddev) - 1)

    def forward(self, state, rnn_hidden, training=True):
        """
        training=Trueなら, NNのパラメータに関して微分可能な形の行動のサンプル（Reparametrizationによる）を返します
        training=Falseなら, 行動の確率分布の平均値を返します
        """
        hidden = self.act(self.fc1(torch.cat([state, rnn_hidden], dim=1)))
        hidden = self.act(self.fc2(hidden))
        hidden = self.act(self.fc3(hidden))
        hidden = self.act(self.fc4(hidden))

        # Dreamerの実装に合わせて少し平均と分散に対する簡単な変換が入っています
        mean = self.fc_mean(hidden)
        mean = 5.0 * torch.tanh(mean / 5.0)
        stddev = self.fc_stddev(hidden)
        stddev = F.softplus(stddev + self.init_stddev) + self.min_stddev

        if training:
            action = torch.tanh(Normal(mean, stddev).rsample())#微分可能にするためrsample()
        else:
            action = torch.tanh(mean)
        return action

実装の詳細まで掴みきれなくとも, 個々のクラスが担っている役割が大雑把にでもわかっていただければ幸いです.

## 7.エージェントの実装

Dreamerでは行動を計算するために低次元の状態表現が必要で, この状態表現はRSSMを用いて計算されるため, テスト時もこの状態表現のためにRSSMによる推論を行い続ける必要があります. 

そのため, 先ほど実装したActionModelをそのまま使っても簡単には行動を決定できません. 

ここを扱いやすくするために, RSSMを使って低次元の状態表現を計算しつつ, 行動を決定するAgentクラスを実装します.

In [None]:
class Agent:
    """
    ActionModelに基づき行動を決定する. そのためにRSSMを用いて状態表現をリアルタイムで推論して維持するクラス
    """
    def __init__(self, encoder, rssm, action_model):
        self.encoder = encoder
        self.rssm = rssm
        self.action_model = action_model

        self.device = next(self.action_model.parameters()).device
        self.rnn_hidden = torch.zeros(1, rssm.rnn_hidden_dim, device=self.device)

    def __call__(self, obs, training=True):
        # preprocessを適用, PyTorchのためにChannel-Firstに変換
        obs = preprocess_obs(obs)
        obs = torch.as_tensor(obs, device=self.device)
        obs = obs.transpose(1, 2).transpose(0, 1).unsqueeze(0)

        with torch.no_grad():
            # 観測を低次元の表現に変換し, posteriorからのサンプルをActionModelに入力して行動を決定する
            embedded_obs = self.encoder(obs)
            state_posterior = self.rssm.posterior(self.rnn_hidden, embedded_obs)
            state = state_posterior.sample()
            action = self.action_model(state, self.rnn_hidden, training=training)

            # 次のステップのためにRNNの隠れ状態を更新しておく
            _, self.rnn_hidden = self.rssm.prior(self.rssm.reccurent(state, action, self.rnn_hidden))

        return action.squeeze().cpu().numpy()

    #RNNの隠れ状態をリセット
    def reset(self):
        self.rnn_hidden = torch.zeros(1, self.rssm.rnn_hidden_dim, device=self.device)

## 8.ハイパーパラメータの設定と学習の準備

ここまででDreamerの基本的な構成要素は実装が終わりました. あとはハイパーパラメータを設定し, モデルやリプレイバッファを宣言して学習の準備を整えます.

In [None]:
# リプレイバッファの宣言
buffer_capacity = 200000  # Colabのメモリの都合上, 元の実装より小さめにとっています
replay_buffer = ReplayBuffer(capacity=buffer_capacity,
                              observation_shape=env.observation_space.shape,
                              action_dim=env.action_space.n)

# モデルの宣言
state_dim = 30  # 確率的状態の次元
rnn_hidden_dim = 200  # 決定的状態（RNNの隠れ状態）の次元
#確率的状態の次元と決定的状態（RNNの隠れ状態）の次元は一致しなくて良い
encoder = Encoder().to(device)
rssm = RSSM(state_dim,env.action_space.n,rnn_hidden_dim, )
value_model = ValueModel(state_dim, rnn_hidden_dim).to(device)
action_model = ActionModel(state_dim, rnn_hidden_dim,
                             env.action_space.n).to(device)

# オプティマイザの宣言
model_lr = 6e-4  # encoder, rssm, obs_model, reward_modelの学習率
value_lr = 8e-5
action_lr = 8e-5
eps = 1e-4
model_params = (list(encoder.parameters()) +
                  list(rssm.transition.parameters()) +
                  list(rssm.observation.parameters()) +
                  list(rssm.reward.parameters()))
model_optimizer = torch.optim.Adam(model_params, lr=model_lr, eps=eps)
value_optimizer = torch.optim.Adam(value_model.parameters(), lr=value_lr, eps=eps)
action_optimizer = torch.optim.Adam(action_model.parameters(), lr=action_lr, eps=eps)

# その他ハイパーパラメータ
seed_episodes = 20   # 最初にランダム行動で探索するエピソード数
all_episodes = 300  # 学習全体のエピソード数（300ほどで, ある程度収束します）
test_interval = 10  # 何エピソードごとに探索ノイズなしのテストを行うか
model_save_interval = 20  # NNの重みを何エピソードごとに保存するか
collect_interval = 100  # 何回のNNの更新ごとに経験を集めるか（＝1エピソード経験を集めるごとに何回更新するか）

action_noise_var = 0.3  # 探索ノイズの強さ

# 基本的に1episodeからとれる経験の数が少ないのでseed_episodeからの経験のみで最低限の経験すら持ってこれないことがあるので注意
# chunk_lengthは特に注意
batch_size = 40
chunk_length = 15  # 1回の更新で用いる系列の長さ
imagination_horizon = 15  # Actor-Criticの更新のために, Dreamerで何ステップ先までの想像上の軌道を生成するか


gamma = 0.9  # 割引率
lambda_ = 0.95  # λ-returnのパラメータ
clip_grad_norm = 100  # gradient clippingの値
free_nats = 3  # KL誤差（RSSMのTransitionModelにおけるpriorとposteriorの間の誤差）がこの値以下の場合, 無視する

Deprecated in NumPy 1.20; for more details and guidance: https://numpy.org/devdocs/release/1.20.0-notes.html#deprecations
  if sys.path[0] == '':


## 9.学習
まず, 最初の数エピソードはランダムに行動して経験をリプレイバッファに貯めます.

In [None]:
env = make_env()
pid = 0

for episode in range(seed_episodes):
    obs = env.reset()
    done = False
    while not done:
        action = env.action_space.sample()
        env.addAction(pid = 0, action = action)
        env.addAction(pid = 1, action = random.randint(0, 5))
        next_obs, reward, done, _ = env.step()
        replay_buffer.push(obs, action, reward, done)
        obs = next_obs



学習結果を確認するために, TensorBoardを立ち上げておきます.

In [None]:
log_dir = 'logs'
writer = SummaryWriter(log_dir)
#%tensorboard --logdir='./logs'

以下がメインの学習ループです. それぞれのコメントを見て, 実装の内容を追ってください.

学習にはColab Proで3時間半ぐらいの時間がかかります.

In [None]:
env = make_env()
policy_enemy = Agent(encoder, rssm.transition, action_model)

for episode in range(seed_episodes, all_episodes):
    # -----------------------------
    #      経験を集める
    # -----------------------------
    start = time.time()
    # 行動を決定するためのエージェントを宣言
    policy = Agent(encoder, rssm.transition, action_model)

    obs = env.reset()
    done = False
    total_reward = 0
    while not done:
        action = policy(obs)
        action_enemy = policy_enemy(obs, training=False)
        env.addAction(pid = 0, action = action)
        env.addAction(pid = 1, action = action_enemy)
        # 探索のためにガウス分布に従うノイズを加える(explaration noise)
        action += np.random.normal(0, np.sqrt(action_noise_var),
                                     env.action_space.n)
        next_obs, reward, done, _ = env.step()
        
        #リプレイバッファに観測, 行動, 報酬, doneを格納
        replay_buffer.push(obs, action, reward, done)
        
        obs = next_obs
        total_reward += reward

    # 訓練時の報酬と経過時間をログとして表示
    writer.add_scalar('total reward at train', total_reward, episode)
    print('episode [%4d/%4d] is collected. Total reward is %f' %
            (episode+1, all_episodes, total_reward))
    print('elasped time for interaction: %.2fs' % (time.time() - start))

    # NNのパラメータを更新する
    start = time.time()
    for update_step in range(collect_interval):
        # -------------------------------------------------------------------------------------
        #  RSSM(trainsition_model, obs_model, reward_model)の更新 - Dynamics learning
        # -------------------------------------------------------------------------------------
        observations, actions, rewards, _ = \
            replay_buffer.sample(batch_size, chunk_length)

        # 観測を前処理し, RNNを用いたPyTorchでの学習のためにTensorの次元を調整
        observations = preprocess_obs(observations)
        observations = torch.as_tensor(observations, device=device)
        observations = observations.transpose(3, 4).transpose(2, 3)
        observations = observations.transpose(0, 1)
        actions = torch.as_tensor(actions, device=device).transpose(0, 1)
        rewards = torch.as_tensor(rewards, device=device).transpose(0, 1)

        # 観測をエンコーダで低次元のベクトルに変換
        embedded_observations = encoder(
            observations.reshape(-1, 3, 64, 64)).view(chunk_length, batch_size, -1)

        # 低次元の状態表現を保持しておくためのTensorを定義
        states = torch.zeros(chunk_length, batch_size, state_dim, device=device)
        rnn_hiddens = torch.zeros(chunk_length, batch_size, rnn_hidden_dim, device=device)

        # 低次元の状態表現は最初はゼロ初期化（timestep１つ分）
        state = torch.zeros(batch_size, state_dim, device=device)
        rnn_hidden = torch.zeros(batch_size, rnn_hidden_dim, device=device)

        # 状態s_tの予測を行ってそのロスを計算する（priorとposteriorの間のKLダイバージェンス）
        kl_loss = 0
        for l in range(chunk_length-1):
            next_state_prior, next_state_posterior, rnn_hidden = \
                rssm.transition(state, actions[l], rnn_hidden, embedded_observations[l+1])
            state = next_state_posterior.rsample()
            states[l+1] = state
            rnn_hiddens[l+1] = rnn_hidden
            kl = kl_divergence(next_state_prior, next_state_posterior).sum(dim=1)
            kl_loss += kl.clamp(min=free_nats).mean()  # 原論文通り, KL誤差がfree_nats以下の時は無視
        kl_loss /= (chunk_length - 1)

        # states[0] and rnn_hiddens[0]はゼロ初期化なので以降では使わない
        # states, rnn_hiddensは低次元の状態表現
        states = states[1:]
        rnn_hiddens = rnn_hiddens[1:]

        # 観測を再構成, また, 報酬を予測
        flatten_states = states.view(-1, state_dim)
        flatten_rnn_hiddens = rnn_hiddens.view(-1, rnn_hidden_dim)
        recon_observations = rssm.observation(flatten_states, flatten_rnn_hiddens).view(chunk_length-1, batch_size, 3, 64, 64)
        predicted_rewards = rssm.reward(flatten_states, flatten_rnn_hiddens).view(chunk_length-1, batch_size, 1)

        # 観測と報酬の予測誤差を計算
        obs_loss = 0.5 * F.mse_loss(recon_observations, observations[1:], reduction='none').mean([0, 1]).sum()
        reward_loss = 0.5 * F.mse_loss(predicted_rewards, rewards[:-1])

        # 以上のロスを合わせて勾配降下で更新する
        model_loss = kl_loss + obs_loss + reward_loss
        model_optimizer.zero_grad()
        model_loss.backward()
        clip_grad_norm_(model_params, clip_grad_norm)
        model_optimizer.step()

        # --------------------------------------------------
        #  Action Model, Value Modelの更新 - Behavior leaning
        # --------------------------------------------------
        # Actor-Criticのロスで他のモデルを更新することはないので勾配の流れを一度遮断
        # flatten_states, flatten_rnn_hiddensは RSSMから得られた低次元の状態表現を平坦化した値
        flatten_states = flatten_states.detach()
        flatten_rnn_hiddens = flatten_rnn_hiddens.detach()

        # DreamerにおけるActor-Criticの更新のために, 現在のモデルを用いた
        # 数ステップ先の未来の状態予測を保持するためのTensorを用意
        imaginated_states = torch.zeros(imagination_horizon + 1,
                                         *flatten_states.shape,
                                          device=flatten_states.device)
        imaginated_rnn_hiddens = torch.zeros(imagination_horizon + 1,
                                                *flatten_rnn_hiddens.shape,
                                                device=flatten_rnn_hiddens.device)

        #　未来予測をして想像上の軌道を作る前に, 最初の状態としては先ほどモデルの更新で使っていた
        # リプレイバッファからサンプルされた観測データを取り込んだ上で推論した状態表現を使う
        imaginated_states[0] = flatten_states
        imaginated_rnn_hiddens[0] = flatten_rnn_hiddens
        
        # open-loopで未来の状態予測を使い, 想像上の軌道を作る
        for h in range(1, imagination_horizon + 1):
            # 行動はActionModelで決定. この行動はモデルのパラメータに対して微分可能で,
            #　これを介してActionModelは更新される
            actions = action_model(flatten_states, flatten_rnn_hiddens)
            flatten_states_prior, flatten_rnn_hiddens = rssm.transition.prior(rssm.transition.reccurent(flatten_states,
                                                                   actions,
                                                                   flatten_rnn_hiddens))
            flatten_states = flatten_states_prior.rsample()
            imaginated_states[h] = flatten_states
            imaginated_rnn_hiddens[h] = flatten_rnn_hiddens

        # RSSMのreward_modelにより予測された架空の軌道に対する報酬を計算
        flatten_imaginated_states = imaginated_states.view(-1, state_dim)
        flatten_imaginated_rnn_hiddens = imaginated_rnn_hiddens.view(-1, rnn_hidden_dim)
        imaginated_rewards = \
            rssm.reward(flatten_imaginated_states,
                        flatten_imaginated_rnn_hiddens).view(imagination_horizon + 1, -1)
        imaginated_values = \
            value_model(flatten_imaginated_states,
                        flatten_imaginated_rnn_hiddens).view(imagination_horizon + 1, -1)

        # λ-returnのターゲットを計算(V_{\lambda}(s_{\tau})
        lambda_target_values = lambda_target(imaginated_rewards, imaginated_values, gamma, lambda_)

        # 価値関数の予測した価値が大きくなるようにActionModelを更新
        # PyTorchの基本は勾配降下だが, 今回は大きくしたいので-1をかける
        action_loss = -lambda_target_values.mean()
        action_optimizer.zero_grad()
        action_loss.backward()
        clip_grad_norm_(action_model.parameters(), clip_grad_norm)
        action_optimizer.step()
        
        # TD(λ)ベースの目的関数で価値関数を更新（価値関数のみを学習するため，学習しない変数のグラフは切っている. )
        imaginated_values = value_model(flatten_imaginated_states.detach(), flatten_imaginated_rnn_hiddens.detach()).view(imagination_horizon + 1, -1)        
        value_loss = 0.5 * F.mse_loss(imaginated_values, lambda_target_values.detach())
        value_optimizer.zero_grad()
        value_loss.backward()
        clip_grad_norm_(value_model.parameters(), clip_grad_norm)
        value_optimizer.step()

        # ログをTensorBoardに出力
        print('update_step: %3d model loss: %.5f, kl_loss: %.5f, '
             'obs_loss: %.5f, reward_loss: %.5f, '
             'value_loss: %.5f action_loss: %.5f'
                % (update_step + 1, model_loss.item(), kl_loss.item(),
                    obs_loss.item(), reward_loss.item(),
                    value_loss.item(), action_loss.item()))
        total_update_step = episode * collect_interval + update_step
        writer.add_scalar('model loss', model_loss.item(), total_update_step)
        writer.add_scalar('kl loss', kl_loss.item(), total_update_step)
        writer.add_scalar('obs loss', obs_loss.item(), total_update_step)
        writer.add_scalar('reward loss', reward_loss.item(), total_update_step)
        writer.add_scalar('value loss', value_loss.item(), total_update_step)
        writer.add_scalar('action loss', action_loss.item(), total_update_step)

    print('elasped time for update: %.2fs' % (time.time() - start))

    if (episode + 1) % 10 == 0:
        env.renderToGif(pid = 0, path = path + "/result/DreamerEpisode" + str(episode))
        # 敵を同期
        policy_enemy = copy.deepcopy(policy)
        

    # --------------------------------------------------------------
    #    テストフェーズ. 探索ノイズなしでの性能を評価する
    # --------------------------------------------------------------
    if (episode + 1) % test_interval == 0:
        policy = Agent(encoder, rssm.transition, action_model)
        start = time.time()
        obs = env.reset()
        done = False
        total_reward = 0
        while not done:
            action = policy(obs, training=False)
            action_enemy = policy_enemy(obs, training=False)

            env.addAction(pid = 0, action = action)
            env.addAction(pid = 1, action = action_enemy)

            obs, reward, done, _ = env.step()
            total_reward += reward

        writer.add_scalar('total reward at test', total_reward, episode)
        print('Total test reward at episode [%4d/%4d] is %f' %
                (episode+1, all_episodes, total_reward))
        print('elasped time for test: %.2fs' % (time.time() - start))

    if (episode + 1) % model_save_interval == 0:
        # 定期的に学習済みモデルのパラメータを保存する
        model_log_dir = os.path.join(log_dir, 'episode_%04d' % (episode + 1))
        os.makedirs(model_log_dir)
        torch.save(encoder.state_dict(), os.path.join(model_log_dir, 'encoder.pth'))
        torch.save(rssm.transition.state_dict(), os.path.join(model_log_dir, 'rssm.pth'))
        torch.save(rssm.observation.state_dict(), os.path.join(model_log_dir, 'obs_model.pth'))
        torch.save(rssm.reward.state_dict(), os.path.join(model_log_dir, 'reward_model.pth'))
        torch.save(value_model.state_dict(), os.path.join(model_log_dir, 'value_model.pth'))
        torch.save(action_model.state_dict(), os.path.join(model_log_dir, 'action_model.pth'))

writer.close()



[1;30;43mストリーミング出力は最後の 5000 行に切り捨てられました。[0m
update_step:  88 model loss: 19.32774, kl_loss: 4.26208, obs_loss: 14.05737, reward_loss: 1.00828, value_loss: 3.57484 action_loss: 29.34588
update_step:  89 model loss: 20.73034, kl_loss: 4.29701, obs_loss: 15.22430, reward_loss: 1.20902, value_loss: 2.89100 action_loss: 29.20293
update_step:  90 model loss: 22.64560, kl_loss: 4.32168, obs_loss: 17.54768, reward_loss: 0.77625, value_loss: 2.42231 action_loss: 28.90589
update_step:  91 model loss: 26.04962, kl_loss: 4.38116, obs_loss: 21.11440, reward_loss: 0.55405, value_loss: 2.57432 action_loss: 31.22308
update_step:  92 model loss: 22.57973, kl_loss: 4.38976, obs_loss: 17.24019, reward_loss: 0.94978, value_loss: 2.87528 action_loss: 29.19276
update_step:  93 model loss: 24.50870, kl_loss: 4.40998, obs_loss: 19.27481, reward_loss: 0.82392, value_loss: 2.50564 action_loss: 30.45751
update_step:  94 model loss: 23.77971, kl_loss: 4.29353, obs_loss: 18.41227, reward_loss: 1.07391, value_los

TensorBoardで学習結果を確認してみます.

In [None]:
%tensorboard --logdir='./logs'

In [None]:
env.renderToGif(pid = 0, path = "DreamerEpisode" + str(episode))

##  10.結果の確認
保存された学習済み重みを用いて, 動作を確認してみましょう.

学習にはかなりの時間がかかるので, ここでは事前に学習しておいた重みを読み込むことにします. 時間のある方は, 上記のコードで実際に学習した重みを使って同様に試してみてください.

In [None]:
# 事前にGoogle Driveにあげておいた学習済み重みをダウンロードします
from google_drive_downloader import GoogleDriveDownloader as gdd

file_id = "" # Google Driveにあげた学習済重みのfile idを取得してここにコピペしてください
gdd.download_file_from_google_drive(file_id=file_id,
                                       dest_path='./episode_0100.zip',
                                       unzip=True)

In [None]:
encoder.load_state_dict(torch.load('/episode_0100/encoder.pth'))
rssm.transition.load_state_dict(torch.load('/episode_0100/rssm.pth'))
rssm.observation.load_state_dict(torch.load('/episode_0100/obs_model.pth'))
action_model.load_state_dict(torch.load('/episode_0100/action_model.pth'))

In [None]:
# 学習済み重みを用いず, このcolab上で学習したモデルを使うなら, このセルを実行してください.
# あるいは, 定期的に保存されているモデルを読み込むこともできます
encoder.load_state_dict(torch.load(os.path.join(model_log_dir, 'encoder.pth')))
rssm.transition.load_state_dict(torch.load(os.path.join(model_log_dir, 'rssm.pth')))
rssm.observation.load_state_dict(torch.load(os.path.join(model_log_dir, 'obs_model.pth')))
action_model.load_state_dict(torch.load(os.path.join(model_log_dir, 'action_model.pth')))

動作の様子を動画で観てみることにします.

In [None]:
# 結果を動画で観てみるための関数
import matplotlib.pyplot as plt
from matplotlib import animation
from IPython.display import HTML


def display_video(frames):
    plt.figure(figsize=(8, 8), dpi=50)
    patch = plt.imshow(frames[0])
    plt.axis('off')
    
    def animate(i):
        patch.set_data(frames[i])
        plt.title("Step %d" % (i))
    
    anim = animation.FuncAnimation(plt.gcf(), animate, frames=len(frames), interval=50)
    display(HTML(anim.to_jshtml(default_mode='once')))
    plt.close()

In [None]:
policy = Agent(encoder, rssm.transition, action_model)
obs = env.reset()
done = False
total_reward = 0
frames = [obs]
while not done:
    action = policy(obs, training=False)
    obs, reward, done, _ = env.step(action)
    total_reward += reward
    frames.append(obs)

print('Total Reward:', total_reward)

In [None]:
display_video(frames)

ある時点の適当な観測から, 世界モデルで**open-loop**に未来予測を行わせ, 観測を再構成して視覚的に観てみましょう.

In [None]:
policy = Agent(encoder, rssm.transition, action_model)
obs = env.reset()
# 最初に適当な回数行動します. この間にrnn_hiddenに観測の系列に関する情報が蓄積されます
for _ in range(np.random.randint(5, 100)):
    action = policy(obs, training=False)
    obs, _, _, _ = env.step(action)

# 現在の観測をベクトルに変換し, それを元にposteriorを計算します.
preprocessed_obs = preprocess_obs(obs)
preprocessed_obs = torch.as_tensor(preprocessed_obs, device=device)
preprocessed_obs = preprocessed_obs.transpose(1, 2).transpose(0, 1).unsqueeze(0)
with torch.no_grad():
    embedded_obs = encoder(preprocessed_obs)

# posteriorからのサンプルとして得られたstateと, policyから取得したrnn_hiddenが低次元の状態表現です.
# open-loopの予測なので, これ以降この2つの変数は状態遷移を表すpriorでしか更新しません.
# (policyの中では, 行動を決定するために観測をリアルタイムで反映してposteriorで更新しています)
rnn_hidden = policy.rnn_hidden
state = rssm.transition.posterior(rnn_hidden, embedded_obs).sample()
frame = np.zeros((64, 128, 3))
frames = []

prediction_length = 100
for _ in range(prediction_length):
    action = policy(obs)
    obs, _, _, _ = env.step(action)

    action = torch.as_tensor(action, device=device).unsqueeze(0)
    with torch.no_grad():
        state_prior, rnn_hidden = rssm.transition.prior(rssm.transition.reccurent(state, action, rnn_hidden))
        state = state_prior.sample()
        predicted_obs = rssm.observation(state, rnn_hidden)#obs_model(state, rnn_hidden)

    frame[:, :64, :] = preprocess_obs(obs)
    frame[:, 64:, :] = predicted_obs.squeeze().transpose(0, 1).transpose(1, 2).cpu().numpy()
    frames.append((frame + 0.5).clip(0.0, 1.0))

open-loopの動画予測の結果を, 左側に真のフレーム, 右側に予測されたフレームと並べてみてみましょう.

In [None]:
display_video(frames)

以上で演習は終わりです. お疲れ様でした！

## 11.参考文献
[[1]](https://arxiv.org/pdf/1811.04551.pdf) Danijar Hafner, Timothy Lillicrap, Ian Fischer, Ruben Villegas, David Ha, "Learning Latent Dynamics for Planning from Pixels", arXiv, 2019

[[2]](https://arxiv.org/abs/1912.01603) Danijar Hafner, Timothy Lillicrap, Jimmy Ba, Mohammad Norouzi, 
"Dream to Control: Learning Behaviors by Latent Imagination", ICLR2020