In [9]:
from keras.models import Sequential
from keras.layers import Dense
from collections import deque
import numpy as np
import random

In [10]:
class DQNAgent:
    '''
    Args:
        state_size (int): 입력값의 크기
        mem_size (int): train에 필요한 buffer 크기
        discount (float): 0~1사이 값으로 현재 reward보다 미래의 reward가 얼마나 중요한지 결정하는 변수
        epsilon (float): 탐험을 위한 기준 값
        epsilon_min (float): epsilon의 최소 값
        epsilon_stop_episode (int): episode 몇에서 감소를 멈출지 결정 하는 변수
        n_neurons (list(int)): inner layer의 뉴런 수가 담긴 list
        activations (list): inner layer에 따른 activation function list
        loss (obj): Loss function
        optimizer (obj): 학습률을 줄여나가고 속도를 계산하여 학습의 갱신강도를 적응적으로 조정해나가는 방법
        replay_start_size: 학습에 필요한 최소 값
    '''

    def __init__(self, state_size, mem_size=10000, discount=0.95,
                 epsilon=1, epsilon_min=0, epsilon_stop_episode=500,
                 n_neurons=[32, 32], activations=['relu', 'relu', 'linear'],
                 loss='mse', optimizer='adam', replay_start_size=None):

        assert len(activations) == len(n_neurons) + 1

        self.state_size = state_size
        self.memory = deque(maxlen=mem_size)
        self.discount = discount
        self.epsilon = epsilon
        self.epsilon_min = epsilon_min
        self.epsilon_decay = (self.epsilon - self.epsilon_min) / (epsilon_stop_episode)
        self.n_neurons = n_neurons
        self.activations = activations
        self.loss = loss
        self.optimizer = optimizer
        if not replay_start_size:
            replay_start_size = mem_size / 2
        self.replay_start_size = replay_start_size
        self.model = self._build_model()

    def _build_model(self):
        # keras 인공신경망 모델 생성
        model = Sequential()
        # n_nourons[0]만큼 node를 생성하고, input_dim은 input data로 부터 몇개의 값이 들어올지 정해주는 것.
        model.add(Dense(self.n_neurons[0], input_dim=self.state_size, activation=self.activations[0]))

        # hidden layer 형성
        for i in range(1, len(self.n_neurons)):
            model.add(Dense(self.n_neurons[i], activation=self.activations[i]))
        # Output layer 형성
        model.add(Dense(1, activation=self.activations[-1]))

        model.compile(loss=self.loss, optimizer=self.optimizer)

        return model

    def add_to_memory(self, current_state, next_state, reward, done):
        '''Adds a play to the replay memory buffer'''
        # 'done'은 경기가 진행중인지 끝났는지에 대한 bool.
        self.memory.append((current_state, next_state, reward, done))

    def random_value(self):
        # random한 값을 return
        return random.random()

    def predict_value(self, state):
        # state에 대한 예측 값을 return
        return self.model.predict(state)[0]

    def act(self, state):
        # 더 좋은 방법을 모험적으로 찾기 위해서 기존에 정해 놓은 epsilon보다 random value가 작으면 random_value를 return
        # 그렇지 않다면 predict_value를 return
        state = np.reshape(state, [1, self.state_size])
        if random.random() <= self.epsilon:
            return self.random_value()
        else:
            return self.predict_value(state)

    def best_state(self, states):
        '''Returns the best state for a given collection of states'''
        # 주어진 states에서 최고의 state를 찾아서 리턴한다.
        # 여기서 또한 모험적으로 더 좋은 방법을 찾기위해 epsilon보다 random vlaue가 작으면 random하게 골라서 return
        max_value = None
        best_state = None

        if random.random() <= self.epsilon:
            return random.choice(list(states))

        else:
            for state in states:
                value = self.predict_value(np.reshape(state, [1, self.state_size]))
                if not max_value or value > max_value:
                    max_value = value
                    best_state = state

        return best_state

    def train(self, batch_size=32, epochs=3):
        # model을 학습
        n = len(self.memory)

        # memory buffer에 들어있는 값의 수가 batch_size와 replay_start_size보다 클 때 학습을 진행.
        if n >= self.replay_start_size and n >= batch_size:

            batch = random.sample(self.memory, batch_size)

            # batch에서 array를 만들고 predict()를 이용해 next_states의 출력값을 next_qs에 담는다.
            next_states = np.array([x[1] for x in batch])
            next_qs = [x[0] for x in self.model.predict(next_states)]

            x = []
            y = []

            # batch에서 model 학습을 위해 xy structure를 만든다.
            for i, (state, _, reward, done) in enumerate(batch):
                if not done:
                    # Partial Q formula
                    new_q = reward + self.discount * next_qs[i]
                else:
                    new_q = reward

                x.append(state)
                y.append(new_q)

            # 주어진 값에 따라서 model을 학습시킨다.
            self.model.fit(np.array(x), np.array(y), batch_size=batch_size, epochs=epochs, verbose=0)

            # 탐색을 위한 epsilon값을 update한다.
            if self.epsilon > self.epsilon_min:
                self.epsilon -= self.epsilon_decay