<a href="https://colab.research.google.com/github/shizoda/education/blob/main/diffusion/clip_guided_diffusion.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# テキストプロンプトからの画像生成

今回は以下の3つの技術を用いて、テキストプロンプトからの画像生成を行います。

#### 主要な用語

- **CLIP** (Contrastive Language–Image Pre-training)
テキストと画像の両方から埋め込みベクトルをつくるモデル

- **拡散モデル**
ノイズ画像から徐々に画像を除去していくことによる画像生成モデル

- **guided-diffusion**
拡散モデルにおいてノイズを除去する際、CLIP などのガイダンスを用いられるようにしたモデル

#### 本ノートブックの出典

https://colab.research.google.com/drive/12a_Wrfi2_gwwAuN3VvMTwVMz9TfqctNj

Many thanks to the original author!

> By Katherine Crowson (https://github.com/crowsonkb, https://twitter.com/RiversHaveWings). It uses OpenAI's 256x256 unconditional ImageNet diffusion model (https://github.com/openai/guided-diffusion) together with CLIP (https://github.com/openai/CLIP) to connect text prompts with images.

In [None]:
# @title Licensed under the MIT License

# Copyright (c) 2021 Katherine Crowson
# Copyright (c) 2024 Hirohisa Oda

# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:

# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.

# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.

### **CLIP**（Contrastive Language–Image Pre-training）

CLIP は OpenAI が開発したモデルで、テキストと画像を同じ空間に「埋め込みベクトル」としてマッピングできます。

- テキストと画像の類似度を数値化。
- テキストプロンプトを用いて画像生成や検索が可能。

<a title="OpenAI, MIT &lt;http://opensource.org/licenses/mit-license.php&gt;, via Wikimedia Commons" href="https://commons.wikimedia.org/wiki/File:Contrastive_Language-Image_Pretraining.png"><img width="512" alt="Contrastive Language-Image Pretraining" src="https://upload.wikimedia.org/wikipedia/commons/thumb/e/ee/Contrastive_Language-Image_Pretraining.png/512px-Contrastive_Language-Image_Pretraining.png?20240906194850"></a>

#### 埋め込みベクトル

埋め込みベクトルは、テキストや画像を数学的な表現（数値のリスト）に変換したものです。この表現により、似ているもの同士は「近い位置」に配置されます。例えば：

- **「日本」＋「首都」＝「東京」** という類推が、数値ベースで可能になります。
- 画像の場合、富士山の写真が「日本の象徴」という特徴を捉えれば、関連するテキスト（例：「日本」や「富士山」）のベクトルと近くなります。

#### テキストと画像の共通空間

CLIPは、テキストと画像の埋め込みベクトルを同じ空間に配置します。その結果、たとえ入力がテキストでも画像でも、同じ概念を持つものは近い位置に配置されます。例えば，「犬」というテキストと、犬の写真は非常に似たベクトルになります。

In [None]:
# Check CUDA
import torch
assert torch.cuda.is_available()

# Install dependencies
!git clone https://github.com/openai/CLIP
!git clone https://github.com/crowsonkb/guided-diffusion
!pip install -e ./CLIP
!pip install -e ./guided-diffusion
!pip install lpips

# Download the diffusion model
!curl -OL 'https://openaipublic.blob.core.windows.net/diffusion/jul-2021/256x256_diffusion_uncond.pt'

In [None]:
# Imports

import gc
import io
import math
import sys

from IPython import display
import lpips
from PIL import Image
import requests

from torch import nn
from torch.nn import functional as F
from torchvision import transforms
from torchvision.transforms import functional as TF
from tqdm.notebook import tqdm

sys.path.append('./CLIP')
sys.path.append('./guided-diffusion')

import clip
from guided_diffusion.script_util import create_model_and_diffusion, model_and_diffusion_defaults

In [None]:
# ファイルをダウンロードするライブラリ
import urllib.request

# PIL, NumPy, Matplotlibのインポート
from PIL import Image
import numpy as np
import matplotlib.pyplot as plt

def preprocess_image_from_url(image_url, output_size=(224, 224)):
    """
    指定されたURLから画像をダウンロードし、中心をクロップして縮小する。

    Args:
        image_url (str): 画像のURL。
        output_size (tuple): 出力画像のサイズ（デフォルトは224x224）。

    Returns:
        PIL.Image: 前処理後の画像。
    """
    # 画像をダウンロードして保存
    temp_path = '/content/temp_image.jpg'
    urllib.request.urlretrieve(image_url, temp_path)

    # 画像を読み込む
    img = Image.open(temp_path)

    # 画像の幅と高さを取得
    width, height = img.size

    # 中心をクロップ
    crop_size = min(width, height)  # 正方形になるよう最小辺を基準にする
    left = (width - crop_size) // 2
    top = (height - crop_size) // 2
    right = left + crop_size
    bottom = top + crop_size
    img_cropped = img.crop((left, top, right, bottom))

    # 縮小して224x224にリサイズ
    img_resized = img_cropped.resize(output_size, Image.BICUBIC)

    return img_resized

# サンプルのURLを使用して前処理
image_url = 'https://upload.wikimedia.org/wikipedia/commons/thumb/3/3e/MtFuji_FujiCity.jpg/320px-MtFuji_FujiCity.jpg'
image = preprocess_image_from_url(image_url)

# 画像の表示
plt.figure(figsize=(3, 3))
plt.imshow(np.array(image))
plt.axis('off')
plt.show()

In [None]:
# Load CLIP model
device = 'cuda' if torch.cuda.is_available() else 'cpu'
clip_model, preprocess = clip.load("ViT-B/32", device=device)

# Function to calculate embedding vectors for images
def calculate_clip_embedding(image):
    """
    CLIPモデルを使用して画像の埋め込みベクトルを計算する。

    Args:
        image (PIL.Image): 前処理済みのPIL形式の画像。

    Returns:
        numpy.ndarray: 埋め込みベクトル。
    """
    # Transform the PIL image into a CLIP-compatible tensor
    preprocess = transforms.Compose([
        transforms.ToTensor(),  # Convert to Tensor
        transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))  # Normalize to [-1, 1]
    ])
    image_tensor = preprocess(image).unsqueeze(0).to(device)  # Add batch dimension

    # Calculate embedding vector using CLIP
    with torch.no_grad():
        image_embedding = clip_model.encode_image(image_tensor).cpu().numpy()
    return image_embedding

