In [1]:
import torch
from torch.utils.data import Dataset, DataLoader
from torch import nn
from torchvision import transforms
from PIL import Image
import pandas as pd
from transformers import CLIPTokenizer
from diffusers import StableDiffusionPipeline, UNet2DModel, AutoencoderKL
from tqdm import tqdm

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
# Dataset pairing each logo image with the tokenized text of its description
class LogoDataset(Dataset):
    """Image/caption dataset indexed by a CSV file.

    Every CSV row provides a ``Description`` column (the caption text) and a
    ``Name`` column (the image file name, looked up inside ``image_folder``).
    """

    def __init__(self, csv_path, image_folder, tokenizer, image_size=(256, 256)):
        """Read the CSV index and build the image preprocessing pipeline.

        Args:
            csv_path: Path of the CSV file, read with ``pd.read_csv``.
            image_folder: Directory that contains the image files.
            tokenizer: Callable used to tokenize each description.
            image_size: Target size handed to ``transforms.Resize``.
        """
        self.data = pd.read_csv(csv_path)
        self.image_folder = image_folder
        self.tokenizer = tokenizer

        resize = transforms.Resize(image_size)
        to_tensor = transforms.ToTensor()
        # ToTensor yields values in [0, 1]; (x - 0.5) / 0.5 maps them to [-1, 1]
        to_signed_range = transforms.Normalize([0.5], [0.5])
        self.transform = transforms.Compose([resize, to_tensor, to_signed_range])

    def __len__(self):
        """Return the number of rows in the CSV index."""
        return len(self.data)

    def __getitem__(self, idx):
        """Return ``(input_ids, attention_mask, image)`` for row ``idx``."""
        record = self.data.iloc[idx]
        caption = record['Description']

        # Always pad/truncate to 77 tokens so every item has the same length
        encoded = self.tokenizer(
            caption,
            padding="max_length",
            truncation=True,
            max_length=77,
            return_tensors="pt",
        )

        picture = Image.open(f"{self.image_folder}/{record['Name']}").convert("RGB")
        pixel_values = self.transform(picture)

        # return_tensors="pt" adds a leading batch axis; squeeze() drops it
        input_ids = encoded['input_ids'].squeeze()
        attention_mask = encoded['attention_mask'].squeeze()
        return input_ids, attention_mask, pixel_values


In [3]:
# Data locations and tokenizer
csv_path = "data.csv"      # CSV index read by LogoDataset ('Name' and 'Description' columns)
image_folder = "dataset"   # directory holding the image files named in the CSV

# Use the tokenizer that ships with the diffusion checkpoint rather than the
# standalone OpenAI CLIP one: sampling and saving both go through
# `pipeline.tokenizer`, so training must tokenize (and pad) the same way.
tokenizer = CLIPTokenizer.from_pretrained(
    "stabilityai/stable-diffusion-2-1-base", subfolder="tokenizer"
)

In [4]:
# Build the dataset, then wrap it in a shuffling loader with a small batch size
dataset = LogoDataset(csv_path=csv_path, image_folder=image_folder, tokenizer=tokenizer)
dataloader = DataLoader(dataset=dataset, shuffle=True, batch_size=2)

In [5]:
# Load the pretrained Stable Diffusion pipeline and place it on the GPU when one
# is available; fall back to the CPU instead of crashing on a hard-coded "cuda".
pipeline = StableDiffusionPipeline.from_pretrained("stabilityai/stable-diffusion-2-1-base").to(
    "cuda" if torch.cuda.is_available() else "cpu"
)

To support symlinks on Windows, you either need to activate Developer Mode or to run Python as an administrator. In order to activate developer mode, see this article: https://docs.microsoft.com/en-us/windows/apps/get-started/enable-your-device-for-development
Fetching 13 files: 100%|██████████| 13/13 [10:18<00:00, 47.59s/it]
Loading pipeline components...: 100%|██████████| 6/6 [00:00<00:00,  7.04it/s]


In [6]:
# AdamW over the UNet's parameters only; no other pipeline component is
# handed to the optimizer
optimizer = torch.optim.AdamW(params=pipeline.unet.parameters(), lr=5e-5)

In [7]:
# Training configuration
epochs = 5               # full passes over the dataloader
accumulation_steps = 4   # batches whose gradients are summed before each optimizer step
# Run on the GPU when torch can see one, otherwise on the CPU
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

In [None]:
# Fine-tune the UNet with the denoising objective: encode images to VAE latents,
# noise them at a randomly drawn timestep with the scheduler's forward process,
# and train the UNet to predict the noise that was added.
noise_scheduler = pipeline.scheduler
num_train_timesteps = noise_scheduler.config.num_train_timesteps
latent_scale = pipeline.vae.config.scaling_factor
num_batches = len(dataloader)

