In [None]:
# Install third-party packages with Colab shell commands before anything is imported.
!pip install torch-fidelity
!pip install torchmetrics[image] -U
# NOTE(review): plain `torchmetrics` looks already covered by the `[image] -U` line above — confirm this install is still needed.
!pip install torchmetrics
!pip install -q diffusers transformers accelerate huggingface_hub
# Mount Google Drive; the cells below read and write under /content/drive/MyDrive/hw.
from google.colab import drive
drive.mount('/content/drive')

# Prompt generation: caption each image with BLIP-2 and build the text prompts


In [None]:
import json
from pathlib import Path
from transformers import AutoProcessor, Blip2ForConditionalGeneration
import torch
from PIL import Image
from tqdm import tqdm
import re
from typing import List, Tuple

def load_model():
    """Load the BLIP-2 processor and captioning model.

    Returns:
        A ``(processor, model)`` pair for ``Salesforce/blip2-opt-2.7b``. The
        model is requested in float16 with ``device_map="auto"``.
    """
    checkpoint = "Salesforce/blip2-opt-2.7b"
    blip_processor = AutoProcessor.from_pretrained(checkpoint)

    # Keyword options are collected first so the loading call stays short.
    model_options = {
        "torch_dtype": torch.float16,
        "device_map": "auto",
    }
    blip_model = Blip2ForConditionalGeneration.from_pretrained(checkpoint, **model_options)
    return blip_processor, blip_model

def clean_description(text):
    """Normalize a generated caption into plain prompt text.

    Steps, in order:
      1. Drop an echoed "Describe the scene ... detail" instruction.
      2. Drop an echoed "What is ...?" question.
      3. Remove every character that is not a letter, digit, whitespace,
         period or comma.
      4. Collapse whitespace runs to one space and trim both ends.

    Args:
        text: Raw caption string.

    Returns:
        The cleaned caption (may be an empty string).
    """
    # The echoed prompts must be removed BEFORE the character filter: the
    # "What is ...?" pattern needs the literal "?" that the filter deletes.
    text = re.sub(r'Describe the scene.*?detail\.?\s*', '', text)
    text = re.sub(r'What is.*?\?', '', text)
    text = re.sub(r'[^a-zA-Z0-9\s\.,]', '', text)
    text = re.sub(r'\s+', ' ', text)
    return text.strip()

def load_and_preprocess_images(image_paths: List[str], processor) -> Tuple[torch.Tensor, List[str]]:
    """Open a batch of images and run them through ``processor``.

    Images that cannot be opened are reported with ``print`` and left out of
    the batch, so the returned path list may be shorter than ``image_paths``.

    Args:
        image_paths: Paths of the images to load.
        processor: Callable invoked as
            ``processor(images=..., text=..., return_tensors="pt")``; its
            result must support ``.to(device=..., dtype=...)``.

    Returns:
        ``(inputs, valid_paths)``. ``inputs`` is whatever the processor call
        returns after being moved to CUDA in float16 (not a bare tensor,
        despite the annotation), or ``None`` when no image could be loaded.
        ``valid_paths`` lists the successfully loaded paths in batch order.
    """
    images = []
    valid_paths = []

    for img_path in image_paths:
        try:
            # The context manager releases the file handle as soon as the
            # RGB copy exists instead of leaving it to garbage collection.
            with Image.open(img_path) as raw_image:
                image = raw_image.convert('RGB')
        except Exception as e:
            print(f"Error loading image {img_path}: {e}")
            continue
        images.append(image)
        valid_paths.append(img_path)

    if not images:
        return None, []

    # Every image gets the same text prefix for the caption to continue.
    inputs = processor(
        images=images,
        text=["A photograph of"] * len(images),
        return_tensors="pt"
    ).to(device="cuda", dtype=torch.float16)

    return inputs, valid_paths

def generate_batch_descriptions(image_paths: List[str], processor, model, batch_size=2):
    """Caption a batch of images.

    Args:
        image_paths: Paths of the images to caption.
        processor: Forwarded to ``load_and_preprocess_images`` and used to
            decode the generated token ids.
        model: Object whose ``generate`` method produces the token ids.
        batch_size: Accepted for call compatibility; not read in this function.

    Returns:
        Dict mapping each successfully loaded image path to its cleaned
        caption. An empty dict is returned when nothing could be loaded or
        when any step raises (the error is printed).
    """
    generation_options = {
        "do_sample": True,
        "num_beams": 3,
        "max_length": 50,
        "min_length": 10,
        "length_penalty": 1.0,
        "temperature": 0.7,
    }

    try:
        model_inputs, usable_paths = load_and_preprocess_images(image_paths, processor)
        if model_inputs is None:
            return {}

        token_ids = model.generate(**model_inputs, **generation_options)
        raw_captions = processor.batch_decode(token_ids, skip_special_tokens=True)
        cleaned_captions = list(map(clean_description, raw_captions))

        captions_by_path = {}
        for path, caption in zip(usable_paths, cleaned_captions):
            captions_by_path[path] = caption
        return captions_by_path

    except Exception as e:
        print(f"Error in batch generation: {e}")
        return {}

