### Created on 2025
### @author: S.W

### A2C (Advantage Actor-Critic)

A2C의 핵심 아이디어:
- Actor(정책)와 Critic(가치함수)을 동시에 학습
- Advantage 함수로 분산 감소
- 정책 기반과 가치 기반의 장점 결합
- 안정적이고 효율적인 학습

In [1]:
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
import seaborn as sns
from collections import deque, namedtuple
import random
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.distributions import Categorical
import gym
import warnings
warnings.filterwarnings('ignore')

# 한글 폰트 설정
plt.rcParams['font.family'] = 'DejaVu Sans'
plt.rcParams['figure.figsize'] = (12, 8)
plt.rcParams['axes.unicode_minus'] = False

# GPU 사용 설정
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"사용 장치: {device}")

사용 장치: cuda


Gym has been unmaintained since 2022 and does not support NumPy 2.0 amongst other critical functionality.
Please upgrade to Gymnasium, the maintained drop-in replacement of Gym, or contact the authors of your software and request that they upgrade.
Users of this version of Gym should be able to simply replace 'import gym' with 'import gymnasium as gym' in the vast majority of cases.
See the migration guide at https://gymnasium.farama.org/introduction/migration_guide/ for additional information.


### 실습: CartPole-v1

In [2]:
# 간단한 CartPole 환경 구현 (gym 의존성 제거)
class CartPoleEnv:
   """
   간단한 CartPole 환경 구현
   
   카트 위에 막대가 있고, 카트를 좌우로 움직여서 막대가 넘어지지 않도록 하는 환경.
   물리 법칙을 따라 카트와 막대의 움직임을 시뮬레이션합니다.
   """
   
   def __init__(self):
       """
       CartPole 환경 초기화
       
       물리적 파라미터들과 임계값들을 설정합니다.
       """
       # 물리 파라미터들
       self.gravity = 9.8              # 중력 가속도 (m/s^2)
       self.masscart = 1.0             # 카트의 질량 (kg)
       self.masspole = 0.1             # 막대의 질량 (kg)
       self.total_mass = (self.masspole + self.masscart)  # 전체 질량
       self.length = 0.5               # 막대의 절반 길이 (m) - 회전 중심에서 질량 중심까지
       self.polemass_length = (self.masspole * self.length)  # 막대 질량 × 길이
       self.force_mag = 10.0           # 카트에 가해지는 힘의 크기 (N)
       self.tau = 0.02                 # 시간 간격 (초) - 물리 시뮬레이션 스텝 크기
       
       # 게임 오버 임계값들
       self.theta_threshold_radians = 12 * 2 * np.pi / 360  # ±12도를 라디안으로 변환
       self.x_threshold = 2.4          # 카트가 중심에서 ±2.4m 이상 벗어나면 게임 오버
       
       # 상태와 행동 공간 정의 (OpenAI Gym 스타일)
       self.observation_space = type('obj', (object,), {'shape': (4,)})()  # 4차원 상태 공간
       self.action_space = type('obj', (object,), {'n': 2})()              # 2개의 이산 행동
       
       # 현재 상태 변수
       self.state = None
       self.reset()
   
   def reset(self):
       """
       환경을 초기 상태로 리셋
       
       Returns:
           np.ndarray: 초기 상태 [x, x_dot, theta, theta_dot]
               - x: 카트의 위치 (-0.05 ~ 0.05)
               - x_dot: 카트의 속도 (-0.05 ~ 0.05)  
               - theta: 막대의 각도 (-0.05 ~ 0.05 라디안)
               - theta_dot: 막대의 각속도 (-0.05 ~ 0.05 라디안/초)
       """
       self.state = np.random.uniform(low=-0.05, high=0.05, size=(4,))
       return self.state.copy()
   
   def step(self, action):
       """
       주어진 행동을 수행하고 환경을 한 스텝 진행
       
       Args:
           action (int): 수행할 행동
               - 0: 왼쪽으로 힘을 가함 (-10N)
               - 1: 오른쪽으로 힘을 가함 (+10N)
       
       Returns:
           tuple: (next_state, reward, done, info)
               - next_state (np.ndarray): 다음 상태 [x, x_dot, theta, theta_dot]
               - reward (float): 보상 (생존 시 1.0, 게임오버 시 0.0)
               - done (bool): 에피소드 종료 여부
               - info (dict): 추가 정보 (비어있음)
       """
       assert action in [0, 1], f"Invalid action {action}"
       
       # 현재 상태 분해
       x, x_dot, theta, theta_dot = self.state
       
       # 행동에 따른 힘 설정
       force = self.force_mag if action == 1 else -self.force_mag
       
       # 삼각함수 미리 계산 (성능 향상)
       costheta = np.cos(theta)
       sintheta = np.sin(theta)
       
       # 물리 법칙에 따른 가속도 계산
       # 복합 시스템의 운동방정식을 풀어서 얻은 공식들
       temp = (force + self.polemass_length * theta_dot ** 2 * sintheta) / self.total_mass
       
       # 막대의 각가속도 계산
       thetaacc = (self.gravity * sintheta - costheta * temp) / (
           self.length * (4.0 / 3.0 - self.masspole * costheta ** 2 / self.total_mass)
       )
       
       # 카트의 가속도 계산
       xacc = temp - self.polemass_length * thetaacc * costheta / self.total_mass
       
       # 오일러 적분법으로 상태 업데이트
       # v = v0 + a*dt, x = x0 + v*dt
       x = x + self.tau * x_dot
       x_dot = x_dot + self.tau * xacc
       theta = theta + self.tau * theta_dot
       theta_dot = theta_dot + self.tau * thetaacc
       
       # 새로운 상태 저장
       self.state = np.array([x, x_dot, theta, theta_dot])
       
       # 게임 오버 조건 확인
       done = bool(
           x < -self.x_threshold           # 카트가 너무 왼쪽으로
           or x > self.x_threshold         # 카트가 너무 오른쪽으로
           or theta < -self.theta_threshold_radians  # 막대가 너무 왼쪽으로 기울어짐
           or theta > self.theta_threshold_radians   # 막대가 너무 오른쪽으로 기울어짐
       )
       
       # 보상 계산: 생존하면 1점, 게임오버면 0점
       reward = 1.0 if not done else 0.0
       
       return self.state.copy(), reward, done, {}

