In [1]:
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline
import gym

In [2]:
from JSAnimation.IPython_display import display_animation
from matplotlib import animation
from IPython.display import display

def display_frames_as_gif(frames):
    plt.figure(figsize=(frames[0].shape[1]/72.0, frames[0].shape[0]/72.0),
              dpi=72)
    patch = plt.imshow(frames[0])
    plt.axis('off')
    
    def animate(i):
        patch.set_data(frames[i])
        
    anim = animation.FuncAnimation(plt.gcf(), animate, frames=len(frames),
                                  interval=50)
    anim.save('movie_cartpole_DDQN.mp4')
    display(display_animation(anim, default_mode='loop'))

In [3]:
from collections import namedtuple

Transition = namedtuple(
    'Transition', ('state', 'action', 'next_state', 'reward'))

In [4]:
ENV = 'CartPole-v0'
GAMMA = 0.99
MAX_STEPS = 200
NUM_EPISODES = 500

In [5]:
class ReplayMemory:

    def __init__(self, CAPACITY):
        self.capacity = CAPACITY
        self.memory = []
        self.index = 0

    def push(self, state, action, state_next, reward):

        if len(self.memory) < self.capacity:
            self.memory.append(None)

        self.memory[self.index] = Transition(state, action, state_next, reward)

        self.index = (self.index + 1) % self.capacity

    def sample(self, batch_size):
        return random.sample(self.memory, batch_size)

    def __len__(self):
        return len(self.memory)

In [18]:
import torch.nn as nn
import torch.nn.functional as F

class Net(nn.Module):

    def __init__(self, n_in, n_mid, n_out):
        super(Net, self).__init__()
        self.fc1 = nn.Linear(n_in, n_mid)
        self.fc2 = nn.Linear(n_mid, n_mid)
        #Dueling Network
        self.fc3_adv = nn.Linear(n_mid, n_out)#Advantage
        self.fc3_v = nn.Linear(n_mid, 1)#Value

    def forward(self, x):
        h1 = F.relu(self.fc1(x))
        h2 = F.relu(self.fc2(h1))

        adv = self.fc3_adv(h2)
        val = self.fc3_v(h2).expand(-1, adv.size(1))

        output = val + adv - adv.mean(1, keepdim=True).expand(-1, adv.size(1))
        return output

In [7]:
import random
import torch
from torch import nn
from torch import optim
import torch.nn.functional as F

BATCH_SIZE = 32
CAPACITY = 10000

