# DiffBatt Model Testing and Review

이 노트북에서는 DiffBatt 모델의 성능을 테스트하고 결과를 분석합니다.

## 1. testing.py와 동일한 코드 실행

In [ ]:
# DiffBatt 모델 테스트 실행
# 이 셀은 testing.py와 동일한 코드를 실행합니다.
# 
# 전체 프로세스:
# 1. GPU 설정 및 메모리 관리
# 2. 사전 학습된 모델 로드 (일반 네트워크 + EMA 네트워크)
# 3. Gaussian Diffusion 유틸리티 초기화 (노이즈 추가/제거 과정 관리)
# 4. DiffusionModel 생성 (실제 예측을 수행하는 모델)
# 5. 테스트 데이터셋 로드
# 6. PostProcess를 통한 예측 및 평가
#    - 배터리 용량 매트릭스를 조건으로 SOH(State of Health) 곡선 생성
#    - 노이즈에서 시작하여 1000단계의 역확산 과정을 통해 예측
# 7. 평가 지표 계산 (SOH RMSE, RUL RMSE)
# 8. 결과 시각화

import tensorflow as tf
from tensorflow import keras
from tensorflow.data import Dataset as tfds
from gaussian_diffusion import GaussianDiffusion
from diffusion_model import DiffusionModel
from postproc_utils import PostProcess

# GPU 메모리 점진적 할당 설정
gpus = tf.config.experimental.list_physical_devices('GPU')
for gpu in gpus:
  tf.config.experimental.set_memory_growth(gpu, True)

tf.config.set_visible_devices(gpus, 'GPU')

# 설정 파라미터
battery_dataset = 'mix'  # 사용할 배터리 데이터셋
total_timesteps = 1000   # 확산 과정의 총 단계 수
p_uncond = 0.2          # Classifier-free guidance를 위한 unconditional 확률

# 사전 학습된 모델 로드
mdir = './trained_models/'
network = keras.models.load_model(mdir+f'{battery_dataset}_network')
ema_network = keras.models.load_model(mdir+f'{battery_dataset}_ema_network')

# Gaussian Diffusion 유틸리티 인스턴스 생성
gdf_util = GaussianDiffusion(timesteps=total_timesteps)

# DiffusionModel 인스턴스 생성
model = DiffusionModel(
    network=network,
    ema_network=ema_network,
    gdf_util=gdf_util,
    timesteps=total_timesteps,
    p_uncond=p_uncond,
)

# 학습 및 테스트 데이터셋 로드
train_ds = tfds.load(f'./data/{battery_dataset}_train_ds')
test_ds = tfds.load(f'./data/{battery_dataset}_test_ds')

# 예측 및 평가 수행
post_process = PostProcess(test_ds, model)
refs, preds = post_process.pred(reps=1)  # reps=1: 각 샘플당 1번의 예측
soh_rmse = post_process.eval_soh(refs, preds)
rul_rmse = post_process.eval_rul(refs, preds)

# 결과 시각화 및 출력
post_process.plot_sample(refs, preds)
print('RUL RMSE', rul_rmse, 'SOH RMSE', soh_rmse)

In [ ]:
# DiffBatt 모델의 상세 예측 과정 시각화
# 이 셀은 .py 파일들의 내부 구현을 모두 포함하여 전체 예측 과정을 보여줍니다.

import tensorflow as tf
import numpy as np
import matplotlib.pyplot as plt
from tensorflow import keras
from tensorflow.data import Dataset as tfds

# 1. 테스트 데이터셋 로드 및 전처리
print("1. 테스트 데이터셋 로드 중...")
battery_dataset = 'mix'
test_ds = tfds.load(f'./data/{battery_dataset}_test_ds')

# 데이터셋에서 SOH 참조값과 용량 매트릭스 추출
refs_soh = []
capacity_matrices = []

