실험1 최적루트


In [1]:
## frozen-lake 문제에 대한 DQN 프로그램.
##
import numpy as np
import time
import random
import math
from datetime import datetime
from collections import namedtuple, deque
from itertools import count
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
# if GPU is to be used
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
total_episodes = 60000 # Total number of episodes in training
max_steps = 99 # Max steps per episode in training.
gamma = 0.90 # Discounting rate for expected return
Learning_rate = 0.00005 # 신경망 모델 learning rate (optimizer 에게 제공)
original_epsilon = 0.4 # Exploration rate
decay_rate = 0.000006 # Exponential decay rate for exploration.
TAU = 0.7 # Q_net 파라메터를 Q_hat_net 로 copy 시에 반영 비율.
one_minus_TAU = 1 - TAU

memory_pos = 0 # replay_memory 내에 transition 을 넣을 다음 위치.
# 0 에서 부터 커지다가 max_memory-1 까지 되면 다시 0 부터 시작함.
BATCH_SIZE = 16
model_update_cnt = 0 # Q_net 를 업데이트한 횟수.
copy_cnt = 4 # Q_net 업데이트를 copy_cnt 번 한 후마다 Q_hat_net 로 파라메터 복사.
max_memory = 2000 # capacity of the replay memory.
transition_cnt = 0 # 거쳐간 총 transition 수(episodes 간에 중단 없이)
# 매 (배치크기+작은 랜덤값) 마다 Q_net 의 parameter update 를 수행함.
random.seed(datetime.now().timestamp()) # give a new seed in random number generation.
# state space is defined as size_row X size_col array.
# The boundary cells are holes(H).
# S: start, G: goal, H:hole, F:frozen
max_row = 9
max_col = 9
n_actions = 4 # 0:up, 1:right, 2:down, 3:left.
n_observations = max_row * max_col # total number of states
# 1-hot 벡터로 표현하므로 NN 입력의 신호수 = 총 state 수
env_state_space = [
    ['H', 'H', 'H', 'H', 'H', 'H', 'H', 'H', 'H'],\
    ['H', 'S', 'F', 'F', 'F', 'F', 'F', 'F', 'H'],\
    ['H', 'F', 'F', 'H', 'H', 'F', 'H', 'F', 'H'],\
    ['H', 'F', 'F', 'F', 'F', 'F', 'F', 'F', 'H'],\
    ['H', 'F', 'H', 'F', 'H', 'F', 'F', 'H', 'H'],\
    ['H', 'F', 'F', 'F', 'F', 'G', 'F', 'F', 'H'],\
    ['H', 'F', 'H', 'H', 'F', 'H', 'F', 'F', 'H'],\
    ['H', 'F', 'F', 'F', 'F', 'F', 'F', 'F', 'H'],\
    ['H', 'H', 'H', 'H', 'H', 'H', 'H', 'H', 'H']
]
# offset of each move action: up, right, down, left, respectively.
# a new state(location) = current state + offset of an action.
move_offset = [[-1,0], [0,1], [1,0], [0,-1]]
move_str = ['up ', 'right', 'down ', 'left ']
# replay memory: transition 들을 저장하는 버퍼.
# 저장되는 transition 의 4가지 정보: state_index, action, reward, next state index.
# (주의: state 를 좌표 대신 상태번호(index) 로 나타냄.)
replay_memory = np.ndarray((max_memory, 4), dtype=int)
batch_transition = np.ndarray((BATCH_SIZE, 4), dtype=int) # 배치 하나를 넣는데 사용.
is_replay_memory_full = 0 # 버퍼가 처음으로 완전히 채워지기 전에는 0. 그후로는 항상 1.

