# VAE와 Diffusion을 결합한 이미지 생성 모델

**프로젝트 목표:**
- VAE(Variational Autoencoder)를 사용하여 고차원의 이미지를 저차원의 잠재 공간(Latent Space)으로 압축하고, Diffusion 모델을 사용하여 이 잠재 공간에서 노이즈를 점진적으로 제거하며 새로운 이미지를 생성합니다.
- L1 손실과 지각 손실(Perceptual Loss)을 함께 사용하여 생성된 이미지의 품질을 높입니다.

**수행 과정:**
1.  **설정:** 이미지 크기, 배치 사이즈, 잠재 공간 차원 등 주요 하이퍼파라미터를 설정합니다.
2.  **데이터 준비:** 이미지 데이터셋을 불러오고 모델에 입력할 수 있도록 전처리합니다.
3.  **모델 정의:** Encoder, Decoder, Denoiser, VAE, Diffusion 모델의 각 구성 요소를 정의합니다.
4.  **손실 함수 및 최적화:** 모델 학습에 사용될 손실 함수와 Optimizer를 정의합니다.
5.  **학습 루프:** 정의된 모델과 손실 함수를 사용하여 Epoch별로 학습을 진행하고, 중간 결과를 시각화하여 저장합니다.
6.  **최종 이미지 생성:** 학습이 완료된 모델을 사용하여 최종 결과 이미지를 생성합니다.

In [None]:
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
import numpy as np
import matplotlib.pyplot as plt
import os
import glob
from tqdm.auto import tqdm

# 경고 메시지 무시 (선택 사항)
import warnings
warnings.filterwarnings('ignore')

## 1. 하이퍼파라미터 및 전역 설정
모델 학습과 데이터 처리에 필요한 주요 변수들을 한곳에서 관리합니다.

In [None]:
# --- 데이터 관련 설정 ---
IMG_SIZE = 64
BATCH_SIZE = 64
DATASET_PATH = "./img_align_celeba/img_align_celeba/*.jpg"

# --- 모델 아키텍처 설정 ---
LATENT_DIM = 512 # VAE의 잠재 공간 차원
ENCODER_CHANNELS = [64, 128, 256, 512]
DECODER_CHANNELS = [256, 128, 64, 32]
DENOISER_CHANNELS = [512, 256, 128, 64] # Denoiser의 채널 구성

# --- 학습 관련 설정 ---
EPOCHS = 30
LEARNING_RATE = 2e-4
L1_LOSS_WEIGHT = 1.0       # L1 손실 가중치
PERCEPTUAL_LOSS_WEIGHT = 0.1 # 지각 손실 가중치

# --- 결과 저장 설정 ---
SAVE_DIR = "generated_images_vae_diffusion_v13"
if not os.path.exists(SAVE_DIR):
    os.makedirs(SAVE_DIR)

## 2. 데이터셋 준비
지정된 경로에서 이미지 파일들을 불러와 `tf.data.Dataset`으로 만듭니다. 이미지를 리사이즈하고, 값을 [-1, 1] 범위로 정규화하며, 배치 단위로 묶어 학습에 사용할 수 있도록 준비합니다.

In [None]:
# 데이터셋 경로에서 파일 목록 가져오기
image_paths = glob.glob(DATASET_PATH)
print(f"총 {len(image_paths)}개의 이미지 발견.")

# tf.data.Dataset 파이프라인 구성
def preprocess_image(path):
    image = tf.io.read_file(path)
    image = tf.image.decode_jpeg(image, channels=3)
    image = tf.image.resize(image, [IMG_SIZE, IMG_SIZE])
    # 픽셀 값을 [0, 255]에서 [-1, 1] 범위로 정규화
    image = (tf.cast(image, tf.float32) - 127.5) / 127.5
    return image

train_dataset = (
    tf.data.Dataset.from_tensor_slices(image_paths)
    .map(preprocess_image, num_parallel_calls=tf.data.AUTOTUNE)
    .cache()
    .shuffle(buffer_size=1000)
    .batch(BATCH_SIZE)
    .prefetch(buffer_size=tf.data.AUTOTUNE)
)
print("TensorFlow 데이터셋 준비 완료.")

## 3. 모델 아키텍처 정의
VAE와 Diffusion 모델의 핵심 구성 요소인 Encoder, Decoder, Denoiser를 각각 정의합니다.

- **Encoder**: 이미지를 입력받아 잠재 벡터(평균, 로그 분산)로 압축합니다.
- **Decoder**: 잠재 벡터를 입력받아 다시 이미지로 복원합니다.
- **Denoiser**: 노이즈가 섞인 잠재 벡터와 노이즈 레벨(시간)을 입력받아 노이즈를 예측합니다.

