<a href="https://colab.research.google.com/github/machine-perception-robotics-group/GoogleColabNotebooks/blob/master/MLDL_lecture_notebooks/13_cart_pole_control_by_deep_reinforcement_learning.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# 13. 深層強化学習によるCart Pole制御

---
## 目的
Deep Q Learningを用いてCart Pole制御を行う．
ここで，Cart Pole制御とは台車に乗っている棒が倒れないように台車を左右に動かすことである．


## 準備

### Google Colaboratoryの設定確認・変更
本チュートリアルではPyTorchを利用してニューラルネットワークの実装を確認，学習および評価を行います．
**GPUを用いて処理を行うために，上部のメニューバーの「ランタイム」→「ランタイムのタイプを変更」からハードウェアアクセラレータをGPUにしてください．**


### モジュールの追加インストール
下記のプログラムを実行して，実験結果の表示に必要な追加ライブラリやモジュールをインストールする．

In [None]:
!apt-get -qq -y install libcusparse9.1 libnvrtc9.1 libnvtoolsext1 > /dev/null
!ln -snf /usr/lib/x86_64-linux-gnu/libnvrtc-builtins.so.9.1 /usr/lib/x86_64-linux-gnu/libnvrtc-builtins.so
!apt-get -qq -y install xvfb freeglut3-dev ffmpeg> /dev/null

!pip -q install gym
!pip -q install pyglet
!pip -q install pyopengl
!pip -q install pyvirtualdisplay

## モジュールのインポート
はじめに必要なモジュールをインポートする．

今回はPyTorchに加えて，Cart Poleを実行するためのシミュレータであるopenAI Gym（gym）をインポートする．

In [None]:
import numpy as np
import gym

import math
import random
import numpy as np
import matplotlib
import matplotlib.pyplot as plt
from collections import namedtuple
from itertools import count
from PIL import Image
%matplotlib inline

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import torchvision.transforms as T

# 使用するデバイス（GPU or CPU）の決定
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Use device:", device)

