### 1. 상태 표현
기존에는 그리드 맵 상의 (x, y) 좌표만을 사용했지만, 제어할 수 없는 속도 값도 상태에 포함될 필요가 있습니다. 그렇게 되면 상태는 (x, y, v)와 같은 형태를 가질 것입니다.

### 2. 행동 공간
행동은 '왼쪽으로 틀기'와 '오른쪽으로 틀기' 두 가지만 가능합니다. 이를 감안하면, 각 상태에서 가능한 행동은 이 두 가지로 제한됩니다.

### 3. 보상 구조
에이전트가 목적지에 빨리 도착하면 큰 보상을 받고, 잘못된 방향으로 가면 패널티를 받는 구조를 만들면 됩니다.

이렇게 상태와 행동, 그리고 보상을 정의한 후에는 이를 기반으로 RL 환경을 구축할 수 있습니다. 이 환경에서 에이전트가 학습을 통해 목적지까지 빠르게 도달하는 방법을 배울 수 있을 것입니다.

In [1]:
import numpy as np
import time
import random
from collections import deque
from apex import amp

import gym
from gym import spaces
import matplotlib.pyplot as plt
import matplotlib.animation as animation
from matplotlib.animation import FuncAnimation
from matplotlib.backends.backend_agg import FigureCanvasAgg as FigureCanvas

from scipy.ndimage import label

import torch
import torch.nn as nn
import torch.optim as optim

from tqdm import tqdm 
from IPython.display import clear_output

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

ModuleNotFoundError: No module named 'apex'

In [2]:
!pip install apex

Collecting apex
  Using cached apex-0.9.10.dev0-py3-none-any.whl
Collecting pyramid>1.1.2
  Using cached pyramid-2.0.2-py3-none-any.whl (247 kB)
Collecting zope.sqlalchemy
  Using cached zope.sqlalchemy-3.1-py3-none-any.whl (23 kB)
Collecting wtforms-recaptcha
  Using cached wtforms_recaptcha-0.3.2-py2.py3-none-any.whl (7.5 kB)
Collecting wtforms
  Using cached WTForms-3.0.1-py3-none-any.whl (136 kB)
Collecting velruse>=1.0.3
  Using cached velruse-1.1.1-py3-none-any.whl
Collecting cryptacular
  Using cached cryptacular-1.6.2.tar.gz (75 kB)
  Installing build dependencies: started
  Installing build dependencies: finished with status 'done'
  Getting requirements to build wheel: started
  Getting requirements to build wheel: finished with status 'done'
  Preparing metadata (pyproject.toml): started
  Preparing metadata (pyproject.toml): finished with status 'done'
Collecting pyramid-mailer
  Using cached pyramid_mailer-0.15.1-py2.py3-none-any.whl (19 kB)
Collecting plaster
  Using cach

ERROR: Could not build wheels for cryptacular, which is required to install pyproject.toml-based projects