for data in test_ds.as_numpy_iterator():
    refs_soh.append(data[0])  # SOH curves
    capacity_matrices.append(data[2])  # Capacity matrices

refs_soh = np.array(refs_soh)
capacity_matrices = np.array(capacity_matrices)
print(f"   - 로드된 테스트 샘플 수: {len(refs_soh)}")
print(f"   - SOH 데이터 shape: {refs_soh.shape}")
print(f"   - 용량 매트릭스 shape: {capacity_matrices.shape}")

# 2. 모델 로드
print("\n2. 사전 학습된 모델 로드 중...")
mdir = './trained_models/'
network = keras.models.load_model(mdir+f'{battery_dataset}_network')
ema_network = keras.models.load_model(mdir+f'{battery_dataset}_ema_network')
print("   - 일반 네트워크와 EMA 네트워크 로드 완료")

# 3. Gaussian Diffusion 설정
print("\n3. Gaussian Diffusion 설정...")
total_timesteps = 1000

# Beta schedule 생성 (노이즈 스케줄)
def cosine_beta_schedule(timesteps, s=0.008):
    steps = timesteps + 1
    x = tf.linspace(0, timesteps, steps)
    alphas_cumprod = tf.cos(((x / timesteps) + s) / (1 + s) * np.pi * 0.5) ** 2
    alphas_cumprod = alphas_cumprod / alphas_cumprod[0]
    betas = 1 - (alphas_cumprod[1:] / alphas_cumprod[:-1])
    return tf.clip_by_value(betas, 0.0001, 0.9999)

betas = cosine_beta_schedule(total_timesteps)
alphas = 1.0 - betas
alphas_cumprod = tf.math.cumprod(alphas, axis=0)
alphas_cumprod_prev = tf.concat([tf.ones((1,)), alphas_cumprod[:-1]], axis=0)

# 확산 과정에 필요한 파라미터들
sqrt_alphas_cumprod = tf.sqrt(alphas_cumprod)
sqrt_one_minus_alphas_cumprod = tf.sqrt(1.0 - alphas_cumprod)
sqrt_recip_alphas = tf.sqrt(1.0 / alphas)
posterior_variance = betas * (1.0 - alphas_cumprod_prev) / (1.0 - alphas_cumprod)

print(f"   - Beta schedule 생성 완료 (timesteps: {total_timesteps})")

# 4. 역확산 과정 구현 (Denoising)
print("\n4. 예측 시작 (역확산 과정)...")

# 첫 번째 테스트 샘플로 시연
sample_idx = 0
sample_capacity = capacity_matrices[sample_idx:sample_idx+1]

# 랜덤 노이즈에서 시작
image_size = 64
channels = 1
noise = tf.random.normal(shape=(1, image_size, image_size, channels))
sample = noise

# 시각화를 위한 중간 단계 저장
intermediate_steps = []
save_steps = [999, 800, 600, 400, 200, 100, 50, 0]

print("   - 역확산 시작 (1000 → 0 timesteps)")
for t in reversed(range(0, total_timesteps)):
    # 현재 timestep
    tt = tf.cast(tf.fill(1, t), dtype=tf.int64)
    
    # 모델을 통한 노이즈 예측
    pred_noise = ema_network([sample, tt, sample_capacity], training=False)
    
    # 예측된 원본 이미지 계산
    coeff1 = tf.cast(tf.reshape(1.0 / sqrt_alphas_cumprod[t], [1, 1, 1, 1]), sample.dtype)
    coeff2 = tf.cast(tf.reshape(sqrt_one_minus_alphas_cumprod[t], [1, 1, 1, 1]), sample.dtype)
    pred_x0 = coeff1 * (sample - coeff2 * pred_noise)
    pred_x0 = tf.clip_by_value(pred_x0, -1.0, 1.0)
    
    # 평균과 분산 계산
    coeff1 = tf.reshape(sqrt_recip_alphas[t], [1, 1, 1, 1])
    coeff2 = tf.reshape(betas[t] / sqrt_one_minus_alphas_cumprod[t], [1, 1, 1, 1])
    model_mean = coeff1 * (sample - coeff2 * pred_noise)
    
    # 다음 샘플 생성
    if t > 0:
        noise = tf.random.normal(shape=sample.shape)
        nonzero_mask = tf.cast(tf.reshape(1 - (t == 0), [1, 1, 1, 1]), sample.dtype)
        std_dev = tf.sqrt(posterior_variance[t])
        sample = model_mean + nonzero_mask * tf.reshape(std_dev, [1, 1, 1, 1]) * noise
    else:
        sample = model_mean
    
    # 중간 단계 저장
    if t in save_steps:
        intermediate_steps.append((t, sample.numpy().copy()))