In [None]:
# 인코더 모델 정의
def build_encoder(latent_dim, channels):
    inputs = layers.Input(shape=(IMG_SIZE, IMG_SIZE, 3))
    x = inputs
    for ch in channels:
        x = layers.Conv2D(ch, kernel_size=4, strides=2, padding='same', use_bias=False)(x)
        x = layers.BatchNormalization()(x)
        x = layers.LeakyReLU(alpha=0.2)(x)
    x = layers.Flatten()(x)
    mean = layers.Dense(latent_dim, name="mean")(x)
    log_var = layers.Dense(latent_dim, name="log_var")(x)
    return keras.Model(inputs, [mean, log_var], name="encoder")

# 디코더 모델 정의
def build_decoder(latent_dim, channels):
    inputs = layers.Input(shape=(latent_dim,))
    # Conv2DTranspose의 입력에 맞게 차원 재구성
    x = layers.Dense(4 * 4 * channels[0])(inputs)
    x = layers.Reshape((4, 4, channels[0]))(x)
    for i, ch in enumerate(channels):
        x = layers.Conv2DTranspose(ch, kernel_size=4, strides=2, padding='same', use_bias=False)(x)
        x = layers.BatchNormalization()(x)
        x = layers.LeakyReLU(alpha=0.2)(x)
    # 마지막 레이어에서 채널 수를 3(RGB)으로 맞추고, tanh 활성화 함수로 픽셀 값 범위를 [-1, 1]로 조정
    outputs = layers.Conv2D(3, kernel_size=5, padding='same', activation='tanh')(x)
    return keras.Model(inputs, outputs, name="decoder")

# 디노이저(U-Net 기반) 모델 정의
def build_denoiser(latent_dim, channels):
    latent_input = layers.Input(shape=(latent_dim,))
    time_input = layers.Input(shape=(1,))

    # 시간 정보를 임베딩
    time_embedding = layers.Dense(latent_dim, activation='swish')(time_input)
    
    # 입력 잠재 벡터와 시간 임베딩 결합
    x = layers.Add()([latent_input, time_embedding])
    x = layers.Dense(latent_dim, activation='swish')(x)
    
    for ch in channels:
        x_res = x
        x = layers.Dense(ch, activation='swish')(x)
        x = layers.BatchNormalization()(x)
        # Residual Connection을 위해 차원을 맞춤
        if x_res.shape[-1] != ch:
            x_res = layers.Dense(ch)(x_res)
        x = layers.Add()([x, x_res])

    output = layers.Dense(latent_dim)(x)
    return keras.Model([latent_input, time_input], output, name="denoiser")

# 모델 빌드
encoder = build_encoder(LATENT_DIM, ENCODER_CHANNELS)
decoder = build_decoder(LATENT_DIM, DECODER_CHANNELS)
denoiser = build_denoiser(LATENT_DIM, DENOISER_CHANNELS)

encoder.summary()
decoder.summary()
denoiser.summary()

## 4. VAE 및 Diffusion 모델 클래스 정의
앞서 정의한 아키텍처들을 결합하여 VAE와 Diffusion 모델 전체를 클래스로 정의합니다.

- **VAE**: Encoder와 Decoder를 포함하며, Reparameterization Trick을 구현합니다.
- **DiffusionModel**: Denoiser를 사용하여 노이즈를 예측하고, 학습 시 노이즈를 추가하는 로직(Forward Process)을 포함합니다.

In [None]:
class VAE(keras.Model):
    def __init__(self, encoder, decoder, **kwargs):
        super().__init__(**kwargs)
        self.encoder = encoder
        self.decoder = decoder
    
    # Reparameterization Trick: z = mean + exp(0.5 * log_var) * epsilon
    def reparameterize(self, mean, log_var):
        epsilon = tf.random.normal(shape=tf.shape(mean))
        return mean + tf.exp(0.5 * log_var) * epsilon

    def call(self, inputs):
        mean, log_var = self.encoder(inputs)
        z = self.reparameterize(mean, log_var)
        reconstructed = self.decoder(z)
        return reconstructed, mean, log_var

class DiffusionModel(keras.Model):
    def __init__(self, denoiser, **kwargs):
        super().__init__(**kwargs)
        self.denoiser = denoiser

    def call(self, inputs):
        noisy_latents, time = inputs
        # Denoiser는 추가된 노이즈를 예측
        predicted_noise = self.denoiser([noisy_latents, time])
        return predicted_noise