def add_safety_prompts(description):
    """Append the fixed positive and negative tag lists to a description.

    Args:
        description: Prompt text to extend.

    Returns:
        ``"<description>, <positive tags> ### <negative tags>"``, or ``None``
        when ``description`` is empty or whitespace only. ``###`` separates
        the positive part from the negative part.
    """
    positive = "professional photograph, photorealistic, 8k uhd, sharp focus, site visibility, safety focus, natural site lighting, warm industrial tones"
    negative = "deformed, blurry, bad anatomy, distorted, poor quality, low quality, mutation, artificial, unnatural, cartoon, anime, illustration, painting, drawing, rendering, 3d, cg, digital art, toy tools, fake safety gear"

    has_content = bool(description.strip())
    if has_content:
        return f"{description}, {positive} ### {negative}"
    return None

def generate_prompts(json_data, image_folder, processor, model, batch_size=2):
    """Caption every image listed in ``json_data`` and build the prompt variants.

    The run is resumable: after each batch all results are written to
    ``intermediate_results.json`` in the current working directory, and images
    already present in that file are skipped on the next call.

    Args:
        json_data: List of dicts; each is read for ``"image"`` (file name
            relative to ``image_folder``) and ``"labels"``.
        image_folder: Directory that contains the images.
        processor: Forwarded to ``generate_batch_descriptions``.
        model: Forwarded to ``generate_batch_descriptions``.
        batch_size: Number of images captioned per batch.

    Returns:
        List of copies of the processed items, each extended with
        ``generated_text``, ``prompt_w_label`` and ``prompt_w_suffix``.
        Results loaded from the checkpoint come first.
    """
    image_folder = Path(image_folder)
    total_images = len(json_data)
    checkpoint_path = Path('intermediate_results.json')

    try:
        with open(checkpoint_path, 'r', encoding='utf-8') as f:
            results = json.load(f)
        print(f"Loaded {len(results)} existing results")
    except FileNotFoundError:
        results = []
    except json.JSONDecodeError as e:
        # A checkpoint truncated by an interrupted run must not block a
        # restart; start over instead of crashing.
        print(f"Ignoring unreadable {checkpoint_path}: {e}")
        results = []

    processed_images = {item['image'] for item in results}
    pending_items = [item for item in json_data if item['image'] not in processed_images]

    for i in tqdm(range(0, len(pending_items), batch_size), desc="Processing batches"):
        batch_items = pending_items[i:i + batch_size]
        batch_paths = [str(image_folder / item["image"]) for item in batch_items]

        print(f"\nProcessing batch {i//batch_size + 1}, images {i+1}-{min(i+batch_size, len(pending_items))}")

        descriptions = generate_batch_descriptions(batch_paths, processor, model, batch_size)

        for item, img_path in zip(batch_items, batch_paths):
            if img_path not in descriptions:
                print(f"Skipping {item['image']} due to generation error")
                continue

            generated_text = descriptions[img_path]
            # assumes item["labels"] is a list of strings — TODO confirm against label.json
            original_labels = ", ".join(item["labels"])

            new_item = item.copy()
            new_item['generated_text'] = generated_text
            new_item['prompt_w_label'] = f"{generated_text}, {original_labels}"
            new_item['prompt_w_suffix'] = add_safety_prompts(f"{generated_text}, {original_labels}")

            results.append(new_item)

            print(f"\nGenerated for {item['image']}:")
            print(f"Text: {generated_text}")

        # Write to a temporary file and rename it over the checkpoint so an
        # interruption mid-write cannot leave a half-written JSON file behind.
        tmp_path = checkpoint_path.with_name(checkpoint_path.name + '.tmp')
        with open(tmp_path, 'w', encoding='utf-8') as f:
            json.dump(results, f, indent=2, ensure_ascii=False)
        tmp_path.replace(checkpoint_path)
        print(f"Saved progress: {len(results)}/{total_images} images processed")

    return results

