In [None]:
# Mount Google Drive at /content/drive; the project paths used in later cells
# (/content/drive/MyDrive/...) live under this mount point.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
!pip uninstall -y diffusers transformers huggingface_hub peft
!pip install diffusers==0.27.2 transformers==4.37.2 huggingface_hub==0.23.4 peft==0.10.0

In [None]:
import torch
from diffusers import DiffusionPipeline
from huggingface_hub import login
from google.colab import userdata

# --- 1. Log in to Hugging Face ---
# Best effort: a failed login is reported but does not stop the cell, so the
# model loads below are still attempted.
try:
    hf_token = userdata.get('HF_TOKEN')
    login(token=hf_token)
    print("Hugging Face login successful.")
except Exception as e:
    print(f"Hugging Face login failed: {e}")

# --- 2. Define the model cache path on Google Drive ---
DRIVE_ROOT = "/content/drive/MyDrive/"
PROJECT_DIR = DRIVE_ROOT + "Group14_Project/"
MODEL_CACHE_DIR = PROJECT_DIR + "Models/"

print(f"Models will be loaded from: {MODEL_CACHE_DIR}")


def _load_sdxl_pipeline(repo_id, label):
    """Load one fp16 SDXL pipeline, using MODEL_CACHE_DIR as the cache directory.

    Args:
        repo_id: Hugging Face repository id passed to ``from_pretrained``.
        label: Short human-readable name ("Base" / "Refiner") used in messages.

    Returns:
        The pipeline object returned by ``DiffusionPipeline.from_pretrained``.
        No device move is requested here.

    Raises:
        Re-raises whatever ``from_pretrained`` raised, after printing it.
        Every later cell needs the pipeline, so continuing with an undefined
        variable would only postpone the failure to a confusing NameError.
    """
    print(f"Loading SDXL {label} 1.0 model to CPU...")
    try:
        pipeline = DiffusionPipeline.from_pretrained(
            repo_id,
            torch_dtype=torch.float16,
            variant="fp16",
            use_safetensors=True,
            cache_dir=MODEL_CACHE_DIR
        )
    except Exception as e:
        print(f"Failed to load {label} model: {e}")
        raise
    print(f"SDXL {label} 1.0 successfully loaded to CPU.")
    return pipeline


# --- 3. Load SDXL Base 1.0 model ---
base = _load_sdxl_pipeline("stabilityai/stable-diffusion-xl-base-1.0", "Base")

# --- 4. Load SDXL Refiner 1.0 model ---
refiner = _load_sdxl_pipeline("stabilityai/stable-diffusion-xl-refiner-1.0", "Refiner")

# Only reached when both loads succeeded, so the message is always accurate.
print("\n--- Step 1 (modified version) complete: both models are on CPU ---")


In [None]:
import time
import numpy as np
import os
import json

# --- 1. Define Workloads (Updated) ---
# Three text prompts used as benchmark workloads by the measurement,
# image-generation and CLIP-scoring cells.
PROMPT_SIMPLE = "a high-quality photo of a cat"
PROMPT_COMIC = "a superhero landing in a city, dynamic pose, comic book style, vibrant colors"
PROMPT_COMPLEX = "Extreme close-up of an elderly human eye, highly detailed iris texture, visible skin pores and wrinkles, 8k resolution, photorealistic, cinematic lighting, macro photography"

# --- 2. Define Paths ---
# All outputs (JSON results, plots, generated images) go under DATA_DIR.
DRIVE_ROOT = "/content/drive/MyDrive/"
PROJECT_DIR = DRIVE_ROOT + "Group14_Project/"
DATA_DIR = PROJECT_DIR + "Data/"
OUTPUT_JSON_PATH = os.path.join(DATA_DIR, "measurement_results.json")

print(f"Simple Prompt: {PROMPT_SIMPLE}")
print(f"Comic Prompt:  {PROMPT_COMIC}")
print(f"Complex Prompt: {PROMPT_COMPLEX}")
print(f"Data Path: {OUTPUT_JSON_PATH}")

