## 6.5 以PyTorch建置A2C（Advantage Actor-Critic）

In [1]:
# import套件
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline
import gym


In [2]:
# Run-wide configuration constants
ENV = 'CartPole-v0'  # id of the task, passed to gym.make
GAMMA = 0.99  # discount factor applied to future rewards
MAX_STEPS = 200  # number of steps in one episode
NUM_EPISODES = 1000  # maximum number of episodes

NUM_PROCESSES = 32  # number of environments stepped side by side
NUM_ADVANCED_STEP = 5  # how many steps to advance before computing the discounted return


In [3]:
# Constants used when computing the A2C loss
value_loss_coef = 0.5  # weight of the critic (value) loss in the total loss
entropy_coef = 0.01  # weight of the entropy term subtracted from the total loss
max_grad_norm = 0.5  # upper bound handed to gradient-norm clipping


In [4]:
# 記憶體類別的定義


class RolloutStorage(object):
    '''Fixed-length buffer of transitions used for multi-step Advantage learning.

    Observations, masks and returns hold ``num_steps + 1`` slots (slot 0 is
    the state the rollout starts from); rewards and actions hold
    ``num_steps`` slots. All tensors are laid out as
    ``(step, process, ...)``.
    '''

    def __init__(self, num_steps, num_processes, obs_shape):
        '''Allocate the buffers.

        Args:
            num_steps: number of transitions stored per rollout.
            num_processes: number of environments stepped in parallel.
            obs_shape: size of one observation, either a single int
                (flat vector) or a sequence of ints (multi-dimensional).
        '''
        # Accept both ``4`` and ``(4,)`` / ``torch.Size([4, 84, 84])`` so the
        # observation size is no longer hard-coded.
        try:
            obs_dims = tuple(obs_shape)
        except TypeError:
            obs_dims = (int(obs_shape),)

        self.num_steps = num_steps
        self.observations = torch.zeros(
            num_steps + 1, num_processes, *obs_dims)
        self.masks = torch.ones(num_steps + 1, num_processes, 1)
        self.rewards = torch.zeros(num_steps, num_processes, 1)
        self.actions = torch.zeros(num_steps, num_processes, 1).long()

        # Discounted return for every stored step (plus the bootstrap slot)
        self.returns = torch.zeros(num_steps + 1, num_processes, 1)
        self.index = 0  # position the next insert() writes to

    def insert(self, current_obs, action, reward, mask):
        '''Store one transition at the current index and advance the index.

        The observation and mask describe the state *after* the action, so
        they go into slot ``index + 1``; reward and action go into ``index``.
        '''
        self.observations[self.index + 1].copy_(current_obs)
        self.masks[self.index + 1].copy_(mask)
        self.rewards[self.index].copy_(reward)
        self.actions[self.index].copy_(action)

        # Wrap using this buffer's own length, not a module-level constant
        self.index = (self.index + 1) % self.num_steps

    def after_update(self):
        '''Carry the newest observation and mask over to slot 0 for the next rollout.'''
        self.observations[0].copy_(self.observations[-1])
        self.masks[0].copy_(self.masks[-1])

    def compute_returns(self, next_value):
        '''Fill ``self.returns`` with the discounted return of every stored step.

        Args:
            next_value: value written to the last slot of ``returns`` and used
                as the bootstrap value for the backward recursion.
        '''
        # Work backwards from the last step: the last stored step gets a
        # 1-step return, the one before it a 2-step return, and so on.
        self.returns[-1] = next_value
        for ad_step in reversed(range(self.rewards.size(0))):
            # The mask zeroes the bootstrap term where it is 0 (episode ended).
            self.returns[ad_step] = self.returns[ad_step + 1] * \
                GAMMA * self.masks[ad_step + 1] + self.rewards[ad_step]


In [5]:
# 建置A2C的深度神經網路
import torch.nn as nn
import torch.nn.functional as F


class Net(nn.Module):
    '''Actor-critic network: two shared hidden layers feeding two linear heads.'''

    def __init__(self, n_in, n_mid, n_out):
        super().__init__()
        # Shared trunk
        self.fc1 = nn.Linear(n_in, n_mid)
        self.fc2 = nn.Linear(n_mid, n_mid)
        # Actor head: one output per action
        self.actor = nn.Linear(n_mid, n_out)
        # Critic head: a single state-value output
        self.critic = nn.Linear(n_mid, 1)

    def forward(self, x):
        '''Return ``(state_value, action_logits)`` for the batch ``x``.'''
        hidden = F.relu(self.fc2(F.relu(self.fc1(x))))
        return self.critic(hidden), self.actor(hidden)

    def act(self, x):
        '''Sample one action per row of ``x`` from the softmax over the actor output.'''
        _, logits = self(x)
        # softmax over dim=1 (the action dimension), then draw one sample per row
        return F.softmax(logits, dim=1).multinomial(num_samples=1)

    def get_value(self, x):
        '''Return only the critic output for the batch ``x``.'''
        return self(x)[0]

    def evaluate_actions(self, x, actions):
        '''Return the state value, log-probability of ``actions`` and mean entropy.

        ``actions`` is used as the index tensor of a ``gather`` along dim=1
        of the log-softmax of the actor output.
        '''
        state_value, logits = self(x)

        # Both distributions are taken over dim=1 (the action dimension)
        log_pi = F.log_softmax(logits, dim=1)
        pi = F.softmax(logits, dim=1)

        # Log-probability of the action that was actually taken
        chosen_log_pi = log_pi.gather(1, actions)

        # Entropy per row, averaged over the batch
        entropy = -(log_pi * pi).sum(-1).mean()

        return state_value, chosen_log_pi, entropy


