## Install Required Libraries

In [None]:
# Install the libraries used by this notebook; diffusers is pinned to 0.19.0 here.
# NOTE(review): torchvision and streamlit are imported by later cells but are not
# installed by this command — presumably the runtime provides them; confirm.
!pip install diffusers==0.19.0 accelerate torch transformers datasets pillow --quiet


In [None]:
# Clone the diffusers source repository into the current working directory.
# NOTE(review): no later cell visibly uses this checkout — confirm it is still needed.
!git clone https://github.com/huggingface/diffusers.git


## Download and Extract the Dataset

In [None]:
!mkdir ~/.kaggle


In [None]:
from google.colab import files

# Upload the kaggle.json file
# files.upload() comes from google.colab, so this cell only works inside Colab;
# its return value is kept in `uploaded` but not used by the visible cells.
uploaded = files.upload()

# Move the kaggle.json file to the Kaggle directory
# (expects the uploaded file to be named exactly kaggle.json in the working directory).
!mv kaggle.json ~/.kaggle/


In [None]:
# Restrict the credentials file to owner read/write only.
!chmod 600 ~/.kaggle/kaggle.json

In [None]:
# Install the Kaggle package, which provides the `kaggle` command used in the next cells.
!pip install kaggle --quiet


In [None]:
# List datasets through the Kaggle CLI; presumably a smoke test that the
# uploaded credentials are accepted before attempting the download.
!kaggle datasets list


In [None]:
!kaggle datasets download -d tongpython/cat-and-dog
!unzip cat-and-dog.zip -d data/


## Preprocess the Dataset

In [None]:
import os
from torchvision import transforms
from PIL import Image, UnidentifiedImageError
from torch.utils.data import Dataset

# Path to dataset
data_dir = "data/training_set/training_set"

# One category per sub-directory of data_dir. Plain files sitting next to the
# category folders are ignored (they would otherwise be treated as categories),
# and the names are sorted so the category order does not depend on the
# filesystem's listing order.
categories = sorted(
    entry
    for entry in os.listdir(data_dir)
    if os.path.isdir(os.path.join(data_dir, entry))
)

# Define transformations
image_transforms = transforms.Compose([
    # Force every image to a fixed 512x512 size (aspect ratio is not preserved).
    transforms.Resize((512, 512)),
    transforms.ToTensor(),
    # (x - 0.5) / 0.5: rescales ToTensor's output so it is centred on zero.
    transforms.Normalize([0.5], [0.5]),
])

# Supported image extensions (compared against the lower-cased file suffix)
valid_extensions = {".jpg", ".jpeg", ".png", ".bmp", ".gif"}

# Custom dataset class
class ImageCaptionDataset(Dataset):
    """Image/caption dataset built from one folder per category.

    Every readable image found in ``data_dir/<category>`` becomes one sample
    captioned ``"A picture of a <category>."`` (category lower-cased).

    Only file paths are kept in memory; pixels are decoded on demand in
    ``__getitem__``. Holding every decoded image for the lifetime of the
    dataset would cost memory proportional to the whole training set and keep
    the underlying file handles open.

    Args:
        data_dir: Directory that contains the category sub-directories.
        categories: Names of the sub-directories to scan. Entries that are not
            directories are ignored.
        transforms: Callable applied to each RGB ``PIL.Image`` when a sample
            is fetched; its result is returned under ``"pixel_values"``.
        valid_extensions: Collection of lower-case file suffixes (with the
            leading dot) that are accepted as images.
    """

    def __init__(self, data_dir, categories, transforms, valid_extensions):
        self.transforms = transforms
        # List of (image_path, caption) tuples.
        self.data = []

        for category in categories:
            class_dir = os.path.join(data_dir, category)
            if not os.path.isdir(class_dir):
                # A stray file next to the category folders is not a category.
                continue

            caption = f"A picture of a {category.lower()}."
            # Sorted so sample order is reproducible across filesystems.
            for img_file in sorted(os.listdir(class_dir)):
                # Check for valid image file
                if os.path.splitext(img_file)[-1].lower() not in valid_extensions:
                    continue

                img_path = os.path.join(class_dir, img_file)
                try:
                    # Decode once to prove the file is readable, then let the
                    # context manager release both the pixels and the handle.
                    with Image.open(img_path) as img:
                        img.load()
                except (UnidentifiedImageError, OSError):
                    # Unrecognised, truncated or otherwise unreadable file.
                    print(f"Skipping invalid image: {img_path}")
                    continue

                self.data.append((img_path, caption))

    def __len__(self):
        """Return the number of usable samples."""
        return len(self.data)

    def __getitem__(self, idx):
        """Return ``{"pixel_values": transformed image, "captions": str}`` for ``idx``."""
        img_path, caption = self.data[idx]
        with Image.open(img_path) as img:
            # convert() yields an independent, fully loaded image, so it stays
            # valid after the file is closed.
            image = img.convert("RGB")
        return {
            "pixel_values": self.transforms(image),
            "captions": caption,
        }

