# Setting

In [None]:
from google.colab import drive
drive.mount('/content/drive')

In [None]:
!pip install lpips
import lpips

!pip install pytorch-fid
from pytorch_fid import fid_score

In [None]:
import os
import shutil
import tempfile
from pathlib import Path
from PIL import Image
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
import numpy as np
from tqdm.auto import tqdm
import math

In [None]:
SEED = 42
np.random.seed(SEED)
torch.manual_seed(SEED)

# Dataset

In [None]:
class CelebADataset(Dataset):
    def __init__(self, img_dir, img_num_list, transform=None):
        self.img_paths = [os.path.join(img_dir, f'{i:06d}.jpg') for i in img_num_list]
        self.transform = transform

    def __len__(self):
        return len(self.img_paths)

    def __getitem__(self, idx):
        path = self.img_paths[idx]
        img = Image.open(path).convert('RGB')
        if self.transform:
            img = self.transform(img)
        return img

In [None]:
# CelebA 압축파일 Colab 런타임 VM으로 복사
# 디렉토리에 celeba_dataset.zip 파일이 있어야 합니다.
!cp /content/sample_directory/celeba_dataset.zip /content/

# VM에 파일 압축풀기
# /content/data 경로에 모든 이미지가 저장됩니다.
!unzip -q /content/celeba_dataset.zip -d /content/data/

In [None]:
num_data = 30000
train_test_ratio = 0.9
batch_size = 64

train_transform = transforms.Compose([
    transforms.Resize([64,64]),
    transforms.RandomHorizontalFlip(p=0.5),
    transforms.ToTensor(),
    transforms.Normalize([0.5]*3, [0.5]*3),
])
test_transform = transforms.Compose([
    transforms.Resize([64,64]),
    transforms.ToTensor(),
    transforms.Normalize([0.5]*3, [0.5]*3),
])

train_img_num = np.arange(1,num_data+1)[:int(num_data*train_test_ratio)]
test_img_num = np.arange(1,num_data+1)[int(num_data*train_test_ratio):]

train_dataset = CelebADataset('/content/data/img_align_celeba', img_num_list=train_img_num, transform=train_transform)
test_dataset = CelebADataset('/content/data/img_align_celeba', img_num_list=test_img_num, transform=test_transform)
print(f'Train dataset size: {len(train_dataset)}') # Ex) 27000
print(f'Test dataset size: {len(test_dataset)}') # Ex) 3000

train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, drop_last=True)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False, drop_last=False)

# Model

In [None]:
# train에서 구현한 model 코드 그대로 사용하시면 됩니다.

def swish(x):
    # DDPM에서 쓰이는 활성화함수
    return x * torch.sigmoid(x)

def get_timestep_embedding(t,channel):
    # DDPM은 타임스텝 t 또한 입력으로 받아 이미지에 낀 노이즈의 값을 예측합니다.
    # get_timestep_embedding 함수는 batch_size만큼의 타임스텝 t를 입력으로 받아서,
    # 이에 대응되는 embedding vector(torch.Size([B,channel]))를 반환합니다.
    # Transformer의 sinusodial positional encoding과 동일합니다. (자세한 수식은 PPT 참고해주세요.)

    # channel 값은 짝수 int로 입력해야 코드가 잘 작동합니다.
    half = channel // 2
    device = t.device

    # (구현) timestep embedding
    inv_freq = torch.exp(-math.log(10000) * torch.arange(half)/half).to(device)
    args = t.float().unsqueeze(1) * inv_freq.unsqueeze(0)
    emb = torch.cat([torch.sin(args), torch.cos(args)], dim=-1)

    return emb

class GroupNorm(nn.GroupNorm):
    # Group normalization
    def __init__(self, num_channels, num_groups=8, eps=1e-6):
        super().__init__(num_groups, num_channels, eps=eps)

def conv2d(in_ch, out_ch, kernel_size=3, stride=1, padding=1, bias=True, init_scale=1.0):
    # 3x3 convolution
    conv = nn.Conv2d(in_ch, out_ch, kernel_size, stride, padding, bias=bias)
    with torch.no_grad():
        # init_scale==0으로 설정시 처음에 레이어의 가중치가 0으로 초기화됩니다.
        conv.weight.data *= init_scale
    return conv

