# DDQN(Double-DQN) 구현

In [83]:
import numpy as np
import matplotlib.pyplot as plt
import gym
import random
%matplotlib inline

In [84]:
from collections import namedtuple
# One stored experience record with four fields: the state, the action taken,
# the following state (next_state) and the reward.
Transition = namedtuple('Transition', ('state', 'action', 'next_state', 'reward'))

In [85]:
ENV = 'CartPole-v0'  # environment id handed to gym.make
GAMMA = 0.99 # time discount rate applied to the next-state value in the TD target
NUM_STEPS = 200  # upper bound on the number of steps in one episode
NUM_EPISODES = 500  # upper bound on the number of episodes in one run

In [86]:
class ReplayMemory:
    """Bounded store of transitions that overwrites its oldest entry once full."""

    def __init__(self, CAPACITY):
        """Create an empty memory that keeps at most CAPACITY transitions."""
        self.capacity = CAPACITY
        self.memory = []
        self.index = 0  # slot the next push writes to

    def push(self, state, action, next_state, reward):
        """Store one transition in the current slot, then advance the slot cyclically."""
        record = Transition(
            state=state,
            action=action,
            next_state=next_state,
            reward=reward,
        )
        still_growing = len(self.memory) < self.capacity
        if still_growing:
            # Reserve a slot; it is filled by the indexed write just below.
            self.memory.append(None)
        self.memory[self.index] = record
        self.index = (self.index + 1) % self.capacity

    def sample(self, batch_size):
        """Return batch_size stored transitions drawn at random without replacement."""
        stored = self.memory
        return random.sample(stored, batch_size)

    def __len__(self):
        """Return how many transitions are currently stored."""
        return len(self.memory)

In [87]:
# 신경망 구성
from torch import nn
from torch.nn import functional as F

class Net(nn.Module):
    """Fully connected network: four ReLU-activated layers followed by a linear output layer."""

    def __init__(self, n_in, n_mid, n_out):
        """Create the five linear layers.

        Args:
            n_in: Width of the input.
            n_mid: Width of every hidden layer.
            n_out: Width of the output.
        """
        super().__init__()
        # The attribute names fc1..fc5 determine the state_dict keys, so they stay as they are.
        self.fc1 = nn.Linear(n_in, n_mid)
        self.fc2 = nn.Linear(n_mid, n_mid)
        self.fc3 = nn.Linear(n_mid, n_mid)
        self.fc4 = nn.Linear(n_mid, n_mid)
        self.fc5 = nn.Linear(n_mid, n_out)

    def forward(self, x):
        """Pass x through fc1..fc4 with ReLU after each, then through fc5 without an activation."""
        hidden = x
        for layer in (self.fc1, self.fc2, self.fc3, self.fc4):
            hidden = F.relu(layer(hidden))
        return self.fc5(hidden)

In [88]:
import random
import torch
from torch import nn
from torch import optim
import torch.nn.functional as F
BATCH_SIZE = 32  # number of transitions sampled for each replay update
CAPACITY = 10000  # maximum number of transitions kept in the replay memory