# exist_ok=True makes the call idempotent without a separate existence check.
os.makedirs(DATA_DIR, exist_ok=True)

In [None]:
# Module-level CUDA random generator, seeded with 42.
# run_config_high / run_config_fast re-seed this same object via
# generator.manual_seed(42) right before each pipeline call, so every
# generation starts from the same random state.
generator = torch.Generator(device="cuda").manual_seed(42)

def run_config_high(prompt_text):
    """
    Run the *high-quality* configuration (50 steps + Refiner),
    with dynamic CPU↔GPU loading to save VRAM, and time every stage.

    The Base pipeline is called with ``denoising_end=0.8`` and
    ``output_type="latent"``; its latents are then handed to the Refiner,
    which is called with ``denoising_start=0.8``. Only one pipeline is on
    the GPU at a time: each one is moved to "cuda" before its call and back
    to "cpu" afterwards.

    Args:
        prompt_text: Text prompt passed to both pipelines.

    Returns:
        ``(image, timings)`` where ``image`` is the first image of the
        Refiner output and ``timings`` is a dict of milliseconds with keys
        ``base_load_ms``, ``base_run_ms``, ``base_unload_ms``,
        ``refiner_load_ms``, ``refiner_run_ms``, ``refiner_unload_ms`` and
        ``total_latency_ms``.

    Raises:
        Whatever the pipelines or the device moves raise. The pipeline that
        was active is moved back to the CPU before the exception propagates,
        so a failed run does not leave a model occupying GPU memory.
    """
    timings = {}

    # time.perf_counter() is a monotonic high-resolution clock; unlike
    # time.time() it cannot jump when the system clock is adjusted.
    # torch.cuda.synchronize() before each timestamp makes sure queued GPU
    # work has finished, so the interval covers the work it is named after.

    # --- Overall timer ---
    torch.cuda.synchronize()
    start_total = time.perf_counter()

    # --- 1. Base model stage ---
    torch.cuda.synchronize()
    start_base_load = time.perf_counter()
    try:
        base.to("cuda")  # move Base to GPU
        torch.cuda.synchronize()
        start_base_run = time.perf_counter()

        latents = base(
            prompt=prompt_text,
            num_inference_steps=50,
            denoising_end=0.8,
            output_type="latent",
            generator=generator.manual_seed(42)
        ).images

        torch.cuda.synchronize()
        start_base_unload = time.perf_counter()
    finally:
        # Always release the GPU, even when the load or the run failed.
        base.to("cpu")
    torch.cuda.synchronize()
    end_base = time.perf_counter()

    timings["base_load_ms"] = (start_base_run - start_base_load) * 1000
    timings["base_run_ms"] = (start_base_unload - start_base_run) * 1000
    timings["base_unload_ms"] = (end_base - start_base_unload) * 1000

    # --- 2. Refiner model stage ---
    torch.cuda.synchronize()
    start_refiner_load = time.perf_counter()
    try:
        refiner.to("cuda")  # move Refiner to GPU
        torch.cuda.synchronize()
        start_refiner_run = time.perf_counter()

        image = refiner(
            prompt=prompt_text,
            image=latents,
            num_inference_steps=50,
            denoising_start=0.8,
            generator=generator.manual_seed(42)
        ).images[0]

        torch.cuda.synchronize()
        start_refiner_unload = time.perf_counter()
    finally:
        # Always release the GPU, even when the load or the run failed.
        refiner.to("cpu")
    torch.cuda.synchronize()
    end_refiner = time.perf_counter()

    timings["refiner_load_ms"] = (start_refiner_run - start_refiner_load) * 1000
    timings["refiner_run_ms"] = (start_refiner_unload - start_refiner_run) * 1000
    timings["refiner_unload_ms"] = (end_refiner - start_refiner_unload) * 1000

    # --- 3. Total latency ---
    torch.cuda.synchronize()
    end_total = time.perf_counter()
    timings["total_latency_ms"] = (end_total - start_total) * 1000

    return image, timings


