<a href="https://colab.research.google.com/github/yukinaga/ai_programming_2022/blob/main/07_advanced_ai/03_exercise.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# 演習: 深層強化学習のテクニック
学習を安定化させるためのテクニックを実装しましょう。

## 深層強化学習のテクニック
深層強化学習では、一般的に以下のようなテクニックが利用されます。

### Experience Replay
各ステップの内容をメモリに保存しておき、メモリからランダムに記録を取り出して学習を行います。  
これにより、ミニバッチ法を使用することが可能になります。  
また、ミニバッチに時間的にばらけた記録が入ることになるので、学習が安定します。  

### Fixed Target Q-Network
行動を決定するのに用いるmain-networkと、誤差の計算時に正解を求めるのに用いるtarget-networkを用意します。  
target-networkのパラメータは一定時間固定されますが、main-networkのパラメータはミニバッチごとに更新されます。そして、定期的にmain-networkのパラメータがtarget-networkに上書きされます。これにより、正解が短い時間で揺れ動く問題が低減され学習が安定すると考えられています。  
  
今回の演習では、このうちのFixed Target Q-Networkを実装します。

## ライブラリの導入

In [None]:
import numpy as np
import matplotlib.pyplot as plt 
from matplotlib import animation, rc

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch import optim

## Netクラス

In [None]:
class Net(nn.Module):
    def __init__(self, n_state, n_mid, n_action):
        super().__init__()
        self.fc1 = nn.Linear(n_state, n_mid)  # 全結合層
        self.fc2 = nn.Linear(n_mid, n_mid)
        self.fc3 = nn.Linear(n_mid, n_action)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = self.fc3(x)
        return x

## Brainクラス
以下のセルのコードを補完し、Fixed Target Q-Networkを実装しましょう。  
main-network、target-networkの2つのネットワークを用意します。  
正解を用意するためにはtarget-networkを使用しますが、target-networkには一定間隔でmain-networkのパラメータをコピーします。

In [None]:
class Brain:
    def __init__(self, n_state, n_action, main_net, target_net,
                 loss_fnc, optimizer, target_interval, is_gpu, gamma=0.9, r=0.99, lr=0.01):
        self.n_state = n_state  # 状態の数
        self.n_action = n_action  # 行動の数

        self.main_net = main_net  # main-network
        self.target_net = target_net  # target-network
        self.loss_fnc = loss_fnc  # 誤差関数
        self.optimizer = optimizer  # 最適化アルゴリズム

        self.target_interval = target_interval
        self.target_count = 0

        self.is_gpu = is_gpu  # GPUを使うかどうか
        if self.is_gpu:
            self.main_net.cuda()  # GPU対応
            self.target_net.cuda()  # GPU対応

        self.eps = 1.0  # ε
        self.gamma = gamma  # 割引率
        self.r = r  # εの減衰率
        self.lr = lr  # 学習係数

    def train(self, states, next_states, action, reward, terminal):  # ニューラルネットワークを訓練

        states = torch.from_numpy(states).float()
        next_states = torch.from_numpy(next_states).float()
        if self.is_gpu:
            states, next_states = states.cuda(), next_states.cuda()  # GPU対応
            
        self.target_net.eval()  # 評価モード
        next_q =   # ------------- この行にコードを追記 -------------
        self.main_net.train()  # 訓練モード
        q =   # ------------- この行にコードを追記 -------------

        t = q.clone().detach()
        if terminal:
            t[:, action] = reward  #  エピソード終了時の正解は、報酬のみ
        else:
            t[:, action] = reward + self.gamma*np.max(next_q.detach().cpu().numpy(), axis=1)[0]
            
        loss = self.loss_fnc(q, t)
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

        self.target_count += 1
        if self.target_count >= self.target_interval:
            self.target_net.load_state_dict(self.main_net.state_dict())
            self.target_count = 0

    def get_action(self, states):  # 行動を取得
        states = torch.from_numpy(states).float()
        if self.is_gpu:
            states = states.cuda()  # GPU対応

        if np.random.rand() < self.eps:  # ランダムな行動
            action = np.random.randint(self.n_action)
        else:  # Q値の高い行動を選択
            q = self.main_net.forward(states)
            action = np.argmax(q.detach().cpu().numpy(), axis=1)[0]
        if self.eps > 0.1:  # εの下限
            self.eps *= self.r
        return action

## エージェントのクラス

