# NoiseDiffusion Colab Notebook (Colab Pro / A100-ready)
**Purpose:** reproduce and experiment with *Noise Diffusion* for improving semantic faithfulness in text-to-image generation.

**What this notebook provides**
- Environment setup (installs optimized for Colab Pro / A100)
- Lightweight **CLIP-MonteCarlo** demo (fast proxy; good for pilots)
- **NoiseDiffusion** implementation (latent updates per paper) — more expensive but faithful
- Utilities: GPU detection, HF token input, CLIP scoring, saving outputs

**Important**
- Set *Runtime → Change runtime type* to **GPU** (preferably A100).  
- Use Colab Pro for larger jobs; for heavy reproduction (paper-scale) prefer a >=48GB GPU.  
- This notebook will prompt for your Hugging Face token when needed. Do **not** share the token publicly.


In [None]:
# Install required packages (run once)
# This installation is tuned for Colab. It may take a few minutes.
import sys
# Report the interpreter version so dependency problems are easier to diagnose.
print("Python", sys.version)
# Install packages
# Upgrade pip itself before installing anything else.
!pip install -q --upgrade pip
# core libs (diffusers is pinned to 0.19.1; the other packages are unpinned)
!pip install -q diffusers[torch]==0.19.1 transformers accelerate safetensors ftfy
# Use transformers' CLIP implementation instead of openai/CLIP to avoid building from source
!pip install -q transformers[torch]
# optional performance libs (xformers can speed up attention if available)
# NOTE(review): system_raw may not raise when pip exits with an error, in which
# case the except branch below would never report a failed install - confirm.
try:
    get_ipython().system_raw('pip install -q xformers==0.0.20')
except Exception as e:
    print('xformers install skipped or failed (optional):', e)
# Printed unconditionally; it does not verify that the installs above succeeded.
print('Install finished. Restart runtime if required by Colab.')


In [None]:
# GPU detection and recommended default parameters
import torch, subprocess, json
def gpu_info():
    """Query the first GPU reported by ``nvidia-smi``.

    Returns:
        A ``(name, memory_total_mb, driver_version)`` tuple where the memory
        is an ``int``, or ``(None, None, None)`` when ``nvidia-smi`` is
        missing, exits with an error, or prints output that cannot be parsed.
    """
    query = ['nvidia-smi', '--query-gpu=name,memory.total,driver_version',
             '--format=csv,noheader,nounits']
    try:
        out = subprocess.check_output(query).decode().strip()
        # nvidia-smi prints one CSV row per GPU. Only the first row is parsed
        # so that a multi-GPU machine is not mistaken for "no GPU".
        first_row = out.splitlines()[0]
        # Split from the right so a comma inside the product name cannot
        # shift the memory and driver columns.
        name, mem, drv = [x.strip() for x in first_row.rsplit(',', 2)]
        return name, int(mem), drv
    except (OSError, subprocess.CalledProcessError, ValueError, IndexError):
        # OSError: binary not found / not executable; CalledProcessError:
        # non-zero exit; ValueError/IndexError: empty or malformed output.
        return None, None, None

gpu_name, gpu_mem, driver = gpu_info()
print('GPU:', gpu_name, 'VRAM(MB):', gpu_mem, 'driver:', driver)

# Choose conservative generation defaults for the detected hardware; every
# value can be overridden in later cells.
# Assignment order: image size, denoising steps, candidates per iteration,
# number of iterations.
if 'A100' in (gpu_name or ''):
    DEFAULT_IMAGE_SIZE, DEFAULT_STEPS, DEFAULT_NUM_CANDS, DEFAULT_NUM_ITERS = 512, 20, 8, 3
else:
    # Any other GPU (e.g. T4), or no GPU detected at all.
    DEFAULT_IMAGE_SIZE, DEFAULT_STEPS, DEFAULT_NUM_CANDS, DEFAULT_NUM_ITERS = 384, 20, 4, 2

print(
    'Defaults set -> image_size:', DEFAULT_IMAGE_SIZE,
    'steps:', DEFAULT_STEPS,
    'num_candidates:', DEFAULT_NUM_CANDS,
    'num_iters:', DEFAULT_NUM_ITERS,
)