class Brain:
    def __init__(self, num_states, num_actions):
        self.num_actions = num_actions #CartPoleの行動
        self.memory = ReplayMemory(CAPACITY)#経験を記憶するメモリオブジェクト

        #ニューラルネットワーク構築
        n_in, n_mid, n_out = num_states, 32, num_actions
        self.main_q_network = Net(n_in, n_mid, n_out)
        self.target_q_network = Net(n_in, n_mid, n_out)
        print(self.main_q_network)#ネットワークの形を出力

        self.optimizer = optim.Adam(self.main_q_network.parameters(), lr=0.0001)#最適化手法の設定

    def replay(self):
        '''Experience Replayでネットワークの結合パラメータを学習'''
        #1. メモリサイズの確認
        if len(self.memory) < BATCH_SIZE:
            return
        
        #2. ミニバッチ作成
        self.batch, self.state_batch, self.action_batch, self.reward_batch,self.non_final_next_states = self.make_minibatch()

        #3. 教師信号となるQ(s_t, a_t)値を求める
        self.expected_state_action_values = self.get_expected_state_action_values()

        #4. 結合パラメータの更新
        self.update_main_q_network()

    def decide_action(self, state, episode):
        epsilon = 0.5 * (1 / (episode + 1))

        if epsilon <= np.random.uniform(0, 1):
            self.main_q_network.eval()#推論モードに切り替え
            with torch.no_grad():
                action = self.main_q_network(state).max(1)[1].view(1, 1)
        else:
            action = torch.LongTensor([[random.randrange(self.num_actions)]])

        return action

    def make_minibatch(self):
        '''2.ミニバッチの作成'''

        #2.1 メモリからミニバッチ分のデータを取り出す
        transitions = self.memory.sample(BATCH_SIZE)

        #2.2 各変数をミニバッチに対応する形に変形
        batch = Transition(*zip(*transitions))

        #2.3 各変数の要素をミニバッチに対応する形に変形する
        state_batch = torch.cat(batch.state)
        action_batch = torch.cat(batch.action)
        reward_batch = torch.cat(batch.reward)
        non_final_next_states = torch.cat([s for s in batch.next_state if s is not None])

        return batch, state_batch, action_batch, reward_batch, non_final_next_states

    def get_expected_state_action_values(self):
        '''3.教師信号となるQ(s_t, a_t)値を求める'''

        #3.1 ネットワークを推論モードに切り替え
        self.main_q_network.eval()
        self.target_q_network.eval()

        #3.2 ネットワークが出力したQ(s_t, a_t)を求める
        self.state_action_values = self.main_q_network(self.state_batch).gather(1, self.action_batch)

        #3.3 max{Q(s_t+1, a)}値を求める。ただし次の状態があるかに注意
        non_final_mask = torch.ByteTensor(tuple(map(lambda s: s is not None, self.batch.next_state)))

        next_state_values = torch.zeros(BATCH_SIZE)

        a_m = torch.zeros(BATCH_SIZE).type(torch.LongTensor)

        a_m[non_final_mask] = self.main_q_network(
            self.non_final_next_states).detach().max(1)[1]

        a_m_non_final_next_states = a_m[non_final_mask].view(-1, 1)

        next_state_values[non_final_mask] = self.target_q_network(self.non_final_next_states).gather(1, a_m_non_final_next_states).detach().squeeze()

        #3.4 教師となるQ(s_t, a_t)値を、Q学習の式から求める
        expected_state_action_values = self.reward_batch + GAMMA * next_state_values

        return expected_state_action_values

    def update_main_q_network(self):
        '''4. 結合パラメータの更新'''

        #4.1 ネットワークを訓練モードに切り替える
        self.main_q_network.train()

        #4.2 損失関数を計算する(smooth_l1_lossはHuberloss)
        loss = F.smooth_l1_loss(self.state_action_values,self.expected_state_action_values.unsqueeze(1))

        #4.3 結合パラメータを更新する
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

    def update_target_q_network(self):
        '''Target Q-NetworkをMainと同じにする'''
        self.target_q_network.load_state_dict(self.main_q_network.state_dict())

In [8]:
class Agent:
    def __init__(self, num_states, num_actions):
        self.brain = Brain(num_states, num_actions)
    
    def update_q_function(self):
        self.brain.replay()

    def get_acton(self, state, episode):
        action = self.brain.decide_action(state, episode)
        return action

    def memorize(self, state, action, state_next, reward):
        self.brain.memory.push(state, action, state_next, reward)

    def update_target_q_function(self):
        self.brain.update_target_q_network()

