# **RePaint: Inpainting using Denoising Diffusion Probabilistic Models(CVPR 2022)**
### Paper: https://arxiv.org/abs/2201.09865
### Used Pretrained Model: https://huggingface.co/google/ddpm-celebahq-256 (by Google)
### Implementation Environment: Colab(GPU ~ T4)


**본 구현에서는 해당 논문의 재현을 위해 위의 사전학습된 DDPM 모델을 활용했으며, 추가적인 학습은 하지 않고 Pipeline 구조만 수정하였음.**

## Mount google

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


## Importing modules

In [None]:
from diffusers import DDPMPipeline
import torch
from PIL import Image

## Jump Diffusion
* $x_{t}$ -> $x_{t+j}$

In [None]:
# x_{t} -> x_{t+n}
def jump_diffusion(x_t, t_now, t_end, scheduler):
    alpha_bar_t = scheduler.alphas_cumprod[t_now]
    alpha_bar_next = scheduler.alphas_cumprod[t_end]

    noise = torch.randn_like(x_t)

    coef_1 = (alpha_bar_next / alpha_bar_t).sqrt()
    coef_2 = (1 - alpha_bar_next / alpha_bar_t).sqrt()

    x_jumped = coef_1 * x_t + coef_2 * noise
    return x_jumped

## Sampling noisy x0 for conditioning
* $x_{0}$ -> $x_{t}$ (x0 is ground truth image)



In [None]:
# x_{0} -> x_{t}
def sample_conditioning(x0, t, scheduler):
    N, _, _, _ = x0.shape
    noise = torch.randn_like(x0)
    x_t = scheduler.add_noise(original_samples=x0, noise=noise, timesteps=(torch.ones(N, dtype=torch.long)*t).to('cuda'))
    return x_t

## Element-wise product
* $m \odot x$

In [None]:
def element_wise_product(a, x):
    return a*x

## Merge known regions with unknown regions
* $x_{t}$ = $(1-m) \odot x^{unknown}_{t}$ + $m \odot x^{known}_{t}$

In [None]:
# x_{t-1} = (1-m)*x_{t(unknown)} + m*x_{t(known)}
def merge_known_unknown(x_known, x_generated, mask):
    x_known = element_wise_product(mask, x_known)
    x_unknown = element_wise_product(1-mask, x_generated)
    return x_known + x_unknown

## Make jump time list for Resampling

In [None]:
def make_jump_t_list(max_t, j):
    assert max_t % j == 0, "T must be divisible by j."
    num_steps = max_t//j+1
    jump_steps = [j*t-1 for t in range(num_steps)]
    jump_steps[0] = 0
    return jump_steps[::-1]

## Load masked image

In [None]:
from PIL import Image
import torchvision.transforms as T
import torch
from google.colab import drive


drive.mount('/content/drive')

# mask로 일부 가려져 손상된 이미지 경로
img_path = "/content/drive/MyDrive/repaint/masked_iron.png"
image = Image.open(img_path).convert("RGB")

# Transform image to tensor
transform = T.Compose([
    T.ToTensor(),
    T.Normalize(mean=[0.5]*3, std=[0.5]*3)  # → [-1, 1]
])

x0 = transform(image).to('cuda')

In [None]:
class CustomDDPMPipeline(DDPMPipeline):
    def custom_sample(self, mask, j, n, x0, num_inference_steps=1000, seed=42):
        torch.manual_seed(seed)
        self.scheduler.set_timesteps(num_inference_steps)

        x_T = sample_conditioning(x0, num_inference_steps-1, self.scheduler)
        x = merge_known_unknown(x_T, torch.randn_like(x0), mask) # x_1000 ~ code's notation : x_999

        jump_steps = make_jump_t_list(num_inference_steps, j)

        # Jump cursor
        for cursor in range(len(jump_steps)-1):
            t_start = jump_steps[cursor]
            t_end = jump_steps[cursor+1]

            # Repeat Resampling for n steps
            for resample_step in range(n):

                # Generates unknown regions and constructs x_{t-1} by merging it with x_known
                for t in range(t_start, t_end, -1):
                    with torch.no_grad():
                        x_condition = sample_conditioning(x0, t-1, self.scheduler) # Get x_{t-1(known)}

                        noise_pred = self.unet(x, t).sample
                        x_generated = self.scheduler.step(noise_pred, t, x).prev_sample # Get x_{t-1(unknown)}

                        x = merge_known_unknown(x_condition, x_generated, mask) # Get x_{t-1}

                if resample_step != n-1: # For finishing resampling
                    print(f"Jumped! | t : {t_end} -> {t_start} | resample_cnt : {resample_step+1}")
                    x = jump_diffusion(x, t_end, t_start, self.scheduler) # diffuse x_{t-j} to x_{t}
                else:
                    print(f"Resampling finished. | t : {t_end} -> {t_start} | resample_cnt : {resample_step+1}")

        # Compuations for transforming tensor to image.
        images = (x / 2 + 0.5).clamp(0, 1)
        images = images.cpu().permute(0, 2, 3, 1).numpy()
        return images

# Off-the-shelf model
custom_pipeline = CustomDDPMPipeline.from_pretrained(
    "google/ddpm-celebahq-256",
    use_safetensors=False
)
custom_pipeline = custom_pipeline.to("cuda" if torch.cuda.is_available() else "cpu")

## 귀찮아서 아직 이 아래는 코드 정리 안해둠. 그래서 더러움. 미래의 내가 하겠지.. ##
sample_num = 3
x0 = x0.reshape(1, 3, 256, 256).repeat(sample_num, 1, 1, 1)
mask = torch.ones_like(x0).to('cuda')
mask[:, :, 50:180, 60:130] = 0 # 마스크 위치를 직접 지정하는 게 아니라 사진이랑 같이 불러오는 형식으로 수정해야 함.

# Performing RePaint
imgs = custom_pipeline.custom_sample(num_inference_steps=1000, mask=mask, j=100, n=15, x0=x0) # if n=1: no resampling

# Save results
for idx, img in enumerate(imgs):
    img = Image.fromarray((img * 255).astype("uint8"))
    img.save(f"/content/drive/MyDrive/result{idx}.png")
    img.show()