U-Net + Gaussian Diffusion + Trainer

# Step-by-Step Plan
1. 레포 분석 요약: 핵심 클래스(UNet, GaussianDiffusion, Trainer), 사용 예시, 1D 변형 및 multi-GPU 지원 포인트 정리
2. 실습 환경 준비: pip 설치, GPU 확인, 시드 고정
3. 설정 구성: 데이터셋, 이미지 크기, 타임스텝 수, 베타 스케줄(코사인/선형), 학습 스텝/배치/러닝레이트, 주기별 샘플/체크포인트
4. 모델 구현: 시간 임베딩(sinusoidal + MLP) + ResBlock + Self-Attention 포함 UNet
5. Diffusion 구현: q_sample, p_sample(DDPM), 손실(ε-예측/ν-예측), 샘플러
6. 데이터 파이프라인: CIFAR-10(기본) 또는 커스텀 폴더 이미지, [-1, 1] 정규화
7. 콜백과 로거: EMA, Checkpoint, SampleCallback, 실시간 표 갱신(Logger)
8. 학습 루프: 진행바, 손실/시간/EMA 업데이트, 주기적 샘플/체크포인트 저장 및 시각화
9. 추론/샘플링: 학습된(또는 EMA) 모델로 샘플 생성 및 표시
10. 활용 팁: 더 큰 모델/해상도/스텝으로 확장하는 방법 안내

In [1]:
# [Cell 1] 설치 & 환경 점검 (최신 버전 설치, AMP 사용 안 함)

# 최신 버전 설치 (Colab은 torch 사전 설치되어 있을 수 있습니다)
# 필요 시 주석 해제하여 실행
# %pip install -U torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu121
# %pip install -U torchvision einops rich matplotlib tqdm pandas tabulate

import os, sys, math, time, random
from dataclasses import dataclass
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader
from torchvision import transforms, datasets, utils as vutils
from einops import rearrange
from tqdm.auto import tqdm
from IPython.display import display, clear_output
import pandas as pd
from tabulate import tabulate

print("PyTorch:", torch.__version__)
print("CUDA available:", torch.cuda.is_available())
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
device

PyTorch: 2.8.0+cu126
CUDA available: True


device(type='cuda')

실습에 필요한 라이브러리를 설치/불러오고, CUDA 사용 가능 여부를 확인합니다.

einops: 텐서 차원 변환을 더 가독성 높게 처리

rich, tqdm: 출력 미화 및 진행바

pandas, tabulate: 로그 표를 깔끔하게 렌더링

In [2]:
# [Cell 2] 설정(Config) 정의

@dataclass
class Config:
    # 데이터/학습
    dataset: str = 'CIFAR10'             # 'CIFAR10' 또는 'FOLDER'
    data_root: str = './data'            # CIFAR10 자동 다운로드 위치 또는 폴더 이미지 경로
    image_size: int = 32
    channels: int = 3
    num_classes: int = 0                 # class-conditional 미사용 (본 실습은 unconditional)
    timesteps: int = 1000                # 확산 단계 수(학습/샘플)
    beta_schedule: str = 'cosine'        # 'linear' 또는 'cosine'
    objective: str = 'pred_eps'          # 'pred_eps' 또는 'pred_v'
    train_steps: int = 5000              # 데모용 소스텝. 실제는 수십~수백 K
    batch_size: int = 128
    lr: float = 2e-4
    weight_decay: float = 0.0
    grad_clip: float = 1.0
    ema_decay: float = 0.995
    num_workers: int = 2
    # 로깅/체크포인트/샘플
    log_every: int = 50
    sample_every: int = 500
    sample_batch_size: int = 16
    ckpt_every: int = 1000
    out_dir: str = './results'
    ckpt_dir: str = './checkpoints'
    # 시드
    seed: int = 42

cfg = Config()
os.makedirs(cfg.out_dir, exist_ok=True)
os.makedirs(os.path.join(cfg.out_dir, 'samples'), exist_ok=True)
os.makedirs(cfg.ckpt_dir, exist_ok=True)

def set_seed(seed: int = 42):
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)

set_seed(cfg.seed)
cfg

Config(dataset='CIFAR10', data_root='./data', image_size=32, channels=3, num_classes=0, timesteps=1000, beta_schedule='cosine', objective='pred_eps', train_steps=5000, batch_size=128, lr=0.0002, weight_decay=0.0, grad_clip=1.0, ema_decay=0.995, num_workers=2, log_every=50, sample_every=500, sample_batch_size=16, ckpt_every=1000, out_dir='./results', ckpt_dir='./checkpoints', seed=42)

dataset: 'CIFAR10' 또는 'FOLDER' 선택

image_size: 기본 32(CIFAR-10에 맞춤). 해상도를 올리면 모델/메모리 비용 증가

timesteps: 확산 단계 수(학습/샘플 공통); 1000이 DDPM의 기본

beta_schedule: 'linear' 또는 'cosine'(논문에서 개선된 코사인 스케줄 추천)

objective: 'pred_eps'(ε 예측, DDPM 기본) 또는 'pred_v'(v-pred, 학습 안정성 개선 사례 있음)

train_steps, batch_size, lr, grad_clip, ema_decay 등 학습 하이퍼파라미터

