#Summaries for continued LoRa learning:


***additional dataset***

45 input/output pairs per style || 5 epochs each || 512 sizes || lr=5e-5

summary:

*   ghibli: Final loss = 0.0916
*   lego: Final loss = 0.1126
*   2d_animation: Final loss = 0.1057
*   3d_animation: Final loss = 0.1360

---

***additional-additional dataset***

**Continued training from models generated post additional-dataset

30 input/output pairs per style || 512 size || learning rate & epochs adjusted per style

summary:

*   ghibli: Final loss = 0.0932 (ran @ 5 epochs and lr=5e-5)
*   lego: Final loss = 0.0834 (ran @ 3 epochs and lr=1e-5)
*   2d_animation: Final loss = 0.0854 (ran @ 3 epochs and lr=1e-5)
*   3d_animation: Final loss = 0.0893 (ran @ 5 epochs and lr=5e-5)


In [None]:
# Cell 1: Install Dependencies
!pip install -q peft accelerate diffusers transformers datasets
!pip install -q torch torchvision --extra-index-url https://download.pytorch.org/whl/cu118

#Deepseek used for coding assitance

In [None]:
# Cell 2: Imports and Setup
import os
os.environ["TF_CPP_MIN_LOG_LEVEL"] = "3"  # Reduce TensorFlow logging
from safetensors.torch import load_file
import torch
import torch.nn.functional as F
from PIL import Image
import torchvision.transforms as transforms
from diffusers import UNet2DConditionModel, AutoencoderKL, DDPMScheduler
from transformers import CLIPTextModel, CLIPTokenizer
from peft import LoraConfig, get_peft_model
from torch.utils.data import Dataset, DataLoader
import json
from tqdm import tqdm
import random
import zipfile
from google.colab import files

print(f"CUDA available: {torch.cuda.is_available()}")
print(f"GPU: {torch.cuda.get_device_name(0) if torch.cuda.is_available() else 'None'}")

In [None]:
# Cell 3: Upload and Extract Training Data
print("Upload your data zip file...")
uploaded = files.upload()

# Zip of data expected for upload
for filename in uploaded.keys():
    if filename.endswith('.zip'):
        print(f"Extracting {filename}...")
        # Create a directory named 'data' and extract the zip file into it
        os.makedirs('data', exist_ok=True)
        with zipfile.ZipFile(filename, 'r') as zip_ref:
            # List contents before extracting
            file_list = zip_ref.namelist()
            print(f"ZIP contains {len(file_list)} files/folders")
            print("First few items:", file_list[:10])
            zip_ref.extractall('data')  # Extract into the 'data' directory

        print(f"Extracted {filename} successfully into ./data!")

        expected_styles = ['ghibli-pairs', 'lego-pairs', '2Danimation-pairs', '3Danimation-pairs']

        data_path = 'data'
        if os.path.exists(data_path):
            data_dirs = [d for d in os.listdir(data_path) if os.path.isdir(os.path.join(data_path, d))]
            print(f"Found directories inside '{data_path}': {data_dirs}")

            # Check each expected style
            for style in expected_styles:
                style_path = os.path.join(data_path, style)
                input_path = os.path.join(style_path, 'input')
                output_path = os.path.join(style_path, 'output')

                if os.path.exists(input_path) and os.path.exists(output_path):
                    input_files = len([f for f in os.listdir(input_path) if f.endswith(('.jpg', '.jpeg', '.png'))])
                    output_files = len([f for f in os.listdir(output_path) if f.endswith(('.jpg', '.jpeg', '.png'))])
                    print(f"{style}: {input_files} input files, {output_files} output files")
                else:
                    print(f"{style}: Missing input or output directory within '{style_path}'")
        else:
            print(f"Directory '{data_path}' not found after extraction")
    else:
        print(f"Skipped {filename} - not a ZIP file")