def run_config_fast(prompt_text):
    """
    Run the *fast* configuration (20 steps, no Refiner),
    with dynamic CPU↔GPU loading to conserve GPU memory, and time every stage.

    Args:
        prompt_text: Text prompt passed to the Base pipeline.

    Returns:
        ``(image, timings)`` where ``image`` is the first image of the Base
        output and ``timings`` is a dict of milliseconds with the same keys
        as ``run_config_high`` returns. The three ``refiner_*`` entries are
        always ``0.0`` because no Refiner runs in this configuration.

    Raises:
        Whatever the pipeline or the device moves raise. The Base pipeline is
        moved back to the CPU before the exception propagates, so a failed
        run does not leave it occupying GPU memory.
    """
    timings = {}

    # time.perf_counter() is a monotonic high-resolution clock; unlike
    # time.time() it cannot jump when the system clock is adjusted.

    # --- Overall timer ---
    torch.cuda.synchronize()
    start_total = time.perf_counter()

    # --- 1. Base model stage ---
    torch.cuda.synchronize()
    start_base_load = time.perf_counter()
    try:
        base.to("cuda")  # move Base to GPU
        torch.cuda.synchronize()
        start_base_run = time.perf_counter()

        image = base(
            prompt=prompt_text,
            num_inference_steps=20,
            generator=generator.manual_seed(42)
        ).images[0]

        torch.cuda.synchronize()
        start_base_unload = time.perf_counter()
    finally:
        # Always release the GPU, even when the load or the run failed.
        base.to("cpu")
    torch.cuda.synchronize()
    end_base = time.perf_counter()

    timings["base_load_ms"] = (start_base_run - start_base_load) * 1000
    timings["base_run_ms"] = (start_base_unload - start_base_run) * 1000
    timings["base_unload_ms"] = (end_base - start_base_unload) * 1000

    # --- 2. No Refiner in this config ---
    # Zeros keep the timing dict shape identical across both configurations.
    timings["refiner_load_ms"] = 0.0
    timings["refiner_run_ms"] = 0.0
    timings["refiner_unload_ms"] = 0.0

    # --- 3. Total latency ---
    torch.cuda.synchronize()
    end_total = time.perf_counter()
    timings["total_latency_ms"] = (end_total - start_total) * 1000

    return image, timings

print(" Measurement functions run_config_high and run_config_fast (modified version) defined successfully.")

In [None]:
def _format_breakdown(stage_label, stage_prefix, stage_timings):
    """Build the one-line load/run/unload summary for one pipeline stage."""
    load_ms = stage_timings[stage_prefix + "_load_ms"]
    run_ms = stage_timings[stage_prefix + "_run_ms"]
    unload_ms = stage_timings[stage_prefix + "_unload_ms"]
    return f"  {stage_label} breakdown: load={load_ms:.2f} ms, run={run_ms:.2f} ms, unload={unload_ms:.2f} ms"


# Three throwaway fast-config generations before anything is measured.
# A failure here is reported but does not stop the cell.
print("--- Warming up the GPU ---")
try:
    for warmup_no in range(1, 4):
        print(f"Warmup run {warmup_no}/3...")
        _ = run_config_fast(prompt_text="warmup")
    print(" GPU warmup complete.")
except Exception as e:
    print(f" Warmup failed: {e}")
    print("Check the error message. If it’s still an OOM issue, you might need to restart the runtime or switch to an A100 GPU.")

print("\n--- Running a single test (Sanity Check) ---")

