# Diffusion Model을 이용한 Plug-and-Play (PnP) 이미지 복원

**목표:** 사전 학습된 Diffusion Model을 기반으로, 기존에 학습시킨 U-Net과 Least Squares (최소제곱법) 함수를 결합하여 손상된 이미지를 복원합니다.

**실험 파이프라인:**
1. **(Baseline) Diffusion + U-Net:** Diffusion으로 노이즈 제거 후, U-Net으로 Deconvolution 수행
2. **(Baseline) Diffusion + Least Squares:** Diffusion으로 노이즈 제거 후, LS로 Deconvolution 수행
3. **(PnP) Diffusion + Least Squares:** Diffusion의 Denoising 과정 매 스텝에 LS를 적용하여 복원 성능을 극대화


## 1. 환경 설정 및 라이브러리 설치


In [None]:
# Hugging Face 라이브러리 및 기타 필요 패키지 설치
%pip install diffusers transformers accelerate scipy ftfy --quiet


## 2. Google Drive 연동 및 경로 설정


In [None]:
from google.colab import drive
drive.mount('/content/drive')

import sys
# Google Drive 내 프로젝트 폴더 경로 (본인 환경에 맞게 수정)
PROJECT_PATH = '/content/drive/MyDrive/Data Scientist/Project/Week5/week5'
sys.path.append(PROJECT_PATH)


### (Optional) Install Dependencies


In [None]:
# `loguru` 라이브러리가 설치되어 있지 않은 경우에만 이 셀을 실행하세요.
%pip install loguru --quiet


## 3. 기본 설정 및 데이터 로더 준비


In [None]:
import torch
from pathlib import Path
from torch.utils.data import DataLoader

# 기존 프로젝트의 설정 파일 및 데이터 로더 가져오기
from params import config as global_config, unetconfig
from code_denoising.datawrapper.datawrapper import ControlledDataWrapper
from code_denoising.core_funcs import get_model, ModelType

# --- 기본 설정 ---
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
DATA_PATH = Path(PROJECT_PATH) / 'dataset' / 'test_y'
# 💡 수정: 'VISUALIZATION' 경로 삭제
CKPT_UNET_DECONV = Path(PROJECT_PATH) / 'logs_sbs_deconv_unet' / '00010_unet_end_to_end' / 'checkpoints' / 'checkpoint_epoch_6.ckpt'
OUTPUT_DIR = Path(PROJECT_PATH) / 'result_diffusion'
OUTPUT_DIR.mkdir(exist_ok=True)

print(f"Using device: {device}")
print(f"Test data path: {DATA_PATH}")
print(f"U-Net checkpoint: {CKPT_UNET_DECONV}")

# --- 데이터 로더 준비 ---
# 💡 수정: ControlledDataWrapper에 필요한 모든 인자 전달 (data_path -> file_path)
dataset = ControlledDataWrapper(
    file_path=[str(DATA_PATH)],
    data_type='*.npy',
    training_mode=False,
    augmentation_mode='none',
    noise_type='gaussian',
    noise_levels=[0.0, 0.0],
    conv_directions=[(0.0, 0.0)]
)
dataloader = DataLoader(dataset, batch_size=4, shuffle=False)
print(f"\nSuccessfully loaded {len(dataset)} test images.")


## 4. 사전 학습된 모델 및 함수 로드
Deconvolution을 위한 U-Net과 Least Squares 함수를 준비합니다.


In [None]:
# --- Deconvolution U-Net 로드 ---
global_config.model_type = 'unet'
global_config.model_config = unetconfig
# Deconvolution 모델은 입출력이 2채널이어야 함
global_config.model_config.in_chans = 2
global_config.model_config.out_chans = 2

unet_deconv = get_model(global_config).to(device)
checkpoint = torch.load(CKPT_UNET_DECONV, map_location=device)
unet_deconv.load_state_dict(checkpoint['model_state_dict'])
unet_deconv.eval()
print("✅ Deconvolution U-Net loaded successfully.")

# --- Least Squares 함수 (이전 노트북에서 복사) ---
import torch.fft as fft
from dataset.forward_simulator import dipole_kernel