In [None]:
# Hugging Face token (you will be prompted)
from getpass import getpass
import os
# Reuse a token already exported in this session; otherwise prompt for one
# without echoing it to the notebook output.
hf_token = (os.environ.get('HF_TOKEN') or '').strip()
if not hf_token:
    # Strip the input: whitespace picked up while pasting is not part of the token.
    hf_token = getpass('Paste your Hugging Face token (with access to SD models): ').strip()

if hf_token:
    os.environ['HF_TOKEN'] = hf_token
    print('HF token set in env (session only).')
else:
    # Do not claim success when nothing was entered.
    print('No Hugging Face token provided; downloads that require authentication may fail.')

In [None]:
# Load Stable Diffusion pipeline and CLIP scorer (transformers)
import os, torch
from diffusers import StableDiffusionPipeline, DDIMScheduler
from transformers import CLIPProcessor, CLIPModel

# Run on the GPU when one is visible to torch, otherwise fall back to the CPU.
device = 'cuda' if torch.cuda.is_available() else 'cpu'
print('Device:', device)

# NOTE(review): confirm this repository id is still available on the Hub.
MODEL_ID = 'runwayml/stable-diffusion-v1-5'
# An empty string is not a usable credential, so pass None instead of '' when
# no token was provided rather than sending an empty bearer token.
hf_token = os.environ.get('HF_TOKEN') or None

print('Loading Stable Diffusion model (this will download weights)...')
pipe = StableDiffusionPipeline.from_pretrained(
    MODEL_ID,
    # Half precision on GPU; full precision on CPU.
    torch_dtype=torch.float16 if device == 'cuda' else torch.float32,
    use_auth_token=hf_token,
).to(device)
# Replace the pipeline's scheduler with DDIM, reusing the existing scheduler config.
pipe.scheduler = DDIMScheduler.from_config(pipe.scheduler.config)
pipe.enable_attention_slicing()
try:
    # Optional: only works when the xformers package is installed and usable.
    pipe.enable_xformers_memory_efficient_attention()
except Exception as e:
    print('xformers not enabled or not installed:', e)

print('Loading CLIP scorer (transformers)...')
clip_model = CLIPModel.from_pretrained('openai/clip-vit-base-patch32').to(device)
clip_processor = CLIPProcessor.from_pretrained('openai/clip-vit-base-patch32')
print('Models loaded.')


In [None]:
# Helper functions: decode from seed or from explicit latent; CLIP scoring
import torch, numpy as np
from PIL import Image

def decode_with_seed(prompt, seed, steps=DEFAULT_STEPS, image_size=DEFAULT_IMAGE_SIZE, guidance_scale=7.5):
    """Generate one square image for ``prompt`` using a seeded generator.

    Args:
        prompt: Text prompt passed to the global ``pipe``.
        seed: Value converted with ``int()`` and used to seed the generator.
        steps: Number of inference steps.
        image_size: Height and width of the output image, in pixels.
        guidance_scale: Guidance scale forwarded to the pipeline.

    Returns:
        The first image of the pipeline output.
    """
    rng = torch.Generator(device=device)
    rng.manual_seed(int(seed))
    result = pipe(
        prompt,
        height=image_size,
        width=image_size,
        num_inference_steps=steps,
        guidance_scale=guidance_scale,
        generator=rng,
    )
    return result.images[0]

def clip_score(prompt, pil_image):
    """Return the CLIP cosine similarity between a prompt and an image.

    Args:
        prompt: Text to compare against the image.
        pil_image: Image handed to the global ``clip_processor``.

    Returns:
        The dot product of the L2-normalised image and text embeddings, as a
        Python ``float``.
    """
    inputs = clip_processor(
        text=[prompt],
        images=pil_image,
        return_tensors='pt',
        padding=True,
        # CLIP's text encoder has a fixed maximum sequence length; without
        # truncation an over-long prompt fails inside the text model.
        truncation=True,
    ).to(device)
    with torch.no_grad():
        img_emb = clip_model.get_image_features(pixel_values=inputs['pixel_values'])
        # Forward the attention mask so any padding positions are ignored.
        txt_emb = clip_model.get_text_features(
            input_ids=inputs['input_ids'],
            attention_mask=inputs['attention_mask'],
        )
        # Normalise both embeddings so the dot product is a cosine similarity.
        img_emb = img_emb / img_emb.norm(p=2, dim=-1, keepdim=True)
        txt_emb = txt_emb / txt_emb.norm(p=2, dim=-1, keepdim=True)
        sim = (img_emb @ txt_emb.T).item()
    return float(sim)