In [2]:
def compute_and_print_Q_values (s):
    r = s[0]
    c = s[1]
    if env_state_space[r][c] == 'G' or env_state_space[r][c] == 'H':
        action_values = [ 0.0 for i in range(n_actions)]
        action_values = np.array(action_values)
    else:
        state_idx = r * max_col + c # state 의 번호를 만듬.
        state_idx_list = [state_idx] # 배치 차원을 넣는다. 배치는 하나의 예제 입력만 가짐.
        states_tsr = torch.tensor(state_idx_list).to(device) # state 한개 가짐
        one_hot_states_tsr = F.one_hot(states_tsr, num_classes= n_observations)
        one_hot_states_tsr = one_hot_states_tsr.float().to(device)
        with torch.no_grad():
            state_action_values = Q_net(one_hot_states_tsr) # 주의: 출력은 2차원: (1, n_actions).
        state_action_values = state_action_values[0] # 배치 차원을 없앤다.
        action_values = state_action_values.cpu().numpy()
    text = "s[" + str(r) + "," + str(c) + "]: "
    for i in range(n_actions):
    #text = text + str(action_values[i]) + ", "
        text = text + "{:5.2f}".format(action_values[i])+ ", "
    print(text)

In [3]:
def my_argmax(q_a_list):
    max_val = np.max(q_a_list)
    max_positions = np.where(q_a_list == max_val)[0]
    nmax = len(max_positions)

    if nmax == 1:
        return max_positions[0]  # 하나의 최대값이 있다면 그 위치를 반환

    # 최대값이 여러 개인 경우, 랜덤하게 하나의 위치를 선택하여 반환
    return torch.tensor(random.choice(max_positions))

In [4]:
import random

def choose_action_with_greedy(s):
    state_idx = s[0] * max_col + s[1]  # 상태를 번호로 변환합니다.
    state_idx_list = [state_idx]
    states_tsr = torch.tensor(state_idx_list).to(device)  # 하나의 상태를 포함하는 배열
    one_hot_states_tsr = F.one_hot(states_tsr, num_classes=n_observations)
    one_hot_states_tsr = one_hot_states_tsr.float().to(device)

    # 상태 s에 대한 모든 액션의 q-a 값 가져오기
    with torch.no_grad():
        state_action_values = Q_net(one_hot_states_tsr)  # 출력은 2차원: (1, n_actions).

    lst=state_action_values.flatten().tolist()
    max_a = my_argmax(lst) # 최대값 중에서 무작위로 선택하기

    return max_a

def choose_action_with_epsilon_greedy(s, epsilon):
    state_idx = s[0] * max_col + s[1]  # 상태를 번호로 변환합니다.
    state_idx_list = [state_idx]  # 배치 차원 추가
    states_tsr = torch.tensor(state_idx_list).to(device)  # 하나의 상태를 가집니다.
    one_hot_states_tsr = F.one_hot(states_tsr, num_classes=n_observations)
    one_hot_states_tsr = one_hot_states_tsr.float().to(device)

    # 상태 s에 대한 모든 액션의 q-a 값 가져오기
    with torch.no_grad():
        state_action_values = Q_net(one_hot_states_tsr)  # 출력은 2차원: (bsz, n_actions).

    lst=state_action_values.flatten().tolist()
    max_a = my_argmax(lst) # 최대값 중에서 무작위로 선택하기

    rn = random.random()  # 0 ~ 1 사이의 난수 생성
    if rn >= epsilon:  # epsilon 이상인 경우, 최대 확률을 가진 액션 선택
        action = max_a
    else:
        rn1 = random.random()
        # 4개의 액션 중 하나를 무작위로 선택
        if rn1 >= 0.75:
            action = 0
        elif rn1 >= 0.5:
            action = 1
        elif rn1 >= 0.25:
            action = 2
        else:
            action = 3

    return action

def get_new_state_and_reward(s, a):
	new_state = []
	off_set = move_offset[a]

	#  s + off_set gives the new_state.
	new_state.append(s[0] + off_set[0])
	new_state.append(s[1] + off_set[1])

	# compute reward for moving to the new state
	cell = env_state_space[new_state[0]][new_state[1]]
	if cell == 'F':
		rew = 0
	elif cell == 'H':
		rew = -9
	elif cell == 'G':
		rew = 9
	elif cell == 'S':
		rew = 0
	else:
		print("Logic error in get_new_state_and_reward. This cannot happen!")
		time.sleep(1200)
		return [0,0], -20000
	return new_state, rew

