# 학습된 SAC 에이전트 테스트

이 노트북은 학습된 스마트팜 제어 모델을 테스트하고 성능을 시각화합니다.

In [None]:
# 필요한 패키지 설치
!pip install gymnasium stable-baselines3 numpy matplotlib shimmy

In [None]:
import gymnasium as gym
import numpy as np
import matplotlib.pyplot as plt
from gymnasium import spaces
from stable_baselines3 import SAC

# Font setup (for Colab).
# NOTE(review): this was originally labelled as a Korean-font setting, but the
# family selected here is DejaVu Sans; the plot labels in this notebook are all
# written in English, presumably for that reason -- confirm before adding
# Korean text to any figure.
plt.rcParams['font.family'] = 'DejaVu Sans'
# Disable the Unicode minus glyph for negative tick labels.
plt.rcParams['axes.unicode_minus'] = False

In [None]:
# 환경 클래스 정의 (학습할 때와 동일)
class SmartFarmEnv(gym.Env):
    """Simulated smart-farm climate environment (temperature and humidity control).

    Observation (float32, shape (4,)):
        [temperature, humidity, target temperature, target humidity],
        bounded by [0, 50], [0, 100], [0, 50], [0, 100] respectively.
    Action (float32, shape (2,), each in [-1, 1]):
        [temperature actuation, humidity actuation].

    The dynamics and reward are the same as in the original definition. The
    only change is the source of randomness: the start state and process noise
    are drawn from ``self.np_random`` (the generator that
    ``super().reset(seed=...)`` seeds) instead of the global ``np.random``,
    so passing a seed to ``reset`` actually makes an episode reproducible.
    """

    def __init__(self):
        super().__init__()

        # Two continuous actuators, each normalized to [-1, 1].
        self.action_space = spaces.Box(low=-1.0, high=1.0, shape=(2,), dtype=np.float32)

        # [temp, hum, target_temp, target_hum]
        self.observation_space = spaces.Box(
            low=np.array([0.0, 0.0, 0.0, 0.0], dtype=np.float32),
            high=np.array([50.0, 100.0, 50.0, 100.0], dtype=np.float32),
            dtype=np.float32
        )

        self.state = None
        self.target_temp = 25.0
        self.target_hum = 60.0
        self.max_steps = 100
        self.current_step = 0

        # Ambient values the state drifts toward, and the per-step drift rates.
        self.ambient_temp = 20.0
        self.ambient_hum = 50.0
        self.temp_decay = 0.05
        self.hum_decay = 0.03

    def reset(self, seed=None, options=None):
        """Start a new episode from a random temperature/humidity.

        Args:
            seed: Optional seed forwarded to ``gym.Env.reset``; it seeds
                ``self.np_random``, which is used for all sampling here.
            options: Accepted for API compatibility; not used.

        Returns:
            ``(observation, info)`` where ``info`` is an empty dict. The
            observation is ``self.state`` itself (not a copy).
        """
        super().reset(seed=seed)

        # Use the env's own seeded generator so reset(seed=...) is honored.
        start_temp = self.np_random.uniform(15.0, 35.0)
        start_hum = self.np_random.uniform(40.0, 80.0)

        self.state = np.array([start_temp, start_hum, self.target_temp, self.target_hum], dtype=np.float32)
        self.current_step = 0

        return self.state, {}

    def step(self, action):
        """Advance the simulation by one step.

        Args:
            action: Sequence of two values; each is clipped to [-1, 1].

        Returns:
            ``(observation, reward, terminated, truncated, info)``.
            ``terminated`` is always False; ``truncated`` becomes True once
            ``max_steps`` steps have been taken. ``info`` is an empty dict.
        """
        temp_action = np.clip(action[0], -1.0, 1.0)
        hum_action = np.clip(action[1], -1.0, 1.0)

        current_temp, current_hum, _, _ = self.state

        # Actuator effect: a full-scale action moves temperature by 1.5 and humidity by 4.0.
        dt_temp = temp_action * 1.5
        dt_hum = hum_action * 4.0

        # Drift toward the ambient values.
        dt_temp += (self.ambient_temp - current_temp) * self.temp_decay
        dt_hum += (self.ambient_hum - current_hum) * self.hum_decay

        # Gaussian process noise, drawn from the env's seeded generator.
        noise_temp = self.np_random.normal(0, 0.2)
        noise_hum = self.np_random.normal(0, 0.8)

        next_temp = current_temp + dt_temp + noise_temp
        next_hum = current_hum + dt_hum + noise_hum

        # Keep the state inside the observation-space bounds.
        next_temp = np.clip(next_temp, 0.0, 50.0)
        next_hum = np.clip(next_hum, 0.0, 100.0)

        self.state = np.array([next_temp, next_hum, self.target_temp, self.target_hum], dtype=np.float32)

        # Errors normalized by 25 and 50, then shaped by a Gaussian kernel
        # so the reward peaks at 1 when the state sits on the target.
        temp_error = abs(next_temp - self.target_temp) / 25.0
        hum_error = abs(next_hum - self.target_hum) / 50.0

        temp_reward = np.exp(-temp_error**2 / 0.1)
        hum_reward = np.exp(-hum_error**2 / 0.1)

        reward = 0.6 * temp_reward + 0.4 * hum_reward

        # Small penalty proportional to actuator effort.
        energy_penalty = 0.01 * (abs(temp_action) + abs(hum_action))
        reward -= energy_penalty

        self.current_step += 1
        terminated = False
        truncated = self.current_step >= self.max_steps

        return self.state, reward, terminated, truncated, {}

