In [1]:
# Authenticate this notebook session with the Hugging Face Hub.
from huggingface_hub import notebook_login

# Renders an interactive login widget as this cell's output.
# NOTE(review): presumably needed so the later `from_pretrained` call can reach
# the model repository -- confirm whether that repo is actually private/gated;
# if it is public this cell can be dropped.
notebook_login()


VBox(children=(HTML(value='<center> <img\nsrc=https://huggingface.co/front/assets/huggingface_logo-noborder.sv…

In [3]:
import torch
import numpy as np
import matplotlib.pyplot as plt
import os
from PIL import Image
from diffusers import DDPMPipeline, DDIMScheduler
from scipy.spatial.distance import cosine

class NoisePerturbationTest:
    """
    Measure how much a diffusion model's output changes when its starting
    noise is perturbed.

    A base noise tensor is sampled once, Gaussian perturbations of increasing
    scale are added to it, every resulting tensor is denoised with a DDIM
    scheduler, and each output image is compared to the image produced from
    the unperturbed base noise using cosine similarity.

    Images and comparison figures are written to ``self.output_dir``.
    """

    def __init__(self, model_path, device='cuda', output_dir="noise_stability_results"):
        """
        Initialize the tester.

        Args:
            model_path: Path to the diffusion model (passed to
                ``DDPMPipeline.from_pretrained``)
            device: Device to run on
            output_dir: Directory that receives the generated images and
                comparison figures; created if it does not exist
        """
        # Set deterministic operations (note: these are process-wide settings)
        torch.backends.cudnn.deterministic = True
        torch.backends.cudnn.benchmark = False

        print(f"Loading pipeline from {model_path}...")
        self.device = device
        self.output_dir = output_dir

        # Get pipeline
        self.pipeline = DDPMPipeline.from_pretrained(model_path).to(device)

        # Re-use the pipeline's UNet but swap its scheduler for a DDIM one
        # built from the same config.
        self.scheduler = DDIMScheduler.from_config(self.pipeline.scheduler.config)
        self.unet = self.pipeline.unet

        # DDIM's stochasticity is controlled by the `eta` argument of
        # `scheduler.step()`, not by an attribute on the scheduler object, so
        # keep the value here and pass it explicitly on every step.
        self.eta = 0.0

        # Set noise shape
        self.noise_shape = self._infer_noise_shape(self.unet)
        print(f"Using noise shape: {self.noise_shape}")

        # Create output directory
        os.makedirs(self.output_dir, exist_ok=True)

    @staticmethod
    def _infer_noise_shape(unet, default_size=128):
        """Return the (1, 3, H, W) noise shape for `unet`, falling back to `default_size`."""
        size = getattr(getattr(unet, 'config', None), 'sample_size', None)
        if size is None:
            # No usable size in the config: use the fallback input size
            height = width = default_size
        elif isinstance(size, (tuple, list)):
            # Config stores an explicit (height, width) pair
            height, width = size
        else:
            height = width = size
        return (1, 3, int(height), int(width))

    def generate_perturbed_noise(self, base_noise, perturbation_scales=(0.001, 0.01, 0.05, 0.1),
                                samples_per_scale=3, seed=42):
        """
        Generate perturbed versions of base noise by adding controlled amounts of random noise.

        Sample ``i`` is always drawn with seed ``seed + i``, independent of the
        scale, so the i-th sample uses the same perturbation direction at every
        scale and only its magnitude changes. The result is ``base_noise +
        scale * perturbation`` and is not re-normalised afterwards.

        Args:
            base_noise: Base noise tensor to perturb (left unmodified)
            perturbation_scales: Iterable of scales to test
            samples_per_scale: Number of samples per scale
            seed: Random seed for reproducibility

        Returns:
            Dictionary mapping scales to lists of perturbed noise tensors
        """
        print("Generating perturbed noise tensors...")
        perturbed_dict = {}

        for scale in perturbation_scales:
            perturbed_list = []

            for i in range(samples_per_scale):
                # Dedicated generator per sample keeps the draw reproducible
                # and independent of the global RNG state.
                gen = torch.Generator(device=self.device)
                gen.manual_seed(seed + i)

                perturbation = torch.randn(base_noise.shape, generator=gen,
                                           device=self.device, dtype=base_noise.dtype)

                # `+` allocates a new tensor, so base_noise itself is untouched
                perturbed_list.append(base_noise + perturbation * scale)

            perturbed_dict[scale] = perturbed_list

        return perturbed_dict

    @staticmethod
    def _tensor_to_pil(sample):
        """Convert a (1, C, H, W) tensor in [-1, 1] to an 8-bit PIL image."""
        pixels = sample.detach().float().cpu()
        # [-1, 1] -> [0, 1], clipping anything the model pushed out of range
        pixels = ((pixels + 1) / 2).clamp(0, 1)
        # Channels-last layout, first (only) batch element
        pixels = pixels.permute(0, 2, 3, 1).numpy()[0]
        return Image.fromarray((pixels * 255).astype(np.uint8))

    def run_diffusion_process(self, noise_tensor, num_inference_steps=50):
        """
        Run the diffusion process manually from a specific noise tensor.

        Args:
            noise_tensor: Starting noise tensor (left unmodified)
            num_inference_steps: Number of diffusion steps

        Returns:
            Final image as PIL Image
        """
        # Clone noise to avoid modifying the original
        sample = noise_tensor.clone()

        # Set timesteps
        self.scheduler.set_timesteps(num_inference_steps)

        # Run the reverse diffusion process step by step (inference only, so
        # gradients are disabled for the whole loop)
        with torch.no_grad():
            for t in self.scheduler.timesteps:
                # Scale input
                model_input = self.scheduler.scale_model_input(sample, t)

                # Get noise prediction
                noise_pred = self.unet(model_input, t).sample

                # Update sample using scheduler; eta is passed explicitly so
                # the configured value is the one actually used.
                sample = self.scheduler.step(noise_pred, t, sample, eta=self.eta).prev_sample

        return self._tensor_to_pil(sample)

    def test_noise_perturbation(self, perturbation_scales=(0, 0.001, 0.01, 0.05, 0.1),
                                      samples_per_scale=3, base_seed=42, num_inference_steps=50):
        """
        Test the model's stability to direct noise perturbations.

        For every scale, the perturbed noise tensors are denoised, the images
        are saved, and their cosine similarity to the reference image (from
        the unperturbed base noise) is computed. Scale 0 is always included;
        it re-runs the base noise, so its similarity to the reference shows
        whether sampling is reproducible.

        Args:
            perturbation_scales: Iterable of non-negative perturbation scales to test
            samples_per_scale: Number of samples per scale
            base_seed: Random seed for base noise
            num_inference_steps: Number of diffusion steps

        Returns:
            Dictionary mapping each scale to its metrics dictionary
            (see ``compute_metrics``)

        Raises:
            ValueError: If any perturbation scale is negative
        """
        print("\nRunning noise perturbation test...")

        # Accept any iterable (list, tuple, ...) without mutating the caller's object
        scales = list(perturbation_scales)
        if any(s < 0 for s in scales):
            raise ValueError("perturbation_scales must be non-negative")

        # Generate base noise tensor deterministically
        generator = torch.Generator(device=self.device)
        generator.manual_seed(base_seed)
        base_noise = torch.randn(self.noise_shape, generator=generator,
                                 device=self.device, dtype=self.unet.dtype)

        # Add zero perturbation case
        if 0 not in scales:
            scales = [0] + scales

        # Generate perturbed versions
        perturbed_dict = self.generate_perturbed_noise(
            base_noise,
            perturbation_scales=[s for s in scales if s > 0],  # Skip zero case
            samples_per_scale=samples_per_scale,
            seed=base_seed + 100  # Use different seed from base noise
        )

        # Add base noise to perturbed_dict under scale 0
        perturbed_dict[0] = [base_noise.clone() for _ in range(samples_per_scale)]

        # First, generate the reference image from the base noise
        print("\nGenerating reference image from base noise...")
        reference_image = self.run_diffusion_process(base_noise, num_inference_steps)
        reference_path = f"{self.output_dir}/reference_image.png"
        reference_image.save(reference_path)
        print(f"Saved reference image to '{reference_path}'")

        # Convert reference image to numpy array for comparison
        reference_np = np.array(reference_image) / 255.0  # Normalize to [0,1]

        results = {}
        for scale in scales:
            print(f"\nProcessing perturbation scale {scale}...")

            # Index 0 is always the reference so compute_metrics and
            # visualize_results can treat the first image as the baseline.
            scale_np_images = [reference_np]

            for i, noise in enumerate(perturbed_dict[scale]):
                image = self.run_diffusion_process(noise, num_inference_steps)

                # Save image
                image_path = f"{self.output_dir}/scale_{scale}_sample_{i}.png"
                image.save(image_path)
                print(f"Saved {image_path}")

                # Convert to numpy for metrics
                scale_np_images.append(np.array(image) / 255.0)

            # Compute metrics
            metrics = self.compute_metrics(scale_np_images)

            # Visualize
            self.visualize_results(scale_np_images, metrics, f"scale_{scale}")

            # Store results
            results[scale] = metrics

        return results

    @staticmethod
    def _cosine_similarity(u, v):
        """Cosine similarity of two arrays (flattened), defined for zero vectors too."""
        u = np.asarray(u, dtype=np.float64).ravel()
        v = np.asarray(v, dtype=np.float64).ravel()
        u_is_zero = not np.any(u)
        v_is_zero = not np.any(v)
        # The cosine is undefined when a vector has zero norm (e.g. an
        # all-black image); return a well-defined value instead of NaN.
        if u_is_zero and v_is_zero:
            return 1.0  # both images are identical (all zeros)
        if u_is_zero or v_is_zero:
            return 0.0
        return float(1 - cosine(u, v))

    def compute_metrics(self, images):
        """
        Compute similarity metrics between images.

        Args:
            images: Non-empty list of numpy arrays with the images; the first
                entry is the reference every other image is compared against

        Returns:
            Dictionary with keys 'cosine_similarity' (one value per
            non-reference image, higher is more similar), 'mean_cosine' and
            'std_cosine' (both 0 when there is nothing to compare)
        """
        # Use the first image as reference
        reference = images[0]

        similarities = [self._cosine_similarity(reference, other) for other in images[1:]]

        return {
            'cosine_similarity': similarities,
            'mean_cosine': np.mean(similarities) if similarities else 0,
            'std_cosine': np.std(similarities) if similarities else 0,
        }

    def visualize_results(self, images, metrics, test_name):
        """
        Visualize the generated images and their metrics.

        Saves a single-row figure to
        ``<output_dir>/comparison_<test_name>.png`` showing at most the first
        five images; the first one is labelled as the reference.

        Args:
            images: List of images as numpy arrays
            metrics: Dictionary of metrics
            test_name: Name for saving files
        """
        n_imgs = min(len(images), 5)  # Limit to 5
        if n_imgs == 0:
            # Nothing to draw; plt.subplots cannot create zero columns
            return

        # squeeze=False always yields a 2-D array of axes, so the single-image
        # case needs no special handling.
        fig, axes = plt.subplots(1, n_imgs, figsize=(4*n_imgs, 4), squeeze=False)
        similarities = metrics.get('cosine_similarity', [])

        for i, (ax, img) in enumerate(zip(axes[0], images)):
            ax.imshow(img)
            if i == 0:
                ax.set_title("Reference")
            else:
                metrics_text = ""
                # similarities[i-1] belongs to images[i] (the reference has none)
                if len(similarities) >= i:
                    metrics_text += f"CosSim: {similarities[i-1]:.4f}\n"
                ax.set_title(f"Sample {i}\n{metrics_text}")
            ax.axis('off')

        fig.tight_layout()
        fig.savefig(f"{self.output_dir}/comparison_{test_name}.png")
        # Close this specific figure so repeated calls do not accumulate memory
        plt.close(fig)

In [9]:
if __name__ == "__main__":
    # Identifier of the trained model handed to the tester
    model_path = "otausendschoen/ddpm-our-faces-reduced"

    # Build the tester, running on the GPU whenever one is available
    tester = NoisePerturbationTest(
        model_path=model_path,
        device="cuda" if torch.cuda.is_available() else "cpu",
    )

    # Sweep from no perturbation up to a perturbation as large as the base
    # noise itself, one sample per scale.
    # (An earlier, finer sweep near zero used [0, 0.001, 0.005, 0.01, 0.05, 0.1].)
    results = tester.test_noise_perturbation(
        perturbation_scales=[0, 0.05, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1],
        samples_per_scale=1,
        base_seed=40,
        num_inference_steps=50,
    )

    print("\nDirect Noise Perturbation Testing Complete!")
    print("Check the 'noise_stability_results' directory for images and analysis.")

Loading pipeline from otausendschoen/ddpm-our-faces-reduced...


Loading pipeline components...:   0%|          | 0/2 [00:00<?, ?it/s]

Using noise shape: (1, 3, 128, 128)

Running noise perturbation test...
Generating perturbed noise tensors...

Generating reference image from base noise...
Saved reference image to 'noise_stability_results/reference_image.png'

Processing perturbation scale 0...
Saved noise_stability_results/scale_0_sample_0.png

Processing perturbation scale 0.05...
Saved noise_stability_results/scale_0.05_sample_0.png

Processing perturbation scale 0.1...
Saved noise_stability_results/scale_0.1_sample_0.png

Processing perturbation scale 0.2...
Saved noise_stability_results/scale_0.2_sample_0.png

Processing perturbation scale 0.3...
Saved noise_stability_results/scale_0.3_sample_0.png

Processing perturbation scale 0.4...
Saved noise_stability_results/scale_0.4_sample_0.png

Processing perturbation scale 0.5...
Saved noise_stability_results/scale_0.5_sample_0.png

Processing perturbation scale 0.6...
Saved noise_stability_results/scale_0.6_sample_0.png

Processing perturbation scale 0.7...
Saved no