# Function to calculate embedding vector for a text input
def calculate_text_embedding(text):
    text_tokens = clip.tokenize([text]).to(device)
    with torch.no_grad():
        text_embedding = clip_model.encode_text(text_tokens).cpu().numpy()
    return text_embedding

# Compare image and text embeddings
def cosine_similarity(image_embedding, text_embedding):
    # 確認と変換: 入力を1次元に変換
    image_embedding = torch.tensor(image_embedding).squeeze()  # (1, 512) -> (512,)
    text_embedding = torch.tensor(text_embedding).squeeze()    # (1, 512) -> (512,)

    # コサイン類似度を計算
    similarity = torch.nn.functional.cosine_similarity(image_embedding, text_embedding, dim=0)
    return similarity.item()

###  コサイン類似度

コサイン類似度は、2つのベクトルがどれだけ類似しているかをその**方向**（角度）に基づいて測定します。コサイン類似度の数式は次の通りです：

$$
\text{cosine_similarity}(\mathbf{u}, \mathbf{v}) = \frac{\mathbf{u} \cdot \mathbf{v}}{\|\mathbf{u}\| \|\mathbf{v}\|}
$$

ここで：
- $\mathbf{u}$ と $\mathbf{v}$ は比較する2つのベクトル。
- $\mathbf{u} \cdot \mathbf{v}$ は内積（dot product）。
- $\|\mathbf{u}\|$ と $\|\mathbf{v}\|$ はそれぞれのベクトルのノルム（長さ）。

---

### **2. コサイン類似度の値の範囲**
コサイン類似度は$[-1,1]の範囲をとります。

- 1 のとき：$\mathbf{u}$ と $\mathbf{v}$ が完全に同じ方向を向いている（高い類似性）。
- 0 のとき： $\mathbf{u}$ と $\mathbf{v}$が直交している（全く関連性がない）。
- -1 のとき： $\mathbf{u}$ と$\mathbf{v}$ が完全に逆方向を向いている（正反対の意味）。