In [None]:
# Load the trained SAC agent from its saved checkpoint
checkpoint_name = "sac_smartfarm_agent"
print("Loading trained model...")
model = SAC.load(checkpoint_name)
print("Model loaded successfully!")

## 1. 단일 에피소드 테스트 (시각화)

In [None]:
# Fresh environment for the single-episode rollout
env = SmartFarmEnv()

# Per-step traces collected for plotting
steps, rewards = [], []
temps, hums = [], []
temp_actions, hum_actions = [], []

# Roll out one episode with the deterministic policy
obs, _ = env.reset(seed=42)
print(f"Initial State: Temp={obs[0]:.2f}°C, Humidity={obs[1]:.2f}%")
print(f"Target: Temp={obs[2]:.2f}°C, Humidity={obs[3]:.2f}%\n")

step_idx = 0
episode_over = False
while step_idx < 100 and not episode_over:
    action, _ = model.predict(obs, deterministic=True)
    obs, reward, terminated, truncated, _ = env.step(action)

    # Record the post-step state together with the action that produced it
    steps.append(step_idx)
    temps.append(obs[0])
    hums.append(obs[1])
    temp_actions.append(action[0])
    hum_actions.append(action[1])
    rewards.append(reward)

    episode_over = terminated or truncated
    step_idx += 1

print(f"Test completed: {len(steps)} steps")
print(f"Average reward: {np.mean(rewards):.3f}")
print(f"Final Temp: {temps[-1]:.2f}°C (Target: {env.target_temp:.1f}°C)")
print(f"Final Humidity: {hums[-1]:.2f}% (Target: {env.target_hum:.1f}%)")

In [None]:
# Visualize the single-episode rollout: temperature, humidity, and control actions
fig, axes = plt.subplots(3, 1, figsize=(12, 10))

