# Import

In [None]:
import numpy as np
import random
import matplotlib.pyplot as plt

import torch
import torch.nn as nn
import torch.nn.functional as F

# Import : by GitHub

You can run this notebook in Colab by cloning the repository as shown below.


In [None]:
!git clone https://github.com/KanghwaSisters/24_2_mainSession.git 

In [None]:
import os
# Switch the working directory to the env folder of the cloned repository.
os.chdir('/content/24_2_mainSession/4주차/env') 

In [None]:
! python GridWorldEnvironment.py # run the py file

In [None]:
from GridWorldEnvironment import GridWorldEnvironment

# load env

- ⭐️ **파일을 사용하기 위해서는 cwd를 py가 있는 위치로 변경해야 한다.**
- 출처 : https://www.askpython.com/python/examples/import-py-files-google-colab

In [None]:
import os
# Switch the working directory to the folder given below (a path under /content/drive/MyDrive).
os.chdir('/content/drive/MyDrive/24-1/강화학습세션/환경')

In [None]:
! python GridWorldEnvironment3.py

In [None]:
from GridWorldEnvironment3 import GridWorldEnvironment # version 3 fixes the height/width bug of ver2

### DeepSARSA

In [None]:
class DeepSARSA(nn.Module):
    """Fully connected Q-network: state vector in, one Q-value per action out.

    The network has two hidden layers of 30 units followed by a linear output
    layer. ReLU is applied after each hidden layer; without a non-linearity
    the three stacked ``nn.Linear`` layers would be equivalent to a single
    affine map. ``F.relu`` is parameter-free, so the ``state_dict`` keys
    (``fc1``, ``fc2``, ``fc3``) are unchanged.

    Args:
        state_size: Number of input features.
        action_size: Number of actions, i.e. number of outputs.
    """

    def __init__(self, state_size, action_size):
        super().__init__()
        self.fc1 = nn.Linear(state_size, 30)
        self.fc2 = nn.Linear(30, 30)
        self.fc3 = nn.Linear(30, action_size)

    def forward(self, x):
        """Return the Q-values for ``x`` (last dimension must be ``state_size``)."""
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        # No activation on the output: Q-values are unbounded real numbers.
        return self.fc3(x)

### Agent_DS

In [None]:
class DeepSARSAAgent:
    """Epsilon-greedy agent trained with one-step Deep SARSA.

    The agent owns a ``DeepSARSA`` Q-network, an Adam optimizer and an MSE
    loss. ``get_action`` picks actions epsilon-greedily and ``train_model``
    performs one gradient step per observed transition.

    Args:
        state_size: Number of features in a state vector.
        action_space: Sized collection of actions; only its length is used
            here, to set the number of network outputs.
    """

    def __init__(self, state_size, action_space):
        # Action-related parameters
        self.action_space = action_space
        self.num_actions = len(action_space)

        # Deep SARSA hyperparameters
        self.step_size = 0.01  # not read anywhere in this class; kept for compatibility
        self.discount_factor = 0.9
        self.epsilon = 1.0
        self.epsilon_decay = 0.999
        self.epsilon_min = 0.01
        self.learning_rate = 0.001

        # Q-network, optimizer and loss
        self.model = DeepSARSA(state_size, self.num_actions)
        self.optimizer = torch.optim.Adam(self.model.parameters(), lr=self.learning_rate, weight_decay=1e-3)
        self.loss = nn.MSELoss()

    def get_action(self, state):
        """Return an action index chosen epsilon-greedily for ``state``.

        With probability ``epsilon`` a uniformly random index is returned,
        otherwise the index of the largest predicted Q-value.
        """
        self.model.eval()

        if np.random.rand() <= self.epsilon:
            return random.randrange(self.num_actions)

        state = torch.tensor(state, dtype=torch.float32)
        # Action selection needs no gradients, so skip building the graph.
        with torch.no_grad():
            q_value = self.model(state)
        return torch.argmax(q_value).item()

    def train_model(self, state, action_idx, reward, next_state, next_action_idx, done):
        """Run one SARSA update on the transition (s, a, r, s', a').

        Epsilon is decayed once per call while it is above ``epsilon_min``.
        The TD target ``r + (1 - done) * gamma * Q(s', a')`` is computed
        without gradient tracking, so only ``Q(s, a)`` is differentiated.

        Args:
            state: Current state. Assumed to be a single un-batched 1-D
                feature vector, since the output is indexed directly by
                action -- TODO confirm against the environment.
            action_idx: Index of the action taken in ``state``.
            reward: Scalar reward received for the transition.
            next_state: State reached after the action (same layout as ``state``).
            next_action_idx: Index of the action already chosen for ``next_state``.
            done: Truthy when ``next_state`` is terminal; disables bootstrapping.
        """
        if self.epsilon > self.epsilon_min:
            self.epsilon *= self.epsilon_decay

        self.model.train()

        state = torch.tensor(state, dtype=torch.float32)
        next_state = torch.tensor(next_state, dtype=torch.float32)

        # Prediction for the action actually taken. Indexing yields a 0-dim
        # tensor, the same shape as the target below, so MSELoss does not
        # have to broadcast (and does not warn about mismatched sizes).
        pred = self.model(state)[action_idx]

        # The bootstrap target is treated as a constant: gradients must not
        # flow through Q(s', a').
        with torch.no_grad():
            next_q = self.model(next_state)[next_action_idx]
            target = reward + (1 - done) * self.discount_factor * next_q

        cost = self.loss(pred, target)

        self.optimizer.zero_grad()
        cost.backward()
        self.optimizer.step()

