In [1]:
# Standard library
import os
import random
import time
from collections import deque

# Silence TensorFlow's native (C++) log output. The variable is set BEFORE
# TensorFlow is imported because messages emitted while the library loads
# are not affected by a value assigned afterwards.
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'

# Third-party
import gym
import matplotlib.pyplot as plt
import numpy as np
import tensorflow as tf
from gym.envs.registration import register
from tensorflow.keras import Model, Sequential
from tensorflow.keras.layers import Dense, Embedding, Reshape
from tensorflow.keras.optimizers import Adam

In [3]:
# --- Agent hyper-parameters -------------------------------------------------
GAMMA = 0.9                # discount factor applied to the bootstrapped next-state value
EPSILON = 0.1              # default exploration rate stored on the agent
REPLAY_BUFFER_SIZE = 2000  # maximum number of transitions kept in the replay buffer

# --- Training settings ------------------------------------------------------
num_episodes = 100   # number of episodes the training loop runs
batch_size = 256     # replay-buffer fill level required before training; also the minibatch size
learning_rate = 0.1  # NOTE(review): the optimizer in this notebook is built with a literal 0.01,
                     # so this value does not appear to be consumed -- confirm before relying on it

# --- Environment constants --------------------------------------------------
# Action indices, in the order LEFT, DOWN, RIGHT, UP.
IDX_ACTION_LEFT, IDX_ACTION_DOWN, IDX_ACTION_RIGHT, IDX_ACTION_UP = range(4)

# One-letter labels used when a policy is printed as text.
STR_ACTION_LEFT = 'L'
STR_ACTION_DOWN = 'D'
STR_ACTION_RIGHT = 'R'
STR_ACTION_UP = 'U'

# State index treated as the goal (last cell of a 16-state table).
GOAL_STATE = 15



In [4]:
# Re-register FrozenLake under its usual id, but as a deterministic
# (non-slippery) 4x4 map, then build the environment and the optimizer.
env_id = 'FrozenLake-v0'
env_dict = gym.envs.registry.env_specs.copy()

# Remove every already-registered spec whose id contains env_id so that the
# register() call below does not collide with an existing entry.
stale_ids = [spec_id for spec_id in env_dict if env_id in spec_id]
for spec_id in stale_ids:
    print('Remove {} from registry'.format(spec_id))
    del gym.envs.registry.env_specs[spec_id]

register(
    id=env_id,
    entry_point='gym.envs.toy_text:FrozenLakeEnv',
    kwargs=dict(map_name='4x4', is_slippery=False),
)

env = gym.make(env_id)

# Network input/output sizes are taken from the environment's spaces.
input_size = env.observation_space.n
output_size = env.action_space.n

print('취할 수 있는 상태 수: {}'.format(input_size))
print('취할 수 있는 행동 수: {}'.format(output_size))
print('Q table size : {}x{}'.format(input_size, output_size))

# Optimizer handed to the agent when it compiles its networks.
optimizer = Adam(learning_rate=0.01)


Remove FrozenLake-v0 from registry
취할 수 있는 상태 수: 16
취할 수 있는 행동 수: 4
Q table size : 16x4