log/sample/ckpt 주기와 출력 디렉토리(out_dir, ckpt_dir)

seed: 재현성 확보

set_seed: torch/np/random 모두 고정

In [3]:
# [Cell 3] 유틸리티: 시간 임베딩, 모듈들(ResBlock/Attention), 헬퍼 함수

def exists(x):
    return x is not None

def default(val, d):
    return val if exists(val) else d

# Sinusoidal time embedding
class SinusoidalPosEmb(nn.Module):
    def __init__(self, dim):
        super().__init__()
        self.dim = dim
    def forward(self, t):
        # t: (B,)
        device = t.device
        half_dim = self.dim // 2
        emb = math.log(10000) / (half_dim - 1)
        emb = torch.exp(torch.arange(half_dim, device=device) * -emb)
        emb = t[:, None] * emb[None, :]
        emb = torch.cat([emb.sin(), emb.cos()], dim=-1)
        return emb

class Residual(nn.Module):
    def __init__(self, fn):
        super().__init__()
        self.fn = fn
    def forward(self, x, *args, **kwargs):
        return self.fn(x, *args, **kwargs) + x

class PreNorm(nn.Module):
    def __init__(self, dim, fn, groups=8):
        super().__init__()
        self.norm = nn.GroupNorm(groups, dim)
        self.fn = fn
    def forward(self, x):
        return self.fn(self.norm(x))

class ConvNextBlock(nn.Module):
    # 간단한 ConvNeXt 스타일 블록(선택사항). 여기서는 ResConv 위주로 사용 가능.
    def __init__(self, dim, mult=2):
        super().__init__()
        hidden_dim = int(dim * mult)
        self.ds_conv = nn.Conv2d(dim, dim, 7, padding=3, groups=dim)
        self.net = nn.Sequential(
            nn.GroupNorm(1, dim),
            nn.Conv2d(dim, hidden_dim, 3, padding=1),
            nn.GELU(),
            nn.GroupNorm(1, hidden_dim),
            nn.Conv2d(hidden_dim, dim, 3, padding=1)
        )
    def forward(self, x):
        x = self.ds_conv(x) + x
        return self.net(x) + x

class ResnetBlock(nn.Module):
    def __init__(self, dim, dim_out, time_emb_dim=None, groups=8, dropout=0.0):
        super().__init__()
        self.mlp = nn.Sequential(
            nn.SiLU(),
            nn.Linear(time_emb_dim, dim_out)
        ) if exists(time_emb_dim) else None

        self.block1 = nn.Sequential(
            nn.GroupNorm(groups, dim),
            nn.SiLU(),
            nn.Conv2d(dim, dim_out, 3, padding=1)
        )
        self.block2 = nn.Sequential(
            nn.GroupNorm(groups, dim_out),
            nn.SiLU(),
            nn.Dropout(dropout),
            nn.Conv2d(dim_out, dim_out, 3, padding=1)
        )
        self.res_conv = nn.Conv2d(dim, dim_out, 1) if dim != dim_out else nn.Identity()

    def forward(self, x, t=None):
        h = self.block1(x)
        if exists(self.mlp) and exists(t):
            h = h + self.mlp(t)[:, :, None, None]
        h = self.block2(h)
        return h + self.res_conv(x)

class LinearAttention(nn.Module):
    def __init__(self, dim, heads=4, dim_head=32):
        super().__init__()
        self.heads = heads
        self.dim_head = dim_head
        hidden_dim = dim_head * heads
        self.to_qkv = nn.Conv2d(dim, hidden_dim * 3, 1, bias=False)
        self.to_out = nn.Sequential(
            nn.Conv2d(hidden_dim, dim, 1),
            nn.GroupNorm(8, dim)
        )

    def forward(self, x):
        b, c, h, w = x.shape
        qkv = self.to_qkv(x).chunk(3, dim=1)
        q, k, v = map(lambda t: rearrange(t, 'b (h d) x y -> b h (x y) d', h=self.heads), qkv)
        q = q.softmax(dim=-1)
        k = k.softmax(dim=-2)
        context = torch.einsum('b h n d, b h n e -> b h d e', k, v)
        out = torch.einsum('b h n d, b h d e -> b h n e', q, context)
        out = rearrange(out, 'b h (x y) d -> b (h d) x y', x=h, y=w)
        return self.to_out(out)

class AttentionBlock(nn.Module):
    def __init__(self, dim, heads=4, dim_head=32):
        super().__init__()
        self.attn = Residual(PreNorm(dim, LinearAttention(dim, heads=heads, dim_head=dim_head)))
    def forward(self, x):
        return self.attn(x)

def make_beta_schedule(timesteps: int, schedule: str = 'linear'):
    if schedule == 'linear':
        beta_start, beta_end = 1e-4, 2e-2
        return torch.linspace(beta_start, beta_end, timesteps)
    elif schedule == 'cosine':
        # Nichol & Dhariwal cosine schedule
        steps = timesteps + 1
        s = 0.008
        x = torch.linspace(0, timesteps, steps)
        alphas_cumprod = torch.cos(((x / timesteps) + s) / (1 + s) * math.pi * 0.5) ** 2
        alphas_cumprod = alphas_cumprod / alphas_cumprod[0]
        betas = 1 - (alphas_cumprod[1:] / alphas_cumprod[:-1])
        return torch.clip(betas, 1e-8, 0.999)
    else:
        raise ValueError("Unknown beta_schedule")

