<a href="https://colab.research.google.com/github/narnia-ai-eilie/Narnia-Edu/blob/main/Lecture/250516_KHT/KHT_Diffusion_%EC%8B%A4%EC%8A%B5.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# 실전 생성 AI : 2D Diffusion

* 날짜:
* 이름:




## **(0) Environment Setup**
---

### **| 라이브러리 설치**

In [None]:
!pip install --upgrade diffusers[torch]

In [None]:
!pip install pytorch-lightning

### **| Utils**

In [None]:
import glob
import torch
import numpy as np
import matplotlib.pyplot as plt
from PIL import Image
from mpl_toolkits.axes_grid1 import make_axes_locatable
import random

def set_seed(seed=42):
    # Python의 내장 난수 생성기 시드 설정
    random.seed(seed)

    # NumPy 난수 생성기 시드 설정
    np.random.seed(seed)

    # PyTorch 난수 생성기 시드 설정 (CPU)
    torch.manual_seed(seed)

    # PyTorch 난수 생성기 시드 설정 (GPU)
    if torch.cuda.is_available():
        torch.cuda.manual_seed(seed)
        torch.cuda.manual_seed_all(seed)  # 모든 GPU에 대해 시드 설정

    # CuDNN 설정
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False

def draw_img_from_cond(skin, geo, cmap='gray'):
    plt.figure(figsize=(3,3))
    path = f'carhood/skin_{skin}_geometry_{geo}.npy'
    img = np.load(path)
    plt.imshow(img, cmap=cmap)
    plt.colorbar()
    plt.show()
    return img


def draw_dist(inputs, samples=100):
    if len(inputs) > samples:
        inputs = random.sample(list(inputs), samples)

    pixel_values = []
    for item in inputs:
        if isinstance(item, Image.Image):
            pixel_values.extend(list(item.getdata()))
        elif isinstance(item, np.ndarray):
            pixel_values.extend(item.flatten())
        elif torch.is_tensor(item):
            if len(item.shape) == 4 and item.shape[1] == 1:  # 이미지가 (batch_size, 1, height, width) 꼴일 때
                item = item.squeeze(1)
            elif len(item.shape) == 3:  # 이미지가 (batch_size, height, width) 꼴일 때
                item = item.unsqueeze(1)
            else:
                raise ValueError("Unsupported image shape. Supported shapes: (batch_size, height, width) or (batch_size, 1, height, width).")
            item = item.detach().cpu().numpy()
            pixel_values.extend(item.flatten())
        else:
            raise ValueError("Unsupported image type. Supported types: PIL Image, numpy array, torch tensor.")

    plt.figure(figsize=(5,3))
    plt.hist(pixel_values, bins=20, color='blue', alpha=0.7)
    plt.xlabel('Pixel Value')
    plt.ylabel('Frequency')
    plt.title('Image Pixel Value Distribution')
    plt.show()



