<a href="https://colab.research.google.com/github/juhumkwon/DeepLearning/blob/main/RL_03_3_1_ppo_basic.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
import gymnasium as gym
import numpy as np
import tensorflow as tf

# ----- 1. Actor-Critic 모델 정의 -----
class ActorCritic(tf.keras.Model):
    def __init__(self, action_dim):
        super().__init__()
        self.common = tf.keras.layers.Dense(64, activation='relu')
        self.actor = tf.keras.layers.Dense(action_dim, activation='softmax')
        self.critic = tf.keras.layers.Dense(1, activation=None)

    def call(self, x):
        x = self.common(x)
        return self.actor(x), self.critic(x)

# ----- 2. Advantage 계산 함수 -----
def compute_advantages(rewards, values, dones, gamma=0.99, lam=0.95):
    advantages, gae = [], 0
    next_value = 0
    for r, v, d in zip(reversed(rewards), reversed(values), reversed(dones)):
        delta = r + gamma * next_value * (1 - d) - v
        gae = delta + gamma * lam * (1 - d) * gae
        advantages.insert(0, gae)
        next_value = v
    returns = np.array(advantages) + np.array(values)
    return np.array(advantages, dtype=np.float32), returns.astype(np.float32)

# ----- 3. PPO Loss -----
def ppo_loss(old_log_probs, new_log_probs, advantages, values, returns, clip_ratio=0.2,
             c_v=0.5, c_e=0.01):
    # ratio = exp(new - old)
    ratio = tf.exp(new_log_probs - old_log_probs)

    # Surrogate objectives
    surr1 = ratio * advantages
    surr2 = tf.clip_by_value(ratio, 1.0 - clip_ratio, 1.0 + clip_ratio) * advantages
    policy_loss = -tf.reduce_mean(tf.minimum(surr1, surr2))

    # Value loss (MSE)
    value_loss = tf.reduce_mean(tf.square(returns - values))

    # Entropy (탐험 보너스)
    entropy = -tf.reduce_mean(tf.reduce_sum(tf.exp(new_log_probs) * new_log_probs, axis=-1))

    total_loss = policy_loss + c_v * value_loss - c_e * entropy
    return total_loss, policy_loss, value_loss, entropy

# ----- 4. 학습 루프 -----
env = gym.make("CartPole-v1")
obs_dim = env.observation_space.shape[0]
action_dim = env.action_space.n

model = ActorCritic(action_dim)
optimizer = tf.keras.optimizers.Adam(learning_rate=3e-4)

for episode in range(50):
    obs, _ = env.reset()
    done, ep_reward = False, 0

    observations, actions, rewards, dones, values, log_probs = [], [], [], [], [], []

    while not done:
        obs_tensor = tf.convert_to_tensor(obs[None, :], dtype=tf.float32)
        probs, value = model(obs_tensor)
        action = np.random.choice(action_dim, p=probs.numpy()[0])

        # Log prob for PPO ratio
        log_prob = tf.math.log(probs[0, action] + 1e-8)

        next_obs, reward, terminated, truncated, _ = env.step(action)
        done = terminated or truncated

        # Save trajectory
        observations.append(obs)
        actions.append(action)
        rewards.append(reward)
        dones.append(done)
        values.append(value.numpy()[0,0])
        log_probs.append(log_prob.numpy())

        obs = next_obs
        ep_reward += reward

    # ----- Advantage & Return 계산 -----
    advantages, returns = compute_advantages(rewards, values, dones)

    # ----- PPO 업데이트 -----
    with tf.GradientTape() as tape:
        obs_tensor = tf.convert_to_tensor(np.array(observations), dtype=tf.float32)
        probs, values_pred = model(obs_tensor)

        # 새 log probs
        indices = tf.range(len(actions))
        action_indices = tf.stack([indices, actions], axis=1)
        new_probs = tf.gather_nd(probs, action_indices)
        new_log_probs = tf.math.log(new_probs + 1e-8)

        # 손실 계산
        loss, pl, vl, ent = ppo_loss(
            old_log_probs=tf.convert_to_tensor(log_probs, dtype=tf.float32),
            new_log_probs=new_log_probs,
            advantages=tf.convert_to_tensor(advantages, dtype=tf.float32),
            values=tf.squeeze(values_pred),
            returns=tf.convert_to_tensor(returns, dtype=tf.float32),
            clip_ratio=0.2,
            c_v=0.5,
            c_e=0.01
        )

    grads = tape.gradient(loss, model.trainable_variables)
    optimizer.apply_gradients(zip(grads, model.trainable_variables))

    print(f"Episode {episode}: Reward={ep_reward:.1f}, "
          f"Loss={loss.numpy():.3f}, PolicyLoss={pl.numpy():.3f}, "
          f"ValueLoss={vl.numpy():.3f}, Entropy={ent.numpy():.3f}")

Episode 0: Reward=23.0, Loss=30.524, PolicyLoss=-8.068, ValueLoss=77.343, Entropy=7.961
Episode 1: Reward=25.0, Loss=33.773, PolicyLoss=-8.454, ValueLoss=84.628, Entropy=8.686
Episode 2: Reward=12.0, Loss=11.502, PolicyLoss=-5.259, ValueLoss=33.605, Entropy=4.159
Episode 3: Reward=30.0, Loss=41.512, PolicyLoss=-9.326, ValueLoss=101.885, Entropy=10.401
Episode 4: Reward=21.0, Loss=27.195, PolicyLoss=-7.551, ValueLoss=69.629, Entropy=6.845
Episode 5: Reward=11.0, Loss=9.810, PolicyLoss=-4.924, ValueLoss=29.543, Entropy=3.814
Episode 6: Reward=16.0, Loss=18.315, PolicyLoss=-6.406, ValueLoss=49.553, Entropy=5.530
Episode 7: Reward=10.0, Loss=8.013, PolicyLoss=-4.430, ValueLoss=24.948, Entropy=3.142
Episode 8: Reward=14.0, Loss=14.839, PolicyLoss=-5.836, ValueLoss=41.448, Entropy=4.853
Episode 9: Reward=35.0, Loss=48.772, PolicyLoss=-10.072, ValueLoss=117.931, Entropy=12.149
Episode 10: Reward=17.0, Loss=20.018, PolicyLoss=-6.652, ValueLoss=53.457, Entropy=5.911
Episode 11: Reward=19.0, Los