In [None]:
class Agent:
    def __init__(self, v_x, v_y_sigma, v_jump, brain):
        self.v_x = v_x  # x速度
        self.v_y_sigma = v_y_sigma  # y速度、初期値の標準偏差
        self.v_jump = v_jump  # ジャンプ速度
        self.brain = brain
        self.reset()

    def reset(self):
        self.x = -1  # 初期x座標
        self.y = 0  # 初期y座標
        self.v_y = self.v_y_sigma * np.random.randn()  # 初期y速度

    def step(self, g):  # 時間を1つ進める g:重力加速度
        states = np.array([[self.y, self.v_y]])
        self.x += self.v_x
        self.y += self.v_y

        reward = 0  # 報酬
        terminal = False  # 終了判定
        if self.x>1.0:
            reward = 1
            terminal = True
        elif self.y<-1.0 or self.y>1.0:
            reward = -1
            terminal = True

        action = self.brain.get_action(states)
        if action == 0:
            self.v_y -= g   # 自由落下
        else:
            self.v_y = self.v_jump  # ジャンプ
        next_states = np.array([[self.y, self.v_y]])
        self.brain.train(states, next_states, action, reward, terminal)

        if terminal:
            self.reset()

## 環境のクラス

In [None]:
class Environment:
    def __init__(self, agent, g):
        self.agent = agent
        self.g = g  # 重力加速度

    def step(self):
        self.agent.step(self.g)
        return (self.agent.x, self.agent.y)

## アニメーション

In [None]:
def animate(environment, interval, frames):
    fig, ax = plt.subplots()
    plt.close()
    ax.set_xlim(( -1, 1))
    ax.set_ylim((-1, 1))
    sc = ax.scatter([], [])

    def plot(data):
        x, y = environment.step()
        sc.set_offsets(np.array([[x, y]]))
        return (sc,)

    return animation.FuncAnimation(fig, plot, interval=interval, frames=frames, blit=True)

## 深層強化学習の実行
2つのニューラルネットワーク、Brain、エージェント、環境の設定を行い、深層強化学習を実行します。

In [None]:
n_state = 2
n_mid = 32
n_action = 2

main_net = Net(n_state, n_mid, n_action)
target_net = Net(n_state, n_mid, n_action)
target_net.load_state_dict(main_net.state_dict())
target_net.eval()

loss_fnc = nn.MSELoss()  # 誤差関数
optimizer = optim.RMSprop(main_net.parameters(), lr=0.01)  # 最適化アルゴリズム
target_interval = 10
is_gpu = True

brain = Brain(n_state, n_action, main_net, target_net,
              loss_fnc, optimizer, target_interval, is_gpu) 

v_x = 0.05
v_y_sigma = 0.1
v_jump = 0.2
agent = Agent(v_x, v_y_sigma, v_jump, brain)

g = 0.2
environment = Environment(agent, g)

anim = animate(environment, 50, 1024)
rc("animation", html="jshtml")
anim

# 解答例
以下に解答例を掲載します。

In [None]:
class Brain:
    def __init__(self, n_state, n_action, main_net, target_net,
                 loss_fnc, optimizer, target_interval, is_gpu, gamma=0.9, r=0.99, lr=0.01):
        self.n_state = n_state  # 状態の数
        self.n_action = n_action  # 行動の数

        self.main_net = main_net  # main-network
        self.target_net = target_net  # target-network
        self.loss_fnc = loss_fnc  # 誤差関数
        self.optimizer = optimizer  # 最適化アルゴリズム

        self.target_interval = target_interval
        self.target_count = 0

        self.is_gpu = is_gpu  # GPUを使うかどうか
        if self.is_gpu:
            self.main_net.cuda()  # GPU対応
            self.target_net.cuda()  # GPU対応

        self.eps = 1.0  # ε
        self.gamma = gamma  # 割引率
        self.r = r  # εの減衰率
        self.lr = lr  # 学習係数

    def train(self, states, next_states, action, reward, terminal):  # ニューラルネットワークを訓練

        states = torch.from_numpy(states).float()
        next_states = torch.from_numpy(next_states).float()
        if self.is_gpu:
            states, next_states = states.cuda(), next_states.cuda()  # GPU対応
            
        self.target_net.eval()  # 評価モード
        next_q = self.target_net.forward(next_states)  # ------------- この行にコードを追記 -------------
        self.main_net.train()  # 訓練モード
        q = self.main_net.forward(states)  # ------------- この行にコードを追記 -------------

        t = q.clone().detach()
        if terminal:
            t[:, action] = reward  #  エピソード終了時の正解は、報酬のみ
        else:
            t[:, action] = reward + self.gamma*np.max(next_q.detach().cpu().numpy(), axis=1)[0]
            
        loss = self.loss_fnc(q, t)
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

        self.target_count += 1
        if self.target_count >= self.target_interval:
            self.target_net.load_state_dict(self.main_net.state_dict())
            self.target_count = 0

    def get_action(self, states):  # 行動を取得
        states = torch.from_numpy(states).float()
        if self.is_gpu:
            states = states.cuda()  # GPU対応

        if np.random.rand() < self.eps:  # ランダムな行動
            action = np.random.randint(self.n_action)
        else:  # Q値の高い行動を選択
            q = self.main_net.forward(states)
            action = np.argmax(q.detach().cpu().numpy(), axis=1)[0]
        if self.eps > 0.1:  # εの下限
            self.eps *= self.r
        return action