In [5]:
class Agent:
    """DQN-style agent with an online Q-network, a target network and a replay buffer.

    The environment passed to the constructor must expose discrete
    ``observation_space`` / ``action_space`` objects (both with ``.n``; the
    action space also with ``.sample()``).
    """

    def __init__(self, env, optimizer):
        """Build both networks and copy the online weights into the target network.

        Args:
            env: environment whose spaces define the state/action counts.
            optimizer: Keras optimizer used when compiling the networks.
        """
        self._state_size = env.observation_space.n
        self._action_size = env.action_space.n
        # Keep the action space so exploration does not depend on a
        # notebook-level global named `env`.
        self._action_space = env.action_space
        self._optimizer = optimizer
        # Transitions observed while interacting with the environment.
        self.expirience_replay = deque(maxlen=REPLAY_BUFFER_SIZE)

        self.gamma = GAMMA      # discount factor
        self.epsilon = EPSILON  # default exploration rate

        self.q_network = self.build_compile()       # online Q-network
        self.target_network = self.build_compile()  # target Q-network
        self.target_model()                         # start both with identical weights

    def store(self, state, action, reward, next_state, terminated):
        """Append one transition tuple to the replay buffer."""
        self.expirience_replay.append((state, action, reward, next_state, terminated))

    def build_compile(self):
        """Create and compile one Q-network.

        Layout: an Embedding that maps a state index (one of ``_state_size``
        values, sequence length 1) to a 10-dimensional vector, a Reshape to
        flatten it, three 50-unit ReLU layers, and a linear output layer with
        one unit per action.
        """
        model = Sequential()
        model.add(Embedding(self._state_size, 10, input_length=1))
        model.add(Reshape((10,)))
        model.add(Dense(50, activation='relu'))
        model.add(Dense(50, activation='relu'))
        model.add(Dense(50, activation='relu'))
        model.add(Dense(self._action_size, activation='linear'))
        model.compile(loss='mse', optimizer=self._optimizer)
        return model

    def target_model(self):
        """Copy the online network's weights into the target network."""
        self.target_network.set_weights(self.q_network.get_weights())

    def get_target_weights(self):
        """Return the target network's current weights."""
        return self.target_network.get_weights()

    def act(self, state, epsilon=None):
        """Choose an action epsilon-greedily.

        Args:
            state: network input for the current state.
            epsilon: exploration probability; when omitted, ``self.epsilon``
                is used.

        Returns:
            A sampled action when exploring, otherwise the index of the
            largest predicted Q-value as a Python int.
        """
        if epsilon is None:
            epsilon = self.epsilon
        if np.random.rand() <= epsilon:
            return self._action_space.sample()
        q_values = self.q_network.predict(state, verbose=0)
        return int(np.argmax(q_values[0]))

    def retrain(self, batch_size):
        """Fit the online network on one minibatch sampled from the replay buffer.

        All sampled transitions are stacked and processed with a single
        ``predict`` per network and a single ``fit`` call, instead of one
        predict/predict/fit round-trip per transition.

        Args:
            batch_size: number of transitions to sample; clamped to the
                number currently stored. Nothing happens on an empty buffer.
        """
        n = min(batch_size, len(self.expirience_replay))
        if n == 0:
            return

        minibatch = random.sample(self.expirience_replay, n)

        # Each stored state is flattened to one row so that scalar states and
        # (1, 1)-shaped states are both accepted.
        states = np.asarray([t[0] for t in minibatch]).reshape(n, -1)
        actions = np.asarray([t[1] for t in minibatch], dtype=np.intp)
        rewards = np.asarray([t[2] for t in minibatch], dtype=np.float32)
        next_states = np.asarray([t[3] for t in minibatch]).reshape(n, -1)
        terminated = np.asarray([t[4] for t in minibatch], dtype=bool)

        # Start from the online network's own predictions so that only the
        # taken action's output contributes to the loss.
        targets = self.q_network.predict(states, verbose=0)
        next_q = self.target_network.predict(next_states, verbose=0)
        bootstrapped = rewards + self.gamma * np.max(next_q, axis=1)

        # Terminal transitions use the bare reward; others bootstrap from the
        # target network.
        targets[np.arange(n), actions] = np.where(terminated, rewards, bootstrapped)

        self.q_network.fit(states, targets, batch_size=n, epochs=1, verbose=0)

In [7]:
# Train the agent for num_episodes episodes, recording the total reward of
# each episode and a snapshot of the target weights for the shortest
# successful episode seen so far.
start_time = time.time()

agent = Agent(env, optimizer)

rList = []  # total reward of every finished episode
min_act = env.observation_space.n * env.action_space.n  # fewest actions of any successful episode so far
optimal_W = []  # target-network weights captured at the best successful episode

for i in range(num_episodes):
    b_success = False  # set when the episode ends on GOAL_STATE
    action_cnt = 0     # number of actions taken in this episode
    rAll = 0           # total reward of this episode
    done = False

    state = np.reshape(env.reset(), [1, 1])

    # Exploration rate decays slowly with the episode index.
    e = 1. / ((i / 50) + 10)

    while not done:
        action = agent.act(state, e)
        state_next, reward, done, info = env.step(action)
        state_next = np.reshape(state_next, [1, 1])
        agent.store(state, action, reward, state_next, done)

        rAll += reward
        action_cnt += 1  # count every action, including the terminal one

        if done:
            # The success flag has to be decided here: the goal is a terminal
            # state, so a check placed after this `break` can never run for it.
            b_success = int(state_next[0, 0]) == GOAL_STATE
            if b_success:
                print('Find --> ep[',i,']', 'state:', state, ' action:', action, ' reward:', reward, ' next_s:', state_next, 'total r:', rAll)
            else:
                print('Hole --> ep[',i,']', 'state:', state, ' action:', action, ' reward:', reward, ' next_s:', state_next, 'total r:', rAll)
            break

        state = state_next

        # Start training once the buffer holds more than one batch of experience.
        if len(agent.expirience_replay) > batch_size:
            agent.retrain(batch_size)

    rList.append(rAll)

    # Every 10 episodes, copy the online weights into the target network.
    if i % 10 == 1:
        agent.target_model()

    # Remember the weights of the shortest successful episode so far.
    if b_success and action_cnt < min_act:
        min_act = action_cnt
        optimal_W = agent.get_target_weights()

    if (i + 1) % 10 == 0:
        print("**********************************")
        print("Episode: {}".format(i + 1))
        env.render()
        print("**********************************")



Hole --> ep[ 0 ] state: [[4]]  action: 2  reward: 0.0  next_s: [[5]] total r: 0.0
Hole --> ep[ 1 ] state: [[8]]  action: 1  reward: 0.0  next_s: [[12]] total r: 0.0
Hole --> ep[ 2 ] state: [[8]]  action: 1  reward: 0.0  next_s: [[12]] total r: 0.0
Hole --> ep[ 3 ] state: [[8]]  action: 1  reward: 0.0  next_s: [[12]] total r: 0.0
Hole --> ep[ 4 ] state: [[8]]  action: 1  reward: 0.0  next_s: [[12]] total r: 0.0
Hole --> ep[ 5 ] state: [[8]]  action: 1  reward: 0.0  next_s: [[12]] total r: 0.0
Hole --> ep[ 6 ] state: [[1]]  action: 1  reward: 0.0  next_s: [[5]] total r: 0.0