- SinusoidalPosEmb :

목적: 시간 스텝 t(정수)를 연속적 표현으로 임베딩

방식: Transformer에서 쓰이는 사인/코사인 주파수 임베딩

UNet의 각 ResBlock에 시간 정보를 주입하여 “시점별(노이즈 양별)” 처리를 가능하게 합니다.

- Residual, PreNorm:

잔차 연결과 GroupNorm 기반 정규화 래퍼

GroupNorm은 배치 크기에 덜 민감(Colab/작은 배치에서도 안정)

- ResnetBlock:

구조: GN -> SiLU -> Conv -> (시간 임베딩 MLP 추가) -> GN -> SiLU -> Dropout -> Conv + SkipConv

시간 임베딩을 채널 차원으로 투영해 특징맵에 더해줌으로써 “현재 t에서 무엇을 해야 하는지”를 학습

- LinearAttention/AttentionBlock:

이미지 해상도 일부에서 어텐션을 사용해 전역 정보(장거리 의존성)를 반영

메모리/속도를 고려해 선형 어텐션(Simple) 채택

- make_beta_schedule:

linear: DDPM 원형 스케줄

cosine: Nichol & Dhariwal의 개선 스케줄(학습 안정/샘플 품질에 유리한 경험적 보고)

결과: betas($β_t$), 이후 alphas=1-β, 누적곱 등 파생량 계산에 사용


In [5]:
# [Cell 4] UNet 구현 (수정본)

class UNet(nn.Module):
    def __init__(
        self,
        dim=64,
        dim_mults=(1, 2, 4, 8),
        channels=3,
        self_attn_res=(16, 8),  # 특정 해상도에서 attention 수행
        time_emb_dim=256,
        dropout=0.0
    ):
        super().__init__()
        self.channels = channels
        self.init_conv = nn.Conv2d(channels, dim, 7, padding=3)

        dims = [dim, *map(lambda m: dim * m, dim_mults)]
        in_out = list(zip(dims[:-1], dims[1:]))

        # 시간 임베딩
        self.time_mlp = nn.Sequential(
            SinusoidalPosEmb(time_emb_dim),
            nn.Linear(time_emb_dim, time_emb_dim*4),
            nn.SiLU(),
            nn.Linear(time_emb_dim*4, time_emb_dim)
        )

        # Down
        self.downs = nn.ModuleList([])
        curr_res = cfg.image_size
        for i, (dim_in, dim_out) in enumerate(in_out):
            self.downs.append(nn.ModuleList([
                ResnetBlock(dim_in, dim_out, time_emb_dim=time_emb_dim, dropout=dropout),
                ResnetBlock(dim_out, dim_out, time_emb_dim=time_emb_dim, dropout=dropout),
                AttentionBlock(dim_out) if curr_res in self_attn_res else nn.Identity(),
                nn.Conv2d(dim_out, dim_out, 3, 2, 1) if i < len(in_out) - 1 else nn.Identity()
            ]))
            if i < len(in_out) - 1:
                curr_res = curr_res // 2

        # Middle
        mid_dim = dims[-1]
        self.mid_block1 = ResnetBlock(mid_dim, mid_dim, time_emb_dim=time_emb_dim, dropout=dropout)
        self.mid_attn = AttentionBlock(mid_dim)
        self.mid_block2 = ResnetBlock(mid_dim, mid_dim, time_emb_dim=time_emb_dim, dropout=dropout)

        # Up
        self.ups = nn.ModuleList([])
        for i, (dim_in, dim_out) in enumerate(reversed(in_out)):
            # 수정: 스킵(=dim_out 채널)과 현재 텐서(=dim_out 채널)를 concat 하므로 입력채널 = dim_out + dim_out
            self.ups.append(nn.ModuleList([
                ResnetBlock(dim_out + dim_out, dim_out, time_emb_dim=time_emb_dim, dropout=dropout),
                ResnetBlock(dim_out, dim_out, time_emb_dim=time_emb_dim, dropout=dropout),
                AttentionBlock(dim_out) if curr_res in self_attn_res else nn.Identity(),
                nn.ConvTranspose2d(dim_out, dim_in, 4, 2, 1) if i < len(in_out) - 1 else nn.Identity()
            ]))
            if i < len(in_out) - 1:
                curr_res = curr_res * 2

        self.final_block = nn.Sequential(
            nn.GroupNorm(8, dim),
            nn.SiLU(),
            nn.Conv2d(dim, channels, 3, padding=1)
        )

    def forward(self, x, t):
        t_emb = self.time_mlp(t)

        x = self.init_conv(x)
        hs = [x]

        # Down path
        for (res1, res2, attn, down) in self.downs:
            x = res1(x, t_emb)
            x = res2(x, t_emb)
            x = attn(x)
            hs.append(x)
            x = down(x)

        # Middle
        x = self.mid_block1(x, t_emb)
        x = self.mid_attn(x)
        x = self.mid_block2(x, t_emb)

        # Up path
        for (res1, res2, attn, up) in self.ups:
            x = torch.cat((x, hs.pop()), dim=1)
            x = res1(x, t_emb)
            x = res2(x, t_emb)
            x = attn(x)
            x = up(x)

        # Final
        return self.final_block(x)