In [9]:
class Environment:
    def __init__(self):
        self.env = gym.make(ENV)
        self.num_states = self.env.observation_space.shape[0]
        self.num_actions = self.env.action_space.n

        self.agent = Agent(self.num_states, self.num_actions)

    def run(self):
        episode_10_list = np.zeros(10)

        complete_episodes = 0
        episode_final = False
        frames = []

        for episode in range(NUM_EPISODES):
            observation = self.env.reset()

            state = observation
            state = torch.from_numpy(state).type(torch.FloatTensor)
            state = torch.unsqueeze(state, 0)

            for step in range(MAX_STEPS):
                if episode_final is True:
                    frames.append(self.env.render(mode = 'rgb_array'))
                
                action = self.agent.get_acton(state, episode)

                observation_next, _, done, _ = self.env.step(action.item())
                if done:
                    state_next = None

                    episode_10_list = np.hstack((episode_10_list[1:], step + 1))

                    if step < 195:
                        reward = torch.FloatTensor([-1.0])
                        complete_episodes = 0
                    else:
                        reward = torch.FloatTensor([1.0])
                        complete_episodes = complete_episodes + 1
                else:
                    reward = torch.FloatTensor([0.0])
                    state_next = observation_next
                    state_next = torch.from_numpy(state_next).type(torch.FloatTensor)
                    state_next = torch.unsqueeze(state_next, 0)

                self.agent.memorize(state, action, state_next, reward)

                self.agent.update_q_function()

                state = state_next

                if done:
                    print('%d Episode: Finished after %d steps : 10試行の平均step数 = %.1f' %(episode, step + 1, episode_10_list.mean()))

                    if(episode % 2 == 0):
                        self.agent.update_target_q_function()
                    break

            if episode_final is True:
                display_frames_as_gif(frames)
                break

            if complete_episodes >= 10:
                print('10回連続成功')
                episode_final = True

In [19]:
cartpole_env = Environment()
cartpole_env.run()

Net(
  (fc1): Linear(in_features=4, out_features=32, bias=True)
  (fc2): Linear(in_features=32, out_features=32, bias=True)
  (fc3_adv): Linear(in_features=32, out_features=2, bias=True)
  (fc3_v): Linear(in_features=32, out_features=1, bias=True)
)
0 Episode: Finished after 21 steps : 10試行の平均step数 = 2.1
1 Episode: Finished after 9 steps : 10試行の平均step数 = 3.0
2 Episode: Finished after 10 steps : 10試行の平均step数 = 4.0
3 Episode: Finished after 9 steps : 10試行の平均step数 = 4.9
4 Episode: Finished after 10 steps : 10試行の平均step数 = 5.9
5 Episode: Finished after 10 steps : 10試行の平均step数 = 6.9
6 Episode: Finished after 10 steps : 10試行の平均step数 = 7.9
7 Episode: Finished after 9 steps : 10試行の平均step数 = 8.8
8 Episode: Finished after 10 steps : 10試行の平均step数 = 9.8


  a_m[non_final_mask] = self.main_q_network(
  a_m_non_final_next_states = a_m[non_final_mask].view(-1, 1)
  next_state_values[non_final_mask] = self.target_q_network(self.non_final_next_states).gather(1, a_m_non_final_next_states).detach().squeeze()


9 Episode: Finished after 10 steps : 10試行の平均step数 = 10.8
10 Episode: Finished after 10 steps : 10試行の平均step数 = 9.7
11 Episode: Finished after 10 steps : 10試行の平均step数 = 9.8
12 Episode: Finished after 10 steps : 10試行の平均step数 = 9.8
13 Episode: Finished after 11 steps : 10試行の平均step数 = 10.0
14 Episode: Finished after 9 steps : 10試行の平均step数 = 9.9
15 Episode: Finished after 11 steps : 10試行の平均step数 = 10.0
16 Episode: Finished after 13 steps : 10試行の平均step数 = 10.3
17 Episode: Finished after 12 steps : 10試行の平均step数 = 10.6
18 Episode: Finished after 14 steps : 10試行の平均step数 = 11.0
19 Episode: Finished after 14 steps : 10試行の平均step数 = 11.4
20 Episode: Finished after 12 steps : 10試行の平均step数 = 11.6
21 Episode: Finished after 13 steps : 10試行の平均step数 = 11.9
22 Episode: Finished after 19 steps : 10試行の平均step数 = 12.8
23 Episode: Finished after 17 steps : 10試行の平均step数 = 13.4
24 Episode: Finished after 16 steps : 10試行の平均step数 = 14.1
25 Episode: Finished after 18 steps : 10試行の平均step数 = 14.8
26 Episode: Finished