# 환경 생성 및 정보 출력
env = CartPoleEnv()
print("✓ CartPole 환경 생성 완료")
print(f"상태 공간 차원: {env.observation_space.shape}")
print(f"행동 공간 크기: {env.action_space.n}")
print("상태: [위치, 속도, 각도, 각속도]")
print("행동: [왼쪽(0), 오른쪽(1)]")
print("목표: 막대를 넘어뜨리지 않고 최대한 오래 유지")

# 환경 동작 테스트
print("\n환경 테스트:")
test_state = env.reset()
print(f"초기 상태: {test_state}")
test_next_state, test_reward, test_done, _ = env.step(1)  # 오른쪽으로 힘을 가함
print(f"한 스텝 후 상태: {test_next_state}")
print(f"보상: {test_reward}, 종료: {test_done}")

✓ CartPole 환경 생성 완료
상태 공간 차원: (4,)
행동 공간 크기: 2
상태: [위치, 속도, 각도, 각속도]
행동: [왼쪽(0), 오른쪽(1)]
목표: 막대를 넘어뜨리지 않고 최대한 오래 유지

환경 테스트:
초기 상태: [ 0.01417383 -0.0442655   0.00536805  0.04488949]
한 스텝 후 상태: [ 0.01328852  0.15077906  0.00626584 -0.24609494]
보상: 1.0, 종료: False


### 시각화 라이브러리 불러오기

In [3]:
import matplotlib.animation as animation
from matplotlib.patches import Rectangle, Circle
from IPython.display import HTML, display

# GIF 스타일 설정
plt.rcParams['animation.html'] = 'jshtml'
plt.rcParams['animation.embed_limit'] = 100
plt.rcParams['font.family'] = 'sans-serif'
plt.rcParams['figure.dpi'] = 80

### A2C 네트워크 및 에이전트 정의

In [4]:
class ActorCriticNetwork(nn.Module):
    """
    Actor-Critic 네트워크: 정책(Actor)과 상태 가치 함수(Critic)를 동시에 추정하는 신경망
    """
    def __init__(self, state_size, action_size, hidden_size=128):
        super(ActorCriticNetwork, self).__init__()
        self.fc_shared = nn.Linear(state_size, hidden_size)
        
        # Actor 헤드: 정책 확률 분포 출력
        self.fc_actor = nn.Linear(hidden_size, hidden_size)
        self.actor_out = nn.Linear(hidden_size, action_size)
        
        # Critic 헤드: 상태 가치 출력
        self.fc_critic = nn.Linear(hidden_size, hidden_size)
        self.critic_out = nn.Linear(hidden_size, 1)

    def forward(self, state):
        """
        상태를 입력받아 정책 확률과 상태 가치를 출력
        """
        shared = F.relu(self.fc_shared(state))

        actor_hidden = F.relu(self.fc_actor(shared))
        action_probs = F.softmax(self.actor_out(actor_hidden), dim=-1)

        critic_hidden = F.relu(self.fc_critic(shared))
        state_value = self.critic_out(critic_hidden)

        return action_probs, state_value

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.distributions import Categorical

class ActorCriticNetwork(nn.Module):
    """
    Actor-Critic 네트워크: 정책(Actor)과 상태 가치 함수(Critic)를 동시에 추정하는 신경망
    """
    def __init__(self, state_size, action_size, hidden_size=128):
        super(ActorCriticNetwork, self).__init__()
        self.fc_shared = nn.Linear(state_size, hidden_size)
        
        # Actor 헤드: 정책 확률 분포 출력
        self.fc_actor = nn.Linear(hidden_size, hidden_size)
        self.actor_out = nn.Linear(hidden_size, action_size)
        
        # Critic 헤드: 상태 가치 출력
        self.fc_critic = nn.Linear(hidden_size, hidden_size)
        self.critic_out = nn.Linear(hidden_size, 1)
    
    def forward(self, state):
        """
        상태를 입력받아 정책 확률과 상태 가치를 출력
        """
        shared = F.relu(self.fc_shared(state))
        
        # Actor 경로
        actor_hidden = F.relu(self.fc_actor(shared))
        action_probs = F.softmax(self.actor_out(actor_hidden), dim=-1)
        
        # Critic 경로
        critic_hidden = F.relu(self.fc_critic(shared))
        state_value = self.critic_out(critic_hidden)
        
        return action_probs, state_value


