In [23]:
import tensorflow as tf
from tensorflow.keras.layers import Input, Dense
from tensorflow.keras.optimizers.legacy import Adam
import gym
import numpy as np
import random as rand

In [25]:
class Agent(object):
    def __init__(self):
        # 프로그램 동작 설정
        self.env = gym.make("CartPole-v1")
        self.state_size = self.env.observation_space.shape[0] # cartpole이 가진 state 개수
        self.action_size = self.env.action_space.n # cartpole의 agent가 선택할 수 있는 action의 개수

        # model의 설정
        self.node_num = 12 # nn layer에 들어있는 node의 개수
        self.learning_rate = 0.001
        self.epochs_cnt = 5
        self.model = self.build_model()
        
        # 학습 설정
        self.discount_rate = 0.97
        self.penalty = -100 # 막대가 cartpole에서 떨어져 종료되었을 때 받을 음의 보상의 크기
        
        # 반복 설정 영역 
        self.episode_num = 500 # 몇 번의 episode를 반복하여 data를 수집할 것인가

        # data 수집 환경
        self.replay_memory_limit = 2048
        self.replay_size = 32 # mini batch통해 학습할 데이터의 양을 지정
        self.replay_memory = [] # 수집한 cartpole의 실행 정보가 저장

        # 탐험 환경 설정
        self.epsilon = 0.99
        self.epsilon_decay = 0.2
        self.epsilon_min = 0.05

        # training monitoring
        self.moving_avg_size = 20 # moving average size3
        self.reward_list = []
        self.count_list = [] # 각 episode에서 cartpole이 실행된 횟수
        self.moving_avg_list = []

    def build_model(self): # neural net build
        input_states = Input(shape = (1,self.state_size), name = "input_states") # model : state -> q value for each action
        x = (input_states)
        x = Dense(self.node_num, activation = "relu")(x) # hidden layer의 node 개수
        out_actions = Dense(self.action_size, activation = "linear", name = "output")(x) # output layer = q value per actions
        model = tf.keras.models.Model(inputs = [input_states], outputs = [out_actions])
        model.compile(optimizer = Adam(learning_rate = self.learning_rate), loss = "mean_squared_error")
        model.summary()
        return model
    
    def train(self): # train the model
        for episode in range(self.episode_num): # step : cartpole이 한 번 실행되는 것 // episode : cartpole이 처음 실행되어 종료될 때까지의 전 과정
            state = self.env.reset() # cartpole 실행 환경의 초기화
            Q, count, reward_tot = self.take_action_and_append_memory(episode, state) # 경험을 기록

            if count < 500:
                reward_tot = reward_tot - self.penalty
            
            # 하나의 episode 종료 -> 수집된 보상과 실행 횟수 저장
            self.reward_list.append(reward_tot)
            self.count_list.append(count)
            self.moving_avg_list.append(self.moving_avg(self.count_list, self.moving_avg_size))

            # replay memory에 저장된 데이터를 replay_size만큼 가져와서 학습
            self.train_mini_batch(Q)

            # training log
            if (episode%10 == 0 ):
                print("episode : {}, moving_avg : {}, rewards_avg : {}".format(episode, self.moving_avg_list[-1],np.mean(self.reward_list)))
        self.save_model()

    def take_action_and_append_memory(self, episode, state):
        reward_tot = 0
        count = 0
        done = False

        epsilon = self.get_epsilon(episode) # epsilon 계산 w/ decay -> determin whether exploration or exploitation
        while not done: # 반복 설정
            count+=1
            print(state)
            state_t = np.reshape(state, [1,1,self.state_size])
            Q = self.model.predict(state_t) # Q값을 예측
            action = self.greed_search(epsilon, episode, Q) # action을 선택
            state_next, reward, done, none = self.env.step(action) # 행동을 실행 -> 환경이 다음 state와 reward를 return

            if done: # 막대가 바닥에 떨어진 경우
                reward = self.penalty
            self.replay_memory.append([state_t, action, reward, state_next, done]) # 경험을 replay_memory에 저장
            if len(self.replay_memory)>self.replay_memory_limit: # replay_memory가 너무 커지면 오래된 정보부터 삭제
                del self.replay_memory[0]
            reward_tot += reward
            state = state_next
        return Q, count, reward_tot
    
    def train_mini_batch(self, Q): # train via replay memory
        array_state = []
        array_Q = []
        this_replay_size = self.replay_size # 학습에 사용할 데이터의 크기
        if len(self.replay_memory) < self.replay_size: # 기존에 저장되어 있던 정보보다 작은 양의 정보가 replay_memory에 있는 경우 : exception handling
            this_replay_size = len(self.replay_memory)
        
        for sample in rand.sample(self.replay_memory, this_replay_size): # radom sampling
            state_t, action, reward, state_next, done = sample # training data 분리

            # Q값 계산
            if done : # 게임이 종료된 경우 -> 다음 상태가 없음
                Q[0, 0, action] = reward
            else: # 게임이 종료되지 않은 경우
                state_t = np.reshape(state_next, [1,1,self.state_size])
                Q_new = self.model.predict(state_t)
                Q[0, 0, action] = reward + self.discount_rate*np.max(Q_new)

            # data shape 변경
            array_state.append(state_t.reshape(1,self.state_size))
            array_Q.append(Q.reshape(1,self.action_size))
        array_state_t = np.array(array_state)
        array_Q_t = np.array(array_Q)

        # model의 학습
        hist = self.model.fit(array_state_t, array_Q_t, epochs = self.epochs_cnt, verbose = 0)

    def get_epsilon(self, episode):
        result = self.epsilon * (1- episode/(self.episode_num*self.epsilon_decay))
        if result < self.epsilon_min:
            result = self.epsilon_min
        return result
    
    def greed_search(self, epsilon, episode, Q) : 
        if epsilon > np.random.rand(1):
            action = self.env.action_space.sample()
        else:
            action = np.argmax(Q)
        return action
    
    def moving_avg(self, data, size = 10):
        if len(data) > size:
            c = np.array(data[len(data - size ): len(data)])
        else:
            c = np.array(data)
        return np.mean(c)
    
    def save_model(self):
        self.model.save("./model?dqn")
        print("****** end learning")

In [26]:
if __name__ == "__main__" : 
    agent = Agent()
    agent.train()

Model: "model_3"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 input_states (InputLayer)   [(None, 1, 4)]            0         
                                                                 
 dense_4 (Dense)             (None, 1, 12)             60        
                                                                 
 output (Dense)              (None, 1, 2)              26        
                                                                 
Total params: 86 (344.00 Byte)
Trainable params: 86 (344.00 Byte)
Non-trainable params: 0 (0.00 Byte)
_________________________________________________________________
(array([ 0.00342101,  0.00030669,  0.00875837, -0.00253577], dtype=float32), {})


  super().__init__(name, **kwargs)


ValueError: setting an array element with a sequence. The requested array has an inhomogeneous shape after 1 dimensions. The detected shape was (2,) + inhomogeneous part.