In [1]:
%pip install ipywidgets

Collecting jedi>=0.16 (from ipython>=4.0.0->ipywidgets)
  Downloading jedi-0.19.2-py2.py3-none-any.whl.metadata (22 kB)
Downloading jedi-0.19.2-py2.py3-none-any.whl (1.6 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.6/1.6 MB[0m [31m11.2 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: jedi
Successfully installed jedi-0.19.2


In [2]:
%pip install lpips

Collecting lpips
  Downloading lpips-0.1.4-py3-none-any.whl.metadata (10 kB)
Collecting nvidia-cuda-nvrtc-cu12==12.4.127 (from torch>=0.4.0->lpips)
  Downloading nvidia_cuda_nvrtc_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-runtime-cu12==12.4.127 (from torch>=0.4.0->lpips)
  Downloading nvidia_cuda_runtime_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-cupti-cu12==12.4.127 (from torch>=0.4.0->lpips)
  Downloading nvidia_cuda_cupti_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.6 kB)
Collecting nvidia-cudnn-cu12==9.1.0.70 (from torch>=0.4.0->lpips)
  Downloading nvidia_cudnn_cu12-9.1.0.70-py3-none-manylinux2014_x86_64.whl.metadata (1.6 kB)
Collecting nvidia-cublas-cu12==12.4.5.8 (from torch>=0.4.0->lpips)
  Downloading nvidia_cublas_cu12-12.4.5.8-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cufft-cu12==11.2.1.3 (from torch>=0.4.0->lpips)
  Downloading nvidia_cufft

## Load libraries

In [3]:
import os
import shutil
import csv
import random
import numpy as np
from PIL import Image
import torch
from torchvision import transforms
from diffusers import ControlNetModel, StableDiffusionInpaintPipeline
from skimage.metrics import peak_signal_noise_ratio as compute_psnr
from skimage.metrics import structural_similarity as compute_ssim
import lpips  # pip install lpips
from glob import glob
from tqdm import tqdm
import cv2  # OpenCV

In [None]:
# ---------------------------
# Set seeds for reproducibility
# ---------------------------
# Pin the Python, NumPy and PyTorch (CPU) generators to the same fixed seed so
# repeated runs of the notebook draw the same random numbers.
for seed_rng in (random.seed, np.random.seed, torch.manual_seed):
    seed_rng(42)

# Also seed every visible CUDA device when a GPU is present.
if torch.cuda.is_available():
    torch.cuda.manual_seed_all(42)

## ControlNet helper functions

In [4]:
# ---------------------------
# ControlNet helper functions
# ---------------------------
# Precision shared by the model-loading helpers below: half precision when a
# CUDA device is available, full float32 otherwise.
if torch.cuda.is_available():
    torch_dtype = torch.float16
else:
    torch_dtype = torch.float32

def clean_huggingface_cache(model_path):
    """Strip download leftovers from the tree rooted at ``model_path``.

    Deletes every file whose name ends in ``.lock`` and every directory that
    is named ``temp`` or whose name starts with ``models--``. Directory
    removal ignores errors; file removal does not.

    Args:
        model_path: Root directory to clean.
    """
    def _is_cache_dir(dir_name):
        return dir_name.startswith("models--") or dir_name == "temp"

    # Walk bottom-up so a directory's contents have already been visited by
    # the time the directory itself is removed from its parent.
    for current_dir, subdir_names, file_names in os.walk(model_path, topdown=False):
        lock_files = [entry for entry in file_names if entry.endswith(".lock")]
        for lock_file in lock_files:
            os.remove(os.path.join(current_dir, lock_file))

        for cache_dir in filter(_is_cache_dir, subdir_names):
            shutil.rmtree(os.path.join(current_dir, cache_dir), ignore_errors=True)

def get_latest_snapshot(model_path):
    """Flatten the first snapshot found under ``model_path`` into ``model_path``.

    Looks for ``<model_path>/<subdir>/snapshots/<name>/`` and, for the first
    ``subdir`` that has a non-empty ``snapshots`` folder, copies the content of
    one snapshot directly into ``model_path``. The whole ``snapshots`` folder
    is then deleted. Entries that already exist in ``model_path`` are left
    untouched.

    Snapshot entries are copied with symlinks resolved rather than moved: if an
    entry is a relative symlink into a sibling folder, moving the link would
    leave it dangling at the new location, whereas copying stores the real
    file content.

    NOTE: the snapshot is picked by reverse lexicographic order of the folder
    names, not by timestamp, so "latest" is only meaningful when a single
    snapshot is present.

    Args:
        model_path: Directory that contains the downloaded cache layout.

    Returns:
        ``model_path`` unchanged, whether or not a snapshot was found.
    """
    if not os.path.isdir(model_path):
        return model_path

    for subdir in os.listdir(model_path):
        snapshot_path = os.path.join(model_path, subdir, "snapshots")
        if not os.path.isdir(snapshot_path):
            continue

        snapshots = sorted(os.listdir(snapshot_path), reverse=True)
        if not snapshots:
            continue

        latest_snapshot = os.path.join(snapshot_path, snapshots[0])
        for file_name in os.listdir(latest_snapshot):
            src = os.path.join(latest_snapshot, file_name)
            dest = os.path.join(model_path, file_name)
            if os.path.exists(dest):
                continue
            if os.path.isdir(src):
                # symlinks=False copies what the links point to, not the links.
                shutil.copytree(src, dest, symlinks=False)
            else:
                # copy2 follows a symlinked source and writes a regular file.
                shutil.copy2(src, dest)

        shutil.rmtree(snapshot_path, ignore_errors=True)
        return model_path

    return model_path

def check_and_download_model(model_name, model_path, is_controlnet=False):
    """Make sure a model is present on disk, downloading it when it is not.

    The model lives in ``<model_path>/controlnet`` when ``is_controlnet`` is
    true and in ``<model_path>/stable-diffusion`` otherwise. If that folder
    already exists and is non-empty the function returns immediately.
    Otherwise the model is fetched into a scratch cache at
    ``<model_path>/temp``, its snapshot is flattened with
    ``get_latest_snapshot`` and the resulting files are moved into the target
    folder. The scratch cache is removed afterwards.

    Args:
        model_name: Identifier handed to ``from_pretrained``.
        model_path: Base directory holding the per-model sub-folders.
        is_controlnet: Selects ``ControlNetModel`` (True) or
            ``StableDiffusionInpaintPipeline`` (False) as the downloader and
            the matching target sub-folder.

    Returns:
        None.
    """
    base_dir = model_path
    model_path = os.path.join(base_dir, "controlnet" if is_controlnet else "stable-diffusion")

    # Anything already in the target folder counts as "downloaded".
    if os.path.exists(model_path) and os.listdir(model_path):
        return

    # Keep the scratch cache next to the target folders instead of a
    # cwd-relative "models/temp", so a caller-supplied base directory is honoured.
    temp_dir = os.path.join(base_dir, "temp")
    loader = ControlNetModel if is_controlnet else StableDiffusionInpaintPipeline
    loader.from_pretrained(model_name, cache_dir=temp_dir)

    correct_model_path = get_latest_snapshot(temp_dir)
    os.makedirs(model_path, exist_ok=True)
    for file_name in os.listdir(correct_model_path):
        # Cache bookkeeping is not part of the model; leave it behind so it is
        # deleted together with the scratch directory below.
        if file_name == ".locks" or file_name.startswith("models--"):
            continue
        src = os.path.join(correct_model_path, file_name)
        dest = os.path.join(model_path, file_name)
        if not os.path.exists(dest):
            shutil.move(src, dest)
    shutil.rmtree(temp_dir, ignore_errors=True)

def load_controlnet():
    """Prepare the local model folders and return the inpainting pipeline.

    Creates ``models/``, ``models/controlnet`` and ``models/stable-diffusion``,
    makes sure both models are downloaded, cleans cache leftovers out of
    ``models/`` and then loads ``StableDiffusionInpaintPipeline`` from the
    local ``stable-diffusion`` folder using the module-level ``torch_dtype``.

    NOTE(review): the ControlNet weights are downloaded here but are not loaded
    into the returned pipeline.

    Returns:
        The pipeline, moved to ``"cuda"`` when available and ``"cpu"`` otherwise.
    """
    models_dir = "models"
    controlnet_dir = os.path.join(models_dir, "controlnet")
    stable_diffusion_dir = os.path.join(models_dir, "stable-diffusion")
    for folder in (models_dir, controlnet_dir, stable_diffusion_dir):
        os.makedirs(folder, exist_ok=True)

    # Fetch whichever model is still missing, then drop cache residue.
    check_and_download_model("stabilityai/stable-diffusion-2-inpainting", models_dir, is_controlnet=False)
    check_and_download_model("lllyasviel/control_v11p_sd15_inpaint", models_dir, is_controlnet=True)
    clean_huggingface_cache(models_dir)

    if torch.cuda.is_available():
        device = "cuda"
    else:
        device = "cpu"

    pipe = StableDiffusionInpaintPipeline.from_pretrained(
        stable_diffusion_dir,
        torch_dtype=torch_dtype,
        local_files_only=True,
    )
    return pipe.to(device, dtype=torch_dtype)

def make_divisible_by_8(size):
    """Round a ``(width, height)`` pair down to multiples of 8.

    Each dimension is floored to the nearest multiple of 8 and clamped to a
    minimum of 8, so inputs smaller than 8 pixels do not collapse to a
    zero-sized dimension.

    Args:
        size: ``(width, height)`` pair of integers.

    Returns:
        ``(width, height)`` tuple in which both values are multiples of 8 and
        at least 8.
    """
    width, height = size
    width = max(8, (width // 8) * 8)
    height = max(8, (height // 8) * 8)
    return width, height

def run_controlnet_inpaint(image_path, mask_path, pipe, reference_images, prompt, output_path, seed=42):
    """Inpaint one image with ``pipe`` and save the result to ``output_path``.

    The image (as RGB) and mask (as single-channel "L") are resized to
    dimensions produced by ``make_divisible_by_8`` before being handed to the
    pipeline. The first generated image is resized back to the original image
    size and written to ``output_path``.

    Args:
        image_path: Path of the image to inpaint.
        mask_path: Path of the mask image.
        pipe: Callable pipeline; its result must expose ``.images``.
        reference_images: Optional iterable of PIL images. When non-empty they
            are resized like the input and forwarded as ``conditioning_image``;
            when empty or ``None`` that argument is not passed at all.
        prompt: Text prompt for the pipeline.
        output_path: Destination file for the inpainted image.
        seed: Seed for the ``torch.Generator`` passed to the pipeline.
    """
    # convert() returns a fully loaded copy, so the files can be closed right away.
    with Image.open(image_path) as image_file:
        image = image_file.convert("RGB")
    with Image.open(mask_path) as mask_file:
        mask = mask_file.convert("L")

    original_size = image.size
    adjusted_size = make_divisible_by_8(original_size)

    def _fit(img):
        return img.resize(adjusted_size, Image.Resampling.LANCZOS)

    # Create a generator with a fixed seed for reproducibility
    device = "cuda" if torch.cuda.is_available() else "cpu"
    generator = torch.Generator(device=device).manual_seed(seed)

    pipe_kwargs = {
        "prompt": prompt,
        "image": _fit(image),
        "mask_image": _fit(mask),
        "height": adjusted_size[1],
        "width": adjusted_size[0],
        "generator": generator,
    }
    # Only forward conditioning images when there are some: an always-present
    # conditioning_image=None is an unexpected keyword for pipelines that do
    # not declare it.
    if reference_images:
        pipe_kwargs["conditioning_image"] = [_fit(img) for img in reference_images]

    result = pipe(**pipe_kwargs).images[0]
    result = result.resize(original_size, Image.Resampling.LANCZOS)
    result.save(output_path)

## LPIPS model loading

In [5]:
# ---------------------------
# LPIPS model loading
# ---------------------------
def load_lpips_model(model_dir="models/lpips"):
    """Build the AlexNet-based LPIPS model, caching its state dict on disk.

    On the first call the freshly constructed model's state dict is saved to
    ``<model_dir>/lpips_alex.pth``; on later calls that file is loaded back
    into the model (mapped to CPU first).

    Args:
        model_dir: Directory for the cached state dict; created if missing.

    Returns:
        The model in eval mode, moved to CUDA when a GPU is available.
    """
    os.makedirs(model_dir, exist_ok=True)
    weights_file = os.path.join(model_dir, "lpips_alex.pth")

    net = lpips.LPIPS(net='alex')
    if not os.path.exists(weights_file):
        torch.save(net.state_dict(), weights_file)
    else:
        cached_state = torch.load(weights_file, map_location='cpu')
        net.load_state_dict(cached_state)

    net.eval()
    return net.cuda() if torch.cuda.is_available() else net

# Shared LPIPS instance; evaluate_metrics() reads this module-level name.
lpips_model = load_lpips_model()

Setting up [LPIPS] perceptual loss: trunk [alex], v[0.1], spatial [off]


Downloading: "https://download.pytorch.org/models/alexnet-owt-7be5be79.pth" to /root/.cache/torch/hub/checkpoints/alexnet-owt-7be5be79.pth
100%|██████████| 233M/233M [00:02<00:00, 96.7MB/s]


Loading model from: /usr/local/lib/python3.11/dist-packages/lpips/weights/v0.1/alex.pth


## Evaluation functions

In [6]:
# ---------------------------
# Evaluation functions
# ---------------------------
def prepare_for_lpips(pil_image):
    """Turn a PIL image into a batched tensor for the LPIPS model.

    Applies ``transforms.ToTensor``, adds a leading batch dimension and rescales
    the values with ``x * 2 - 1`` (i.e. [0, 1] -> [-1, 1], assuming ToTensor
    produced values in [0, 1]).

    Args:
        pil_image: Image to convert.

    Returns:
        The tensor, moved to CUDA when a GPU is available.
    """
    to_tensor = transforms.ToTensor()
    batch = to_tensor(pil_image).unsqueeze(0)
    batch = batch * 2 - 1
    return batch.cuda() if torch.cuda.is_available() else batch

def evaluate_metrics(gt_img, inpaint_img):
    """Compare an inpainted image against its ground truth.

    Both images are converted to float32 arrays scaled by 1/255. If the array
    shapes differ, the inpainted image is resized to the ground-truth size
    first. LPIPS is computed with the module-level ``lpips_model``.

    Args:
        gt_img: Ground-truth PIL image.
        inpaint_img: Inpainted PIL image.

    Returns:
        ``(psnr, ssim, lpips_distance)``.
    """
    def _to_unit_float(img):
        return np.array(img).astype(np.float32) / 255.0

    gt_np = _to_unit_float(gt_img)
    inpaint_np = _to_unit_float(inpaint_img)

    # Bring the result to the reference size before any pixel-wise comparison.
    if gt_np.shape != inpaint_np.shape:
        inpaint_img = inpaint_img.resize(gt_img.size, Image.Resampling.LANCZOS)
        inpaint_np = _to_unit_float(inpaint_img)

    psnr = compute_psnr(gt_np, inpaint_np, data_range=1.0)

    # SSIM window: 7 by default, otherwise the largest odd value that fits the
    # shorter image side.
    min_size = min(gt_np.shape[0], gt_np.shape[1])
    if min_size >= 7:
        win_size = 7
    elif min_size % 2 == 1:
        win_size = min_size
    else:
        win_size = min_size - 1
    ssim = compute_ssim(gt_np, inpaint_np, win_size=win_size, channel_axis=2, data_range=1.0)

    gt_tensor = prepare_for_lpips(gt_img)
    inpaint_tensor = prepare_for_lpips(inpaint_img)
    with torch.no_grad():
        lpips_distance = lpips_model(gt_tensor, inpaint_tensor).item()

    return psnr, ssim, lpips_distance

## Main combined evaluation

In [None]:
# ---------------------------
# Main combined evaluation
# ---------------------------
if __name__ == "__main__":
    image_dir = "DUT-OMRON-image"   # Ground truth images (JPEG)
    mask_dir = "DUT-OMRON-mask"     # Masks (PNG)
    results_dir = "results"
    controlnet1_dir = os.path.join(results_dir, "controlnet1")
    controlnet2_dir = os.path.join(results_dir, "controlnet2")
    # Each run gets its own folder; sharing "controlnet2" would let the third
    # run overwrite the second run's images.
    controlnet3_dir = os.path.join(results_dir, "controlnet3")

    # (CSV/row key prefix, label used in messages and the progress bar, output dir)
    runs = [
        ("controlnet1", "ControlNet1", controlnet1_dir),
        ("controlnet2", "ControlNet2", controlnet2_dir),
        ("controlnet3", "ControlNet3", controlnet3_dir),
    ]
    for _, _, run_dir in runs:
        os.makedirs(run_dir, exist_ok=True)

    pipe = load_controlnet()

    prompt = (
        "Replace the masked region with a natural extension of the surrounding background, ensuring the textures, colors, and lighting blend seamlessly. Do not recreate any specific object shapes from the mask."
    )

    # Must be defined before the first inpainting call inside the loop.
    reference_images = None  # or [] if needed

    # NOTE: all three runs currently share the same pipeline, prompt, seed and
    # reference images.
    # You can optionally vary the seed per image (e.g., seed = 42 + idx) for diversity

    evaluation_results = []
    image_paths = sorted(glob(os.path.join(image_dir, "*.*")))
    pbar = tqdm(image_paths, total=len(image_paths), desc="Processing images", leave=True)

    for image_path in pbar:
        filename = os.path.basename(image_path)
        basename = os.path.splitext(filename)[0]
        mask_path = os.path.join(mask_dir, basename + ".png")  # Adjust extension if needed
        if not os.path.exists(mask_path):
            # silently skip missing masks
            continue

        output_paths = {key: os.path.join(run_dir, filename) for key, _, run_dir in runs}

        # Run every variant; a failure in any of them skips the whole image so
        # each CSV row always carries all three scores.
        all_runs_ok = True
        for key, label, _ in runs:
            try:
                run_controlnet_inpaint(image_path, mask_path, pipe, reference_images, prompt, output_paths[key], seed=42)
            except Exception as e:
                print(f"Error in {label} for {filename}: {e}")
                all_runs_ok = False
                break
        if not all_runs_ok:
            continue

        # convert() returns a loaded copy, so the file handles can be released at once.
        with Image.open(image_path) as gt_file:
            gt_image = gt_file.convert("RGB")

        row = {'filename': filename}
        postfix = {}
        for key, label, _ in runs:
            with Image.open(output_paths[key]) as result_file:
                result_image = result_file.convert("RGB")
            psnr, _, _ = evaluate_metrics(gt_image, result_image)
            row[f"{key}_PSNR"] = psnr
            postfix[f"{label}_PSNR"] = f"{psnr:.2f}"

        # Update the progress bar with the latest metrics
        pbar.set_postfix(postfix)
        evaluation_results.append(row)

    # Write results to CSV; the header must match the keys stored in each row,
    # otherwise DictWriter rejects the rows.
    csv_file_path = "evaluation_results.csv"
    csv_fields = ['filename'] + [f"{key}_PSNR" for key, _, _ in runs]
    with open(csv_file_path, mode='w', newline='') as csv_file:
        writer = csv.DictWriter(csv_file, fieldnames=csv_fields)
        writer.writeheader()
        writer.writerows(evaluation_results)

    # Report how many images were actually evaluated, not how many were found.
    print(f"Processing images: {len(evaluation_results)}/{len(image_paths)} completed. Results saved to {csv_file_path}")