print("   - 역확산 완료")

# 5. 후처리 및 SOH 곡선 변환
print("\n5. 예측 결과 후처리...")

# 최종 예측 이미지를 SOH 곡선으로 변환
final_prediction = sample.numpy()

# 이미지를 1차원 SOH 시계열로 변환 (reshape)
pred_soh = final_prediction.reshape(-1, 64*64)

# 스케일 조정 (이미지 값 [-1, 1] → SOH [0, 100])
pred_soh = (pred_soh + 1) * 50  # -1~1 → 0~100

# 2560 사이클로 리사이즈
from scipy.interpolate import interp1d
x_old = np.linspace(0, 1, pred_soh.shape[1])
x_new = np.linspace(0, 1, 2560)
f = interp1d(x_old, pred_soh[0], kind='linear')
pred_soh_resized = f(x_new)

# 참조 SOH도 리사이즈
ref_soh = refs_soh[sample_idx]
f_ref = interp1d(np.linspace(0, 1, len(ref_soh)), ref_soh, kind='linear')
ref_soh_resized = f_ref(x_new)

print(f"   - 예측 SOH shape: {pred_soh_resized.shape}")

# 6. 평가 지표 계산
print("\n6. 평가 지표 계산...")

# SOH RMSE (60% 이상만 계산)
mask = ref_soh_resized > 60
soh_rmse = np.sqrt(np.mean((ref_soh_resized[mask] - pred_soh_resized[mask])**2))

# RUL 계산 (80% 임계값)
def get_rul(soh_curve, eol=80):
    indices = np.where(soh_curve < eol)[0]
    return indices[0] if len(indices) > 0 else len(soh_curve)

ref_rul = get_rul(ref_soh_resized)
pred_rul = get_rul(pred_soh_resized)
rul_error = abs(ref_rul - pred_rul)

print(f"   - SOH RMSE: {soh_rmse:.4f}")
print(f"   - 참조 RUL: {ref_rul} cycles")
print(f"   - 예측 RUL: {pred_rul} cycles")
print(f"   - RUL 오차: {rul_error} cycles")

# 7. 시각화
print("\n7. 결과 시각화...")

fig, axes = plt.subplots(2, 4, figsize=(16, 8))

# 역확산 과정 시각화
for i, (t, img) in enumerate(intermediate_steps):
    ax = axes[0, i]
    ax.imshow(img[0, :, :, 0], cmap='viridis')
    ax.set_title(f'Timestep {t}')
    ax.axis('off')

# SOH 예측 결과
ax = axes[1, :2].flatten()[0]
ax.plot(ref_soh_resized, label='Reference', alpha=0.7)
ax.plot(pred_soh_resized, label='Prediction', alpha=0.7)
ax.axhline(y=80, color='r', linestyle='--', alpha=0.5, label='EOL (80%)')
ax.set_xlabel('Cycles')
ax.set_ylabel('SOH (%)')
ax.set_title('SOH Prediction vs Reference')
ax.legend()
ax.grid(True, alpha=0.3)