In [5]:
class DQN(nn.Module):
    def __init__(self, n_observations, n_actions):
        super(DQN, self).__init__()
        # 신경망의 각 레이어 정의
        self.layer1 = nn.Linear(n_observations, 128)  # 입력 뉴런 수: n_observations, 출력 뉴런 수: 128
        self.layer2 = nn.Linear(128, 128)  # 입력 뉴런 수: 128, 출력 뉴런 수: 128
        self.layer3 = nn.Linear(128, n_actions)  # 입력 뉴런 수: 128, 출력 뉴런 수: n_actions

    # forward 함수: 데이터가 모델을 통과할 때 호출되는 함수
    def forward(self, x):
        x = F.relu(self.layer1(x))  # 첫 번째 레이어에 ReLU 활성화 함수를 적용
        x = F.relu(self.layer2(x))  # 두 번째 레이어에 ReLU 활성화 함수를 적용
        return self.layer3(x)

In [6]:
def learning_by_a_batch():
    state_batch=batch_transition[:,0]
    action_batch=batch_transition[:,1]
    reward_batch=batch_transition[:,2]
    next_state_batch=batch_transition[:,3]

    state_batch_tsr=torch.from_numpy(state_batch)
    one_hot_state_batch=F.one_hot(state_batch_tsr,num_classes=n_observations)
    one_hot_state_batch=one_hot_state_batch.float().to(device)

    prediction_Q_net=Q_net(one_hot_state_batch)

    state_action_value_tsr=torch.zeros([BATCH_SIZE,],dtype=torch.float64).to(device)
    for i in range(BATCH_SIZE):
        state_action_value_tsr[i]=prediction_Q_net[i,action_batch[i]]

    with torch.no_grad():
        next_state_batch_tsr=torch.from_numpy(next_state_batch)
        one_hot_next_state_batch=F.one_hot(next_state_batch_tsr,num_classes=n_observations)
        one_hot_next_state_batch=one_hot_next_state_batch.float().to(device)
        result_target_net=Q_hat_net(one_hot_next_state_batch)

        max_q_of_next_states_in_batch=torch.max(result_target_net,dim=1).values

    next_state_values=[]
    for i,st in enumerate(next_state_batch):
        r=int(st/max_col)
        c=st%max_col
        if env_state_space[r][c]=='G' or env_state_space[r][c]=='H':
            next_state_values.append(0)
        else:
            next_state_values.append(max_q_of_next_states_in_batch[i].item())

    next_state_values_tsr=torch.tensor(next_state_values).to(device)
    reward_batch_tsr=torch.from_numpy(reward_batch).to(device)
    target_state_action_values_tsr=(next_state_values_tsr*gamma)+reward_batch_tsr

    loss=criterion(state_action_value_tsr,target_state_action_values_tsr)

    optimizer.zero_grad()
    loss.backward()
    torch.nn.utils.clip_grad_value_(Q_net.parameters(),100)
    optimizer.step()


Q_net=DQN(n_observations,n_actions).to(device)
Q_hat_net=DQN(n_observations,n_actions).to(device)

Q_hat_net.load_state_dict(Q_net.state_dict())

optimizer=optim.AdamW(Q_net.parameters(),lr=Learning_rate,amsgrad=True)
criterion=nn.SmoothL1Loss()

In [7]:
num_episode=total_episodes
start_state=[1,1]
print("\n학습시작\n")