def nin(in_ch, out_ch, init_scale=1.0):
    # 1x1 convolution
    layer = nn.Conv2d(in_ch, out_ch, kernel_size=1, stride=1, padding=0)
    with torch.no_grad():
        layer.weight.data *= init_scale
    return layer

def linear(in_features, out_features, init_scale=1.0):
    # linear layer
    fc = nn.Linear(in_features, out_features)
    with torch.no_grad():
        fc.weight.data *= init_scale
    return fc

class DownsampleBlock(nn.Module):
    # B*C*2H*2W 이미지를 입력으로 받아, B*C*H*W 이미지를 반환합니다.
    # with_conv 여부에 따라 downsampling 방식이 달라집니다.
    def __init__(self, channels, with_conv=True):
        super().__init__()
        if with_conv:
            self.op = nn.Conv2d(channels, channels, kernel_size=3, stride=2, padding=1)
        else:
            self.op = nn.AvgPool2d(kernel_size=2, stride=2)

    def forward(self, x):
        return self.op(x)

class UpsampleBlock(nn.Module):
    # B*C*H*W 이미지를 입력으로 받아, B*C*2H*2W 이미지를 반환합니다.
    # with_conv 여부에 따라 upsampling 방식이 달라집니다.
    def __init__(self, channels, with_conv=True):
        super().__init__()
        self.with_conv = with_conv
        if with_conv:
            self.conv = nn.Conv2d(channels, channels, kernel_size=3, stride=1, padding=1)

    def forward(self, x):
        x = F.interpolate(x, scale_factor=2.0, mode='nearest')
        if self.with_conv:
            x = self.conv(x)
        return x

class ResnetBlock(nn.Module):
    # DDPM의 주요 구성요소 중 하나인 ResnetBlock입니다.
    # 모델 구조 및 구현에 관한 세부사항은 PPT 참고바랍니다.
    # 모든 레이어의 init_scale은 1.0으로 설정합니다.
    def __init__(self, in_channels, out_channels, temb_channels=256, dropout=0.0):
        super().__init__()
        self.in_channels = in_channels
        self.out_channels = out_channels
        self.temb_channels = temb_channels # channel of timestep embedding
        self.dropout = dropout

        # (구현) ResnetBlock
        self.norm1 = GroupNorm(self.in_channels)
        self.conv1 = conv2d(self.in_channels, self.out_channels,
                            kernel_size=3, stride=1, padding=1, init_scale=1.0)
        self.temb_proj = linear(self.temb_channels, self.out_channels, init_scale=1.0)
        self.norm2 = GroupNorm(self.out_channels)
        self.conv2 = conv2d(self.out_channels, self.out_channels,
                            kernel_size=3, stride=1, padding=1, init_scale=1.0)
        self.conv_shortcut = nin(self.in_channels, self.out_channels)

    def forward(self, x, temb):
        # B*in_channels*H*W 크기의 텐서 x와 B*temb_channels 크기의 temb를 입력으로 받아서
        # B*out_channels*H*W 크기의 텐서를 반환합니다.

        # (구현) ResnetBlock
        h = self.norm1(x)
        h = swish(h)
        h = self.conv1(h)
        h_temb = swish(temb)
        h_temb = self.temb_proj(h_temb)
        h_temb = h_temb[:, :, None, None]
        h = h + h_temb
        h = self.norm2(h)
        h = swish(h)
        h = F.dropout(h, p=self.dropout, training=self.training)
        h = self.conv2(h)
        x = self.conv_shortcut(x)
        return x + h