# Functions for latent handling
def prepare_latent_from_seed(seed, image_size=DEFAULT_IMAGE_SIZE):
    """Sample a standard-normal initial latent for the given seed.

    Args:
        seed: Value converted with ``int()`` and used to seed the generator.
        image_size: Target image side in pixels; the latent side is
            ``image_size // 8``.

    Returns:
        A tensor of shape ``(1, C, image_size // 8, image_size // 8)`` on
        ``device``, where ``C`` is the UNet's configured input channel count.
        The dtype is float16 on CUDA and float32 otherwise.
    """
    generator = torch.Generator(device=device).manual_seed(int(seed))
    # Read the channel count from the model config; accessing it as a direct
    # attribute of the UNet is deprecated in diffusers.
    num_channels = pipe.unet.config.in_channels
    latent_side = image_size // 8
    dtype = torch.float16 if device == 'cuda' else torch.float32
    latents = torch.randn(
        (1, num_channels, latent_side, latent_side),
        generator=generator,
        device=device,
        dtype=dtype,
    )
    return latents


In [None]:
# CLIP-MonteCarlo (fast proxy) demo
import random, time
def clip_monte_carlo(prompt, iterations=DEFAULT_NUM_ITERS, candidates=DEFAULT_NUM_CANDS, steps=DEFAULT_STEPS, image_size=DEFAULT_IMAGE_SIZE):
    """Random-seed search that keeps the image with the highest CLIP score.

    One image is generated from a random seed as the baseline. Each iteration
    then generates ``candidates`` more images from fresh random seeds and
    adopts the best of them only if it scores strictly higher than the
    current best.

    Args:
        prompt: Text prompt to generate and score against.
        iterations: Number of search rounds.
        candidates: Number of random seeds tried per round.
        steps: Inference steps forwarded to ``decode_with_seed``.
        image_size: Image side forwarded to ``decode_with_seed``.

    Returns:
        ``(best_img, best_seed, best_score)``.
    """
    def _draw():
        # Draw a seed, render it, score it; returns (score, seed, image).
        trial_seed = random.randint(0,2**31-2)
        trial_img = decode_with_seed(prompt, trial_seed, steps, image_size)
        return clip_score(prompt, trial_img), trial_seed, trial_img

    best_score, best_seed, best_img = _draw()
    print(f'Init seed {best_seed}, CLIP score {best_score:.4f}')

    for round_no in range(1, iterations + 1):
        started = time.time()
        pool = [_draw() for _ in range(candidates)]
        top_score, top_seed, top_img = max(pool, key=lambda entry: entry[0])
        if not top_score > best_score:
            print(f'Iter {round_no}: no improvement (best {best_score:.4f}) (time {time.time()-started:.1f}s)')
            continue
        best_score, best_seed, best_img = top_score, top_seed, top_img
        print(f'Iter {round_no}: improved to seed {best_seed}, CLIP {best_score:.4f} (time {time.time()-started:.1f}s)')

    return best_img, best_seed, best_score

# Quick run example
# Runs the search once with the default iteration/candidate/step settings and
# leaves `prompt`, `img`, `seed` and `score` defined as notebook globals.
prompt='A photorealistic red vintage car parked in front of a Victorian house'
img, seed, score = clip_monte_carlo(prompt)
# `display` is not imported here; it is expected to be provided by the notebook environment.
display(img)
print('Final seed', seed, 'score', score)

In [None]:
# NoiseDiffusion (latent update) implementation (expensive)
# WARNING: This performs explicit latent updates and denoises from provided latents.
# Tune num_iters, num_cands conservatively on Colab (start small).

import math, time, torch

