In [1]:
import gym
from mlagents_envs.environment import UnityEnvironment
from gym_unity.envs import UnityToGymWrapper
import numpy as np
import torch

In [2]:
from collections import namedtuple

Transition = namedtuple(
    'Transition', ('state', 'action', 'next_state', 'reward'))

In [3]:
#定数の設定
GAMMA = 0.99 #時間割引率
NUM_EPISODES = 100 #最大試行回数

In [4]:
import random
class ReplayMemory:

    def __init__(self, CAPACITY):
        self.capacity = CAPACITY
        self.memory = []
        self.index = 0

    def push(self, state, action, state_next, reward):

        if len(self.memory) < self.capacity:
            self.memory.append(None)

        self.memory[self.index] = Transition(state, action, state_next, reward)

        self.index = (self.index + 1) % self.capacity

    def sample(self, batch_size):
        return random.sample(self.memory, batch_size)

    def __len__(self):
        return len(self.memory)

In [5]:
TD_ERROR_EPSILON = 0.0001

class TDerrorMemory:

    def __init__(self, CAPACITY):
        self.capacity = CAPACITY #メモリの最大長
        self.memory = []#経験を保存する変数
        self.index = 0#保存するindexを示す変数

    def push(self, td_error):
        '''TD誤差をメモリに保存'''

        if len(self.memory) < self.capacity:
            self.memory.append(None)

        self.memory[self.index] = td_error
        self.index = (self.index + 1) % self.capacity

    def __len__(self):
        '''関数lenに対して、現在の変数memoryの長さを返す'''
        return len(self.memory)

    def get_prioritized_indexes(self, batch_size):
        '''TD誤差に応じた確率でindexを取得'''

        #TD誤差の和を計算
        sum_absolute_td_error = np.sum(np.absolute(self.memory))
        sum_absolute_td_error += TD_ERROR_EPSILON * len(self.memory)

        #batch_size分の乱数を生成して、昇順に並べる
        rand_list = np.random.uniform(0, sum_absolute_td_error, batch_size)
        rand_list = np.sort(rand_list)

        #作成した乱数で串刺しにして、インデックスを求める
        indexes = []
        idx = 0
        tmp_sum_absolute_td_error = 0
        for rand_num in rand_list:
            while tmp_sum_absolute_td_error < rand_num:
                tmp_sum_absolute_td_error += (abs(self.memory[idx]) + TD_ERROR_EPSILON)
                idx += 1

            #微小値を計算に使用した関係でindexがメモリの長さを超えた場合の補正
            if idx >= len(self.memory):
                idx = len(self.memory) - 1
            indexes.append(idx)

        return indexes

    def update_td_error(self, updated_td_errors):
        '''TD誤差の更新'''
        self.memory = updated_td_errors

In [6]:
import torch.nn as nn
import torch.nn.functional as F

class Net(nn.Module):

    def __init__(self, n_in, n_mid, n_out):
        super(Net, self).__init__()
        self.fc1 = nn.Linear(n_in, n_mid)
        self.fc2 = nn.Linear(n_mid, n_mid)
        self.fc3 = nn.Linear(n_mid, n_out)

    def forward(self, x):
        h1 = F.relu(self.fc1(x))
        h2 = F.relu(self.fc2(h1))
        output = self.fc3(h2)
        return output

In [7]:
import torch
from torch import nn
from torch import optim
import torch.nn.functional as F

BATCH_SIZE = 32
CAPACITY = 10000