class Brain:
    """Double-DQN learner holding the replay memory, two Q-networks and the optimizer.

    The main network selects actions and is the one being trained. The target
    network is only used to evaluate the action the main network selects for
    the next state, and is refreshed through update_target_q_network().
    """

    def __init__(self, num_states, num_actions):
        """Build the replay memory, both Q-networks and the Adam optimizer.

        Args:
            num_states: Input width of both networks.
            num_actions: Output width of both networks; also the number of
                actions a random exploratory move is drawn from.
        """
        self.num_actions = num_actions
        self.memory = ReplayMemory(CAPACITY)  # memory object that stores transitions

        # Build the networks
        n_in, n_mid, n_out = num_states, 32, num_actions
        self.main_q_network = Net(n_in, n_mid, n_out)
        self.target_q_network = Net(n_in, n_mid, n_out)
        # Start the target network as an exact copy of the main network so the
        # first TD targets are not produced by unrelated random weights.
        self.target_q_network.load_state_dict(self.main_q_network.state_dict())
        print(self.main_q_network)  # print the network structure

        self.optimizer = optim.Adam(self.main_q_network.parameters(), lr=0.0001)

    def replay(self):
        """Run one learning step on a random minibatch.

        Does nothing until at least BATCH_SIZE transitions have been stored.
        """
        if len(self.memory) < BATCH_SIZE:
            return
        # Create the minibatch
        (self.batch,
         self.state_batch,
         self.action_batch,
         self.reward_batch,
         self.non_final_next_states) = self.make_minibatch()
        # Compute the training target for Q(s_t, a_t)
        self.expected_state_action_values = self.get_expected_state_action_values()
        # Update the main network's weights
        self.update_main_q_network()

    def decide_action(self, state, episode):
        """Choose an action epsilon-greedily, with epsilon = 0.5 / (episode + 1).

        Args:
            state: Tensor holding a single state; the greedy branch reshapes
                the result to (1, 1), so the batch dimension must be 1.
            episode: Episode counter; larger values make exploration rarer.

        Returns:
            A tensor of shape (1, 1) containing the chosen action index.
        """
        epsilon = 0.5 * (1 / (episode + 1))
        if epsilon < np.random.uniform(0, 1):
            self.main_q_network.eval()
            with torch.no_grad():
                action = self.main_q_network(state).max(dim=1)[1].reshape(1, 1)
        else:
            action = torch.LongTensor([[random.randrange(self.num_actions)]])

        return action

    def make_minibatch(self):
        """Sample BATCH_SIZE transitions and concatenate them field by field.

        Returns:
            A tuple (batch, state_batch, action_batch, reward_batch,
            non_final_next_states). batch is a Transition whose fields are
            tuples over the sampled transitions. non_final_next_states holds
            only the next states that are not None; it has zero rows when
            every sampled transition is terminal.
        """
        transitions = self.memory.sample(BATCH_SIZE)
        # Turn a sequence of Transition records into one Transition of per-field tuples.
        batch = Transition(*zip(*transitions))
        state_batch = torch.cat(batch.state)
        action_batch = torch.cat(batch.action)
        reward_batch = torch.cat(batch.reward)

        next_states = [s for s in batch.next_state if s is not None]
        if next_states:
            non_final_next_states = torch.cat(next_states)
        else:
            # torch.cat rejects an empty list; an empty slice keeps the dtype
            # and the trailing dimensions of the state batch.
            non_final_next_states = state_batch[:0]

        return batch, state_batch, action_batch, reward_batch, non_final_next_states

    def get_expected_state_action_values(self):
        """Compute the Double-DQN target r + GAMMA * Q_target(s', argmax_a Q_main(s', a)).

        Also stores Q_main(s_t, a_t) in self.state_action_values for the loss.
        Transitions whose next_state is None contribute only their reward.

        Returns:
            A 1-D tensor with one target value per sampled transition.
        """
        # Put both networks in inference mode
        self.main_q_network.eval()
        self.target_q_network.eval()

        # Q_main(s_t, a_t); computed with gradients because the loss backpropagates through it.
        self.state_action_values = self.main_q_network(self.state_batch).gather(1, self.action_batch)

        # Mask marking which sampled transitions have a next state
        non_final_mask = torch.tensor(
            [s is not None for s in self.batch.next_state], dtype=torch.bool
        )
        # Next-state values default to 0 for terminal transitions
        next_state_values = torch.zeros(len(self.batch.next_state))

        if non_final_mask.any():
            with torch.no_grad():
                # The main network picks the greedy action for each next state ...
                best_actions = self.main_q_network(self.non_final_next_states).max(dim=1)[1].reshape(-1, 1)
                # ... and the target network supplies the value of that action.
                next_state_values[non_final_mask] = (
                    self.target_q_network(self.non_final_next_states)
                    .gather(1, best_actions)
                    .squeeze(1)
                )

        return self.reward_batch + GAMMA * next_state_values

    def update_main_q_network(self):
        """Take one optimizer step on the Huber loss between Q_main(s_t, a_t) and its target."""
        self.main_q_network.train()  # switch the network to training mode
        loss = F.smooth_l1_loss(self.state_action_values, self.expected_state_action_values.unsqueeze(1))

        self.optimizer.zero_grad()  # reset gradients
        loss.backward()  # backpropagate
        self.optimizer.step()  # update the weights

    def update_target_q_network(self):
        """Copy the main network's weights into the target network."""
        self.target_q_network.load_state_dict(self.main_q_network.state_dict())

In [89]:
class Agent:
    """Thin facade over a Brain: every method forwards to the brain it owns."""

    def __init__(self, num_states, num_actions):
        """Create the Brain, passing num_states and num_actions through unchanged."""
        self.brain = Brain(num_states, num_actions)

    def update_q_function(self):
        """Ask the brain to run one replay step."""
        self.brain.replay()

    def get_action(self, state, episode):
        """Return the action the brain decides on for this state and episode."""
        return self.brain.decide_action(state, episode)

    def memorize(self, state, action, state_next, reward):
        """Push (state, action, state_next, reward) into the brain's memory."""
        replay_memory = self.brain.memory
        replay_memory.push(state, action, state_next, reward)

    def update_target_q_function(self):
        """Ask the brain to update its target Q-network."""
        self.brain.update_target_q_network()