In [None]:
# Cell 4: Dataset Class
class ImagePairDataset(Dataset):
    # max pairs adjusted per dataset
    def __init__(self, input_dir, output_dir, transform=None, max_pairs=30):
        self.input_dir = input_dir
        self.output_dir = output_dir
        self.transform = transform
        self.max_pairs = max_pairs

        # Get matching image pairs
        input_files = [f for f in os.listdir(input_dir) if f.lower().endswith(('.png', '.jpg', '.jpeg'))]
        output_files = [f for f in os.listdir(output_dir) if f.lower().endswith(('.png', '.jpg', '.jpeg'))]

        self.pairs = []
        for input_file in input_files:
            if input_file in output_files:
                self.pairs.append(input_file)

        # Limit to max_pairs for training
        if len(self.pairs) > max_pairs:
            self.pairs = random.sample(self.pairs, max_pairs)

        print(f"Using {len(self.pairs)} image pairs for training from {input_dir}")

    def __len__(self):
        return len(self.pairs)

    def __getitem__(self, idx):
        filename = self.pairs[idx]

        input_path = os.path.join(self.input_dir, filename)
        output_path = os.path.join(self.output_dir, filename)

        input_image = Image.open(input_path).convert('RGB')
        output_image = Image.open(output_path).convert('RGB')

        if self.transform:
            input_image = self.transform(input_image)
            output_image = self.transform(output_image)

        return input_image, output_image

In [None]:
print("Upload existing LoRA .safetensors files...")
uploaded_loras = files.upload()

# Check what files were uploaded
for filename in uploaded_loras.keys():
    print(f"Uploaded: {filename}")

existing_loras = {}
for style in ['ghibli', 'lego', '2d_animation', '3d_animation']:
    possible_filenames = [
        f"{style}_lora_proplus.safetensors",
        f"models/{style}/adapter_model.safetensors"
    ]

    for filename in possible_filenames:
        if os.path.exists(filename):
            existing_loras[style] = filename
            print(f"Found existing LoRA for {style}: {filename}")
            break
    else:
        print(f"No existing LoRA found for {style}")

