# 3. Использование диффузионных моделей для непарного переноса стиля

На 5 лекции мы имплементировали диффузионные модели для безусловной генерации и генерации при условии метки класса. В первой части домашки мы более глубоко исследовали генерацию при условии метки класса, а во второй — условную генерацию для решения парных (в т.ч. обратных) задач. Наконец, в третьей части мы разберем два простых метода, которые позволяют применять диффузионные модели для решения непарных задач перевода между доменами (непарного переноса стиля).

In [None]:
import math
import numpy as np
import torch
import torch.nn as nn

import matplotlib
import matplotlib.pyplot as plt
from PIL import Image

In [None]:
from torchvision.datasets import MNIST
from torch.utils.data import DataLoader, Dataset
from torchvision.transforms import Compose, Resize, ToTensor

## Цветной MNIST

В домашке предлагается поработать с цветной модификацией датасета MNIST (код для покраски взят [у коллег](https://github.com/ngushchin/EntropicNeuralOptimalTransport/blob/main/src/tools.py) из Сколтеха). С одной стороны, такой датасет все еще оставляет возможность обучать диффузионные модели, но делает свойства модели более интерпретируемыми (например, в задачах условной генерации, таких, как дорисовывание, повышение разрешения и деблюринг, можно отследить корректное сохранение цвета изображения).

In [None]:
class ColoredMNIST(MNIST):
    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.hues = 360 * torch.rand(super().__len__())

    def __len__(self):
        return super().__len__()

    def color_image(self, img, idx):
        img_min = 0
        a = (img - img_min) * (self.hues[idx] % 60) / 60
        img_inc = a
        img_dec = img - a

        colored_image = torch.zeros((3, img.shape[1], img.shape[2]))
        H_i = round(self.hues[idx].item() / 60) % 6

        if H_i == 0:
            colored_image[0] = img
            colored_image[1] = img_inc
            colored_image[2] = img_min
        elif H_i == 1:
            colored_image[0] = img_dec
            colored_image[1] = img
            colored_image[2] = img_min
        elif H_i == 2:
            colored_image[0] = img_min
            colored_image[1] = img
            colored_image[2] = img_inc
        elif H_i == 3:
            colored_image[0] = img_min
            colored_image[1] = img_dec
            colored_image[2] = img
        elif H_i == 4:
            colored_image[0] = img_inc
            colored_image[1] = img_min
            colored_image[2] = img
        elif H_i == 5:
            colored_image[0] = img
            colored_image[1] = img_min
            colored_image[2] = img_dec

        return colored_image

    def __getitem__(self, idx):
        img, label = super().__getitem__(idx)
        return self.color_image(img, idx), label

In [None]:
transform = Compose([Resize((32, 32)), ToTensor()])
data_train = ColoredMNIST(root='.', train=True, download=False, transform=transform)
#data_train = ColoredMNIST(root='.', train=True, download=True, transform=transform)
data_test = ColoredMNIST(root='.', train=False, download=False, transform=transform)
train_dataloader = DataLoader(data_train, batch_size=64, shuffle=True)
test_dataloader = DataLoader(data_test, batch_size=64, shuffle=True)

In [None]:
from torchvision.utils import make_grid

def remove_ticks(ax):
    ax.tick_params(
        axis='both',
        which='both',
        bottom=False,
        top=False,
        labelbottom=False,
        left=False,
        labelleft=False
    )

def remove_xticks(ax):
    ax.tick_params(
        axis='both',
        which='both',
        bottom=False,
        top=False,
        labelbottom=False,
        left=True,
        labelleft=True
    )

def visualize_batch(img_vis, title='Семплы из цветного MNIST', nrow=10, ncol=4):
    img_grid = make_grid(img_vis, nrow=nrow)
    fig, ax = plt.subplots(1, figsize=(nrow, ncol))
    remove_ticks(ax)
    ax.set_title(title, fontsize=14)
    ax.imshow(img_grid.permute(1, 2, 0))
    plt.show()


## Предобученная диффузионная модель

Для дальнейшей работы с разного вида условной генерации нам понадобится предобученная **условная** диффузионная модель. Мы будем использовать простенькую архитектуру, которая была получена скрещиванием CUNet из того же [репозитория](https://github.com/ngushchin/EntropicNeuralOptimalTransport/blob/main/src/cunet.py) и части, кодирующей момент времени и метку класса, из SongUNet в [EDM](https://github.com/NVlabs/edm/blob/main/training/networks.py).

Такой выбор был мотивирован следующими наблюдениями:
* Готовые качественные архитектуры (те же SongUNet или DhariwalUNet) достаточно долго работают из-за своей глубины, что усложнит решение домашки, в которой, в основном, важны качественные результаты;
* Существующие имплементации этих архитектур достаточно абстрактно написаны, чтобы при первом знакомстве было удобно писать для них разного рода надстройки.

Архитектура CUNet представляет собой гораздо более легкую и неглубокую модель, за счет чего существенно ускоряет работу с ней и упрощает ее модификацию. Кодирование метки класса и момента времени везде более-менее одинаково (и включает в себя позиционное кодирование/positional encoding), поэтому выбор именно варианта из EDM не существенен.

Как и в семинаре, мы используем надстройку над архитектурой, которая делает все необходимые преобразования над входами: нормирование, взятие логарифма от уровня шума и т.д. Ее имплементация на этот раз взята из репозитория EDM, поэтому загрузка модели выглядит немного необычным образом. Можно не обращать внимания на устройство кода в следующих двух ячейках (небольшая часть кода из гитхаба EDM была изменена для удобства использования в ноутбуке).

In [None]:
#!git clone https://github.com/NVlabs/edm
!cp edm/training/networks.py edm/training/networks_copy.py
!cp fid.py edm/fid.py

In [None]:
def append_code(in_files, out_file):
    lines = ['\n']
    for in_file in in_files:
        with open(in_file, 'r') as f:
            for line in f:
                lines.append(line)

    with open(out_file, 'w') as f:
        for line in lines:
            f.write(line)

append_code(['edm/training/networks_copy.py', 'cunet.py'], 'edm/training/networks.py')

In [None]:
import pickle
%cd edm
from training.networks import EDMPrecond
from torch_utils import misc
from dnnlib import util
%cd ..

cond_model = EDMPrecond(img_resolution=32, img_channels=3, model_type='CUNet', noise_channels=128, base_factor=64, emb_channels=128, label_dim=11)
cond_model.eval().cuda()

with util.open_url('cond_cunet.pkl') as f:
    data = pickle.load(f)

misc.copy_params_and_buffers(src_module=data['ema'], dst_module=cond_model, require_all=True)
print(f"Модель имеет {sum(p.numel() for p in cond_model.parameters())} параметров")

Возьмем слегка модифицированный код для генерации и визуализации из семинара и посмотрим, как работает модель:

In [None]:
def normalize(x):
    return x / x.abs().max(dim=0)[0][None, ...]

def velocity_from_denoiser(x, model, sigma, class_labels=None, error_eps=1e-4, stochastic=False, cfg=0.0, **model_kwargs):
    sigma = sigma[:, None, None, None]
    cond_v = (-model(x, sigma, class_labels, **model_kwargs) + x) / (sigma + error_eps)

    if cfg > 0.0:
        dummy_labels = torch.zeros_like(class_labels)
        dummy_labels[:, -1] = 1
        uncond_v = (-model(x, sigma, dummy_labels, **model_kwargs) + x) / (sigma + error_eps)
        v = cond_v + cfg * (cond_v - uncond_v)
    else:
        v = cond_v

    if stochastic:
        v = v * 2

    return v

def get_timesteps(params):
    num_steps = params['num_steps']
    sigma_min, sigma_max = params['sigma_min'], params['sigma_max']
    rho = params['rho']

    step_indices = torch.arange(num_steps, device=params['device'])
    t_steps = (sigma_max ** (1 / rho) + step_indices / (num_steps - 1) * (sigma_min ** (1 / rho) - sigma_max ** (1 / rho))) ** rho
    t_steps = torch.cat([t_steps, torch.zeros_like(t_steps[:1])]) # t_N = 0
    return t_steps

def sample_euler(model, noise, params, class_labels=None, **model_kwargs):
    num_steps = params['num_steps']
    vis_steps = params['vis_steps']
    t_steps = get_timesteps(params)
    x = noise * params['sigma_max']
    x_history = [normalize(noise)]
    with torch.no_grad():
        for i in range(len(t_steps) - 1):
            t_cur = t_steps[i]
            t_next = t_steps[i + 1]
            t_net = t_steps[i] * torch.ones(x.shape[0], device=params['device'])
            x = x + velocity_from_denoiser(x, model, t_net, class_labels=class_labels, stochastic=params['stochastic'], cfg=params['cfg'], **model_kwargs) * (t_next - t_cur)
            if params['stochastic']:
                x = x + torch.randn_like(x) * torch.sqrt(torch.abs(t_next - t_cur) * 2 * t_cur)
            x_history.append(normalize(x).view(-1, 3, *x.shape[2:]))

    x_history = [x_history[0]] + x_history[::-(num_steps // (vis_steps - 2))][::-1] + [x_history[-1]]

    return x, x_history

def visualize_model_samples(model, params, labels_usage='dummy', class_labels=None, title='Семплы из модели', **model_kwargs):
    noise = torch.randn(40, 3, 32, 32, device=params['device'])
    if class_labels is None and labels_usage == 'dummy':
        class_labels = torch.zeros(40, 11, device=params['device'])
        class_labels[:, -1] = 1
    elif labels_usage == 'random':
        class_labels = torch.randint(low=0, high=10, size=(40,), device=params['device'])
        class_labels = (class_labels[:, None] == torch.arange(11, device=params['device'])[None, :]).float()

    out, trajectory = sample_euler(model, noise, params, class_labels=class_labels, **model_kwargs)
    out = out * 0.5 + 0.5
    visualize_batch(out.detach().cpu(), title=title)

Визуализируем условные семплы из модели (мы будем использовать коэффициент classifier-free guidance, равный 1).

In [None]:
def visualize_cond_samples(model, params, n_samples=3, cfgs=[0.0, 0.5, 1.0, 2.0], **model_kwargs):
    fig, ax = plt.subplots(len(cfgs), figsize=(12, 8))
    for i in range(len(cfgs)):
        remove_ticks(ax[i])
        ax[i].set_title('Семплы с коэффициентом cfg = %.4g' % cfgs[i], fontsize=15)

    for i in range(len(cfgs)):
        cfg = cfgs[i]
        noise = torch.randn(n_samples * 10, 3, 32, 32, device=params['device'])
        class_labels = torch.eye(n=10, m=11).unsqueeze(0).repeat(n_samples, 1, 1).reshape(-1, 11).float().to(params['device'])
        params['cfg'] = cfgs[i]
        img, _ = sample_euler(model, noise, params, class_labels=class_labels, **model_kwargs)
        img = img * 0.5 + 0.5
        img_grid = make_grid(img, nrow=10)
        ax[i].imshow(img_grid.permute(1, 2, 0).detach().cpu())

sampling_params = {
    'device': 'cuda',
    'sigma_min': 0.02,
    'sigma_max': 80.0,
    'num_steps': 50,
    'rho': 7.0,
    'vis_steps': 1,
    'stochastic': False,
}

visualize_cond_samples(cond_model, sampling_params, cfgs=[0.0, 1.0])

## Непарные задачи перевода между доменами

Задача переноса стиля (перевода между доменами) ставит перед собой построение отображения $G(\mathbf{X})$ между двумя распределениями $p$ и $q$: то есть, такого отображения, что если $\mathbf{X} \sim p$, то $G(\mathbf{X}) \sim q$. Любое ли отображение с таким свойством подойдет? Нет, потому что в таком случае мы будем, например, считать решением задачи превращения кошки в собаку отображение $G$, которое переводит кошку в произвольную собаку, не имеющую ничего общего со входным изображением. Нам же хотелось бы гарантировать связь между входом и выходом. Дальше на курсе мы формализуем эту идею с помощью задачи оптимального транспорта.

Непарными же считаются задачи, в которых в данных нет явного соответствия между объектами двух доменов (например, в задаче превратить кошку в собаку или мужчину в женщину не очень понятно, как именно выбирать соответствующие пары). Здесь мы просто считаем, что нам дано два независимых набора данных, соответствующих $\mathbf{X} \sim p$ и $\mathbf{Y} \sim q$.

В этой части домашки мы будем решать задачу перевода между распределением $p(\mathbf{x})$, соответствующим распределению цифр из MNIST и $q(\mathbf{x})$, соответствующим распределению одного из классов MNIST (например, распределение троек).

## SDEdit

Оба метода основываются на одной и той же идее: если у нас есть диффузионная модель, способная генерировать тройки (например, безусловная модель, обученная на тройках, или условная модель, обученная на всем датасете), то чисто теоретически можно превратить любую цифру в тройку следующим образом:
* Зашумить цифру $\mathbf{X}$ до такого уровня $t$, что очертания, позволяющие определить цифру по $\mathbf{X} + t \varepsilon$, размываются, но остаются различимыми такие более общие черты, как цвет/толщина и т.д.;
* Запустить с помощью "троечной" диффузионной модели процесс расшумления, начав его с момента времени $t$ и семпла $\mathbf{X}_t$.

В идеале, генерация с помощью троечной диффузионной модели позволит нам получить правдоподобную тройку, а черты, оставшиеся в картинке после зашумления, позволят на каком-то уровне сохранить стиль исходной цифры. Данный метод называется [SDEdit](https://arxiv.org/abs/2108.01073).

## Задача 1

* **(0.2 балла)** Имплементируйте SDEdit для перевода произвольной цифры в цифру фиксированного класса (передав соответствующую метку в предобученную условную модель). Обратите внимание, что наша имплементация семплинга по схеме Эйлера ждет на вход $X_t / t$, поскольку в коде вход умножается на $t$;
* **(0.2 балла)** Проанализируйте, как меняется качество работы модели при изменении ее единственного гиперпараметра — уровня шума $t$, который прибавляется к исходной цифре. Возьмите по одной цифре из каждого класса и визуализируйте выходы метода при разных $t$. Поэкспериментируйте с разными $t$, чтобы визуализировать такие $t$, между которыми качественно меняется работа метода (например, сравнивать метод на $t = 79.0$ и $t = 80.0$ нет смысла). Как при изменении $t$ изменяется качество семплов и их похожесть на вход? Какой уровень шума $t$ вы бы предложили использовать?

In [None]:
# params: параметры семплинга по Эйлеру
# x_source: исходная картинка
# target_label: число, метка класса, в который нужно превратить объекты на входе
# гиперпараметр t передается в метод как params['sigma_max']

def sdedit(model, x_source, target_label, params):
    out = ...
    return out


In [None]:
def visualize_transform(batch, batch_out):
    batch_cat = torch.cat((batch, batch_out), dim=0)
    image_grid = make_grid(batch_cat.cpu(), nrow=len(batch))

    fig, ax = plt.subplots(figsize=(3 * len(batch), 3))
    remove_ticks(ax)
    ax.imshow(image_grid.permute(1, 2, 0))
    plt.show()

In [None]:
sampling_params = {
    'device': 'cuda',
    'sigma_min': 0.02,
    'sigma_max': ...,
    'num_steps': 50,
    'rho': 7.0,
    'vis_steps': 1,
    'stochastic': False,
    'cfg': 1.0
}

x_source = (next(iter(train_dataloader))[0] * 2 - 1).cuda()[:10]
x_out = sdedit(cond_model, x_source, target_label=3, params=sampling_params)
visualize_transform(x_source, x_out)

## DDIB (Dual Diffusion Implicit Bridges)

Следующий метод, который мы рассмотрим, называется Dual Diffusion Implicit Bridges [(DDIB)](https://arxiv.org/abs/2203.08382). Он использует ту же идею, что SDEdit (зашумить семпл из исходного домена и расшумить его диффузионной моделью для целевого домена), но делает это более умно. Принципиальная проблема SDEdit состоит в том, что стохастическое зашумление всегда сопровождается потерей данных об исходном изображении (если $t$ слишком большое, то можно считать, что информации вообще никакой не остается). Если бы был способ детерминированного зашумления данных, это бы решило проблему, так как позволило бы превратить вход во что-то, что может принять на вход диффузионная модель для таргетного домена.

А такой способ у нас есть! Подойдет представление диффузионных моделей через обыкновенные дифференциальные уравнения: прямой процесс зашумления
$$
    \mathrm{d} \mathbf{X}_t = g(t) \mathrm{d} \mathbf{W}_t
$$
эквивалентен ОДУ
$$
    \mathrm{d} \mathbf{Y}_t = -\frac{g^2(t)}{2} \nabla \log p_t(\mathbf{Y}_t) \mathrm{d} t
$$
с точки зрения маргинальных распределений в каждый момент времени $t$. Тогда зашумить изображение из исходного домена можно решив соответствующее ОДУ с момента времени $0$ до момента времени $t$. Все, что для этого нужно, — иметь диффузионную модель для исходного домена (а такая модель в нашем сеттинге с условной моделью на MNIST'e есть).  Полученное зашумленное изображение, как и раньше, подается в диффузионную модель для целевого домена и расшумляется.

## Задача 2
* **(0.3 балла)** Реализуйте детерминированное зашумление изображения с помощью метода Эйлера, взяв $\sigma_t = t$ и $g(t) = \sqrt{2t}$ (именно эти параметры мы взяли в 6 лекции, с ними имплементировали схему Эйлера, которую потом скопировали во все 3 части домашки). На его основе релизуйте метод DDIB. Так как работаем мы с переводом произвольной цифры в, например, тройку, при зашумлении изображения мы будем подавать на вход сети его метку класса, а при расшумлении — метку целевого класса. При зашумлении тоже имеет смысл использовать CFG.

* Как и в SDEdit, обратите внимание, что наша имплементация семплинга по схеме Эйлера ждет на вход $X_t / t$, поскольку в коде вход умножается на $t$;

* **(0.1 балл)** Визуализируйте траекторию зашумления и расшумления изображения при выборе максимального $t = T$ (80.0 в наших экспериментах). Похож ли "детерминированный шум", полученный при кодировании входа, на семпл из нормального распределения? При визуализации траектории имеет смысл нормировать промежуточные изображения (например, так, как это делается в лекции 5).

  
* **(0.2 балла, копипаста из SDEdit)** Проанализируйте, как меняется качество работы модели при изменении ее единственного гиперпараметра — уровня шума $t$, до которого кодируется исходная картинка. Возьмите по одной цифре из каждого класса и визуализируйте выходы метода при разных $t$. Поэкспериментируйте с разными $t$, чтобы визуализировать такие $t$, между которыми качественно меняется работа метода. Как при изменении $t$ изменяется качество семплов и их похожесть на вход? Какой уровень шума $t$ вы бы предложили использовать?

In [None]:
sampling_params = {
    'device': 'cuda',
    'sigma_min': 0.02,
    'sigma_max': ...,
    'num_steps': 50,
    'rho': 7.0,
    'vis_steps': 1,
    'cfg': 1.0
}

def encode_euler(model, x_source, params, class_labels=None, **model_kwargs):
    num_steps = params['num_steps']
    vis_steps = params['vis_steps']
    t_steps = get_timesteps(params) # здесь t убывают!
    x = x_source

    x_history = [normalize(x)]
    with torch.no_grad():
        for i in range(len(t_steps) - 1):
            x = x + ...

            x_history.append(normalize(x).view(-1, 3, *x.shape[2:]))

    x_history = [x_history[0]] + x_history[::-(num_steps // (vis_steps - 2))][::-1] + [x_history[-1]]

    return x, x_history

def ddib(model, x_source, target_label, params):
    out = ...
    return out

## Задача 3 (бонус, 0.5 балла)

Возьмите по 3-5 (адекватных) значений $t$ для каждого из двух методов. Для каждого $t$ запустите метод (с тройками в качестве целевого домена) на подмножестве тестового датасета MNIST (1000-2000 картинок) и посчитайте две метрики:
* Средняя по датасету "похожесть" между входом и выходом, посчитанная как попиксельная $L_2$ норма разности;
* FID между сгенерированными тройками и тройками из трейн датасета (предпосчитанные статистики лежат в *cmnist_train_3.npz*).

Визуализируйте полученные метрики в виде двумерного scatter plot с осями, соответствующими метрикам, и прокомментируйте результаты. Какой из методов достигает лучшего баланса между качеством семплов и похожестью входа на выход?