# Panels 1 and 2 share one layout: measured series, dashed target line,
# and a shaded tolerance band around the target.
tracking_panels = [
    {
        'ax': axes[0], 'series': temps, 'target': env.target_temp, 'band': 2,
        'line_color': 'red', 'target_color': 'darkred',
        'line_label': 'Current Temperature', 'target_label': 'Target Temperature',
        'ylabel': 'Temperature (°C)',
        'title': 'Smart Farm Control Performance - Temperature',
    },
    {
        'ax': axes[1], 'series': hums, 'target': env.target_hum, 'band': 5,
        'line_color': 'blue', 'target_color': 'darkblue',
        'line_label': 'Current Humidity', 'target_label': 'Target Humidity',
        'ylabel': 'Humidity (%)',
        'title': 'Smart Farm Control Performance - Humidity',
    },
]
for panel in tracking_panels:
    panel_ax = panel['ax']
    target = panel['target']
    panel_ax.plot(steps, panel['series'], label=panel['line_label'],
                  color=panel['line_color'], linewidth=2)
    panel_ax.axhline(y=target, color=panel['target_color'], linestyle='--',
                     label=panel['target_label'], linewidth=2)
    panel_ax.fill_between(steps, target - panel['band'], target + panel['band'],
                          alpha=0.2, color=panel['line_color'], label='Acceptable Range')
    panel_ax.set_ylabel(panel['ylabel'], fontsize=12)
    panel_ax.set_title(panel['title'], fontsize=14, fontweight='bold')
    panel_ax.legend(loc='upper right')
    panel_ax.grid(True, alpha=0.3)

# Panel 3: the two control actions chosen by the agent at each step
action_ax = axes[2]
action_ax.plot(steps, temp_actions, label='Heating/Cooling Action', color='orange', linewidth=2, alpha=0.7)
action_ax.plot(steps, hum_actions, label='Humidifier/Dehumidifier Action', color='cyan', linewidth=2, alpha=0.7)
action_ax.axhline(y=0, color='black', linestyle='-', linewidth=0.5)
action_ax.set_ylim(-1.2, 1.2)
action_ax.set_xlabel('Time Steps', fontsize=12)
action_ax.set_ylabel('Action Value', fontsize=12)
action_ax.set_title('Control Actions by AI Agent', fontsize=14, fontweight='bold')
action_ax.legend(loc='upper right')
action_ax.grid(True, alpha=0.3)

plt.tight_layout()
plt.savefig('smart_farm_test_results.png', dpi=150, bbox_inches='tight')
plt.show()

print("\nGraph saved as 'smart_farm_test_results.png'")

## 2. 다중 에피소드 성능 평가

In [None]:
# Run several episodes and compute aggregate statistics
n_episodes = 20
episode_rewards = []
episode_temp_errors = []
episode_hum_errors = []
episode_convergence_times = []

print(f"Running {n_episodes} test episodes...\n")

for episode in range(n_episodes):
    obs, _ = env.reset()
    episode_reward = 0
    temp_errors = []
    hum_errors = []
    converged_step = None

    for step in range(100):
        action, _ = model.predict(obs, deterministic=True)
        obs, reward, terminated, truncated, _ = env.step(action)

        episode_reward += reward

        temp_error = abs(obs[0] - env.target_temp)
        hum_error = abs(obs[1] - env.target_hum)

        temp_errors.append(temp_error)
        hum_errors.append(hum_error)

        # Record the first step at which both errors are inside the tolerance band
        if converged_step is None and temp_error < 2.0 and hum_error < 5.0:
            converged_step = step

        if terminated or truncated:
            break

    # Compare against None explicitly: step 0 is a valid (and the best possible)
    # convergence time, but it is falsy, so a plain truthiness test would
    # misreport it as "never converged".
    has_converged = converged_step is not None

    episode_rewards.append(episode_reward)
    episode_temp_errors.append(np.mean(temp_errors))
    episode_hum_errors.append(np.mean(hum_errors))
    # Episodes that never converge are recorded as 100 (the loop's step budget)
    episode_convergence_times.append(converged_step if has_converged else 100)

    print(f"Episode {episode+1:2d}: Reward={episode_reward:6.2f}, "
          f"Avg Temp Error={np.mean(temp_errors):4.2f}°C, "
          f"Avg Hum Error={np.mean(hum_errors):4.2f}%, "
          f"Convergence={converged_step if has_converged else 'N/A'} steps")