In [None]:
# Cell 5: U-Net Fine-tuning Function
def lightweight_finetune_style(style_name, input_dir, output_dir, device="cuda", existing_lora_path=None):

    print(f"\nStarting optimized fine-tuning for {style_name}...")

    if existing_lora_path:
        print(f"Continuing training from existing LoRA: {existing_lora_path}")

    if not os.path.exists(input_dir) or not os.path.exists(output_dir):
        print(f"Directories not found: {input_dir} or {output_dir}")
        return None, None

    print("Loading Stable Diffusion 1.5 components...")
    model_id = "runwayml/stable-diffusion-v1-5"

    # Load individual components (let autocast handle precision)
    tokenizer = CLIPTokenizer.from_pretrained(model_id, subfolder="tokenizer")
    text_encoder = CLIPTextModel.from_pretrained(model_id, subfolder="text_encoder").to(device)
    vae = AutoencoderKL.from_pretrained(model_id, subfolder="vae").to(device)
    unet = UNet2DConditionModel.from_pretrained(model_id, subfolder="unet").to(device)
    scheduler = DDPMScheduler.from_pretrained(model_id, subfolder="scheduler")

    # Freeze all components except U-Net
    text_encoder.requires_grad_(False)
    vae.requires_grad_(False)

    print("Loaded all model components")

    # Configure LoRA for U-Net only
    lora_config = LoraConfig(
        r=16,
        lora_alpha=32,
        target_modules=["to_k", "to_q", "to_v", "to_out.0"],
        lora_dropout=0.1,
        bias="none",
    )

    # Apply LoRA to U-Net only
    unet = get_peft_model(unet, lora_config)

    if existing_lora_path and os.path.exists(existing_lora_path):
        print(f"Loading existing LoRA weights from {existing_lora_path}")
        try:
            if existing_lora_path.endswith('.safetensors'):
                state_dict = load_file(existing_lora_path, device=device)
                unet.load_state_dict(state_dict, strict=False)
                print("Successfully loaded existing LoRA weights from .safetensors")
            elif existing_lora_path.endswith('.pth'):
                state_dict = torch.load(existing_lora_path, map_location=device)
                if 'model_state_dict' in state_dict:
                    unet.load_state_dict(state_dict['model_state_dict'], strict=False)
                else:
                    unet.load_state_dict(state_dict, strict=False)
                print("Successfully loaded existing LoRA weights from .pth")
            else:
                print(f"Unsupported file format: {existing_lora_path}")

        except Exception as e:
            print(f"Could not load existing LoRA: {e}")
    else:
        print("Starting with fresh LoRA weights")

    unet.print_trainable_parameters()

    # Full 512px resolution
    transform = transforms.Compose([
        transforms.Resize((512, 512)),
        transforms.ToTensor(),
        transforms.Normalize([0.5], [0.5])
    ])

    # Create dataset - use all available pairs
    dataset = ImagePairDataset(input_dir, output_dir, transform=transform, max_pairs=50)

    if len(dataset) == 0:
        print(f"No training pairs found for {style_name}")
        return None, None

    dataloader = DataLoader(dataset, batch_size=2, shuffle=True)

    # Training parameters for continued training
    num_epochs = 3
    optimizer = torch.optim.AdamW(unet.parameters(), lr=1e-5, weight_decay=0.01)
    lr_scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=num_epochs, eta_min=1e-6)

    # Training setup
    accumulation_steps = 2
    unet.train()
    losses = []

    # Early stopping setup
    best_loss = float('inf')
    patience = 2
    patience_counter = 0
    best_model_state = None

    # Create checkpoint directory
    checkpoint_dir = f"checkpoints/{style_name}"
    os.makedirs(checkpoint_dir, exist_ok=True)

    print(f"Starting diffusion training ({num_epochs} epochs, {len(dataset)} pairs, 512px)...")

    # Training loop with mixed precision and checkpointing
    for epoch in range(num_epochs):
        epoch_loss = 0
        optimizer.zero_grad()

        progress_bar = tqdm(dataloader, desc=f"Epoch {epoch+1}/{num_epochs}")

        for batch_idx, (input_images, output_images) in enumerate(progress_bar):
            input_images = input_images.to(device)
            output_images = output_images.to(device)

            with torch.no_grad():
                latents = vae.encode(output_images).latent_dist.sample() * 0.18215

            noise = torch.randn_like(latents)
            batch_size = latents.shape[0]
            timesteps = torch.randint(0, scheduler.config.num_train_timesteps, (batch_size,), device=device).long()

            noisy_latents = scheduler.add_noise(latents, noise, timesteps)

            prompt = [f"high quality image in {style_name} style, detailed, artistic"] * batch_size
            text_inputs = tokenizer(
                prompt,
                padding="max_length",
                max_length=77,
                truncation=True,
                return_tensors="pt"
            ).to(device)

            with torch.no_grad():
                encoder_hidden_states = text_encoder(text_inputs.input_ids).last_hidden_state

            with torch.cuda.amp.autocast():
                noise_pred = unet(noisy_latents, timesteps, encoder_hidden_states).sample
                loss = F.mse_loss(noise_pred, noise) / accumulation_steps

            loss.backward()

            if (batch_idx + 1) % accumulation_steps == 0:
                torch.nn.utils.clip_grad_norm_(unet.parameters(), 1.0)
                optimizer.step()
                optimizer.zero_grad()

            epoch_loss += loss.item() * accumulation_steps
            progress_bar.set_postfix({"batch_loss": f"{loss.item() * accumulation_steps:.4f}"})

            del input_images, output_images, latents, noise, noisy_latents, noise_pred
            if batch_idx % 10 == 0:
                torch.cuda.empty_cache()

        avg_loss = epoch_loss / len(dataloader)
        losses.append(avg_loss)
        print(f"Epoch {epoch+1} Average Loss: {avg_loss:.4f}")

        # Update learning rate
        lr_scheduler.step()
        current_lr = optimizer.param_groups[0]['lr']
        print(f"Learning rate: {current_lr:.2e}")

        # Early stopping check
        if avg_loss < best_loss:
            best_loss = avg_loss
            patience_counter = 0
            best_model_state = unet.state_dict().copy()
            print(f"New best loss: {best_loss:.4f}")
        else:
            patience_counter += 1
            print(f"No improvement. Patience: {patience_counter}/{patience}")

        # Save checkpoint
        checkpoint_path = f"{checkpoint_dir}/epoch_{epoch+1}.pth"
        torch.save({
            'epoch': epoch,
            'model_state_dict': unet.state_dict(),
            'optimizer_state_dict': optimizer.state_dict(),
            'loss': avg_loss,
            'lora_config': lora_config.to_dict(),
        }, checkpoint_path)
        print(f"Saved checkpoint: {checkpoint_path}")

        # Early stopping
        if patience_counter >= patience:
            print(f"Early stopping at epoch {epoch+1}")
            # Restore best model
            if best_model_state is not None:
                unet.load_state_dict(best_model_state)
            break

    # Save final model
    save_dir = f"models/{style_name}_lora_proplus"
    os.makedirs(save_dir, exist_ok=True)
    unet.save_pretrained(save_dir)

    # Save training info
    training_info = {
        "style": style_name,
        "training_pairs": len(dataset),
        "epochs": num_epochs,
        "final_loss": losses[-1] if losses else None,
        "best_loss": best_loss,
        "image_resolution": 512,
        "lora_rank": 16,
        "base_model": "runwayml/stable-diffusion-v1-5",
        "training_complete": True
    }

    with open(os.path.join(save_dir, "training_info.json"), "w") as f:
        json.dump(training_info, f, indent=2)

    print(f"Training completed for {style_name}")
    print(f"Final loss: {losses[-1]:.4f}" if losses else "No loss recorded")
    print(f"Best loss: {best_loss:.4f}")
    print(f"Model saved to: {save_dir}")

    return save_dir, losses