KeyboardInterrupt: ignored

In [None]:
# Report elapsed time, success rate and per-episode reward.
elapsed = time.time() - start_time
print(f'{elapsed} seconds')

# Divide by the number of episodes actually recorded: if training was
# interrupted, rList is shorter than num_episodes and dividing by
# num_episodes would understate the rate.
episodes_run = len(rList)
success_rate = sum(rList) / episodes_run if episodes_run else 0.0
print("Success rate: " + str(success_rate))

plt.bar(range(episodes_run), rList, color='b', alpha=0.4)
plt.show()

In [None]:
# Q-table 시각화
def print_str_direct(q_value, n_cols=4):
    """Print a one-hot policy table as a grid of direction letters.

    Each entry of ``q_value`` is a sequence of four numbers. After rounding
    each number to the nearest integer, the patterns 1000 / 0100 / 0010 / 0001
    are printed as the LEFT / DOWN / RIGHT / UP letters; any other pattern is
    printed as a blank cell.

    Args:
        q_value: sequence of per-state rows (list or array).
        n_cols: number of cells printed per line. Defaults to 4. A final
            line with fewer cells is printed as-is rather than indexing past
            the end of ``q_value``.
    """
    letters = {
        '1000': STR_ACTION_LEFT,
        '0100': STR_ACTION_DOWN,
        '0010': STR_ACTION_RIGHT,
        '0001': STR_ACTION_UP,
    }

    for row_start in range(0, len(q_value), n_cols):
        txt = ''
        for cell in q_value[row_start:row_start + n_cols]:
            # Round first so that float entries such as 0.999 still match.
            pattern = ''.join(str(int(round(v, 0))) for v in cell)
            txt += letters.get(pattern, ' ')
            txt += ' | '
        print(txt)
        
def one_hot(x, n_states=16):
    """Return state index ``x`` as a one-hot row vector.

    Args:
        x: integer state index.
        n_states: length of the encoding. Defaults to 16.

    Returns:
        float32 array of shape (1, n_states) with a 1 in column ``x``. The row
        is taken with the slice ``[x:x+1]``, so an index outside
        ``0..n_states-1`` yields an array with zero rows.
    """
    return np.identity(n_states, dtype=np.float32)[x:x + 1]


In [None]:
def get_optimal_path(optimal_q_value, n_cols=4):
    """Trace the greedy path from state 0 towards GOAL_STATE through a Q-table.

    Starting at state 0, the action with the largest Q-value is followed
    (first index on ties) until GOAL_STATE is reached, a state is revisited,
    or the move would leave the grid. The path is walked step by step, so
    moves in any direction (including LEFT and UP) are followed.

    Args:
        optimal_q_value: 2-D array-like of shape (n_states, n_actions).
        n_cols: grid width used to turn a state index into (row, column).
            Defaults to 4.

    Returns:
        A list with one row per state, each a list of ``n_actions`` ints.
        Non-goal states on the traced path have a 1 in the column of their
        greedy action; every other row (including the goal's) is all zeros.

    Prints "Agent can't find optimal path." when the walk does not end on
    GOAL_STATE.
    """
    q_table = np.asarray(optimal_q_value)
    n_states, n_actions = q_table.shape
    optimal_path = [[0] * n_actions for _ in range(n_states)]

    # (row delta, column delta) for each action index.
    moves = {
        IDX_ACTION_UP: (-1, 0),
        IDX_ACTION_DOWN: (1, 0),
        IDX_ACTION_RIGHT: (0, 1),
        IDX_ACTION_LEFT: (0, -1),
    }

    current = 0
    visited = set()
    while current != GOAL_STATE and current not in visited and 0 <= current < n_states:
        visited.add(current)

        action = int(np.argmax(q_table[current]))
        optimal_path[current][action] = 1

        d_row, d_col = moves.get(action, (0, 0))
        row, col = divmod(current, n_cols)
        row, col = row + d_row, col + d_col
        candidate = row * n_cols + col

        # A move that would leave the grid is treated as staying in place
        # (as in FrozenLake); the state is then already visited, which ends
        # the walk instead of wrapping to a neighbouring row.
        if row >= 0 and 0 <= col < n_cols and candidate < n_states:
            current = candidate

    if current != GOAL_STATE:
        print("Agent can't find optimal path.")
    return optimal_path

In [None]:
# Report elapsed time, success rate and per-episode reward.
elapsed = time.time() - start_time
print(f'{elapsed} seconds')

# Divide by the number of episodes actually recorded: if training was
# interrupted, rList is shorter than num_episodes and dividing by
# num_episodes would understate the rate.
episodes_run = len(rList)
success_rate = sum(rList) / episodes_run if episodes_run else 0.0
print("Success rate: " + str(success_rate))

plt.bar(range(episodes_run), rList, color='b', alpha=0.4)
plt.show()