print("\n" + "="*80)
print("PERFORMANCE SUMMARY")
print("="*80)
print(f"Average Episode Reward:        {np.mean(episode_rewards):6.2f} ± {np.std(episode_rewards):5.2f}")
print(f"Average Temperature Error:     {np.mean(episode_temp_errors):6.2f}°C ± {np.std(episode_temp_errors):5.2f}°C")
print(f"Average Humidity Error:        {np.mean(episode_hum_errors):6.2f}% ± {np.std(episode_hum_errors):5.2f}%")
print(f"Average Convergence Time:      {np.mean(episode_convergence_times):6.1f} steps")
# Converged episodes have a recorded time in 0..99; non-converged ones are stored as 100
print(f"Success Rate (converged):      {sum(1 for t in episode_convergence_times if t < 100)/n_episodes*100:.1f}%")
print("="*80)

In [None]:
# Plot the per-episode statistics as a 2x2 grid of bar charts
fig, axes = plt.subplots(2, 2, figsize=(14, 10))

episode_ids = range(1, n_episodes+1)

# One entry per panel:
# (axis, values, bar color, mean-line color, mean-line label, y-axis label, title)
stat_panels = [
    # Episode rewards
    (axes[0, 0], episode_rewards, 'green', 'red',
     f'Mean: {np.mean(episode_rewards):.2f}',
     'Total Reward', 'Episode Rewards'),
    # Temperature error
    (axes[0, 1], episode_temp_errors, 'red', 'darkred',
     f'Mean: {np.mean(episode_temp_errors):.2f}°C',
     'Average Error (°C)', 'Temperature Control Error'),
    # Humidity error
    (axes[1, 0], episode_hum_errors, 'blue', 'darkblue',
     f'Mean: {np.mean(episode_hum_errors):.2f}%',
     'Average Error (%)', 'Humidity Control Error'),
    # Convergence time
    (axes[1, 1], episode_convergence_times, 'purple', 'darkviolet',
     f'Mean: {np.mean(episode_convergence_times):.1f} steps',
     'Steps to Convergence', 'Convergence Speed'),
]

for panel_ax, values, bar_color, mean_color, mean_label, y_label, panel_title in stat_panels:
    panel_ax.bar(episode_ids, values, color=bar_color, alpha=0.7)
    # Dashed horizontal line marking the mean across episodes
    panel_ax.axhline(y=np.mean(values), color=mean_color, linestyle='--',
                     label=mean_label, linewidth=2)
    panel_ax.set_xlabel('Episode')
    panel_ax.set_ylabel(y_label)
    panel_ax.set_title(panel_title, fontweight='bold')
    panel_ax.legend()
    panel_ax.grid(True, alpha=0.3)

plt.tight_layout()
plt.savefig('performance_statistics.png', dpi=150, bbox_inches='tight')
plt.show()

print("\nGraph saved as 'performance_statistics.png'")

## 3. 다양한 초기 조건 테스트

In [None]:
# Evaluate the agent from extreme initial conditions
test_conditions = [
    {"name": "Very Cold & Dry", "temp": 10.0, "hum": 30.0},
    {"name": "Very Hot & Humid", "temp": 40.0, "hum": 90.0},
    {"name": "Cold & Humid", "temp": 15.0, "hum": 80.0},
    {"name": "Hot & Dry", "temp": 35.0, "hum": 40.0},
]

print("Testing under extreme initial conditions...\n")
print("="*80)

