<a href="https://colab.research.google.com/github/spreck/GISF2E/blob/master/Improved_Architectural_ControlNet.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Install necessary libraries
!pip install -q diffusers transformers accelerate opencv-python pillow safetensors xformers tqdm

# Import necessary libraries
import torch
from diffusers import StableDiffusionControlNetPipeline, ControlNetModel, DPMSolverMultistepScheduler
from PIL import Image
import os
import requests
from tqdm import tqdm
import matplotlib.pyplot as plt
from IPython.display import display
from google.colab import files, drive
from datetime import datetime
import cv2
import numpy as np

# Utility functions
def get_timestamp():
    """Return the current local time as a ``YYYYMMDD_HHMMSS`` string for filenames."""
    now = datetime.now()
    # Format-spec form of strftime; yields the same text as now.strftime(...).
    return f"{now:%Y%m%d_%H%M%S}"

def save_to_drive(image, drive_path, prefix, original_filename=None):
    """Save an image into a Google Drive folder under a timestamped filename.

    The file is named ``{prefix}_{timestamp}_{stem}{ext}`` when
    ``original_filename`` is given, otherwise ``{prefix}_{timestamp}.png``.

    Args:
        image: Object exposing ``save(path)`` (callers in this file pass PIL images).
        drive_path: Destination directory; must already exist.
        prefix: Leading tag for the filename, e.g. ``"source"`` or ``"lineart"``.
        original_filename: Optional name of the source file. Only its base
            name is used; its extension is reused, falling back to ``.png``
            when it has none.

    Returns:
        The full path the image was written to.
    """
    timestamp = get_timestamp()
    if original_filename:
        # Strip any directory component so the result always lands in drive_path.
        stem, ext = os.path.splitext(os.path.basename(original_filename))
        # The saver picks the output format from the suffix, so a name without
        # an extension would fail to save; default to PNG in that case.
        filename = f"{prefix}_{timestamp}_{stem}{ext or '.png'}"
    else:
        filename = f"{prefix}_{timestamp}.png"

    save_path = os.path.join(drive_path, filename)
    image.save(save_path)
    return save_path

def download_with_progress(url, save_path):
    """Download a file to ``save_path`` while showing a progress bar.

    Data is streamed into ``save_path + ".part"`` and only renamed to
    ``save_path`` once the download has completed and its size has been
    checked, so a failed or interrupted download never leaves a file at
    ``save_path`` that a later ``os.path.exists`` check would mistake for a
    complete one.

    Args:
        url: HTTP(S) URL to fetch.
        save_path: Final destination path of the downloaded file.

    Raises:
        requests.HTTPError: If the server answers with an error status.
        ValueError: If the number of bytes received differs from the
            advertised ``content-length``.
    """
    block_size = 1024 * 1024  # 1 MiB chunks: far fewer writes than 1 KB for large models
    partial_path = f"{save_path}.part"
    downloaded = 0
    try:
        with requests.get(url, stream=True, timeout=60) as response:
            # Fail early instead of saving an HTML error page as the model file.
            response.raise_for_status()
            total_size = int(response.headers.get('content-length', 0))
            with open(partial_path, 'wb') as f, tqdm(
                total=total_size, unit='iB', unit_scale=True, desc="Downloading Model"
            ) as progress:
                for chunk in response.iter_content(block_size):
                    f.write(chunk)
                    downloaded += len(chunk)
                    progress.update(len(chunk))
        if total_size != 0 and downloaded != total_size:
            raise ValueError("ERROR: Download size mismatch!")
        # Publish the file under its final name only after it is known to be complete.
        os.replace(partial_path, save_path)
    finally:
        # After a successful rename the partial file no longer exists; on any
        # failure this removes the incomplete data.
        if os.path.exists(partial_path):
            os.remove(partial_path)
    print(f"Model downloaded and saved to {save_path}")

