In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import random

# --- (1) 환자 환경 정의 --- #
class PatientEnv:
    def __init__(self, patient_data, max_tests=5):
        self.patient_data = patient_data  # dict: {patient_id: {features, label}}   #환자의 id, 식별번호와 그에 따른 데이터가 들어있다.
        self.max_tests = max_tests     #이 수치보다 검사결과를 많이 이용하면 에피소드를 강제종료한다.
        self.test_list = ['test1', 'test2', 'test3', 'test4', 'test5']      #가능한 검사들의 리스트
        self.reset()

    def reset(self):
        self.done = False
        self.steps = 0
        self.patient_id = random.choice(list(self.patient_data.keys()))       #임의로 환자 선택
        self.patient = self.patient_data[self.patient_id]            
        self.available_tests = set(self.test_list)                   #아직 수행하지 않아서 남아 있는 검사들의 세트
        self.observed = []
        self.state = np.zeros(len(self.test_list))  # 초기 상태는 0
        return self.state

    def step(self, action):   #action: 수행하고자 하는 테스트의 이름 인덱스
        if action < len(self.test_list):  # 검사 선택
            test_name = self.test_list[action]
            if test_name not in self.available_tests:       #수행하라고 action에 들어있는 테스트가 이미 실행한 것이면:  현재상태 반환, 페널티 주고, 테스트가 안 끝났다고 말한다.
                return self.state, -5.0, False  # 중복 검사 penalty
            self.available_tests.remove(test_name)            #검사 선택 -> 가능한 검사목록에서 제거, 
            self.state[action] = self.patient['features'][action]  # 검사 결과 반영해 state에 해당 검사인덱스에 대한 결과를 기록한다.
            reward = -1.0  # 검사 비용
            self.done = False
        else:  # action값이 크면 진단 시도 -> label값과 비교해서 같은지 확인 
            pred = action - len(self.test_list)  # 진단 클래스 index
            correct = int(pred == self.patient['label'])                  #정확도기준으로는 못하기 때문에 new_dx와의 값 차이에 비례해서 페널티 주도록 하자!
            reward = 100.0 if correct else -100.0
            self.done = True
        self.steps += 1
        if self.steps >= self.max_tests:
            self.done = True
        return self.state.copy(), reward, self.done

    def get_action_space(self):
        return len(self.test_list) + len(set(p['label'] for p in self.patient_data.values()))    #존재하는 검사의 개수 + 가능한 new_dx의 가짓수


In [None]:
# --- (2) 정책 & 가치 모델 정의 --- #
class ActorCritic(nn.Module):
    def __init__(self, state_dim, action_dim, hidden_dim=64):
        super().__init__()
        self.shared = nn.Sequential(
            nn.Linear(state_dim, hidden_dim),
            nn.ReLU(),
        )
        self.policy = nn.Sequential(
            nn.Linear(hidden_dim, action_dim),
            nn.Softmax(dim=-1)
        )
        self.value = nn.Linear(hidden_dim, 1)

    def forward(self, x):
        x = self.shared(x)
        return self.policy(x), self.value(x)    #들어온 state값에 대해서 policy 결과는 상태가치함수결과를 반환 -> 가능한 action별 확률을 출력. value 결과는 state에 대한 가치함수결과 반환.

In [None]:
class PPOAgent:
    def __init__(self, state_dim, action_dim, clip_eps=0.2, gamma=0.99, lr=3e-4):
        self.model = ActorCritic(state_dim, action_dim)
        self.optimizer = optim.Adam(self.model.parameters(), lr=lr)
        self.clip_eps = clip_eps
        self.gamma = gamma

    def get_action(self, state):
        state = torch.FloatTensor(state).unsqueeze(0)
        probs, _ = self.model(state)      #가치신경망 -> 정책신경망을 state가 통과 => probs에는 정책, 즉 행동별 확률이 기록된다.
        dist = torch.distributions.Categorical(probs)        #dist는 probs 확률에 따라 행동을 선택하는 다항분포객체이다.
        action = dist.sample()                               #dist에 따라 행동선택
        return action.item(), dist.log_prob(action)        #선택한 행동의 인덱스와 그 행동의 확률의 로그값 반환

    def compute_returns(self, rewards, dones):
        R = 0
        returns = []
        for r, d in zip(reversed(rewards), reversed(dones)):      #가장 최근에 종료된 때의 보상부터 시작해서 앞의 보상들을 더하고 전체보상에 감마를 곱해가면서 누적 보상을 얻는다.
            R = r + self.gamma * R * (1 - d)               
            returns.insert(0, R)
        return returns

    def update(self, trajectories):
        states = torch.FloatTensor([t[0] for t in trajectories])
        actions = torch.LongTensor([t[1] for t in trajectories]).unsqueeze(1)
        old_log_probs = torch.cat([t[2] for t in trajectories]).detach()
        returns = torch.FloatTensor(self.compute_returns(        #rewards, dones 이용해 누적 reward, return 계산한다.
            [t[3] for t in trajectories],
            [t[4] for t in trajectories]
        )).unsqueeze(1)

        probs, values = self.model(states)
        dist = torch.distributions.Categorical(probs)
        log_probs = dist.log_prob(actions.squeeze())

        advantages = returns - values.detach()       #A=R-V

        ratio = torch.exp(log_probs - old_log_probs)     #model을 통해 얻은 next pi / 이전 시기의 pi
        clip = torch.clamp(ratio, 1 - self.clip_eps, 1 + self.clip_eps)     #클리핑으로 ratio 값 자른다.
        policy_loss = -torch.min(ratio * advantages, clip * advantages).mean()     #RA, CA의 최솟값을 구하고 -붙여서 Gradient Ascent하게 해 훈련시킨다. 
        value_loss = nn.MSELoss()(values, returns) #

        loss = policy_loss + 0.5 * value_loss
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

In [None]:
def train():
    # 환자 데이터 생성 (예: 100명, 각 5개 feature, 3개 클래스)
    patient_data = {
        i: {
            "features": np.random.rand(5),
            "label": np.random.randint(0, 3)
        }
        for i in range(100)
    }

    env = PatientEnv(patient_data)
    agent = PPOAgent(state_dim=5, action_dim=env.get_action_space())

    for epoch in range(300):
        trajectories = []
        for _ in range(20):  # batch of episodes
            state = env.reset()
            done = False
            episode = []
            while not done:
                action, log_prob = agent.get_action(state)
                next_state, reward, done = env.step(action)
                episode.append((state, action, log_prob, reward, done))
                state = next_state
            trajectories.extend(episode)
        agent.update(trajectories)
        if epoch % 10 == 0:
            print(f"Epoch {epoch} 완료")