In [13]:
import gymnasium as gym
import numpy as np
from gymnasium import spaces

class BinaryHologramEnv(gym.Env):
    def __init__(self, target_function, max_steps=1000000, T_PSNR=30, T_steps=100):
        """
        target_function: 타겟 이미지와의 손실(MSE 또는 PSNR) 계산 함수.
        max_steps: 최대 타임스텝 제한.
        T_PSNR: 목표 PSNR 값.
        T_steps: PSNR 목표를 유지해야 하는 최소 타임스텝.
        """
        super(BinaryHologramEnv, self).__init__()

        # Observation space: 연속형 데이터
        self.observation_space = spaces.Box(
            low=0,
            high=1,
            shape=(1024, 1024, 8),  # 8채널 데이터
            dtype=np.float32,
        )

        # Action space: 1024x1024x8 이진 데이터 (출력)
        self.action_space = spaces.MultiBinary(1024 * 1024 * 8)

        # 목표 함수
        self.target_function = target_function

        # 에피소드 설정
        self.max_steps = max_steps
        self.T_PSNR = T_PSNR
        self.T_steps = T_steps

        # 학습 상태
        self.state = None
        self.binary_state = None  # 초기 이진 상태
        self.steps = 0
        self.psnr_sustained_steps = 0  # PSNR 목표 유지 스텝 수

    def reset(self, seed=None, options=None):
        """
        환경 상태를 초기화.
        """
        # 연속형 데이터 초기화 (1024x1024x8 크기)
        self.state = np.random.rand(1024, 1024, 8).astype(np.float32)

        # 초기 이진 상태를 0.5 기준으로 설정
        self.binary_state = (self.state >= 0.5).astype(np.float32)

        # 초기 상태 설정
        self.steps = 0
        self.psnr_sustained_steps = 0
        return self.state, {}

    def calculate_psnr(self, mse):
        """
        MSE로부터 PSNR 계산.
        """
        if mse == 0:
            return np.inf
        return 10 * np.log10(1.0 / mse)  # MAX 신호 값(1.0) 기준

    def step(self, action):
        """
        action: 1024x1024x8 배열로 출력된 바이너리 홀로그램 (0 또는 1).
        """
        # Action을 3D 배열로 변환
        action = np.reshape(action, (1024, 1024, 8))

        # 타겟 계산 (타겟은 연속형 데이터의 평균값으로 가정)
        target = self.state.mean(axis=-1, keepdims=True)  # 타겟 이미지 (평균값)

        # MSE 계산 (이진 출력의 평균과 타겟 비교)
        mse = np.mean((action.mean(axis=-1) - target.squeeze()) ** 2)

        # PSNR 계산
        psnr = self.calculate_psnr(mse)

        # 보상 설계 (MSE 기반)
        reward = -mse

        # 종료 조건 초기화
        terminated = False
        truncated = False
        self.steps += 1

        # 시간 초과 조건
        if self.steps >= self.max_steps:
            truncated = True

        # 성능 기반 종료 조건 (PSNR 유지)
        if psnr >= self.T_PSNR:
            self.psnr_sustained_steps += 1
        else:
            self.psnr_sustained_steps = 0

        # 목표 PSNR을 일정 시간 유지하면 종료
        if self.psnr_sustained_steps >= self.T_steps:
            terminated = True

        # 추가 정보 반환
        info = {
            "mse": mse,
            "psnr": psnr,
            "psnr_sustained_steps": self.psnr_sustained_steps,
        }

        return self.state, reward, terminated, truncated, info

In [14]:
from stable_baselines3.common.env_checker import check_env

def target_function(state, action):
    target = state.mean(axis=-1, keepdims=True)
    return np.mean((action.mean(axis=-1) - target.squeeze()) ** 2)

env = BinaryHologramEnv(
    target_function=target_function,
    max_steps=1000000,
    T_PSNR=30,
    T_steps=100,
)

# 환경 유효성 확인
check_env(env, warn=True)


In [None]:
from stable_baselines3 import PPO

# PPO 모델 학습
model = PPO("MlpPolicy", env, verbose=1)
model.learn(total_timesteps=50000)


Using cuda device
Wrapping the env with a `Monitor` wrapper
Wrapping the env in a DummyVecEnv.