for i_episode in range(num_episode):
    S=start_state
    epsilon=original_epsilon*math.exp(-decay_rate*i_episode)

    if i_episode != 0 and i_episode %4000==0:
          print('episode=',i_episode,'  epsilon=',epsilon)

    for  t in range(max_steps):
        A=choose_action_with_epsilon_greedy(S,epsilon)
        S_,R = get_new_state_and_reward(S,A)

        s_idx=S[0]*max_col+S[1]
        next_s_idx=S_[0]*max_col+S_[1]

        replay_memory[memory_pos,0]= s_idx
        replay_memory[memory_pos,1]=A
        replay_memory[memory_pos,2]=R
        replay_memory[memory_pos,3]=next_s_idx

        if is_replay_memory_full==0 and memory_pos==max_memory-1:
            is_replay_memory_full=1
        memory_pos=(memory_pos+1) % max_memory
        S=S_
        transition_cnt+=1
        random_number=random.randint(0,int(BATCH_SIZE/2))

        if transition_cnt >= (BATCH_SIZE+3) and transition_cnt % (BATCH_SIZE+random_number):
            if is_replay_memory_full==1:
                random_numbers=random.sample(range(0,max_memory),BATCH_SIZE)
            else:
                random_numbers=random.sample(range(0,memory_pos-1),BATCH_SIZE)

            for i in range(BATCH_SIZE):
                rnum=random_numbers[i]
                batch_transition[i,:]=replay_memory[rnum,:]

            learning_by_a_batch()
            model_update_cnt+=1

            if model_update_cnt % copy_cnt==0:
                Q_hat_net_state_dict=Q_hat_net.state_dict()
                Q_net_state_dict=Q_net.state_dict()
                for key in Q_net_state_dict:
                    Q_hat_net_state_dict[key]=Q_net_state_dict[key]*TAU+Q_net_state_dict[key]*one_minus_TAU
                Q_hat_net.load_state_dict(Q_hat_net_state_dict)
        if env_state_space[S[0]][S[1]]=='G' or env_state_space[S[0]][S[1]]=='H':
            break

print('학습종료\n')


학습시작

episode= 4000   epsilon= 0.39051428390316373
episode= 8000   epsilon= 0.3812535148310019
episode= 12000   epsilon= 0.3722123583244823
episode= 16000   epsilon= 0.3633856064274825
episode= 20000   epsilon= 0.354768174686863
episode= 24000   epsilon= 0.346355099223682
episode= 28000   epsilon= 0.33814153387386353
episode= 32000   epsilon= 0.33012274739667297
episode= 36000   epsilon= 0.3222941207493919
episode= 40000   epsilon= 0.31465114442662134
episode= 44000   epsilon= 0.30718941586268245
episode= 48000   epsilon= 0.2999046368956165
episode= 52000   epsilon= 0.29279261129132506
episode= 56000   epsilon= 0.2858492423264229
학습종료



In [8]:
print('테스트 시작')

cnt=0
for e in range(100):
    S=start_state
    total_reward=0

    leng=0
    for i in range(99):
        A=choose_action_with_greedy(S)
        S_,R=get_new_state_and_reward(S,A)
        leng+=1
        total_reward+=R
        S=S_
        if env_state_space[S[0]][S[1]]=='G' or env_state_space[S[0]][S[1]]=='H':
            break

    if total_reward==9 and leng==8:
        cnt+=1
print(cnt)
print(f'the optinal accuracy: {cnt}%')
print("테스트 종료")



테스트 시작
100
the optinal accuracy: 100%
테스트 종료


In [9]:
print('학습 후의 Q_values:')
for i in range(max_row):
    for j in range(max_col):
        s=[i,j]
        compute_and_print_Q_values(s)

학습 후의 Q_values:
s[0,0]:  0.00,  0.00,  0.00,  0.00, 
s[0,1]:  0.00,  0.00,  0.00,  0.00, 
s[0,2]:  0.00,  0.00,  0.00,  0.00, 
s[0,3]:  0.00,  0.00,  0.00,  0.00, 
s[0,4]:  0.00,  0.00,  0.00,  0.00, 
s[0,5]:  0.00,  0.00,  0.00,  0.00, 
s[0,6]:  0.00,  0.00,  0.00,  0.00, 
s[0,7]:  0.00,  0.00,  0.00,  0.00, 
s[0,8]:  0.00,  0.00,  0.00,  0.00, 
s[1,0]:  0.00,  0.00,  0.00,  0.00, 
s[1,1]: -9.00,  4.30,  4.31, -9.00, 
s[1,2]: -9.00,  4.78,  4.78,  3.88, 
s[1,3]: -9.00,  5.31, -9.01,  4.30, 
s[1,4]: -9.00,  5.90, -9.02,  4.78, 
s[1,5]: -8.99,  5.31,  6.56,  5.31, 
s[1,6]: -8.96,  4.81, -8.95,  5.90, 
s[1,7]:  0.40, -5.51,  5.37,  1.20, 
s[1,8]:  0.00,  0.00,  0.00,  0.00, 
s[2,0]:  0.00,  0.00,  0.00,  0.00, 
s[2,1]:  3.87,  4.78,  4.79, -9.00, 
s[2,2]:  4.31, -8.99,  5.32,  4.30, 
s[2,3]:  0.00,  0.00,  0.00,  0.00, 
s[2,4]:  0.00,  0.00,  0.00,  0.00, 
s[2,5]:  5.90, -8.99,  7.29, -9.00, 
s[2,6]:  0.00,  0.00,  0.00,  0.00, 
s[2,7]:  3.72, -5.09,  5.86, -7.71, 
s[2,8]:  0.00,  0.00, 