def main():
    """Run the prompt-generation stage end to end.

    Reads the label JSON from Drive, loads the BLIP-2 model, generates the
    prompts for every listed image and writes the final JSON back to Drive.
    """
    print("Starting prompt generation for all images...")

    BATCH_SIZE = 2
    json_path = "/content/drive/MyDrive/hw/label.json"
    image_folder = "/content/drive/MyDrive/hw/images"

    with open(json_path, 'r', encoding='utf-8') as labels_file:
        json_data = json.load(labels_file)

    print(f"JSON data loaded successfully with {len(json_data)} items")

    print("Loading BLIP-2 model...")
    processor, model = load_model()
    print("Model loaded successfully")

    results = generate_prompts(json_data, image_folder, processor, model, BATCH_SIZE)

    # Persist the complete result list next to the input data.
    output_path = "/content/drive/MyDrive/hw/final_generated_prompts.json"
    with open(output_path, 'w', encoding='utf-8') as output_file:
        json.dump(results, output_file, indent=2, ensure_ascii=False)

    print(f"\nFinal results saved to {output_path}")
    print(f"Processed {len(results)} images in total")


if __name__ == "__main__":
    main()

# Generate pictures from the prompts with Stable Diffusion (Realistic Vision V2.0)

In [None]:
import torch
from diffusers import StableDiffusionPipeline
import json
from pathlib import Path
from tqdm import tqdm
import os
from torch.cuda import empty_cache
from contextlib import nullcontext

def ensure_directories(base_dir):
    """Create the three per-prompt-type output folders below ``base_dir``.

    Existing folders are left untouched; one line is printed per folder.
    """
    for dir_path in (
        f"{base_dir}/generated_text",
        f"{base_dir}/prompt_w_label",
        f"{base_dir}/prompt_w_suffix",
    ):
        os.makedirs(dir_path, exist_ok=True)
        print(f"Ensured directory exists: {dir_path}")

def check_drive_mounted():
    """Mount Google Drive at /content/drive unless MyDrive is already visible."""
    if os.path.exists("/content/drive/MyDrive"):
        return

    print("Google Drive not mounted. Mounting now...")
    # Imported lazily: the Colab module is only needed on this path.
    from google.colab import drive
    drive.mount('/content/drive')
    print("Google Drive mounted successfully")

def load_model():
    """Load the Realistic Vision V2.0 Stable Diffusion pipeline onto CUDA.

    The pipeline is loaded in float16 with ``safety_checker=None`` and has
    attention slicing enabled before it is returned.
    """
    pipeline_options = {
        "torch_dtype": torch.float16,
        "safety_checker": None,
    }
    pipe = StableDiffusionPipeline.from_pretrained(
        "SG161222/Realistic_Vision_V2.0",
        **pipeline_options,
    ).to("cuda")
    pipe.enable_attention_slicing()
    return pipe

def process_prompt_suffix(prompt_text):
    """Split a combined prompt into its positive and negative parts.

    The parts are separated by the first ``###`` marker. Text without a
    marker is treated as positive only.

    Args:
        prompt_text: Prompt string, optionally containing ``###``.

    Returns:
        ``(positive, negative)``, both stripped; ``negative`` is ``""`` when
        there is no marker.
    """
    # partition() splits on the first marker only, so a second "###" inside
    # the negative part no longer raises ValueError on unpacking.
    positive, _marker, negative = prompt_text.partition("###")
    return positive.strip(), negative.strip()

def get_completed_images(output_dirs):
    """Return the base names of images whose outputs exist for every prompt type.

    Output files are named ``<base>_<prompt_type>.png``. The base name is
    recovered by removing that exact suffix, so base names that contain
    underscores are preserved. An image counts as completed only when a file
    exists in every directory of ``output_dirs``; otherwise a partially
    generated image would be treated as done.

    Args:
        output_dirs: Dict mapping prompt type to its output directory.

    Returns:
        Set of completed base names (empty when ``output_dirs`` is empty or a
        directory is missing).
    """
    completed = None
    for prompt_type, dir_path in output_dirs.items():
        dir_path = Path(dir_path)
        suffix = f"_{prompt_type}"

        done_here = set()
        if dir_path.exists():
            for file_path in dir_path.glob("*.png"):
                stem = file_path.stem
                # Files that do not follow the naming scheme are ignored.
                if stem.endswith(suffix):
                    done_here.add(stem[:-len(suffix)])

        completed = done_here if completed is None else completed & done_here

    return completed if completed is not None else set()