for condition in test_conditions:
    env = SmartFarmEnv()
    # Gymnasium requires reset() before the first step(); call it so the
    # environment is properly initialized, then overwrite the randomly
    # sampled start state with the scenario under test.
    env.reset()
    env.state = np.array([condition["temp"], condition["hum"],
                          env.target_temp, env.target_hum], dtype=np.float32)
    env.current_step = 0

    obs = env.state
    total_reward = 0
    temp_errors = []
    hum_errors = []

    for step in range(100):
        action, _ = model.predict(obs, deterministic=True)
        obs, reward, terminated, truncated, _ = env.step(action)
        total_reward += reward
        # Absolute distance from the targets after this step
        temp_errors.append(abs(obs[0] - env.target_temp))
        hum_errors.append(abs(obs[1] - env.target_hum))

        if terminated or truncated:
            break

    print(f"{condition['name']:20s} | Initial: T={condition['temp']:5.1f}°C H={condition['hum']:5.1f}% | "
          f"Final: T={obs[0]:5.1f}°C H={obs[1]:5.1f}% | "
          f"Avg Error: T={np.mean(temp_errors):4.2f}°C H={np.mean(hum_errors):4.2f}%")

print("="*80)

## 4. 랜덤 정책과 비교

In [None]:
# Compare the trained SAC agent against a random policy
n_test = 10

sac_rewards = []
random_rewards = []

print("Comparing SAC agent with random policy...\n")


def _episode_return(episode_env, choose_action):
    """Reset ``episode_env``, run at most 100 steps, and return the summed reward.

    ``choose_action`` maps the current observation to an action.
    """
    observation, _ = episode_env.reset()
    total = 0
    for _ in range(100):
        chosen = choose_action(observation)
        observation, step_reward, terminated, truncated, _ = episode_env.step(chosen)
        total += step_reward
        if terminated or truncated:
            break
    return total


for i in range(n_test):
    # Trained SAC agent (deterministic actions) on a fresh environment
    env = SmartFarmEnv()
    sac_reward = _episode_return(
        env, lambda o: model.predict(o, deterministic=True)[0]
    )
    sac_rewards.append(sac_reward)

    # Random policy on another fresh environment: actions sampled from the action space
    env = SmartFarmEnv()
    random_reward = _episode_return(
        env, lambda _o, space=env.action_space: space.sample()
    )
    random_rewards.append(random_reward)

# Summarize both policies: mean and standard deviation of the episode returns
means = [np.mean(sac_rewards), np.mean(random_rewards)]
stds = [np.std(sac_rewards), np.std(random_rewards)]

# Bar chart with error bars (one bar per policy)
fig, ax = plt.subplots(figsize=(10, 6))
x = np.arange(2)
bars = ax.bar(x, means, yerr=stds, color=['green', 'gray'], alpha=0.7, capsize=10)
ax.set_xticks(x)
ax.set_xticklabels(['SAC Agent (Trained)', 'Random Policy'])
ax.set_ylabel('Average Total Reward', fontsize=12)
ax.set_title('SAC Agent vs Random Policy Comparison', fontsize=14, fontweight='bold')
ax.grid(True, alpha=0.3, axis='y')

# Annotate each bar with its mean ± std, placed just above the error bar
for i, mean in enumerate(means):
    std = stds[i]
    ax.text(i, mean + std + 1, f'{mean:.2f}\n±{std:.2f}',
            ha='center', va='bottom', fontweight='bold')

plt.tight_layout()
plt.savefig('sac_vs_random.png', dpi=150, bbox_inches='tight')
plt.show()

# Text report; improvement is relative to the magnitude of the random policy's mean
print(f"\nSAC Agent:     {np.mean(sac_rewards):6.2f} ± {np.std(sac_rewards):5.2f}")
print(f"Random Policy: {np.mean(random_rewards):6.2f} ± {np.std(random_rewards):5.2f}")
print(f"\nImprovement: {((np.mean(sac_rewards) - np.mean(random_rewards)) / abs(np.mean(random_rewards)) * 100):.1f}%")
print("\nGraph saved as 'sac_vs_random.png'")