---

### **3. コサイン類似度が一般的に使用される理由**
1. **方向性を重視**:
   - コサイン類似度はベクトルの方向（角度）に基づくため、ベクトルの長さ（スケール）の影響を受けません。
   - 埋め込み空間では、意味的な関連性が方向に表現されるため、適しています。

2. **高次元データに適している**:
   - 埋め込みベクトルは通常、高次元空間（例: 512次元）に存在します。コサイン類似度は次元数に依存せず、効率的に計算できます。

3. **距離ではなく類似性を測る**:
   - ユークリッド距離（L2ノルム）では、スケールや密度の違いが影響を与える場合がありますが、コサイン類似度はこれらの影響を受けにくいです。


### 演習１

とりあえず現在のプロンプトに対するコサイン類似度を記録しておいてください．その上で，
- より画像に近い説明のプロンプトで，コサイン類似度が大きくなることを確認
- より画像に近い説明のプロンプトで，コサイン類似度が小さくなることを確認

In [None]:
text_input = "Anime with a car and two boys"

text_embedding = calculate_text_embedding(text_input)

image_embedding = calculate_clip_embedding(image)
print("Image embegging")
print(image_embedding[0,0:7], "...")
print()

# Calculate difference
embedding_difference = cosine_similarity(image_embedding, text_embedding)

print(f"Embedding of text input: {text_input}")
print(text_embedding[0,0:7], "...")
print()
print(f"Cosine similarity: {embedding_difference}")


### **拡散モデル**

ノイズを加える「拡散」とノイズを除去する「逆拡散」をもとに、画像を生成するモデルです。

#### ノイズの加え方:
拡散モデルでは、元の画像に段階的にノイズを加えます。このプロセスは数学的に定義されており、通常、ランダムなガウスノイズを少しずつ足していきます。
この「ノイズを加える過程」は事前に決められた固定のルール（拡散プロセス）に基づいています。

#### ノイズの除去を学習:
学習の目的は「ノイズを取り除いて元の画像を復元する方法」をモデルに教えることです。拡散モデルでは、「ノイズの段階」と「ノイズを取り除いた結果の予測」を繰り返し学習します。

<a title="Benlisquare, CC BY-SA 4.0 &lt;https://creativecommons.org/licenses/by-sa/4.0&gt;, via Wikimedia Commons" href="https://commons.wikimedia.org/wiki/File:X-Y_plot_of_algorithmically-generated_AI_art_of_European-style_castle_in_Japan_demonstrating_DDIM_diffusion_steps.png"><img width="512" alt="X-Y plot of algorithmically-generated AI art of European-style castle in Japan demonstrating DDIM diffusion steps" src="https://upload.wikimedia.org/wikipedia/commons/thumb/9/99/X-Y_plot_of_algorithmically-generated_AI_art_of_European-style_castle_in_Japan_demonstrating_DDIM_diffusion_steps.png/512px-X-Y_plot_of_algorithmically-generated_AI_art_of_European-style_castle_in_Japan_demonstrating_DDIM_diffusion_steps.png?20221031225518"></a>

### **guided-diffusion**

これも OpenAI が開発したモデルで、拡散モデルの基本的な仕組みを利用しつつ、生成画像に「ガイダンス」を与えることができます。「ガイダンス」とは、特定の目標に基づいて生成過程を誘導することを意味します。これにより、単なるランダムな画像生成ではなく、特定の条件（例: テキストプロンプトやラベル）に基づく生成が可能となります。

#### CLIP ガイダンス

ここでは活用して画像生成を制御します。

- 生成中の画像が、指定されたテキストプロンプトにどれだけ一致しているかを CLIP で評価。この評価に基づき、ノイズ除去の方向を調整し、テキストに一致する画像を作り出します。

これにより、テキストやラベルといった条件に基づき、画像生成を制御できます。通常の拡散モデルよりも、より意味的な制約を反映した画像生成が可能となります。

### 準備

