In [None]:
!pip install -U "stable-baselines3[extra]" "gymnasium"

# ---- 항해 궤적 최적화 ----
import os
import gymnasium as gym              
from gymnasium import spaces         
import numpy as np  
import matplotlib.pyplot as plt
import torch.nn as nn                
import torch                         
import logging
from gymnasium.wrappers import TimeLimit
from stable_baselines3 import PPO
from stable_baselines3.common.monitor import Monitor
from stable_baselines3.common.vec_env import DummyVecEnv, VecTransposeImage, VecNormalize
from stable_baselines3.common.torch_layers import BaseFeaturesExtractor
from stable_baselines3.common.callbacks import EvalCallback, BaseCallback, CallbackList

# Per-episode step budget. Read by SeaNavEnvCNN.step, passed to the TimeLimit
# wrapper in make_env, and used as the loop bound of the rollout in the
# visualization cell.
max_episode_steps = 100
class SmallCNN(BaseFeaturesExtractor):
    """Compact convolutional feature extractor for small channel-first images.

    Two 3x3 convolutions (stride 1, padding 1, so the spatial size is kept) are
    followed by a flatten and a single linear projection to ``features_dim``.

    Args:
        observation_space: Space whose ``shape`` unpacks as
            (channels, height, width).
        features_dim: Width of the feature vector produced by ``forward``.
    """

    def __init__(self, observation_space, features_dim=128):
        super().__init__(observation_space, features_dim)

        n_channels, height, width = observation_space.shape  # CHW

        conv_layers = [
            nn.Conv2d(n_channels, 32, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.Conv2d(32, 64, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.Flatten(),
        ]
        self.cnn = nn.Sequential(*conv_layers)

        # Push one all-zero dummy image through the conv stack to discover the
        # flattened width instead of computing it by hand.
        with torch.no_grad():
            probe = torch.zeros(1, n_channels, height, width)
            flat_width = self.cnn(probe).shape[1]

        self.linear = nn.Sequential(nn.Linear(flat_width, features_dim), nn.ReLU())

    def forward(self, x):
        """Map an image batch ``x`` to its ``features_dim``-wide embedding."""
        conv_features = self.cnn(x)
        return self.linear(conv_features)


class SeaNavEnvCNN(gym.Env):
    """Fixed-map grid navigation task observed through an egocentric image patch.

    The agent moves one cell per step on a ``size`` x ``size`` occupancy grid
    (1 = obstacle, 0 = free) from ``fixed_start`` to ``fixed_goal``. Positions
    are stored as ``(row, col)``.

    Actions (``Discrete(4)``): 0 = row-1, 1 = row+1, 2 = col-1, 3 = col+1.

    Observation (``view`` x ``view`` x 3, uint8, channel-last):
        * channel 0 - free-space mask centred on the agent (255 = free,
          0 = obstacle or outside the map);
        * channel 1 - constant plane holding the column component of the unit
          vector towards the goal, mapped from [-1, 1] to [0, 255];
        * channel 2 - the same for the row component.

    Reward per step: -1, a further -3 when the move is blocked by an obstacle
    (the agent then stays where it is), +0.2 per unit of Manhattan distance
    gained towards the goal, and +5 on reaching the goal.

    Episode end: ``terminated`` when the agent stands on the goal;
    ``truncated`` when the step budget is exhausted first.

    Args:
        size: Side length of the square map.
        obstacle_ratio: Probability that an interior cell is an obstacle.
        view: Side length of the square observation window.
        seed: Seed for ``self.rng``.
        map_seed: Seed of the generator that lays out the obstacles.
        fixed_start: ``(row, col)`` start cell; cleared of obstacles on reset.
        fixed_goal: ``(row, col)`` goal cell; cleared of obstacles on reset.
        max_steps: Step budget per episode. ``None`` (default) defers to the
            module-level ``max_episode_steps``, looked up at step time.
    """

    metadata = {"render_modes": ["rgb_array"], "render_fps": 8}

    def __init__(
        self,
        size=15,
        obstacle_ratio=0.2,
        view=9,
        seed=2025,
        map_seed=2025,
        fixed_start=(2, 2),
        fixed_goal=(12, 12),
        max_steps=None,
    ):
        super().__init__()

        # NOTE(review): self.rng is (re)seeded here and in reset() but is not
        # consumed anywhere in this class - confirm whether it is still needed.
        self.rng = np.random.default_rng(seed)
        self.size, self.view = size, view
        self.obstacle_ratio = obstacle_ratio
        self.map_seed = map_seed
        self.max_steps = max_steps
        self.fixed_start = np.array(fixed_start, np.int32)
        self.fixed_goal = np.array(fixed_goal, np.int32)

        # Row i is the (d_row, d_col) displacement applied by action i.
        self.moves = np.array(
            [[-1, 0],
             [ 1, 0],
             [ 0, -1],
             [ 0, 1]],
            np.int32
        )
        self.action_space = spaces.Discrete(4)

        self.observation_space = spaces.Box(
            low=0, high=255, shape=(view, view, 3), dtype=np.uint8
        )
        self._build_static()

    def _dist(self, a, b):
        """Return the Manhattan (L1) distance between two grid positions."""
        return np.abs(a - b).sum()

    def _build_static(self):
        """Generate the obstacle map from ``map_seed`` and wall off the border."""
        rng = np.random.default_rng(self.map_seed)
        self.map = np.zeros((self.size, self.size), np.int8)
        self.map[rng.random((self.size, self.size)) < self.obstacle_ratio] = 1
        self.map[0, :] = self.map[-1, :] = 1
        self.map[:, 0] = self.map[:, -1] = 1

    def reset(self, *, seed=None, options=None):
        """Put the agent back on the start cell and return ``(obs, info)``.

        The map itself is not regenerated; only the start and goal cells are
        forced free.
        """
        # Let gymnasium seed its own generator as the Env API expects.
        super().reset(seed=seed)
        if seed is not None:
            self.rng = np.random.default_rng(seed)

        self.map[self.fixed_start[0], self.fixed_start[1]] = 0
        self.map[self.fixed_goal[0], self.fixed_goal[1]] = 0
        self.start = self.fixed_start.copy()
        self.goal = self.fixed_goal.copy()
        self.pos = self.start.copy()
        self.prev_dist = self._dist(self.pos, self.goal)
        self.t = 0

        return self._obs(), {}

    def _obs(self):
        """Build the (view, view, 3) uint8 observation around the agent."""
        v = self.view
        r = v // 2
        y, x = self.pos

        # Window bounds in map coordinates (may extend past the map edges).
        y0, y1 = y - r, y + r + 1
        x0, x1 = x - r, x + r + 1

        # Start from "all obstacle" so cells outside the map read as blocked.
        patch = np.ones((v, v), np.float32)
        ys0, ys1 = max(0, y0), min(self.size, y1)
        xs0, xs1 = max(0, x0), min(self.size, x1)
        py0, py1 = ys0 - y0, ys0 - y0 + (ys1 - ys0)
        px0, px1 = xs0 - x0, xs0 - x0 + (xs1 - xs0)
        patch[py0:py1, px0:px1] = self.map[ys0:ys1, xs0:xs1]

        free = 1.0 - patch

        # Unit vector towards the goal in (row, col) order; zero when on the goal.
        gvec = (self.goal - self.pos).astype(np.float32)
        gdir = gvec / (np.linalg.norm(gvec) + 1e-8) if np.any(gvec) else np.array([0.0, 0.0], np.float32)

        dx = np.full((v, v), gdir[1], np.float32)  # x-direction (column) component
        dy = np.full((v, v), gdir[0], np.float32)  # y-direction (row) component

        free_u8 = (free * 255.0).astype(np.uint8)
        dx_u8 = (((dx + 1.0) * 0.5) * 255.0).astype(np.uint8)
        dy_u8 = (((dy + 1.0) * 0.5) * 255.0).astype(np.uint8)

        return np.stack([free_u8, dx_u8, dy_u8], axis=-1)

    def step(self, action: int):
        """Apply ``action`` and return ``(obs, reward, terminated, truncated, info)``.

        ``terminated`` is True only when the goal is reached. Running out of
        steps is reported through ``truncated`` so that learners can tell a
        time-out from a genuine terminal state.
        """
        self.t += 1
        reward = -1.0

        nxt = self.pos + self.moves[action]
        nxt[0] = np.clip(nxt[0], 0, self.size - 1)
        nxt[1] = np.clip(nxt[1], 0, self.size - 1)

        # A blocked move costs extra and leaves the agent where it was.
        if self.map[nxt[0], nxt[1]] == 1:
            reward -= 3.0
            nxt = self.pos

        # Shaping term: positive when the Manhattan distance to the goal shrinks.
        new_dist = self._dist(nxt, self.goal)
        reward += 0.2 * (self.prev_dist - new_dist)
        self.prev_dist = new_dist
        self.pos = nxt

        terminated = bool(np.array_equal(self.pos, self.goal))
        if terminated:
            reward += 5.0

        step_limit = max_episode_steps if self.max_steps is None else self.max_steps
        truncated = (not terminated) and self.t >= step_limit

        return self._obs(), float(reward), terminated, truncated, {}

    def render(self):
        """Return the whole map as a (size, size, 3) uint8 RGB image."""
        grid = np.zeros((self.size, self.size, 3), np.float32)
        grid[:, :, 2] = 0.6                                             # blue: open water
        grid[self.map == 1] = np.array([0.8, 0.8, 0.8])                 # gray: obstacle
        grid[self.goal[0], self.goal[1]] = np.array([1.0, 0.25, 0.25])  # red: goal
        grid[self.pos[0],  self.pos[1]]  = np.array([1.0, 1.0, 0.2])    # yellow: agent
        return (np.clip(grid, 0, 1) * 255).astype(np.uint8)

class PathLogger(gym.Wrapper):
    """Wrapper that tracks the cells visited and the reward accumulated per episode.

    On a step where the wrapped env reports ``terminated`` while the agent is
    on the goal cell, the returned info dict is extended with ``success``,
    ``steps``, ``return`` and ``path``.
    """

    def __init__(self, env):
        super().__init__(env)
        self.path = []
        self.ret = 0.0

    def reset(self, *, seed=None, options=None):
        """Reset the wrapped env and restart the trajectory at its position."""
        obs, info = self.env.reset(seed=seed, options=options)
        row, col = self.env.pos
        self.path = [(int(row), int(col))]
        self.ret = 0.0
        return obs, info

    def step(self, action):
        """Forward ``action``, record the new cell and add the reward to the total."""
        obs, r, terminated, truncated, info = self.env.step(action)

        row, col = self.env.pos
        self.path.append((int(row), int(col)))
        self.ret += float(r)

        reached_goal = terminated and np.array_equal(self.env.pos, self.env.goal)
        if reached_goal:
            # Copy the info dict rather than mutating the one the env returned.
            info = {
                **info,
                "success": True,
                "steps": len(self.path) - 1,
                "return": self.ret,
                "path": self.path.copy(),
            }
        return obs, r, terminated, truncated, info

# Output directory for this notebook's logs.
log_dir = "./sea_rl_logs"
os.makedirs(log_dir, exist_ok=True)

# Dedicated logger for goal-reached events, kept apart from the root logger.
goal_logger = logging.getLogger("goal_logger")
goal_logger.setLevel(logging.INFO)

# Re-running this cell must not stack handlers. Close each old handler as it is
# removed so that any file handler attached earlier releases its file.
for _old_handler in list(goal_logger.handlers):
    goal_logger.removeHandler(_old_handler)
    _old_handler.close()
goal_logger.propagate = False

fmt = logging.Formatter("%(asctime)s %(message)s")
ch = logging.StreamHandler()
ch.setFormatter(fmt)
goal_logger.addHandler(ch)

class GoalEpisodeLogger(BaseCallback):
    """Callback that logs every episode whose info dict carries ``success``.

    For each successful episode it writes two lines to ``ext_logger`` (a summary
    and the visited path) and records ``goal/episode``, ``goal/steps`` and
    ``goal/return`` on the model's logger.

    Args:
        ext_logger: ``logging.Logger``-like object that receives the messages.
    """

    def __init__(self, ext_logger):
        super().__init__(verbose=0)
        self.ext_logger = ext_logger
        self.ep = 0  # finished episodes so far, counted over all sub-environments

    def _on_step(self) -> bool:
        """Inspect the latest step of every sub-environment; never stops training."""
        infos = self.locals.get("infos")
        if not infos:
            return True
        dones = self.locals.get("dones")

        for env_idx, info in enumerate(infos):
            done = bool(dones[env_idx]) if dones is not None else False

            if info and info.get("success"):
                self.ext_logger.info(f"[GOAL] episode={self.ep+1} steps={info['steps']} timesteps={self.num_timesteps} return={info['return']:.2f}")
                # Lists instead of tuples so the path prints as [[y, x], ...].
                converted_path = list(map(list, info["path"]))
                self.ext_logger.info(f"[GOAL] path for webinput: {converted_path}")

                self.logger.record("goal/episode", self.ep+1)
                self.logger.record("goal/steps", info["steps"])
                self.logger.record("goal/return", info["return"])

            if done:
                self.ep += 1

        return True

class AttachGoalFileHandler(BaseCallback):
    """Callback that mirrors ``ext_logger`` into ``goal_hits.log`` while training runs.

    The file is created in the model logger's directory, or in ``base_dir``
    when the model logger reports no directory. The handler is removed and
    closed when training ends, so repeated ``learn()`` calls neither stack
    handlers (duplicate lines) nor leave files open.

    Args:
        ext_logger: Logger the file handler is attached to.
        base_dir: Fallback directory for the log file.
        fmt: ``logging.Formatter`` applied to the file handler.
    """

    def __init__(self, ext_logger, base_dir, fmt):
        super().__init__(0)
        self.ext_logger = ext_logger
        self.base_dir = base_dir
        self.fmt = fmt
        self.attached = False  # True while a file handler is attached
        self._file_handler = None

    def _detach(self) -> None:
        """Remove and close the current file handler, if there is one."""
        if self._file_handler is not None:
            self.ext_logger.removeHandler(self._file_handler)
            self._file_handler.close()
            self._file_handler = None
        self.attached = False

    def _on_training_start(self) -> None:
        # A previous run that was interrupted may have left its handler behind.
        self._detach()

        run_dir = self.model.logger.get_dir() or self.base_dir
        os.makedirs(run_dir, exist_ok=True)
        fh = logging.FileHandler(os.path.join(run_dir, "goal_hits.log"))
        fh.setFormatter(self.fmt)
        self.ext_logger.addHandler(fh)
        self._file_handler = fh
        self.attached = True

    def _on_training_end(self) -> None:
        self._detach()

    def _on_step(self) -> bool:
        return True

In [None]:
# Environment setup
SCENARIO = dict(size=20, obstacle_ratio=0.3, view=9, map_seed=2025, fixed_start=(1,1), fixed_goal=(17, 18))


def make_env():
    """Return a thunk that builds one wrapped SeaNavEnvCNN instance.

    Wrapper order, innermost first: SeaNavEnvCNN -> PathLogger -> TimeLimit -> Monitor.
    """
    def _t():
        env = SeaNavEnvCNN(**SCENARIO)
        env = PathLogger(env)
        env = TimeLimit(env, max_episode_steps=max_episode_steps)
        return Monitor(env)
    return _t


policy_kwargs = dict(
    features_extractor_class=SmallCNN,
    features_extractor_kwargs=dict(features_dim=128),
    normalize_images=True,  # True because the observations are uint8 images
)

train_env = DummyVecEnv([make_env()])
train_env = VecTransposeImage(train_env)       # HWC -> CHW
train_env = VecNormalize(train_env, norm_obs=False, norm_reward=True)

eval_env  = DummyVecEnv([make_env()])
eval_env  = VecTransposeImage(eval_env)
eval_env  = VecNormalize(eval_env, training=False, norm_obs=False, norm_reward=False)
eval_env.ret_rms = train_env.ret_rms           # share the reward statistics

# Preview the map. envs[0] is the Monitor built by make_env (its .env would be
# the TimeLimit wrapper); .unwrapped walks down to the SeaNavEnvCNN instance.
mon = train_env.envs[0]              # Monitor
base = mon.unwrapped                 # SeaNavEnvCNN
base.reset(seed=2025)
img = base.render()

fig, ax = plt.subplots(figsize=(4, 4))
ax.imshow(img)
sy, sx = base.start
gy, gx = base.goal
ax.scatter([sx], [sy], marker='o', s=90, c='lime', edgecolors='black', linewidths=1.3, label='start')
ax.scatter([gx], [gy], marker='*', s=150, c='red',  edgecolors='black', linewidths=1.3, label='goal')
ax.legend(loc='center left', bbox_to_anchor=(1.02, 0.5), borderaxespad=0.0, frameon=True)
ax.axis('off')
ax.set_title('maze')
plt.show()

In [None]:
# Seed for PPO so that repeated runs of this cell are comparable; 2025 matches
# the seeds used for the map and the environment elsewhere in the notebook.
TRAIN_SEED = 2025

# NOTE(review): eval_freq=5000 is larger than total_timesteps=500 below, so the
# periodic evaluation presumably never fires in this short run - raise
# total_timesteps or lower eval_freq if a best model should be saved.
eval_cb = EvalCallback(eval_env, best_model_save_path=log_dir, eval_freq=5000,
                       n_eval_episodes=10, deterministic=True)

callbacks = CallbackList([
    AttachGoalFileHandler(goal_logger, log_dir, fmt),
    GoalEpisodeLogger(goal_logger),
    eval_cb,
])

model = PPO(
    "CnnPolicy", train_env,
    device="cuda",
    n_steps=128, batch_size=128, learning_rate=0.0001,
    gamma=0.7, gae_lambda=0.5, ent_coef=0.01, clip_range=0.2,
    verbose=1, tensorboard_log=log_dir,
    policy_kwargs=policy_kwargs,
    seed=TRAIN_SEED,
)

model.learn(total_timesteps=500, callback=callbacks)

In [None]:
# Visualization: greedy rollout of the trained policy, with the start (green)
# and the goal (red) marked on the map.
import numpy as np, matplotlib.pyplot as plt

# Roll out on a fresh environment built from the same SCENARIO instead of
# stepping the environment that lives inside train_env, so the training
# environment's episode state is left untouched.
env = SeaNavEnvCNN(**SCENARIO)
obs, _ = env.reset(seed=2025)
path = [tuple(int(c) for c in env.pos)]
ret = 0.0
for _step in range(max_episode_steps):
    # The policy was trained on channel-first batches: HWC -> (1, C, H, W).
    obs_chw = np.transpose(obs, (2, 0, 1))[None, ...]
    action, _state = model.predict(obs_chw, deterministic=True)
    # predict() returns a batch of actions; take the single element explicitly
    # (int() on a non-0-d array is deprecated in recent NumPy releases).
    obs, r, done, trunc, _info = env.step(int(np.asarray(action).item()))
    ret += float(r)
    path.append(tuple(int(c) for c in env.pos))
    if done or trunc:
        break

img = env.render()
fig, ax = plt.subplots(figsize=(4, 4))
ax.imshow(img)
ys, xs = zip(*path)
ax.plot(xs, ys, c='w', linewidth=2)
sy, sx = env.start
gy, gx = env.goal
ax.scatter([sx], [sy], marker='o', s=90, c='lime', edgecolors='black', linewidths=1.3, label='start')
ax.scatter([gx], [gy], marker='*', s=150, c='red',  edgecolors='black', linewidths=1.3, label='goal')
ax.legend(loc='center left', bbox_to_anchor=(1.02, 0.5), borderaxespad=0.0, frameon=True)
ax.axis('off')
ax.set_title(f"steps={len(path)-1}, return={ret:.1f}")
plt.show()