# --- 1. Test: High-quality config + comic prompt ---
# One timed run of each configuration; the printed breakdown is for eyeballing
# only, nothing is stored.
print("Test: Config_High + PROMPT_COMIC")
try:
    image_high, timings_high = run_config_high(PROMPT_COMIC)
    total_high_ms = timings_high['total_latency_ms']
    print(f"  Total latency: {total_high_ms:.2f} ms")
    print(_format_breakdown("Base", "base", timings_high))
    print(_format_breakdown("Refiner", "refiner", timings_high))
except Exception as e:
    print(f" Config_High test failed: {e}")

# --- 2. Test: Fast config + comic prompt ---
print("\nTest: Config_Fast + PROMPT_COMIC")
try:
    image_fast, timings_fast = run_config_fast(PROMPT_COMIC)
    total_fast_ms = timings_fast['total_latency_ms']
    print(f"  Total latency: {total_fast_ms:.2f} ms")
    print(_format_breakdown("Base", "base", timings_fast))
except Exception as e:
    print(f" Config_Fast test failed: {e}")

print("\n--- Step 2 (modified version) complete ---")


In [None]:
from tqdm.notebook import tqdm

# --- 1. Load existing data ---
# The JSON file is the resume point: runs already recorded there are not repeated.
if os.path.exists(OUTPUT_JSON_PATH):
    print(f"Found existing data at {OUTPUT_JSON_PATH}. Loading...")
    with open(OUTPUT_JSON_PATH, 'r') as f:
        results = json.load(f)
else:
    print("No existing data found. Initializing new dictionary.")
    results = {
        "run_config": {
            "gpu_type": "NVIDIA T4",
            "diffusers_version": "0.27.2",
            "n_runs": 50
        }
    }

# An existing file may have been created by the CLIP-score cell, which writes a
# run_config without "n_runs"; fall back to the same default as above instead
# of failing with a KeyError.
results.setdefault("run_config", {}).setdefault("n_runs", 50)

# --- 2. Initialize Keys for Complex Prompt ---
# Names of the per-stage timings returned by run_config_high / run_config_fast;
# each is stored in the JSON under "breakdown_<name>".
STAGE_TIMING_KEYS = [
    "base_load_ms", "base_run_ms", "base_unload_ms",
    "refiner_load_ms", "refiner_run_ms", "refiner_unload_ms",
]

new_keys = ["config_high_complex", "config_fast_complex"]
for key in new_keys:
    if key not in results:
        print(f"Initializing new key: {key}")
        results[key] = {
            "description": f"{'High-quality' if 'high' in key else 'Fast'} config + Complex prompt",
        }
    # setdefault (rather than creating the lists only for brand-new keys) also
    # repairs entries that exist but hold no measurement lists yet, e.g. ones
    # the CLIP-score cell created with just a description and a score.
    results[key].setdefault("latency_distribution_ms", [])
    for stage_key in STAGE_TIMING_KEYS:
        results[key].setdefault("breakdown_" + stage_key, [])

# --- 3. Run Experiments (Only for missing data) ---
N_RUNS = results["run_config"]["n_runs"]

# (JSON key, measurement function) for each configuration to benchmark.
experiment_plan = [
    ("config_high_complex", run_config_high),
    ("config_fast_complex", run_config_fast),
]

try:
    for key, run_fn in experiment_plan:
        entry = results[key]
        current_runs = len(entry["latency_distribution_ms"])
        if current_runs >= N_RUNS:
            print(f"Skipping {key}: Already complete.")
            continue

        print(f"\nMeasuring: {key} (Need {N_RUNS - current_runs} more runs)")
        for _run in tqdm(range(N_RUNS - current_runs)):
            _image, timings = run_fn(PROMPT_COMPLEX)
            entry["latency_distribution_ms"].append(timings["total_latency_ms"])
            # run_config_fast already reports 0.0 for the refiner stages, so
            # both configurations can be recorded the same way.
            for stage_key in STAGE_TIMING_KEYS:
                entry["breakdown_" + stage_key].append(timings[stage_key])

    print("\n--- Complex Measurements Complete ---")

except Exception as e:
    print(f"\nError: {e}")