class A2CAgent:
    """
    REINFORCE 스타일 코드 스타일을 따른 Actor-Critic 에이전트 구현
    정책과 가치 함수를 동시에 학습하며, 에피소드별 데이터를 기록하고 업데이트합니다.
    """
    def __init__(self, state_size, action_size, lr=0.01, gamma=0.99):
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.state_size = state_size
        self.action_size = action_size
        self.lr = lr
        self.gamma = gamma
        
        # Actor-Critic 네트워크 초기화
        self.ac_network = ActorCriticNetwork(state_size, action_size).to(self.device)
        self.optimizer = optim.Adam(self.ac_network.parameters(), lr=lr)
        
        # 에피소드 데이터 저장소
        self.reset_episode()
        
        # 성능 추적을 위한 리스트 (REINFORCE 스타일 호환성)
        self.episode_rewards = []
    
    def reset_episode(self):
        """새 에피소드 시작 시 행동 로그, 상태 가치, 보상 데이터를 초기화"""
        self.log_probs = []
        self.state_values = []
        self.rewards = []
    
    def select_action(self, state):
        """상태 입력 시 정책을 통해 행동을 샘플링, 로그 확률과 상태 가치 저장"""
        state = torch.FloatTensor(state).unsqueeze(0).to(self.device)
        action_probs, state_value = self.ac_network(state)
        
        # 행동 확률 분포에서 행동 샘플링
        dist = Categorical(action_probs)
        action = dist.sample()
        
        # 학습에 필요한 정보 저장
        self.log_probs.append(dist.log_prob(action))
        self.state_values.append(state_value)
        
        return action.item()
    
    def store_reward(self, reward):
        """환경으로부터 받은 보상을 저장"""
        self.rewards.append(reward)
    
    def get_action_probabilities(self, state):
        """
        A2C 애니메이션 호환성을 위한 메서드
        현재 상태에서 각 행동의 확률을 반환
        
        Args:
            state: 현재 환경 상태
            
        Returns:
            list: [왼쪽 확률, 오른쪽 확률]
        """
        with torch.no_grad():
            state = torch.FloatTensor(state).unsqueeze(0).to(self.device)
            action_probs, _ = self.ac_network(state)
            return action_probs.cpu().numpy()[0].tolist()
    
    def get_state_value(self, state):
        """
        A2C 애니메이션 호환성을 위한 메서드
        현재 상태의 가치를 반환
        
        Args:
            state: 현재 환경 상태
            
        Returns:
            float: 상태 가치
        """
        with torch.no_grad():
            state = torch.FloatTensor(state).unsqueeze(0).to(self.device)
            _, state_value = self.ac_network(state)
            return state_value.cpu().numpy()[0][0]
    
    def update_policy(self):
        """
        에피소드 종료 후 정책 및 가치 네트워크 업데이트
        할인된 누적 보상과 상태 가치를 사용해 advantage 계산,
        정책과 가치 손실을 합산해서 네트워크를 최적화합니다.
        """
        # 할인된 누적 보상 계산
        discounted_rewards = []
        cumulative = 0
        for r in reversed(self.rewards):
            cumulative = r + self.gamma * cumulative
            discounted_rewards.insert(0, cumulative)
        
        discounted_rewards = torch.FloatTensor(discounted_rewards).to(self.device)
        state_values = torch.cat(self.state_values).squeeze()
        
        # Advantage 계산 (할인 보상 - 상태 가치)
        advantages = discounted_rewards - state_values.detach()
        
        # 정책 손실: 로그 확률과 advantage 곱의 음의 합
        policy_losses = [-log_prob * adv for log_prob, adv in zip(self.log_probs, advantages)]
        policy_loss = torch.stack(policy_losses).sum()
        
        # 가치 손실: 예측 가치와 할인 보상 간의 평균 제곱 오차
        value_loss = F.mse_loss(state_values, discounted_rewards)
        
        # 총 손실: 정책 손실과 가치 손실 가중합
        total_loss = policy_loss + 0.5 * value_loss
        
        # 네트워크 업데이트
        self.optimizer.zero_grad()
        total_loss.backward()
        self.optimizer.step()
        
        # 에피소드 데이터 초기화
        self.reset_episode()
        
        return policy_loss.item(), value_loss.item(), total_loss.item()
    
    def update_network(self):
        """
        train_agent_properly 함수 호환성을 위한 메서드
        update_policy와 동일한 기능을 수행
        
        Returns:
            tuple: (actor_loss, critic_loss, total_loss)
        """
        return self.update_policy()

### A2C 학습 및 테스트 함수 정의

