## 6.2 PyTorchでDDQN

In [51]:
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline
import gym

In [52]:
from JSAnimation.IPython_display import display_animation
from matplotlib import animation
from IPython.display import display

def display_frames_as_gif(frames):
    plt.figure(figsize=(frames[0].shape[1] / 72.0, frames[0].shape[0] / 72.0),
               dpi=72)
    patch = plt.imshow(frames[0])
    plt.axis('off')

    def animate(i):
        patch.set_data(frames[i])

    anim = animation.FuncAnimation(plt.gcf(), animate, frames=len(frames),
                                   interval=50)

    anim.save('movie_cartpole_DDQN.mp4')  # 動画のファイル名と保存です
    display(display_animation(anim, default_mode='loop'))


In [53]:
from collections import namedtuple
Transition = namedtuple('Transition', ('state', 'action', 'next_state', 'reward'))

In [54]:
ENV = 'CartPole-v0'  # 使用する課題名
GAMMA = 0.99  # 時間割引率
MAX_STEPS = 200  # 1試行のstep数
NUM_EPISODES = 500  # 最大試行回数

In [55]:
class ReplayMemory:
    def __init__(self, CAPACITY):
        self.capacity = CAPACITY  # メモリの最大長さ
        self.memory = []  # 経験を保存する変数
        self.index = 0  # 保存するindexを示す変数
    def push(self, state, action, state_next, reward):
        '''transition = (state, action, state_next, reward)をメモリに保存する'''
        if len(self.memory) < self.capacity:
            self.memory.append(None)  # メモリが満タンでないときは足す
        self.memory[self.index] = Transition(state, action, state_next, reward)
        self.index = (self.index + 1) % self.capacity  # 保存するindexを1つずらす
    def sample(self, batch_size):
        return random.sample(self.memory, batch_size)
    def __len__(self):
        return len(self.memory)

In [56]:
import torch.nn as nn
import torch.nn.functional as F
class Net(nn.Module):
    def __init__(self, n_in, n_mid, n_out):
        super(Net, self).__init__()
        self.fc1 = nn.Linear(n_in, n_mid)
        self.fc2 = nn.Linear(n_mid, n_mid)
        self.fc3 = nn.Linear(n_mid, n_out)
    def forward(self, x):
        h1 = F.relu(self.fc1(x))
        h2 = F.relu(self.fc2(h1))
        output = self.fc3(h2)
        return output

In [57]:
import random
import torch
from torch import nn
from torch import optim
import torch.nn.functional as F

BATCH_SIZE = 32
CAPACITY = 10000