class AttnBlock(nn.Module):
    # DDPM의 주요 구성요소 중 하나인 ResnetBlock입니다.
    # 모델 구조 및 구현에 관한 세부사항은 PPT 참고바랍니다.
    def __init__(self, channels):
        super().__init__()
        self.norm = GroupNorm(channels)
        self.q = nin(channels, channels)
        self.k = nin(channels, channels)
        self.v = nin(channels, channels)
        self.proj_out = nin(channels, channels, init_scale=0.0)

    def forward(self, x):
        B, C, H, W = x.shape
        h = self.norm(x)

        # Q,K,V 텐서 생성
        q = self.q(h)
        k = self.k(h)
        v = self.v(h)

        q = q.view(B, C, H * W).permute(0, 2, 1)  # (B, T, C)
        k = k.view(B, C, H * W).permute(0, 2, 1)  # (B, T, C)
        v = v.view(B, C, H * W).permute(0, 2, 1)  # (B, T, C)

        scale = q.shape[-1] ** -0.5

        attn = torch.softmax(torch.bmm(q, k.transpose(1, 2)) * scale, dim=-1)
        h_ = torch.bmm(attn, v)
        h_ = h_.permute(0, 2, 1).reshape(B, C, H, W)

        h_ = self.proj_out(h_) # torch.Size([B,C,H,W])
        return x + h_