In [5]:
def train_agent_properly(env, agent, num_episodes=1000, verbose=True):
    """
    A2C (Advantage Actor-Critic) 에이전트 학습 함수
    
    지정된 에피소드 수만큼 환경에서 에이전트를 학습시킵니다.
    각 에피소드마다 환경과 상호작용하고, 에피소드 종료 후 
    Actor-Critic 네트워크를 업데이트합니다.
    
    Args:
        env: 강화학습 환경 (CartPole 등)
        agent: 학습할 A2C 에이전트
        num_episodes (int): 총 학습 에피소드 수
        verbose (bool): 학습 진행상황 출력 여부
        
    Returns:
        tuple: (에피소드 보상 리스트, Actor 손실 리스트, Critic 손실 리스트, 총 손실 리스트)
    """
    # 학습 결과를 저장할 리스트들
    episode_rewards = []    # 각 에피소드의 총 보상
    actor_losses = []       # Actor 네트워크의 손실값
    critic_losses = []      # Critic 네트워크의 손실값
    total_losses = []       # 전체 손실값 (Actor + Critic)
    
    if verbose:
        print(f"Training A2C agent ({num_episodes} episodes)")
        print("=" * 50)
    
    # 지정된 에피소드 수만큼 반복 학습
    for episode in range(num_episodes):
        # 환경 초기화 및 변수 설정
        state, info = env.reset()
        episode_reward = 0      # 현재 에피소드의 누적 보상
        done = False            # 에피소드 종료 플래그
        step_count = 0          # 현재 에피소드의 스텝 카운터
        max_steps = 500         # 무한 루프 방지를 위한 최대 스텝 제한
        
        # 에피소드 실행: 환경과 상호작용하며 경험 수집
        while not done and step_count < max_steps:
            # 현재 Actor 정책에 따라 행동 선택
            action = agent.select_action(state)
            
            # 선택한 행동을 환경에서 실행
            next_state, reward, terminated, truncated, info = env.step(action)
            done = terminated or truncated  # 에피소드 종료 조건
            
            # 받은 보상을 에이전트에 저장 (나중에 네트워크 업데이트에 사용)
            agent.store_reward(reward)
            episode_reward += reward
            
            # 다음 스텝을 위해 상태 업데이트
            state = next_state
            step_count += 1
        
        # 에피소드 종료 후 Actor-Critic 네트워크 업데이트 (A2C의 핵심)
        # 수집한 경험을 바탕으로 정책과 가치함수를 동시에 학습
        actor_loss, critic_loss, total_loss = agent.update_network()
        
        # 성능 및 손실 기록
        episode_rewards.append(episode_reward)
        actor_losses.append(actor_loss)
        critic_losses.append(critic_loss)
        total_losses.append(total_loss)
        
        # 에이전트 내부 성능 기록도 업데이트 (선택적)
        if hasattr(agent, 'episode_rewards'):
            agent.episode_rewards.append(episode_reward)
        
        # 100 에피소드마다 학습 진행상황 출력
        if verbose and (episode + 1) % 100 == 0:
            recent_avg_reward = np.mean(episode_rewards[-100:])    # 최근 100 에피소드 평균 보상
            recent_avg_loss = np.mean(total_losses[-100:])         # 최근 100 에피소드 평균 손실
            print(f"Episode {episode + 1}: Recent 100 avg reward = {recent_avg_reward:.2f}, "
                  f"avg loss = {recent_avg_loss:.4f}")
    
    if verbose:
        final_avg_reward = np.mean(episode_rewards[-100:])
        final_avg_loss = np.mean(total_losses[-100:])
        print(f"Training complete! Final average reward: {final_avg_reward:.2f}")
        print(f"Final average loss: {final_avg_loss:.4f}")
    
    return episode_rewards, actor_losses, critic_losses, total_losses


def test_agent_performance(agent, env, num_tests=10):
    """
    학습된 A2C 에이전트의 성능을 테스트하는 함수
    
    학습이 완료된 A2C 에이전트가 얼마나 잘 수행하는지 여러 번 테스트해서
    평균 성능과 성능의 일관성을 확인합니다. 테스트 중에는 네트워크 업데이트를
    수행하지 않고 현재 정책의 성능만 평가합니다.
    
    Args:
        agent: 테스트할 학습된 A2C 에이전트
        env: 테스트 환경
        num_tests (int): 테스트할 에피소드 수
        
    Returns:
        list: 각 테스트 에피소드별 총 보상 리스트
    """
    test_rewards = []  # 각 테스트 에피소드의 성능을 저장할 리스트
    
    print(f"Testing A2C agent performance ({num_tests} episodes)")
    print("-" * 40)
    
    # 지정된 횟수만큼 테스트 실행
    for test in range(num_tests):
        # 각 테스트마다 환경을 새로 초기화
        state, info = env.reset()
        total_reward = 0        # 현재 테스트 에피소드의 총 보상
        done = False            # 에피소드 종료 플래그
        step = 0                # 스텝 카운터
        max_steps = 500         # 최대 스텝 제한
        
        # 에피소드가 끝날 때까지 또는 최대 스텝에 도달할 때까지 실행
        while not done and step < max_steps:
            # 학습된 Actor 정책에 따라 행동 선택 (여전히 확률적 선택)
            action = agent.select_action(state)
            
            # 행동 실행 및 결과 관찰
            next_state, reward, terminated, truncated, info = env.step(action)
            done = terminated or truncated
            
            state = next_state
            total_reward += reward
            step += 1
        
        # 이번 테스트의 총 보상 기록
        test_rewards.append(total_reward)
    
    # 테스트 결과 요약 출력
    avg_test_reward = np.mean(test_rewards)
    std_test_reward = np.std(test_rewards)
    print(f"\nTest Results Summary:")
    print(f"Average reward: {avg_test_reward:.2f} ± {std_test_reward:.2f}")
    print(f"Best performance: {max(test_rewards):.2f}")
    print(f"Worst performance: {min(test_rewards):.2f}")
    
    return test_rewards