pipeline.unet.train()   # from_pretrained hands modules back in eval mode
optimizer.zero_grad()   # drop gradients left over from an interrupted earlier run

for epoch in range(epochs):
    epoch_loss = 0.0
    for step, (input_ids, attention_mask, images) in enumerate(tqdm(dataloader, desc=f"Epoch {epoch+1}/{epochs}")):
        images = images.to(device)
        input_ids, attention_mask = input_ids.to(device), attention_mask.to(device)

        # The VAE and the text encoder are only used as fixed feature extractors
        with torch.no_grad():
            latents = pipeline.vae.encode(images).latent_dist.sample() * latent_scale
            text_embeddings = pipeline.text_encoder(input_ids=input_ids, attention_mask=attention_mask)[0]

        noise = torch.randn_like(latents)
        timesteps = torch.randint(0, num_train_timesteps, (latents.size(0),), device=latents.device).long()

        # The noise level must match the timestep the UNet is conditioned on,
        # so let the scheduler mix latents and noise for each sampled timestep.
        noisy_latents = noise_scheduler.add_noise(latents, noise, timesteps)

        # UNet forward pass with text conditioning
        model_output = pipeline.unet(noisy_latents, timesteps, encoder_hidden_states=text_embeddings).sample

        # NOTE(review): regressing onto `noise` assumes an epsilon-prediction
        # checkpoint — confirm via pipeline.scheduler.config.prediction_type.
        loss = nn.functional.mse_loss(model_output, noise)

        # Scale only the gradient contribution so the accumulated gradient is
        # an average; the logged loss below stays unscaled.
        (loss / accumulation_steps).backward()
        if (step + 1) % accumulation_steps == 0 or (step + 1) == num_batches:
            optimizer.step()
            optimizer.zero_grad()

        epoch_loss += loss.item()

    # Report the mean per-batch loss so the figure is comparable across runs
    print(f"Epoch {epoch+1}, Loss: {epoch_loss / max(num_batches, 1):.4f}")

print("Training Complete.")

Epoch 1/5:   1%|          | 2/333 [02:50<8:05:39, 88.03s/it]

In [None]:
import os
import torch
from torchvision.utils import save_image
from tqdm import tqdm

# Output directory for the saved model components; it is created up front and
# an already-existing directory is accepted as-is
save_dir = "trained_model"
os.makedirs(name=save_dir, exist_ok=True)

In [None]:
# Save the pipeline's UNet, VAE, text encoder and tokenizer
def save_model_and_tokenizer(pipeline, save_dir):
    """Write four pipeline components into subfolders of ``save_dir``.

    Each component is saved through its own ``save_pretrained`` method into a
    subfolder named after the pipeline attribute it was read from.

    Args:
        pipeline: Object exposing ``unet``, ``vae``, ``text_encoder`` and
            ``tokenizer`` attributes.
        save_dir: Parent directory for the per-component subfolders.
    """
    for component_name in ("unet", "vae", "text_encoder", "tokenizer"):
        component = getattr(pipeline, component_name)
        component.save_pretrained(os.path.join(save_dir, component_name))
    print(f"Model components and tokenizer saved to {save_dir}")

In [None]:
# Validation Function for Performance Metrics
def validate_model(pipeline, dataloader, device, num_samples=8):
    """Estimate the UNet's noise-prediction MSE on the first few batches.

    Args:
        pipeline: Diffusion pipeline exposing ``unet``, ``vae``,
            ``text_encoder`` and ``scheduler``.
        dataloader: Iterable of ``(input_ids, attention_mask, images)`` batches.
        device: Device the batch tensors are moved to.
        num_samples: Maximum number of *batches* (not individual images) to
            evaluate.

    Returns:
        The mean MSE over the evaluated batches, or ``nan`` when no batch was
        evaluated.

    Note:
        The UNet, VAE and text encoder are left in eval mode afterwards.
    """
    # The pipeline object is not an nn.Module and has no eval(); switch each
    # torch module individually.
    for module in (pipeline.unet, pipeline.vae, pipeline.text_encoder):
        module.eval()

    scheduler = pipeline.scheduler
    num_train_timesteps = scheduler.config.num_train_timesteps
    latent_scale = pipeline.vae.config.scaling_factor

    total_mse_loss = 0.0
    num_batches = 0

    with torch.no_grad():
        for step, (input_ids, attention_mask, images) in enumerate(tqdm(dataloader, desc="Validating")):
            if step >= num_samples:  # Evaluate only on a few batches for performance
                break

            images = images.to(device)
            input_ids, attention_mask = input_ids.to(device), attention_mask.to(device)

            # Encode images to latent space
            latents = pipeline.vae.encode(images).latent_dist.sample() * latent_scale

            # Generate text embeddings
            text_embeddings = pipeline.text_encoder(input_ids=input_ids, attention_mask=attention_mask)[0]

            # Noise the latents to the level that matches each sampled timestep
            noise = torch.randn_like(latents)
            timesteps = torch.randint(0, num_train_timesteps, (latents.size(0),), device=latents.device).long()
            noisy_latents = scheduler.add_noise(latents, noise, timesteps)

            # UNet forward pass
            predicted_noise = pipeline.unet(noisy_latents, timesteps, encoder_hidden_states=text_embeddings).sample

            # Compute MSE loss
            mse_loss = nn.functional.mse_loss(predicted_noise, noise)
            total_mse_loss += mse_loss.item()
            num_batches += 1

    if num_batches == 0:
        # Empty dataloader or num_samples <= 0: there is nothing to average
        print("Validation skipped: no batches were evaluated.")
        return float("nan")

    avg_mse_loss = total_mse_loss / num_batches
    print(f"Validation MSE Loss: {avg_mse_loss:.4f}")
    return avg_mse_loss


