- CNN을 이용해 상태마다 각 행동의 큐 함수들을 근사해줌.
- Q Learning Algorithm : 모든 상태와 행동에 대한 Q함수 값을 따로 저장, 학습수행, 행동 결정.
    - 단, 매우 많은 상황이 존재하는 환경에서는 적합하지 않음. 
    - ex) pong game
    
- 타겟(y) 네트워크, 일반 네트워크 2종류의 네트워크가 사용.

    $$ y = r + \gamma(\max{Q}(s_{t+1}, a^\prime;\theta^{-})$$
    
$$ \theta : \text{일반 네트워크 변수}, \theta^{-} : \text{타켓 네트워크 변수} $$

- 다음 Step에서 게임이 종료된 경우 게임이 계속 진행되는 경우.
- 타겟값 y와 예측값 Q의 차이가 최소가 되도록 학습 수행.

In [1]:
import numpy as np
import random
import datetime
import time
import tensorflow as tf

# DQN의 리플레이 메모리 역할 수행.
from collections import deque

# Unity로 만든 환경을 로드하기 위해 불러옴.
from mlagents_envs.environment import UnityEnvironment

gpus = tf.config.experimental.list_physical_devices('GPU')
tf.config.experimental.set_memory_growth(gpus[0], True)

In [2]:
# DQN 파라미터 값 설정.
state_size  = [84, 84, 3]
action_size = 5

load_model  = False

# 에이전트 학습 수행.
train_mode  = True

batch_size      = 32
mem_maxlen      = 50000

# 에이전트가 학습을 수행할 때 얼마나 미래의 보상을 고려할지를 결정하는 감가율
# 0 ~ 1 사이의 값으로 설정. ==> 값이 클수록 미래의 보상을 많이 고려.
discount_factor = 0.9
# 네트워크 학습을 얼마 빠르게 수행할지 결정하는 파라미터.
# 값이 너무 작으면 학습속도가 느려지고, 이 값이 너무 크면 학습이 불안정하게 수행됨.
learning_rate   = 0.00025

# 총 몇 번의 에피소드 동안 학습을 수행할지를 결정.
# 한 에피소드는 승리 혹은 패배 등의 이유로 게임이 한 번 끝나는 상황을 의미.
run_episode  = 25000

# 학습이 끝나고 테스트할 때 학습된 에이전트로 몇 에피소드 동안 게임을 진행할지를 결정.
# 테스트의 경우 일반적으로 랜덤 행동 수행이 아닌 네트워크의 학습 결과만을 가지고 에이전트가 행동을 택함.
test_episode = 1000

# 학습을 시작하기 전에 리플레이 메모리에 충분한 데이터를 모으기 위해 몇 에피소드 동안 임의의 행동으로 게임을 진행할 것인지 결정.
start_train_episode = 1000

# 타겟 네트워크를 몇 스텝 주기로 업데이트할지를 결정.
target_update_step = 10000

# 에피소드가 진행될 때마다 학습 상황을 파악할 수 있는 지표를 출력.
print_interval = 100

# 설정된 에피소드마다 학습이 진행되고 있는 모델을 저장해 나중에 불러옴.
# 즉, 5000 에피소드마다 저장.
save_interval = 5000

# 탐험을 위한 행동을 취할 확률인 앱실론의 초기값 설정.
epsilon_init = 1.0

# 앱실론의 최소값 설정.
epsilon_min = 0.1

# 소코반 환경 설정( 게임판 크기=5, 초록색 +의 수=1, 박스의 수=1)
# 소코반 환경 : 84 X 84 RGB 이미지를 상태로 학습 수행. => test_size = [84, 84, 3]
# 소코반 환경은 [정지, 위, 아래, 왼쪽, 오른쪽]의 5가지 행동을 하므로 action_size = 5 
# 소코반 환경의 resetParameter 값들 설정.
env_config = {"gridSize" : 5, "numGoals" : 1, "numBoxs" : 1}

# 실행 날짜, 시각 설정.
# 모델과 텐서보드 파일을 저장할 폴더를 생성할 때 폴더 이름이 중복되지 않기 위함.
date_time = datetime.datetime.now().strftime("%Y%m%d-%H-%M-%S")

# Unity 환경 경로.
game = "Sokoban"
env_name = "../env/" + game + "/Windows/" + game

# 모델 저장 및 불러오기 경로,
save_path = "../saved_models/" + game + "/" + date_time + "_DQN"
load_path = "../saved_models/" + game + "/20210419-13-36-21_DQN/model/model"

#### Model Class

In [3]:
# Model Class ==> CNN 정의 및 Loss function 설정. 네트워크 최적화 알고리즘 결정.
class Model() :
    def __init__(self, model_name) :
        
        # tf.placeholder : 나중에 값을 던져주는 공간을 만들어줌.
        self.input = tf.placeholder(shape=[None, state_size[0], state_size[1], state_size[2]], dtype=tf.float32)
        
        # 학습의 안정화를 위해 -1 ~ 1 사이의 값을 가지도록 정규화.
        self.input_nomalize = (self.input - (255 / 2)) / (255 / 2)
        
        # CNN 구축. ==> 3개의 CNN 충과 2개의 은닉층으로 구성.
        with tf.compat.v1.variable_scope(name_or_scope=model_name) :
            
            # 합성곱 1 : 필터 크기 => 8 X 8 , 필터 수 => 32개
            self.conv1 =  tf.layers.conv2d(inputs=self.input_nomalize, filters=32, 
                                                 activation=tf.nn.relu, kernel_size=[8,8],
                                                 strides=[4,4], padding="SAME")
            
            # 합성곱 2
            self.conv2 = tf.layers.conv2d(inputs=self.conv1, filters=64, 
                                                 activation=tf.nn.relu, kernel_size=[4,4],
                                                 strides=[2,2], padding="SAME")
            
            # 합성곱 3
            self.conv3 = tf.layers.conv2d(inputs=self.conv1, filters=64, 
                                                 activation=tf.nn.relu, kernel_size=[3,3],
                                                 strides=[1,1], padding="SAME")
            # 완전 연결층 1 : node = 512
            self.flat = tf.layers.flatten(self.conv3)
            
            # 완전 연결층 2 : 출력층
            self.fc1 = tf.layers.dense(self.flat, 512, activation=tf.nn.relu)
            # 네트워크를 통해 예측한 값.
            self.Q_Qut =  tf.layers.dense(self.fc1, action_size, activation=None)
        self.predict = tf.argmax(self.Q_Qut, 1)
        
        # 학습의 목표값.
        self.target_Q = tf.placeholder(shape=[None, action_size], dtype=tf.float32)
        
        # 손실함수 값 계산 및 네트워크 학습 수행.
        # 예측한 값과 학습의 목표값 사이의 차를 줄이는 방향으로 손실함수 설정.
        self.loss = tf.losses.huber_loos(self.target_Q, self.Q_Qut)
        
        # Adam 사용.
        self.UpdateModel = tf.train.AdamOptimizer(learning_rate).minimize(self.loss)
        
        # model_name으로 시작하는 변수들만 지정.
        self.trainable_var = tf.get_collection(tf.GraphKeys.TRAINABLE_VARIABLES)
        
        

#### Agent Class

In [4]:
# DQNAgent class ==> DQN 알고리즘을 위한 다양한 함수 정의.
class DQNAgent() :
    def __init__(self):
        
        # Class 함수들을 위한 값 설정.
        # 두 개(Q, target)의 네트워크 생성.
        self.model = Model('Q')
        self.target_model = Model("target")
        
        # 학습에 필요한 정보를 저장하기 위한 리플레이 메모리 생성.
        # deque() : 자동으로 오래된 데이터를 삭제하고 새로운 데이터 추가.
        self.memory = deque(maxlen=mem_maxlen)
        
        # 연산 수행 및 그래프 실행을 위한 세션 생성. 
        self.sess = tf.Session()
        
        # 네트워크의 모든 변수 초기화.
        self.init = tf.global_variables_initializer()
        
        # 세션 수행.
        self.sess.run(self.init)
        
        self.epsilon = epsilon_init
        
        # 학습 모델 저장 및 불러오기.
        self.Saver = tf.train.Saver()
        
        # 학습한 내용을 텐서보드에 표시하기 위해 Summary 생성.
        self.Summary, self.Merge = self.Make_Summary()
        
        # 학습을 시작하기 전에 타겟 네트워크를 업데이트.
        self.update_target()
        
        # 로드 모드가 참이면 경로에 저장된 네트워크 모델을 불러오기.
        if load_model == True :
            self.Saver.restore(self.sess, load_path)
            
    # 에이전트 행동을 선택하는 함수.
    def get_action(self, state) :
        # 이와 같은 경우 조건을 충족시
        if self.epsilon > np.random.rand():
            
            # 앱실론 그리드에 따라 랜덤 행동 결정.
            return np.random.randint(0, action_size)
        
        else :
            # 네트워크 연산에 따라 행동 결정.
            predict = self.sess.run(self.model.predict, feed_dict={self.model.input:state})
            return np.asscalar(predict)
        
    # [상태, 행동, 보상, 다음 상태, 게임 종료 여부] 입력 받아 사용.
    # 리플레이 메모리 self.memory에 이 값들을 하나의 튜플로 추가.
    def append_sample(self, state, action, reward, next_state, done) :
        self.memory.append((state[0], action, reward, next_state[0], done))
        

    # 네트워크 모델 저장.
    # 모델 파일은 Save_path 내부에 model이라는 폴더를 만들고 model로 저장.
    def save_model(self) :
        self.Saver.save(self.sess, save_path + "/model/model")
        
    
    # 학습 수행.
    # 미니 배치 학습을 위해 배치 데이터를 뽑고, 네트워크 학습을 수행하는 함수.
    def train_model(self, done) :
        # done : 게임 종료 여부가 True 라면
        if done :
            # 앱실론의 최솟값 보다 크다면
            # epsilon - greedy 기법을 사용할 때 앱실론이 너무 낮은 상태로 학습하면 랜덤성이 거의 없어 탐험의 의미가 사라짐.
            # 또한 앱실론이 큰 상태에서 학습을 끝내면 에이전트가 아직 목표를 달성하기 위한 정확한 방법을 모를 수 있음.
            if self.epsilon > epsilon_min :
                # (전체 학습 에피소드 - 학습 시작 에피소드)의 역수 만큼 앱실론 값 감소.
                # 즉, 한 에피소드가 끝날 때마다 앱실론 감소.
                self.epsilon -= 1 / (run_episode - start_train_episode)
                
        
        # 학습을 위한 미니 배치 데이터 샘플링.
        mini_batch = random.sample(self.memory, batch_size)
        
        # 각 데이터를 담을 리스트 생성.
        states      = []
        actions     = []
        rewards     = []
        next_states = []
        dones       = []
        
        # batch_size 만큼 상태, 행동, 보상 , 다음 상태, 게임 종료 여부의 순으로 담기.
        for i in range(batch_size) :
            states.append(mini_batch[i][0])
            actions.append(mini_batch[i][1])
            rewards.append(mini_batch[i][2])
            next_states.append(mini_batch[i][3])
            dones.append(mini_batch[i][4])
            
        # 타겟값 계산.
        target = self.sess.run(self.model.Q_Qut, feed_dict={self.model.input : states})
        target_val = self.sess.run(self.target_model.Q_Qut, feed_dict={self.target_model.input : next_states})
        
        for i in range(batch_size) :
            if dones[i] :
                target[i][actions[i]] = rewards[i]

            else :
                target[i][actions[i]] = rewards[i] + dicount_factor * np.amax(target_val[i])
            
        
        # 학습 수행 및 손실 함수 값 계산.
        _, loss = self.sess.run([self.model.UpdateModel, self.model.loss], feed_dict={self.model.input : states, self.model.target_Q : target})
        
        return loss
    
    # 타겟 네트워크 업데이트.
    def update_target(self) :
        for i in range(len(self.model.trainable_var)) :
            self.sess.run(self.target_model.trainable_var[i].assign(self.model.trainable_var[i]))


    # 텐서 보드에 기록할 값 설정 및 데이터 기록.
    def Make_Summary(self) :
        self.summary_loss = tf.placeholder(dtype=tf.float32)
        self.summary_reward = tf.placeholder(dtype=tf.float32)
        tf.summary.scalar("loss", self.summary_loss)
        tf.summary.scalar("reward", self.summary_reward)
        Summary = tf.summary.FileWriter(logdir=save.path, graph=self.sess.graph)
        Merge = tf.summary.merge_all()
        
        return Summary, Merge
    
    def Wirte_Summary(self, reward, loss, episode) :
        self.Summary.add_summary(
            self.sess.run(self.Merge, feed_dict={self.summary_loss : loss, 
                                                self.summary_reward : reward}), episode)

#### Main Function 
- Model Class에서 정의한 네트워크와 Agent 클래스에서 정의한 다양한 함수들을 이용해 행동을 결정하고 유니티 환경과 통신하며 학습을 수행하는 함수.

#### Step 
- 1. unity 환경 설정 & 브레인 정의.
- 2. 행동 선택.
- 3. Unity 환경에서 행동 수행.
- 4. Unity 환경으로부터 다음 상태, 보상, 게임 종료 등 정보 취득.
- 5. 학습 수행.
- 6. 특정 스텝마다 타켓 네트워크 업데이트.
- 7. 진행 상황 출력 및 특정 스텝마다 모델 저장.
- 8. step2로 이동하여 2 ~ 8의 과정을 지정 횟수만큼 반복.

In [None]:
# Main 함수 ==> 전체적으로 DQN 알고리즘을 진행.
if __name__ == '__main__' :
    # 유니티 환경 경로 설정 (file_name)
    env = UnityEnvironment(file_name=env_name)
    
    # 유니티 브레인 설정.
    default_brain = env.brain_names[0]
    brain = env.brains[default_brain]
    
    # DQNAgent 클래스를 agent로 정의.
    agent = DQNAgent()
    
    step = 0 
    rewards = []
    losses = []
    
    # 환경 설정 (env_config)에 따라 유니티 환경 리셋 및 학습 모드 설정.
    env_info = env.reset(train_mode = train_mode, config=env_config)[default_brain]
    
    # 게임 진행 반복문.
    for episode in range(run_episode + test_episode) :
        if episode == run_episode :
            train_mode = False
            env_info = env.reset(train_mode = train_mode)[default_brain]
            
        # 상태, episode_rewards, done 초기화.
        state = np.unit8(255 * np.array(env_info.visual_observations[0]))
        episode_rewards = 0 
        done = False
        
        # 한 에피소드를 진행하는 반복문.
        while not done :
            step += 1
            
            # 행동 결정 및 유니티 환경에 행동 적용.
            action = agent.get_action(state, train_mode)
            env_info = env.step(action)[default_brain]
            
            # 다음 상태, 보상, 게임 종료 정보 취득.
            next_state =  np.unit8(255 * np.array(env_info.visual_observations[0]))
            reward = env_info.rewards[0]
            episode_rewards += reward
            done = env_info.local_done[0]
            
            # 학습 모드인 경우 리플레이 메모리에 데이터 저장.
            if train_mode :
                agent.append_sample(state, action, reward, next_state, done)
                
            else :
                time.sleep(1)
                agent.epsilon = 0.05
                
            # 상태 정보 업데이트.
            state = next_state
            
            if episode > start_train_episode and train_mode :
                # 학습 수행.
                loss = agent.train_model(done)
                losses.append(loss)
                
                # 타겟 네트워크 업데이트.
                if step % (target_update_step) == 0 :
                    agent.update_target()
                    
        rewards.append(episode_rewards)
        
        # 게임 진행 상황 출력 및 텐서보드에 보상과 손실함수 값 기록.
        if episode % print_interval == 0 and episode != 0 :
            print(f"step : {step} episode : {episode} reward : {np.mean(rewards)} loss : {np.mean(losses)} epsilon : {agent.epsilon}")
            agent.Write_Summary(np.mean(rewards), np.mean(losses), episode)
            rewards = []
            losses = []
            
        # 네트워크 모델 저장.
        if episode % save_interval == 0 and episode != 0 :
            agent.save_model()
            print(f"Save Model {episode}")
            
    env.close()