In [6]:
class A2CCartPoleAnimator:
    """
    CartPole 환경에서 A2C 에이전트의 플레이를 애니메이션으로 시각화하는 클래스
    
    A2C (Actor-Critic) 에이전트가 실제로 환경에서 어떻게 행동하는지 시각적으로 
    확인할 수 있는 애니메이션을 생성합니다. 카트의 움직임, 막대의 기울어짐, 
    Actor의 행동 선택, Critic의 가치 예측 등을 실시간으로 보여줍니다.
    """
    
    def __init__(self, title="A2C CartPole", figsize=(12, 7)):
        """
        A2C 애니메이터 초기화
        
        Args:
            title (str): 애니메이션 제목
            figsize (tuple): 그림 크기 (가로, 세로) - A2C 정보 표시를 위해 더 큰 사이즈
        """
        self.title = title
        self.figsize = figsize
        
        # 에피소드 데이터
        self.states = []         # 각 스텝의 환경 상태
        self.actions = []        # 각 스텝에서 Actor가 선택한 행동
        self.action_probs = []   # 각 스텝에서 Actor의 행동 확률 분포
        self.state_values = []   # 각 스텝에서 Critic이 예측한 상태 가치
        self.total_reward = 0
        
        # 시각적 요소들 (나중에 초기화)
        self.fig = None
        self.ax = None
        self.cart = None
        self.pole_line = None
        self.pole_tip = None
        self.step_text = None
        self.action_text = None
        self.angle_text = None
        self.value_text = None       # Critic 가치 예측 표시
        self.prob_text = None        # Actor 확률 분포 표시
    
    def collect_episode_data(self, agent, env, max_frames=200):
        """
        A2C 에이전트가 한 에피소드를 플레이하면서 모든 상태, 행동, 가치를 기록
        
        Args:
            agent: 학습된 A2C 에이전트
            env: CartPole 환경
            max_frames (int): 최대 프레임 수 (에피소드 길이 제한)
        """
        # 데이터 초기화
        self.states = []
        self.actions = []
        self.action_probs = []
        self.state_values = []
        self.total_reward = 0
        
        # 에피소드 시작
        state, info = env.reset()
        done = False
        step = 0
        
        # A2C 에이전트가 에피소드를 플레이하며 데이터 수집
        while not done and step < max_frames:
            # 현재 상태 저장 (애니메이션에서 사용할 데이터)
            self.states.append(np.array(state).copy())
            
            # A2C 에이전트의 Actor가 현재 상태를 보고 행동 결정
            # Actor 네트워크가 상태를 분석해서 행동 확률 분포 생성
            action = agent.select_action(state)
            self.actions.append(action)
            
            # A2C의 특징: Actor의 행동 확률과 Critic의 가치 예측을 동시에 얻기
            if hasattr(agent, 'get_action_probabilities'):
                # Actor의 행동 확률 분포 저장 (시각화용)
                action_probs = agent.get_action_probabilities(state)
                self.action_probs.append(action_probs)
            else:
                # 행동 확률을 직접 얻을 수 없는 경우 더미 데이터
                self.action_probs.append([0.5, 0.5])
            
            if hasattr(agent, 'get_state_value'):
                # Critic의 상태 가치 예측 저장 (시각화용)
                state_value = agent.get_state_value(state)
                self.state_values.append(state_value)
            else:
                # 상태 가치를 직접 얻을 수 없는 경우 더미 데이터
                self.state_values.append(0.0)
            
            # 선택한 행동을 환경에서 실행하고 결과 관찰
            next_state, reward, terminated, truncated, info = env.step(action)
            done = terminated or truncated
            
            state = next_state
            self.total_reward += reward
            step += 1
        
        print(f"A2C Animation episode: {len(self.states)} steps, Reward: {self.total_reward}")
        print(f"Average state value: {np.mean(self.state_values):.3f}")
    
    def setup_animation_canvas(self):
        """
        A2C 정보를 포함한 애니메이션 캔버스와 시각적 요소들을 설정
        """
        # 캔버스 생성 (A2C 정보 표시를 위해 서브플롯 사용)
        self.fig = plt.figure(figsize=self.figsize)
        self.fig.patch.set_facecolor('white')
        
        # 메인 애니메이션 영역 (왼쪽 2/3)
        self.ax = plt.subplot2grid((3, 3), (0, 0), colspan=2, rowspan=3)
        
        # 좌표계 설정 (CartPole 환경에 맞게)
        self.ax.set_xlim(-2.5, 2.5)    # 카트가 움직일 수 있는 x 범위
        self.ax.set_ylim(-0.3, 2.2)    # y 범위 (바닥부터 막대 끝까지)
        self.ax.set_aspect('equal')     # 비율 맞춤
        self.ax.grid(True, alpha=0.3, color='gray')
        self.ax.set_facecolor('lightblue')
        
        # 고정 정보: 제목과 에피소드 요약
        title_text = self.ax.text(0, 1.9, self.title, ha='center', fontsize=16, 
                                 fontweight='bold', color='darkblue')
        info_text = self.ax.text(0, 1.7, f'Steps: {len(self.states)} | Reward: {self.total_reward:.0f}', 
                                ha='center', fontsize=12, color='darkgreen')
        
        # 동적 정보: 매 프레임마다 업데이트되는 실시간 정보
        self.step_text = self.ax.text(-2.2, -0.2, '', fontsize=11, fontweight='bold', color='blue')
        self.action_text = self.ax.text(1.0, -0.2, '', fontsize=11, fontweight='bold', color='red')
        self.angle_text = self.ax.text(0, -0.2, '', fontsize=10, color='purple')
        
        # A2C 전용 정보 영역 (오른쪽 1/3)
        self._setup_a2c_info_panel()
        
        # CartPole 시각적 요소 생성
        self._create_cartpole_elements()
    
    def _setup_a2c_info_panel(self):
        """
        A2C 알고리즘 특화 정보 패널 설정
        """
        # Critic 가치 예측 정보 영역
        ax_critic = plt.subplot2grid((3, 3), (0, 2))
        ax_critic.set_title('Critic: State Value', fontsize=12, fontweight='bold', color='darkgreen')
        ax_critic.set_xlim(0, 1)
        ax_critic.set_ylim(0, 1)
        ax_critic.axis('off')
        self.value_text = ax_critic.text(0.5, 0.5, '', ha='center', va='center', 
                                        fontsize=14, fontweight='bold', color='darkgreen')
        
        # Actor 행동 확률 정보 영역
        ax_actor = plt.subplot2grid((3, 3), (1, 2))
        ax_actor.set_title('Actor: Action Probabilities', fontsize=12, fontweight='bold', color='darkred')
        ax_actor.set_xlim(0, 1)
        ax_actor.set_ylim(0, 1)
        ax_actor.axis('off')
        self.prob_text = ax_actor.text(0.5, 0.5, '', ha='center', va='center', 
                                      fontsize=10, fontweight='bold', color='darkred')
        
        # A2C 알고리즘 설명 영역
        ax_info = plt.subplot2grid((3, 3), (2, 2))
        ax_info.set_title('A2C Algorithm', fontsize=12, fontweight='bold', color='darkblue')
        ax_info.set_xlim(0, 1)
        ax_info.set_ylim(0, 1)
        ax_info.axis('off')
        
        info_text = ("Actor: Policy Network\n(Action Probability Prediction)\n\n"
                    "Critic: Value Network\n(State Value Prediction)\n\n"
                    "Variance reduction through\nsimultaneous learning")
        ax_info.text(0.5, 0.5, info_text, ha='center', va='center', 
                    fontsize=9, color='darkblue')
    
    def _create_cartpole_elements(self):
        """
        CartPole의 시각적 요소들 (카트, 막대, 바닥) 생성
        """
        # 카트: 직사각형으로 표현
        self.cart = Rectangle((-0.4, 0), 0.8, 0.25, fc='navy', ec='black', alpha=0.9, linewidth=2)
        
        # 막대: 선으로 표현 (카트 중심에서 막대 끝까지)
        self.pole_line, = self.ax.plot([], [], linewidth=8, solid_capstyle='round')
        
        # 막대 끝: 원으로 표현 (질량 중심을 시각화)
        self.pole_tip = Circle((0, 0), 0.1, fc='red', ec='black', linewidth=2, alpha=0.9)
        
        # 바닥: 수평선으로 표현
        self.ax.axhline(y=0, color='brown', linewidth=4, alpha=0.8)
        
        # 요소들을 축에 추가
        self.ax.add_patch(self.cart)
        self.ax.add_patch(self.pole_tip)
    
    def get_pole_status(self, theta):
        """
        막대 각도에 따른 위험도 상태 반환
        
        Args:
            theta (float): 막대 각도 (라디안)
            
        Returns:
            tuple: (색상, 상태 메시지)
        """
        angle_deg = abs(theta * 180 / np.pi)  # 라디안을 각도로 변환
        
        if angle_deg > 20:          # 매우 위험: 곧 넘어질 것 같음
            return 'darkred', 'DANGER!'
        elif angle_deg > 12:        # 위험: 게임오버 임계점 근처
            return 'red', 'Warning'
        elif angle_deg > 8:         # 주의: 조심스러운 상태
            return 'orange', 'Careful'
        else:                       # 안전: 안정적인 상태
            return 'green', 'Stable'
    
    def update_frame(self, frame):
        """
        각 프레임에서 CartPole의 상태와 A2C 정보를 업데이트
        
        수집된 상태 데이터, 행동 데이터, A2C 예측값들을 바탕으로 매 프레임마다
        카트의 위치, 막대의 각도, Actor의 행동 확률, Critic의 가치 예측을 
        시각적으로 표현합니다.
        
        Args:
            frame (int): 현재 프레임 번호 (0부터 시작)
            
        Returns:
            tuple: 업데이트된 시각적 요소들
        """
        # 프레임 범위 확인
        if frame >= len(self.states):
            return (self.cart, self.pole_line, self.pole_tip, self.step_text, 
                   self.action_text, self.angle_text, self.value_text, self.prob_text)
        
        # 현재 프레임의 상태 데이터 가져오기
        x_pos, x_vel, theta, theta_vel = self.states[frame]
        
        # 카트 위치 업데이트
        # 카트의 x 좌표에 따라 카트 직사각형의 위치 조정
        self.cart.set_x(x_pos - 0.4)  # 카트 중심이 x_pos가 되도록 조정
        
        # 막대 위치 계산 및 업데이트
        pole_length = 1.3  # 막대의 시각적 길이
        
        # 막대는 카트 중심에서 시작해서 theta 각도로 뻗어나감
        # 물리학: x = r*sin(θ), y = r*cos(θ) (θ=0일 때 수직)
        pole_x = x_pos + pole_length * np.sin(theta)      # 막대 끝의 x 좌표
        pole_y = 0.125 + pole_length * np.cos(theta)      # 막대 끝의 y 좌표 (카트 높이 0.125 추가)
        
        # 막대 선 업데이트: 카트 중심에서 막대 끝까지
        self.pole_line.set_data([x_pos, pole_x], [0.125, pole_y])
        
        # 막대 끝 원 위치 업데이트
        self.pole_tip.center = (pole_x, pole_y)
        
        # 막대 각도에 따른 위험도 시각화
        color, status = self.get_pole_status(theta)
        
        # 막대와 막대 끝의 색상 업데이트
        self.pole_line.set_color(color)
        self.pole_tip.set_facecolor(color)
        
        # 실시간 정보 텍스트 업데이트
        self._update_info_texts(frame, theta, status)
        
        # A2C 특화 정보 업데이트
        self._update_a2c_info(frame)
        
        return (self.cart, self.pole_line, self.pole_tip, self.step_text, 
               self.action_text, self.angle_text, self.value_text, self.prob_text)
    
    def _update_info_texts(self, frame, theta, status):
        """
        실시간 정보 텍스트들을 업데이트
        
        Args:
            frame (int): 현재 프레임
            theta (float): 막대 각도 (라디안)
            status (str): 막대 상태 메시지
        """
        # 현재 스텝 정보
        self.step_text.set_text(f'Step: {frame+1}/{len(self.states)}')
        
        # Actor가 선택한 행동 표시
        # actions[frame]: 0이면 왼쪽 힘, 1이면 오른쪽 힘
        action_str = "⬅ LEFT" if self.actions[frame] == 0 else "➡ RIGHT"
        self.action_text.set_text(f'Action: {action_str}')
        
        # 막대 각도와 안정성 상태
        angle_deg = abs(theta * 180 / np.pi)
        self.angle_text.set_text(f'Angle: {angle_deg:.1f}° ({status})')
    
    def _update_a2c_info(self, frame):
        """
        A2C 알고리즘 특화 정보 업데이트
        
        Args:
            frame (int): 현재 프레임
        """
        # Critic의 상태 가치 예측 표시
        if frame < len(self.state_values):
            state_value = self.state_values[frame]
            self.value_text.set_text(f'V(s) = {state_value:.3f}')
            
            # 가치에 따른 색상 변경 (높을수록 파란색, 낮을수록 빨간색)
            if state_value > 0:
                self.value_text.set_color('darkgreen')
            else:
                self.value_text.set_color('darkred')
        
        # Actor의 행동 확률 분포 표시
        if frame < len(self.action_probs):
            left_prob, right_prob = self.action_probs[frame]
            prob_text = f'Left: {left_prob:.3f}\nRight: {right_prob:.3f}'
            
            # 선택된 행동 강조
            if self.actions[frame] == 0:  # Left 선택
                prob_text = f'→ Left: {left_prob:.3f}\n  Right: {right_prob:.3f}'
            else:  # Right 선택
                prob_text = f'  Left: {left_prob:.3f}\n→ Right: {right_prob:.3f}'
            
            self.prob_text.set_text(prob_text)
    
    def create_animation(self, agent, env, max_frames=200, interval=100, repeat=True):
        """
        전체 A2C 애니메이션 생성 프로세스
        
        Args:
            agent: 학습된 A2C 에이전트
            env: CartPole 환경
            max_frames (int): 최대 프레임 수
            interval (int): 프레임 간 간격 (밀리초) - A2C 정보 표시를 위해 조금 느리게
            repeat (bool): 애니메이션 반복 여부
            
        Returns:
            tuple: (애니메이션 객체, 총 보상)
        """
        # 1단계: A2C 에이전트 플레이 데이터 수집
        self.collect_episode_data(agent, env, max_frames)
        
        # 2단계: A2C 정보를 포함한 애니메이션 캔버스 설정
        self.setup_animation_canvas()
        
        # 3단계: 애니메이션 생성
        anim = animation.FuncAnimation(
            self.fig, self.update_frame,     # 매 프레임마다 호출할 메서드
            frames=len(self.states),         # 총 프레임 수 (수집된 상태 개수)
            interval=interval,               # 프레임 간 간격 (밀리초) - 100ms = 10 FPS
            blit=False,                      # 전체 다시 그리기 (A2C 정보 표시를 위해 필요)
            repeat=repeat                    # 애니메이션 반복 재생
        )
        
        plt.tight_layout()
        return anim, self.total_reward