실험 2 캡처 & 리플레이 없을 때의 성능

In [10]:
## frozen-lake 문제에 대한 DQN 프로그램.
##
import numpy as np
import time
import random
import math
from datetime import datetime
from collections import namedtuple, deque
from itertools import count
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
# if GPU is to be used
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
total_episodes = 60000 # Total number of episodes in training
max_steps = 99 # Max steps per episode in training.
gamma = 0.90 # Discounting rate for expected return
Learning_rate = 0.00005 # 신경망 모델 learning rate (optimizer 에게 제공)
original_epsilon = 0.4 # Exploration rate
decay_rate = 0.000006 # Exponential decay rate for exploration.
TAU = 0.7 # Q_net 파라메터를 Q_hat_net 로 copy 시에 반영 비율.
one_minus_TAU = 1 - TAU

memory_pos = 0 # replay_memory 내에 transition 을 넣을 다음 위치.
# 0 에서 부터 커지다가 max_memory-1 까지 되면 다시 0 부터 시작함.
BATCH_SIZE = 16
model_update_cnt = 0 # Q_net 를 업데이트한 횟수.
copy_cnt = 4 # Q_net 업데이트를 copy_cnt 번 한 후마다 Q_hat_net 로 파라메터 복사.
max_memory = BATCH_SIZE # capacity of the replay memory.
transition_cnt = 0 # 거쳐간 총 transition 수(episodes 간에 중단 없이)
# 매 (배치크기+작은 랜덤값) 마다 Q_net 의 parameter update 를 수행함.
random.seed(datetime.now().timestamp()) # give a new seed in random number generation.
# state space is defined as size_row X size_col array.
# The boundary cells are holes(H).
# S: start, G: goal, H:hole, F:frozen
max_row = 9
max_col = 9
n_actions = 4 # 0:up, 1:right, 2:down, 3:left.
n_observations = max_row * max_col # total number of states
# 1-hot 벡터로 표현하므로 NN 입력의 신호수 = 총 state 수
env_state_space = [
    ['H', 'H', 'H', 'H', 'H', 'H', 'H', 'H', 'H'],\
    ['H', 'S', 'F', 'F', 'F', 'F', 'F', 'F', 'H'],\
    ['H', 'F', 'F', 'H', 'H', 'F', 'H', 'F', 'H'],\
    ['H', 'F', 'F', 'F', 'F', 'F', 'F', 'F', 'H'],\
    ['H', 'F', 'H', 'F', 'H', 'F', 'F', 'H', 'H'],\
    ['H', 'F', 'F', 'F', 'F', 'G', 'F', 'F', 'H'],\
    ['H', 'F', 'H', 'H', 'F', 'H', 'F', 'F', 'H'],\
    ['H', 'F', 'F', 'F', 'F', 'F', 'F', 'F', 'H'],\
    ['H', 'H', 'H', 'H', 'H', 'H', 'H', 'H', 'H']
]
# offset of each move action: up, right, down, left, respectively.
# a new state(location) = current state + offset of an action.
move_offset = [[-1,0], [0,1], [1,0], [0,-1]]
move_str = ['up ', 'right', 'down ', 'left ']
# replay memory: transition 들을 저장하는 버퍼.
# 저장되는 transition 의 4가지 정보: state_index, action, reward, next state index.
# (주의: state 를 좌표 대신 상태번호(index) 로 나타냄.)
replay_memory = np.ndarray((max_memory, 4), dtype=int)
batch_transition = np.ndarray((BATCH_SIZE, 4), dtype=int) # 배치 하나를 넣는데 사용.
is_replay_memory_full = 0 # 버퍼가 처음으로 완전히 채워지기 전에는 0. 그후로는 항상 1.