In [90]:
class Environment:
    """Owns the gym environment and the agent, and runs the training loop."""

    def __init__(self):
        """Create the environment named by ENV and an Agent sized to its spaces."""
        self.env = gym.make(ENV)
        num_states = self.env.observation_space.shape[0]
        num_actions = self.env.action_space.n
        self.agent = Agent(num_states, num_actions)

    @staticmethod
    def _to_state_tensor(observation):
        """Convert a NumPy observation into a float tensor with a leading batch axis of 1."""
        state = torch.from_numpy(observation).type(torch.FloatTensor)
        return torch.unsqueeze(state, 0)

    @staticmethod
    def _unpack_reset(reset_result):
        """Return the observation from env.reset(), which is either obs or an (obs, info) tuple."""
        if isinstance(reset_result, tuple):
            return reset_result[0]
        return reset_result

    @staticmethod
    def _unpack_step(step_result):
        """Return (next_observation, done) from a 4-tuple or 5-tuple env.step() result."""
        if len(step_result) == 5:
            # Newer API: (obs, reward, terminated, truncated, info)
            observation_next, _, terminated, truncated, _ = step_result
            return observation_next, bool(terminated or truncated)
        # Legacy API: (obs, reward, done, info)
        observation_next, _, done, _ = step_result
        return observation_next, done

    def run(self):
        """Train for up to NUM_EPISODES episodes.

        The environment's own reward is ignored. The stored reward is 0 for a
        non-terminal step, -1 when the episode ends before step index 195, and
        +1 when it ends at step index 195 or later. After ten such successful
        episodes in a row, one more episode is run and the loop stops.
        """
        episode_10_list = np.zeros(10)  # step counts of the 10 most recent episodes
        complete_episodes = 0  # current streak of successful episodes
        episode_final = False  # whether the episode just run was the last one

        for episode in range(NUM_EPISODES):
            observation = self._unpack_reset(self.env.reset())
            state = self._to_state_tensor(observation)

            for step in range(NUM_STEPS):
                action = self.agent.get_action(state, episode)
                observation_next, done = self._unpack_step(self.env.step(action.item()))

                if done:
                    state_next = None  # terminal transitions carry no next state
                    episode_10_list = np.hstack((episode_10_list[1:], step + 1))
                    if step < 195:
                        reward = torch.FloatTensor([-1.0])
                        complete_episodes = 0
                    else:
                        reward = torch.FloatTensor([1.0])
                        complete_episodes += 1
                else:
                    reward = torch.FloatTensor([0.0])
                    state_next = self._to_state_tensor(observation_next)

                self.agent.memorize(state, action, state_next, reward)
                self.agent.update_q_function()  # one experience-replay update of the Q-function
                state = state_next

                if done:
                    print('%d Episode: Finished after %d steps：최근 10 에피소드의 평균 단계 수 = %.1lf' % (episode, step + 1, episode_10_list.mean()))
                    # The target network is refreshed only after even-numbered episodes.
                    if episode % 2 == 0:
                        self.agent.update_target_q_function()
                    break

            if episode_final is True:
                break
            if complete_episodes >= 10:
                print("10 에피소드 연속 성공")
                episode_final = True

In [93]:
# Build the environment together with its agent, then run the training loop.
cartpole_env = Environment()
cartpole_env.run()

Net(
  (fc1): Linear(in_features=4, out_features=32, bias=True)
  (fc2): Linear(in_features=32, out_features=32, bias=True)
  (fc3): Linear(in_features=32, out_features=32, bias=True)
  (fc4): Linear(in_features=32, out_features=32, bias=True)
  (fc5): Linear(in_features=32, out_features=2, bias=True)
)
0 Episode: Finished after 11 steps：최근 10 에피소드의 평균 단계 수 = 1.1
1 Episode: Finished after 9 steps：최근 10 에피소드의 평균 단계 수 = 2.0
2 Episode: Finished after 11 steps：최근 10 에피소드의 평균 단계 수 = 3.1
3 Episode: Finished after 10 steps：최근 10 에피소드의 평균 단계 수 = 4.1
4 Episode: Finished after 9 steps：최근 10 에피소드의 평균 단계 수 = 5.0
5 Episode: Finished after 12 steps：최근 10 에피소드의 평균 단계 수 = 6.2
6 Episode: Finished after 9 steps：최근 10 에피소드의 평균 단계 수 = 7.1
7 Episode: Finished after 9 steps：최근 10 에피소드의 평균 단계 수 = 8.0
8 Episode: Finished after 9 steps：최근 10 에피소드의 평균 단계 수 = 8.9
9 Episode: Finished after 14 steps：최근 10 에피소드의 평균 단계 수 = 10.3
10 Episode: Finished after 20 steps：최근 10 에피소드의 평균 단계 수 = 11.2
11 Episode: Finished after