# 사용 예시 함수 (기존 함수와의 호환성 유지)
def create_a2c_animation(agent, env, title="A2C CartPole", max_frames=200):
    """
    A2C 전용 애니메이션 생성 함수
    
    Args:
        agent: 학습된 A2C 에이전트
        env: CartPole 환경  
        title (str): 애니메이션 제목
        max_frames (int): 최대 프레임 수
        
    Returns:
        tuple: (애니메이션 객체, 총 보상)
    """
    animator = A2CCartPoleAnimator(title=title)
    return animator.create_animation(agent, env, max_frames=max_frames)


# 기존 REINFORCE 함수와의 호환성을 위한 래퍼
def create_gif_style_animation(agent, env, title="A2C CartPole", max_frames=200):
    """
    기존 REINFORCE 코드와의 호환성을 위한 래퍼 함수
    A2C 에이전트에도 동일하게 사용 가능
    
    Args:
        agent: 학습된 A2C 에이전트
        env: CartPole 환경  
        title (str): 애니메이션 제목
        max_frames (int): 최대 프레임 수
        
    Returns:
        tuple: (애니메이션 객체, 총 보상)
    """
    return create_a2c_animation(agent, env, title, max_frames)

### 1. 랜덤 에이전트 정의 및 테스트

In [7]:
class RandomAgent:
    def select_action(self, state):
        return np.random.choice(2)

