In [None]:
import os

from torch.utils.data import DataLoader
import torchvision.transforms as transforms

# CLIP 관련 모듈
import clip
import torch
from torchvision.transforms import Compose, Resize, CenterCrop, ToTensor, Normalize
from torchvision.transforms import ToPILImage
from PIL import Image
 
# image 관련 모듈
import matplotlib.pyplot as plt
import torchvision

# Workspace layout: data paths are resolved relative to this root.
WORKSPACE_DIR = '../'
data_path = os.path.join(WORKSPACE_DIR, 'Data')

# Device selection (keep the GPU index identical everywhere it is configured).
if torch.cuda.is_available():
    device = "cuda:1"
else:
    device = "cpu"

In [None]:
def show_image(image):
    """Display a single image tensor with matplotlib, axes hidden.

    Args:
        image: tensor exposing ``.cpu().numpy()``. Its axes are reordered
            with ``(1, 2, 0)`` before plotting (assumes channel-first
            ``(C, H, W)`` input -- TODO confirm against callers).
    """
    channels_last = image.cpu().numpy().transpose(1, 2, 0)
    plt.imshow(channels_last)
    plt.axis('off')
    plt.show()

def save_image(image, save_path):
    """Write a single image tensor to ``save_path`` as an 8-bit image.

    Args:
        image: tensor whose axes are reordered with ``(1, 2, 0)`` before
            saving (assumes channel-first ``(C, H, W)`` with values meant
            to lie in [0, 1] -- TODO confirm against the sampler output).
        save_path: destination file path; the format is chosen by PIL from
            the file extension.
    """
    # detach() lets tensors that are still attached to an autograd graph be
    # converted; for plain tensors it is a no-op view.
    pixels = image.detach().cpu().numpy().transpose((1, 2, 0))
    # Clamp before the uint8 cast: values slightly outside [0, 1] would
    # otherwise overflow the 8-bit range and wrap around (e.g. 257 -> 1),
    # which shows up as speckle artefacts in the saved file.
    pixels = (pixels.clip(0.0, 1.0) * 255).astype('uint8')  # scale to 0..255
    Image.fromarray(pixels).save(save_path)

def show_and_save_image(image, save_path, p = 0):
    """Save ``image`` to ``save_path``; also display it when ``p == 1``.

    Args:
        image: image tensor forwarded unchanged to ``show_image`` /
            ``save_image``.
        save_path: destination file path forwarded to ``save_image``.
        p: preview flag; the image is shown only when it equals 1.
    """
    wants_preview = (p == 1)
    if wants_preview:
        show_image(image)
    save_image(image, save_path)

In [None]:
def make_dir(dirname):
    """Create ``dirname`` (including missing parents) if it does not exist.

    Best-effort: an ``OSError`` (e.g. permission denied, or the path already
    exists as a regular file) is reported on stdout instead of being raised.

    Args:
        dirname: path of the directory to create.
    """
    try:
        # exist_ok=True makes the call idempotent and removes the
        # check-then-create race of a separate isdir() + makedirs().
        os.makedirs(dirname, exist_ok=True)
    except OSError:
        print(f"Failed Create Your Directory : {dirname}")

In [None]:
from denoising_diffusion_pytorch import Unet
from guided_diffusion import GaussianDiffusion # the sampling function inside denoising_diffusion_pytorch cannot receive guidance while sampling
from trainer import Trainer

# Denoising network, moved to the configured device.
model = Unet(dim = 64, dim_mults = (1, 2, 4, 8))
model.to(device)

# Diffusion wrapper around the network.
diffusion = GaussianDiffusion(
    model,
    image_size = 32,
    timesteps = 1000,            # number of diffusion steps
    sampling_timesteps = 1000,   # guided sampling is only possible when timesteps <= sampling_timesteps
    objective = 'pred_v',        # must match the objective used for training ('pred_v' or 'pred_noise'); check and change if needed
)

dataset_name = 'cars_shapenet'

In [None]:
# Folder passed to the Trainer as its results_folder.
# Anchored on WORKSPACE_DIR (like data_path) so relocating the workspace only
# needs one change; the resulting value is still '../diffusion_model'.
res_path = os.path.join(WORKSPACE_DIR, 'diffusion_model')
print(res_path)

In [None]:
# Make sure the results folder exists before it is handed to the Trainer.
make_dir(res_path)

trainer = Trainer(
    diffusion,
    os.path.join(data_path, 'cars_train_test'),   # location of the dataset to use
    dataset_name = dataset_name,
    train_batch_size = 32,
    train_lr = 8e-5,                  # learning rate
    train_num_steps = 700000,         # total training steps
    gradient_accumulate_every = 2,    # gradient accumulation steps
    ema_decay = 0.995,                # exponential moving average decay
    amp = True,                       # turn on mixed precision
    calculate_fid = False,            # whether to compute the FID score during training
    results_folder = res_path,
    gpu_index = 1                     # GPU index to use; keep in sync with `device` ("cuda:1")
)

In [None]:
# Load the model and show its sampling results for the given milestone
# (per the original note, the trainer loads the model internally).
milestone = 700
trainer.sample_images_at_milestone(milestone)

