# Diffusion & Deconvolution Components Test

This notebook tests the individual components implemented for the advanced image restoration pipeline:
1. `ClippedInverseFilter`: The project-specific deconvolution method.
2. `HuggingFace_Denoiser`: The plug-in denoiser for the PnP algorithm.
3. `DiffPIR_Pipeline`: The high-performance guided diffusion restoration model.
4. `PnP_Restoration`: The PnP restoration framework that uses the `HuggingFace_Denoiser` (tested in section 5).


In [None]:
import sys
from pathlib import Path
import torch
import numpy as np
import matplotlib.pyplot as plt
from PIL import Image

# --- Environment Setup ---
# Check if running in Google Colab
# The check is done by trying to import the `google.colab` package: an
# ImportError is treated as "not running in Colab".
try:
    import google.colab
    IN_COLAB = True
except ImportError:
    IN_COLAB = False

if IN_COLAB:
    print("Running in Google Colab. Mounting Google Drive...")
    from google.colab import drive
    # The project is read from Google Drive, so the drive is mounted before
    # ROOT (which lives under the mount point) is used.
    drive.mount('/content/drive')
    
    # !!! IMPORTANT: Please adjust this path to your project's root folder in Google Drive !!!
    ROOT = Path('/content/drive/MyDrive/Data Scientist/Project/Week5/week5') 
    
    # Pull the latest code from GitHub
    print("Pulling latest code from GitHub...")
    # Make sure to navigate to the correct directory before pulling
    # (`%cd` and `!git` are IPython magics; `{ROOT}` is interpolated from the
    # Python variable above, and the pull runs in the directory just entered.)
    %cd {ROOT}
    !git pull origin main
else:
    print("Running in local environment.")
    # Outside Colab the project root is taken to be the current working
    # directory of the kernel process.
    ROOT = Path.cwd()

# Add project root to system path for module imports
# The membership test keeps re-runs of this cell from appending ROOT twice.
if str(ROOT) not in sys.path:
    sys.path.append(str(ROOT))
print(f"Project Root set to: {ROOT}")


# --- Module Imports ---
from code_denoising.classical_methods.deconvolution import ClippedInverseFilter
from code_denoising.diffusion_methods.hf_denoiser import HuggingFace_Denoiser
from code_denoising.diffusion_methods.hf_diffpir import DiffPIR_Pipeline
from diffusers import DDPMScheduler, UNet2DModel

print("Imports successful!")


Imports successful!


## 1. Load Sample Data

We will load a sample image from the `test_y` folder to use as input for our tests.


In [None]:
# Prefer the GPU when torch can see one; otherwise fall back to the CPU.
if torch.cuda.is_available():
    DEVICE = "cuda"
else:
    DEVICE = "cpu"

# Degraded sample that serves as the common input for the tests below.
# The filename was corrected to a real file that exists in the directory.
sample_path = ROOT / "dataset/test_y/L1_000d090392623f8046ebe84a1b345bf7.npy"
sample_image_np = np.load(sample_path)
# Prepend two singleton axes, cast to float32 and move to DEVICE.
# Resulting shape: [1, 1, H, W] (assuming the stored array is 2-D — TODO confirm).
sample_image_torch = torch.from_numpy(sample_image_np)[None, None].float().to(DEVICE)

# The 5 known convolution directions, each given as a pair of floats.
B0_DIRS = [
    (-0.809, -0.5878),
    (-0.809, 0.5878),
    (0.309, -0.9511),
    (0.309, 0.9511),
    (1.0, 0.0),
]

def plot_image(tensor, title=""):
    """Draw a torch tensor as a grayscale image on the current matplotlib axes.

    Args:
        tensor: Tensor to display. All size-1 dimensions are squeezed away
            before plotting, so a [1, 1, H, W] tensor is shown as an H x W image.
        title: Text placed above the image.

    The function only draws; creating the figure/subplot and calling
    ``plt.show()`` is left to the caller.
    """
    # detach() allows tensors that still carry autograd history (e.g. raw model
    # outputs) to be converted; .numpy() raises on tensors that require grad.
    pixels = tensor.detach().squeeze().cpu().numpy()
    plt.imshow(pixels, cmap='gray')
    plt.title(title)
    plt.axis('off')

# Display the degraded input on its own before any restoration is applied.
plt.figure()
plot_image(sample_image_torch, "Original Degraded Image")
plt.show()


## 2. Test `ClippedInverseFilter`


In [None]:
# Run the deconvolution once per known direction.
deconv_filter = ClippedInverseFilter()
restored_images_deconv = deconv_filter.run_on_all_directions(sample_image_torch, B0_DIRS)

# One row of six panels: the input first, then one result per direction.
plt.figure(figsize=(20, 4))
plt.subplot(1, 6, 1)
plot_image(sample_image_torch, title="Input")
# Panels are 1-based and panel 1 is taken by the input, so results start at 2.
for panel, (restored, _direction) in enumerate(zip(restored_images_deconv, B0_DIRS), start=2):
    plt.subplot(1, 6, panel)
    plot_image(restored, title=f"Deconv Dir {panel - 1}")