In [6]:
# 定義智能體的大腦類別，所有智能體共用
import torch
from torch import optim


class Brain(object):
    '''Owns the shared actor-critic network and performs the A2C update step.'''

    def __init__(self, actor_critic):
        '''Wrap ``actor_critic`` and create an Adam optimizer (lr=0.01) for it.'''
        self.actor_critic = actor_critic  # network exposing evaluate_actions()
        self.optimizer = optim.Adam(self.actor_critic.parameters(), lr=0.01)

    def update(self, rollouts):
        '''Run one gradient step using every transition stored in ``rollouts``.

        Args:
            rollouts: storage object exposing ``observations``
                ``(steps + 1, processes, *obs_shape)``, ``actions`` and
                ``rewards`` ``(steps, processes, 1)`` and ``returns``
                ``(steps + 1, processes, 1)``.
        '''
        # Take every size from the stored tensors instead of module constants,
        # so the update works for any observation size / rollout length.
        obs_shape = rollouts.observations.size()[2:]
        num_steps, num_processes = rollouts.rewards.size()[:2]

        # Switch to training mode before the forward pass that builds the graph
        self.actor_critic.train()

        # Flatten (steps, processes) into one batch dimension for the network
        values, action_log_probs, entropy = self.actor_critic.evaluate_actions(
            rollouts.observations[:-1].view(-1, *obs_shape),
            rollouts.actions.view(-1, 1))

        # Shapes at this point (N = num_steps * num_processes):
        #   values            (N, 1)
        #   action_log_probs  (N, 1)
        #   entropy           scalar

        # Restore the (steps, processes, 1) layout used by rollouts.returns
        values = values.view(num_steps, num_processes, 1)
        action_log_probs = action_log_probs.view(num_steps, num_processes, 1)

        # Advantage = discounted return - estimated state value
        advantages = rollouts.returns[:-1] - values

        # Critic loss: mean squared advantage
        value_loss = advantages.pow(2).mean()

        # Actor gain; detach() so the advantage acts as a constant weight and
        # no gradient flows into the critic through this term.
        action_gain = (action_log_probs * advantages.detach()).mean()

        # Total loss: the gain and the entropy term enter with a minus sign
        total_loss = (value_loss * value_loss_coef -
                      action_gain - entropy * entropy_coef)

        self.optimizer.zero_grad()  # reset accumulated gradients
        total_loss.backward()  # backpropagation
        # Clip the gradient norm to max_grad_norm so one update cannot change
        # the parameters too abruptly.
        nn.utils.clip_grad_norm_(self.actor_critic.parameters(), max_grad_norm)

        self.optimizer.step()  # apply the parameter update


In [7]:
# 這次不建構Agent類別

In [8]:
# 這是執行環境的類別
import copy