def save_progress(current_index, progress_file):
    """Write ``current_index`` to ``progress_file`` as ``last_processed_index``."""
    progress_record = {'last_processed_index': current_index}
    with open(progress_file, 'w') as handle:
        json.dump(progress_record, handle)

def load_progress(progress_file):
    """Read the index of the last processed item from ``progress_file``.

    Args:
        progress_file: Path of the JSON file written by ``save_progress``.

    Returns:
        The stored ``last_processed_index``, or ``-1`` when the file is
        missing, is not valid JSON, or does not hold an integer index.
        ``-1`` makes the caller start from the first item.
    """
    try:
        with open(progress_file, 'r') as f:
            data = json.load(f)
    except FileNotFoundError:
        return -1
    except json.JSONDecodeError as e:
        # A half-written progress file should restart the run, not crash it.
        print(f"Ignoring unreadable progress file {progress_file}: {e}")
        return -1

    if not isinstance(data, dict):
        return -1

    index = data.get('last_processed_index', -1)
    return index if isinstance(index, int) else -1

def generate_images_batch(pipe, batch_items, output_dirs, gen_kwargs):
    """Generate and save the missing images for one batch of prompt items.

    One pipeline call is made per prompt type (``generated_text``,
    ``prompt_w_label``, ``prompt_w_suffix``) covering every item that has a
    non-empty prompt of that type and no existing output file. Outputs are
    saved as ``<image stem>_<prompt_type>.png``.

    Args:
        pipe: Callable pipeline; its result must expose ``.images``.
        batch_items: Dicts holding ``"image"`` and the prompt-type keys.
        output_dirs: Dict mapping prompt type to a ``Path`` output directory.
        gen_kwargs: Extra keyword arguments forwarded to ``pipe``.

    Errors raised while generating or saving one prompt type are printed and
    the next prompt type is attempted.
    """
    for prompt_type in ['generated_text', 'prompt_w_label', 'prompt_w_suffix']:
        batch_prompts = []
        batch_negative_prompts = []
        batch_paths = []

        for item in batch_items:
            prompt = item.get(prompt_type, "")
            if not prompt:
                continue

            base_filename = Path(item['image']).stem
            output_filename = f"{base_filename}_{prompt_type}.png"
            output_path = output_dirs[prompt_type] / output_filename

            if output_path.exists():
                print(f"Skipping existing image: {output_path}")
                continue

            if prompt_type == 'prompt_w_suffix':
                positive_prompt, negative_prompt = process_prompt_suffix(prompt)
            else:
                positive_prompt, negative_prompt = prompt, ""

            batch_prompts.append(positive_prompt)
            batch_negative_prompts.append(negative_prompt)
            batch_paths.append(output_path)

        if not batch_prompts:
            continue

        try:
            call_kwargs = dict(prompt=batch_prompts, num_images_per_prompt=1, **gen_kwargs)
            # Negative prompts are only passed for the suffix variant.
            if prompt_type == 'prompt_w_suffix':
                call_kwargs["negative_prompt"] = batch_negative_prompts

            with torch.autocast("cuda"):
                images = pipe(**call_kwargs).images

                for img, path in zip(images, batch_paths):
                    img.save(str(path))
                    print(f"Saved image to {path}")

        except Exception as e:
            print(f"Error generating {prompt_type} images: {str(e)}")
        finally:
            # Release cached GPU memory after failures too; a failed call is
            # when the next prompt type needs the memory most.
            empty_cache()

