<a href="https://colab.research.google.com/github/jonghechoi/jonghe/blob/master/DQN_code.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [0]:
pip install pygame

Collecting pygame
[?25l  Downloading https://files.pythonhosted.org/packages/8e/24/ede6428359f913ed9cd1643dd5533aefeb5a2699cc95bea089de50ead586/pygame-1.9.6-cp36-cp36m-manylinux1_x86_64.whl (11.4MB)
[K     |████████████████████████████████| 11.4MB 4.7MB/s 
[?25hInstalling collected packages: pygame
Successfully installed pygame-1.9.6


In [0]:
import os
import pickle
import pygame 
import random 
import numpy as np
from collections import deque
from skimage.color import rgb2gray
from skimage.transform import resize
from matplotlib import pyplot as plt
from keras import backend as K
from keras.models import Sequential
from keras.layers.convolutional import Conv2D
from keras.layers import Dense, Flatten
from keras.optimizers import RMSprop

# The Ubuntu image on the GCP server has no properly configured SDL video
# device, so pygame.display fails with "error: Not available Video device".
# Selecting SDL's "dummy" video driver works around that.
os.environ["SDL_VIDEODRIVER"] = "dummy"

# Horizontal offset applied to the fighter for each discrete action index
# (the Agent passes FIGHTER_MOVE[action] to Fighter.setPos).
FIGHTER_MOVE = [-10, 10, 0]

# Number of training episodes run by the main loop.
EPISODES = 30_000

class Agent:
    """DQN agent that steers the fighter from stacked grayscale frames.

    The agent owns an online network, a target network, a replay memory and a
    custom Keras training function. It also keeps the list of Fire objects so
    the whole game can be reset in one call.
    """

    def __init__(self, gamepad, fighter, action_size):
        """Build both networks, the training function and the replay memory.

        Args:
            gamepad: pygame surface the game is drawn on.
            fighter: Fighter instance moved by the chosen actions.
            action_size: number of discrete actions (indices into FIGHTER_MOVE).
        """
        self.gamepad = gamepad
        self.fighter = fighter
        self.fires = []
        self.action_size = action_size
        self.state_size = (100, 80, 4)

        # DQN hyperparameters
        self.epsilon = 1.
        self.epsilon_start, self.epsilon_end = 1.0, 0.1
        self.exploration_steps = 10000.
        # (1.0 - 0.1) / 10000 = 9e-5 subtracted from epsilon per training call
        self.epsilon_decay_step = (self.epsilon_start - self.epsilon_end) / self.exploration_steps
        self.batch_size = 500
        self.train_start = 8000
        self.update_target_rate = 10000
        self.discount_factor = 0.99

        # Replay memory, at most 10000 transitions
        self.memory = deque(maxlen=10000)

        # Create the online and target models, then sync the target weights
        self.model = self.build_model()
        self.target_model = self.build_model()
        self.update_target_model()
        # NOTE: this rebinds `optimizer` on the instance from the builder
        # method to the compiled training function it returns.
        self.optimizer = self.optimizer()

        self.avg_q_max, self.avg_loss = 0, 0

    # reset and addfire are plumbing needed to drive the pygame objects.
    def reset(self):
        """Reset every registered fire and the fighter to their start state."""
        for fire in self.fires:
            fire.reset()
        self.fighter.reset()

    def addfire(self, fire):
        """Register a Fire object so that reset() also resets it."""
        self.fires.append(fire)

    def optimizer(self):
        """Build and return the Keras backend training function.

        The returned callable takes ``[states, actions, targets]`` and returns
        ``[loss]`` after applying one RMSprop update to the online model.
        """
        a = K.placeholder(shape=(None,), dtype='int32')
        y = K.placeholder(shape=(None,), dtype='float32')

        prediction = self.model.output

        # One-hot encode the taken action and multiply it with the predicted
        # Q-values; summing over axis 1 leaves only the Q-value of that action.
        a_one_hot = K.one_hot(a, self.action_size)
        q_value = K.sum(prediction * a_one_hot, axis=1)

        # Huber-style loss: quadratic for |error| <= 1, linear beyond that.
        error = K.abs(y - q_value)
        quadratic_part = K.clip(error, 0.0, 1.0)
        linear_part = error - quadratic_part
        loss = K.mean(0.5 * K.square(quadratic_part) + linear_part)

        optimizer = RMSprop(lr=0.00025, epsilon=0.01)
        updates = optimizer.get_updates(self.model.trainable_weights, [], loss)
        train = K.function([self.model.input, a, y], [loss], updates=updates)

        return train

    def build_model(self):
        """Create the convolutional Q-network and print its summary."""
        model = Sequential()
        model.add(Conv2D(32, (8, 8), strides=(4, 4), activation='relu', input_shape=self.state_size))
        model.add(Conv2D(64, (4, 4), strides=(2, 2), activation='relu'))
        model.add(Flatten())
        model.add(Dense(256, activation='relu'))
        model.add(Dense(256, activation='relu'))
        model.add(Dense(64, activation='relu'))
        model.add(Dense(self.action_size))
        model.summary()

        return model

    def update_target_model(self):
        """Copy the online model's weights into the target model."""
        self.target_model.set_weights(self.model.get_weights())

    def train_model(self):
        """Decay epsilon and run one training step on a random mini-batch.

        Epsilon is lowered by ``epsilon_decay_step`` but never below
        ``epsilon_end``. If the replay memory holds fewer transitions than
        ``batch_size`` the method returns without training instead of letting
        ``random.sample`` raise.
        """
        if self.epsilon > self.epsilon_end:
            self.epsilon = max(self.epsilon_end, self.epsilon - self.epsilon_decay_step)

        if len(self.memory) < self.batch_size:
            return

        mini_batch = random.sample(self.memory, self.batch_size)

        # Samples are stored as uint8 frame stacks (with or without a leading
        # batch axis of 1); reshape to (batch, H, W, C) and scale to [0, 1].
        batch_shape = (self.batch_size,) + tuple(self.state_size)
        history = np.float32(
            np.reshape([sample[0] for sample in mini_batch], batch_shape) / 255.)
        next_history = np.float32(
            np.reshape([sample[3] for sample in mini_batch], batch_shape) / 255.)
        action = [sample[1] for sample in mini_batch]
        reward = np.array([sample[2] for sample in mini_batch], dtype=float)

        target_value = self.target_model.predict(next_history)

        # A negative reward marks a collision (end of episode), so no future
        # value is bootstrapped for those samples.
        bootstrapped = reward + self.discount_factor * np.amax(target_value, axis=1)
        target = np.where(reward < 0, reward, bootstrapped)

        loss = self.optimizer([history, action, target])
        self.avg_loss += loss[0]

    def get_action(self, history):
        """Pick an action epsilon-greedily, move the fighter and return it.

        Args:
            history: uint8 frame stack shaped for the model (pixel range 0-255).

        Returns:
            Index of the chosen action in FIGHTER_MOVE.
        """
        if np.random.rand() <= self.epsilon:
            # Explore: uniformly random action
            direction = random.randrange(self.action_size)
        else:
            # Exploit: action with the highest predicted Q-value
            q_value = self.model.predict(np.float32(history / 255.0))
            direction = np.argmax(q_value[0])

        self.fighter.setPos(FIGHTER_MOVE[direction])
        self.fighter.move()

        return direction

    def append_sample(self, history, action, reward, next_history):
        """Store the transition <s, a, r, s'> in the replay memory."""
        self.memory.append((history, action, reward, next_history))

    

class Fighter:
    """Player sprite that moves horizontally along the bottom of the pad."""

    def __init__(self, background, gamepad):
        """Load the sprite and place it at its starting position.

        Args:
            background: Background instance (stored, not otherwise used here).
            gamepad: pygame surface the fighter is drawn on.
        """
        self.background = background
        self.gamepad = gamepad
        self.fighter = pygame.image.load('/content/drive/My Drive/test_colab/fighter.png')
        self.pad_width = 600
        self.pad_height = 700

        # Initial position, size and velocity are exactly what reset() sets.
        self.reset()

    def reset(self):
        """Put the fighter back at its start position and stop its motion."""
        # Use the instance's own pad size rather than module-level globals so
        # the start position always agrees with the clamp applied in move().
        self.x = self.pad_width * 0.45
        self.y = self.pad_height * 0.88
        self.gamepad.blit(self.fighter, (self.x, self.y))
        self.fighter_width = 50
        self.fighter_height = 70
        self.x_change = 0

    def move(self):
        """Process pending pygame events, advance by x_change and redraw."""
        for event in pygame.event.get():
            if event.type == pygame.QUIT:  # window closed with the mouse
                pygame.quit()
            elif event.type == pygame.KEYDOWN:
                if event.key == pygame.K_LEFT:
                    self.x_change -= 8
                elif event.key == pygame.K_RIGHT:
                    self.x_change += 8
            elif event.type == pygame.KEYUP:
                if event.key in (pygame.K_LEFT, pygame.K_RIGHT):
                    self.x_change = 0

        # Advance the fighter and keep it fully inside the pad
        self.x += self.x_change
        self.x = min(max(self.x, 0), self.pad_width - self.fighter_width)

        self.gamepad.blit(self.fighter, (self.x, self.y))

    def setPos(self, x):
        """Set the horizontal offset applied on the next move() call."""
        self.x_change = x
        
   
                

class Fire:
    """Falling obstacle the fighter has to dodge."""

    def __init__(self, gamepad, fighter, speed, pad_width=600, pad_height=700):
        """Load the sprite and spawn the fire just above the pad.

        Args:
            gamepad: pygame surface the fire is drawn on.
            fighter: Fighter instance used for collision checks.
            speed: amount added to the fire's y position per move() call.
            pad_width: pad width used to pick the spawn column. Defaults to
                600, the value the main script uses.
            pad_height: pad height at which the fire counts as having reached
                the bottom. Defaults to 700, the value the main script uses.
        """
        self.gamepad = gamepad
        self.fighter = fighter
        self.fire_width = 50
        self.fire_height = 70
        self.speed = speed
        self.pad_width = pad_width
        self.pad_height = pad_height
        # The sprite never changes, so it is loaded once and reused.
        self.fire = pygame.image.load('/content/drive/My Drive/test_colab/fire2.png')
        self._respawn()

    def _respawn(self):
        """Place the fire at a random column just above the top edge."""
        self.fire_x = random.randrange(0, self.pad_width - self.fire_width)
        self.fire_y = -5

    def move(self):
        """Advance the fire by `speed`, respawning first if it left the pad."""
        if self.fire_y >= self.pad_height:
            self._respawn()

        self.fire_y += self.speed

        self.gamepad.blit(self.fire, (self.fire_x, self.fire_y))

    def reset(self):
        """Respawn the fire at the top and draw it."""
        self._respawn()
        # The stored position stays at the spawn row; only the drawn image is
        # offset by one step, matching the previous behavior of this method.
        self.gamepad.blit(self.fire, (self.fire_x, self.fire_y + self.speed))

    def hit(self, pos):
        """Return True if a fire whose top-left corner is `pos` overlaps the fighter.

        Uses an axis-aligned rectangle overlap test, so a fire exactly aligned
        with the fighter counts as a hit and a fire that has already passed
        below the fighter does not.

        Args:
            pos: ``[x, y]`` top-left corner of the fire.

        Returns:
            bool: True on overlap, False otherwise.
        """
        fire_left, fire_top = pos[0], pos[1]
        ship_left, ship_top = self.fighter.x, self.fighter.y

        overlaps_x = (fire_left < ship_left + self.fighter.fighter_width
                      and fire_left + self.fire_width > ship_left)
        overlaps_y = (fire_top < ship_top + self.fighter.fighter_height
                      and fire_top + self.fire_height > ship_top)
        return bool(overlaps_x and overlaps_y)

    def is_hit(self):
        """Return True if the fire currently overlaps the fighter."""
        return self.hit([self.fire_x, self.fire_y])

    def is_bottom_hit(self):
        """Return True once the fire's top edge has reached the pad bottom."""
        return self.fire_y >= self.pad_height

    def setPos(self, x, y):
        """Move the fire's top-left corner to (x, y)."""
        self.fire_x = x
        self.fire_y = y
        
        
        

class Background:
    """Scrolling backdrop drawn from two stacked copies of one image."""

    def __init__(self, gamepad):
        """Clear the surface, load the backdrop image and set scroll offsets."""
        self.gamepad = gamepad
        self.gamepad.fill((1, 1, 1))
        self.space1 = pygame.image.load('/content/drive/My Drive/test_colab/back.png')
        self.space2 = self.space1.copy()

        # Stored negated: the second copy starts one image height above the first.
        self.background_height = -700
        self.background_y = 0
        self.background2_y = self.background_height

    def _advance(self, y):
        """Return `y` scrolled down by 2, wrapped to the top once it hits the limit."""
        y += 2
        if y == -self.background_height:
            return self.background_height
        return y

    def move(self):
        """Scroll both copies down by two pixels and draw them."""
        self.background_y = self._advance(self.background_y)
        self.background2_y = self._advance(self.background2_y)

        for image, offset in ((self.space1, self.background_y),
                              (self.space2, self.background2_y)):
            self.gamepad.blit(image, (0, offset))


    

# Frame preprocessing
def pre_processing(gamepad):
    """Return the current game surface as a (100, 80) grayscale uint8 image."""
    pixels = pygame.surfarray.array3d(gamepad)
    # Swap the first two axes before converting (pygame's surfarray docs
    # describe these arrays as indexed x-first).
    image = np.transpose(pixels, (1, 0, 2))

    gray = rgb2gray(image)
    shrunk = resize(gray, (100, 80), mode='constant')

    # Scale by 255 and store as uint8.
    return np.uint8(shrunk * 255)


# Training entry point
if __name__ == "__main__":
    pygame.init()
    pygame.display.set_caption('흥만티 Game')
    pad_width = 600
    pad_height = 700
    gamepad = pygame.display.set_mode((pad_width, pad_height))
    background = Background(gamepad)
    fighter = Fighter(background, gamepad)
    clock = pygame.time.Clock()  # Clock object used to cap the frame rate

    fire1 = Fire(gamepad, fighter, speed=12)
    fire2 = Fire(gamepad, fighter, speed=11)
    fire3 = Fire(gamepad, fighter, speed=15)

    agent = Agent(gamepad, fighter, action_size=3)

    agent.addfire(fire1)
    agent.addfire(fire2)
    agent.addfire(fire3)

    num_of_evasions, episodes, global_step = [], [], 0
    average_loss_list = []

    for e in range(EPISODES):
        done = False

        # No score or life counter is tracked; an episode ends on the first hit.
        step, reward, num_of_evasion, frame_skip = 0, 0, 0, 0

        while not done:
            clock.tick(1000)
            global_step += 1
            step += 1

            background.move()
            for fire in agent.fires:
                fire.move()

                if fire.is_hit():
                    done = True
                    reward = -5
                    agent.reset()            # reset fighter and fire positions

                elif fire.is_bottom_hit():
                    num_of_evasion += 1
                    reward = 5

            if step == 1:                    # first step: stack the first frame four times
                state = pre_processing(gamepad)
                history = np.stack((state, state, state, state), axis=2)
                history = np.reshape([history], (1, 100, 80, 4))

            # Choose an action from the current frame stack. get_action also
            # moves the fighter, so it plays the role of env.step().
            action = agent.get_action(history)

            # Only every 2nd frame is pushed into the history and stored for
            # training; the fighter is still moved on every loop iteration.
            # The final frame of an episode is always stored so the collision
            # transition is never dropped by the frame skip.
            frame_skip += 1
            if frame_skip % 2 == 0 or done:
                frame_skip = 0
                next_state = pre_processing(gamepad)
                next_state = np.reshape([next_state], (1, 100, 80, 1))
                next_history = np.append(next_state, history[:, :, :, :3], axis=3)

                # Track the max predicted Q-value of the current history so
                # the average can be printed at the end of the episode.
                agent.avg_q_max += np.amax(agent.model.predict(np.float32(history / 255.))[0])

                # Store the sample <s, a, r, s'> in the replay memory
                agent.append_sample(history, action, reward, next_history)
                # The reward has been consumed; clear it so it is not attached
                # to later transitions as well.
                reward = 0

                # Train only once the memory holds at least agent.train_start samples
                if len(agent.memory) >= agent.train_start:
                    agent.train_model()

                history = next_history

            # Periodically sync the target model with the online model. This
            # is checked on every step so a sync point cannot fall on a
            # skipped frame.
            if global_step % agent.update_target_rate == 0:
                agent.update_target_model()

            if done:
                print("episode:", e, "num_of_evasion:", num_of_evasion, "memory length:",
                      len(agent.memory), "epsilon:", agent.epsilon, "global_step:", global_step,
                      "average_q:", agent.avg_q_max / float(step), "average_loss:",
                      agent.avg_loss / float(step))

                # Record the episode's accumulated loss for later plotting
                average_loss_list.append(agent.avg_loss)
                agent.avg_q_max, agent.avg_loss = 0, 0

                # Google Colab: save weights and the loss history once at the
                # end of every 2000th episode.
                if e % 2000 == 0:
                    agent.model.save_weights('/content/drive/My Drive/cion_eposide(50000).h5')

                    with open('/content/drive/My Drive/cion_final_graph.pickle', 'wb') as f:
                        pickle.dump(average_loss_list, f)

            pygame.display.update()  # refresh the game screen
    







Model: "sequential_3"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
conv2d_7 (Conv2D)            (None, 24, 19, 32)        8224      
_________________________________________________________________
conv2d_8 (Conv2D)            (None, 11, 8, 64)         32832     
_________________________________________________________________
flatten_3 (Flatten)          (None, 5632)              0         
_________________________________________________________________
dense_9 (Dense)              (None, 256)               1442048   
_________________________________________________________________
dense_10 (Dense)             (None, 256)               65792     
_________________________________________________________________
dense_11 (Dense)             (None, 64)                16448     
_________________________________________________________________
dense_12 (Dense)             (None, 3)                