역할: 확산 모델의 핵심 네트워크. 다운샘플-업샘플 경로 + 스킵 연결 + 선택적 어텐션.

init_conv: 입력 이미지를 초기 채널(dim)로 투영(파라미터 공유 없는 “stem” 성격)

dims/in_out: 각 스테이지 채널 폭을 계산, (in, out) 쌍으로 다운/업 경로를 정의

time_mlp: Sinusoidal 임베딩 -> MLP -> time_emb_dim으로 투영(모든 ResBlock에서 사용)

downs: 각 스테이지마다
ResBlock 두 개(풍부한 표현력)

AttentionBlock(선택): self_attn_res에 지정된 해상도(예: 16, 8)에서만 전역 상호작용
Conv(stride=2)에 의한 다운샘플(해상도 절반)

mid: 가장 낮은 해상도에서 ResBlock-Attn-ResBlock

ups: 각 스테이지마다
“중요(버그 수정 포인트)”: 업샘플 경로의 첫 ResBlock 입력 채널 수
업경로에서 x = torch.cat((x, hs.pop()), dim=1)로 현재 텐서와 스킵 텐서를 concat
이 둘의 채널 수가 동일(dim_out)인 시점에서 합치므로 입력 채널 = dim_out + dim_out 이어야 합니다.

원래 코드가 dim_out + dim_in로 되어 있으면 채널 불일치로 GroupNorm/Conv 가중치 모양 오류(RuntimeError) 발생

수정: ResnetBlock(dim_out + dim_out, dim_out, ...)

두 번째 ResBlock, 선택적 Attention, ConvTranspose2d로 업샘플(해상도 2배)
final_block: GroupNorm+SiLU 후 최종 Conv로 원래 채널 수(예: 3)로 매핑

forward:
다운 경로에서 각 단계 결과를 hs에 저장(스킵 연결용)
업 경로에서 저장해둔 특징과 concat하여 세밀한 정보를 복원

U자형 스킵 연결은 고해상도 세부 정보를 보존하면서 심층 특징을 결합하기 위함

시간 임베딩은 t마다 다른 잡음량/역과정 특성을 반영

선택적 어텐션은 멀리 떨어진 영역 간 의존성 캡처

In [6]:
# [Cell 5] Diffusion(DDPM) 구현: ε-pred / v-pred, 샘플러