In [None]:
class SimpleOceanEnv(gym.Env):
    def __init__(self, size, stop_n):
        super(SimpleOceanEnv, self).__init__()
        
        # 행동공간: 위, 아래, 왼쪽, 오른쪽
        self.action_space = spaces.Discrete(4)
        
        # 상태공간: 그리드 위치
        self.observation_space = spaces.Tuple((
            spaces.Discrete(size),
            spaces.Discrete(size)
        ))

        self.size = size
        self.stop_n = stop_n
        self.agent_position = None
        self.start_position = None
        self.end_position = None  # 목표 위치 초기화
        self.total_reward = 0  # 에이전트의 총 보상을 초기화
        self.previous_action = None
        self.recently_visited = []
        
        # 임의로 바다(1)와 육지(0)를 생성
        self.map = np.random.choice([0, 1], size*size, p=[0.1, 0.9]).reshape(size, size)
        self.generate_map()
        
    def generate_map(self):
        self.map = np.ones((self.size, self.size))  # 전체를 바다로 초기화

        num_landmasses = 10  # 생성할 육지 덩어리의 개수

        for _ in range(num_landmasses):
            # 육지 덩어리의 시작점을 무작위로 선택
            start_x = np.random.randint(0, self.size)
            start_y = np.random.randint(0, self.size)

            # 육지 덩어리의 크기를 무작위로 선택 (예: 10 ~ 20)
            landmass_size = np.random.randint(10, 20)

            for _ in range(landmass_size):
                direction = np.random.randint(0, 4)  # 랜덤한 방향 선택: 0=위, 1=아래, 2=왼쪽, 3=오른쪽

                if direction == 0 and start_y > 0:  # 위
                    start_y -= 1
                elif direction == 1 and start_y < self.size - 1:  # 아래
                    start_y += 1
                elif direction == 2 and start_x > 0:  # 왼쪽
                    start_x -= 1
                elif direction == 3 and start_x < self.size - 1:  # 오른쪽
                    start_x += 1

                self.map[start_y, start_x] = 0  # 해당 위치를 육지로 설정
                
    def reset(self):
        self.total_reward = 0  # 환경 초기화 시 누적 보상도 초기화
        while True:
            # 임의의 시작 위치 설정
            
            start_x = np.random.randint(0, self.size)
            start_y = np.random.randint(0, self.size)
            # 위치가 바다이면 선택
            if self.map[start_y, start_x] == 1:
                self.agent_position = (start_y, start_x)
                self.start_position = (start_y, start_x)  # 시작 위치를 저장
                break

        while True:
            # 임의의 끝 위치 설정
            end_x = np.random.randint(0, self.size)
            end_y = np.random.randint(0, self.size)
            # 위치가 바다이고 시작 위치와 다르면 선택
            if self.map[end_y, end_x] == 1 and (end_y, end_x) != self.start_position:
                self.end_position = (end_y, end_x)  # 끝 위치를 저장
                break

        return self.get_surrounding_tiles(self.agent_position)
    
    def get_surrounding_tiles(self, pos):
        surrounding_tiles = []

        # 주변 타일 정보 추가
        for dx in range(-10, 11):  # 에이전트 위치로부터 좌우로 10칸
            for dy in range(-10, 11):  # 에이전트 위치로부터 상하로 10칸
                if dx == 0 and dy == 0:
                    continue
                x, y = pos[0] + dx, pos[1] + dy
                if 0 <= x < self.size and 0 <= y < self.size:
                    surrounding_tiles.append(self.map[x, y])
                else:
                    surrounding_tiles.append(-1)  # 경계 밖을 나타내는 값

        # 시작 위치, 현재 위치, 도착 위치 정보 추가
        normalized_start = (self.start_position[0] / self.size, self.start_position[1] / self.size)
        normalized_current = (pos[0] / self.size, pos[1] / self.size)
        normalized_end = (self.end_position[0] / self.size, self.end_position[1] / self.size)

        surrounding_tiles.extend(normalized_start)
        surrounding_tiles.extend(normalized_current)
        surrounding_tiles.extend(normalized_end)

        return np.array(surrounding_tiles)


    def step(self, action):
        y, x = self.agent_position

        if action == 0:  # 위로 이동
            y -= 1
        elif action == 1:  # 아래로 이동
            y += 1
        elif action == 2:  # 왼쪽으로 이동
            x -= 1
        elif action == 3:  # 오른쪽으로 이동
            x += 1

        done = False  # 종료 플래그 초기화
        

        
        # 맵 바깥으로 나갔는지 확인
        if x < 0 or x >= self.size or y < 0 or y >= self.size:
            return self.get_surrounding_tiles(self.agent_position), -1000, True, {}

        # 육지에 닿았는지 확인
        if self.map[y, x] == 0:
            return self.get_surrounding_tiles(self.agent_position), -1000, True, {}

        # 도착점에 도달했는지 확인
        if (y, x) == self.end_position:
            return self.get_surrounding_tiles(self.agent_position), 1000, True, {}

        # 현재 상태와 다음 상태에서의 도착점까지의 거리 계산
        current_distance = np.linalg.norm(np.array(self.agent_position) - np.array(self.end_position))
        next_distance = np.linalg.norm(np.array([y, x]) - np.array(self.end_position))
        
        # 거리가 줄어들면 보상
        if next_distance < current_distance:
            reward = +0
        else:
            reward = -0
            
        # 최근 방문 위치 패널티
        if (y, x) in self.recently_visited:
            reward -= 30
            
        self.recently_visited.append((y, x))
        if len(self.recently_visited) > 5:  # 최근 5개의 위치만 저장
            self.recently_visited.pop(0)
            
        self.total_reward += reward
        
        if self.total_reward <= -self.stop_n:
            done = True
        '''
        elif self.total_reward >= 500:
            done = True
        '''
        self.agent_position = (y, x)
        return self.get_surrounding_tiles(self.agent_position), reward, done, {}