finally:
    # --- 4. Save Results ---
    # Runs in every case (success, error, manual interrupt) so the measurements
    # collected so far are persisted and the next execution can resume.
    try:
        for key in new_keys:
            data = results[key]
            if data["latency_distribution_ms"]:
                # float() stores a plain Python number rather than a NumPy scalar.
                data["avg_total_latency_ms"] = float(np.mean(data["latency_distribution_ms"]))

        with open(OUTPUT_JSON_PATH, 'w') as f:
            json.dump(results, f, indent=4)
        print("Updated JSON saved.")
    except Exception as e:
        print(f"\nError: {e}")

In [None]:
import matplotlib.pyplot as plt
import json
import numpy as np

print("--- Step 4 (Task A): Visualizing latency data ---")

# Re-read the measurement file from disk. A failed read is only reported; the
# plotting step below then works with whatever `results` is already in the kernel.
try:
    with open(OUTPUT_JSON_PATH, 'r') as f:
        results = json.load(f)
except Exception as e:
    print(f"Failed to load JSON: {e}")
else:
    print(f"Loaded data from: {OUTPUT_JSON_PATH}")

# Box plot of total latency for the four simple/comic configurations.
try:
    plotted_config_keys = (
        "config_high_simple",
        "config_high_comic",
        "config_fast_simple",
        "config_fast_comic",
    )
    # Stored values are milliseconds; divide by 1000 to plot seconds.
    data_to_plot = [
        np.array(results[config_key]["latency_distribution_ms"]) / 1000
        for config_key in plotted_config_keys
    ]
    dist_high_simple, dist_high_comic, dist_fast_simple, dist_fast_comic = data_to_plot
    labels = ["High (Simple)", "High (Comic)", "Fast (Simple)", "Fast (Comic)"]

    # Explicit figure/axes objects instead of the implicit pyplot state.
    fig, ax = plt.subplots(figsize=(10, 7))
    ax.boxplot(data_to_plot, labels=labels)
    ax.set_title(f'SDXL Latency Distribution on {results["run_config"]["gpu_type"]} (N={results["run_config"]["n_runs"]})')
    ax.set_ylabel('Total Latency (seconds)')
    ax.grid(True, linestyle='--', alpha=0.6)

    VIZ_OUTPUT_PATH = os.path.join(DATA_DIR, "latency_distribution_plot.png")
    fig.savefig(VIZ_OUTPUT_PATH, dpi=300, bbox_inches='tight')

    print(f"\n Plot saved to: {VIZ_OUTPUT_PATH}")
    print("Check your Drive folder: Group14_Project/Data/latency_distribution_plot.png")

except Exception as e:
    print(f"Error while creating plot: {e}")


In [None]:
# --- Define Directories ---
# One output folder per (configuration, prompt) pair, all directly under DATA_DIR.
# Order: the three high-quality folders first, then the three fast ones.
image_dirs = [
    os.path.join(DATA_DIR, folder_name)
    for folder_name in (
        "images_high_simple",
        "images_high_comic",
        "images_high_complex",
        "images_fast_simple",
        "images_fast_comic",
        "images_fast_complex",
    )
]

# Named handles for the individual folders, unpacked in the same order.
(
    IMAGE_DIR_HIGH_SIMPLE,
    IMAGE_DIR_HIGH_COMIC,
    IMAGE_DIR_HIGH_COMPLEX,
    IMAGE_DIR_FAST_SIMPLE,
    IMAGE_DIR_FAST_COMIC,
    IMAGE_DIR_FAST_COMPLEX,
) = image_dirs

# Create whatever is missing and report the state of each folder.
for img_dir in image_dirs:
    if os.path.exists(img_dir):
        print(f"Exists: {img_dir}")
        continue
    os.makedirs(img_dir)
    print(f"Created: {img_dir}")

In [None]:
import time