def upload_and_save_image(drive_path):
    """Ask the user for an image, preview it, and archive a copy on Google Drive.

    Only the first uploaded file is used. Its bytes are written under
    ``/content``, the image is shown with matplotlib, and a copy is stored in
    ``drive_path`` via ``save_to_drive`` with the ``"source"`` prefix.

    Args:
        drive_path: Drive directory that receives the archived copy.

    Returns:
        A ``(local_path, filename)`` tuple for the uploaded file.

    Raises:
        ValueError: If the upload dialog returned no files.
    """
    print("Please upload an image file...")
    uploads = files.upload()
    if not uploads:
        raise ValueError("No file was uploaded")

    # Dicts preserve insertion order, so this is the first uploaded file.
    filename = next(iter(uploads))
    payload = uploads[filename]

    # Write the raw bytes to a local working copy before opening it as an image.
    local_path = "/content/" + filename
    with open(local_path, 'wb') as sink:
        sink.write(payload)

    preview = Image.open(local_path)

    # Show the user what was uploaded.
    plt.figure(figsize=(8, 8))
    plt.imshow(preview)
    plt.title("Uploaded Image")
    plt.axis('off')
    plt.show()

    # Keep a timestamped copy of the original alongside the generated results.
    archived_path = save_to_drive(preview, drive_path, "source", filename)
    print(f"Original image saved to Google Drive: {archived_path}")

    return local_path, filename

def preprocess_canny(image_path, output_width=1024):
    """Build a Canny edge map of an image, resized to ``output_width``.

    The image is loaded as RGB, resized to ``output_width`` with the height
    scaled proportionally, converted to grayscale, and passed through Canny
    edge detection with thresholds 100 and 200.

    Args:
        image_path: Path of the image file to read.
        output_width: Width in pixels of the resized image.

    Returns:
        A tuple ``(edge_image, size)``: the edge map as a three-channel PIL
        image and the ``(width, height)`` of the resized input.
    """
    rgb = Image.open(image_path).convert("RGB")
    target_size = (output_width, int(output_width * rgb.height / rgb.width))
    rgb = rgb.resize(target_size)

    # Canny works on a single channel, so go RGB -> gray -> edges.
    grayscale = cv2.cvtColor(np.array(rgb), cv2.COLOR_RGB2GRAY)
    edge_map = cv2.Canny(grayscale, 100, 200)

    # Expand the single-channel edge map back to three channels.
    control_image = Image.fromarray(cv2.cvtColor(edge_map, cv2.COLOR_GRAY2RGB))
    return control_image, rgb.size

def process_image(image_path, output_width=1024):
    """Load an image as RGB and resize it to ``output_width``, keeping its aspect ratio.

    Args:
        image_path: Path of the image file to read.
        output_width: Width in pixels of the resized image.

    Returns:
        A tuple ``(resized_image, (width, height))``.
    """
    source = Image.open(image_path).convert("RGB")
    # Height follows from the original height/width ratio, truncated to an int.
    target_size = (output_width, int(output_width * (source.height / source.width)))
    return source.resize(target_size), target_size

