## (간단한) DQN 구현
- Fixed Target Q-Network에서 target Q-Network 없이 미니 배치 사용
- main-network각 단계의 transition을 메모리 객체에 저장, 여러 단계 분량에 해당하는 transition을 무작위로 몇 개 꺼낸 것이 '미니배치'

In [1]:
import numpy as np
import matplotlib.pyplot as plt
import gym

In [2]:
#애니메이션을 만드는 함수
from JSAnimation.IPython_display import display_animation
from matplotlib import animation
from IPython.display import display

def display_frames_as_gif(frames):
    plt.figure(figsize=(frames[0].shape[1]/72.0, frames[0].shape[0]/72.0), dpi = 72)
    patch = plt.imshow(frames[0])
    plt.axis('off')
    
    def animate(i):
        patch.set_data(frames[i])
    
    anim = animation.FuncAnimation(plt.gcf(), animate, frames = len(frames), interval = 50)
    writergif = animation.PillowWriter(fps=30) 
    anim.save("movie_cartpole_DQN.gif",writer=writergif)
    display(display_animation(anim, default_mode='loop'))

namedtuple 사용 예시

In [3]:
#namedtuple <- 키를 필드명으로 값에 접근 가능
from collections import namedtuple

Tr = namedtuple('tr', ('name_a','value_b'))
Tr_object = Tr('이름A', 100)

print(Tr_object)
print(Tr_object.value_b)

tr(name_a='이름A', value_b=100)
100


### Transition namedtuple 선언 및 상수 정의

In [4]:
from collections import namedtuple

Transition = namedtuple('Transition', ('state','action','next_state','reward'))

In [5]:
ENV = 'CartPole-v0'
GAMMA = 0.99
MAX_STEPS = 200
NUM_EPISODES = 500

### ReplayMemory 클래스 정의

In [6]:
#transition을 저장하기 위한 메모리 클래스

class ReplayMemory:
    def __init__(self, CAPACITY):
        self.capacity = CAPACITY #메모리 최대저장건수
        self.memory = [] #실제 transition을 저장할 변수
        self.index = 0 #저장 위치를 가리킬 인덱스 변수
    
    def push(self, state, action, state_next, reward): #transition 한덩이를 메모리에 저장
        if len(self.memory) < self.capacity:
            self.memory.append(None)
        
        self.memory[self.index] = Transition(state,action,state_next,reward)
        self.index = (self.index+1) % self.capacity
    
    def sample(self, batch_size): #batch사이즈 만큼 무작위로 저장돼있는 transition추출
        return random.sample(self.momory, batch_size)

    def __len__(self): # 현재 저장된 transition개수 반환
        return len(self.memory)

### Brain 클래스 정의

In [None]:
#Q함수를 딥러닝 신경망 형태로 정의

import random
import torch
from torch import nn
from torch import optim
import torch.nn.functional as F

BATCH_SIZE = 32
CAPACITY = 10000

class Brain: 
    def __init__(self, num_states, num_actions): #초기화에서는 'action개수 구하기', '메모리 객체 생성','신경망 구성','최적화기법 선택'
        self.num_actions = num_actions #행동의 가짓수 구함
        
        #transition저장을 위한 메모리 객체 생성
        self.memory = ReplayMemory(CAPACITY)
        
        #신경망 구성
        self.model = nn.Sequential()
        self.model.add_module('fc1', nn.Linear(num_states, 32))
        self.model.add_module('relu1', nn.ReLU())
        self.model.add_module('fc2', nn.Linear(32,32))
        self.model.add_module('relu2', nn.ReLU())
        self.model.add_module('fx3', nn.Linear(32, num_actions))
        print(self.model)
        
        #최적화기법 선택
        self.optimizer = optim.Adam(self.model.parameters(), lr=0.0001)
    
    def replay(self):  #experience replay로 신경망 결합가중치 학습하기
        
        # 1.저장된 transitoin 수 확인 -> 배치 사이즈보다 작으면 아무것도 하지 않는다.
        if len(self.memory) < BATCH_SIZE: return
        
        # 2. 미니 배치 생성
        transitions = self.memory.sample(BATCH_SIZE) #메모리에서 미니배치 추출
        # (state*BATCH_SIZE, action*BATCH_SIZE, state_next*BATCH_SIZE, reward*BATCH_SIZE) 형태로 변환
        batch = Transition(*zip(*transitions))
        
        #각 변수의 요소를 미니배치에 맞게 변형, 신경망으로 다룰 수 있게 Variable로 만든다
        state_batch = torch.cat(batch.state)
        action_batch = torch.cat(batch.action)
        reward_batch = torch.cat(batch.reward)
        non_final_next_states= torch.cat([s for s in batch.next_state if s in not None])
        
        # 3. 정답 신호로 사용할 Q(s_t, a_t) 계산
        # 신경망 추론 모드 on
        self.model.eval()
        
        #신경망으로 Q(s_t, a_t) 계산
        state_action_values = self.model(state_batch).gather(1, action_batch)
        
        # max{Q(s_t+1, a)} 값을 계산함. 다음 상태가 존재하는지 주의
        non_final_mask = torch.ByteTensor(tuple(map(lambda s: s is not None, batch.next_state))) #done이 아니고 next_state가 존재하는지 
        next_state_values = torch.zeros(BATCH_SIZE) #전체를 0으로 초기화 
        next_state_values[non_final_mask] = self.model(non_final_next_states).max(1)[0].detach()
        
        # Q러닝 식으로 Q(s_t, a_t) 계산
        expected_state_action_values = reward_batch + GAMMA*next_state_values
        
        # 4. 결합 가중치 수정
        # 신경망 학습모드 on
        self.model.train()
        
        #손실함수 계산
        
        