In [None]:
# Cell 6: Main Training Loop
print("U-Net LoRA Fine-tuning for Stable Diffusion 1.5")

device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"Using device: {device}")

def find_style_directories():
    """Find available style directories from our training data structure"""
    styles = {}
    data_path = 'data'

    if not os.path.exists(data_path):
        print(f"Directory '{data_path}' not found.")
        return styles

    style_mappings = {
        'ghibli-pairs': 'ghibli',
        'lego-pairs': 'lego',
        '2Danimation-pairs': '2d_animation',
        '3Danimation-pairs': '3d_animation'
    }

    for dir_name, style_name in style_mappings.items():
        dir_path = os.path.join(data_path, dir_name)
        input_dir = os.path.join(dir_path, 'input')
        output_dir = os.path.join(dir_path, 'output')

        if os.path.exists(input_dir) and os.path.exists(output_dir):
            input_files = len([f for f in os.listdir(input_dir) if f.endswith(('.jpg', '.jpeg', '.png'))])
            output_files = len([f for f in os.listdir(output_dir) if f.endswith(('.jpg', '.jpeg', '.png'))])

            if input_files > 0 and output_files > 0:
                styles[style_name] = {
                    "input_dir": input_dir,
                    "output_dir": output_dir
                }
                print(f"Found {style_name}: {input_files} input files, {output_files} output files")
            else:
                print(f"{style_name}: No image files found in input/output directories")
        else:
            print(f"{style_name}: Missing directories")

    return styles

# Find available styles
styles = find_style_directories()

if not styles:
    print("No valid style directories found for training!")
    print("Available directories in /content:", os.listdir('.') if os.path.exists('.') else "None")
    if os.path.exists('data'):
        print("Available directories in /content/data:", os.listdir('data') if os.path.exists('data') else "None")
else:
    print(f"Training {len(styles)} styles: {list(styles.keys())}")

results = {}

existing_loras = {
    'ghibli': 'ghibli_lora_proplus.safetensors',
    'lego': 'lego_lora_proplus.safetensors',
    '2d_animation': '2d_animation_lora_proplus.safetensors',
    '3d_animation': '3d_animation_lora_proplus.safetensors'
}

for style_name, paths in styles.items():
    print(f"\nTraining {style_name}...")

    existing_lora_path = existing_loras.get(style_name)

    if os.path.exists(paths["input_dir"]) and os.path.exists(paths["output_dir"]):
        save_dir, losses = lightweight_finetune_style(
            style_name,
            paths["input_dir"],
            paths["output_dir"],
            device,
            existing_lora_path=existing_lora_path
        )
        if save_dir:
             results[style_name] = {"save_dir": save_dir, "losses": losses}
    else:
        print(f"Skipping {style_name} - directories not found")



print("\nFine-tuning Summary:")
if results:
    for style, result in results.items():
        print(f"{style}: Final loss = {result['losses'][-1]:.4f}")
else:
    print("No models were fine-tuned.")


In [None]:
#download models
from google.colab import files
import zipfile
import os

zip_filename = 'proper_unet_lora_models.zip'

with zipfile.ZipFile(zip_filename, 'w') as zipf:
    for root, dirs, files_in_dir in os.walk('models'):
        for file in files_in_dir:
            file_path = os.path.join(root, file)
            arcname = os.path.relpath(file_path, 'models')
            zipf.write(file_path, os.path.join('models', arcname))

print(f"Zipped model directory as: {zip_filename}")
files.download('proper_unet_lora_models.zip')