# Build the dataset from the configured folders and report how many samples it holds
dataset = ImageCaptionDataset(
    data_dir=data_dir,
    categories=categories,
    transforms=image_transforms,
    valid_extensions=valid_extensions,
)
print(f"Dataset size: {len(dataset)}")


## Load the Stable Diffusion Model

In [None]:
# Install the model libraries again; this repeats the diffusers==0.19.0 pin
# from the notebook's first install cell, without datasets and pillow.
!pip install diffusers==0.19.0 accelerate torch transformers --quiet


In [None]:
# Upgrade huggingface_hub to the newest available release.
!pip install --upgrade huggingface_hub

In [None]:
# Upgrade diffusers to the newest available release.
# NOTE(review): this replaces the diffusers==0.19.0 version pinned by the earlier
# install cells — confirm which version the notebook is meant to run against.
!pip install --upgrade diffusers --quiet

In [None]:
from diffusers import StableDiffusionImg2ImgPipeline
import torch
from huggingface_hub import hf_hub_download

# Load Stable Diffusion pipeline for image-to-image generation
# model_id is the Hugging Face Hub repository id handed to from_pretrained further down.
model_id = "runwayml/stable-diffusion-v1-5"

# Define a function to replace cached_download functionality
def download_cached_model(repo_id, filename, cache_dir=None):
    """Fetch a single file from a Hugging Face Hub repository.

    Thin wrapper that forwards its arguments to ``hf_hub_download``.

    Args:
        repo_id: Hub repository to download from.
        filename: Name of the file inside that repository.
        cache_dir: Optional cache directory forwarded unchanged; ``None``
            leaves the choice to ``hf_hub_download``.

    Returns:
        Whatever ``hf_hub_download`` returns for the requested file.
    """
    request = {
        "repo_id": repo_id,
        "filename": filename,
        "cache_dir": cache_dir,
    }
    return hf_hub_download(**request)


# Load the base checkpoint in half precision.
# - variant="fp16" selects the fp16 weight files; requesting them through
#   revision="fp16" is deprecated in diffusers.
# - from_pretrained has no `custom_pipeline_kwargs` parameter (the pipeline
#   resolves its own downloads through huggingface_hub), so no download hook
#   is passed here.
pipe = StableDiffusionImg2ImgPipeline.from_pretrained(
    model_id,
    variant="fp16",
    torch_dtype=torch.float16,
)

# Move every pipeline component to the GPU.
pipe = pipe.to("cuda")

## Fine-Tune the Model

In [None]:
from torch.utils.data import DataLoader
from torch.optim import AdamW
from accelerate import Accelerator
from tqdm.auto import tqdm

# Training parameters
batch_size = 4
learning_rate = 5e-5
num_epochs = 3

# Create dataloader
train_dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True)

# Extract model components
unet = pipe.unet
optimizer = AdamW(unet.parameters(), lr=learning_rate)

# Initialize Accelerator
accelerator = Accelerator(mixed_precision="fp16")
unet, optimizer, train_dataloader = accelerator.prepare(unet, optimizer, train_dataloader)