def noise_diffusion_latent(prompt, num_iters=DEFAULT_NUM_ITERS, num_cands=DEFAULT_NUM_CANDS, steps=DEFAULT_STEPS, image_size=DEFAULT_IMAGE_SIZE, guidance_scale=7.5, gamma_rule='score'):
    """Iteratively perturb the initial latent and keep CLIP-score improvements.

    A random initial latent is denoised once to get a baseline. Each iteration
    builds ``num_cands`` candidates as
    ``sqrt(1 - gamma) * zT + sqrt(gamma) * noise``, denoises and scores each,
    and adopts the best candidate only if it scores strictly higher than the
    current best.

    Args:
        prompt: Text prompt to generate and score against.
        num_iters: Number of update rounds.
        num_cands: Number of perturbed latents tried per round.
        steps: Number of inference steps per denoising run.
        image_size: Image side in pixels.
        guidance_scale: Guidance scale forwarded to the pipeline.
        gamma_rule: ``'score'`` derives gamma from the current best CLIP
            score; any other value uses a fixed gamma of 0.2.

    Returns:
        ``(best_img, best_score)``.
    """
    def _denoise(latent):
        # Run the full pipeline from an explicit starting latent.
        with torch.no_grad():
            rendered = pipe(
                prompt,
                height=image_size,
                width=image_size,
                num_inference_steps=steps,
                guidance_scale=guidance_scale,
                latents=latent,
            )
        return rendered.images[0]

    # Baseline: random initial latent, denoised once.
    seed0 = random.randint(0,2**31-2)
    zT = prepare_latent_from_seed(seed0, image_size=image_size)
    best_img = _denoise(zT)
    best_score = clip_score(prompt, best_img)
    print(f'Initial seed {seed0}, CLIP {best_score:.4f}')

    for round_no in range(1, num_iters + 1):
        if gamma_rule == 'score':
            # Per the original author's note, the paper uses gamma = 1 - sqrt(s)
            # with a VQA score s; here the CLIP score is mapped from [-1, 1]
            # to [0, 1] and clamped as a crude proxy.
            unit_score = min(1.0, (best_score + 1)/2)
            unit_score = max(0.0, unit_score)
            gamma = 1.0 - math.sqrt(unit_score)
        else:
            gamma = 0.2
        keep_weight = math.sqrt(1.0 - gamma)
        noise_weight = math.sqrt(gamma)

        trials = []
        started = time.time()
        for _ in range(num_cands):
            # The perturbation noise is drawn from torch's global RNG (no explicit seed).
            noise = torch.randn_like(zT)
            proposal = keep_weight * zT + noise_weight * noise
            picture = _denoise(proposal)
            trials.append((clip_score(prompt, picture), proposal, picture))

        top_score, top_latent, top_img = max(trials, key=lambda entry: entry[0])
        if not top_score > best_score:
            print(f'Iter {round_no}: no improvement (best {best_score:.4f}) (time {time.time()-started:.1f}s)')
            continue
        best_score, zT, best_img = top_score, top_latent, top_img
        print(f'Iter {round_no}: improved CLIP {best_score:.4f} (time {time.time()-started:.1f}s)')

    return best_img, best_score

# Small test (use small num_iters/cands first)
# One round with two candidates; rebinds the notebook globals `prompt`, `img`
# and `sc`.
prompt = 'A photo of a small brown dog running on grass'
img, sc = noise_diffusion_latent(prompt, num_iters=1, num_cands=2)
# `display` is not imported here; it is expected to be provided by the notebook environment.
display(img)
print('Final CLIP', sc)


In [None]:
# Save last generated image (if any) to drive/workspace
out_path = '/content/noise_diffusion_result.png'
try:
    img.save(out_path)
except NameError:
    # No earlier cell has defined `img` in this session.
    print('No img object found in this cell scope; create image first.')
except Exception as e:
    # `img` exists but saving failed: report the real cause instead of
    # claiming the image is missing. Still best-effort (no re-raise).
    print('Failed to save image to', out_path, '->', e)
else:
    print('Saved to', out_path)

---
## Next steps / tips
- **Pilot:** run CLIP-MonteCarlo on 100 prompts to choose hyperparams (num_iters, num_cands, steps).  
- **Scale:** after pilot, run larger batches and log results (CLIPScore, LPIPS).  
- **Reproducibility:** save seeds & latents for top results.  
- **If a large GPU (>=48GB) becomes available later:** increase the image size to 512, the steps to 50, and num_cands/num_iters for a full reproduction.