関連するコードやモデルをダウンロードし，ライブラリをインポートします．

また，処理中で必要となるクラスや関数を定義します．

In [None]:
# Define necessary functions

def fetch(url_or_path):
    if str(url_or_path).startswith('http://') or str(url_or_path).startswith('https://'):
        r = requests.get(url_or_path)
        r.raise_for_status()
        fd = io.BytesIO()
        fd.write(r.content)
        fd.seek(0)
        return fd
    return open(url_or_path, 'rb')


def parse_prompt(prompt):
    if prompt.startswith('http://') or prompt.startswith('https://'):
        vals = prompt.rsplit(':', 2)
        vals = [vals[0] + ':' + vals[1], *vals[2:]]
    else:
        vals = prompt.rsplit(':', 1)
    vals = vals + ['', '1'][len(vals):]
    return vals[0], float(vals[1])


class MakeCutouts(nn.Module):
    def __init__(self, cut_size, cutn, cut_pow=1.):
        super().__init__()
        self.cut_size = cut_size
        self.cutn = cutn
        self.cut_pow = cut_pow

    def forward(self, input):
        sideY, sideX = input.shape[2:4]
        max_size = min(sideX, sideY)
        min_size = min(sideX, sideY, self.cut_size)
        cutouts = []
        for _ in range(self.cutn):
            size = int(torch.rand([])**self.cut_pow * (max_size - min_size) + min_size)
            offsetx = torch.randint(0, sideX - size + 1, ())
            offsety = torch.randint(0, sideY - size + 1, ())
            cutout = input[:, :, offsety:offsety + size, offsetx:offsetx + size]
            cutouts.append(F.adaptive_avg_pool2d(cutout, self.cut_size))
        return torch.cat(cutouts)

def spherical_dist_loss(x, y):
    x = F.normalize(x, dim=-1)
    y = F.normalize(y, dim=-1)
    return (x - y).norm(dim=-1).div(2).arcsin().pow(2).mul(2)

def tv_loss(input):
    """L2 total variation loss, as in Mahendran et al."""
    input = F.pad(input, (0, 1, 0, 1), 'replicate')
    x_diff = input[..., :-1, 1:] - input[..., :-1, :-1]
    y_diff = input[..., 1:, :-1] - input[..., :-1, :-1]
    return (x_diff**2 + y_diff**2).mean([1, 2, 3])

def range_loss(input):
    return (input - input.clamp(-1, 1)).pow(2).mean([1, 2, 3])

In [None]:
# Model settings

model_config = model_and_diffusion_defaults()
model_config.update({
    'attention_resolutions': '32, 16, 8',
    'class_cond': False,
    'diffusion_steps': 1000,
    'rescale_timesteps': True,
    'timestep_respacing': '1000',  # Modify this value to decrease the number of
                                   # timesteps.
    'image_size': 256,
    'learn_sigma': True,
    'noise_schedule': 'linear',
    'num_channels': 256,
    'num_head_channels': 64,
    'num_res_blocks': 2,
    'resblock_updown': True,
    'use_checkpoint': False,
    'use_fp16': True,
    'use_scale_shift_norm': True,
})

In [None]:
# Load models
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
print('Using device:', device)

model, diffusion = create_model_and_diffusion(**model_config)
model.load_state_dict(torch.load('256x256_diffusion_uncond.pt', map_location='cpu'))
model.requires_grad_(False).eval().to(device)
if model_config['use_fp16']:
    model.convert_to_fp16()

# CLIPのバックボーンモデル（ViT-B/16）をロード
clip_model = clip.load('ViT-B/16', jit=False)[0].eval().requires_grad_(False).to(device)

# 入力解像度（clip_size）も設定
clip_size = clip_model.visual.input_resolution

# 画像の正規化の設定
normalize = transforms.Normalize(mean=[0.48145466, 0.4578275, 0.40821073],
                                 std=[0.26862954, 0.26130258, 0.27577711])
lpips_model = lpips.LPIPS(net='vgg').to(device)

### guided-diffusion での画像生成

プロンプトを記入して実行してください。

In [None]:
prompts = ['Expressways and cars in Japan']