In [None]:
# Training loop
unet.train()
for epoch in range(num_epochs):
    progress_bar = tqdm(train_dataloader, desc=f"Epoch {epoch + 1}/{num_epochs}")
    for batch in progress_bar:
        # Prepare inputs
        pixel_values = torch.stack([item["pixel_values"] for item in batch]).to(accelerator.device)
        captions = [item["captions"] for item in batch]

        # Tokenize captions
        inputs = pipe.tokenizer(captions, padding="max_length", return_tensors="pt", truncation=True)
        input_ids = inputs.input_ids.to(accelerator.device)

        # Generate latent noise
        latents = pipe.vae.encode(pixel_values).latent_dist.sample()
        latents = latents * pipe.vae.config.scaling_factor

        # Predict noise
        noise_pred = unet(latents, input_ids).sample

        # Loss calculation
        loss = torch.nn.functional.mse_loss(noise_pred, latents)

        # Backpropagation
        accelerator.backward(loss)
        optimizer.step()
        optimizer.zero_grad()

        # Update progress bar
        progress_bar.set_postfix(loss=loss.item())


## Save the Fine-Tuned Model

In [None]:
# Save the individual components. unwrap_model strips any wrapper that
# accelerator.prepare() may have put around the UNet (e.g. for distributed
# training), so save_pretrained is always called on the underlying model.
accelerator.unwrap_model(unet).save_pretrained("./fine_tuned_unet")
pipe.text_encoder.save_pretrained("./fine_tuned_text_encoder")
pipe.tokenizer.save_pretrained("./fine_tuned_tokenizer")

# Also save the complete pipeline (fine-tuned UNet plus all other components)
# so that it can be reloaded in one call with
# StableDiffusionImg2ImgPipeline.from_pretrained("./fine_tuned_model").
pipe.save_pretrained("./fine_tuned_model")


## Test the Fine-Tuned Model

In [None]:
from PIL import Image

# Reload fine-tuned components.
# from_pretrained returns float32 modules on the CPU by default, so the weights
# are loaded in the dtype of the untouched VAE and the whole pipeline is moved
# back to the GPU afterwards; otherwise the reloaded modules would not match
# the rest of the pipeline.
pipe.unet = pipe.unet.from_pretrained("./fine_tuned_unet", torch_dtype=pipe.vae.dtype)
pipe.text_encoder = pipe.text_encoder.from_pretrained(
    "./fine_tuned_text_encoder", torch_dtype=pipe.vae.dtype
)
pipe.tokenizer = pipe.tokenizer.from_pretrained("./fine_tuned_tokenizer")
pipe = pipe.to("cuda")

# Test the model
# convert("RGB") guards against grayscale / palette / alpha inputs.
input_image = Image.open("data/example.jpg").convert("RGB").resize((512, 512))
prompt = "A cute dog sitting on a chair."

# The pipeline's input-image argument is named `image`; the old `init_image`
# keyword is no longer accepted.
output_image = pipe(prompt=prompt, image=input_image, strength=0.8).images[0]
output_image.save("generated_image.jpg")


## Deploy as a Streamlit Web App

In [None]:
import streamlit as st
from PIL import Image
from diffusers import StableDiffusionImg2ImgPipeline

# Load the fine-tuned model
@st.cache_resource
def load_pipeline():
    """Load the fine-tuned pipeline once and move it to the GPU.

    Streamlit re-executes the script on every interaction; caching the
    pipeline as a resource avoids reloading the weights on each rerun.
    "./fine_tuned_model" must be a complete pipeline directory (as written by
    ``pipe.save_pretrained``).
    """
    return StableDiffusionImg2ImgPipeline.from_pretrained("./fine_tuned_model").to("cuda")


pipe = load_pipeline()

st.title("Image-to-Image Generation with Stable Diffusion")
st.write("Upload an image and provide a prompt to generate a new image.")

# Upload input image
uploaded_file = st.file_uploader("Choose an image...", type=["jpg", "jpeg", "png"])
prompt = st.text_input("Prompt")

if st.button("Generate"):
    if uploaded_file and prompt:
        # convert("RGB") drops alpha channels / palettes that PNG uploads may carry.
        input_image = Image.open(uploaded_file).convert("RGB").resize((512, 512))
        # The pipeline's input-image argument is named `image`; the old
        # `init_image` keyword is no longer accepted.
        output_image = pipe(prompt=prompt, image=input_image, strength=0.8).images[0]
        st.image(output_image, caption="Generated Image")
    else:
        st.error("Please upload an image and provide a prompt.")