def generate_images(pipe, prompt_data, base_output_dir, batch_size=32):
    """Generate images for all prompt items, resuming from saved progress.

    Items are processed in batches of ``batch_size`` starting after the index
    stored in ``generation_progress.json`` inside ``base_output_dir``. A batch
    is skipped when every item in it is already reported as completed by
    ``get_completed_images``. After each generated batch the index of its last
    item is saved.

    Args:
        pipe: Pipeline forwarded to ``generate_images_batch``.
        prompt_data: List of prompt dicts, each with an ``"image"`` entry.
        base_output_dir: Directory holding one subfolder per prompt type.
        batch_size: Number of items per batch.

    NOTE(review): progress advances even if a prompt type failed inside
    ``generate_images_batch`` (which prints and continues), so a resumed run
    appears to start past such items — confirm this is acceptable.
    """
    output_dirs = {
        'generated_text': Path(base_output_dir) / 'generated_text',
        'prompt_w_label': Path(base_output_dir) / 'prompt_w_label',
        'prompt_w_suffix': Path(base_output_dir) / 'prompt_w_suffix'
    }

    for dir_path in output_dirs.values():
        os.makedirs(dir_path, exist_ok=True)
        print(f"Ensured directory exists: {dir_path}")

    progress_file = Path(base_output_dir) / 'generation_progress.json'
    last_processed = load_progress(progress_file)

    completed_images = get_completed_images(output_dirs)
    print(f"Found {len(completed_images)} completed images")

    gen_kwargs = {
        "num_inference_steps": 40,
        "guidance_scale": 7.5,
        "height": 512,
        "width": 512,
    }

    start_idx = max(0, last_processed + 1)
    end_idx = len(prompt_data)

    # `initial` counts the batches finished in earlier runs, so `total` has to
    # include them as well; otherwise the bar overshoots on resume.
    batches_done = start_idx // batch_size
    batches_left = (max(0, end_idx - start_idx) + batch_size - 1) // batch_size

    for i in tqdm(range(start_idx, end_idx, batch_size),
                 desc="Processing batches",
                 initial=batches_done,
                 total=batches_done + batches_left):
        batch_items = prompt_data[i:min(i + batch_size, end_idx)]

        batch_needed = any(
            Path(item['image']).stem not in completed_images
            for item in batch_items
        )

        if not batch_needed:
            print(f"Skipping completed batch {i//batch_size + 1}")
            continue

        print(f"\nProcessing batch {i//batch_size + 1}")
        generate_images_batch(pipe, batch_items, output_dirs, gen_kwargs)

        save_progress(i + len(batch_items) - 1, progress_file)
        empty_cache()

def main():
    """Run the image-generation stage end to end.

    Makes sure Drive is mounted and the output folders exist, loads the
    prompt JSON written by the prompt-generation stage, loads the diffusion
    pipeline and generates the images.

    Raises:
        FileNotFoundError: If the prompt JSON does not exist.
    """
    print("Starting image generation...")

    check_drive_mounted()

    BATCH_SIZE = 16
    output_dir = "/content/drive/MyDrive/hw/generated_images"
    input_json = "/content/drive/MyDrive/hw/final_generated_prompts.json"

    ensure_directories(output_dir)

    # Fail early with a clear message before loading the large model.
    input_available = os.path.exists(input_json)
    if not input_available:
        raise FileNotFoundError(f"Cannot find input file: {input_json}")

    with open(input_json, 'r', encoding='utf-8') as prompts_file:
        prompt_data = json.load(prompts_file)

    print(f"Loaded {len(prompt_data)} prompts from {input_json}")

    print("Loading Realistic Vision V2.0 model...")
    pipe = load_model()
    print("Model loaded successfully")

    generate_images(pipe, prompt_data, output_dir, batch_size=BATCH_SIZE)

    print("\nImage generation completed!")


if __name__ == "__main__":
    main()

# Calculate FID between the original and the generated images

In [None]:
import os
import json
import torch
import numpy as np
from PIL import Image
from torchmetrics.image.fid import FrechetInceptionDistance
from tqdm import tqdm

def calculate_fid(generated_dirs, orig_dir):
    """Compute one FID score per prompt type.

    Args:
        generated_dirs: Dict mapping prompt type to its generated-image directory.
        orig_dir: Directory with the original images.

    Returns:
        Dict mapping each prompt type to its FID score.
    """
    if torch.cuda.is_available():
        device = "cuda"
    else:
        device = "cpu"

    results = {}
    for prompt_type in generated_dirs:
        gen_dir = generated_dirs[prompt_type]
        print(f"\nCalculating FID for {prompt_type}...")

        # A fresh metric per prompt type keeps the statistics separate.
        fid = FrechetInceptionDistance(normalize=True).to(device)
        results[prompt_type] = _calculate_single_fid(orig_dir, gen_dir, fid)
        fid.reset()

    return results

def _index_original_images(orig_dir):
    """Map each file stem in ``orig_dir`` to a file name, preferring ``.jpeg``."""
    index = {}
    for name in sorted(os.listdir(orig_dir)):
        stem, ext = os.path.splitext(name)
        if stem not in index or ext.lower() == '.jpeg':
            index[stem] = name
    return index


def _original_stem(gen_img_name):
    """Recover the original image stem from a generated file name."""
    gen_stem = os.path.splitext(gen_img_name)[0]
    # Generated files are named "<stem>_<prompt_type>.png"; remove that exact
    # suffix so stems that themselves contain underscores survive intact.
    for suffix in ('_generated_text', '_prompt_w_label', '_prompt_w_suffix'):
        if gen_stem.endswith(suffix):
            return gen_stem[:-len(suffix)]
    # Unknown naming scheme: keep the previous first-underscore rule.
    return gen_img_name.split('_')[0]