image_prompts = []
batch_size = 1
clip_guidance_scale = 1000  # Controls how much the image should look like the prompt.
tv_scale = 150              # Controls the smoothness of the final output.
range_scale = 50            # Controls how far out of range RGB values are allowed to be.
cutn = 16
n_batches = 1
init_image = None   # This can be an URL or Colab local path and must be in quotes.
skip_timesteps = 0  # This needs to be between approx. 200 and 500 when using an init image.
                    # Higher values make the output look more like the init.
init_scale = 0      # This enhances the effect of the init image, a good value is 1000.
seed = 0

In [None]:
def do_run():
    if seed is not None:
        torch.manual_seed(seed)

    make_cutouts = MakeCutouts(clip_size, cutn)
    side_x = side_y = model_config['image_size']

    target_embeds, weights = [], []

    for prompt in prompts:
        txt, weight = parse_prompt(prompt)
        target_embeds.append(clip_model.encode_text(clip.tokenize(txt).to(device)).float())
        weights.append(weight)

    for prompt in image_prompts:
        path, weight = parse_prompt(prompt)
        img = Image.open(fetch(path)).convert('RGB')
        img = TF.resize(img, min(side_x, side_y, *img.size), transforms.InterpolationMode.LANCZOS)
        batch = make_cutouts(TF.to_tensor(img).unsqueeze(0).to(device))
        embed = clip_model.encode_image(normalize(batch)).float()
        target_embeds.append(embed)
        weights.extend([weight / cutn] * cutn)

    target_embeds = torch.cat(target_embeds)
    weights = torch.tensor(weights, device=device)
    if weights.sum().abs() < 1e-3:
        raise RuntimeError('The weights must not sum to 0.')
    weights /= weights.sum().abs()

    init = None
    if init_image is not None:
        init = Image.open(fetch(init_image)).convert('RGB')
        init = init.resize((side_x, side_y), Image.LANCZOS)
        init = TF.to_tensor(init).to(device).unsqueeze(0).mul(2).sub(1)

    cur_t = None

    def cond_fn(x, t, out, y=None):
        n = x.shape[0]
        fac = diffusion.sqrt_one_minus_alphas_cumprod[cur_t]
        x_in = out['pred_xstart'] * fac + x * (1 - fac)
        clip_in = normalize(make_cutouts(x_in.add(1).div(2)))
        image_embeds = clip_model.encode_image(clip_in).float()
        dists = spherical_dist_loss(image_embeds.unsqueeze(1), target_embeds.unsqueeze(0))
        dists = dists.view([cutn, n, -1])
        losses = dists.mul(weights).sum(2).mean(0)
        tv_losses = tv_loss(x_in)
        range_losses = range_loss(out['pred_xstart'])
        loss = losses.sum() * clip_guidance_scale + tv_losses.sum() * tv_scale + range_losses.sum() * range_scale
        if init is not None and init_scale:
            init_losses = lpips_model(x_in, init)
            loss = loss + init_losses.sum() * init_scale
        return -torch.autograd.grad(loss, x)[0]

    if model_config['timestep_respacing'].startswith('ddim'):
        sample_fn = diffusion.ddim_sample_loop_progressive
    else:
        sample_fn = diffusion.p_sample_loop_progressive

    for i in range(n_batches):
        cur_t = diffusion.num_timesteps - skip_timesteps - 1

        samples = sample_fn(
            model,
            (batch_size, 3, side_y, side_x),
            clip_denoised=False,
            model_kwargs={},
            cond_fn=cond_fn,
            progress=True,
            skip_timesteps=skip_timesteps,
            init_image=init,
            randomize_class=True,
            cond_fn_with_grad=True,
        )

        for j, sample in enumerate(samples):
            cur_t -= 1
            if j % 100 == 0 or cur_t == -1:
                print()
                for k, image in enumerate(sample['pred_xstart']):
                    filename = f'progress_{i * batch_size + k:05}.png'
                    TF.to_pil_image(image.add(1).div(2).clamp(0, 1)).save(filename)
                    tqdm.write(f'Batch {i}, step {j}, output {k}:')
                    display.display(display.Image(filename))

gc.collect()
do_run()