If you'd like, I can:
- produce a version that logs metrics to CSV,
- split prompts into shards for distributed runs,
- or generate a downloadable .ipynb file for direct upload to Colab.


In [None]:
# Install logging helpers (run once)
import sys
# pandas is used by the batch cells for CSV logging; tqdm for progress bars.
!pip install -q pandas tqdm
# Printed unconditionally; it does not check whether the install succeeded.
print('pandas and tqdm available')

In [None]:
# Provide a list of prompts to process in batch.
# You can either edit the 'prompts' list below directly or upload a .txt file with one prompt per line.
from google.colab import files
import os, csv, time

# Example prompts (replace or extend)
prompts = [
    "A photorealistic red vintage car parked in front of a Victorian house",
    "A cute golden retriever puppy playing with a blue ball on grass",
    "A modern kitchen interior with wooden cabinets and a marble island",
]

# Optionally, upload a text file with prompts (one per line)
print("Current num prompts:", len(prompts))
# NOTE(review): google.colab's files.upload() does not appear to accept a
# button label keyword, so the instructions are printed instead - confirm
# against the installed google.colab version.
print('Upload prompt file (optional, one prompt per line)')
uploaded = files.upload()
if uploaded:
    # take the first uploaded file
    fname = next(iter(uploaded))
    with open(fname, 'r', encoding='utf-8') as f:
        lines = [line.strip() for line in f if line.strip()]
    if lines:
        prompts = lines
        print('Loaded', len(prompts), 'prompts from', fname)
    else:
        # Only report a load when the file actually contributed prompts.
        print('No prompts found in', fname, '- keeping the current', len(prompts), 'prompts')

# Save prompts to disk for reproducibility
with open('/content/prompts_list.txt', 'w', encoding='utf-8') as f:
    f.writelines(p + '\n' for p in prompts)
print('Prompts saved to /content/prompts_list.txt')

In [None]:
# Load prompts from repo datasets JSON files (if present)
import os, json, glob
def _extract_prompts(data):
    """Collect prompt candidates from one decoded JSON document.

    Supported layouts:
      * list of dicts: takes the first of ``'prompt'``, ``'text'``,
        ``'caption'`` present in each dict, else joins all values with spaces;
      * any other list: every element converted with ``str()``;
      * dict with a list under ``'prompts'`` or ``'captions'``: its elements
        converted with ``str()``;
      * dict with a list under ``'annotations'``: ``'caption'`` or ``'text'``
        of each dict entry;
      * any other dict: string values, plus string elements of list values.

    Returns:
        A list of extracted values. Entries may be non-strings; the caller
        filters them.
    """
    found = []
    if isinstance(data, list):
        if data and isinstance(data[0], dict):
            for item in data:
                if not isinstance(item, dict):
                    continue
                for key in ('prompt', 'text', 'caption'):
                    if key in item:
                        found.append(item[key])
                        break
                else:
                    # fallback: join all values of the record
                    found.append(' '.join(str(v) for v in item.values()))
        else:
            found.extend(str(x) for x in data)
    elif isinstance(data, dict):
        for key in ('prompts', 'captions'):
            if isinstance(data.get(key), list):
                found.extend(str(x) for x in data[key])
                return found
        if isinstance(data.get('annotations'), list):
            # COCO-style annotations: list of dicts with 'caption' or 'text'
            for ann in data['annotations']:
                if not isinstance(ann, dict):
                    continue
                if 'caption' in ann:
                    found.append(ann['caption'])
                elif 'text' in ann:
                    found.append(ann['text'])
        else:
            # fallback: collect string values
            for value in data.values():
                if isinstance(value, str):
                    found.append(value)
                elif isinstance(value, list):
                    found.extend(x for x in value if isinstance(x, str))
    return found


# Remember prompts defined by an earlier cell (e.g. an uploaded prompt file) so
# they are not silently discarded when no dataset JSON is available.
_earlier_prompts = globals().get('prompts')
if isinstance(_earlier_prompts, (list, tuple)):
    previous_prompts = [p.strip() for p in _earlier_prompts if isinstance(p, str) and p.strip()]