class Brain:
    def __init__(self, num_states, num_actions):
        self.num_actions = num_actions  # CartPoleの行動（右に左に押す）の2を取得
        self.memory = ReplayMemory(CAPACITY)
        n_in, n_mid, n_out = num_states, 32, num_actions
        self.main_q_network = Net(n_in, n_mid, n_out)  # Netクラスを使用
        self.target_q_network = Net(n_in, n_mid, n_out)  # Netクラスを使用
        print(self.main_q_network)  # ネットワークの形を出力
        self.optimizer = optim.Adam(
            self.main_q_network.parameters(), lr=0.0001)

    def replay(self):
        if len(self.memory) < BATCH_SIZE:
            return
        self.batch, self.state_batch, self.action_batch, self.reward_batch, self.non_final_next_states = self.make_minibatch()
        self.expected_state_action_values = self.get_expected_state_action_values()
        self.update_main_q_network()

    def decide_action(self, state, episode):
        epsilon = 0.5 * (1 / (episode + 1))
        if epsilon <= np.random.uniform(0, 1):
            self.main_q_network.eval()  # ネットワークを推論モードに切り替える
            with torch.no_grad():
                print("test1 : ", self.main_q_network(state))
                print("test2 : ", self.main_q_network(state).max(1)[1])
                print("test3 : ", self.main_q_network(state).max(1)[1].view(1, 1))
                action = self.main_q_network(state).max(1)[1].view(1, 1)
        else:
            action = torch.LongTensor(
                [[random.randrange(self.num_actions)]])  # 0,1の行動をランダムに返す
        return action

    def make_minibatch(self):
        transitions = self.memory.sample(BATCH_SIZE)
        batch = Transition(*zip(*transitions))
        state_batch = torch.cat(batch.state)
        action_batch = torch.cat(batch.action)
        reward_batch = torch.cat(batch.reward)
        non_final_next_states = torch.cat([s for s in batch.next_state
                                           if s is not None])
        return batch, state_batch, action_batch, reward_batch, non_final_next_states

    def get_expected_state_action_values(self):
        self.main_q_network.eval()
        self.target_q_network.eval()
        print("state_batch", self.state_batch.shape)
        self.state_action_values = self.main_q_network(self.state_batch).gather(1, self.action_batch)
        non_final_mask = torch.ByteTensor(tuple(map(lambda s: s is not None, self.batch.next_state)))
        next_state_values = torch.zeros(BATCH_SIZE)
        a_m = torch.zeros(BATCH_SIZE).type(torch.LongTensor)
        a_m[non_final_mask] = self.main_q_network(self.non_final_next_states).detach().max(1)[1]
        a_m_non_final_next_states = a_m[non_final_mask].view(-1, 1)
        next_state_values[non_final_mask] = self.target_q_network(
            self.non_final_next_states).gather(1, a_m_non_final_next_states).detach().squeeze()
        expected_state_action_values = self.reward_batch + GAMMA * next_state_values
        print("xx1", self.reward_batch.shape, "xx2", next_state_values.shape, "xx3",expected_state_action_values.shape)
        return expected_state_action_values

    def update_main_q_network(self):
        self.main_q_network.train()
        print("loss1",self.state_action_values.shape,"loss2",self.expected_state_action_values.unsqueeze(1).shape)
        loss = F.smooth_l1_loss(self.state_action_values,
                                self.expected_state_action_values.unsqueeze(1))
        self.optimizer.zero_grad()  # 勾配をリセット
        loss.backward()  # バックプロパゲーションを計算
        self.optimizer.step()  # 結合パラメータを更新
    def update_target_q_network(self):  # DDQNで追加
        self.target_q_network.load_state_dict(self.main_q_network.state_dict())


In [58]:
class Agent:
    def __init__(self, num_states, num_actions):
        self.brain = Brain(num_states, num_actions)  # エージェントが行動を決定するための頭脳を生成
    def update_q_function(self):
        self.brain.replay()
    def get_action(self, state, episode):
        action = self.brain.decide_action(state, episode)
        return action
    def memorize(self, state, action, state_next, reward):
        self.brain.memory.push(state, action, state_next, reward)
    def update_target_q_function(self):
        self.brain.update_target_q_network()
        

In [59]:
class Environment:
    def __init__(self):
        self.env = gym.make(ENV)  # 実行する課題を設定
        num_states = self.env.observation_space.shape[0]  # 課題の状態と行動の数を設定
        num_actions = self.env.action_space.n  # CartPoleの行動（右に左に押す）の2を取得
        self.agent = Agent(num_states, num_actions)

    def run(self):
        episode_10_list = np.zeros(10)  # 10試行分の立ち続けたstep数を格納し、平均ステップ数を出力に利用
        complete_episodes = 0  # 195step以上連続で立ち続けた試行数
        episode_final = False  # 最後の試行フラグ
        for episode in range(NUM_EPISODES):  # 試行数分繰り返す
            observation = self.env.reset()  # 環境の初期化
            state = observation  # 観測をそのまま状態sとして使用
            state = torch.from_numpy(state).type(
                torch.FloatTensor)  # numpy変数をPyTorchのテンソルに変換
            state = torch.unsqueeze(state, 0)  # size 4をsize 1x4に変換
            print("obs",observation,"state",state)
            for step in range(MAX_STEPS):  # 1エピソードのループ
                action = self.agent.get_action(state, episode)  # 行動を求める
                observation_next, _, done, _ = self.env.step(action.item())  # rewardとinfoは使わないので_にする
                if done:  # ステップ数が200経過するか、一定角度以上傾くとdoneはtrueになる
                    state_next = None  # 次の状態はないので、Noneを格納
                    episode_10_list = np.hstack((episode_10_list[1:], step + 1))
                    if step < 195:
                        reward = torch.FloatTensor( [-1.0])  # 途中でこけたら罰則として報酬-1を与える
                        complete_episodes = 0  # 連続成功記録をリセット
                    else:
                        reward = torch.FloatTensor([1.0])  # 立ったまま終了時は報酬1を与える
                        complete_episodes = complete_episodes + 1  # 連続記録を更新
                else:
                    reward = torch.FloatTensor([0.0])  # 普段は報酬0
                    state_next = observation_next  # 観測をそのまま状態とする
                    state_next = torch.from_numpy(state_next).type(
                        torch.FloatTensor)  # numpy変数をPyTorchのテンソルに変換
                    state_next = torch.unsqueeze(state_next, 0)  # size 4をsize 1x4に変換
                print("SASR", state, action, state_next, reward)
                self.agent.memorize(state, action, state_next, reward)
                self.agent.update_q_function()
                state = state_next
                if done:
                    print('%d Episode: Finished after %d steps：10試行の平均step数 = %.1lf' % (
                        episode, step + 1, episode_10_list.mean()))
                    if(episode % 2 == 0):
                        self.agent.update_target_q_function()
                    break
            if episode_final is True:
                break
            if complete_episodes >= 10:
                print('10回連続成功')
                episode_final = True  # 次の試行を描画を行う最終試行とする