random_agent = RandomAgent()

In [8]:
print("=" * 70)
print("RELIABLE CARTPOLE LEARNING DEMO Start")
print("=" * 70)

# 환경 생성
env = gym.make('CartPole-v1')

RELIABLE CARTPOLE LEARNING DEMO Start


In [9]:
# 1. Random Agent 성능 테스트
print("\n1. Testing Random Agent (10 episodes)...")
random_performance = test_agent_performance(random_agent, env, 10)
random_avg = np.mean(random_performance)
random_std = np.std(random_performance)
print(f"   Random Agent: {random_avg:.1f} ± {random_std:.1f} steps")


# 애니메이션 생성만 하고 표시하지 않기
plt.ioff()  # 대화형 모드 끄기

# A2C 스타일 애니메이션 생성 (RandomAgent도 호환됨)
random_anim, random_episode_reward = create_gif_style_animation(
    random_agent, env, "RANDOM AGENT (Baseline)", max_frames=200
)

plt.close()  # 현재 figure 닫기
plt.ion()   # 대화형 모드 다시 켜기

print("Displaying Random Agent:")
display(random_anim)


1. Testing Random Agent (10 episodes)...
Testing A2C agent performance (10 episodes)
----------------------------------------

Test Results Summary:
Average reward: 22.80 ± 15.07
Best performance: 60.00
Worst performance: 11.00
   Random Agent: 22.8 ± 15.1 steps
A2C Animation episode: 29 steps, Reward: 29.0
Average state value: 0.000
Displaying Random Agent:


### 2. A2C 알고리즘 학습

In [None]:
# 2. 학습된 에이전트
print(f"\n2. Training REINFORCE Agent...")
trained_agent = A2CAgent(state_size=4, action_size=2, lr=0.001, gamma=0.99)

# 더 많은 에피소드로 확실히 학습
training_rewards = train_agent_properly(env, trained_agent, num_episodes=500, verbose=True)


2. Training REINFORCE Agent...
Training A2C agent (500 episodes)
Episode 100: Recent 100 avg reward = 42.14, avg loss = 490.5820
Episode 200: Recent 100 avg reward = 159.59, avg loss = 1906.1394
Episode 300: Recent 100 avg reward = 315.76, avg loss = 1473.6945


### 3. REINFORCE 알고리즘 결과 애니메이션 시각화

In [None]:
print(f"3. Testing Trained Agent (10 episodes)...")
trained_performance = test_agent_performance(trained_agent, env, 10)
trained_avg = np.mean(trained_performance)
trained_std = np.std(trained_performance)
print(f"   Trained Agent: {trained_avg:.1f} ± {trained_std:.1f} steps")

# 애니메이션 생성만 하고 표시하지 않기
plt.ioff()  # 대화형 모드 끄기

print(f"\n   Creating Trained Agent Animation...")
trained_anim, trained_episode_reward = create_gif_style_animation(
    trained_agent, env, "TRAINED AGENT"
)

plt.close()  # 현재 figure 닫기
plt.ion()   # 대화형 모드 다시 켜기

print("   Displaying Trained Agent:")
display(trained_anim)

### 4. 랜덤 에이전트 vs A2C 알고리즘 에이전트

In [None]:
print(f"" + "="*70)
print("RESULTS ANALYSIS")
print("="*70)
print(f"Random Agent Performance:")
print(f"  Average: {random_avg:.1f} ± {random_std:.1f} steps")
print(f"  Range: {min(random_performance):.0f} - {max(random_performance):.0f} steps")

print(f"\nTrained Agent Performance:")
print(f"  Average: {trained_avg:.1f} ± {trained_std:.1f} steps")
print(f"  Range: {min(trained_performance):.0f} - {max(trained_performance):.0f} steps")

improvement = trained_avg - random_avg
print(f"\nImprovement: {improvement:.1f} steps ({improvement/random_avg*100:.1f}% increase)")

if improvement > 50:
    print("✅ Training was SUCCESSFUL!")
elif improvement > 20:
    print("⚠️ Training showed some improvement")
else:
    print("❌ Training needs more work")

### 5. 학습 곡선 시각화

In [None]:
plt.figure(figsize=(12, 4))

plt.subplot(1, 2, 1)
window_size = 50
# training_rewards 수정 (튜플인 경우 언패킹)
if isinstance(training_rewards, tuple) and len(training_rewards) == 4:
    episode_rewards, actor_losses, critic_losses, total_losses = training_rewards
    training_rewards = episode_rewards
    
if len(training_rewards) >= window_size:
    moving_avg = []
    for i in range(window_size, len(training_rewards)):
        moving_avg.append(np.mean(training_rewards[i-window_size:i]))
    
    plt.plot(range(window_size, len(training_rewards)), moving_avg, 'b-', linewidth=2)
    plt.axhline(y=random_avg, color='red', linestyle='--', alpha=0.7,
               label=f'Random Baseline ({random_avg:.1f})')
    plt.xlabel('Episode')
    plt.ylabel('Average Reward (50-episode window)')
    plt.title('Learning Progress')
    plt.legend()
    plt.grid(True, alpha=0.3)

plt.subplot(1, 2, 2)
agents = ['Random\nAgent', 'Trained\nAgent']
means = [random_avg, trained_avg]
stds = [random_std, trained_std]

bars = plt.bar(agents, means, yerr=stds, capsize=5,
               color=['red', 'green'], alpha=0.7, edgecolor='black')
plt.ylabel('Average Episode Length')
plt.title('Performance Comparison')
plt.grid(True, alpha=0.3)

# 막대 위에 수치 표시
for bar, mean, std in zip(bars, means, stds):
    plt.text(bar.get_x() + bar.get_width()/2, bar.get_height() + std + 5,
             f'{mean:.1f}±{std:.1f}', ha='center', va='bottom', fontweight='bold')

plt.tight_layout()
plt.show()

env.close()

### 🚀 최신 알고리즘 발전

Actor-Critic 계열
- SAC (Soft Actor-Critic): 최대 엔트로피 기반 학습
- TD3 (Twin Delayed DDPG): 연속 제어를 위한 개선된 방법
- IMPALA: 대규모 분산 학습 프레임워크

### 실무 팁

#### 하이퍼파라미터 튜닝

중요한 하이퍼파라미터들
- learning_rate = [1e-4, 3e-4, 1e-3] -> 가장 중요! 너무 크면 불안정, 너무 작으면 느린 수렴
- gamma = [0.95, 0.99, 0.999]        -> 환경에 따라 조정
- network_size = [64, 128, 256]      -> 문제 복잡도에 맞게, 과소적합 vs 과적합의 균형
- 배치 크기                           -> 메모리 효율성과 학습 안정성


#### 재현 가능한 실험
시드 고정으로 재현성 확보
- torch.manual_seed(42)
- np.random.seed(42)
- random.seed(42)

✓ 실습 성과:
- CartPole 환경에서 3가지 알고리즘 구현
- PyTorch 기반 신경망 구조 이해
- 각 방법의 학습 특성과 성능 비교
- 하이퍼파라미터 영향도 분석