else:
    previous_prompts = []

prompts = []

repo_datasets_dir = '/content/NoiseDiffusion/datasets'
if not os.path.exists(repo_datasets_dir):
    repo_datasets_dir = '/mnt/data/NoiseDiffusion/datasets'  # alternate location if notebook ran elsewhere

print('Looking for JSON files in', repo_datasets_dir)
if os.path.exists(repo_datasets_dir):
    json_files = sorted(glob.glob(os.path.join(repo_datasets_dir, '*.json')))
    print('Found', len(json_files), 'json file(s)')
    for jf in json_files:
        # Best-effort: a broken file is reported and skipped, not fatal.
        try:
            with open(jf, 'r', encoding='utf-8') as f:
                data = json.load(f)
            prompts.extend(_extract_prompts(data))
        except Exception as e:
            print('Failed to load', jf, '->', e)

# dedupe and clean
prompts = [p.strip() for p in prompts if isinstance(p, str) and p.strip()]
prompts = list(dict.fromkeys(prompts))  # preserve order, dedupe
print('Total prompts extracted:', len(prompts))

# If none found, keep earlier prompts when there are any; otherwise fall back
# to sample prompts for a quick demo.
if not prompts:
    if previous_prompts:
        prompts = list(dict.fromkeys(previous_prompts))
        print('No prompts found in repo datasets. Keeping', len(prompts), 'previously defined prompts.')
    else:
        print('No prompts found in repo datasets. Using default demo prompts.')
        prompts = [
            'A photorealistic red vintage car parked in front of a Victorian house',
            'A cute golden retriever puppy playing with a blue ball on grass',
            'A modern kitchen interior with wooden cabinets and a marble island',
        ]

# Save prompts to disk for reproducibility
out_file = '/content/prompts_list.txt'
with open(out_file, 'w', encoding='utf-8') as f:
    f.writelines(p + '\n' for p in prompts)
print('Saved prompts to', out_file)
print('First 10 prompts:')
for i, p in enumerate(prompts[:10], start=1):
    print(i, p)

# expose 'prompts' in notebook globals


In [None]:
# Batch runner for CLIP-MonteCarlo (fast proxy). Logs metrics to CSV and saves images.
import pandas as pd
from tqdm import tqdm
import os, time, csv
out_dir = '/content/noise_diffusion_outputs'
os.makedirs(out_dir, exist_ok=True)
csv_path = os.path.join(out_dir, 'results_clip_mc.csv')

# Parameters (tune as needed). Fall back to fixed values when the defaults
# cell has not been run in this session.
ITERATIONS = DEFAULT_NUM_ITERS if 'DEFAULT_NUM_ITERS' in globals() else 2
CANDIDATES = DEFAULT_NUM_CANDS if 'DEFAULT_NUM_CANDS' in globals() else 4
STEPS = DEFAULT_STEPS if 'DEFAULT_STEPS' in globals() else 20
IMAGE_SIZE = DEFAULT_IMAGE_SIZE if 'DEFAULT_IMAGE_SIZE' in globals() else 384
GUIDANCE = 7.5  # not forwarded below: clip_monte_carlo takes no guidance argument

# If a non-empty CSV exists, resume from it: prompts already logged are skipped.
csv_has_rows = os.path.exists(csv_path) and os.path.getsize(csv_path) > 0
if csv_has_rows:
    df_prev = pd.read_csv(csv_path)
    done_prompts = set(df_prev['prompt'].tolist())
    print('Resuming. Already processed', len(done_prompts), 'prompts.')
else:
    done_prompts = set()
# The header row is written only once, when the file is new or empty.
write_header = not csv_has_rows