def generate_and_save_high(prompt_text, file_path):
    """Generate one image with the high-quality config (Base + Refiner) and save it.

    The Base pipeline runs 50 steps with ``denoising_end=0.8`` and returns
    latents; the Refiner continues from ``denoising_start=0.8``. Each pipeline
    is moved to the GPU only for its own stage. No generator is passed, so
    the call does not pin a random seed.

    Args:
        prompt_text: Text prompt passed to both pipelines.
        file_path: Destination passed to ``image.save``.

    Returns:
        True when the image was generated and saved, False when any step
        raised (the error is printed, not propagated).
    """
    try:
        # Base stage
        try:
            base.to("cuda")
            latents = base(
                prompt=prompt_text,
                num_inference_steps=50,
                denoising_end=0.8,
                output_type="latent"
            ).images
        finally:
            # Move back even on failure; otherwise the pipeline stays on the
            # GPU and every following generation starts with less free memory.
            base.to("cpu")

        # Refiner stage
        try:
            refiner.to("cuda")
            image = refiner(
                prompt=prompt_text,
                image=latents,
                num_inference_steps=50,
                denoising_start=0.8
            ).images[0]
        finally:
            refiner.to("cpu")

        # Save result
        image.save(file_path)
        return True

    except Exception as e:
        print(f"  Error while generating {file_path}: {e}")
        return False


def generate_and_save_fast(prompt_text, file_path):
    """Generate one image with the fast config (Base only, 20 steps) and save it.

    The Base pipeline is moved to the GPU only for the duration of the call.
    No generator is passed, so the call does not pin a random seed.

    Args:
        prompt_text: Text prompt passed to the Base pipeline.
        file_path: Destination passed to ``image.save``.

    Returns:
        True when the image was generated and saved, False when any step
        raised (the error is printed, not propagated).
    """
    try:
        try:
            base.to("cuda")
            image = base(
                prompt=prompt_text,
                num_inference_steps=20
            ).images[0]
        finally:
            # Move back even on failure; otherwise the pipeline stays on the
            # GPU and every following generation starts with less free memory.
            base.to("cpu")

        image.save(file_path)
        return True

    except Exception as e:
        print(f"  Error while generating {file_path}: {e}")
        return False


print("--- 'generate_and_save' functions defined successfully ---")


In [None]:
from tqdm.notebook import tqdm

# Number of images per configuration, taken from the loaded results' run_config.
N_IMAGES = results["run_config"]["n_runs"]
print(f"--- Generating images (Target: {N_IMAGES}) ---")

# (output folder, generator function): 1. High + Complex, 2. Fast + Complex.
complex_image_jobs = (
    (IMAGE_DIR_HIGH_COMPLEX, generate_and_save_high),
    (IMAGE_DIR_FAST_COMPLEX, generate_and_save_fast),
)

for target_dir, save_fn in complex_image_jobs:
    print(f"\nGenerating: {target_dir}")
    for i in tqdm(range(N_IMAGES)):
        file_path = os.path.join(target_dir, f"image_{i:03d}.png")
        # Files already on disk are kept, which makes the cell resumable.
        if os.path.exists(file_path):
            continue
        save_fn(PROMPT_COMPLEX, file_path)

print("Complex image generation complete.")

In [None]:
import os

# --- 1. Define the "Complex" Prompt (fallback only) ---
# The workload cell near the top of the notebook already defines PROMPT_COMPLEX,
# and the latency measurements and generated images are based on that text.
# Overwriting it here would make the later CLIP scoring compare those images
# against a different prompt, so the value below is used only when the name is
# not defined yet.
if "PROMPT_COMPLEX" not in globals():
    PROMPT_COMPLEX = "A close-up portrait of an elderly person with extremely detailed skin texture, wrinkles, and pores, 8k resolution, photorealistic, cinematic lighting"

print(f"Complex Prompt defined: {PROMPT_COMPLEX}")

# --- 2. Define and Create New Image Directories ---