class GaussianDiffusion(nn.Module):
    def __init__(self, model: nn.Module, image_size: int, timesteps: int = 1000, beta_schedule: str = 'cosine', objective: str = 'pred_eps'):
        super().__init__()
        self.model = model
        self.image_size = image_size
        self.channels = model.channels
        self.objective = objective
        self.num_timesteps = timesteps

        betas = make_beta_schedule(timesteps, beta_schedule)
        alphas = 1. - betas
        alphas_cumprod = torch.cumprod(alphas, dim=0)
        alphas_cumprod_prev = torch.cat([torch.tensor([1.], device=alphas.device), alphas_cumprod[:-1]], dim=0)

        self.register_buffer('betas', betas)
        self.register_buffer('alphas', alphas)
        self.register_buffer('alphas_cumprod', alphas_cumprod)
        self.register_buffer('alphas_cumprod_prev', alphas_cumprod_prev)
        self.register_buffer('sqrt_alphas_cumprod', torch.sqrt(alphas_cumprod))
        self.register_buffer('sqrt_one_minus_alphas_cumprod', torch.sqrt(1. - alphas_cumprod))
        self.register_buffer('sqrt_recip_alphas', torch.sqrt(1. / alphas))
        self.register_buffer('posterior_variance', betas * (1. - alphas_cumprod_prev) / (1. - alphas_cumprod))

        # for v-pred conversion
        self.register_buffer('posterior_mean_coef1', betas * torch.sqrt(alphas_cumprod_prev) / (1. - alphas_cumprod))
        self.register_buffer('posterior_mean_coef2', (1. - alphas_cumprod_prev) * torch.sqrt(alphas) / (1. - alphas_cumprod))

    def q_sample(self, x_start, t, noise=None):
        if noise is None:
            noise = torch.randn_like(x_start)
        return self.sqrt_alphas_cumprod[t][:, None, None, None] * x_start + \
               self.sqrt_one_minus_alphas_cumprod[t][:, None, None, None] * noise

    def model_predictions(self, x, t):
        # t: (B,)
        # 모델 출력은 ε 또는 v. 필요 시 변환하여 ε/ x0 를 유도
        model_out = self.model(x, t)
        if self.objective == 'pred_eps':
            pred_eps = model_out
            x0 = (x - self.sqrt_one_minus_alphas_cumprod[t][:, None, None, None] * pred_eps) / \
                 self.sqrt_alphas_cumprod[t][:, None, None, None]
        elif self.objective == 'pred_v':
            v = model_out
            # from pred-v to x0, eps: epsilon = sqrt(a_t) * v + sqrt(1-a_t) * x0 -> derive x0
            sqrt_at = self.sqrt_alphas_cumprod[t][:, None, None, None]
            sqrt_omt = self.sqrt_one_minus_alphas_cumprod[t][:, None, None, None]
            x0 = (x - sqrt_omt * v) / sqrt_at
            pred_eps = (x - sqrt_at * x0) / sqrt_omt
        else:
            raise ValueError("objective must be 'pred_eps' or 'pred_v'")
        return pred_eps, x0.clamp(-1., 1.)

    def p_mean_variance(self, x, t):
        pred_eps, x0 = self.model_predictions(x, t)
        # posterior mean = coef1 * x0 + coef2 * x_t
        model_mean = self.posterior_mean_coef1[t][:, None, None, None] * x0 + \
                     self.posterior_mean_coef2[t][:, None, None, None] * x
        model_var = self.posterior_variance[t][:, None, None, None]
        return model_mean, model_var, pred_eps, x0

    @torch.no_grad()
    def p_sample(self, x, t):
        model_mean, model_var, _, _ = self.p_mean_variance(x, t)
        noise = torch.randn_like(x) if (t > 0).any() else torch.zeros_like(x)
        nonzero_mask = (t != 0).float()[:, None, None, None]  # t=0이면 노이즈 x
        return model_mean + nonzero_mask * torch.sqrt(model_var) * noise

    @torch.no_grad()
    def sample(self, batch_size=16):
        device = self.betas.device
        x = torch.randn((batch_size, self.channels, self.image_size, self.image_size), device=device)
        for t in tqdm(reversed(range(0, self.num_timesteps)), desc='sampling', total=self.num_timesteps):
            t_batch = torch.full((batch_size,), t, device=device, dtype=torch.long)
            x = self.p_sample(x, t_batch)
        return x.clamp(-1., 1.)

    def forward(self, x_start):
        # 학습 시 loss 반환
        b, c, h, w = x_start.shape
        device = x_start.device
        t = torch.randint(0, self.num_timesteps, (b,), device=device).long()
        noise = torch.randn_like(x_start)
        x_noisy = self.q_sample(x_start, t, noise)

        if self.objective == 'pred_eps':
            target = noise
        elif self.objective == 'pred_v':
            sqrt_at = self.sqrt_alphas_cumprod[t][:, None, None, None]
            sqrt_omt = self.sqrt_one_minus_alphas_cumprod[t][:, None, None, None]
            x0 = x_start
            target = sqrt_at * noise + sqrt_omt * x0  # v = sqrt(a)*eps + sqrt(1-a)*x0
        else:
            raise ValueError

        pred = self.model(x_noisy, t)
        loss = F.mse_loss(pred, target)
        return loss

역할: 전방/역방향 확산 과정 수식과 학습/샘플링 루틴을 캡슐화

register_buffer:
betas, alphas, alphas_cumprod(ᾱ_t), prev 등: 학습 중 고정되는 텐서로서 .to(device) 자동, state_dict에 저장

sqrt_alphas_cumprod, sqrt_one_minus_alphas_cumprod: $q(x_t|x_0) $ 계산에 필요

posterior_variance, posterior_mean_coef1/2: $p(x_{t-1}|x_t, x_0)$ 계산에 필요

q_sample(x0, t)

$x_t = \sqrt(ᾱ_t) x0 + \sqrt(1-ᾱ_t) ε$

학습 시 입력에 노이즈를 섞어 모델에게 “이것이 t에서 나온 $x_t$일 때, 목표를 예측하라”고 요구

model_predictions

objective='pred_eps': 모델 출력=ε̂, $x_0 = (x_t - \sqrt(1-ᾱ_t) ε̂)/\sqrt(ᾱ_t)$

objective='pred_v': 모델 출력=v̂, v 정의에 따라 $x_0/ε$ 변환(문헌에 근거한 변환식 적용)

x0.clamp(-1,1): 복원된 $x_0$ 가 정상 범위 벗어나지 않도록 안정화

p_mean_variance

후방분포(posterior)의 평균/분산을 계산하여 역과정 한 스텝 샘플링에 사용

p_sample

t>0이면 가우시안 샘플링으로 노이즈 더하고(t=0이면 더하지 않음), 한 스텝 역이동

sample

x_T ~ N(0, I)에서 시작해 t=T..1..0 순으로 반복적으로 p_sample 수행

forward(학습)

무작위 t 샘플링(전체 단계에 걸쳐 학습)

pred_eps(또는 v)와 타깃(noise 또는 v-타깃) 사이의 MSE 손실

DDPM의 표준 수식 구현. pred_v 옵션은 최근 연구에서 학습 안정/성능대안으로 자주 사용

In [7]:
# [Cell 6] 데이터셋/로더: CIFAR10 또는 폴더 이미지 ([-1, 1] 정규화)