# 모델 인스턴스 생성
vae = VAE(encoder, decoder)
diffusion_model = DiffusionModel(denoiser)

## 5. 손실 함수 및 Optimizer 정의
모델 학습에 필요한 손실 함수들과 Optimizer를 설정합니다.

- **VAE Loss**: 재구성 손실(Reconstruction Loss)과 KL 발산(KL Divergence)으로 구성됩니다.
- **Perceptual Loss**: 이미지의 고차원 특징(Feature) 공간에서의 유사도를 측정하는 손실입니다. 시각적으로 더 자연스러운 이미지를 생성하는 데 도움을 줍니다. 이를 위해 미리 학습된 VGG19 모델을 사용합니다.
- **Optimizer**: Adam Optimizer를 사용합니다.

In [None]:
# VGG19 모델을 이용한 지각 손실(Perceptual Loss) 계산
vgg = tf.keras.applications.VGG19(include_top=False, weights='imagenet', input_shape=(IMG_SIZE, IMG_SIZE, 3))
vgg.trainable = False
perceptual_loss_model = tf.keras.Model(vgg.input, vgg.get_layer("block3_conv4").output)

def get_perceptual_loss(y_true, y_pred):
    y_true = (y_true + 1) * 127.5 # [-1, 1] -> [0, 255]
    y_pred = (y_pred + 1) * 127.5 # [-1, 1] -> [0, 255]
    y_true = tf.keras.applications.vgg19.preprocess_input(y_true)
    y_pred = tf.keras.applications.vgg19.preprocess_input(y_pred)
    true_features = perceptual_loss_model(y_true)
    pred_features = perceptual_loss_model(y_pred)
    return tf.reduce_mean(tf.square(true_features - pred_features))

# VAE 손실 함수 (재구성 손실 + KL 발산)
def vae_loss_fn(original_images, reconstructed_images, mean, log_var):
    l1_loss = tf.reduce_mean(tf.abs(original_images - reconstructed_images))
    perceptual_loss = get_perceptual_loss(original_images, reconstructed_images)
    reconstruction_loss = L1_LOSS_WEIGHT * l1_loss + PERCEPTUAL_LOSS_WEIGHT * perceptual_loss
    
    kl_loss = -0.5 * tf.reduce_sum(1 + log_var - tf.square(mean) - tf.exp(log_var), axis=-1)
    kl_loss = tf.reduce_mean(kl_loss)
    
    return reconstruction_loss + kl_loss

# Diffusion 모델 손실 함수 (Mean Squared Error)
mse_loss_fn = keras.losses.MeanSquaredError()

# Optimizer 정의
optimizer = keras.optimizers.Adam(learning_rate=LEARNING_RATE)

## 6. 학습 스텝(`train_step`) 정의
한 번의 배치(Batch)에 대한 학습 과정을 함수로 정의합니다. TensorFlow의 `@tf.function` 데코레이터를 사용하여 그래프 모드로 컴파일함으로써 학습 속도를 최적화합니다.

이 함수 내에서 VAE와 Diffusion 모델의 손실을 각각 계산하고, Gradient를 업데이트합니다.

In [None]:
@tf.function
def train_step(images):
    with tf.GradientTape() as tape:
        # --- VAE 학습 ---
        reconstructed, mean, log_var = vae(images)
        vae_total_loss = vae_loss_fn(images, reconstructed, mean, log_var)

        # --- Diffusion 모델 학습 ---
        # 1. VAE 인코더로 실제 이미지를 잠재 벡터로 변환
        true_latents, _ = vae.encoder(images)
        
        # 2. 노이즈 및 시간(t) 샘플링
        noise = tf.random.normal(shape=tf.shape(true_latents))
        # Epoch 기반으로 노이즈 레벨을 점진적으로 증가 (학습 안정화)
        t = tf.random.uniform(shape=(BATCH_SIZE,), minval=0.0, maxval=current_epoch_norm, dtype=tf.float32)
        t_reshaped = tf.reshape(t, (-1, 1))

        # 3. 잠재 벡터에 노이즈 추가 (Forward Process)
        noisy_latents = true_latents * (1.0 - t_reshaped) + noise * t_reshaped
        
        # 4. Denoiser가 노이즈 예측
        predicted_noise = diffusion_model([noisy_latents, t])
        diffusion_loss = mse_loss_fn(noise, predicted_noise)

        # --- 최종 손실 ---
        total_loss = vae_total_loss + diffusion_loss

    # Gradient 계산 및 적용
    trainable_vars = vae.trainable_variables + diffusion_model.trainable_variables
    grads = tape.gradient(total_loss, trainable_vars)
    optimizer.apply_gradients(zip(grads, trainable_vars))
    
    return total_loss, vae_total_loss, diffusion_loss