IMAGE_DIR_HIGH_COMPLEX = os.path.join(DATA_DIR, "images_high_complex")
IMAGE_DIR_FAST_COMPLEX = os.path.join(DATA_DIR, "images_fast_complex")

for img_dir in [IMAGE_DIR_HIGH_COMPLEX, IMAGE_DIR_FAST_COMPLEX]:
    # Check first only to pick the message; exist_ok=True makes the creation
    # itself idempotent.
    already_present = os.path.exists(img_dir)
    os.makedirs(img_dir, exist_ok=True)
    if already_present:
        print(f"Directory exists: {img_dir}")
    else:
        print(f"Created directory: {img_dir}")

In [None]:
from tqdm.notebook import tqdm

# Keep an N_IMAGES_PER_CONFIG that is already defined; otherwise default to 50.
N_IMAGES_PER_CONFIG = locals().get('N_IMAGES_PER_CONFIG', 50)

print(f"--- Generating {N_IMAGES_PER_CONFIG} images for COMPLEX prompt ---")

# (output folder, generator function):
#   1. High-Quality + Complex
#   2. Fast + Complex
per_config_jobs = (
    (IMAGE_DIR_HIGH_COMPLEX, generate_and_save_high),
    (IMAGE_DIR_FAST_COMPLEX, generate_and_save_fast),
)

for output_dir, save_fn in per_config_jobs:
    print(f"\nGenerating in: {output_dir}")
    for i in tqdm(range(N_IMAGES_PER_CONFIG)):
        file_name = f"image_{i:03d}.png"
        file_path = os.path.join(output_dir, file_name)

        # Skip files that already exist to avoid re-generating them.
        if os.path.exists(file_path):
            continue
        save_fn(PROMPT_COMPLEX, file_path)

print("\n--- Complex image generation complete! ---")

In [None]:
from PIL import Image
import torch
from transformers import CLIPProcessor, CLIPModel
import numpy as np
import os

# --- 1. Load CLIP model from Hugging Face ---
# Use the GPU when one is visible to torch, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"
MODEL_ID = "openai/clip-vit-base-patch32"
print(f"Loading CLIP model: {MODEL_ID} on {device}...")

# Model weights first, then moved to the chosen device; the processor is
# loaded from the same checkpoint id.
model = CLIPModel.from_pretrained(MODEL_ID)
model = model.to(device)
processor = CLIPProcessor.from_pretrained(MODEL_ID)
print("CLIP model loaded successfully.")

# --- 2. Define Scoring Function ---
def compute_clip_score_for_folder(image_folder, prompt):
    """Average the CLIP image-text score of every image in a folder against one prompt.

    Uses the module-level ``model``, ``processor`` and ``device``.

    Args:
        image_folder: Directory to scan; only files ending in .png/.jpg/.jpeg
            (case-insensitive) are scored.
        prompt: Text the images are compared against.

    Returns:
        Mean of ``outputs.logits_per_image`` over all images that could be
        scored, as a Python float; ``0.0`` when none could be scored (a
        warning is printed in that case).
        NOTE(review): in the Hugging Face CLIP model ``logits_per_image`` is
        presumably the cosine similarity multiplied by the model's learned
        logit scale, not a value in [-1, 1] — confirm before comparing with
        scores from other tools.

    Per-image failures are printed and skipped; they do not abort the folder.
    """
    scores = []
    # Sorted so images are processed in a stable order on every run.
    files = sorted(
        f for f in os.listdir(image_folder)
        if f.lower().endswith(('.png', '.jpg', '.jpeg'))
    )

    print(f"Scoring {len(files)} images in {os.path.basename(image_folder)}...")

    for fname in tqdm(files):
        try:
            image_path = os.path.join(image_folder, fname)
            # convert() returns a new image, so the file handle can be closed
            # as soon as the conversion is done.
            with Image.open(image_path) as img:
                image = img.convert("RGB")

            # truncation=True shortens an over-long prompt to the tokenizer's
            # maximum length instead of failing on every image.
            inputs = processor(
                text=[prompt],
                images=image,
                return_tensors="pt",
                padding=True,
                truncation=True,
            ).to(device)

            with torch.no_grad():
                outputs = model(**inputs)

            # One image and one text -> a single logit.
            scores.append(outputs.logits_per_image.squeeze().item())
        except Exception as e:
            print(f"Error processing {fname}: {e}")

    if not scores:
        # Make the 0.0 fallback visible so it is not mistaken for a real score.
        print(f"Warning: no images could be scored in {image_folder}; returning 0.0")
        return 0.0

    return float(np.mean(scores))