In [None]:
# Initialise the CLIP model.
clip_model, clip_preprocess = clip.load("ViT-B/32", device=device)

# Cast the ViT-B/32 model to fp32.
clip_model = clip_model.float()

# Label used in the output folder name of the generated samples.
clip_model_name = "pretrained"

# Optional checkpoint with custom CLIP weights.
# NOTE(review): the original cell read `load_path` without defining it anywhere
# in the notebook, so a fresh kernel failed with NameError. It is defined here;
# None keeps the stock weights returned by clip.load(). Set it to a checkpoint
# file (and update clip_model_name accordingly) to use custom weights.
load_path = None

if load_path is not None:
    # Load the saved state_dict.
    # torch.load unpickles the file -- only load checkpoints you trust.
    state_dict = torch.load(load_path, map_location=device)

    # If the saved model is a ScriptModule, unwrap it
    if isinstance(state_dict, torch.jit.ScriptModule):
        state_dict = state_dict.state_dict()

    # Apply the state_dict to the model. strict=False tolerates key
    # mismatches, so report them instead of ignoring them silently.
    load_result = clip_model.load_state_dict(state_dict, strict=False)
    if load_result.missing_keys or load_result.unexpected_keys:
        print(f"CLIP checkpoint key mismatch - missing: {load_result.missing_keys}, "
              f"unexpected: {load_result.unexpected_keys}")

In [None]:
# Read the class prompts, one per line.
class_file_path = "../db/cars_s.txt"
with open(class_file_path, "r", encoding="utf-8") as file:
    # Skip blank lines: an empty entry would otherwise be tokenized as a class
    # and later used as an (empty) output folder name.
    classes = [line.strip() for line in file if line.strip()]

In [None]:
# Feed every class prompt to the CLIP text encoder to get its feature vector
# (embedding); class_embeddings[i] belongs to classes[i].
class_embeddings = []
# no_grad: the embeddings are fixed guidance targets. Without it every stored
# embedding keeps the text encoder's autograd graph alive for no benefit.
with torch.no_grad():
    for c in classes:
        text = clip.tokenize([c]).to(device)
        class_embedding = clip_model.encode_text(text)
        class_embeddings.append(class_embedding)

In [None]:
def classifier_cond_fn(x, t, classifier, y, classifier_scale=1):
    """Return the guidance gradient that pushes samples toward embedding ``y``.

    The batch is passed through ``diffusion.unnormalize``, resized to 224x224,
    fed to ``classifier`` and scored against ``y`` with a dot product; the
    gradient of the summed scores is returned.

    Args:
        x: current batch of samples from the sampler (detached internally).
        t: timestep argument of the cond_fn signature; not used here.
        classifier: callable mapping the resized image batch to feature
            vectors (the notebook passes ``clip_model.visual``).
        y: target embedding, must not be None; scores are
            ``features @ y.t()``.
        classifier_scale: multiplier applied to the gradient.

    Returns:
        The scaled gradient tensor, with the shape of the unnormalized batch.
    """
    assert y is not None
    # The sampler presumably runs under no_grad (TODO confirm), hence the
    # explicit re-enabling of autograd for this block.
    with torch.enable_grad():
        x_in = x.detach().requires_grad_(True)
        # NOTE(review): x_in is rebound to the unnormalized tensor, so the
        # gradient below is taken w.r.t. the unnormalized image, not w.r.t.
        # x itself. Kept as-is to preserve the tuned classifier_scale --
        # confirm this is intended.
        x_in = diffusion.unnormalize(x_in)
        # F.upsample is deprecated; interpolate is its direct replacement.
        # align_corners=False is the value upsample used implicitly.
        x_in_upsample = torch.nn.functional.interpolate(
            x_in, size=224, mode="bicubic", align_corners=False
        )
        # NOTE(review): CLIP's own mean/std preprocessing is not applied and
        # neither features nor y are L2-normalized -- verify this is wanted.
        image_features = classifier(x_in_upsample)
        logits = image_features @ y.t()
        grad = torch.autograd.grad(logits.sum(), x_in)[0] * classifier_scale
        return grad

In [None]:
# Guided sampling: for every class, draw a batch of images steered toward the
# class embedding and save them under
# ../unnoise/samples/<clip_model_name>/<class name>/.
classifier = clip_model.visual  # use CLIP's image encoder as the classifier

batch_size = 25

# zip keeps each class name paired with its embedding, replacing the
# hand-maintained index counter that had to stay in lock-step with the loop.
for class_name, c_embedding in zip(classes, class_embeddings):
    sampled_images = diffusion.sample(
        batch_size = batch_size,
        cond_fn = classifier_cond_fn,
        guidance_kwargs = {
            "classifier": classifier,
            "y": c_embedding,
            "classifier_scale": 0.1,
        },
    )

    print(class_name)
    path_prefix = os.path.join('../unnoise/samples', clip_model_name, class_name)
    make_dir(path_prefix)

    # Files are named 0000.png, 0001.png, ... within the class folder.
    for i in range(batch_size):
        image_path = os.path.join(path_prefix, f"{i:04d}.png")
        show_and_save_image(sampled_images[i], image_path)