class Brain:
    def __init__(self, num_states, num_actions):
        self.num_states = num_states #入力
        self.num_actions = num_actions #行動
        self.memory = ReplayMemory(CAPACITY)#経験を記憶するメモリオブジェクト

        #ニューラルネットワーク構築
        n_in, n_mid, n_out = num_states, 32, num_actions
        self.main_q_network = Net(n_in, n_mid, n_out)
        self.target_q_network = Net(n_in, n_mid, n_out)
        print(self.main_q_network)#ネットワークの形を出力

        self.optimizer = optim.Adam(self.main_q_network.parameters(), lr=0.0001)#最適化手法の設定

        self.td_error_memory = TDerrorMemory(CAPACITY)

    def replay(self, episode):
        '''Experience Replayでネットワークの結合パラメータを学習'''
        #1. メモリサイズの確認
        if len(self.memory) < BATCH_SIZE:
            return
        
        #2. ミニバッチ作成
        self.batch, self.state_batch, self.action_batch, self.reward_batch, self.non_final_next_states = self.make_minibatch(episode)

        #3. 教師信号となるQ(s_t, a_t)値を求める
        self.expected_state_action_values = self.get_expected_state_action_values()

        #4. 結合パラメータの更新
        self.update_main_q_network()

    def decide_action(self, state, episode):
        epsilon = 0.5 * (1 / (episode + 1))

        if epsilon <= np.random.uniform(0, 1):
            self.main_q_network.eval()#推論モードに切り替え
            with torch.no_grad():
                action = self.main_q_network(state).max(1)[1].view(1, 1)
        else:
            action = torch.LongTensor([[random.randrange(self.num_actions)]])

        return action

    def make_minibatch(self, episode):
        '''2.ミニバッチの作成'''

        #2.1 メモリからミニバッチ分のデータを取り出す
        if episode < 30:
            transitions = self.memory.sample(BATCH_SIZE)
        else:
            #TD誤差に応じてミニバッチを取り出す
            indexes = self.td_error_memory.get_prioritized_indexes(BATCH_SIZE)
            transitions = [self.memory.memory[n] for n in indexes]

        #2.2 各変数をミニバッチに対応する形に変形
        batch = Transition(*zip(*transitions))

        #2.3 各変数の要素をミニバッチに対応する形に変形する
        state_batch = torch.cat(batch.state)
        action_batch = torch.cat(batch.action)
        reward_batch = torch.cat(batch.reward)
        non_final_next_states = torch.cat([s for s in batch.next_state if s is not None])

        return batch, state_batch, action_batch, reward_batch, non_final_next_states

    def get_expected_state_action_values(self):
        '''3.教師信号となるQ(s_t, a_t)値を求める'''

        #3.1 ネットワークを推論モードに切り替え
        self.main_q_network.eval()
        self.target_q_network.eval()

        #3.2 ネットワークが出力したQ(s_t, a_t)を求める
        self.state_action_values = self.main_q_network(self.state_batch).gather(1, self.action_batch)

        #3.3 max{Q(s_t+1, a)}値を求める。ただし次の状態があるかに注意
        non_final_mask = torch.ByteTensor(tuple(map(lambda s: s is not None, self.batch.next_state)))

        next_state_values = torch.zeros(BATCH_SIZE)

        a_m = torch.zeros(BATCH_SIZE).type(torch.LongTensor)

        a_m[non_final_mask] = self.main_q_network(
            self.non_final_next_states).detach().max(1)[1]

        a_m_non_final_next_states = a_m[non_final_mask].view(-1, 1)

        next_state_values[non_final_mask] = self.target_q_network(self.non_final_next_states).gather(1, a_m_non_final_next_states).detach().squeeze()

        #3.4 教師となるQ(s_t, a_t)値を、Q学習の式から求める
        expected_state_action_values = self.reward_batch + GAMMA * next_state_values

        return expected_state_action_values

    def update_main_q_network(self):
        '''4. 結合パラメータの更新'''

        #4.1 ネットワークを訓練モードに切り替える
        self.main_q_network.train()

        #4.2 損失関数を計算する(smooth_l1_lossはHuberloss)
        loss = F.smooth_l1_loss(self.state_action_values, self.expected_state_action_values.unsqueeze(1))

        #4.3 結合パラメータを更新する
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

    def update_target_q_network(self):
        '''Target Q-NetworkをMainと同じにする'''
        self.target_q_network.load_state_dict(self.main_q_network.state_dict())

    def update_td_error_memory(self):
        '''TD誤差メモリに格納されているTD誤差を更新する'''

        #ネットワークを推論モードに
        self.main_q_network.eval()
        self.target_q_network.eval()

        #全メモリでミニバッチを作成
        transitions = self.memory.memory
        batch = Transition(*zip(*transitions))

        state_batch = torch.cat(batch.state)
        action_batch = torch.cat(batch.action)
        reward_batch = torch.cat(batch.reward)
        non_final_next_states = torch.cat([s for s in batch.next_state if s is not None])

        #ネットワークが出力したQ(s_t, a_t)を求める
        state_action_values = self.main_q_network(state_batch).gather(1, action_batch)

        #cartpoleがdoneになっておらず、next_stateがあるかをチェックするインデックスマスクを作成
        non_final_mask = torch.ByteTensor(tuple(map(lambda s: s is not None, batch.next_state)))

        #まずは全部0にしておく、サイズはメモリの長さ
        next_state_values = torch.zeros(len(self.memory))
        a_m = torch.zeros(len(self.memory)).type(torch.LongTensor)

        #次の状態での最大Q値の行動a_mをMain Q-networkから求める
        a_m[non_final_mask] = self.main_q_network(non_final_next_states).detach().max(1)[1]

        #次の状態があるものだけにフィルターし、size 32を32*1へ
        a_m_non_final_next_states = a_m[non_final_mask].view(-1, 1)

        #次の状態があるindexの、行動a_mのQ値をtarget Q-networkから求める
        next_state_values[non_final_mask] = self.target_q_network(non_final_next_states).gather(1, a_m_non_final_next_states).detach().squeeze()

        #TD誤差を求める
        td_errors = (reward_batch + GAMMA * next_state_values) - \
            state_action_values.squeeze()

        #TD誤差メモリを更新、Tensorをdetach()で取り出し、NumPyにしてから、Pythonのリストまで変換
        self.td_error_memory.memory = td_errors.detach().numpy().tolist()

    def save_onnx(self, output_name):
        self.main_q_network.eval()
        x = torch.rand(self.num_states)
        torch.onnx.export(self.main_q_network, x,
            output_name, export_params=True, opset_version=11)