In [None]:
def get_frame(env):
    fig, ax = plt.subplots()
    
    map_visual = np.copy(env.map)
    map_visual[env.agent_position[0], env.agent_position[1]] = 2
    map_visual[env.start_position[0], env.start_position[1]] = 3
    map_visual[env.end_position[0], env.end_position[1]] = 4
    
    colors = ['black', 'gray', 'yellow', 'lime', 'gold']  # 색상 추가: 육지, 바다, 에이전트, 시작점, 도착점
    ax.imshow(map_visual, cmap=plt.cm.colors.ListedColormap(colors))
    
    plt.axis('off')
    canvas = FigureCanvas(fig)
    canvas.draw()
    img_arr = np.array(canvas.renderer.buffer_rgba())
    plt.close()
    return img_arr

In [None]:
# DQN 네트워크 정의
class DQNetwork(nn.Module):
    def __init__(self, input_dim, output_dim):
        super(DQNetwork, self).__init__()
        self.fc = nn.Sequential(
            nn.Linear(input_dim, 1024),
            nn.LeakyReLU(),    # LeakyReLU 활성화 함수 추가
            nn.Dropout(0.2),       # 드롭아웃 추가

            nn.Linear(1024, 1024),
            nn.LeakyReLU(),    # LeakyReLU 활성화 함수 추가
            nn.Dropout(0.2),       # 드롭아웃 추가
            
            nn.Linear(1024, 1024),
            nn.LeakyReLU(),    # LeakyReLU 활성화 함수 추가
            nn.Dropout(0.2),       # 드롭아웃 추가
            
            nn.Linear(1024, 1024),
            nn.LeakyReLU(),    # LeakyReLU 활성화 함수 추가
            nn.Dropout(0.2),       # 드롭아웃 추가

            nn.Linear(1024, output_dim)
        )
    
    def forward(self, x):
        return self.fc(x)
    def append_memory(self, state, action, reward, next_state, done):
        self.memory.append((state, action, reward, next_state, done))

