In [1]:
# 구현에 사용할 패키지 임포트
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline
import gym

In [2]:
# 애니메이션을 만드는 함수
# 참고 URL http://nbviewer.jupyter.org/github/patrickmineault
# /xcorr-notebooks/blob/master/Render%20OpenAI%20gym%20as%20GIF.ipynb
from JSAnimation.IPython_display import display_animation
from matplotlib import animation
from IPython.display import display


def display_frames_as_gif(frames):
    """
    Displays a list of frames as a gif, with controls
    """
    plt.figure(figsize=(frames[0].shape[1]/72.0, frames[0].shape[0]/72.0),
               dpi=72)
    patch = plt.imshow(frames[0])
    plt.axis('off')

    def animate(i):
        patch.set_data(frames[i])

    anim = animation.FuncAnimation(plt.gcf(), animate, frames=len(frames),
                                   interval=50)

    anim.save('movie_cartpole_DQN.mp4')  # 애니메이션을 저장하는 부분
    display(display_animation(anim, default_mode='loop'))


# namedtuple 생성

In [3]:
# namedtuple 생성
from collections import namedtuple

Transition = namedtuple('Transition', ('state', 'action', 'next_state', 'reward'))

# 상수 정의

In [4]:
# 상수 정의
ENV = 'CartPole=v0'  # 태스크 이름
GAMMA = 0.99  # 시간할인율
MAX_STEPs = 200  # 1에피소드당 최대 단계 수
NUM_EPISODES = 500  # 최대 에피소드 수

# class ReplayMemory

In [8]:
# transition을 저장하기 위한 메모리 클래스

class ReplayMemory:
    
    def __init__(self, CAPACITY):
        self.capacity = CAPACITY  # 메모리의 최대 저장 건수
        self.memory = []  # 실제 transition을 저장할 변수
        self.index = 0  # 저장 위치를 가리킬 인덱스 변수
    
    def push(self, state, action, state_next, reward):
        '''transition = (state, action, state_next, reward)을 메모리에 저장'''
        
        if len(self.memory) < self.capacity:
            self.memory.append(None)  # 메모리가 가득 차지 않은 경우
        
        
        # Transition이라는 namedtuple을 사용해 키-값 쌍의 형태로 값을 저장
        self.memory[self.index] = Transition(state, action, state_next, reward)
        
        self.index = (self.index + 1) % self.capacity  # 다음 저장할 위치를 한 자리 뒤로 수정
    
    def sample(self, batch_size):
        '''batch_size 개수만큼 무작위로 저장된 transition을 추출'''
        return random.sample(self.memory, batch_size)
    
    def __len__(self):
        '''len 함수로 현재 저장된 transition 개수를 반환'''
        return len(self.memory)

# class Brain

In [6]:
# 에이전트의 두뇌 역할을 하는 클래스. DQN을 실제 수행핸다
# Q함수를 딥러닝 신경망 형태로 정의

import random
import torch
from torch import nn
from torch import optim
import torch.nn.functional as F

BATCH_SIZE = 32
CAPACITY = 10000