def get_dataloader(cfg: Config):
    if cfg.dataset.upper() == 'CIFAR10':
        transform = transforms.Compose([
            transforms.Resize(cfg.image_size),
            transforms.RandomHorizontalFlip(),
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.5]*cfg.channels, std=[0.5]*cfg.channels)
        ])
        train_set = datasets.CIFAR10(root=cfg.data_root, train=True, download=True, transform=transform)
    elif cfg.dataset.upper() == 'FOLDER':
        # cfg.data_root 경로 아래 이미지 로드
        transform = transforms.Compose([
            transforms.Resize(cfg.image_size),
            transforms.CenterCrop(cfg.image_size),
            transforms.RandomHorizontalFlip(),
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.5]*cfg.channels, std=[0.5]*cfg.channels)
        ])
        train_set = datasets.ImageFolder(root=cfg.data_root, transform=transform)
    else:
        raise ValueError("Unknown dataset")

    loader = DataLoader(
        train_set,
        batch_size=cfg.batch_size,
        shuffle=True,
        num_workers=cfg.num_workers,
        pin_memory=True,
        drop_last=True
    )
    return loader

train_loader = get_dataloader(cfg)
len(train_loader), next(iter(train_loader))[0].shape

100%|██████████| 170M/170M [00:13<00:00, 12.6MB/s]


(390, torch.Size([128, 3, 32, 32]))

CIFAR-10 또는 임의의 폴더 이미지(FOLDER)

transforms

Resize/CenterCrop/RandomHorizontalFlip: 데이터 증강과 크기 정규화

ToTensor + Normalize(mean=0.5, std=0.5): [-1, 1] 범위로 스케일(모델 출력도 [-1,1]을 목표)

DataLoader

drop_last=True: 배치 크기 고정(일부 정규화, 배치연산 안정)

num_workers/pin_memory: 성능 향상

In [8]:
# [Cell 7] 콜백 및 로거(Logger) 구현: EMA, Checkpoint, 샘플, 실시간 표 갱신

class EMA:
    def __init__(self, model, beta=0.995):
        self.beta = beta
        self.model = model
        self.shadow = {}
        self.backup = {}

        for name, param in self.model.named_parameters():
            if param.requires_grad:
                self.shadow[name] = param.data.clone()

    @torch.no_grad()
    def update(self):
        for name, param in self.model.named_parameters():
            if param.requires_grad:
                assert name in self.shadow
                new_average = (1.0 - self.beta) * param.data + self.beta * self.shadow[name]
                self.shadow[name] = new_average.clone()

    def apply_shadow(self):
        self.backup = {}
        for name, param in self.model.named_parameters():
            if param.requires_grad:
                self.backup[name] = param.data.clone()
                param.data = self.shadow[name]

    def restore(self):
        for name, param in self.model.named_parameters():
            if param.requires_grad:
                param.data = self.backup[name]
        self.backup = {}

class CheckpointCallback:
    def __init__(self, path_dir, every_steps=1000):
        self.path_dir = path_dir
        self.every_steps = every_steps
        os.makedirs(self.path_dir, exist_ok=True)
    def __call__(self, step, model, ema, optimizer, scheduler=None):
        if step % self.every_steps == 0 and step > 0:
            path = os.path.join(self.path_dir, f'model_step_{step}.pt')
            torch.save({
                'step': step,
                'model': model.state_dict(),
                'ema': ema.shadow if ema is not None else None,
                'optimizer': optimizer.state_dict(),
                'scheduler': scheduler.state_dict() if scheduler is not None else None
            }, path)
            print(f'[Checkpoint] Saved: {path}')

class SampleCallback:
    def __init__(self, diffusion, out_dir, every_steps=500, sample_bs=16):
        self.diffusion = diffusion
        self.out_dir = out_dir
        self.every_steps = every_steps
        self.sample_bs = sample_bs
        os.makedirs(os.path.join(out_dir, 'samples'), exist_ok=True)
    @torch.no_grad()
    def __call__(self, step, ema: EMA=None):
        if step % self.every_steps == 0 and step > 0:
            # EMA 가 있으면 EMA 가중치로 샘플
            if ema is not None:
                ema.apply_shadow()
            imgs = self.diffusion.sample(batch_size=self.sample_bs).cpu()
            if ema is not None:
                ema.restore()
            grid = vutils.make_grid((imgs + 1) * 0.5, nrow=int(math.sqrt(self.sample_bs)), padding=2)
            save_path = os.path.join(self.out_dir, 'samples', f'step_{step:06d}.png')
            vutils.save_image(grid, save_path)
            clear_output(wait=True)
            print(f'[Sample] Saved: {save_path}')
            display(grid.permute(1,2,0).numpy())

class LiveLogger:
    def __init__(self):
        self.history = []
    def log(self, step, loss, lr, dt, ema_decay):
        self.history.append({'step': step, 'loss': float(loss), 'lr': float(lr), 'dt(s)': round(dt, 3), 'ema': ema_decay})
        if len(self.history) > 2000:
            self.history = self.history[-2000:]
    def display_table(self, tail=20):
        df = pd.DataFrame(self.history[-tail:])
        print(tabulate(df, headers='keys', tablefmt='github', showindex=False))

EMA(Exponential Moving Average)

목적: 학습 파라미터의 지수 이동 평균을 추적하여 샘플링 시 더 안정적(일반적으로 EMA 가중치로 생성한 샘플 품질이 좋음)

update(): shadow[name] = β * shadow + (1-β) * param

apply_shadow()/restore(): 일시적으로 EMA 가중치로 모델 바꾸고 끝나면 복원