In [60]:
cartpole_env = Environment()
cartpole_env.run()

Net(
  (fc1): Linear(in_features=4, out_features=32, bias=True)
  (fc2): Linear(in_features=32, out_features=32, bias=True)
  (fc3): Linear(in_features=32, out_features=2, bias=True)
)
obs [-0.03055083  0.04383443  0.02171692 -0.03894129] state tensor([[-0.0306,  0.0438,  0.0217, -0.0389]])
test1 :  tensor([[-0.0041,  0.0769]])
test2 :  tensor([1])
test3 :  tensor([[1]])
SASR tensor([[-0.0306,  0.0438,  0.0217, -0.0389]]) tensor([[1]]) tensor([[-0.0297,  0.2386,  0.0209, -0.3247]]) tensor([0.])
SASR tensor([[-0.0297,  0.2386,  0.0209, -0.3247]]) tensor([[1]]) tensor([[-0.0249,  0.4335,  0.0144, -0.6107]]) tensor([0.])
test1 :  tensor([[0.0044, 0.0467]])
test2 :  tensor([1])
test3 :  tensor([[1]])
SASR tensor([[-0.0249,  0.4335,  0.0144, -0.6107]]) tensor([[1]]) tensor([[-0.0162,  0.6284,  0.0022, -0.8988]]) tensor([0.])
test1 :  tensor([[0.0095, 0.0146]])
test2 :  tensor([1])
test3 :  tensor([[1]])
SASR tensor([[-0.0162,  0.6284,  0.0022, -0.8988]]) tensor([[1]]) tensor([[-0.0037,  0.8

  a_m[non_final_mask] = self.main_q_network(self.non_final_next_states).detach().max(1)[1]
  a_m_non_final_next_states = a_m[non_final_mask].view(-1, 1)
  next_state_values[non_final_mask] = self.target_q_network(


xx1 torch.Size([32]) xx2 torch.Size([32]) xx3 torch.Size([32])
loss1 torch.Size([32, 1]) loss2 torch.Size([32, 1])
test1 :  tensor([[ 0.0194, -0.0544]])
test2 :  tensor([0])
test3 :  tensor([[0]])
SASR tensor([[ 0.0123,  0.9790, -0.0616, -1.4497]]) tensor([[0]]) tensor([[ 0.0318,  0.7847, -0.0905, -1.1768]]) tensor([0.])
state_batch torch.Size([32, 4])
xx1 torch.Size([32]) xx2 torch.Size([32]) xx3 torch.Size([32])
loss1 torch.Size([32, 1]) loss2 torch.Size([32, 1])
test1 :  tensor([[ 0.0084, -0.0266]])
test2 :  tensor([0])
test3 :  tensor([[0]])
SASR tensor([[ 0.0318,  0.7847, -0.0905, -1.1768]]) tensor([[0]]) tensor([[ 0.0475,  0.5909, -0.1141, -0.9139]]) tensor([0.])
state_batch torch.Size([32, 4])
xx1 torch.Size([32]) xx2 torch.Size([32]) xx3 torch.Size([32])
loss1 torch.Size([32, 1]) loss2 torch.Size([32, 1])
test1 :  tensor([[0.0004, 0.0047]])
test2 :  tensor([1])
test3 :  tensor([[1]])
SASR tensor([[ 0.0475,  0.5909, -0.1141, -0.9139]]) tensor([[1]]) tensor([[ 0.0594,  0.7873, -0

KeyboardInterrupt: 