In [None]:
# Generate Sample Images
def generate_samples(pipeline, prompts, output_dir, num_steps=50):
    """Sample one image per prompt and save it as ``sample_<i>.png``.

    Denoising is driven by ``pipeline.scheduler`` (timestep schedule, input
    scaling and update rule) rather than a hand-rolled fixed-size step, and
    the decoded image is rescaled from [-1, 1] to [0, 1] before saving.

    Args:
        pipeline: Diffusion pipeline exposing ``tokenizer``, ``text_encoder``,
            ``unet``, ``vae`` and ``scheduler``.
        prompts: Iterable of text prompts.
        output_dir: Directory the PNG files are written to (created if missing).
        num_steps: Number of scheduler inference steps.

    Note:
        The UNet, VAE and text encoder are left in eval mode afterwards.
    """
    os.makedirs(output_dir, exist_ok=True)

    unet, vae, text_encoder = pipeline.unet, pipeline.vae, pipeline.text_encoder
    scheduler = pipeline.scheduler

    # The pipeline object is not an nn.Module and has no eval(); switch each
    # torch module individually.
    for module in (unet, vae, text_encoder):
        module.eval()

    # Follow the UNet's device instead of relying on a notebook-level global
    run_device = unet.device
    latent_shape = (1, unet.config.in_channels, 64, 64)

    for i, prompt in enumerate(prompts):
        with torch.no_grad():
            # Tokenize prompt
            inputs = pipeline.tokenizer(prompt, return_tensors="pt", padding="max_length", truncation=True, max_length=77)
            input_ids = inputs.input_ids.to(run_device)
            attention_mask = inputs.attention_mask.to(run_device)

            # Generate text embeddings
            text_embeddings = text_encoder(input_ids=input_ids, attention_mask=attention_mask)[0]

            # Reset the scheduler for this prompt and start from pure noise at
            # the scale the scheduler expects
            scheduler.set_timesteps(num_steps, device=run_device)
            latents = torch.randn(latent_shape, device=run_device) * scheduler.init_noise_sigma

            # Denoise latents along the scheduler's own timestep sequence
            for t in scheduler.timesteps:
                model_input = scheduler.scale_model_input(latents, t)
                noise_pred = unet(model_input, t, encoder_hidden_states=text_embeddings).sample
                latents = scheduler.step(noise_pred, t, latents).prev_sample

            # Decode latents to images
            images = vae.decode(latents / vae.config.scaling_factor).sample
            # The decoder output is centred on 0; save_image expects [0, 1]
            images = (images / 2 + 0.5).clamp(0, 1)
            save_image(images, os.path.join(output_dir, f"sample_{i}.png"))
            print(f"Generated image for prompt '{prompt}' saved as sample_{i}.png")

In [None]:
# Persist the pipeline components into save_dir
save_model_and_tokenizer(pipeline=pipeline, save_dir=save_dir)

# Measure the denoising loss, reusing the same dataloader the training loop iterates
validation_loss = validate_model(pipeline=pipeline, dataloader=dataloader, device=device)

In [None]:
# Render one image per prompt into the "generated_samples" directory
sample_prompts = [
    "A modern minimalist logo",
    "A vintage-style floral logo",
    "An abstract tech-themed logo",
]
generate_samples(pipeline=pipeline, prompts=sample_prompts, output_dir="generated_samples")

print("Model saved, validation completed, and samples generated.")