In [8]:
class Agent:
    def __init__(self, num_states, num_actions):
        '''課題の状態と行動の数を設定する'''
        self.brain = Brain(num_states, num_actions) 

    def update_q_function(self, episode):
        '''Q関数を更新する'''
        self.brain.replay(episode)

    def get_action(self, state, episode):
        '''行動を決定する'''
        action = self.brain.decide_action(state, episode)
        return action

    def memorize(self, state, action, state_next, reward):
        '''memoryオブジェクトに、state, action, state_next, rewardの内容を保存する'''
        self.brain.memory.push(state, action, state_next, reward)

    def update_target_q_function(self):
        '''Target Q-NetworkをMain Q-Networkと同じに更新'''
        self.brain.update_target_q_network()
        
    def memorize_td_error(self, td_error):  
        '''TD誤差メモリにTD誤差を格納'''
        self.brain.td_error_memory.push(td_error)
        
    def update_td_error_memory(self): 
        '''TD誤差メモリに格納されているTD誤差を更新する'''
        self.brain.update_td_error_memory()

    def save_onnx(self, output_name):
        self.brain.save_onnx(output_name)

In [9]:
def make_env(env_directory):
    unity_env = UnityEnvironment(env_directory)
    env = UnityToGymWrapper(unity_env)
    return env

In [10]:
class Environment:
    def __init__(self):
        self.env = make_env("ML-agents-test")
        num_states = self.env.observation_space.shape[0]
        num_actions = self.env.action_space.n

        self.agent = Agent(num_states, num_actions)

    def run(self):
        episode_10_list = np.zeros(10)

        complete_episodes = 0
        episode_final = False
        frames = []

        for episode in range(NUM_EPISODES):
            observation = self.env.reset()

            state = observation
            state = torch.from_numpy(state).type(torch.FloatTensor)
            state = torch.unsqueeze(state, 0)

            while(1):
                
                action = self.agent.get_action(state, episode)

                observation_next, reward, done, _ = self.env.step(action.item())
                reward = torch.FloatTensor([reward])
                if done:
                    state_next = None
                    print("done :" + str(episode))
                    break
                else:
                    state_next = observation_next
                    state_next = torch.from_numpy(state_next).type(torch.FloatTensor)
                    state_next = torch.unsqueeze(state_next, 0)

                self.agent.memorize(state, action, state_next, reward)

                self.agent.memorize_td_error(0)

                self.agent.update_q_function(episode)

                state = state_next

                if done:
                    self.agent.update_td_error_memory()

                    if(episode % 2 == 0):
                        self.agent.update_target_q_function()
                    break

        self.agent.save_onnx("test.onnx")

In [11]:
avoid_obstacle_env = Environment()
avoid_obstacle_env.run()

[INFO] Connected to Unity environment with package version 2.2.1-exp.1 and communication version 1.5.0
[INFO] Connected new brain: avoid_obstacle?team=0
Net(
  (fc1): Linear(in_features=6, out_features=32, bias=True)
  (fc2): Linear(in_features=32, out_features=32, bias=True)
  (fc3): Linear(in_features=32, out_features=3, bias=True)
)




done :0


  a_m[non_final_mask] = self.main_q_network(
  a_m_non_final_next_states = a_m[non_final_mask].view(-1, 1)
  next_state_values[non_final_mask] = self.target_q_network(self.non_final_next_states).gather(1, a_m_non_final_next_states).detach().squeeze()


done :1
done :2
done :3
done :4
done :5
done :6
done :7
done :8
done :9
done :10
done :11
done :12
done :13
done :14
done :15
done :16
done :17
done :18
done :19
done :20
done :21
done :22
done :23
done :24
done :25
done :26
done :27
done :28
done :29
done :30
done :31
done :32
done :33
done :34
done :35
done :36
done :37
done :38
done :39
done :40
done :41
done :42
done :43
done :44
done :45
done :46
done :47
done :48
done :49
done :50
done :51
done :52
done :53
done :54
done :55
done :56
done :57
done :58
done :59
done :60
done :61
done :62
done :63
done :64
done :65
done :66
done :67
done :68
done :69
done :70
done :71
done :72
done :73
done :74
done :75
done :76
done :77
done :78
done :79
done :80
done :81
done :82
done :83
done :84
done :85
done :86
done :87
done :88
done :89
done :90
done :91
done :92
done :93
done :94
done :95
done :96
done :97
done :98
done :99


In [12]:
print(torch.randn(6))

tensor([-0.3185, -1.8588, -0.8598, -1.3134,  1.0143,  0.5290])