def show_img(images_list,
             r=1,
             cmap='gray',
             img_size=(5, 5),
             axis="off",
             colorbar=False,
             colorbar_range=None,
             save_path=None):
    if r < 1:
        r = 1

    total_images = len(images_list)
    if total_images == 0:
        print("No images to display.")
        return

    cols = (total_images + r - 1) // r
    fig, axs = plt.subplots(r, cols, figsize=(cols * img_size[0], r * img_size[1]))

    if r == 1:
        axs = axs.reshape(1, -1)

    for idx, item in enumerate(images_list):
        ax = axs[0, idx] if r == 1 else axs[idx // cols, idx % cols]
        im = None
        if isinstance(item, Image.Image):
            if item.mode in ['L', '1']:  # Grayscale images
                im = ax.imshow(item, cmap=cmap)
            else:  # Color images
                im = ax.imshow(item)
        elif isinstance(item, np.ndarray):
            if item.ndim == 2:  # 2D array, grayscale image
                im = ax.imshow(item, cmap=cmap, vmin=colorbar_range[0] if colorbar_range else None, vmax=colorbar_range[1] if colorbar_range else None)
            elif item.ndim == 3:  # 3D array, color image
                im = ax.imshow(item, cmap=cmap if item.shape[-1] == 1 else None, vmin=colorbar_range[0] if colorbar_range else None, vmax=colorbar_range[1] if colorbar_range else None)
            elif item.ndim == 4 and item.shape[0] == 1:  # 4D array with batch dimension of 1
                im = ax.imshow(item[0], cmap=cmap if item.shape[1] == 1 else None, vmin=colorbar_range[0] if colorbar_range else None, vmax=colorbar_range[1] if colorbar_range else None)
            else:
                raise ValueError(f"Unsupported numpy array shape: {item.shape}.")
        elif torch.is_tensor(item):
            item = item.detach().cpu().numpy()
            if item.ndim == 2:  # 2D tensor, grayscale image
                im = ax.imshow(item, cmap=cmap, vmin=colorbar_range[0] if colorbar_range else None, vmax=colorbar_range[1] if colorbar_range else None)
            elif item.ndim == 3:  # 3D tensor, color image
                im = ax.imshow(item.transpose(1, 2, 0), cmap=cmap if item.shape[0] == 1 else None, vmin=colorbar_range[0] if colorbar_range else None, vmax=colorbar_range[1] if colorbar_range else None)
            elif item.ndim == 4 and item.shape[0] == 1:  # 4D tensor with batch dimension of 1
                im = ax.imshow(item[0].transpose(1, 2, 0), cmap=cmap if item.shape[1] == 1 else None, vmin=colorbar_range[0] if colorbar_range else None, vmax=colorbar_range[1] if colorbar_range else None)
            else:
                raise ValueError(f"Unsupported torch tensor shape: {item.shape}.")
        else:
            raise ValueError("Unsupported image type. Supported types: PIL Image, numpy array, torch tensor.")

        if colorbar and im is not None:
            divider = make_axes_locatable(ax)
            cax = divider.append_axes("right", size="5%", pad=0.05)
            fig.colorbar(im, cax=cax)

        ax.axis(axis)

    plt.tight_layout()
    # 이미지 저장
    if save_path is not None:
        os.makedirs(os.path.dirname(save_path), exist_ok=True)
        plt.savefig(save_path, bbox_inches='tight')
        plt.close()
    else:
        plt.show()


from PIL import Image

def load_img(path):
    """지정된 경로의 이미지를 로드하여 반환합니다."""
    try:
        img = plt.imread(path)
        img = img / 255.
        return img
    except Exception as e:
        print(f"이미지를 불러오는 중 오류가 발생했습니다: {e}")
        return None

import numpy as np
import matplotlib.pyplot as plt
from PIL import Image
import os
import torch
from mpl_toolkits.axes_grid1 import make_axes_locatable

def show_img(images_list,
             r=1,
             cmap='gray',
             img_size=(5, 5),
             axis="off",
             colorbar=False,
             colorbar_range=None,
             save_path=None,
             titles=[]):  # titles 추가
    if r < 1:
        r = 1

    total_images = len(images_list)
    if total_images == 0:
        print("No images to display.")
        return

    cols = (total_images + r - 1) // r
    fig, axs = plt.subplots(r, cols, figsize=(cols * img_size[0], r * img_size[1]))

    # Adjust the axs shape for single subplot cases
    if r == 1 and cols == 1:
        axs = np.array([axs])

    for idx, item in enumerate(images_list):
        ax = axs.flatten()[idx]
        im = None
        if isinstance(item, Image.Image):
            if item.mode in ['L', '1']:  # Grayscale images
                im = ax.imshow(item, cmap=cmap)
            else:  # Color images
                im = ax.imshow(item)
        elif isinstance(item, np.ndarray):
            if item.ndim == 2:  # 2D array, grayscale image
                im = ax.imshow(item, cmap=cmap, vmin=colorbar_range[0] if colorbar_range else None, vmax=colorbar_range[1] if colorbar_range else None)
            elif item.ndim == 3:  # 3D array, color image
                if item.shape[-1] == 1:  # Grayscale image with single channel
                    im = ax.imshow(item.squeeze(), cmap=cmap, vmin=colorbar_range[0] if colorbar_range else None, vmax=colorbar_range[1] if colorbar_range else None)
                else:  # Color image
                    im = ax.imshow(item, vmin=colorbar_range[0] if colorbar_range else None, vmax=colorbar_range[1] if colorbar_range else None)
            elif item.ndim == 4 and item.shape[0] == 1:  # 4D array with batch dimension of 1
                im = ax.imshow(item[0].transpose(1, 2, 0), cmap=cmap if item.shape[1] == 1 else None, vmin=colorbar_range[0] if colorbar_range else None, vmax=colorbar_range[1] if colorbar_range else None)
            else:
                raise ValueError(f"Unsupported numpy array shape: {item.shape}.")
        elif torch.is_tensor(item):
            item = item.detach().cpu().numpy()
            if item.ndim == 2:  # 2D tensor, grayscale image
                im = ax.imshow(item, cmap=cmap, vmin=colorbar_range[0] if colorbar_range else None, vmax=colorbar_range[1] if colorbar_range else None)
            elif item.ndim == 3:  # 3D tensor, color image
                im = ax.imshow(item.transpose(1, 2, 0), cmap=cmap if item.shape[0] == 1 else None, vmin=colorbar_range[0] if colorbar_range else None, vmax=colorbar_range[1] if colorbar_range else None)
            elif item.ndim == 4 and item.shape[0] == 1:  # 4D tensor with batch dimension of 1
                im = ax.imshow(item[0].transpose(1, 2, 0), cmap=cmap if item.shape[1] == 1 else None, vmin=colorbar_range[0] if colorbar_range else None, vmax=colorbar_range[1] if colorbar_range else None)
            else:
                raise ValueError(f"Unsupported torch tensor shape: {item.shape}.")
        else:
            raise ValueError("Unsupported image type. Supported types: PIL Image, numpy array, torch tensor.")

        # Add title if titles list is provided
        if titles and idx < len(titles):
            ax.set_title(titles[idx], fontsize=10)

        if colorbar and im is not None:
            divider = make_axes_locatable(ax)
            cax = divider.append_axes("right", size="5%", pad=0.05)
            fig.colorbar(im, cax=cax)

        ax.axis(axis)

    plt.tight_layout()
    # 이미지 저장
    if save_path is not None:
        os.makedirs(os.path.dirname(save_path), exist_ok=True)
        plt.savefig(save_path, bbox_inches='tight')
        plt.close()
    else:
        plt.show()

In [None]:
set_seed(0)

### **| Dataset download**

In [None]:
!gdown --folder https://drive.google.com/drive/u/0/folders/1vd1SmXp4L_xJXwDjoCzSoDfYziFDo5sS

In [None]:
!unzip KHT/PTN.zip -d .

## **(1) Dataset**

### **| EDA**



In [None]:
paths = glob.glob('./PTN/*jpg')
print(len(paths))

img = load_img(paths[6])
print(img.shape)
show_img([img], axis=True, colorbar=True)

* Groove 추출

In [None]:
def get_groove(im, void=2, thred=10):
    bk = np.zeros_like(im)
    for i, r in enumerate(im):
        if np.sum(r) <=thred:
            bk[i-void:i+void,:]=1
    return bk

In [None]:
groove_img = get_groove(img)
line_img = np.clip(groove_img + img, 0, 1)
show_img([img, groove_img, line_img], colorbar=True)

* 선 추출

In [None]:
import cv2

def adjust_thickness(image_array, kernel_size=5):
    """
    넘파이 배열 (0~1) 이미지를 입력받아 굵기 조절 (확장/축소) 적용.

    Parameters:
        - image_array (numpy.ndarray): 0~1 범위의 (n, n) 흑백 이미지
        - kernel_size (int): 필터 크기 (클수록 강하게 적용)
        - operation (str): 'dilate' (굵게) or 'erode' (얇게)

    Returns:
        - modified_image (numpy.ndarray): 변환된 0~1 범위 이미지
    """
    # 0~1 → 0~255로 변환 (OpenCV는 0~255 범위 사용)
    img = (image_array * 255).astype(np.uint8)

    # 커널 생성 (커널 크기 조절 가능)
    kernel = np.ones((kernel_size, kernel_size), np.uint8)

    # 굵기 조절 연산 적용
    modified_img = cv2.dilate(img, kernel, iterations=1)

    # 결과를 다시 0~1 범위로 변환
    return modified_img.astype(np.float32) / 255.0

In [None]:
simple_img = adjust_thickness(line_img, kernel_size=5)
condition_img = np.clip(simple_img - groove_img, 0, 1)
show_img([img, simple_img, condition_img], colorbar=True)

### **| DataLoader**

In [None]:
import torch
import numpy as np
import glob
from torch.utils.data import Dataset, DataLoader
import torchvision.transforms as transforms
import cv2


def get_groove(im, void=2, thred=10):
    """
    이미지 내 밝기 값이 특정 임계값 이하인 행(row)을 중심으로
    '홈' 영역을 마스크로 반환합니다.

    Parameters:
        im (ndarray): 입력 이미지 (2D grayscale 또는 RGB 단일 채널)
        void (int): 탐지된 행을 중심으로 마스크를 확장할 범위 (상하 각 void픽셀)
        thred (int): 한 행의 밝기 합계가 이 값 이하이면 홈으로 판단

    Returns:
        bk (ndarray): 입력 이미지와 동일한 크기의 마스크 배열 (홈 영역은 1, 나머지는 0)
    """
    # 출력 마스크 초기화 (입력 이미지와 동일한 shape, 값은 모두 0)
    bk = np.zeros_like(im)

    # 각 행(r)을 순회하며 밝기 합 검사
    for i, r in enumerate(im):
        if np.sum(r) <= thred:
            # 조건을 만족하는 경우 해당 행의 위아래 void 픽셀 범위 마스크를 1로 설정
            bk[i - void:i + void, :] = 1

    return bk

def adjust_thickness(image_array, kernel_size=5):
    """
    넘파이 배열 (0~1) 이미지를 입력받아 굵기 조절 (확장/축소) 적용.

    Parameters:
        - image_array (numpy.ndarray): 0~1 범위의 (n, n) 흑백 이미지
        - kernel_size (int): 필터 크기 (클수록 강하게 적용)
        - operation (str): 'dilate' (굵게) or 'erode' (얇게)

    Returns:
        - modified_image (numpy.ndarray): 변환된 0~1 범위 이미지
    """
    # 0~1 → 0~255로 변환 (OpenCV는 0~255 범위 사용)
    img = (image_array * 255).astype(np.uint8)

    # 커널 생성 (커널 크기 조절 가능)
    kernel = np.ones((kernel_size, kernel_size), np.uint8)

    # 굵기 조절 연산 적용
    modified_img = cv2.dilate(img, kernel, iterations=1)

    # 결과를 다시 0~1 범위로 변환
    return modified_img.astype(np.float32) / 255.0


class PP(Dataset):

    def __init__(self,
                 paths=[],
                 return_condition=True,
                 resize=None,
                 batch_size: int = 4,
                 shuffle: bool = True,
                 aug=True,
                 dtype = torch.float16,
                ):

        # init
        self.paths = paths

        self.return_condition = return_condition
        self.aug = aug
        self.resize = resize
        self.batch_size = batch_size
        self.shuffle = shuffle
        self.dtype = dtype

    def __len__(self):
        return len(self.paths)

    def __getitem__(self, idx):

        path = self.paths[idx]
        img = load_img(path)
        groove_img = get_groove(img)
        line_img = np.clip(groove_img + img, 0, 1)
        simple_img = adjust_thickness(line_img, kernel_size=5)
        condition_img = np.clip(simple_img - groove_img, 0, 1)

        img = torch.tensor(img, dtype=self.dtype).unsqueeze(0)           # (1, H, W)
        img = ( img - 0.5 ) * 2.
        condition_img = torch.tensor(condition_img, dtype=self.dtype).unsqueeze(0)

        if self.aug:
            _, h, w = img.shape
            max_x = w - h
            start_x = torch.randint(0, max_x + 1, (1,)).item()

            img = img[:, :, start_x:start_x + 512]
            condition_img = condition_img[:, :, start_x:start_x + 512]

            if torch.rand(1) < 0.5:
                img = torch.flip(img, dims=[2])              # width 방향
                condition_img = torch.flip(condition_img, dims=[2])

            if torch.rand(1) < 0.5:
                img = torch.flip(img, dims=[1])              # height 방향
                condition_img = torch.flip(condition_img, dims=[1])


        if self.resize is not None:
            img = transforms.Resize(self.resize)(img)
            condition_img = transforms.Resize(self.resize)(condition_img)

        if self.return_condition:
            return img, condition_img
        else:
            return img


    def get_loader(self):
        return DataLoader(self, batch_size=self.batch_size, shuffle=self.shuffle)

    def get_batch(self, idx=0):
        ds = self.get_loader()
        for i, batch in enumerate(ds):
            if i == idx:
                break
        return batch

In [None]:
import glob
paths= glob.glob('./PTN/*jpg')
pp = PP(paths, resize=256, shuffle=False)
for i in range(3):
    print('-- trial', i)
    img, condition = pp.get_batch(0)
    print(img.shape, condition.shape)
    show_img(img, colorbar=True, axis=True)
    show_img(condition, colorbar=True, axis=True)

## **(2) Modules**

![](https://github.com/EilieYoun/Narnia-Edu/blob/main/imgs/%E1%84%89%E1%85%B3%E1%84%8F%E1%85%B3%E1%84%85%E1%85%B5%E1%86%AB%E1%84%89%E1%85%A3%E1%86%BA%202024-08-23%20%E1%84%8B%E1%85%A9%E1%84%92%E1%85%AE%202.46.03.png?raw=true)


![](https://github.com/EilieYoun/box/blob/main/images/240214_ddpm.png?raw=true)


**Diffusion**은 이미지를 점진적으로 정제하는 과정을 통해 노이즈를 제거하고, 원래 이미지를 복원하는 생성 모델입니다. 이번 시간에서는 **DDIM** (Denoising Diffusion Implicit Models)을 사용하여 노이즈를 추가하고 제거하는 과정을 수행합니다.

이번 실습에서는 **Hugging Face**에서 제공하는 `diffusers` 라이브러리를 사용하여 **Diffusion** 모델을 구축합니다. 이 라이브러리는 다양한 **Diffusion** 모델을 쉽게 구현하고 실험할 수 있는 도구를 제공합니다. `diffusers` 라이브러리는 모듈화된 구조를 가지고 있어, 다양한 모델 구성 요소를 쉽게 설정하고 사용할 수 있습니다. 자세한 내용은 https://github.com/huggingface 에서 확인할 수 있습니다.


### **| UNet**

첫 번째 모듈은 UNet입니다. UNet은 이미지 분할 및 재구성 작업에 널리 사용되는 네트워크 구조로, 이번 실습에서는 노이즈를 예측하는 데 사용됩니다. 모듈에 관한 자세한 내용은 [Hugging Face UNET 페이지](https://github.com/huggingface/diffusers/blob/main/src/diffusers/models/unets/unet_2d.py)에서 확인할 수 있습니다.

- `UNet2DModel`을 사용하여 **UNet** 모델을 정의합니다.
- 입력이미지와 채널 수를 적절하게 정의해 줍니다.
- `layers_per_block`과 `block_out_channels`를 기존보다 작게 설정해 가벼운 모델을 만듭니다.

In [None]:
from diffusers import UNet2DModel

unet = UNet2DModel(
    sample_size = 128, # 이미지 데이터의 사이즈
    in_channels = 1, # 이미지 채널 수
    out_channels = 1, # 이미지 채널 수
    layers_per_block = 1, # 모델의 크기 영향
    block_out_channels = [64, 128, 256, 512], # 모델의 크기 영향
)

- `noisy_images`와 `timestep`을 입력으로 받아 예측된 노이즈를 출력합니다.


In [None]:
noisy_images = torch.randn((4, 1, 128, 128)) # 노이즈가 가미된 이미지 (테스트를 위해 랜덤 값으로 구성)
timestep = torch.tensor([10]) # 1000 ~ 0

# unet은 timestep에 맞게 노이즈가 첨가된 이미지에서 노이즈를 예측합니다.
with torch.no_grad():
    pred_noises = unet(noisy_images, timestep).sample
print(pred_noises.shape) # 노이즈의 shape는 noisy_images 와 같아야 한다.
show_img(pred_noises)

### **| Noise scheduler**

두 번째 모듈은 **DDIM Noise Scheduler**입니다. **DDIM**은 Denoising Diffusion Implicit Models의 약자로, 이미지 생성 과정에서 점진적으로 노이즈를 제거하는 역할을 합니다. 이번 실습에서는 노이즈 추가 및 제거를 통해 이미지 재구성을 수행합니다. 모듈에 관한 자세한 내용은 [Hugging Face DDIM 페이지](https://github.com/huggingface/diffusers/blob/main/src/diffusers/schedulers/scheduling_ddim.py)에서 확인할 수 있습니다.

- `DDIMScheduler`를 사용하여 **Noise Scheduler**를 설정합니다. 이 모듈은 각 타임스텝에서 노이즈를 추가하거나 제거하는 역할을 합니다.
- `num_train_timesteps`를 통해 전체 학습 타임스텝 수를 정의합니다.


In [None]:
from diffusers import UNet2DModel, DDIMScheduler

noise_scheduler = DDIMScheduler(num_train_timesteps=1000)
print('timesteps : ', len(noise_scheduler.timesteps), noise_scheduler.timesteps) # timesteps 확인

- `set_timesteps()` 를 통해 사용할 타임스텝 수를 설정하고, 설정된 타임스텝을 출력합니다.

In [None]:
noise_scheduler.set_timesteps(5) # noise scheduler timesteps 설정
print('timesteps : ', len(noise_scheduler.timesteps), noise_scheduler.timesteps) # timesteps 확인

**노이즈 추가**

- `noise_scheduler.timesteps`를 반복하면서 각 타임스텝마다 노이즈를 추가합니다. 이는 원래 이미지에 점진적으로 노이즈를 더해가며 모델을 훈련시키기 위한 과정입니다.
- `torch.randn`을 사용하여 랜덤 노이즈를 생성합니다.
- `noise_scheduler.add_noise` 메서드를 통해 노이즈 이미지로 변환합니다.
- 노이즈 이미지를 시각화하여 확인합니다.

In [None]:
for timestep in noise_scheduler.timesteps:
    print(timestep)
    noises = torch.randn(img.shape) # 순수한 노이즈를 랜덤하게 생성
    noisy_images = noise_scheduler.add_noise(img, noises, timestep) # 노이즈 스케줄러는 타임스텝에 맞게 적절한 노이즈를 이미지에 추가
    show_img(noisy_images)

### **| Image Denoising Process**

UNet과 DDIM Noise Scheduler를 사용하여 노이즈가 있는 이미지를 점진적으로 복원하는 과정 구현합니다. 가이드 코드는 [Hugging Face DDIM 파이프라인](https://github.com/huggingface/diffusers/blob/main/src/diffusers/pipelines/ddim/pipeline_ddim.py)에서 확인할 수 있습니다.

In [None]:
# 이미지 생성하는 과정.

noise_scheduler.set_timesteps(4) # train 에서는 1000 step 사용, inference( 즉, 생성할 때는 )  보통 (20, 30, 50)

images =  torch.randn((4, 1, 128, 128)) # 초기 noise 설정 (우리의 이미지는 노이즈로부터 생성될 예정)
for timestep in noise_scheduler.timesteps: # timesteps 만큼 반복하기
    print(timestep)
    with torch.no_grad(): # 가중치 계산 비활성화
        pred_noises = unet(images, timestep).sample # unet은 timestep 1000에서 거의 노이즈에 가까운 이미지로부터 노이즈만 예측
        images  = noise_scheduler.step(pred_noises, timestep, images).prev_sample # 노이즈에 가까운 이미지에서 노이즈를 제거

images = images.numpy()
print(images.shape)
show_img(images[:,0])

# 이미지가 제대로 안나올 것 , 왜냐면 아직 unet을 학습하지 않았기 때문에.

## **(3) Model**

### **| Defining the Diffusion Model**

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import pytorch_lightning as pl
from diffusers import DDIMScheduler
from diffusers.optimization import get_cosine_schedule_with_warmup
from pytorch_lightning.callbacks import ModelCheckpoint, LearningRateMonitor
from pytorch_lightning.loggers import CSVLogger

from diffusers import UNet2DModel, DDIMScheduler
import os

class DiffusionModel(pl.LightningModule):
    def __init__(self,
                 sample_size=128,
                 in_channels=1,
                 out_channels=1,
                 layers_per_block=1,
                 block_out_channels=[64, 128, 256, 512],
                 num_train_timesteps=1000,
                 device='cuda',
                 seed =0):

        super(DiffusionModel, self).__init__()
        self.sample_size = sample_size
        self.in_channels = in_channels
        self.out_channels = out_channels
        self.layers_per_block = layers_per_block
        self.block_out_channels = block_out_channels
        self.num_train_timesteps = num_train_timesteps
        self.device_type = device

        self.seed = seed

        self.save_hyperparameters() # 하이퍼파라미터 값들이 자동 저장.

        # 모듈 정의1. unet -> 학습할 모델
        self.unet = UNet2DModel(
            sample_size=sample_size,
            in_channels=in_channels,
            out_channels=out_channels,
            layers_per_block=layers_per_block,
            block_out_channels=block_out_channels,
        )
        # 모듈 정의2. noise scheduler
        self.noise_scheduler = DDIMScheduler(num_train_timesteps=num_train_timesteps)

    def forward(self, x, timesteps): # x: 노이즈~이미지
        return self.unet(x, timesteps).sample

    def configure_optimizers(self):
        optimizer = torch.optim.AdamW(self.unet.parameters(), lr=self.learning_rate)
        scheduler = get_cosine_schedule_with_warmup( # learning rate를 조정하는 스케줄러 != noise scheduler 다른개념!
            optimizer,
            num_warmup_steps=self.lr_warmup_steps,
            num_training_steps=(self.num_training_steps * self.trainer.max_epochs),
        )
        return [optimizer], [{"scheduler": scheduler, "interval": "step"}]

    def training_step(self, batch, batch_idx):
        imgs = batch

        # image에 들어갈 noise와 timestep 학습때마다 랜덤하게 정의 된다.
        noise = torch.randn(imgs.shape).to(self.device)
        timesteps = torch.randint(
            0, self.noise_scheduler.config.num_train_timesteps,
            (imgs.shape[0],),
            device=self.device
        ).long()

        # 랜덤한 노이즈를 t 스텝에 맞게 적절히 이미지에 추가를 합니다.
        noisy_images = self.noise_scheduler.add_noise(imgs, noise, timesteps)

        # unet은 적절하게 노이즈가 가미된 이미지로 부터 순수한 랜덤 노이즈가 무엇인지 예측
        noise_pred = self.unet(noisy_images, timesteps).sample

        # 손실함수 : noise 를 unet이 예측하도록 함
        loss = F.mse_loss(noise_pred, noise)

        # Logging loss
        lr = self.optimizers().param_groups[0]['lr']
        self.log('train_loss', loss)
        self.log('learning_rate', lr, prog_bar=True, logger=True)
        return loss

    def infer(self, n=4, seed=0, num_inference_steps=30):
        generator = torch.manual_seed(seed)
        self.noise_scheduler.set_timesteps(num_inference_steps)
        shape = (n, self.in_channels, self.sample_size, self.sample_size)
        images = torch.randn(shape, generator=generator).to(self.device)

        for timestep in self.noise_scheduler.timesteps:
            with torch.no_grad():
                pred_noises = self.unet(images.to(self.device), timestep.to(self.device)).sample
                images = self.noise_scheduler.step(pred_noises, timestep, images, generator=generator).prev_sample
        return images

    def fit(self, train_loader, save_dir, learning_rate=1e-4, lr_warmup_steps=10, num_epochs=10, patience=5, gradient_accumulation_steps=1):
        self.learning_rate = learning_rate
        self.lr_warmup_steps = lr_warmup_steps
        self.num_training_steps = len(train_loader) // gradient_accumulation_steps
        self.save_dir = save_dir

        # Callbacks
        checkpoint_callback = ModelCheckpoint(
            dirpath=save_dir,
            filename='unet_weights',
            save_top_k=1,
            verbose=True,
            monitor='train_loss',
            mode='min'
        )

        lr_monitor = LearningRateMonitor(logging_interval='step')
        csv_logger = CSVLogger(save_dir, name="csv_logs")

        # Trainer
        trainer = pl.Trainer(
            accelerator='cuda',
            max_epochs=num_epochs,
            default_root_dir=save_dir,
            callbacks=[checkpoint_callback, lr_monitor],
            logger=[csv_logger],
            log_every_n_steps=len(train_loader) // gradient_accumulation_steps,
            gradient_clip_val=1.0,
            accumulate_grad_batches=gradient_accumulation_steps,
            precision=16,
        )

        trainer.fit(self, train_loader)


    def on_train_epoch_end(self):
        outputs = self.infer(seed=self.seed)
        epoch = self.trainer.current_epoch
        show_img(outputs, colorbar=True, save_path=f'{self.save_dir}/sample_epoch_{epoch:05d}.png')

### **| Training**

* 모델 객체 생성

In [None]:
model = DiffusionModel()

* 모델 학습

In [None]:
import glob
paths= glob.glob('./PTN/*jpg')
pp = PP(paths, resize=128, shuffle=True, return_condition=False)
loader = pp.get_loader()

model.fit(loader, './test_log', num_epochs=30)

### **| Generation**

* 저장된 weight 불러오기

In [None]:
model =  DiffusionModel.load_from_checkpoint('./test_log/unet_weights.ckpt')
# save hyperparameter 를 정의했기 때문에 weight 불러올 때 별도의 파라미터를 정이해주지 않아도 된다.

* 이미지 생성

In [None]:
for seed in range(3):
  outputs = model.infer(seed=seed, num_inference_steps=20)
  print(outputs.shape)
  show_img(outputs, colorbar=True)

## (4) Model : Stable Diffusion

![](https://github.com/narnia-ai-eilie/box/blob/main/images/240214_sd_beyond.png?raw=true)