# 에이전트 클래스 정의
class DQNAgent:
    def __init__(self, input_dim, output_dim, learning_rate, gamma, epsilon, device, batch_size):
        self.dqn = DQNetwork(input_dim, output_dim).to(device)
        self.target_dqn = DQNetwork(input_dim, output_dim).to(device)
        self.target_dqn.load_state_dict(self.dqn.state_dict())
        self.target_dqn.eval()
        self.optimizer = optim.Adam(self.dqn.parameters(), lr=learning_rate)
        self.loss_fn = nn.MSELoss()
        self.gamma = gamma
        self.epsilon = epsilon
        self.device = device
        self.batch_size = batch_size
        self.memory = deque(maxlen=300000)
    
    def get_action(self, state, epsilon):
        if random.random() < self.epsilon:
            return random.randint(0, 3)
        else:
            with torch.no_grad():
                state = torch.FloatTensor(state).to(self.device)
                q_values = self.dqn(state)
                return torch.argmax(q_values).item()

    def train(self, batch_size):
        if len(self.memory) < self.batch_size:
            return

        batch = random.sample(self.memory, self.batch_size)
        states, actions, rewards, next_states, dones = zip(*batch)
        
        states = torch.FloatTensor(np.array(states).to(self.device)
        actions = torch.LongTensor(np.array(actions) ).to(self.device)
        rewards = torch.FloatTensor(np.array(rewards) ).to(self.device)
        next_states = torch.FloatTensor(np.array(next_states)).to(self.device)
        dones = torch.FloatTensor(np.array(dones, dtype=np.float32)).to(self.device)

        q_values = self.dqn(states)
        next_q_values = self.target_dqn(next_states).detach()
        next_q_value = next_q_values.max(1)[0]

        expected_q_values = rewards + (1 - dones) * self.gamma * next_q_value
        loss = self.loss_fn(q_values.gather(1, actions.unsqueeze(1)), expected_q_values.unsqueeze(1))


        with amp.scale_loss(loss, self.optimizer) as scaled_loss:
            scaled_loss.backward()
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()


    def update_target(self):
        self.target_dqn.load_state_dict(self.dqn.state_dict())

    def append_memory(self, state, action, reward, next_state, done):
        state = np.array(state)
        next_state = np.array(next_state)
        self.memory.append((state, action, reward, next_state, done))



In [None]:
# 환경 초기화 및 에이전트 생성
env = SimpleOceanEnv(30, 400)

In [None]:
view_tpye = False # True 평가모드
create_fst = True # 첫 시도 시 True

In [None]:
batch_size = 4096
if view_tpye == True:
    epsilon = 0
    epsilon_decay = 0
    min_epsilon = 0
else:
    epsilon = 0.6
    epsilon_decay = 0.995
    min_epsilon = 0.1

update_target_interval = 5

agent = DQNAgent(446, 4, learning_rate=1e-3,gamma=0.99, epsilon=epsilon, device=device, batch_size=batch_size)
# 미리 저장된 모델의 가중치 불러오기
try:
    if create_fst == False:
        agent.dqn.load_state_dict(torch.load('S_90p_231104_visited_v5_False.pth'))
        if view_tpye == True:
            agent.dqn.eval()  # 모델을 평가 모드로 전환
            print('mode type : 평가모드')
        else:
            print('mode type : 학습모드')
        print('load_model 성공')
except:
    pass

# 학습 루프
if view_tpye == True:
    num_episodes = 30
else:
    num_episodes = 90000

In [None]:
agent.dqn, agent.optimizer = amp.initialize(agent.dqn, agent.optimizer, opt_level="O1")
frames = []
total_mean = 0
for episode in tqdm(range(num_episodes)):
    state = env.reset()
    total_reward = 0
    done = False

    while not done:
        
        action = agent.get_action([state], epsilon)
        next_state, reward, done, _ = env.step(action)
        agent.append_memory(state, action, reward, next_state, done)
        agent.train(batch_size)
        total_reward += reward
        state = next_state
        if view_tpye == True:
            if episode % ((num_episodes+1)%2) == 0:
                frame_data = {
                    'frame': get_frame(env),
                    'episode': episode,
                    'reward': total_reward,
                    'epsilon': epsilon
                }
                frames.append(frame_data)
        total_mean+=total_reward
        
        #print(f"Episode: {episode}, Total Reward: {total_reward}, mean:{round(np.mean(total_mean/episode), 5)}")
        #clear_output(wait=True)
    #clear_output(wait=True)
    if episode % update_target_interval == 0:
        agent.update_target()
    epsilon = max(epsilon * epsilon_decay, min_epsilon)
    #print(f"Episode: {episode}, Total Reward: {total_reward}, Epsilon: {epsilon:.2f}")

env.close()
torch.save(agent.dqn.state_dict(), f'S_90p_231104_visited_v5_{view_tpye}.pth')

In [None]:
for frame_data in frames:
    fig, ax = plt.subplots()
    im = ax.imshow(frame_data['frame'])
    
    ax.text(1, 1, f"Episode: {frame_data['episode']} | Reward: {frame_data['reward']} | epsilon: {frame_data['epsilon']}", 
            color='white', fontsize=12, bbox=dict(facecolor='black', alpha=0.7))
    plt.axis('off')
    plt.show()
    time.sleep(0.01)
    clear_output(wait=True)


학습을 시킬떄 모델을 생성하고 생성된 모델에서 환경을 조절하여 학습을 이어나가는 게 맞다.

In [None]:
다 쳐봐야 아는 것인디...