In [11]:
def my_argmax(q_a_list):
    max_val = np.max(q_a_list)
    max_positions = np.where(q_a_list == max_val)[0]
    nmax = len(max_positions)

    if nmax == 1:
        return max_positions[0]  # 하나의 최대값이 있다면 그 위치를 반환

    # 최대값이 여러 개인 경우, 랜덤하게 하나의 위치를 선택하여 반환
    return torch.tensor(random.choice(max_positions))

In [12]:
#my_argmax 실험
state_action_values = torch.tensor([[3.2, 5.1, 2.8, 5.1]])
max_a = torch.argmax(state_action_values, dim=1)
lst=state_action_values.flatten().tolist()
rmax=my_argmax(lst)
max_a = max_a[0]
print(rmax)
print(max_a)

tensor(1)
tensor(1)


In [13]:
import random

def choose_action_with_greedy(s):
    state_idx = s[0] * max_col + s[1]  # 상태를 번호로 변환합니다.
    state_idx_list = [state_idx]
    states_tsr = torch.tensor(state_idx_list).to(device)  # 하나의 상태를 포함하는 배열
    one_hot_states_tsr = F.one_hot(states_tsr, num_classes=n_observations)
    one_hot_states_tsr = one_hot_states_tsr.float().to(device)

    # 상태 s에 대한 모든 액션의 q-a 값 가져오기
    with torch.no_grad():
        state_action_values = Q_net(one_hot_states_tsr)  # 출력은 2차원: (1, n_actions).

    lst=state_action_values.flatten().tolist()
    max_a = my_argmax(lst) # 최대값 중에서 무작위로 선택하기

    return max_a

def choose_action_with_epsilon_greedy(s, epsilon):
    state_idx = s[0] * max_col + s[1]  # 상태를 번호로 변환합니다.
    state_idx_list = [state_idx]  # 배치 차원 추가
    states_tsr = torch.tensor(state_idx_list).to(device)  # 하나의 상태를 가집니다.
    one_hot_states_tsr = F.one_hot(states_tsr, num_classes=n_observations)
    one_hot_states_tsr = one_hot_states_tsr.float().to(device)

    # 상태 s에 대한 모든 액션의 q-a 값 가져오기
    with torch.no_grad():
        state_action_values = Q_net(one_hot_states_tsr)  # 출력은 2차원: (bsz, n_actions).

    lst=state_action_values.flatten().tolist()
    max_a = my_argmax(lst) # 최대값 중에서 무작위로 선택하기

    rn = random.random()  # 0 ~ 1 사이의 난수 생성
    if rn >= epsilon:  # epsilon 이상인 경우, 최대 확률을 가진 액션 선택
        action = max_a
    else:
        rn1 = random.random()
        # 4개의 액션 중 하나를 무작위로 선택
        if rn1 >= 0.75:
            action = 0
        elif rn1 >= 0.5:
            action = 1
        elif rn1 >= 0.25:
            action = 2
        else:
            action = 3

    return action

def get_new_state_and_reward(s, a):
	new_state = []
	off_set = move_offset[a]

	#  s + off_set gives the new_state.
	new_state.append(s[0] + off_set[0])
	new_state.append(s[1] + off_set[1])

	# compute reward for moving to the new state
	cell = env_state_space[new_state[0]][new_state[1]]
	if cell == 'F':
		rew = 0
	elif cell == 'H':
		rew = -9
	elif cell == 'G':
		rew = 9
	elif cell == 'S':
		rew = 0
	else:
		print("Logic error in get_new_state_and_reward. This cannot happen!")
		time.sleep(1200)
		return [0,0], -20000
	return new_state, rew

In [14]:
class DQN(nn.Module):
    def __init__(self, n_observations, n_actions):
        super(DQN, self).__init__()
        # 신경망의 각 레이어 정의
        self.layer1 = nn.Linear(n_observations, 128)  # 입력 뉴런 수: n_observations, 출력 뉴런 수: 128
        self.layer2 = nn.Linear(128, 128)  # 입력 뉴런 수: 128, 출력 뉴런 수: 128
        self.layer3 = nn.Linear(128, n_actions)  # 입력 뉴런 수: 128, 출력 뉴런 수: n_actions

    # forward 함수: 데이터가 모델을 통과할 때 호출되는 함수
    def forward(self, x):
        x = F.relu(self.layer1(x))  # 첫 번째 레이어에 ReLU 활성화 함수를 적용
        x = F.relu(self.layer2(x))  # 두 번째 레이어에 ReLU 활성화 함수를 적용
        return self.layer3(x)