def _calculate_single_fid(orig_dir, gen_dir, fid):
    """Feed matching original/generated image pairs into ``fid`` and compute it.

    For every ``.png`` in ``gen_dir`` the original with the same stem is
    looked up in ``orig_dir`` (any extension, ``.jpeg`` preferred). A pair is
    added to the metric only after both images were loaded, so a failing
    generated image no longer leaves an unmatched real sample behind.

    Args:
        orig_dir: Directory with the original images.
        gen_dir: Directory with the generated ``.png`` images.
        fid: Metric object exposing ``update(img, real=...)`` and ``compute()``.

    Returns:
        The computed score as a float.
    """
    generated_images = [f for f in os.listdir(gen_dir) if f.endswith('.png')]
    originals = _index_original_images(orig_dir)

    for gen_img_name in tqdm(generated_images, desc="Processing images"):
        stem = _original_stem(gen_img_name)
        orig_img_name = originals.get(stem)

        if orig_img_name is None:
            orig_path = os.path.join(orig_dir, stem + '.jpeg')
            print(f"Original image not found: {orig_path}")
            continue

        try:
            orig_img = process_image(os.path.join(orig_dir, orig_img_name))
            gen_img = process_image(os.path.join(gen_dir, gen_img_name))

            fid.update(orig_img, real=True)
            fid.update(gen_img, real=False)

        except Exception as e:
            print(f"Error processing {gen_img_name}: {str(e)}")
            continue

    return float(fid.compute())

def process_image(image_path, device=None):
    """Load an image as a ``(1, 3, 299, 299)`` float tensor scaled to [0, 1].

    The image is converted to RGB, resized to 512x512 (the size the
    generation cell renders at) and then to 299x299, both with Lanczos
    resampling.

    Args:
        image_path: Path of the image file.
        device: Target device for the tensor. Defaults to ``"cuda"`` when
            CUDA is available and ``"cpu"`` otherwise, matching the fallback
            used when the metric is created.

    Returns:
        The image tensor on ``device``.
    """
    if device is None:
        device = "cuda" if torch.cuda.is_available() else "cpu"

    # Close the file handle as soon as the RGB copy exists.
    with Image.open(image_path) as raw_image:
        img = raw_image.convert('RGB')

    img = img.resize((512, 512), Image.Resampling.LANCZOS)

    img = img.resize((299, 299), Image.Resampling.LANCZOS)

    # HWC uint8 array -> CHW, add the batch axis, scale to [0, 1].
    img = torch.from_numpy(np.array(img)).permute(2, 0, 1).unsqueeze(0) / 255.0
    return img.to(device)

def main():
    """Compute FID for every prompt type and store the scores on Drive.

    Raises:
        FileNotFoundError: If the original-image directory or any
            generated-image directory is missing.
    """
    generated_dirs = {
        'generated_text': "/content/drive/MyDrive/hw/generated_images/generated_text",
        'prompt_w_label': "/content/drive/MyDrive/hw/generated_images/prompt_w_label",
        'prompt_w_suffix': "/content/drive/MyDrive/hw/generated_images/prompt_w_suffix"
    }
    orig_dir = "/content/drive/MyDrive/hw/images"

    # Check the originals first, then each generated folder in dict order.
    required_dirs = [orig_dir, *generated_dirs.values()]
    for dir_path in required_dirs:
        if os.path.exists(dir_path):
            continue
        raise FileNotFoundError(f"Directory not found: {dir_path}")

    print("Starting FID calculation...")

    results = calculate_fid(generated_dirs, orig_dir)

    print("\nFID Scores:")
    for prompt_type in results:
        score = results[prompt_type]
        print(f"{prompt_type}: {score:.2f}")

    output_path = "/content/drive/MyDrive/hw/fid_results.json"
    with open(output_path, 'w') as scores_file:
        json.dump(results, scores_file, indent=2)
    print(f"\nResults saved to {output_path}")


if __name__ == "__main__":
    main()

# Generate pictures conditioned on the layout boxes and the reference image

In [None]:
import os
import os
import json
import torch
from diffusers import StableDiffusionPipeline
from PIL import Image
import numpy as np
from tqdm import tqdm
from huggingface_hub import HfApi, hf_hub_download
import time