class DDPMModel(nn.Module):
    # 최종 DDPM 모델입니다.
    # Downsample, Middle, Upsample 블락으로 구성됩니다.
    # 각 블락은 resnetblock, attentionblock으로 구성됩니다.
    # 모델 구조 및 구현에 관한 세부사항은 PPT 참고바랍니다.
    def __init__(
        self,
        in_channels=3,
        out_channels=3,
        ch=64,
        ch_mult=(1,2,4),
        num_res_blocks=2,
        attn_resolutions={32},
        dropout=0.0,
        resamp_with_conv=False,
        init_resolution=64
    ):
        super().__init__()
        self.ch = ch
        self.ch_mult = ch_mult
        self.num_res_blocks = num_res_blocks
        self.attn_resolutions = attn_resolutions
        self.dropout = dropout
        self.num_levels = len(ch_mult)
        self.in_channels = in_channels
        self.out_channels = out_channels
        self.resamp_with_conv = resamp_with_conv
        self.init_resolution = init_resolution

        # Timestep embedding channel
        self.temb_ch = ch * 4

        # Timestep embedding layers
        self.temb_dense0 = linear(self.ch, self.temb_ch)
        self.temb_dense1 = linear(self.temb_ch, self.temb_ch)

        # Input conv
        self.conv_in = conv2d(in_channels, ch, kernel_size=3, stride=1, padding=1)

        # (구현) Downsample blocks
        self.down_blocks = nn.ModuleList()

        curr_ch = ch
        curr_res = init_resolution
        for level in range(self.num_levels):
            level_blocks = nn.ModuleList()
            out_ch = ch * ch_mult[level]
            for i in range(num_res_blocks):
                level_blocks.append(ResnetBlock(curr_ch, out_ch, temb_channels=self.temb_ch, dropout=dropout))
                if curr_res in attn_resolutions:
                    level_blocks.append(AttnBlock(out_ch))
                curr_ch = out_ch
            self.down_blocks.append(level_blocks)
            if level != self.num_levels - 1:
                self.down_blocks.append(DownsampleBlock(curr_ch, with_conv=resamp_with_conv))
                curr_res //= 2

        # (구현) Middle blocks
        self.mid_block = nn.ModuleList()

        self.mid_block = nn.ModuleList([
            ResnetBlock(curr_ch, curr_ch, temb_channels=self.temb_ch, dropout=dropout),
            AttnBlock(curr_ch),
            ResnetBlock(curr_ch, curr_ch, temb_channels=self.temb_ch, dropout=dropout)
        ])

        # (구현) Upsample blocks
        self.up_blocks = nn.ModuleList()

        self.up_blocks = nn.ModuleList()
        for level in reversed(range(self.num_levels)):
            level_blocks = nn.ModuleList()
            out_ch = ch * ch_mult[level]
            level_blocks.append(ResnetBlock(curr_ch + out_ch, out_ch, temb_channels=self.temb_ch, dropout=dropout))
            if (init_resolution // (2 ** level)) in attn_resolutions:
                level_blocks.append(AttnBlock(out_ch))
            curr_ch = out_ch
            for i in range(num_res_blocks):
                level_blocks.append(ResnetBlock(curr_ch, curr_ch, temb_channels=self.temb_ch, dropout=dropout))
                if (init_resolution // (2 ** level)) in attn_resolutions:
                    level_blocks.append(AttnBlock(curr_ch))
            if level != 0:
                level_blocks.append(UpsampleBlock(curr_ch, with_conv=resamp_with_conv))
            self.up_blocks.append(level_blocks)

        # Output conv
        self.norm_out = GroupNorm(curr_ch)
        self.conv_out = conv2d(curr_ch, out_channels, kernel_size=3, stride=1, padding=1, init_scale=0.0)

    def forward(self, x, t):
        # Timestep embedding
        temb = get_timestep_embedding(t, self.ch) # B*ch
        temb = self.temb_dense0(temb) # B*4ch
        temb = swish(temb)
        temb = self.temb_dense1(temb) # B*4ch

        skips = []
        h = self.conv_in(x)

        # (구현) down_blocks에 있는 레이어들을 따라 downsampling 진행
        # 각 레이어의 output을 skips 리스트에 저장합니다. (이는 Upsample시 활용됩니다)
        # resnetblock과 attentionblock에 들어가는 input이 다름을 주의해주세요.
        down_iter = iter(self.down_blocks)
        for level in range(self.num_levels):
            blocks = next(down_iter)
            for layer in blocks:
                h = layer(h, temb) if isinstance(layer, ResnetBlock) else layer(h)
            skips.append(h)
            if level != self.num_levels - 1:
                downsample = next(down_iter)
                h = downsample(h)

        # (구현) mid_blocks에 있는 레이어들을 따라 진행
        for layer in self.mid_block:
            h = layer(h, temb) if isinstance(layer, ResnetBlock) else layer(h)

        # (구현) up_blocks에 있는 레이어들을 따라 upsampling 진행
        # downsample 과정에서 구한 텐서와 병합 후 레이어에 넣어줍니다.
        for level in range(self.num_levels):
            blocks = self.up_blocks[level]
            skip = skips.pop()
            h = torch.cat([h, skip], dim=1)
            h = blocks[0](h, temb)
            for layer in blocks[1:]:
                if isinstance(layer, ResnetBlock):
                    h = layer(h, temb)
                else:
                    h = layer(h)

        # Output
        h = self.norm_out(h)
        h = swish(h)
        h = self.conv_out(h)
        return h

# Get alphas_cumprod

In [None]:
def get_beta_alpha_linear(beta_start=0.0001, beta_end=0.02, num_timesteps=1000):
    # DDPM 학습 및 샘플링에 쓰일 alpha, beta, alphas_cumprod 반환
    # Train에서 쓰인 함수와 정확히 같습니다.

    betas = np.linspace(beta_start, beta_end, num_timesteps, dtype=np.float32)
    betas = torch.tensor(betas)
    alphas = 1.0 - betas
    alphas_cumprod = torch.cumprod(alphas, dim=0)

    return betas, alphas, alphas_cumprod

# Sampling

In [None]:
# sample에서 구현한 sampling 코드 그대로 사용하시면 됩니다.

def p_sample_ddim(model, x_t, t_cur, t_prev, alphas_cumprod, eta=0.0):
    # DDIM reverse process의 단일 스텝 코드입니다. 이전 스텝의 이미지 x_t를 받아서 다음 스텝의 이미지 x_prev를 반환합니다.
    # Ex) 480(t_cur) 타임스텝에서의 이미지를 입력으로 받아 475(t_prev) 타임스텝의 이미지를 반환

    # 이전스텝의 alpha_bar
    alpha_bar_t = alphas_cumprod[t_cur-1]
    # 다음스텝의 alpha_bar
    if t_prev > 0:
        alpha_bar_prev = alphas_cumprod[t_prev-1]
    else:
        alpha_bar_prev = torch.tensor(1.0, device=x_t.device)

    # (구현) eta, alpha_bar_t, alpha_bar_prev 이용해서 DDIM의 sigma_t 구현
    sigma_t = eta * torch.sqrt((1 - alpha_bar_prev) / (1 - alpha_bar_t)) \
              * torch.sqrt(1 - alpha_bar_t / alpha_bar_prev)

    # (구현) 학습된 모델을 이용해 노이즈 레벨 eps_theta 예측
    B = x_t.size(0)
    t_tensor = torch.full((B,), t_cur, device=x_t.device, dtype=torch.long)
    eps_theta = model(x_t, t_tensor)

    # (구현) x_t, eps_theta 이용해 x0 예측
    sqrt_ab_t    = torch.sqrt(alpha_bar_t)
    sqrt_ab_prev = torch.sqrt(alpha_bar_prev)
    x0_pred = (x_t - torch.sqrt(1 - alpha_bar_t).view(-1,1,1,1) * eps_theta) \
              / sqrt_ab_t.view(-1,1,1,1)

    # DDIM의 방향성 컴포넌트 구현
    dir_xt = torch.sqrt(torch.clamp(1 - alpha_bar_prev - sigma_t**2, min=0.0)).view(-1,1,1,1) * eps_theta

    # 노이즈 추가
    noise = torch.randn_like(x_t) if t_prev > 0 else torch.zeros_like(x_t)

    # (구현) x0_pred, dir_xt, simga_t, noise 이용해서 다음스텝의 이미지 계산
    x_prev = sqrt_ab_prev.view(-1,1,1,1) * x0_pred + dir_xt + sigma_t.view(-1,1,1,1) * noise

    return x_prev

def sample_ddim(model, shape, alphas_cumprod, device, ddim_steps, eta=0.0):
    # p_sample_ddim을 적절한 timestep을 따라서 반복하는 최종 DDIM 샘플링 함수

    num_timesteps = alphas_cumprod.shape[0]
    x = torch.randn(shape, device=device)

    # 0-based 인덱스로 균등 서브샘플링
    idx_lin = torch.linspace(0, num_timesteps-1, steps=ddim_steps+1, device=device)
    idx0 = idx_lin.round().long()

    # 항상 처음(0)과 끝(T-1) 포함하여 중복 제거 & 정렬
    idx0 = torch.cat([torch.tensor([0, num_timesteps-1], device=device, dtype=torch.long),idx0]).unique(sorted=True)

    # 1-based timestep으로 변환
    seq_asc = idx0 + 1 # Ex) [1,5,10,...,995,1000]
    # 역순으로 뒤집기
    seq_rev = torch.flip(seq_asc, dims=[0])
    # 마지막에 0 추가
    seq = torch.cat([seq_rev, torch.tensor([0], device=device, dtype=torch.long)]) # Ex) [1000,995,...,5,1,0]

    # seq를 따라서 p_sample_ddim 반복적으로 수행
    prev_t = seq[0].item() # Ex) prev_t = 1000
    for next_t in tqdm(seq[1:],desc='Sampling'):
        t_cur  = prev_t
        t_prev = next_t.item()

        x = p_sample_ddim(
            model,
            x,                   # 현재 노이즈 x_{t_cur}
            t_cur,               # 현재 스텝 (마지막엔 1)
            t_prev,              # 다음 스텝 (마지막엔 0)
            alphas_cumprod,
            eta
        )
        prev_t = t_prev # 다음 반복의 "현재"가 됩니다

    return x.cpu().detach()

In [None]:
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
_, _, alphas_cumprod = get_beta_alpha_linear()
alphas_cumprod = alphas_cumprod.to(device)

# 학습할 때와 같은 batch_size 사용 권장
batch_size = 64
image_size = 64

# train할 때 저장한 모델 파라미터 pt파일 불러오기
model = DDPMModel().to(device)
model_path = ''
checkpoint = torch.load(model_path, map_location=device)
model.load_state_dict(checkpoint['model_state_dict'])
print(f'Model is loaded')

model.eval()

with torch.no_grad():
    x0 = sample_ddim(model,
                     shape=(batch_size, 3, image_size, image_size),
                     alphas_cumprod=alphas_cumprod,
                     ddim_steps=200,
                     device=device
                    )

# Accuracy Score - Lpips

In [None]:
lpips_fn = lpips.LPIPS(net='alex').to(device)
lpips_fn.eval()

N = batch_size

# batch_size 개수만큼의 3*64*64 원본 이미지를 리스트에 저장
real_imgs = []
count = 0
for img in test_dataset:
    real_imgs.append(img)
    count += 1
    if count==batch_size:
        break

# batch_size 개수만큼의 3*64*64 샘플링 이미지를 리스트에 저장
fake_imgs = [x0[i] for i in range(N)]

scores = []
# 진짜/가짜 이미지 쌍의 점수 비교
with torch.no_grad():
    for i in range(N):
        real_img = real_imgs[i]
        fake_img = fake_imgs[i]
        fake_img = fake_img.to(device)
        real_img = real_img.to(device)
        # dist shape: [1,1,1,1]
        dist = lpips_fn(real_img, fake_img)
        scores.append(dist.item())

scores = np.array(scores)

print(f"Evaluated {N} pairs")
print(f"Mean LPIPS: {scores.mean():.4f}")
print(f"Median LPIPS: {np.median(scores):.4f}")
print(f"Std LPIPS: {scores.std():.4f}")

# Accuracy Score - FID

In [None]:
def tensor_batch_to_folder(imgs, folder):
    # imgs에 들어있는 이미지들을 폴더에 저장합니다.
    # 저장된 폴더의 디렉토리를 이후 함수에 전달해 FID 점수를 계산합니다.

    # 폴더가 이미 있으면 비우고, 없으면 생성
    if os.path.isdir(folder):
        shutil.rmtree(folder)
    os.makedirs(folder, exist_ok=True)

    # 이미지 변환
    imgs = imgs.detach().cpu()
    imgs_uint8 = ((imgs + 1.0) * 0.5 * 255.0).clamp(0, 255).to(torch.uint8)

    # 각 이미지 폴더에 저장
    B = imgs_uint8.size(0)
    for i in range(B):
        img_i = imgs_uint8[i]
        np_img = img_i.permute(1, 2, 0).numpy()
        # PIL 이미지로 변환
        pil_img = Image.fromarray(np_img)
        # 파일명: img_000.png, img_001.png, ...
        filename = os.path.join(folder, f"img_{i:04d}.png")
        pil_img.save(filename)

def calculate_fid_from_tensors(real_imgs, fake_imgs, batch_size=64, device='cuda'):
    # 입력으로 받은 진짜/가짜 이미지들을 폴더에 저장한 후에
    # fid_score 함수에 폴더의 디렉토리를 입력으로 넣어 FID 점수를 얻습니다.

    # 임시 디렉터리 생성 (real/fake)
    temp_dir = tempfile.mkdtemp(prefix="fid_temp_")
    dir_real = Path(temp_dir) / "real"
    dir_fake = Path(temp_dir) / "fake"

    # 텐서를 각각 이미지로 저장
    tensor_batch_to_folder(real_imgs, str(dir_real))
    tensor_batch_to_folder(fake_imgs, str(dir_fake))

    # pytorch-fid를 이용해 FID 계산
    fid_value = fid_score.calculate_fid_given_paths(
        [str(dir_real), str(dir_fake)],
        batch_size=batch_size,
        device=device,
        dims=2048,
    )

    # 임시 디렉터리 삭제
    # shutil.rmtree(temp_dir, ignore_errors=True)

    print(temp_dir)

    return float(fid_value)

# real, fake 모두 B*3*64*64 크기의 텐서입니다.
for img in test_loader:
    real = img
    break

fake = x0

device = "cuda" if torch.cuda.is_available() else "cpu"

fid = calculate_fid_from_tensors(
    real, fake, batch_size=batch_size, device=device
)
print(f"FID score: {fid:.2f}")

In [None]:
import matplotlib.pyplot as plt

def visualize_sample(x0, idx=0):
    img = x0[idx]
    img = np.transpose(img, (1, 2, 0))
    img = (img + 1.0) / 2.0
    img = np.clip(img, 0, 1)

    plt.figure(figsize=(4, 4))
    plt.imshow(img)
    plt.axis('off')
    plt.title("Sampled x_0")
    plt.show()

In [None]:
visualize_sample(x0,idx=0)