## 5. 적응력 테스트 (에피소드 중 목표값 변경)

In [None]:
# Adaptability test: change the target temperature/humidity in the middle of an episode
env = SmartFarmEnv()
obs, _ = env.reset()

temps = []
hums = []
target_temps = []
target_hums = []
steps_list = []

print("Testing adaptability to changing target values...\n")

# Target-change scenario: (step, target temperature, target humidity)
target_changes = [
    (0, 25.0, 60.0),    # initial targets
    (30, 28.0, 70.0),   # changed at step 30
    (60, 22.0, 50.0),   # changed at step 60
]

# Index the schedule by step for a direct lookup inside the loop
targets_by_step = {change_step: (t_temp, t_hum) for change_step, t_temp, t_hum in target_changes}

for step in range(100):
    # Apply a scheduled target change, if any, before the agent acts
    if step in targets_by_step:
        env.target_temp, env.target_hum = targets_by_step[step]
        env.state[2] = env.target_temp
        env.state[3] = env.target_hum
        # Hand the policy the updated observation explicitly. Without this the
        # agent only sees the new targets because obs happens to be the very
        # same array object as env.state.
        obs = env.state.copy()
        print(f"Step {step}: Target changed to T={env.target_temp:.1f}°C, H={env.target_hum:.1f}%")

    action, _ = model.predict(obs, deterministic=True)
    obs, reward, terminated, truncated, _ = env.step(action)

    steps_list.append(step)
    temps.append(obs[0])
    hums.append(obs[1])
    target_temps.append(env.target_temp)
    target_hums.append(env.target_hum)

    if terminated or truncated:
        break

# Plot actual vs. target values for the adaptability run
fig, axes = plt.subplots(2, 1, figsize=(12, 8))

# One entry per panel:
# (axis, actual series, target series, line color, target color,
#  actual label, target label, y-axis label, title)
tracking_specs = [
    # Temperature
    (axes[0], temps, target_temps, 'red', 'darkred',
     'Actual Temperature', 'Target Temperature',
     'Temperature (°C)', 'Adaptability Test - Temperature Tracking'),
    # Humidity
    (axes[1], hums, target_hums, 'blue', 'darkblue',
     'Actual Humidity', 'Target Humidity',
     'Humidity (%)', 'Adaptability Test - Humidity Tracking'),
]

for panel_ax, actual, wanted, actual_color, wanted_color, actual_label, wanted_label, y_label, panel_title in tracking_specs:
    panel_ax.plot(steps_list, actual, label=actual_label, color=actual_color, linewidth=2)
    panel_ax.plot(steps_list, wanted, label=wanted_label, color=wanted_color,
                  linestyle='--', linewidth=2)
    # Vertical markers at every target change after the initial one
    for change in target_changes[1:]:
        panel_ax.axvline(x=change[0], color='gray', linestyle=':', alpha=0.5)
    panel_ax.set_ylabel(y_label, fontsize=12)
    panel_ax.set_title(panel_title, fontsize=14, fontweight='bold')
    panel_ax.legend()
    panel_ax.grid(True, alpha=0.3)

# Only the bottom panel carries the shared x-axis label
axes[1].set_xlabel('Time Steps', fontsize=12)

plt.tight_layout()
plt.savefig('adaptability_test.png', dpi=150, bbox_inches='tight')
plt.show()

print("\nAdaptability test completed!")
print("Graph saved as 'adaptability_test.png'")

## 결론

이 테스트를 통해 다음을 확인할 수 있습니다:

1. **제어 정확도**: 목표 온도/습도에 얼마나 가까이 유지하는가
2. **수렴 속도**: 목표값에 도달하는데 걸리는 시간
3. **안정성**: 다양한 초기 조건에서도 안정적으로 작동하는가
4. **랜덤 정책 대비 개선도**: 학습의 효과
5. **적응력**: 목표값이 변경될 때 빠르게 적응하는가