In [None]:
import json

# --- 1. Compute Scores for ALL 6 Configs ---
quality_scores = {}

# List of (Key Name, Prompt, Folder Path)
tasks = [
    ("high_simple", PROMPT_SIMPLE, IMAGE_DIR_HIGH_SIMPLE),
    ("high_comic",  PROMPT_COMIC,  IMAGE_DIR_HIGH_COMIC),
    ("high_complex", PROMPT_COMPLEX, IMAGE_DIR_HIGH_COMPLEX), # NEW
    ("fast_simple", PROMPT_SIMPLE, IMAGE_DIR_FAST_SIMPLE),
    ("fast_comic",  PROMPT_COMIC,  IMAGE_DIR_FAST_COMIC),
    ("fast_complex", PROMPT_COMPLEX, IMAGE_DIR_FAST_COMPLEX), # NEW
]

print("--- Starting CLIP Score Calculation ---")

# Configurations whose image folder is missing are skipped with a warning and
# get no entry in quality_scores.
for key, prompt, folder in tasks:
    if os.path.exists(folder):
        print(f"\nCalculating for: {key}")
        score = compute_clip_score_for_folder(folder, prompt)
        quality_scores[key] = score
        print(f" -> Score: {score:.4f}")
    else:
        print(f"\nWarning: Folder not found for {key}: {folder}")

# --- 2. Update and Save JSON ---
print(f"\n--- Updating JSON file at: {OUTPUT_JSON_PATH} ---")

try:
    if os.path.exists(OUTPUT_JSON_PATH):
        with open(OUTPUT_JSON_PATH, 'r') as f:
            final_results = json.load(f)
    else:
        # Same run_config fields as the measurement cell writes, so cells that
        # read results["run_config"]["n_runs"] also work with a file created here.
        final_results = {
            "run_config": {
                "gpu_type": "NVIDIA T4",
                "diffusers_version": "0.27.2",
                "n_runs": 50
            }
        }
        print("Warning: No existing JSON found. Creating new one.")

    # Map short keys to JSON config keys
    key_mapping = {
        "high_simple": "config_high_simple",
        "high_comic": "config_high_comic",
        "high_complex": "config_high_complex",
        "fast_simple": "config_fast_simple",
        "fast_comic": "config_fast_comic",
        "fast_complex": "config_fast_complex"
    }

    written_keys = []
    for short_key, json_key in key_mapping.items():
        if short_key in quality_scores:
            # Ensure the key exists in JSON structure
            if json_key not in final_results:
                final_results[json_key] = {"description": f"Added {short_key}"}

            # Update the score
            final_results[json_key]["quality_clip_score"] = quality_scores[short_key]
            written_keys.append(short_key)

    # Save back to file
    with open(OUTPUT_JSON_PATH, 'w') as f:
        json.dump(final_results, f, indent=4)

    # Report what was actually written instead of always claiming all six.
    if len(written_keys) == len(key_mapping):
        print("\n Success! JSON updated with all 6 CLIP scores.")
    else:
        missing_text = ", ".join(k for k in key_mapping if k not in written_keys)
        print(f"\n JSON updated with {len(written_keys)} of {len(key_mapping)} CLIP scores. Missing: {missing_text}")

except Exception as e:
    print(f"Error saving JSON: {e}")