plt.suptitle("ClippedInverseFilter Results for all 5 Directions", fontsize=16)
plt.show()


## 3. Test `HuggingFace_Denoiser`

Note: This will download a pre-trained model from Hugging Face Hub, which might take some time.


In [None]:
denoiser = HuggingFace_Denoiser(device=DEVICE)

# Input preparation:
#   * the denoiser expects values in [-1, 1], so the [0, 1] image is rescaled;
#   * the pre-trained model expects 3 channels (RGB), so the single channel is repeated.
normalized_input = 2.0 * sample_image_torch - 1.0
normalized_input_3ch = normalized_input.repeat(1, 3, 1, 1)

denoised_image = denoiser.denoise(normalized_input_3ch, noise_level=150)

# Undo the preparation for display: average the channels back to one
# grayscale channel and map [-1, 1] back to [0, 1].
denoised_image_gray = denoised_image.mean(dim=1, keepdim=True)
denoised_image_final = (denoised_image_gray + 1.0) / 2.0

# Side-by-side comparison: input on the left, denoised result on the right.
plt.figure(figsize=(10, 5))
for panel, (image, caption) in enumerate(
    [(sample_image_torch, "Input"), (denoised_image_final, "Denoised Output")],
    start=1,
):
    plt.subplot(1, 2, panel)
    plot_image(image, title=caption)
plt.suptitle("HuggingFace_Denoiser Result", fontsize=16)
plt.show()


## 4. Test `DiffPIR_Pipeline`


In [None]:
# Load a pre-trained model and scheduler
model_name = "google/ddpm-celebahq-256"
unet = UNet2DModel.from_pretrained(model_name).to(DEVICE)
scheduler = DDPMScheduler.from_pretrained(model_name)

# Create our custom pipeline
diffpir_pipeline = DiffPIR_Pipeline(unet=unet, scheduler=scheduler)

# We need to guess the correct B0_dir for the sample image. Let's try the first one.
chosen_b0_dir = B0_DIRS[0]

# DiffPIR also works on 3-channel images in range [-1, 1].
# The sample is treated as a [0, 1] image (as in the denoiser test), so it is
# rescaled to [-1, 1] before the channel is repeated. Without this step the
# input stays in [0, 1] while the output below is mapped back with
# (x + 1) / 2, leaving input and output on different scales.
normalized_input_diffpir = sample_image_torch * 2.0 - 1.0
normalized_input_3ch_diffpir = normalized_input_diffpir.repeat(1, 3, 1, 1)

restored_image_diffpir = diffpir_pipeline(
    degraded_image=normalized_input_3ch_diffpir,
    B0_dir=chosen_b0_dir,
    guidance_scale=0.5,
    num_inference_steps=50 # Use fewer steps for a quick test
)

# Convert back for visualization: average the 3 channels into one grayscale
# channel and map [-1, 1] back to [0, 1].
restored_image_gray = restored_image_diffpir.mean(dim=1, keepdim=True)
restored_image_final = (restored_image_gray + 1.0) / 2.0

# Side-by-side comparison: input on the left, restored result on the right.
plt.figure(figsize=(10, 5))
plt.subplot(1, 2, 1)
plot_image(sample_image_torch, title="Input")
plt.subplot(1, 2, 2)
plot_image(restored_image_final, title="DiffPIR Restored Output")
plt.suptitle("DiffPIR_Pipeline Result", fontsize=16)
plt.show()


## 5. Test Full PnP Restoration

Now, we combine the `ClippedInverseFilter` (as a baseline) and the `HuggingFace_Denoiser` into the `PnP_Restoration` framework.


In [None]:
from code_denoising.pnp_restoration import PnP_Restoration

# Reuse the denoiser instance created in the HuggingFace_Denoiser test
# instead of loading the model a second time.
pnp_restorer = PnP_Restoration(denoiser=denoiser)

# The correct B0_dir for the sample is not known; guess the first one again.
chosen_b0_dir_pnp = B0_DIRS[0]

restored_image_pnp = pnp_restorer.run(
    degraded_image=sample_image_torch,
    B0_dir=chosen_b0_dir_pnp,
    max_iter=10,
    rho=0.5,
    denoiser_noise_level=100,
)

# Side-by-side comparison: input on the left, PnP result on the right.
plt.figure(figsize=(10, 5))
for panel, (image, caption) in enumerate(
    [(sample_image_torch, "Input"), (restored_image_pnp, "PnP Restored Output")],
    start=1,
):
    plt.subplot(1, 2, panel)
    plot_image(image, title=caption)
plt.suptitle("PnP_Restoration Result", fontsize=16)
plt.show()


# Diffusion & Deconvolution Components Test

This notebook is for testing the individual components implemented for the advanced image restoration pipeline:
1. `ClippedInverseFilter`: The project-specific deconvolution method.
2. `HuggingFace_Denoiser`: The plug-in denoiser for the PnP algorithm.
3. `DiffPIR_Pipeline`: The high-performance guided diffusion restoration model.


## 1. Load Sample Data