CheckpointCallback

주기적으로 모델/EMA/옵티마이저/스케줄러 상태 저장(.pt) , 장시간 학습/중단 복구에 필수

SampleCallback

주기마다 샘플 생성/저장 및 즉시 화면 표시,
EMA 가중치로 샘플링(적용/복원)하여 품질 향상

LiveLogger

최근 스텝 기록(step, loss, lr, dt, ema) 누적,
tabulate로 보기 좋게 표 렌더링

콜백 패턴으로 학습 루프를 간결하게 하고, 실시간 관찰과 주기적 결과물을 쉽게 유지

In [9]:
# [Cell 8] 모델/확산/옵티마이저/스케줄러/콜백 초기화

model = UNet(
    dim=64,
    dim_mults=(1,2,4,8),
    channels=cfg.channels,
    self_attn_res=(16, 8),  # 32->down16->down8 해상도에서 어텐션
    time_emb_dim=256,
    dropout=0.0
).to(device)

diffusion = GaussianDiffusion(
    model=model,
    image_size=cfg.image_size,
    timesteps=cfg.timesteps,
    beta_schedule=cfg.beta_schedule,
    objective=cfg.objective
).to(device)

optimizer = torch.optim.AdamW(model.parameters(), lr=cfg.lr, weight_decay=cfg.weight_decay)
scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=cfg.train_steps)

ema = EMA(model, beta=cfg.ema_decay)

ckpt_cb = CheckpointCallback(cfg.ckpt_dir, every_steps=cfg.ckpt_every)
sample_cb = SampleCallback(diffusion, cfg.out_dir, every_steps=cfg.sample_every, sample_bs=cfg.sample_batch_size)
logger = LiveLogger()

sum(p.numel() for p in model.parameters())/1e6

42.966531

파라미터 수: 42.97M

모델/확산/옵티마이저/스케줄러/EMA/콜백/로거

UNet 설정

dim=64, dim_mults=(1,2,4,8): CIFAR-10(32x32) 기준 경량 모델

self_attn_res=(16,8): 32→다운샘플 16→8에서 어텐션 수행(저해상도에서 전역 문맥 흡수)

Diffusion 설정

timesteps/beta_schedule/objective: Config 반영

AdamW + CosineAnnealingLR: 일반적으로 확산 학습에 잘 작동하는 조합

EMA: decay=0.995로 완만히 추적

파라미터 수 출력: 모델 크기 감(Colab에서 적절한지 확인)

In [10]:
# [Cell 9] 학습 루프 (AMP 미사용). 실시간 표 갱신 + 주기적 샘플/체크포인트

scaler = None  # AMP 미사용

model.train()
global_step = 0
start_time = time.time()

for epoch in range(9999999):  # 스텝 기준으로 종료
    for batch in train_loader:
        global_step += 1
        x, _ = batch
        x = x.to(device)

        t0 = time.time()
        optimizer.zero_grad(set_to_none=True)
        loss = diffusion(x)
        loss.backward()
        if cfg.grad_clip is not None and cfg.grad_clip > 0:
            torch.nn.utils.clip_grad_norm_(model.parameters(), cfg.grad_clip)
        optimizer.step()
        scheduler.step()
        ema.update()

        dt = time.time() - t0
        logger.log(global_step, loss.item(), optimizer.param_groups[0]['lr'], dt, cfg.ema_decay)

        if global_step % cfg.log_every == 0:
            clear_output(wait=True)
            print(f"Step {global_step}/{cfg.train_steps} | time total: {time.time()-start_time:.1f}s")
            logger.display_table(tail=20)

        # 샘플/체크포인트 콜백
        sample_cb(global_step, ema=ema)
        ckpt_cb(global_step, model=model, ema=ema, optimizer=optimizer, scheduler=scheduler)

        if global_step >= cfg.train_steps:
            break
    if global_step >= cfg.train_steps:
        break

print("Training done.")

[Sample] Saved: ./results/samples/step_005000.png