class Environment:
    '''Drives NUM_PROCESSES gym environments in lockstep and trains one shared Brain.'''

    def run(self):
        '''Run the main training loop.

        Repeatedly collects NUM_ADVANCED_STEP transitions from every
        environment, computes discounted returns, and updates the shared
        network, until the success criterion is met or the outer loop ends.
        '''

        # Create one env per parallel worker
        # NOTE(review): the envs are never closed in this method.
        envs = [gym.make(ENV) for i in range(NUM_PROCESSES)]

        # Build the Brain shared by all agents
        n_in = envs[0].observation_space.shape[0]  # observation size (original note: 4 state values)
        n_out = envs[0].action_space.n  # number of actions (original note: 2)
        n_mid = 32
        actor_critic = Net(n_in, n_mid, n_out)  # build the neural network
        global_brain = Brain(actor_critic)

        # Create the working buffers
        obs_shape = n_in
        current_obs = torch.zeros(
            NUM_PROCESSES, obs_shape)  # torch.Size([NUM_PROCESSES, obs_shape])
        rollouts = RolloutStorage(
            NUM_ADVANCED_STEP, NUM_PROCESSES, obs_shape)  # rollout storage object
        episode_rewards = torch.zeros([NUM_PROCESSES, 1])  # reward accumulated in the current episode
        final_rewards = torch.zeros([NUM_PROCESSES, 1])  # total reward of the last finished episode
        obs_np = np.zeros([NUM_PROCESSES, obs_shape])  # NumPy array
        reward_np = np.zeros([NUM_PROCESSES, 1])  # NumPy array
        done_np = np.zeros([NUM_PROCESSES, 1])  # NumPy array
        each_step = np.zeros(NUM_PROCESSES)  # per-environment step counter
        episode = 0  # number of episodes finished by environment 0

        # Reset every environment to obtain the initial state
        obs = [envs[i].reset() for i in range(NUM_PROCESSES)]
        obs = np.array(obs)
        obs = torch.from_numpy(obs).float()  # torch.Size([NUM_PROCESSES, obs_shape])
        current_obs = obs  # keep the newest observation

        # Store the current state as the first observation of the rollout
        rollouts.observations[0].copy_(current_obs)

        # Main loop
        for j in range(NUM_EPISODES*NUM_PROCESSES):  # outer loop: one network update per iteration
            # Collect NUM_ADVANCED_STEP transitions from every environment
            for step in range(NUM_ADVANCED_STEP):

                # Choose actions (no gradient needed while acting)
                with torch.no_grad():
                    action = actor_critic.act(rollouts.observations[step])

                # (NUM_PROCESSES, 1) -> (NUM_PROCESSES,) -> NumPy array
                actions = action.squeeze(1).numpy()

                # Advance every environment by one step
                for i in range(NUM_PROCESSES):
                    obs_np[i], reward_np[i], done_np[i], _ = envs[i].step(
                        actions[i])

                    # Check for episode end and set the next state
                    if done_np[i]:  # done is reported by the env (original note: 200 steps reached or pole tilted too far)

                        # Only environment 0 reports progress
                        if i == 0:
                            print('%d Episode: Finished after %d steps' % (
                                episode, each_step[i]+1))
                            episode += 1

                        # Assign the terminal reward
                        if each_step[i] < 195:
                            reward_np[i] = -1.0  # ended before 195 steps: penalty of -1
                        else:
                            reward_np[i] = 1.0  # lasted at least 195 steps: reward of +1

                        each_step[i] = 0  # reset the step counter
                        obs_np[i] = envs[i].reset()  # reset the environment

                    else:
                        reward_np[i] = 0.0  # non-terminal steps give no reward
                        each_step[i] += 1

                # Convert rewards to a tensor and add them to the running episode totals
                reward = torch.from_numpy(reward_np).float()
                episode_rewards += reward

                # mask is 0.0 for environments that just finished, 1.0 otherwise
                masks = torch.FloatTensor(
                    [[0.0] if done_ else [1.0] for done_ in done_np])

                # Update the total reward of the last finished episode
                final_rewards *= masks  # keep the old value while running, clear it when done
                # Add 0 while running; add episode_rewards when the episode just ended
                final_rewards += (1 - masks) * episode_rewards

                # Update the running episode totals
                episode_rewards *= masks  # unchanged while running, reset to 0 when done

                # Zero the current observation of environments that are done
                # NOTE(review): appears to have no lasting effect, since
                # current_obs is rebound two statements below and insert()
                # copies its input - confirm before removing.
                current_obs *= masks

                # Refresh current_obs from the NumPy buffer
                obs = torch.from_numpy(obs_np).float()  # torch.Size([NUM_PROCESSES, obs_shape])
                current_obs = obs  # keep the newest observation

                # Insert this step's transition into the rollout storage
                rollouts.insert(current_obs, action.data, reward, masks)

            # End of the NUM_ADVANCED_STEP collection loop

            # Estimate the state value of the last stored observation (bootstrap value)

            with torch.no_grad():
                next_value = actor_critic.get_value(
                    rollouts.observations[-1]).detach()
                # rollouts.observations has size
                # (NUM_ADVANCED_STEP + 1, NUM_PROCESSES, obs_shape)

            # Compute the discounted return of every step and store it in rollouts.returns
            rollouts.compute_returns(next_value)

            # Update the network, then roll the storage over for the next rollout
            global_brain.update(rollouts)
            rollouts.after_update()

            # Stop once the summed final rewards reach NUM_PROCESSES, i.e. the
            # last finished episode of every environment earned the +1 reward
            if final_rewards.sum().numpy() >= NUM_PROCESSES:
                print('連續成功')
                break


In [12]:
# Main training run: build the environment runner and start learning
cartpole_env = Environment()
cartpole_env.run()


[33mWARN: gym.spaces.Box autodetected dtype as <class 'numpy.float32'>. Please provide explicit dtype.[0m
[33mWARN: gym.spaces.Box autodetected dtype as <class 'numpy.float32'>. Please provide explicit dtype.[0m
[33mWARN: gym.spaces.Box autodetected dtype as <class 'numpy.float32'>. Please provide explicit dtype.[0m
[33mWARN: gym.spaces.Box autodetected dtype as <class 'numpy.float32'>. Please provide explicit dtype.[0m
[33mWARN: gym.spaces.Box autodetected dtype as <class 'numpy.float32'>. Please provide explicit dtype.[0m
[33mWARN: gym.spaces.Box autodetected dtype as <class 'numpy.float32'>. Please provide explicit dtype.[0m
[33mWARN: gym.spaces.Box autodetected dtype as <class 'numpy.float32'>. Please provide explicit dtype.[0m
[33mWARN: gym.spaces.Box autodetected dtype as <class 'numpy.float32'>. Please provide explicit dtype.[0m
[33mWARN: gym.spaces.Box autodetected dtype as <class 'numpy.float32'>. Please provide explicit dtype.[0m
[33mWARN: gym.spaces.Box au