In [15]:
def learning_by_a_batch():
    state_batch=batch_transition[:,0]
    action_batch=batch_transition[:,1]
    reward_batch=batch_transition[:,2]
    next_state_batch=batch_transition[:,3]

    state_batch_tsr=torch.from_numpy(state_batch)
    one_hot_state_batch=F.one_hot(state_batch_tsr,num_classes=n_observations)
    one_hot_state_batch=one_hot_state_batch.float().to(device)

    prediction_Q_net=Q_net(one_hot_state_batch)

    state_action_value_tsr=torch.zeros([BATCH_SIZE,],dtype=torch.float64).to(device)
    for i in range(BATCH_SIZE):
        state_action_value_tsr[i]=prediction_Q_net[i,action_batch[i]]

    with torch.no_grad():
        next_state_batch_tsr=torch.from_numpy(next_state_batch)
        one_hot_next_state_batch=F.one_hot(next_state_batch_tsr,num_classes=n_observations)
        one_hot_next_state_batch=one_hot_next_state_batch.float().to(device)
        result_target_net=Q_hat_net(one_hot_next_state_batch)

        max_q_of_next_states_in_batch=torch.max(result_target_net,dim=1).values

    next_state_values=[]
    for i,st in enumerate(next_state_batch):
        r=int(st/max_col)
        c=st%max_col
        if env_state_space[r][c]=='G' or env_state_space[r][c]=='H':
            next_state_values.append(0)
        else:
            next_state_values.append(max_q_of_next_states_in_batch[i].item())

    next_state_values_tsr=torch.tensor(next_state_values).to(device)
    reward_batch_tsr=torch.from_numpy(reward_batch).to(device)
    target_state_action_values_tsr=(next_state_values_tsr*gamma)+reward_batch_tsr

    loss=criterion(state_action_value_tsr,target_state_action_values_tsr)

    optimizer.zero_grad()
    loss.backward()
    torch.nn.utils.clip_grad_value_(Q_net.parameters(),100)
    optimizer.step()


Q_net=DQN(n_observations,n_actions).to(device)
Q_hat_net=DQN(n_observations,n_actions).to(device)

Q_hat_net.load_state_dict(Q_net.state_dict())

optimizer=optim.AdamW(Q_net.parameters(),lr=Learning_rate,amsgrad=True)
criterion=nn.SmoothL1Loss()

In [16]:
num_episode=total_episodes
start_state=[1,1]
print("\n학습시작\n")

for i_episode in range(num_episode):
    S=start_state
    epsilon=original_epsilon*math.exp(-decay_rate*i_episode)

    if i_episode != 0 and i_episode %4000==0:
          print('episode=',i_episode,'  epsilon=',epsilon)

    for  t in range(max_steps):
        A=choose_action_with_epsilon_greedy(S,epsilon)
        S_,R = get_new_state_and_reward(S,A)

        s_idx=S[0]*max_col+S[1]
        next_s_idx=S_[0]*max_col+S_[1]

        replay_memory[memory_pos,0]= s_idx
        replay_memory[memory_pos,1]=A
        replay_memory[memory_pos,2]=R
        replay_memory[memory_pos,3]=next_s_idx

        if is_replay_memory_full==0 and memory_pos==max_memory-1:
            is_replay_memory_full=1
        memory_pos=(memory_pos+1) % max_memory
        S=S_
        transition_cnt+=1
        random_number=random.randint(0,int(BATCH_SIZE/2))

        if is_replay_memory_full==1:
            for i in range(BATCH_SIZE):
                batch_transition[i,:]=replay_memory[i,:]

            is_replay_memory_full=0

            learning_by_a_batch()
            model_update_cnt+=1

            if model_update_cnt % copy_cnt==0:
                Q_hat_net_state_dict=Q_hat_net.state_dict()
                Q_net_state_dict=Q_net.state_dict()
                for key in Q_net_state_dict:
                    Q_hat_net_state_dict[key]=Q_net_state_dict[key]*TAU+Q_net_state_dict[key]*one_minus_TAU
                Q_hat_net.load_state_dict(Q_hat_net_state_dict)
        if env_state_space[S[0]][S[1]]=='G' or env_state_space[S[0]][S[1]]=='H':
            break