## 7. 학습 루프 실행
전체 데이터셋에 대해 `EPOCHS`만큼 반복하여 학습을 진행합니다. 매 Epoch마다 `train_step` 함수를 호출하여 모델을 업데이트하고, 주기적으로 생성된 이미지를 저장하여 학습 과정을 모니터링합니다.

In [None]:
# 이미지 생성 및 저장을 위한 헬퍼 함수
def save_plot(images, epoch, save_dir):
    plt.figure(figsize=(10, 10))
    for i in range(16):
        plt.subplot(4, 4, i + 1)
        plt.imshow((images[i] + 1) / 2) # [-1, 1] -> [0, 1]로 변환하여 출력
        plt.axis("off")
    plt.savefig(os.path.join(save_dir, f"epoch_{epoch:03d}.png"))
    plt.close()

print("--- 학습 시작 ---")
for epoch in range(EPOCHS):
    # 현재 epoch을 0~1 사이 값으로 정규화하여 노이즈 스케줄에 사용
    current_epoch_norm = float(epoch) / float(EPOCHS) 
    
    progress_bar = tqdm(train_dataset, desc=f"Epoch {epoch+1}/{EPOCHS}")
    total_loss_epoch, vae_loss_epoch, diff_loss_epoch = 0, 0, 0
    
    for step, images in enumerate(progress_bar):
        total_loss, vae_loss, diff_loss = train_step(images)
        total_loss_epoch += total_loss.numpy()
        vae_loss_epoch += vae_loss.numpy()
        diff_loss_epoch += diff_loss.numpy()
        
        progress_bar.set_postfix({
            "Total Loss": f"{total_loss_epoch/(step+1):.4f}",
            "VAE Loss": f"{vae_loss_epoch/(step+1):.4f}",
            "Diffusion Loss": f"{diff_loss_epoch/(step+1):.4f}"
        })

    # Epoch마다 생성된 이미지 샘플 저장
    # Diffusion 과정 시뮬레이션: 랜덤 노이즈에서 시작하여 점진적으로 노이즈 제거
    z_noise = tf.random.normal(shape=(16, LATENT_DIM))
    current_latents = z_noise
    
    # 20 스텝에 걸쳐 노이즈 제거 (Reverse Process)
    for t_step in np.linspace(current_epoch_norm, 0, 20):
        t = tf.constant([t_step] * 16, dtype=tf.float32)
        predicted_noise = diffusion_model([current_latents, t])
        # 현재 잠재 벡터에서 예측된 노이즈를 빼서 잠재 벡터를 정제
        current_latents -= (current_epoch_norm / 20) * predicted_noise
        
    generated_images = vae.decoder(current_latents)
    save_plot(generated_images, epoch + 1, SAVE_DIR)

    # 5 Epoch마다 모델 가중치 저장
    if (epoch + 1) % 5 == 0:
        vae.save_weights(os.path.join(SAVE_DIR, f"vae_epoch_{epoch+1}.h5"))
        diffusion_model.save_weights(os.path.join(SAVE_DIR, f"diffusion_epoch_{epoch+1}.h5"))

print("--- 학습 완료 ---")

## 8. 최종 결과 생성
학습이 완료된 모델을 사용하여 최종 이미지 그리드를 생성하고 파일로 저장합니다

In [None]:
print("--- 최종 이미지 생성 ---")
final_z_noise = tf.random.normal(shape=(64, LATENT_DIM))
current_latents = final_z_noise
final_epoch_norm = float(EPOCHS-1) / float(EPOCHS)

# 100 스텝에 걸쳐 노이즈 제거하여 고품질 이미지 생성
for t_step in tqdm(np.linspace(final_epoch_norm, 0, 100), desc="Generating Final Images"):
    t = tf.constant([t_step] * 64, dtype=tf.float32)
    predicted_noise = diffusion_model([current_latents, t])
    current_latents -= (final_epoch_norm / 100) * predicted_noise

final_images = vae.decoder(current_latents)

# 8x8 그리드로 이미지 저장
plt.figure(figsize=(16, 16))
for i in range(64):
    plt.subplot(8, 8, i + 1)
    plt.imshow((final_images[i] + 1) / 2)
    plt.axis("off")
plt.tight_layout()
plt.savefig(os.path.join(SAVE_DIR, "final_generated_grid.png"))
plt.show()

print(f"최종 이미지가 '{os.path.join(SAVE_DIR, 'final_generated_grid.png')}'에 저장되었습니다.")