def download_model_with_retry(model_id, max_retries=3):
    """Load a Stable Diffusion pipeline, retrying on failure.

    Between attempts the function sleeps ``2 ** attempt`` seconds
    (1, 2, 4, ...).

    Args:
        model_id: Model identifier passed to ``from_pretrained``.
        max_retries: Maximum number of attempts; must be at least 1.

    Returns:
        The loaded pipeline (float16, not yet moved to a device).

    Raises:
        ValueError: If ``max_retries`` is smaller than 1.
        Exception: If every attempt failed; the last error is attached as
            ``__cause__``.
    """
    if max_retries < 1:
        # Without this check the loop would not run and None would be returned.
        raise ValueError("max_retries must be at least 1")

    for attempt in range(max_retries):
        try:
            print(f"Downloading model attempt {attempt + 1}/{max_retries}")
            pipe = StableDiffusionPipeline.from_pretrained(
                model_id,
                torch_dtype=torch.float16,
                low_cpu_mem_usage=True,
                resume_download=True,
                local_files_only=False
            )
            return pipe
        except Exception as e:
            print(f"Download attempt {attempt + 1} failed: {str(e)}")
            if attempt < max_retries - 1:
                wait_time = 2 ** attempt
                print(f"Waiting {wait_time} seconds before retrying...")
                time.sleep(wait_time)
            else:
                # Chain the last error so the real cause stays in the traceback.
                raise Exception(f"Failed to download model after {max_retries} attempts") from e

def load_checkpoint(checkpoint_path="/content/drive/MyDrive/hw/checkpoint.json"):
    """Load the layout-generation checkpoint.

    Args:
        checkpoint_path: JSON file to read; defaults to the checkpoint on Drive.

    Returns:
        The stored dict, which holds a ``"completed_images"`` list. A fresh
        ``{"completed_images": []}`` is returned when the file is missing, is
        not valid JSON, or lacks a ``"completed_images"`` list, so callers can
        always index that key.
    """
    if not os.path.exists(checkpoint_path):
        return {"completed_images": []}

    try:
        with open(checkpoint_path, 'r') as f:
            checkpoint = json.load(f)
    except json.JSONDecodeError as e:
        # An interrupted write must not prevent the run from restarting.
        print(f"Ignoring unreadable checkpoint {checkpoint_path}: {e}")
        return {"completed_images": []}

    if not isinstance(checkpoint, dict) or not isinstance(checkpoint.get("completed_images"), list):
        print(f"Ignoring malformed checkpoint {checkpoint_path}")
        return {"completed_images": []}

    return checkpoint

def save_checkpoint(completed_images):
    """Write the list of completed image names to the checkpoint file on Drive."""
    checkpoint_record = {"completed_images": completed_images}
    with open("/content/drive/MyDrive/hw/checkpoint.json", 'w') as handle:
        json.dump(checkpoint_record, handle)

def create_layout_mask(bboxes, height, width):
    """Build a mask that is 1 inside the given boxes and 0 elsewhere.

    Args:
        bboxes: Iterable of 4-element boxes unpacked as ``(x1, y1, x2, y2)``;
            each coordinate is truncated with ``int``.
        height: Number of mask rows.
        width: Number of mask columns.

    Returns:
        A ``(height, width)`` float array.
    """
    mask = np.zeros((height, width))
    for bbox in bboxes:
        x1, y1, x2, y2 = [int(coord) for coord in bbox]
        # Clamp to the mask bounds: a negative value used as a slice index
        # would count from the far edge and paint the wrong region.
        x1, x2 = max(0, min(x1, width)), max(0, min(x2, width))
        y1, y2 = max(0, min(y1, height)), max(0, min(y2, height))
        mask[y1:y2, x1:x2] = 1
    return mask