array([[[0.        , 0.        , 0.        ],
        [0.        , 0.        , 0.        ],
        [0.        , 0.        , 0.        ],
        ...,
        [0.        , 0.        , 0.        ],
        [0.        , 0.        , 0.        ],
        [0.        , 0.        , 0.        ]],

       [[0.        , 0.        , 0.        ],
        [0.        , 0.        , 0.        ],
        [0.        , 0.        , 0.        ],
        ...,
        [0.        , 0.        , 0.        ],
        [0.        , 0.        , 0.        ],
        [0.        , 0.        , 0.        ]],

       [[0.        , 0.        , 0.        ],
        [0.        , 0.        , 0.        ],
        [0.97541046, 0.96039605, 0.9414414 ],
        ...,
        [0.7951038 , 0.89613414, 0.96009177],
        [0.        , 0.        , 0.        ],
        [0.        , 0.        , 0.        ]],

       ...,

       [[0.        , 0.        , 0.        ],
        [0.        , 0.        , 0.        ],
        [0.41709727, 0

[Checkpoint] Saved: ./checkpoints/model_step_5000.pt
Training done.


반복 구조: step 기준 종료(에폭은 무한 루프 형태지만 step 도달 시 break)

순서

optimizer.zero_grad

loss = diffusion(x) → 내부에서 t 샘플링/노이즈 주입/모델 예측/MSE

loss.backward()

clip_grad_norm_: 급격한 그라디언트 폭증 방지

optimizer.step(), scheduler.step()

ema.update(): 스텝마다 EMA 갱신

logger.log(), 주기마다 logger.display_table(): 최근 20개 스텝 표 렌더링

sample_cb / ckpt_cb: 설정 주기마다 샘플/체크포인트 실행

clear_output(wait=True): 표를 깔끔히 갱신(스크롤 폭증 방지)

In [10]:
# [Cell 10] 샘플링(수동 실행): EMA 가중치 적용하여 샘플 생성/표시

@torch.no_grad()
def sample_and_show(n=16, use_ema=True):
    if use_ema:
        ema.apply_shadow()
    imgs = diffusion.sample(batch_size=n).cpu()
    if use_ema:
        ema.restore()
    grid = vutils.make_grid((imgs + 1) * 0.5, nrow=int(math.sqrt(n)), padding=2)
    display(grid.permute(1,2,0).numpy())
    return imgs, grid

_ = sample_and_show(n=cfg.sample_batch_size, use_ema=True)

sample_and_show

use_ema=True일 때 EMA 가중치를 적용하여 샘플(품질 우선)

diffusion.sample: x_T~N(0,I)부터 역과정 반복

make_grid로 보기 좋게 그리드 이미지 생성 및 표시

반환: (imgs, grid) → 필요 시 파일 저장/후처리 가능

In [11]:
# [Cell 11] 체크포인트 로드(선택)
# 저장된 스텝 지정하여 복원할 수 있습니다.

def load_checkpoint(path, model, ema: EMA, optimizer=None, scheduler=None, map_location=None):
    ckpt = torch.load(path, map_location=map_location)
    model.load_state_dict(ckpt['model'], strict=True)
    if ema is not None and ckpt.get('ema') is not None:
        # EMA shadow 복원
        ema.shadow = {k: v.clone().to(device) for k, v in ckpt['ema'].items()}
    if optimizer is not None and ckpt.get('optimizer') is not None:
        optimizer.load_state_dict(ckpt['optimizer'])
    if scheduler is not None and ckpt.get('scheduler') is not None:
        scheduler.load_state_dict(ckpt['scheduler'])
    print(f"Loaded checkpoint from {path} at step {ckpt.get('step')}.")

# 예시:
# load_checkpoint('./checkpoints/model_step_1000.pt', model, ema, optimizer, scheduler, map_location=device)

torch.load로 딕셔너리 복원

model.load_state_dict

ema.shadow 복원(있다면)

optimizer/scheduler 상태 복원(있다면)

주의: map_location=device로 현재 디바이스에 맞게 로드

In [12]:
# [Cell 12] 간단 점검(테스트 성격): 형태/노이즈 주입/샘플 사이즈

# 모델 fwd shape
x = torch.randn(cfg.batch_size, cfg.channels, cfg.image_size, cfg.image_size, device=device)
t = torch.randint(0, cfg.timesteps, (cfg.batch_size,), device=device).long()
with torch.no_grad():
    y = model(x, t)
print("Model forward OK:", y.shape)

# q_sample 평균/분산 대략 점검
with torch.no_grad():
    noise = torch.randn_like(x)
    xq = diffusion.q_sample(x, t, noise)
    print("q_sample mean~", float(xq.mean().cpu()), "std~", float(xq.std().cpu()))

# 샘플 사이즈 점검
with torch.no_grad():
    imgs = diffusion.sample(batch_size=4)
print("Sample shape:", imgs.shape)

Model forward OK: torch.Size([128, 3, 32, 32])
q_sample mean~ 0.0008828024729155004 std~ 1.001911997795105


sampling:   0%|          | 0/1000 [00:00<?, ?it/s]

Sample shape: torch.Size([4, 3, 32, 32])


model forward OK: 임의 입력에 대한 출력 shape 검사(채널/해상도 일관성 확인)

q_sample 평균/표준편차: 대략 정상 범위인지 점검(디버그 보조)

sample shape: 샘플러가 올바른 크기의 이미지를 출력하는지 확인

Model forward OK: [128, 3, 32, 32]
입력/출력 텐서 형상이 기대와 일치합니다. 네트워크 배선(다운/업샘플, 스킵 연결, 최종 채널 수)이 올바르게 구성되었다는 뜻입니다.

q_sample mean ≈ 0.00088, std ≈ 1.0019
q_sample은 “x0에 시간 t에 맞춰 노이즈를 주입”한 결과입니다. 테스트 셀에서는 x를 표준정규로 두고 임의 t로 섞었기 때문에 평균≈0, 표준편차≈1에 가까운 값이 정상입니다. 즉, 노이즈 주입 로직과 스케줄 버퍼들이 올바르게 동작하고 있습니다.

sampling 1000/1000, 약 16초, ≈68 it/s
DDPM(1,000 스텝) 기준으로 배치 4에서 16초 안팎은 T4/V100급에서 무난한 속도입니다.
성능 상 문제 없어 보입니다.

Sample shape: [4, 3, 32, 32]
샘플러 출력 형상도 정상입니다.