def setup_pipelines(model_path):
    """Create the lineart and canny ControlNet pipelines for one base checkpoint.

    The base checkpoint is loaded once. The canny pipeline is assembled from
    the lineart pipeline's already-loaded components with only the ControlNet
    and scheduler swapped, so the base model weights are held in memory a
    single time rather than twice.

    Both pipelines use ``DPMSolverMultistepScheduler`` with Karras sigmas.
    Half precision, CUDA placement and xformers attention are used when CUDA
    is available; otherwise everything runs in float32 on the CPU.

    Args:
        model_path: Path to the single-file base checkpoint.

    Returns:
        A tuple ``(lineart_pipe, canny_pipe)``.
    """
    use_cuda = torch.cuda.is_available()
    device = "cuda" if use_cuda else "cpu"
    dtype = torch.float16 if use_cuda else torch.float32

    lineart_controlnet = ControlNetModel.from_pretrained(
        "lllyasviel/control_v11p_sd15_lineart",
        torch_dtype=dtype
    )
    canny_controlnet = ControlNetModel.from_pretrained(
        "lllyasviel/sd-controlnet-canny",
        torch_dtype=dtype
    )

    # Setup Lineart ControlNet pipeline (this is the only load of the base checkpoint)
    lineart_pipe = StableDiffusionControlNetPipeline.from_single_file(
        model_path,
        controlnet=lineart_controlnet,
        torch_dtype=dtype,
        safety_checker=None
    ).to(device)
    # Karras sigmas are enabled through the scheduler config; assigning
    # ``algorithm_type = "karras"`` on the instance has no effect.
    lineart_pipe.scheduler = DPMSolverMultistepScheduler.from_config(
        lineart_pipe.scheduler.config, use_karras_sigmas=True
    )

    # Setup Canny ControlNet pipeline by reusing the loaded components
    shared_components = dict(lineart_pipe.components)
    shared_components["controlnet"] = canny_controlnet
    # Give the second pipeline its own scheduler instance so per-run scheduler
    # state is not shared between the two pipelines.
    shared_components["scheduler"] = DPMSolverMultistepScheduler.from_config(
        lineart_pipe.scheduler.config, use_karras_sigmas=True
    )
    canny_pipe = StableDiffusionControlNetPipeline(
        **shared_components,
        requires_safety_checker=False  # consistent with safety_checker=None above
    ).to(device)

    if use_cuda:
        # Called on both pipelines so each ControlNet is covered as well.
        lineart_pipe.enable_xformers_memory_efficient_attention()
        canny_pipe.enable_xformers_memory_efficient_attention()

    return lineart_pipe, canny_pipe

def generate_visualization(
    pipe,
    image,
    dimensions,
    prompt,
    negative_prompt="low quality, distorted, unrealistic, bad perspective",
    num_inference_steps=30,
    guidance_scale=6,
):
    """Generate the architectural visualization.

    Args:
        pipe: Callable pipeline; invoked with keyword arguments and expected
            to return an object with an ``images`` sequence.
        image: Conditioning image passed to the pipeline as ``image``.
        dimensions: ``(width, height)`` of the requested output.
        prompt: Text prompt.
        negative_prompt: Text describing what to steer away from.
        num_inference_steps: Number of denoising steps (default 30).
        guidance_scale: Guidance scale forwarded to the pipeline (default 6).

    Returns:
        The first image of the pipeline result.
    """
    width, height = dimensions[0], dimensions[1]
    result = pipe(
        prompt=prompt,
        negative_prompt=negative_prompt,
        image=image,
        height=height,
        width=width,
        num_inference_steps=num_inference_steps,
        guidance_scale=guidance_scale
    )
    return result.images[0]