def process_batch(pipe, batch_items, output_dir, completed_images):
    """Generate and save one image per item, using its reference image and layout mask.

    Items whose reference image is missing are reported and skipped. Each
    output is saved as ``generated_<image name>`` in ``output_dir`` and the
    image name is appended to ``completed_images``. When the batched call
    runs out of CUDA memory the items are retried one at a time.

    Args:
        pipe: Callable pipeline; its result must expose ``.images``.
        batch_items: Dicts read for ``"image"``, ``"bboxes"``, ``"height"``,
            ``"width"`` and ``"generated_text"``.
        output_dir: Directory the generated images are written to.
        completed_images: List extended in place with the saved image names.
    """
    valid_items = []
    batch_prompts = []
    batch_reference_images = []
    batch_masks = []

    for item in batch_items:
        ref_image_path = os.path.join("/content/drive/MyDrive/hw/images", item["image"])
        if not os.path.exists(ref_image_path):
            print(f"Reference image not found, skipping: {ref_image_path}")
            continue

        with Image.open(ref_image_path) as raw_image:
            reference_image = raw_image.convert("RGB")

        layout_mask = create_layout_mask(
            item["bboxes"],
            item["height"],
            item["width"]
        )
        layout_mask = Image.fromarray((layout_mask * 255).astype(np.uint8))

        # Track the items that made it into the batch so each output is
        # matched to the right item even when earlier ones were skipped.
        valid_items.append(item)
        batch_prompts.append(item["generated_text"])  # Using generated_text instead of prompt_w_suffix
        batch_reference_images.append(reference_image)
        batch_masks.append(layout_mask)

    if not valid_items:
        # Nothing to generate; do not call the pipeline with empty lists.
        return

    try:
        # NOTE(review): the pipeline is created as StableDiffusionPipeline
        # (text-to-image) in this cell — confirm that `image` and `mask_image`
        # are actually consumed; an inpainting pipeline may be intended.
        generated_images = pipe(
            prompt=batch_prompts,
            image=batch_reference_images,
            mask_image=batch_masks,
            num_inference_steps=50,
            guidance_scale=7.5
        ).images

        for item, generated_image in zip(valid_items, generated_images):
            output_path = os.path.join(output_dir, f"generated_{item['image']}")
            generated_image.save(output_path)
            completed_images.append(item['image'])

    except torch.cuda.OutOfMemoryError:
        print("CUDA out of memory.")
        torch.cuda.empty_cache()

        if len(valid_items) == 1:
            # Already at the smallest batch; retrying would recurse forever.
            print(f"Error processing single item {valid_items[0]['image']}: CUDA out of memory")
            return

        for item in valid_items:
            try:
                process_batch(pipe, [item], output_dir, completed_images)
            except Exception as e:
                print(f"Error processing single item {item['image']}: {str(e)}")
                continue

def validate_checkpoint(completed_images, output_dir):
    """Keep only the checkpoint entries whose output file still exists.

    Args:
        completed_images: Image names recorded as completed.
        output_dir: Directory expected to contain ``generated_<image name>``.

    Returns:
        New list with the names whose output file exists, in the original order.
    """
    return [
        image_name
        for image_name in completed_images
        if os.path.exists(os.path.join(output_dir, f"generated_{image_name}"))
    ]

def main():
    """Run the layout-conditioned generation stage end to end.

    Loads the pipeline (returning early if that fails), reads the prompt
    JSON, restores and validates the checkpoint, then processes the remaining
    items in batches of four. The checkpoint is saved after every batch and
    again when the run is interrupted or fails.
    """
    if torch.cuda.is_available():
        torch.cuda.empty_cache()

    print("model download...")
    model_id = "SG161222/Realistic_Vision_V2.0"

    try:
        pipe = download_model_with_retry(model_id).to("cuda")
        print("Model downloaded and loaded successfully!")
    except Exception as e:
        print(f"Error loading model: {str(e)}")
        return

    print("Loading JSON data...")
    with open("/content/drive/MyDrive/hw/final_generated_prompts.json", 'r') as prompts_file:
        data = json.load(prompts_file)

    output_dir = "/content/drive/MyDrive/hw/generated_images/layout"
    os.makedirs(output_dir, exist_ok=True)

    completed_images = validate_checkpoint(load_checkpoint()["completed_images"], output_dir)
    print(f"Found {len(completed_images)} valid completed images")

    batch_size = 4

    def run_batch(items):
        # Generate one batch, then persist what has been completed so far.
        process_batch(pipe, items, output_dir, completed_images)
        save_checkpoint(completed_images)

    try:
        current_batch = []
        for item in tqdm(data, desc="Processing images"):
            if item["image"] in completed_images:
                output_exists = os.path.exists(os.path.join(output_dir, f"generated_{item['image']}"))
                if output_exists:
                    print(f"Skipping {item['image']} - already processed")
                    continue
                print(f"Regenerating {item['image']} - output file not found")

            current_batch.append(item)
            if len(current_batch) != batch_size:
                continue

            run_batch(current_batch)
            current_batch = []

        # Flush the final, possibly smaller, batch.
        if current_batch:
            run_batch(current_batch)

    except KeyboardInterrupt:
        print("\nProcess interrupted. Progress saved in checkpoint.")
        save_checkpoint(completed_images)
    except Exception as e:
        print(f"Error occurred: {str(e)}")
        save_checkpoint(completed_images)
        raise e


if __name__ == "__main__":
    main()