We will load a sample image from the `test_y` folder to use as input for our tests.


In [None]:
# Run on the GPU when torch can see one; otherwise fall back to the CPU.
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"

# Load a sample degraded image
# Uses the same file as the sample-loading cell earlier in this notebook; the
# previous placeholder name "00000.npy" was replaced there because it does not
# exist in the directory, so loading it here would fail.
sample_path = ROOT / "dataset/test_y/L1_000d090392623f8046ebe84a1b345bf7.npy"
sample_image_np = np.load(sample_path)
# Add two leading singleton axes, cast to float32 and move to DEVICE.
sample_image_torch = torch.from_numpy(sample_image_np).unsqueeze(0).unsqueeze(0).float().to(DEVICE) # Shape: [1, 1, H, W]

# Define the 5 known convolution directions
B0_DIRS = [(-0.809, -0.5878), (-0.809, 0.5878), (0.309, -0.9511), (0.309, 0.9511), (1.0, 0.0)]

def plot_image(tensor, title=""):
    """Draw a torch tensor as a grayscale image on the current matplotlib axes.

    Args:
        tensor: Tensor to display. All size-1 dimensions are squeezed away
            before plotting, so a [1, 1, H, W] tensor is shown as an H x W image.
        title: Text placed above the image.

    The function only draws; creating the figure/subplot and calling
    ``plt.show()`` is left to the caller.
    """
    # detach() allows tensors that still carry autograd history (e.g. raw model
    # outputs) to be converted; .numpy() raises on tensors that require grad.
    pixels = tensor.detach().squeeze().cpu().numpy()
    plt.imshow(pixels, cmap='gray')
    plt.title(title)
    plt.axis('off')

# Display the degraded input on its own before any restoration is applied.
plt.figure()
plot_image(sample_image_torch, "Original Degraded Image")
plt.show()


## 2. Test `ClippedInverseFilter`


In [None]:
deconv_filter = ClippedInverseFilter()
restored_images_deconv = deconv_filter.run_on_all_directions(sample_image_torch, B0_DIRS)

plt.figure(figsize=(20, 4))
plt.subplot(1, 6, 1)
plot_image(sample_image_torch, title="Input")
for i, (img, b0_dir) in enumerate(zip(restored_images_deconv, B0_DIRS)):
    plt.subplot(1, 6, i + 2)
    plot_image(img, title=f"Deconv Dir {i+1}")
plt.suptitle("ClippedInverseFilter Results for all 5 Directions", fontsize=16)
plt.show()


## 3. Test `HuggingFace_Denoiser`

Note: This will download a pre-trained model from Hugging Face Hub, which might take some time.


In [None]:
denoiser = HuggingFace_Denoiser(device=DEVICE)

# The denoiser expects input in [-1, 1], so we normalize our [0, 1] image
normalized_input = sample_image_torch * 2.0 - 1.0
# The pre-trained model expects 3 channels (RGB), so we repeat our single channel
normalized_input_3ch = normalized_input.repeat(1, 3, 1, 1)

denoised_image = denoiser.denoise(normalized_input_3ch, noise_level=150)

# Convert back to 1 channel grayscale and [0, 1] range for visualization
denoised_image_gray = denoised_image.mean(dim=1, keepdim=True)
denoised_image_final = (denoised_image_gray + 1.0) / 2.0

plt.figure(figsize=(10, 5))
plt.subplot(1, 2, 1)
plot_image(sample_image_torch, title="Input")
plt.subplot(1, 2, 2)
plot_image(denoised_image_final, title="Denoised Output")
plt.suptitle("HuggingFace_Denoiser Result", fontsize=16)
plt.show()


## 4. Test `DiffPIR_Pipeline`


In [None]:
# Load a pre-trained model and scheduler
model_name = "google/ddpm-celebahq-256"
unet = UNet2DModel.from_pretrained(model_name).to(DEVICE)
scheduler = DDPMScheduler.from_pretrained(model_name)

# Create our custom pipeline
diffpir_pipeline = DiffPIR_Pipeline(unet=unet, scheduler=scheduler)

# We need to guess the correct B0_dir for the sample image. Let's try the first one.
chosen_b0_dir = B0_DIRS[0]

# DiffPIR also works on 3-channel images in range [-1, 1]
normalized_input_3ch_diffpir = sample_image_torch.repeat(1, 3, 1, 1)

restored_image_diffpir = diffpir_pipeline(
    degraded_image=normalized_input_3ch_diffpir,
    B0_dir=chosen_b0_dir,
    guidance_scale=0.5,
    num_inference_steps=50 # Use fewer steps for a quick test
)

# Convert back for visualization
restored_image_gray = restored_image_diffpir.mean(dim=1, keepdim=True)
restored_image_final = (restored_image_gray + 1.0) / 2.0

plt.figure(figsize=(10, 5))
plt.subplot(1, 2, 1)
plot_image(sample_image_torch, title="Input")
plt.subplot(1, 2, 2)
plot_image(restored_image_final, title="DiffPIR Restored Output")
plt.suptitle("DiffPIR_Pipeline Result", fontsize=16)
plt.show()