## OpenAI GymによるCart Poleの環境の定義
 [OpenAI Gym](https://github.com/openai/gym) は，様々な種類の環境を提供しているモジュールです．
 
 今回は古典的な制御問題であるCart Pole（倒立振子）を実行します．
 まず，gym.make関数で実行したい環境を指定します．
 その後，reset関数を実行することで，環境を初期化します．
 
cart pole環境では，倒立振子の現在の状態を把握するために，振子の角度や台車の速度などの4次元の情報が与えられており，`observation_space`という変数で確認することができます．
また，`action_space`という変数で，エージェントが取ることのできる行動の数を確認することができます．
cart poleの場合は，台車を左右どちらかに移動させるという行動を取るため，行動の数は2となっています．




In [None]:
# 環境の指定
env = gym.make('CartPole-v0')

# 環境の初期化
obs = env.reset()
#env.render()
print('observation space:', env.observation_space)
print('action space:', env.action_space)
print('initial observation:', obs)

# 行動の決定と決定した行動の入力
action = env.action_space.sample()
obs, r, done, info = env.step(action)
print('next observation:', obs)
print('reward:', r)
print('done:', done)
print('info:', info)

## ネットワークモデルの定義

ネットワークモデルを定義します．
ここでは，環境からの情報を入力し，行動に対するQ値を出力するようなネットワークを定義するために，全結合層3層から構成されるネットワークとします．

入力データのサイズをobs_size，出力する行動の数をn_actions，中間層のサイズをn_hiddenとし，ネットワークの作成時に変更できるようにしておきます．

`forward`関数では，これまでに学習したニューラルネットワークネットワークの場合と同様の手順でデータの計算を行います．

In [None]:
class DQN(nn.Module):
    def __init__(self, obs_size, n_actions, n_hidden = 50):
        super(DQN, self).__init__()
        self.obs_size = obs_size
        self.n_actions = n_actions
        self.l1 = nn.Linear(obs_size, n_hidden)
        self.l2 = nn.Linear(n_hidden, n_hidden)
        self.l3 = nn.Linear(n_hidden, n_actions)
        self.act = nn.Tanh()
    
    def forward(self, x):
        h = self.act(self.l1(x))
        h = self.act(self.l2(h))
        h = self.l3(h)
        return h

## Replay memoryの作成

DQNを含む強化学習では，Experience Replayと呼ばれる学習テクニックが頻繁に使用されます．
Experience Replayでは，過去の経験をReplay memory (buffer) に蓄積したのち，蓄積した経験をランダムに選択し学習へ使用します．
ここでは，replay memoryの定義を行います．

memoryへは，現在の状態，その時に選択された行動，行動によって遷移した状態（次状態），その時の報酬の4種類の情報を1つの経験として蓄積します．
まず，`Transition`という変数を定義します．
ここでは，`state`, `action`, `next_state`, `reward`が1セットとなるようなデータ構造（辞書オブジェクト）を定義します．

その後，ReplayMemoryクラスを定義します．
ReplayMemoryクラスでは，memoryへ格納する経験の数（`capacity`），経験を蓄積するリスト（`memory`），蓄積した経験の数を示す（`position`）を定義します．
`push`関数では，メモリへ経験を格納します．
また，`sample`関数では，指定したバッチサイズ (`batch_size`) 分の経験をランダムに選択し，返す関数を定義します．

In [None]:
Transition = namedtuple('Transition',
                        ('state', 'action', 'next_state', 'reward'))


class ReplayMemory(object):
    def __init__(self, capacity):
        self.capacity = capacity
        self.memory = []
        self.position = 0

    def push(self, *args):
        if len(self.memory) < self.capacity:
            self.memory.append(None)
        self.memory[self.position] = Transition(*args)
        self.position = (self.position + 1) % self.capacity

    def sample(self, batch_size):
        return random.sample(self.memory, batch_size)

    def __len__(self):
        return len(self.memory)

## 実験パラメータおよび関数の定義

次に，実験に使用するパラメータを定義します．
`BATCH_SIZE`はミニバッチサイズ，`GAMMA`は割引率です．
`EPS_START`は学習初期のepsilon-greedy法によりランダムな行動を選択させる割合，`EPS_END`は学習終了時の割合，`EPS_DECAY`は学習途中の変化率です
`TARGET_UPDATE`は目標値を出力するtarget networkのパラメータを更新する頻度を示します．

### select_action関数
次に，行動を選択する関数`select_action`を定義します．
この関数では，現在の環境，ネットワークモデルを引数とし，行動を決定する関数です．
このとき，上記で指定したパラメータに従い，一定の割合でランダムに行動選択を行います．それ以外の場合は，使用しているネットワークへ環境情報を入力し，行動を決定します．


### optimze_model関数
また，学習を行うための`optimize_model`関数を定義します．
ここでは，ReplayMemory，学習を行うネットワークモデル，目標値を決定するためのtarget_model，最適化手法のoptimzerオブジェクトを引数とします．
まず，memoryに十分な数の経験が蓄積されていない場合は学習を行わないように定義します．

十分な経験が蓄積されている場合，その中から指定したバッチサイズ分の経験をランダムに取得します．
取得した経験から，状態，行動，報酬，次状態それぞれにデータを分けます．
その後状態をネットワークへ入力し，行動価値関数を得ます．
また，target_modelへ次状態を入力し，出力された価値関数と報酬から，目標値（`expected_state_action_value`）を算出します．
算出した値を用いて，誤差を算出し，パラメータを更新します．

In [None]:
BATCH_SIZE = 128
GAMMA = 0.999
EPS_START = 0.9
EPS_END = 0.05
EPS_DECAY = 200
TARGET_UPDATE = 10


def select_action(state, model, current_steps):
    sample = random.random()
    eps_threshold = EPS_END + (EPS_START - EPS_END) * \
        math.exp(-1. * current_steps / EPS_DECAY)
    if sample > eps_threshold:
        with torch.no_grad():
            return model(state).max(1)[1].view(1, 1)
    else:
        return torch.tensor([[random.randrange(model.n_actions)]], device=device, dtype=torch.long)
    
    
def optimize_model(memory, model, target_model, optimizer):
    # 十分な数の経験が蓄積されているか確認
    if len(memory) < BATCH_SIZE:
        return

    # バッチサイズ分の経験を取得
    transitions = memory.sample(BATCH_SIZE)
    batch = Transition(*zip(*transitions))
    
    # 状態，行動，報酬等にデータを分ける
    non_final_mask = torch.tensor(tuple(map(lambda s: s is not None, batch.next_state)),
                                  device=device,
                                  dtype=torch.bool)
    non_final_next_states = torch.cat([s for s in batch.next_state if s is not None])
    state_batch = torch.cat(batch.state)
    action_batch = torch.cat(batch.action)
    reward_batch = torch.cat(batch.reward)
    
    # 状態を入力して行動を取得（推論）
    state_action_values = model(state_batch).gather(1, action_batch)

    # target_modelを用いた目標値の決定
    next_state_values = torch.zeros(BATCH_SIZE, device=device)
    next_state_values[non_final_mask] = target_model(non_final_next_states).max(1)[0].detach()
    expected_state_action_values = (next_state_values * GAMMA) + reward_batch

    # 誤差の算出
    loss = F.smooth_l1_loss(state_action_values, expected_state_action_values.unsqueeze(1))

    # パラメータ更新
    optimizer.zero_grad()
    loss.backward()
    for param in model.parameters():
        param.grad.data.clamp_(-1, 1)
    optimizer.step()

## 学習
学習を行います．

学習回数（エピソード数）を200とします．

次に，学習を行うネットワーク（`policy_net`）および目標値を算出するネットワーク（`target_net`）を生成します．
この時，target_netのパラメータはpolicy_netと同じになるように，指定します．
最適化手法として，RMSpropを使用します．
さらに，Replay Memoryを生成します．

学習を開始します．
まず，環境を初期化し，経験をReplayMemoryへ蓄積します．
十分に蓄積された後，optimize_model関数でパラメータの更新を行います．
また，`TARGET_UPDATE`で指定した回数ごとに，target_netのパラメータをpolicy_netのパラメータと同じになるようにコピーを行います．

In [None]:
num_episode = 200

steps_done = 0
episode_durations = []
n_actions = env.action_space.n

policy_net= DQN(4, n_actions).to(device)
target_net= DQN(4, n_actions).to(device)
target_net.load_state_dict(policy_net.state_dict())
target_net.eval()

optimizer = optim.RMSprop(policy_net.parameters())
memory = ReplayMemory(10000)


# 学習の開始
for i_episode in range(1, num_episode+1):
    state = env.reset()
    state = torch.from_numpy(state).view(-1, 4).to(device).float()
    
    for t in count():
        action = select_action(state, policy_net, steps_done)
        steps_done += 1
        
        next_state, reward, done, _ = env.step(action.item())
        reward = torch.tensor([reward], device=device)
        
        if done:
            next_state = None
        else:
            next_state = torch.from_numpy(next_state).view(-1, 4).to(device).float()
        
        memory.push(state, action, next_state, reward)
        state = next_state
        
        optimize_model(memory, policy_net, target_net, optimizer)
        if done:
            episode_durations.append(t+1)
            break
    
    if i_episode % 10 == 0:
        print("episode:", i_episode, "duration:", t)

    if i_episode % TARGET_UPDATE == 0:
        target_net.load_state_dict(policy_net.state_dict())

print("Complete")
env.close()

## 評価
学習したネットワーク（エージェント）を確認してみます．

ここでは，framesに描画したフレームを順次格納します．

In [None]:
# 結果を描画するための設定
from pyvirtualdisplay import Display
display = Display(visible=0, size=(1024, 768))
display.start()
import os
os.environ["DISPLAY"] = ":" + str(display.display) + "." + str(display.screen)


frames = []
for i in range(3):
    state = env.reset()
    state = torch.from_numpy(state).view(-1, 4).to(device).float()
    done = False
    R = 0
    t = 0
    
    while not done and t < 200:
        frames.append(env.render(mode='rgb_array'))
        action = policy_net(state)
        state, r, done, _ = env.step(action.max(1)[1].view(1, 1).item())
        state = torch.from_numpy(state).view(-1, 4).to(device).float()
        R += r
        t += 1
    print("test episode:", i, "R:", R)

## 描画

maptlotlibを用いて，保存した動画フレームをアニメーションとして作成し，表示しています．

In [None]:
# 実行結果の表示
import matplotlib.pyplot as plt
import matplotlib.animation
from IPython.display import HTML

plt.figure(figsize=(frames[0].shape[1] / 72.0, frames[0].shape[0] / 72.0), dpi = 72)
patch = plt.imshow(frames[0])
plt.axis('off')
animate = lambda i: patch.set_data(frames[i])
ani = matplotlib.animation.FuncAnimation(plt.gcf(), animate, frames=len(frames), interval = 50)
HTML(ani.to_jshtml())