In [4]:
import gym
import numpy as np
import pandas as pd
from gym import spaces
from stable_baselines3 import PPO
from stable_baselines3.common.env_util import DummyVecEnv


class MaintenanceEnvV2(gym.Env):
    def __init__(self, sensor_data_path, num_machines=5):
        super(MaintenanceEnvV2, self).__init__()
        self.sensor_data = pd.read_csv(sensor_data_path).values[:, 1:]  # drop machine_id
        self.num_machines = num_machines
        self.max_steps = len(self.sensor_data) // num_machines
        self.current_step = 0

        # 상태: [machine * 4] + [인력, 재고, 시간]
        self.observation_space = spaces.Box(low=0, high=1, shape=(num_machines * 4 + 3,), dtype=np.float32)
        self.action_space = spaces.MultiBinary(num_machines)

        self.maintenance_time = 1.0  # 한 기계 정비 시 시간 소모 (시간 단위)
        self.reset()

    def reset(self):
        self.current_step = 0
        self.crew = 3  # 정비 인력
        self.parts = 5  # 재고 수
        self.hours = 8  # 하루 작업 가능 시간
        return self._get_obs()

    def _get_obs(self):
        start = self.current_step * self.num_machines
        end = start + self.num_machines
        machine_data = self.sensor_data[start:end] / np.array([200, 1, 1, 1])  # 정규화
        extra = np.array([self.crew / 5, self.parts / 10, self.hours / 10])
        return np.concatenate((machine_data.flatten(), extra)).astype(np.float32)

    def step(self, actions):
        reward = 0
        used_crew = 0
        used_parts = 0
        used_hours = 0

        for i, act in enumerate(actions):
            machine = self.sensor_data[self.current_step * self.num_machines + i]
            usage, fault, vib, temp = machine
            fail_prob = fault + vib + temp

            if act == 1:
                # 제약 확인
                if (self.crew - used_crew) <= 0 or (self.parts - used_parts) <= 0 or (self.hours - used_hours) < self.maintenance_time:
                    reward -= 5  # 제약 초과로 정비 실패
                    continue

                used_crew += 1
                used_parts += 1
                used_hours += self.maintenance_time

                if fail_prob > 1.5:
                    reward += 10
                else:
                    reward -= 1
            else:
                if fail_prob > 1.5:
                    reward -= 20

        self.crew -= used_crew
        self.parts -= used_parts
        self.hours -= used_hours

        self.current_step += 1
        done = self.current_step >= self.max_steps
        obs = self._get_obs() if not done else np.zeros(self.observation_space.shape, dtype=np.float32)

        return obs, reward, done, {}

env = DummyVecEnv([lambda: MaintenanceEnvV2("./data/sample_sensor_data.csv", num_machines=5)])
model = PPO("MlpPolicy", env, verbose=1)
model.learn(total_timesteps=100_000)

AttributeError: partially initialized module 'torch' has no attribute 'fx' (most likely due to a circular import)

In [None]:
1. 문제 상황
 - 생산설비는 시간이 지남에 따라 고장 확률이 증가합니다.
 - 정비를 하지 않으면 예기치 못한 고장으로 큰 손실이 발생합니다.
 - 하지만 무분별한 정비는 비용 낭비를 초래합니다.
 - 정비에는 인력, 부품, 작업시간 등 자원이 제한적입니다.

2. 강화학습을 적용하는 이유
 - 상황이 시간에 따라 변화하며,
 - 에이전트는 자신의 행동(정비 여부)에 따른 장기적 보상을 고려해야 합니다.
 - 이러한 문제는 강화학습의 전형적인 적용 분야입니다.    

3. 에이전트 학습목표
 - 정비할 기계 선택	고장 확률(fault_rate + 진동 + 온도)이 높은 설비는 정비
 - 정비 시기 결정	자원이 부족하면 우선순위 높은 설비부터 정비
 - 정비 안 할 설비 판단	고장 가능성이 낮은 설비는 굳이 정비하지 않음
 - 장기 보상 고려	당장 정비를 하지 않아도 괜찮은 설비는 유보

4. 학습데이터
 - fault_rate (고장률): 기계가 고장 날 확률 또는 고장 발생 빈도를 수치로 나타낸 값(통계, 머신러닝, 베이지안)
 - vibration (진동): 설비 작동 중에 발생하는 진동 강도 또는 가속도(센서계측, RMS (Root Mean Square), FTT분석)
 - temperature (온도): 설비 부품(모터, 베어링 등)의 표면 온도 또는 내부 온도(온도 센서 계측, 특정 위치 평균값, 정상 상태 대비 상승 폭 기준)