class Brain:
    def __init__(self, num_states, num_actions):
        self.num_actions = num_actions  # 행동의 가짓수(왼쪽, 오른쪽)를 구함
        
        # transition을 기억하기 위한 메모리 객체 생성
        self.memory = ReplayMemory(CAPACITY)
        
        # 신경망 구성
        self.model = nn.Sequential()
        self.model.add_module('fc1', nn.Linear(num_states, 32))
        self.model.add_module('relu1', nn.ReLU())
        self.model.add_module('fc2', nn.Linear(32, 32))
        self.model.add_module('relu2', nn.ReLU())
        self.model.add_module('fc3', nn.Linear(32, num_actions))
        
        print(self.model)  # 신경망 구조 출력
        
        # 최적화 기법 선택
        self.optimizer = optim.Adam(self.model.parameters(), lr=0.0001)
        
    
    def replay(self):
        '''Experience Replay로 신경망의 결합 가중치 학습'''
        
        # -----------------------------------------
        # 1. 저장된 transition 수 확인
        # -----------------------------------------
        # 1.1 저장된 transition의 수가 미니배치 크기보다 작으면 아무 것도 하지 않음
        if len(self.memory) < BATCH_SIZE:
            return
        
        # -----------------------------------------
        # 2. 미니배치 생성
        # -----------------------------------------
        # 2.1 메모리 객체에서 미니배치를 추출
        transition = self.memory.sample(BATCH_SIZE)
        
        # 2.2 각 변수를 미니배치에 맞는 형태로 변형
        # transitions는 각 단계 별로 (state, action, state_next, reward) 형태로 BATCH_SIZE 갯수만큼 저장됨
        # 다시 말해, (state, action, state_next, reward) * BATCH_SIZE 형태가 된다
        # 이것을 미니배치로 만들기 위해
        # (state*BATCH_SIZE, action*BATCH_SIZE, state_next*BATCH_SIZE, reward*BATCH_SIZE) 형태로 변환한다
        batch = Transition(*zip(*transitions))
        
        # 2.3 각 변수의 요소를 미니배치에 맞게 변형하고, 신경망으로 다룰 수 있도록 Variable로 만든다
        # state를 예로 들면, [torch.FloatTensor of size 1*4] 형태의 요소가 BATCH_SIZE 갯수만큼 있는 형태이다
        # 이를 torch.FloatTensor of size BATCH_SIZE*4 형태로 변형한다
        # 상태, 행동, 보상, non_final 상태로 된 미니배치를 나타내는 Variable을 생성
        # cat은 Concatenates(연접)을 의미한다
        state_batch = torch.cat(batch.state)
        action_batch = torch.cat(batch.action)
        reward_batch = torch.cat(batch.reward)
        non_final_next_states = torch.cat([s for s in batch.next_state
                                           if s is not None])
        
        # -----------------------------------------
        # 3. 정답신호로 사용할 Q(s_t, a_t)를 계산
        # -----------------------------------------
        # 3.1 신경망을 추론 모드로 전환
        self.model.eval()
        
        # 3.2 신경망으로 Q(s_t, a_t)를 계산
        # self.model(state_batch)은 왼쪽, 오른쪽에 대한 Q값을 출력하며
        # [torch.FloatTensor of size BATCH_SIZEx2] 형태이다
        # 여기서부터는 실행한 행동 a_t에 대한 Q값을 계산하므로 action_batch에서 취한 행동 a_t가 
        # 왼쪽이냐 오른쪽이냐에 대한 인덱스를 구하고, 이에 대한 Q값을 gather 메서드로 모아온다
        state_action_values = self.model(state_batch).gather(1, action_batch)
        
        # 3.3 max{Q(s_t+1, a)}값을 계산한다 이때 다음 상태가 존재하는지에 주의해야 한다
        
        # cartpole이 done 상태가 아니고, next_state가 존재하는지 확인하는 인덱스 마스크를 만듦
        non_final_mask = torch.ByteTensor(tuple(map(lambda s: s is not None, batch.next_state)))
        
        # 먼저 전체를 0으로 초기화
        next_state_values = torch.zeros(BATCH_SIZE)
        
        # 다음 상태가 있는 인덱스에 대한 최대 Q값을 구한다
        # 출력에 접근하여 열방향 최대값(max(1))이 되는 [값, 인덱스]를 구한다
        # 그리고 이 Q값(인덱스=0)을 출력한 다음
        # detatch 메서드로 이 값을 꺼내온다
        next_state_values[non_final_mask] = self.model(non_final_next_states).max(1)[0].detatch()
        
        # 3.4 정답신호로 사용할 Q(s_t, a_t) 값을 Q러닝 식으로 계산한다
        expected_state_actino_values = reward_batch + GAMMA * next_state_values
        
        # -----------------------------------------
        # 4. 결합 가중치 수정
        # -----------------------------------------
        # 4.1 신경망을 학습 모드로 전환
        self.model.train()
        
        # 4.2 손실함수를 계산 (smooth_l1_Loss는 Huber 함수)
        # expected_state_action_values은
        # size가 [minibatch]이므로 unsqueeze하여 [minibatch*1]로 만든다
        loss = F.smooth_l1_loss(state_action_values,
                               expected_state_aciton_values.unsqueeze(1))
        
        # 4.3 결합 가중치를 수정한다
        self.optimizer.zero_grad()  # 검사를 초기화
        loss.backward()  # 역전파 계산
        self.optimizer.step()  # 결합 가중치 수정
        
    
    def decide_action(self, state, episode):
        '''현재 상태에 따라 행동을 결정한다'''
        # ε-greedy 알고리즘에서 서서히 최적행동의 비중을 늘린다
        epsilon = 0.5 * (1 / (episode + 1))
        
        if epsilon <= np.random.uniform(0, 1):
            self.model.eval()  # 신경망을 추론 모드로 전환
            with torch.no_grad():
                action = self.model(state).max(1)[1].view(1, 1)
            # 신경망 출력의 최댓값에 대한 인덱스 = max(1)[1]
            # .view(1,1)은 [torch.LongTensor of size 1] 을 size 1*1로 변환하는 역할을 한다
            
        else:
            # 행동을 무작위로 반환(0 혹인 1)
            action = torch.LongTensor(
                [[random.randrange(self.num_actions)]])  # 행동을 무작위로 반환(0 혹은 1)
            # action은 [torch.LongTensor of size 1*1] 형태가 된다
            
        return action      

# class Agent

In [None]:
# CartPole 태스크의 에이전트 클래스. 봉 달린 수레 자체라고 보면 된다


class Agent:
    def __init__(self, num_states, num_actions):
        '''태스크의 상태 및 행동의 가짓수를 설정'''
        self.brain = Brain(num_states, num_actions)  # 에이전트의 행동을 결정할 두뇌 역할 객체를 생성
    
    def update_q_function(self):
        '''Q함수를 수정'''
        self.brain.replay()
        
    def get_action(self, state, episode):
        '''행동을 결정'''
        action = self.brain.decide_action(state, episode)
        return action
    
    def memorize(self, state, aciton, state_next, reward):
        '''memory 객체에 state, action, state_next, reward 내용을 저장'''