# 오차 분석
ax = axes[1, 2]
error = ref_soh_resized - pred_soh_resized
ax.plot(error)
ax.axhline(y=0, color='k', linestyle='-', alpha=0.5)
ax.set_xlabel('Cycles')
ax.set_ylabel('Error (%)')
ax.set_title('Prediction Error')
ax.grid(True, alpha=0.3)

# 용량 매트릭스 시각화
ax = axes[1, 3]
ax.imshow(sample_capacity[0, :, :, 0], cmap='hot')
ax.set_title('Capacity Matrix (Condition)')
ax.axis('off')

plt.suptitle('DiffBatt Model: 전체 예측 과정 시각화', fontsize=16)
plt.tight_layout()
plt.show()

print("\n전체 예측 과정 완료!")

## 2. 다른 데이터셋 테스트하기

In [None]:
# 사용 가능한 데이터셋들
available_datasets = ['mix', 'clo', 'cruh', 'crush', 'matr_1', 'matr_2', 'snl']

def test_other_dataset(dataset_name):
    """다른 데이터셋으로 테스트"""
    print(f"\nTesting {dataset_name} dataset...")
    
    # 모델 로드
    network = keras.models.load_model(f'./trained_models/{dataset_name}_network')
    ema_network = keras.models.load_model(f'./trained_models/{dataset_name}_ema_network')
    
    # 모델 생성
    model = DiffusionModel(
        network=network,
        ema_network=ema_network,
        gdf_util=gdf_util,
        timesteps=total_timesteps,
        p_uncond=p_uncond,
    )
    
    # 데이터셋 로드
    test_ds = tfds.load(f'./data/{dataset_name}_test_ds')
    
    # 예측 및 평가
    post_process = PostProcess(test_ds, model)
    refs, preds = post_process.pred(reps=1)
    soh_rmse = post_process.eval_soh(refs, preds)
    rul_rmse = post_process.eval_rul(refs, preds)
    
    post_process.plot_sample(refs, preds)
    print(f'{dataset_name.upper()} - RUL RMSE: {rul_rmse:.4f}, SOH RMSE: {soh_rmse:.4f}')
    
    return rul_rmse, soh_rmse

print("Available datasets:", available_datasets)
print("\nExample usage:")
print("test_other_dataset('clo')")
print("test_other_dataset('snl')")

## 3. CLO 데이터셋 테스트

In [None]:
# CLO 데이터셋 테스트
test_other_dataset('clo')

## 4. SNL 데이터셋 테스트

In [None]:
# SNL 데이터셋 테스트
test_other_dataset('snl')

## 5. MATR_1 데이터셋 테스트

In [None]:
# MATR_1 데이터셋 테스트
test_other_dataset('matr_1')

## 6. 모든 데이터셋 성능 비교

In [None]:
# 모든 데이터셋 성능 비교
results = {}

for dataset in available_datasets:
    try:
        print(f"\n{'='*50}")
        print(f"Testing {dataset.upper()} Dataset")
        print(f"{'='*50}")
        
        rul_rmse, soh_rmse = test_other_dataset(dataset)
        results[dataset] = {'RUL_RMSE': rul_rmse, 'SOH_RMSE': soh_rmse}
        
    except Exception as e:
        print(f"Error testing {dataset}: {e}")
        results[dataset] = {'RUL_RMSE': None, 'SOH_RMSE': None}

# 결과 요약
print("\n" + "="*70)
print("SUMMARY OF ALL DATASETS PERFORMANCE")
print("="*70)
print(f"{'Dataset':<12} {'RUL RMSE':<15} {'SOH RMSE':<15}")
print("-"*70)

for dataset, metrics in results.items():
    rul = f"{metrics['RUL_RMSE']:.4f}" if metrics['RUL_RMSE'] is not None else "Error"
    soh = f"{metrics['SOH_RMSE']:.4f}" if metrics['SOH_RMSE'] is not None else "Error"
    print(f"{dataset.upper():<12} {rul:<15} {soh:<15}")

print("="*70)