for idx, prompt in enumerate(tqdm(prompts, desc='Prompts')):
    if prompt in done_prompts:
        continue
    t0 = time.time()
    img, seed, score = clip_monte_carlo(prompt, iterations=ITERATIONS, candidates=CANDIDATES, steps=STEPS, image_size=IMAGE_SIZE)
    elapsed = time.time() - t0
    fname = f'clipmc_{idx:04d}_seed{seed}.png'
    img.save(os.path.join(out_dir, fname))
    row = {'idx': idx, 'prompt': prompt, 'method': 'clip_mc', 'seed': seed, 'score': float(score), 'image_path': fname, 'time_s': elapsed}
    # Append this row right away for safety. Appending avoids re-reading and
    # rewriting the whole CSV after every prompt.
    pd.DataFrame([row]).to_csv(csv_path, mode='a', header=write_header, index=False)
    write_header = False
    # Mark as done so a duplicate prompt later in the list is not recomputed.
    done_prompts.add(prompt)
print('Batch CLIP-MC completed (or resumed). Results saved to', csv_path)

In [None]:
# Batch runner for NoiseDiffusion latent update (more expensive). Logs to CSV and saves images.
import pandas as pd
from tqdm import tqdm
import os, time, math
out_dir = '/content/noise_diffusion_outputs'
os.makedirs(out_dir, exist_ok=True)
csv_path2 = os.path.join(out_dir, 'results_noise_diffusion.csv')

# Parameters (tune conservatively). Fall back to fixed values when the
# defaults cell has not been run in this session.
NUM_ITERS = DEFAULT_NUM_ITERS if 'DEFAULT_NUM_ITERS' in globals() else 2
NUM_CANDS = DEFAULT_NUM_CANDS if 'DEFAULT_NUM_CANDS' in globals() else 4
STEPS = DEFAULT_STEPS if 'DEFAULT_STEPS' in globals() else 20
IMAGE_SIZE = DEFAULT_IMAGE_SIZE if 'DEFAULT_IMAGE_SIZE' in globals() else 384
GUIDANCE = 7.5

# If a non-empty CSV exists, resume from it: prompts already logged are skipped.
csv_has_rows = os.path.exists(csv_path2) and os.path.getsize(csv_path2) > 0
if csv_has_rows:
    df_prev = pd.read_csv(csv_path2)
    done_prompts = set(df_prev['prompt'].tolist())
    print('Resuming NoiseDiffusion. Already processed', len(done_prompts), 'prompts.')
else:
    done_prompts = set()
# The header row is written only once, when the file is new or empty.
write_header = not csv_has_rows

for idx, prompt in enumerate(tqdm(prompts, desc='Prompts ND')):
    if prompt in done_prompts:
        continue
    t0 = time.time()
    # run noise diffusion latent update with conservative params
    img, score = noise_diffusion_latent(prompt, num_iters=NUM_ITERS, num_cands=NUM_CANDS, steps=STEPS, image_size=IMAGE_SIZE, guidance_scale=GUIDANCE, gamma_rule='score')
    elapsed = time.time() - t0
    fname = f'nd_{idx:04d}.png'
    img.save(os.path.join(out_dir, fname))
    row = {'idx': idx, 'prompt': prompt, 'method': 'noise_diffusion', 'score': float(score), 'image_path': fname, 'time_s': elapsed, 'num_iters': NUM_ITERS, 'num_cands': NUM_CANDS}
    # Write incrementally by appending this row. Appending avoids re-reading
    # and rewriting the whole CSV after every prompt.
    pd.DataFrame([row]).to_csv(csv_path2, mode='a', header=write_header, index=False)
    write_header = False
    # Mark as done so a duplicate prompt later in the list is not recomputed.
    done_prompts.add(prompt)
print('Batch NoiseDiffusion completed (or resumed). Results saved to', csv_path2)

In [None]:
# List output files and provide download links for convenience
import os, glob, pandas as pd, IPython.display as disp
out_dir = '/content/noise_diffusion_outputs'
print('Output dir:', out_dir)

# Print at most the first 50 entries of the output directory, sorted by path.
listing = sorted(glob.glob(out_dir + '/*'))
for entry in listing[:50]:
    print(entry)

# Preview the head of each results CSV that exists.
for csv_name in ('results_clip_mc.csv', 'results_noise_diffusion.csv'):
    csv_file = os.path.join(out_dir, csv_name)
    if not os.path.exists(csv_file):
        continue
    print('---', csv_name, '---')
    preview = pd.read_csv(csv_file).head()
    display(preview)