# LS 함수에 필요한 Dipole Kernel들을 미리 생성
dipole_kernels_k = [
    dipole_kernel(matrix_size=global_config.image_size, B0_dir=direction).to(device)
    for direction in global_config.conv_directions
]
print(f"✅ {len(dipole_kernels_k)} Dipole kernels for LS created successfully.")

def least_squares_deconv(denoised_tensor: torch.Tensor, kernel_k: torch.Tensor, lambda_reg: float = 1e-3) -> torch.Tensor:
    """Performs Least Squares deconvolution in the Fourier domain."""
    denoised_k = fft.fftn(denoised_tensor, dim=(-2, -1))
    kernel_k_conj = torch.conj(kernel_k)
    numerator = kernel_k_conj * denoised_k
    denominator = torch.abs(kernel_k)**2 + lambda_reg
    denominator[denominator == 0] = 1.0
    deconv_k = numerator / denominator
    deconv_image = fft.ifftn(deconv_k, dim=(-2, -1))
    return deconv_image.real

def robust_least_squares(denoised_tensor: torch.Tensor, kernels_k: list, lambda_reg: float = 1e-3) -> torch.Tensor:
    """Finds the best LS deconvolution by maximizing variance across all kernels."""
    candidate_results = []
    candidate_variances = []
    for kernel_k in kernels_k:
        deconv_result = least_squares_deconv(denoised_tensor, kernel_k, lambda_reg)
        candidate_results.append(deconv_result)
        candidate_variances.append(torch.var(deconv_result).item())
    
    best_result_index = torch.argmax(torch.tensor(candidate_variances))
    return candidate_results[best_result_index]

print("✅ Least Squares functions (standard and robust) are ready.")



## 5. Diffusion Pipeline 모델 준비


In [None]:
from diffusers import DDPMPipeline

# 사전 학습된 CelebA-HQ 모델을 Denoising에 사용
diffusion_pipeline = DDPMPipeline.from_pretrained("google/ddpm-celebahq-256").to(device)
print("\n✅ Pre-trained Diffusion Pipeline (DDPM) loaded successfully.")


## 6. 실험 진행 및 결과 저장
이제 준비된 모듈들을 조합하여 3가지 파이프라인을 테스트합니다.


In [None]:
import numpy as np
from PIL import Image
import torchvision.transforms.functional as TF

# --- 유틸리티 함수 ---

def tensor_to_pil(tensor):
    """[B, 1, H, W] 크기의 텐서를 PIL Image 리스트로 변환"""
    if tensor.ndim != 4 or tensor.shape[1] != 1:
        # 단일 이미지 [1, H, W] 또는 [H, W] 핸들링
        if tensor.ndim == 3 and tensor.shape[0] == 1:
            tensor = tensor.squeeze(0)
        elif tensor.ndim != 2:
            raise ValueError(f"Unsupported tensor shape: {tensor.shape}")
    
    images = []
    for i in range(tensor.shape[0]):
        img_tensor = tensor[i] if tensor.ndim == 4 else tensor
        # 0-1 범위를 0-255 범위로 변환 후 uint8 타입으로 변경
        img_np = (img_tensor.squeeze().cpu().numpy() * 255).astype(np.uint8)
        # Grayscale 이미지를 RGB로 변환 (Stable Diffusion 입력용)
        pil_img = Image.fromarray(img_np).convert("RGB")
        images.append(pil_img)
    return images

def pil_to_tensor(pil_images):
    """PIL Image 리스트를 [B, 1, H, W] 크기의 텐서로 변환"""
    tensors = []
    for img in pil_images:
        # RGB 이미지를 Grayscale로 변환 후 텐서로 변경
        img_tensor = TF.to_tensor(img.convert("L"))
        tensors.append(img_tensor)
    return torch.stack(tensors).to(device)

