<a href="https://colab.research.google.com/github/machine-perception-robotics-group/MPRGDeepLearningLectureNotebook/blob/master/12_gan/latent_diffusion_model.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Latent Diffusion Model (LDM)

---
## 目的
Pytorchを用いてLatent Diffusion Model (LDM) を構築し，画像の生成を行う．

## モジュールのインポート

はじめに必要となるモジュールをインポートします．

### GPUの確認
GPUを使用した計算が可能かどうかを確認します．

`GPU availability: True`と表示されれば，GPUを使用した計算を行うことが可能です．
Falseとなっている場合は，上部のメニューバーの「ランタイム」→「ランタイムのタイプを変更」からハードウェアアクセラレータをGPUにしてください．

In [None]:
import os
import numpy as np
from time import time

%matplotlib inline
import matplotlib.pyplot as plt
import matplotlib.animation as animation
from IPython.display import HTML

from tqdm.auto import tqdm
from einops import einsum
from collections import namedtuple

import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision
from torch.utils.data import DataLoader
from torch.optim import Adam
from torchvision import datasets, transforms

# GPUの確認
use_cuda = torch.cuda.is_available()
print('Use CUDA:', use_cuda)

## ネットワークの構築
LDMは医療用画像のセマンティックセグメンテーションを行うモデルとして提案されたU-Netと，離散的なベクトルを扱う生成モデルとして提案されたVector Quantised Variational AutoEncoder (VQ-VAE) をベースにネットワークを構築しています．

Denoising Diffusion Probabilistic Model (DDPM) ではRGB画像上でデノイジング処理をしていました．しかし，画像の解像度に比例して計算量も増加するという問題がDDPMには存在していたため，LDMではVQ-VAEを用いて高次元なデータである画像を低次元なデータである特徴量に圧縮し，その特徴量に対してデノイジング処理をしています．また，テキストやセマンティックマスク画像などの特徴量をU-Netに与えることで条件付け生成を可能としています．そのためLDMには，実行したいタスクに合わせて条件付けエンコーダ（TransformerやCLIP，VAEなど）が追加されることがあります．

LDMのネットワーク構造は以下の点でDDPMとは異なります．
* VQ-VAEの追加
* 条件付け用エンコーダの追加（必要に応じて）

## Position Embeddings
LDMでは各時刻$t$のノイズを推定する時，ネットワークのパラメータは共通です．時刻$t$ごとにネットワークを構築するのではなく，どの時刻$t$かを表す情報をネットワークに与えることで各時刻$t$のノイズを推定することが可能となります．

Position Embeddingsでは以下に示す式によって時刻の特徴量を求め，U-Net層の各Residual blockに追加されます．
$$
PE_{(pos, 2i)} = \sin (\frac{pos}{10000^{(2i/d)}})
$$
$$
PE_{(pos, 2i+1)} = \cos (\frac{pos}{10000^{(2i/d)}})
$$
ここで，$pos$は時刻，$i$は時刻特徴量の次元のインデックス，$d$は時刻特徴量の次元数を表します．