print('학습종료\n')


학습시작

episode= 4000   epsilon= 0.39051428390316373
episode= 8000   epsilon= 0.3812535148310019
episode= 12000   epsilon= 0.3722123583244823
episode= 16000   epsilon= 0.3633856064274825
episode= 20000   epsilon= 0.354768174686863
episode= 24000   epsilon= 0.346355099223682
episode= 28000   epsilon= 0.33814153387386353
episode= 32000   epsilon= 0.33012274739667297
episode= 36000   epsilon= 0.3222941207493919
episode= 40000   epsilon= 0.31465114442662134
episode= 44000   epsilon= 0.30718941586268245
episode= 48000   epsilon= 0.2999046368956165
episode= 52000   epsilon= 0.29279261129132506
episode= 56000   epsilon= 0.2858492423264229
학습종료



In [17]:
print('테스트 시작')

cnt=0
for e in range(100):
    S=start_state
    total_reward=0
    leng=0
    for i in range(99):
        A=choose_action_with_greedy(S)
        S_,R=get_new_state_and_reward(S,A)
        #print(f'move {move_str[A]} to {S_[0]},{S_[1]}')
        leng+=1
        total_reward+=R
        S=S_
        if env_state_space[S[0]][S[1]]=='G' or env_state_space[S[0]][S[1]]=='H':
            break
    if leng == 8 and total_reward==9 : cnt+=1
    #print(f'ep{e}  lent={leng} R={total_reward}\n')


print(f'the optinal accuracy: {cnt}%')
print("테스트 종료")


테스트 시작
the optinal accuracy: 100%
테스트 종료


In [18]:
print('학습 후의 Q_values:')
for i in range(max_row):
    for j in range(max_col):
        s=[i,j]
        compute_and_print_Q_values(s)

학습 후의 Q_values:
s[0,0]:  0.00,  0.00,  0.00,  0.00, 
s[0,1]:  0.00,  0.00,  0.00,  0.00, 
s[0,2]:  0.00,  0.00,  0.00,  0.00, 
s[0,3]:  0.00,  0.00,  0.00,  0.00, 
s[0,4]:  0.00,  0.00,  0.00,  0.00, 
s[0,5]:  0.00,  0.00,  0.00,  0.00, 
s[0,6]:  0.00,  0.00,  0.00,  0.00, 
s[0,7]:  0.00,  0.00,  0.00,  0.00, 
s[0,8]:  0.00,  0.00,  0.00,  0.00, 
s[1,0]:  0.00,  0.00,  0.00,  0.00, 
s[1,1]: -9.01,  3.62,  4.28, -9.02, 
s[1,2]: -8.43,  3.64,  4.02,  4.03, 
s[1,3]: -3.40,  3.02, -8.16,  3.75, 
s[1,4]: -2.95,  2.54, -7.86,  4.26, 
s[1,5]: -7.67,  2.58,  3.55,  4.70, 
s[1,6]: -3.65,  2.60, -8.08,  4.95, 
s[1,7]: -5.55, -7.00,  3.66,  2.95, 
s[1,8]:  0.00,  0.00,  0.00,  0.00, 
s[2,0]:  0.00,  0.00,  0.00,  0.00, 
s[2,1]:  3.86,  3.86,  4.76, -9.02, 
s[2,2]:  3.61, -8.99,  4.31,  4.04, 
s[2,3]:  0.00,  0.00,  0.00,  0.00, 
s[2,4]:  0.00,  0.00,  0.00,  0.00, 
s[2,5]:  3.34, -1.48,  3.04, -5.06, 
s[2,6]:  0.00,  0.00,  0.00,  0.00, 
s[2,7]:  2.74, -2.72,  3.04, -4.64, 
s[2,8]:  0.00,  0.00, 