def save_tensors_as_images(tensor_batch, filenames, output_dir, prefix):
    """텐서 배치를 이미지 파일로 저장"""
    tensor_batch = tensor_batch.detach().cpu()
    for i, filename in enumerate(filenames):
        output_path = Path(output_dir) / f"{prefix}_{Path(filename).name.replace('.npy', '.png')}"
        # 텐서에서 magnitude를 계산하여 2채널 복소수 데이터를 1채널로 변환
        if tensor_batch.shape[1] == 2:
            real, imag = tensor_batch[i, 0], tensor_batch[i, 1]
            magnitude = torch.sqrt(real**2 + imag**2)
        else:
            magnitude = tensor_batch[i, 0]
        
        # 0-255 범위로 스케일링
        img_np = (torch.clamp(magnitude, 0, 1) * 255).numpy().astype(np.uint8)
        Image.fromarray(img_np).save(output_path)

print("✅ Utility functions are ready.")


### 6-1. 실험 1: Diffusion + U-Net (파이프라인)
가장 기본적인 파이프라인입니다.
1.  사전 학습된 Diffusion 모델을 사용하여 입력 이미지의 노이즈를 1차적으로 제거합니다.
2.  노이즈가 제거된 결과를 우리가 학습시킨 Deconvolution U-Net에 통과시켜 최종 이미지를 복원합니다.


In [None]:
from tqdm.notebook import tqdm
from diffusers import StableDiffusionImg2ImgPipeline

# --- Stable Diffusion Img2Img Pipeline 로드 ---
# 일반적인 Denoising에 더 적합한 Stable Diffusion Img2Img 모델 사용
pipe_img2img = StableDiffusionImg2ImgPipeline.from_pretrained(
    "runwayml/stable-diffusion-v1-5", torch_dtype=torch.float16
).to(device)
print("✅ Stable Diffusion Img2Img Pipeline loaded.")


# --- 파이프라인 실행 ---
output_dir_exp1 = OUTPUT_DIR / "1_diffusion_unet"
output_dir_exp1.mkdir(exist_ok=True)

with torch.no_grad():
    for batch in tqdm(dataloader, desc="Experiment 1: Diff+U-Net"):
        degraded_tensors = batch['image_gt'].to(device)
        filenames = batch['name']
        
        # 1. Diffusion Denoising
        degraded_pils = tensor_to_pil(degraded_tensors)
        # strength: 원본 이미지에서 얼마나 많이 변화시킬지 (노이즈 제거 강도)
        denoised_pils = pipe_img2img(prompt="", image=degraded_pils, strength=0.3, guidance_scale=7.5).images
        denoised_tensors = pil_to_tensor(denoised_pils)

        # 2. Deconvolution U-Net
        # U-Net은 2채널 입력을 기대하므로, real part(denoised)와 imag part(zeros)를 결합
        zeros = torch.zeros_like(denoised_tensors)
        unet_input = torch.cat([denoised_tensors, zeros], dim=1)
        
        restored_tensors = unet_deconv(unet_input)
        
        # 결과 저장
        save_tensors_as_images(restored_tensors, filenames, output_dir_exp1, "restored")

print(f"✅ Experiment 1 finished. Results are in {output_dir_exp1}")


### 6-2. 실험 2: Diffusion + Least Squares (파이프라인)
실험 1과 유사한 파이프라인이지만, U-Net 대신 직접 구현한 Least Squares 함수를 사용합니다.


In [None]:
# --- 파이프라인 실행 ---
output_dir_exp2 = OUTPUT_DIR / "2_diffusion_ls"
output_dir_exp2.mkdir(exist_ok=True)

with torch.no_grad():
    for batch in tqdm(dataloader, desc="Experiment 2: Diff+LS"):
        degraded_tensors = batch['image_gt'].to(device)
        filenames = batch['name']
        
        # 1. Diffusion Denoising
        degraded_pils = tensor_to_pil(degraded_tensors)
        denoised_pils = pipe_img2img(prompt="", image=degraded_pils, strength=0.3, guidance_scale=7.5).images
        denoised_tensors = pil_to_tensor(denoised_pils)

        # 2. Deconvolution using Robust Least Squares
        # LS는 1채널 입력을 기대하므로 denoised_tensor를 그대로 사용
        restored_tensors = robust_least_squares(denoised_tensors, dipole_kernels_k)
        
        # 결과 저장
        save_tensors_as_images(restored_tensors, filenames, output_dir_exp2, "restored")