In [2]:
def PositionEmbeddings(time_steps, temb_dim):
    factor = 10000 ** ((torch.arange(start=0, end=temb_dim // 2, dtype=torch.float32, device=time_steps.device) / (temb_dim // 2)))
    t_emb = time_steps[:, None].repeat(1, temb_dim // 2) / factor
    t_emb = torch.cat([torch.sin(t_emb), torch.cos(t_emb)], dim=-1)
    return t_emb

## ResNet Block
ResNetは，通常のネットワークのように，何かしらの処理ブロックによる変換$F(x)$を単純に次の層に渡していくのではなく，残差接続構造によりその処理ブロックへの入力$x$をショートカットし， $H(x) = F(x)+x$を次の層に渡すようにしています．残差接続構造により，誤差逆伝播時に勾配が消失しても，層をまたいで値を伝播することができます．このショートカットを含めた処理単位をResidual blockと呼びます．

LDMのResNet Blockでは，Position Embeddingsで求めた時刻特徴量や必要に応じて条件付け特徴量を画像特徴量に追加し，残差接続を行います．ここで効率的な学習のために活性化関数にはRectified Linear Unit (ReLU)関数ではなく，Sigmoid-weighted Linear Unit (SiLU) 関数を用います．

In [None]:
# ダウンサンプルブロック
class DownBlock(nn.Module):
    def __init__(self, in_channels, out_channels, t_emb_dim, down_sample, num_heads, num_layers, attn, norm_channels):
        super().__init__()
        self.num_layers = num_layers
        self.down_sample = down_sample
        self.attn = attn
        self.t_emb_dim = t_emb_dim
        self.resnet_conv_first = nn.ModuleList([
                nn.Sequential(
                    nn.GroupNorm(norm_channels, in_channels if i == 0 else out_channels),
                    nn.SiLU(),
                    nn.Conv2d(in_channels if i == 0 else out_channels, out_channels, kernel_size=3, stride=1, padding=1),
                ) for i in range(num_layers)])
        if self.t_emb_dim is not None:
            self.t_emb_layers = nn.ModuleList([
                nn.Sequential(
                    nn.SiLU(),
                    nn.Linear(self.t_emb_dim, out_channels)
                ) for _ in range(num_layers)])
        self.resnet_conv_second = nn.ModuleList([
                nn.Sequential(
                    nn.GroupNorm(norm_channels, out_channels),
                    nn.SiLU(),
                    nn.Conv2d(out_channels, out_channels, kernel_size=3, stride=1, padding=1),
                ) for _ in range(num_layers)])

        if self.attn:
            self.attention_norms = nn.ModuleList([nn.GroupNorm(norm_channels, out_channels)
                 for _ in range(num_layers)])
            self.attentions = nn.ModuleList(
                [nn.MultiheadAttention(out_channels, num_heads, batch_first=True)
                 for _ in range(num_layers)])

        self.residual_input_conv = nn.ModuleList([
                nn.Conv2d(in_channels if i == 0 else out_channels, out_channels, kernel_size=1)
                for i in range(num_layers)])
        self.down_sample_conv = nn.Conv2d(out_channels, out_channels, 4, 2, 1) if self.down_sample else nn.Identity()

    def forward(self, x, t_emb=None):
        out = x
        for i in range(self.num_layers):
            resnet_input = out
            out = self.resnet_conv_first[i](out)
            if self.t_emb_dim is not None:
                out = out + self.t_emb_layers[i](t_emb)[:, :, None, None]
            out = self.resnet_conv_second[i](out)
            out = out + self.residual_input_conv[i](resnet_input)

            if self.attn:
                batch_size, channels, h, w = out.shape
                in_attn = out.reshape(batch_size, channels, h * w)
                in_attn = self.attention_norms[i](in_attn)
                in_attn = in_attn.transpose(1, 2)
                out_attn, _ = self.attentions[i](in_attn, in_attn, in_attn)
                out_attn = out_attn.transpose(1, 2).reshape(batch_size, channels, h, w)
                out = out + out_attn
        out = self.down_sample_conv(out)
        return out

# 中間ブロック
class MidBlock(nn.Module):
    def __init__(self, in_channels, out_channels, t_emb_dim, num_heads, num_layers, norm_channels):
        super().__init__()
        self.num_layers = num_layers
        self.t_emb_dim = t_emb_dim
        self.resnet_conv_first = nn.ModuleList([
                nn.Sequential(
                    nn.GroupNorm(norm_channels, in_channels if i == 0 else out_channels),
                    nn.SiLU(),
                    nn.Conv2d(in_channels if i == 0 else out_channels, out_channels, kernel_size=3, stride=1, padding=1),
                ) for i in range(num_layers + 1)])
        if self.t_emb_dim is not None:
            self.t_emb_layers = nn.ModuleList([
                nn.Sequential(
                    nn.SiLU(),
                    nn.Linear(t_emb_dim, out_channels)
                ) for _ in range(num_layers + 1)])
        self.resnet_conv_second = nn.ModuleList([
                nn.Sequential(
                    nn.GroupNorm(norm_channels, out_channels),
                    nn.SiLU(),
                    nn.Conv2d(out_channels, out_channels, kernel_size=3, stride=1, padding=1),
                ) for _ in range(num_layers + 1)])

        self.attention_norms = nn.ModuleList(
            [nn.GroupNorm(norm_channels, out_channels)
             for _ in range(num_layers)])

        self.attentions = nn.ModuleList(
            [nn.MultiheadAttention(out_channels, num_heads, batch_first=True)
             for _ in range(num_layers)])
        self.residual_input_conv = nn.ModuleList([
                nn.Conv2d(in_channels if i == 0 else out_channels, out_channels, kernel_size=1)
                for i in range(num_layers + 1)])

    def forward(self, x, t_emb=None):
        out = x
        resnet_input = out
        out = self.resnet_conv_first[0](out)
        if self.t_emb_dim is not None:
            out = out + self.t_emb_layers[0](t_emb)[:, :, None, None]
        out = self.resnet_conv_second[0](out)
        out = out + self.residual_input_conv[0](resnet_input)

        for i in range(self.num_layers):
            batch_size, channels, h, w = out.shape
            in_attn = out.reshape(batch_size, channels, h * w)
            in_attn = self.attention_norms[i](in_attn)
            in_attn = in_attn.transpose(1, 2)
            out_attn, _ = self.attentions[i](in_attn, in_attn, in_attn)
            out_attn = out_attn.transpose(1, 2).reshape(batch_size, channels, h, w)
            out = out + out_attn

            resnet_input = out
            out = self.resnet_conv_first[i + 1](out)
            if self.t_emb_dim is not None:
                out = out + self.t_emb_layers[i + 1](t_emb)[:, :, None, None]
            out = self.resnet_conv_second[i + 1](out)
            out = out + self.residual_input_conv[i + 1](resnet_input)

        return out

# アップサンプルブロック
class UpBlock(nn.Module):
    def __init__(self, in_channels, out_channels, t_emb_dim, up_sample, num_heads, num_layers, attn, norm_channels, model_type):
        super().__init__()
        self.num_layers = num_layers
        self.up_sample = up_sample
        self.t_emb_dim = t_emb_dim
        self.attn = attn
        self.model_type = model_type
        self.resnet_conv_first = nn.ModuleList([
                nn.Sequential(
                    nn.GroupNorm(norm_channels, in_channels if i == 0 else out_channels),
                    nn.SiLU(),
                    nn.Conv2d(in_channels if i == 0 else out_channels, out_channels, kernel_size=3, stride=1, padding=1),
                ) for i in range(num_layers)])

        if self.t_emb_dim is not None:
            self.t_emb_layers = nn.ModuleList([
                nn.Sequential(
                    nn.SiLU(),
                    nn.Linear(t_emb_dim, out_channels)
                ) for _ in range(num_layers)])

        self.resnet_conv_second = nn.ModuleList([
                nn.Sequential(
                    nn.GroupNorm(norm_channels, out_channels),
                    nn.SiLU(),
                    nn.Conv2d(out_channels, out_channels, kernel_size=3, stride=1, padding=1),
                ) for _ in range(num_layers)])
        if self.attn:
            self.attention_norms = nn.ModuleList([
                    nn.GroupNorm(norm_channels, out_channels)
                    for _ in range(num_layers)])

            self.attentions = nn.ModuleList([
                    nn.MultiheadAttention(out_channels, num_heads, batch_first=True)
                    for _ in range(num_layers)])

        self.residual_input_conv = nn.ModuleList([
                nn.Conv2d(in_channels if i == 0 else out_channels, out_channels, kernel_size=1)
                for i in range(num_layers)])
        if self.model_type == 'unet':
            self.up_sample_conv = nn.ConvTranspose2d(in_channels // 2, in_channels // 2, 4, 2, 1) if self.up_sample else nn.Identity()
        else:
            self.up_sample_conv = nn.ConvTranspose2d(in_channels, in_channels, 4, 2, 1) if self.up_sample else nn.Identity()

    def forward(self, x, out_down=None, t_emb=None):
        x = self.up_sample_conv(x)
        if out_down is not None:
            x = torch.cat([x, out_down], dim=1)
        out = x
        for i in range(self.num_layers):
            resnet_input = out
            out = self.resnet_conv_first[i](out)
            if self.t_emb_dim is not None:
                out = out + self.t_emb_layers[i](t_emb)[:, :, None, None]
            out = self.resnet_conv_second[i](out)
            out = out + self.residual_input_conv[i](resnet_input)

            if self.attn:
                batch_size, channels, h, w = out.shape
                in_attn = out.reshape(batch_size, channels, h * w)
                in_attn = self.attention_norms[i](in_attn)
                in_attn = in_attn.transpose(1, 2)
                out_attn, _ = self.attentions[i](in_attn, in_attn, in_attn)
                out_attn = out_attn.transpose(1, 2).reshape(batch_size, channels, h, w)
                out = out + out_attn
        return out

## VQ-VAE
VQ-VAEは，離散的な潜在空間を持ったVariational Autoencoder (VAE) です．Encoderにより高次元な入力データを低次元な特徴量（潜在変数）に圧縮します．そして，コードブックと呼ばれる事前に用意した離散的なベクトルと潜在変数の距離を計算し，潜在変数を最も距離が近いコードブックに置換することで離散的な潜在編巣を獲得します．最後に，Decoderにより離散的な潜在変数から入力データを再構成します．

LDMでは，VQ-VAEのEncoderで圧縮・離散化された潜在変数をU-Netに入力し，U-Netでノイズ除去されたデータをVQ-VAEのDecoderで再構成しています．

<img src="https://qiita-image-store.s3.ap-northeast-1.amazonaws.com/0/3916683/fd7c0c34-9dfd-4a7d-5b32-8394e9ccf716.png" width=60%>

In [4]:
class VQVAE(nn.Module):
    def __init__(self, im_channels):
        super().__init__()
        # エンコーダ
        self.encoder_conv_in = nn.Conv2d(im_channels, 32, kernel_size=3, padding=(1, 1))
        self.encoder_layers = nn.ModuleList([])
        self.encoder_layers.append(DownBlock(32, 64, t_emb_dim=None, down_sample=True, num_heads=16, num_layers=1, attn=False, norm_channels=32))
        self.encoder_layers.append(DownBlock(64, 128, t_emb_dim=None, down_sample=True, num_heads=16, num_layers=1, attn=False, norm_channels=32))
        self.encoder_mids = MidBlock(128, 128, t_emb_dim=None, num_heads=16, num_layers=1, norm_channels=32)
        self.encoder_norm_out = nn.GroupNorm(32, 128)
        self.encoder_conv_out = nn.Conv2d(128, 3, kernel_size=3, padding=1)
        # 量子化
        self.pre_quant_conv = nn.Conv2d(3, 3, kernel_size=1)
        self.embedding = nn.Embedding(20, 3)
        self.post_quant_conv = nn.Conv2d(3, 3, kernel_size=1)
        # デコーダ
        self.decoder_conv_in = nn.Conv2d(3, 128, kernel_size=3, padding=(1, 1))
        self.decoder_mids = MidBlock(128, 128, t_emb_dim=None, num_heads=16, num_layers=1, norm_channels=32)
        self.decoder_layers = nn.ModuleList([])
        self.decoder_layers.append(UpBlock(128, 64, t_emb_dim=None, up_sample=True, num_heads=16, num_layers=1, attn=False, norm_channels=32, model_type='vqvae'))
        self.decoder_layers.append(UpBlock(64, 32, t_emb_dim=None, up_sample=True, num_heads=16, num_layers=1, attn=False, norm_channels=32, model_type='vqvae'))
        self.decoder_norm_out = nn.GroupNorm(32, 32)
        self.decoder_conv_out = nn.Conv2d(32, im_channels, kernel_size=3, padding=1)

    def quantize(self, x):
        B, C, H, W = x.shape
        x = x.permute(0, 2, 3, 1)
        x = x.reshape(x.size(0), -1, x.size(-1))
        dist = torch.cdist(x, self.embedding.weight[None, :].repeat((x.size(0), 1, 1)))
        min_encoding_indices = torch.argmin(dist, dim=-1)
        quant_out = torch.index_select(self.embedding.weight, 0, min_encoding_indices.view(-1))
        x = x.reshape((-1, x.size(-1)))
        commmitment_loss = torch.mean((quant_out.detach() - x) ** 2)
        codebook_loss = torch.mean((quant_out - x.detach()) ** 2)
        quantize_losses = {
            'codebook_loss': codebook_loss,
            'commitment_loss': commmitment_loss
        }
        quant_out = x + (quant_out - x).detach()
        quant_out = quant_out.reshape((B, H, W, C)).permute(0, 3, 1, 2)
        min_encoding_indices = min_encoding_indices.reshape((-1, quant_out.size(-2), quant_out.size(-1)))
        return quant_out, quantize_losses, min_encoding_indices

    def encode(self, x):
        out = self.encoder_conv_in(x)
        for down in self.encoder_layers:
            out = down(out)
        out = self.encoder_mids(out)
        out = self.encoder_norm_out(out)
        out = nn.SiLU()(out)
        out = self.encoder_conv_out(out)
        out = self.pre_quant_conv(out)
        out, quant_losses, _ = self.quantize(out)
        return out, quant_losses

    def decode(self, z):
        out = z
        out = self.post_quant_conv(out)
        out = self.decoder_conv_in(out)
        out = self.decoder_mids(out)
        for up in self.decoder_layers:
            out = up(out)
        out = self.decoder_norm_out(out)
        out = nn.SiLU()(out)
        out = self.decoder_conv_out(out)
        return out

    def forward(self, x):
        z, quant_losses = self.encode(x)
        out = self.decode(z)
        return out, z, quant_losses

## Unet
LDMのU-Netでは，ノイズが付与された画像特徴量のバッチとそれぞれのノイズレベル（時刻$t$），条件付け特徴量を入力として受け取り，画像特徴量に追加されたノイズを推定しています．

<img src="https://qiita-image-store.s3.ap-northeast-1.amazonaws.com/0/3916683/98558f2e-fd55-ba8d-4863-2fe4661198a4.png" width=100%>

In [5]:
class Unet(nn.Module):
    def __init__(self, im_channels):
        super().__init__()
        self.class_emb = nn.Embedding(10, 256)
        self.conv_in = nn.Conv2d(im_channels, 128, kernel_size=3, padding=1)
        self.t_proj = nn.Sequential(
            nn.Linear(256, 256),
            nn.SiLU(),
            nn.Linear(256, 256)
        )
        # ダウンサンプル
        self.downs = nn.ModuleList([])
        self.downs.append(DownBlock(128, 256, t_emb_dim=256, down_sample=False, num_heads=16, num_layers=2, attn=True, norm_channels=32))
        self.downs.append(DownBlock(256, 256, t_emb_dim=256, down_sample=False, num_heads=16, num_layers=2, attn=True, norm_channels=32))
        self.downs.append(DownBlock(256, 256, t_emb_dim=256, down_sample=False, num_heads=16, num_layers=2, attn=True, norm_channels=32))
        # 中間
        self.mids = MidBlock(256, 256, t_emb_dim=256, num_heads=16, num_layers=2, norm_channels=32)
        # アップサンプル
        self.ups = nn.ModuleList([])
        self.ups.append(UpBlock(256 * 2, 256, t_emb_dim=256, up_sample=False, num_heads=16, num_layers=2, attn=True, norm_channels=32, model_type='unet'))
        self.ups.append(UpBlock(256 * 2, 128, t_emb_dim=256, up_sample=False, num_heads=16, num_layers=2, attn=True, norm_channels=32, model_type='unet'))
        self.ups.append(UpBlock(128 * 2, 128, t_emb_dim=256, up_sample=False, num_heads=16, num_layers=2, attn=True, norm_channels=32, model_type='unet'))
        self.norm_out = nn.GroupNorm(32, 128)
        self.conv_out = nn.Conv2d(128, im_channels, kernel_size=3, padding=1)

    def forward(self, x, t, cond_input=None):
        out = self.conv_in(x)
        t_emb = PositionEmbeddings(torch.as_tensor(t).long(), 256)
        t_emb = self.t_proj(t_emb)
        class_embed = einsum(cond_input.float(), self.class_emb.weight, 'b n, n d -> b d')
        t_emb += class_embed
        down_outs = []

        for idx, down in enumerate(self.downs):
            down_outs.append(out)
            out = down(out, t_emb)

        out = self.mids(out, t_emb)

        for up in self.ups:
            down_out = down_outs.pop()
            out = up(out, down_out, t_emb)
        out = self.norm_out(out)
        out = nn.SiLU()(out)
        out = self.conv_out(out)
        return out

## Forward Process
Forward Processは，入力画像$\mathbf{x}_0$に対してノイズを付与し最終的には完全なノイズ$\mathbf{x}_T$へと変換する確率過程であり，以下に示す式のように正規分布に従うマルコフ過程で定義されます．
$$
q(\mathbf{x}_{1:T} | \mathbf{x}_0) = \prod_{t=1}^T q(\mathbf{x}_t | \mathbf{x}_{t-1})
$$
$$
q(\mathbf{x}_t |  \mathbf{x}_{t-1}) = N(\mathbf{x}_t ; \sqrt{1 - \beta_t} \mathbf{x}_{t-1} \mathbf{I})
$$
ここで，$\beta_t$は変化量を表すパラメータを表します．Forward Processでは学習を行わず，単純に連続的な微小変化によって解析可能な分布に変換することが目的です．ノイズ量を調整するスケジューラの値は$0.0015 \sim 0.0195$，ノイズを付与するステップは$1000$回とします．

<img src="https://qiita-image-store.s3.ap-northeast-1.amazonaws.com/0/3916683/44f8adec-7e88-557e-125d-3c5148616cc8.png" width=100%>

In [6]:
# スケジューラ
def linear_beta_schedule(timesteps):
    beta_start = 0.0015
    beta_end = 0.0195
    return torch.linspace(beta_start ** 0.5, beta_end ** 0.5, timesteps) ** 2

# ステップ数
num_timesteps = 1000

betas = linear_beta_schedule(timesteps=num_timesteps)
alphas = 1. - betas
alphas_cumprod = torch.cumprod(alphas, axis=0)
alphas_cumprod_prev = F.pad(alphas_cumprod[:-1], (1, 0), value=1.0)
sqrt_recip_alphas = torch.sqrt(1.0 / alphas)

sqrt_alphas_cumprod = torch.sqrt(alphas_cumprod)
sqrt_one_minus_alphas_cumprod = torch.sqrt(1. - alphas_cumprod)
posterior_variance = betas * (1. - alphas_cumprod_prev) / (1. - alphas_cumprod)

def extract(a, t, x_shape):
    batch_size = t.shape[0]
    out = a.gather(-1, t.cpu())
    return out.reshape(batch_size, *((1,) * (len(x_shape) - 1))).to(t.device)

def q_sample(x_start, t, noise=None):
    if noise is None:
        noise = torch.randn_like(x_start)
    sqrt_alphas_cumprod_t = extract(sqrt_alphas_cumprod, t, x_start.shape)
    sqrt_one_minus_alphas_cumprod_t = extract(sqrt_one_minus_alphas_cumprod, t, x_start.shape)
    return sqrt_alphas_cumprod_t * x_start + sqrt_one_minus_alphas_cumprod_t * noise

## Reverse Process
変化量を表すパラメータ$\beta_t$が十分に小さい連続変換（Forward Process）の場合，その逆変換（Reverse Process）は同じ関数系で表現することが可能であり，ガウスノイズの除去として考えることができます．そのため，Reverse Processは以下に示す式のように定義します．
$$
p_\theta (\mathbf{x}_{0:T}) = p_\theta (\mathbf{x}_T) \prod_{t=1}^T p_\theta (\mathbf{x}_{t-1} | \mathbf{x}_t)
$$
$$
p_\theta (\mathbf{x}_{t-1} | \mathbf{x}_t) = N( \mathbf{x}_{t-1}; \mathbf{\mu}_\theta (\mathbf{x}_t, t), \Sigma_\theta(\mathbf{x}_t, t))
$$
上記の式では，平均$\mathbf{\mu}_\theta$と共分散$\Sigma_\theta(\mathbf{x}_t, t)$をニューラルネットワークで学習することになっていますが，論文では共分散$\Sigma_\theta(\mathbf{x}_t, t)$をあらかじめ固定し学習しません．そのため，本ノートブックでは共分散を固定して実装していますが，ステップ数$t$を減らした場合においては平均と共分散の両方を学習した方が良いです．

<img src="https://qiita-image-store.s3.ap-northeast-1.amazonaws.com/0/3916683/68d24842-f3bc-0905-9588-9b279365484a.png" width=100%>

In [7]:
@torch.no_grad()
def p_sample(noise_pred, x, t, t_index, cond_input):
    betas_t = extract(betas, t, x.shape)
    sqrt_one_minus_alphas_cumprod_t = extract(sqrt_one_minus_alphas_cumprod, t, x.shape)
    sqrt_recip_alphas_t = extract(sqrt_recip_alphas, t, x.shape)

    model_mean = sqrt_recip_alphas_t * (x - betas_t * noise_pred / sqrt_one_minus_alphas_cumprod_t)

    if t_index == 0:
        return model_mean
    else:
        posterior_variance_t = extract(posterior_variance, t, x.shape)
        noise = torch.randn_like(x)
        return model_mean + torch.sqrt(posterior_variance_t) * noise

@torch.no_grad()
def p_sample_loop(model, vae, cond_input, shape):
    device = next(model.parameters()).device
    b = shape[0]
    xt = torch.randn(shape, device=device)
    imgs = []

    for i in tqdm(reversed(range(0, num_timesteps)), desc='sampling loop time step', total=num_timesteps):
        t = (torch.ones((xt.shape[0],))*i).long().to(device)
        noise_pred = model(xt, t, cond_input)
        xt = p_sample(noise_pred, xt, torch.full((b,), i, device=device, dtype=torch.long), i, cond_input)
        im = vae.decode(xt)
        im = torch.clamp(im, -1., 1.).detach().cpu()
        im = (im + 1) / 2
        imgs.append(im)
    return imgs

@torch.no_grad()
def sample(model, vae, cond_input, image_size, batch_size=16, channels=3):
    return p_sample_loop(model, vae, cond_input, shape=(batch_size, channels, image_size, image_size))

## 損失関数
LDMでは，VAEと同様にデータ$\mathbf{x}_0$の対数尤度の変分下限を最大化するように学習します．LDMの損失関数は以下に示す式のようになります．
$$
\mathcal{L}_{\rm{simple}} = \mathbb{E}_{t, \mathbf{x}_0, \epsilon, y} [\parallel \epsilon - \epsilon_\theta(\sqrt{\bar{a}_t} \mathbf{x}_0 + \sqrt{1 - \bar{a}_t}\epsilon, t, y) \parallel^2]
$$
ここで，$\mathbf{x}_t = \sqrt{\bar{a}_t} \mathbf{x}_0 + \sqrt{1 - \bar{a}_t}\epsilon$と表せるため，損失関数は以下のようになります．
$$
\mathcal{L}_{\rm{simple}} = \mathbb{E}_{t, \mathbf{x}_0, \epsilon,y } [\parallel \epsilon - \epsilon_\theta(\mathbf{x}_t, t, y) \parallel^2]
$$
従ってLDMでは，ノイズが付与された画像特徴量$\mathbf{x}_t$と時刻$t$，条件付け特徴量$y$が入力され，付与されたノイズ$\epsilon$を推定するニューラルネットワーク$\epsilon_\theta$を学習します．

In [8]:
def p_losses(denoise_model, x_start, t, y, noise=None):
    if noise is None:
        noise = torch.randn_like(x_start)
    x_noisy = q_sample(x_start=x_start, t=t, noise=noise)
    predicted_noise = denoise_model(x_noisy, t, cond_input=y)
    loss = F.smooth_l1_loss(noise, predicted_noise)
    return loss

## VQ-VAEの学習

本来であれば，VQ-VAEを任意のデータセットで学習するのですが，今回は事前に学習したVQ-VAEの重みを用いてLDMを学習します．
ダウンロードするVQ-VAEは，MNISTデータセットを用いて学習したものです． 画像サイズは28×28です． 以下のリンクからpretrainモデルのzipファイルをダウンロードし，解凍をします． 中にはVQ-VAEのパラメータvqvae_autoencoder_ckpt.pthが入っています．

In [None]:
!wget --no-check-certificate 'https://docs.google.com/uc?export=download&id=1BuHdwkdqZxP9zstWodnVxTOsURk-Vf7z' -O ckpt.zip
!unzip -q -o ckpt.zip

## データセット，最適化関数などの設定
データセットはMNISTを用いて学習をします．このとき，LDMのU-Netを学習させるためにVQ-VAEのパラメータは固定します．
最適化関数にはAdam Optimizerを使用します．

In [None]:
transform_train = transforms.Compose([transforms.ToTensor()])
mnist_data = datasets.MNIST(root='./data', train=True, transform=transform_train, download=True)
train_loader = DataLoader(dataset=mnist_data, batch_size=128, shuffle=True)

image_size = 28
image_channels = 1

num_epochs = 6
num_classes = 10

device = "cuda" if torch.cuda.is_available() else "cpu"
vae = VQVAE(im_channels=image_channels).to(device)
vae.eval()
vae.load_state_dict(torch.load(os.path.join('./ckpt/vqvae_autoencoder_ckpt.pth'),map_location=device))
for param in vae.parameters():
    param.requires_grad = False
print('vq-vae param freezed')

model = Unet(im_channels=3).to(device)
optimizer = Adam(model.parameters(), lr=1e-5)

## ネットワークの学習

In [None]:
model.train()

start = time()
for epoch_idx in range(num_epochs):
    sum_loss = 0.0
    for x, y in train_loader:
        optimizer.zero_grad()
        x = x.float().to(device)
        with torch.no_grad():
            im, _ = vae.encode(x)

        class_condition = torch.nn.functional.one_hot(y, num_classes).to(device)
        class_drop_mask = torch.zeros((im.shape[0], 1), device=im.device).float().uniform_(0, 1) > 0.1
        y = class_condition * class_drop_mask

        t = torch.randint(0, num_timesteps, (im.shape[0],)).to(device)

        loss = p_losses(model, im, t, y)
        loss.backward()
        optimizer.step()

        sum_loss += loss.item()

    print("epoch:{}, Loss:{}, elapsed time: {}".format(epoch_idx, sum_loss / len(train_loader), time() - start))
    torch.save(model.state_dict(), os.path.join('./ckpt/ddpm_ckpt_class_cond.pth'))

## 学習済みモデルを用いたノイズからの画像生成とデノイジングの可視化
先ほど学習した重みパラメータを用いて，ノイズから画像の生成をします．VQ-VAEの潜在変数サイズと同じランダムなノイズを作成し，その値と生成したい画像のラベルをモデルに入力した結果を確認します．このとき，各ステップの潜在変数をVQ-VAEのDecoderで再構成することでデノイジングを可視化します．

In [None]:
# 生成したい画像の条件（MNISTのラベル）
sample_class = [0, 1, 2, 3, 4]

model = Unet(im_channels=3).to(device)
model.eval()
model.load_state_dict(torch.load(os.path.join('./ckpt/ddpm_ckpt_class_cond.pth'), map_location=device))

vae = VQVAE(im_channels=image_channels).to(device)
vae.eval()
vae.load_state_dict(torch.load(os.path.join('./ckpt/vqvae_autoencoder_ckpt.pth'), map_location=device), strict=True)

with torch.no_grad():
    sample_classe = torch.tensor(sample_class)
    print('Generating images for {}'.format(list(sample_classe.numpy())))
    cond_input = torch.nn.functional.one_hot(sample_classe, num_classes).to(device)

    # ノイズからのサンプリング
    samples = sample(model, vae, cond_input, image_size=7, batch_size=cond_input.size(0), channels=3)

fig = plt.figure()
ims = []
combined_list = []

for v in samples:
    combined_sample = torch.cat([v[i] for i in range(v.shape[0])], dim=2)
    combined_list.append(combined_sample)

for i in range(num_timesteps):
    if i % 50 == 0 or i == 999:
        input_imgs = np.concatenate([samples[i][m].reshape(image_size, image_size) for m in range(5)], axis=1)
        im = plt.imshow(input_imgs, cmap="gray", animated=True)
        ims.append([im])

animate = animation.ArtistAnimation(fig, ims, interval=50, blit=True, repeat_delay=1000)
HTML(animate.to_jshtml())

# おまけ
## Stable Diffusion
Stable Diffusionは，LDMの条件付けEncoderと学習データを変更したText-to-Imageモデルです．テキストを条件として画像生成する場合，LDMの論文ではTransformerにより条件付け特徴量を獲得していました．また，学習データセットには画像とテキストのペアで構成されるLAION-400Mが使用されていました．しかし，Stable Diffusionでは条件付けEncoderにCLIPのテキストEncoderを使用します．また，学習データセットにはLAION-400Mよりも大規模であるLAION-5Bのサブセットを使用します．

Stable Diffusionがテキストを条件とした場合どのような画像を生成するか，公開されている重みと推論コードを使用して確認します．

※ Stable Diffusionには複数のバージョンが存在し，それぞれで条件付けEncoderや学習データセット，モデル構造などが異なります．本ノートブックではStable Diffusion v1を使用しています．

In [None]:
!pip install diffusers

In [None]:
from torch import autocast
from diffusers import StableDiffusionPipeline

pipe = StableDiffusionPipeline.from_pretrained(
	"CompVis/stable-diffusion-v1-4",
	use_auth_token=True
).to("cuda")

# 入力するテキスト
prompt = "a photo of an astronaut riding a horse on mars"
with autocast("cuda"):
    image = pipe(prompt)[0]

image[0]