### main_DS

In [None]:
# Initial values: run settings first, then the objects built from them.
EPISODES = 1000
RENDER_PROCESS = False
RENDER_END = False

# Bookkeeping containers filled in by the training cell.
total_moves = list()
check_point = dict()

# 5x5 grid world with the start at (0, 0) and the goal at (4, 4).
env = GridWorldEnvironment(
    start_point=(0, 0),
    end_point=(4, 4),
    gridworld_size=(5, 5),
)

# The agent is sized from the environment's state length and action space.
agent = DeepSARSAAgent(env.state_len, env.action_space)

In [None]:
# Training loop: run EPISODES episodes of on-policy SARSA, updating the agent
# after every environment step and recording the number of moves per episode.
import copy

for episode in range(EPISODES):
    # Reset the environment for a new episode.
    state = env.reset()
    moves_cnt = 0
    # Choose the first action from the initial state.
    action_idx = agent.get_action(state)

    done = False

    while not done:
        if RENDER_PROCESS:
            env.render()  # show each move

        # The environment returns next_state, reward and done for the action taken.
        next_state, reward, done = env.step(action_idx)

        # Choose the action for the next state (SARSA needs a' before updating).
        next_action_idx = agent.get_action(next_state)

        # Update the Q-function with the full (s, a, r, s', a') transition.
        agent.train_model(state, action_idx, reward, next_state, next_action_idx, done)

        state = next_state
        action_idx = next_action_idx
        moves_cnt += 1

    total_moves.append(moves_cnt)

    if (episode+1) % 100 == 0:
        print(f"[Episode]: {episode+1}/{EPISODES} __ [Num of Moves mean]:{np.mean(total_moves[-100:]):.1f} __ [Epsilon]: {agent.epsilon:.3f}")
        # state_dict() hands back references to the live parameters, so the
        # snapshot must be deep-copied; otherwise every stored checkpoint
        # would silently track the weights as training continues.
        check_point[f'epi_{episode+1}'] = copy.deepcopy(agent.model.state_dict())

    if RENDER_END:
        env.render()


  return F.mse_loss(input, target, reduction=self.reduction)


[Episode]: 100/1000 __ [Num of Moves mean]:32.7 __ [Epsilon]: 0.038
[Episode]: 200/1000 __ [Num of Moves mean]:8.7 __ [Epsilon]: 0.016
[Episode]: 300/1000 __ [Num of Moves mean]:8.1 __ [Epsilon]: 0.010
[Episode]: 400/1000 __ [Num of Moves mean]:8.2 __ [Epsilon]: 0.010


In [None]:
# Reproduce the last training state.
env.render()

In [None]:
# Visualize how many moves the agent made in each of the 1000 episodes.
plt.plot(total_moves)
plt.title("Num of Moves")
plt.xlabel("episodes")
plt.ylabel("cnt")
plt.show()