# Main execution function
def main():
    """Run the interactive upload -> prompt -> generate -> save workflow.

    Mounts Google Drive, creates the ``Models`` / ``SourceImages`` /
    ``GeneratedImages`` folders, downloads the base model if it is not already
    present, builds both ControlNet pipelines, and then loops: obtain an
    image, ask for a prompt, generate lineart- and canny-conditioned results,
    display them and save them to Drive.

    After each round the user chooses to (1) upload a new image, (2) reuse the
    current image with a new prompt, or (3) exit. Any other input also exits.

    Raises:
        ValueError: If the two preprocessed inputs end up with different sizes.
        Exception: Any error raised during a round is printed and re-raised.
    """
    # Mount Google Drive
    drive.mount('/content/drive')

    # Create organized folder structure in Drive
    base_path = "/content/drive/My Drive/StableDiffusion"
    folders = {
        'models': os.path.join(base_path, 'Models'),
        'source': os.path.join(base_path, 'SourceImages'),
        'output': os.path.join(base_path, 'GeneratedImages')
    }
    for folder in folders.values():
        os.makedirs(folder, exist_ok=True)

    # Model setup
    model_save_path = os.path.join(folders['models'], "architecturerealmix_v11.safetensors")
    model_url = "https://huggingface.co/jwha/Architectural_Design/resolve/main/architecturerealmix_v11.safetensors"
    if not os.path.exists(model_save_path):
        print("Downloading the architectural model...")
        download_with_progress(model_url, model_save_path)

    # Setup pipelines
    lineart_pipe, canny_pipe = setup_pipelines(model_save_path)

    # Whether the next round must ask for a new upload. Option 2 clears this
    # so the previously processed image is reused with a fresh prompt.
    need_new_image = True

    # Interactive loop
    while True:
        try:
            if need_new_image:
                temp_image_path, original_filename = upload_and_save_image(folders['source'])

                # Process input images for both lineart and canny
                original_image, dimensions = process_image(temp_image_path)
                canny_image, canny_dimensions = preprocess_canny(temp_image_path)
                # Explicit check rather than assert, which is stripped under -O.
                if dimensions != canny_dimensions:
                    raise ValueError("Dimensions for lineart and canny inputs must match")

            # Get user prompt
            prompt = input("Enter your text prompt (e.g., 'A skyscraper with reflective blue windows'): ")

            # Generate visualizations
            print("Generating architectural visualizations...")
            lineart_result = generate_visualization(lineart_pipe, original_image, dimensions, prompt)
            canny_result = generate_visualization(canny_pipe, canny_image, dimensions, prompt)

            # Display results side by side
            panels = (
                (original_image, "Original Image"),
                (lineart_result, "Lineart Visualization"),
                (canny_result, "Canny Visualization"),
            )
            plt.figure(figsize=(18, 6))
            for position, (panel_image, panel_title) in enumerate(panels, start=1):
                plt.subplot(1, 3, position)
                plt.imshow(panel_image)
                plt.title(panel_title)
                plt.axis("off")
            plt.show()

            # Save results to Drive
            save_to_drive(lineart_result, folders['output'], "lineart", original_filename)
            save_to_drive(canny_result, folders['output'], "canny", original_filename)
            print("Results saved to Google Drive.")

            # Ask user if they want to continue
            action = input(
                "Would you like to (1) upload a new image, (2) use the same image with a new prompt, or (3) exit? Enter 1, 2, or 3: "
            ).strip()

            if action == "1":
                need_new_image = True
            elif action == "2":
                need_new_image = False
            elif action == "3":
                print("Exiting. Thank you for using the script!")
                break
            else:
                print("Invalid input. Exiting.")
                break

        except Exception as e:
            print(f"An error occurred: {str(e)}")
            raise

# Start the interactive workflow only when this code is executed directly
# (as a script or notebook cell), not when it is imported as a module.
if __name__ == "__main__":
    main()


Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


Fetching 11 files:   0%|          | 0/11 [00:00<?, ?it/s]

Loading pipeline components...:   0%|          | 0/6 [00:00<?, ?it/s]

Some weights of the model checkpoint were not used when initializing CLIPTextModel: 
 ['text_model.embeddings.position_ids']
You have disabled the safety checker for <class 'diffusers.pipelines.controlnet.pipeline_controlnet.StableDiffusionControlNetPipeline'> by passing `safety_checker=None`. Ensure that you abide to the conditions of the Stable Diffusion license and do not expose unfiltered results in services or applications open to the public. Both the diffusers team and Hugging Face strongly recommend to keep the safety filter enabled in all public facing circumstances, disabling it only for use-cases that involve analyzing network behavior or auditing its results. For more information, please have a look at https://github.com/huggingface/diffusers/pull/254 .


OutOfMemoryError: CUDA out of memory. Tried to allocate 74.00 MiB. GPU 0 has a total capacity of 14.75 GiB of which 1.06 MiB is free. Process 7847 has 14.74 GiB memory in use. Of the allocated memory 14.59 GiB is allocated by PyTorch, and 22.10 MiB is reserved by PyTorch but unallocated. If reserved but unallocated memory is large try setting PYTORCH_CUDA_ALLOC_CONF=expandable_segments:True to avoid fragmentation.  See documentation for Memory Management  (https://pytorch.org/docs/stable/notes/cuda.html#environment-variables)