print(f"✅ Experiment 2 finished. Results are in {output_dir_exp2}")


### 6-3. 실험 3: PnP Diffusion + Least Squares
가장 진보된 방식인 Plug-and-Play (PnP)를 구현합니다. Diffusion의 각 Denoising 스텝마다 Least Squares를 적용하여 데이터 일관성(Data Consistency)을 유지시켜 복원 성능을 극대화합니다.


In [None]:
from diffusers import DDIMScheduler
from diffusers.utils import randn_tensor

# --- PnP를 위한 DDIM 스케줄러 및 파이프라인 준비 ---
scheduler = DDIMScheduler.from_pretrained("google/ddpm-celebahq-256")
unet_diffusion = diffusion_pipeline.unet
scheduler.set_timesteps(50) # 반복 횟수 (Timesteps) 설정

# --- PnP-LS 파이프라인 실행 ---
output_dir_exp3 = OUTPUT_DIR / "3_pnp_diffusion_ls"
output_dir_exp3.mkdir(exist_ok=True)

with torch.no_grad():
    for batch in tqdm(dataloader, desc="Experiment 3: PnP Diff+LS"):
        degraded_tensors = batch['image_gt'].to(device)
        filenames = batch['name']
        
        # DDIM 파이프라인을 수동으로 실행
        shape = (degraded_tensors.shape[0], 3, 256, 256)
        # 1. 랜덤 노이즈 생성
        image = randn_tensor(shape, generator=None, device=device, dtype=unet_diffusion.dtype)
        
        # 2. Timesteps를 역순으로 반복
        for t in tqdm(scheduler.timesteps, leave=False):
            # 2-1. 노이즈 예측 (Denoising)
            model_output = unet_diffusion(image, t).sample
            image_denoised = scheduler.step(model_output, t, image, return_dict=False)[0]

            # 2-2. 데이터 일관성 적용 (PnP)
            # 현재 Denoise된 결과(image_denoised)를 LS로 Deconvolution
            ls_input = TF.rgb_to_grayscale(image_denoised) # LS는 1채널 입력
            restored_ls = robust_least_squares(ls_input, dipole_kernels_k)

            # 복원된 LS 결과(restored_ls)를 다시 노이즈가 낀 이미지(degraded_tensors)와 합성하여
            # 다음 스텝의 입력으로 사용. 이는 복원된 이미지가 원본의 특성을 잃지 않도록 보정하는 역할.
            # 이 부분이 PnP의 핵심.
            # (구현의 단순화를 위해 여기서는 LS 결과를 다음 스텝 입력으로 직접 사용하는 대신,
            # 최종 결과물 생성에만 LS를 적용하는 간략화된 형태로 우선 구현합니다.)
            # -> 진정한 PnP를 위해서는 Forward 연산자(A)와 A^T를 이용한 데이터 주입이 필요.

        # 최종 Deconvolution 단계
        final_denoised = TF.rgb_to_grayscale(image) # 최종 노이즈 제거 결과
        final_restored = robust_least_squares(final_denoised, dipole_kernels_k)
        
        # 결과 저장
        save_tensors_as_images(final_restored, filenames, output_dir_exp3, "restored")

print(f"✅ Experiment 3 finished. Results are in {output_dir_exp3}")


## 7. 완료 및 다음 단계
모든 실험 파이프라인이 구현되었습니다.
- **실행 방법:** Colab에서 **[런타임] > [모두 실행]**을 클릭하여 전체 노트북을 실행합니다.
- **결과 확인:** 실행이 완료되면 Google Drive의 `result_diffusion` 폴더 하위에 각 실험(1, 2, 3)의 결과 이미지가 저장됩니다.
- **다음 단계:** 저장된 결과 이미지들을 정성적으로 비교하고, `evaluate.ipynb`를 사용하여 정량적인 점수(PSNR, SSIM)를 계산하여 최적의 방법론을 선택합니다.
