<a href="https://colab.research.google.com/github/satishsampath/photo-genie/blob/main/photo_genie.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

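This demo takes a set of your own photos, finds and groups the faces in them, and fine-tunes a Stable Diffusion model with DreamBooth on the group you pick, so that you can generate new images of that person. It
reuses a lot from ShivamShrirao's [excellent DreamBooth tutorial](https://colab.research.google.com/github/ShivamShrirao/diffusers/blob/main/examples/dreambooth/DreamBooth_Stable_Diffusion.ipynb). Thank you!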

# Before you begin - get your photos ready in Google Drive

This demo trains a DreamBooth model using your own photos. Since Google Colab can't directly access your Google Photos, you have to upload a .zip file with your photos to Google Drive and connect that to this Colab. Here are the steps.

#### Export a set of photos from your Google Photos album. 
For example, I used just the photos I took in 2022 & 2023.
1. Open https://takeout.google.com/. 
2. Click “Deselect All”
3. Scroll down to “Google Photos” and select that row. 
4. Click the “All photo albums included” button. If it’s not there, wait for a few seconds as it’ll be gathering album info before showing up.
5. Clicking that button opens up a modal dialog titled “Google Photos content options”. Click “Deselect All” first, then scroll to “Photos from 2023” (or whichever year(s) you want to use) and select them. Then click “OK” to dismiss this modal.
6. Scroll to the very end of the Takeout page and click “Next Step”.
7. Select “Send download link via email”, “Export Once”, “.zip” and “2 GB” as the options. Then click “Create Export”.
8. Wait a few minutes until you receive a download link in Gmail. Click the link in the email to download the .zip file containing all your Google Photos data for the selected year(s).

The .zip files contain videos & photos together. Uploading videos takes a very long time and they are not used, so remove them before uploading.
1. Extract the .zip file to a local folder
2. Delete all the .mp4 files
3. Create a new .zip file with the remaining downloaded files.

Open your Google Drive folder in the browser and upload the zip file. The Colab notebook will access it from there.
1. Open https://drive.google.com  and open the “Colab Notebooks” folder in there.
2. Create a new `photo_genie` folder
3. Upload your photos .zip files to this folder


# Initial Setup

In [None]:
#@title Install the required libs
# Download the DreamBooth training script and the diffusers-to-ckpt converter that later cells run.
!wget -q https://github.com/ShivamShrirao/diffusers/raw/main/examples/dreambooth/train_dreambooth.py
!wget -q https://github.com/ShivamShrirao/diffusers/raw/main/scripts/convert_diffusers_to_original_stable_diffusion.py
# Install diffusers from the same GitHub repository the two scripts above come from.
%pip install -qq git+https://github.com/ShivamShrirao/diffusers
%pip install -q -U --pre triton
# Note: bitsandbytes is pinned to 0.35.0; the other packages are unpinned.
%pip install -q accelerate transformers ftfy bitsandbytes==0.35.0 gradio natsort safetensors xformers
%pip install -qq "ipywidgets>=7,<8"
# Packages used by the face extraction / grouping helpers (deepface, OpenCV, matplotlib).
%pip install -qq deepface
%pip install -qq opencv-python
%pip install -qq matplotlib
%pip install -qq pynvml

In [None]:
#@title Initialize imports & global functions
import argparse
import itertools
import math
from contextlib import nullcontext
import random

import torch
import torch.nn.functional as F
import torch.utils.checkpoint
from torch.utils.data import Dataset

from accelerate import Accelerator
from accelerate.logging import get_logger
from accelerate.utils import set_seed
from diffusers import AutoencoderKL, DDPMScheduler, PNDMScheduler, StableDiffusionPipeline, UNet2DConditionModel
from diffusers.optimization import get_scheduler
from diffusers.pipelines.stable_diffusion import StableDiffusionSafetyChecker
from torchvision import transforms
from tqdm.auto import tqdm
from transformers import CLIPFeatureExtractor, CLIPTextModel, CLIPTokenizer
import bitsandbytes as bnb

from deepface import DeepFace
import cv2
import matplotlib.pyplot as plt
import numpy as np
import os
import PIL
import re
import shutil
from sklearn.cluster import DBSCAN
from sklearn.preprocessing import StandardScaler
import zipfile

from deepface import DeepFace
import cv2
import matplotlib.pyplot as plt
import numpy as np
import os
import PIL
import re
import shutil
from sklearn.cluster import DBSCAN
from sklearn.preprocessing import StandardScaler
import zipfile

def extract_photos_from_zips(zips_dir, dest_dir):
  """Extract every .jpg/.jpeg found in the .zip files of a directory.

  Args:
    zips_dir: Directory scanned (non-recursively) for files ending in `.zip`.
    dest_dir: Directory the photos are written to; created if it is missing.

  The folder structure inside the archives is discarded: each photo is saved
  under its base name, so a later photo with the same base name overwrites an
  earlier one. Archives are processed in sorted name order so that the
  surviving copy is deterministic. Each extracted file name is printed.
  """
  os.makedirs(dest_dir, exist_ok=True)
  zip_names = sorted(entry for entry in os.listdir(zips_dir) if entry.endswith('.zip'))
  for zip_name in zip_names:
    with zipfile.ZipFile(os.path.join(zips_dir, zip_name), 'r') as archive:
      for member in archive.namelist():
        if not member.lower().endswith(('.jpg', '.jpeg')):
          continue
        photo_name = os.path.basename(member)
        print(photo_name, end=" ")
        # Stream the member to disk in chunks instead of loading the whole photo into memory
        with archive.open(member) as src_file, open(os.path.join(dest_dir, photo_name), 'wb') as dst_file:
          shutil.copyfileobj(src_file, dst_file)
  print("\n")

def find_unique_faces_dbscan(embeddings, threshold=0.6):
  """Group face embeddings with DBSCAN and return one entry per cluster label.

  Args:
    embeddings: Sequence of (photo name, embedding vector) pairs.
    threshold: Value passed to DBSCAN as `eps` (cosine metric, min_samples=5).

  Returns:
    A list with one dict per distinct label returned by `fit_predict`, each
    holding 'count' (number of members), 'photos' (member photo names) and
    'embeddings' (always left empty here). Every distinct label gets an entry;
    NOTE(review): that presumably includes DBSCAN's noise label - confirm
    whether callers expect the noise group to be listed as a face.
  """
  # Standardize the vectors before clustering
  vectors = np.array([vector for _, vector in embeddings])
  vectors = StandardScaler().fit_transform(vectors)

  labels = DBSCAN(metric='cosine', eps=threshold, min_samples=5).fit_predict(vectors)

  unique_faces = []
  for label in list(set(labels)):
    # Positions of the embeddings that were assigned this label
    member_indices = np.where(labels == label)[0]
    unique_faces.append({
      'count': len(member_indices),
      'photos': [embeddings[idx][0] for idx in member_indices],
      'embeddings': [],
    })
  return unique_faces

def generate_embedding_vectors(folder_name, pattern):
  """Generate embedding vectors for the face images in a folder.

  Args:
    folder_name: Directory to scan (non-recursively).
    pattern: Regular expression; only entries for which `re.match(pattern, name)`
      succeeds are processed.

  Returns:
    A list of (file name, embedding) tuples, where the embedding is the
    'embedding' field of the first result of `DeepFace.represent` using the
    'Facenet512' model. Entries that OpenCV cannot read as an image are
    skipped with a message instead of aborting the whole run.
  """
  embedding_vectors = []
  for file_name in os.listdir(folder_name):
    if not re.match(pattern, file_name):
      continue
    img = cv2.imread(os.path.join(folder_name, file_name))
    if img is None:
      # cv2.imread reports an unreadable / non-image file by returning None rather than raising
      print(f"Skipping unreadable image: {file_name}")
      continue
    img_resized = cv2.resize(img, (160, 160))  # Resize image to (160, 160)
    embedding = DeepFace.represent(img_resized, model_name='Facenet512', enforce_detection=False)
    embedding_vectors.append((file_name, embedding[0]['embedding']))

  return embedding_vectors

def extract_faces(photos_path, orig_faces_path, closeup_faces_path):
  """Extract the faces found in each photo and save them as separate images.

  Args:
    photos_path: Directory whose regular files are scanned for faces.
    orig_faces_path: Directory receiving a wider square-ish crop around each
      face, cut from the original photo. Created if missing.
    closeup_faces_path: Directory receiving the close-up face image returned
      by DeepFace. Created if missing.

  For every accepted face two files named `<photo stem>_face_<index>.jpg` are
  written, one in each output directory. Detections with confidence below
  0.98, or whose width/height ratio is outside [0.66, 1.5], are ignored.
  Files that OpenCV cannot read as an image are skipped.
  """
  os.makedirs(closeup_faces_path, exist_ok=True)
  os.makedirs(orig_faces_path, exist_ok=True)

  file_names = [name for name in os.listdir(photos_path) if os.path.isfile(os.path.join(photos_path, name))]
  for fi, file_name in enumerate(file_names):
    # end="\r" keeps the progress on a single line; a "\r" inside the text followed by
    # print's default newline would start a new line for every photo
    print("Extracting Faces : %d%%" % (fi * 100 // len(file_names)), end="\r")

    img = cv2.imread(os.path.join(photos_path, file_name))
    if img is None:
      # cv2.imread returns None for files it cannot decode (e.g. stray non-image files)
      continue
    img_height, img_width = img.shape[:2]
    faces = DeepFace.extract_faces(img, enforce_detection=False, align=True, detector_backend='retinaface')

    face_file_stem = os.path.splitext(file_name)[0]
    for i, face in enumerate(faces):
      # Ignore items that aren't confidently a face
      if face['confidence'] < 0.98:
        continue
      # Full face images are square-ish, so ignore detections that are much longer on one side
      facial_area = face['facial_area']
      wh_ratio = facial_area['w'] / facial_area['h']
      if wh_ratio > 1.5 or wh_ratio < 0.66:
        continue

      # Save the close up face image (scaled by 255 and with its channel order swapped for cv2.imwrite)
      destpath = os.path.join(closeup_faces_path, f"{face_file_stem}_face_{i}.jpg")
      cv2.imwrite(destpath, cv2.cvtColor(cv2.convertScaleAbs(face['face'], alpha=(255.0)), cv2.COLOR_BGR2RGB))

      # Expand the face rectangle by 50% on all sides and save that as the 'original' image
      expand = 0.5
      dim = max(facial_area['w'], facial_area['h'])
      x = max(0, facial_area['x'] - int((dim - facial_area['w']) / 2) - int(dim * expand))
      y = max(0, facial_area['y'] - int((dim - facial_area['h']) / 2) - int(dim * expand))
      w = min(img_width, int(dim * (1 + expand * 2)))
      h = min(img_height, int(dim * (1 + expand * 2)))
      face_img = img[y:y+h, x:x+w]
      destpath = os.path.join(orig_faces_path, f"{face_file_stem}_face_{i}.jpg")
      cv2.imwrite(destpath, face_img)
  print("\n")

def find_max_pairwise_diff_in_list(nums):
  """Return the largest gap between neighbouring values once `nums` is sorted.

  Args:
    nums: List of numbers. It is not modified; a sorted copy is used, so the
      caller's ordering is preserved.

  Returns:
    The maximum difference between consecutive values in ascending order, or
    0 when there are fewer than two values.
  """
  if len(nums) < 2:
    return 0
  ordered = sorted(nums)
  return max(later - earlier for earlier, later in zip(ordered, ordered[1:]))

def find_best_set_of_unique_faces(embeddings):
  """Sweep the DBSCAN threshold and return the face grouping judged best.

  Args:
    embeddings: Sequence of (photo name, embedding vector) pairs, as accepted
      by `find_unique_faces_dbscan`.

  Returns:
    The list of groups (as returned by `find_unique_faces_dbscan`) for the
    selected threshold, or an empty list when no threshold produced more than
    one group or when `embeddings` is empty.

  Selection rule: a grouping with more groups always wins; among groupings
  with the same number of groups, the one whose group sizes have the smallest
  maximum gap (see `find_max_pairwise_diff_in_list`) wins.
  """
  if len(embeddings) == 0:
    print("No face embeddings were provided, so there is nothing to group.")
    return []

  # The clustering uses the cosine metric, whose distance does not exceed 2. Once eps
  # reaches 2 every point neighbours every other, so only a single label can come back
  # and the candidate is discarded below. Sweeping further only costs time.
  max_threshold = 2.0

  threshold = 0.01
  min_diff = 1000000
  min_diff_threshold = threshold
  min_diff_counts = []
  min_diff_unique_faces = []
  while threshold < max_threshold:
    threshold += 0.01
    unique_faces = find_unique_faces_dbscan(embeddings, threshold)
    if len(unique_faces) <= 1:
      continue

    print(threshold)
    counts = [face['count'] for face in unique_faces]
    print(counts)

    # Pass a copy so `counts` keeps the same order as `unique_faces`
    diff = find_max_pairwise_diff_in_list(list(counts))
    more_groups = len(unique_faces) > len(min_diff_unique_faces)
    same_groups_but_tighter = len(unique_faces) == len(min_diff_unique_faces) and diff < min_diff
    if more_groups or same_groups_but_tighter:
      min_diff_threshold = threshold
      min_diff = diff
      min_diff_counts = counts
      min_diff_unique_faces = unique_faces

  print(f"Final threshold & face cluster counts : {min_diff_threshold}, {min_diff_counts}")
  return min_diff_unique_faces

def organize_unique_face_photos_in_directories(unique_faces, orig_faces_dir, closeup_faces_dir):
  """Copy each face group's photos into its own `face-<index>` sub-directories.

  For group number N, `<orig_faces_dir>/face-N` and `<closeup_faces_dir>/face-N`
  are removed if present, recreated, and filled with copies of the group's
  photos taken from the respective parent directory.
  """
  for group_index, group in enumerate(unique_faces):
    orig_group_dir = f"{orig_faces_dir}/face-{group_index}"
    closeup_group_dir = f"{closeup_faces_dir}/face-{group_index}"

    # Start from empty directories so results of an earlier run do not linger
    for group_dir in (orig_group_dir, closeup_group_dir):
      shutil.rmtree(group_dir, ignore_errors=True)
    os.mkdir(orig_group_dir)
    os.mkdir(closeup_group_dir)

    for photo_name in group['photos']:
      shutil.copyfile(f"{closeup_faces_dir}/{photo_name}", f"{closeup_group_dir}/{photo_name}")
      shutil.copyfile(f"{orig_faces_dir}/{photo_name}", f"{orig_group_dir}/{photo_name}")

def show_grid_of_images(image_dir, output, num_images=8):
  """Show a grid of images from the given directory inside `output`.

  Args:
    image_dir: Directory whose entries are opened as images.
    output: Widget that is cleared first and then used as the context manager
      the figure is drawn in (the notebook passes an ipywidgets `Output`).
    num_images: Maximum number of images shown. The grid has 4 columns and as
      many rows as needed, so the default of 8 gives a 2 x 4 grid.

  Nothing is drawn when `num_images` is below 1; a short message is printed
  into `output` when the directory holds no entries.
  """
  # `import PIL` by itself does not load the PIL.Image submodule, so import it explicitly
  from PIL import Image

  # Clear the previous images
  output.clear_output()
  if num_images < 1:
    return

  # Get the list of image files and keep only the first `num_images`
  image_files = [os.path.join(image_dir, f) for f in os.listdir(image_dir)][:num_images]

  with output:
    if not image_files:
      print(f"No images found in {image_dir}")
      return

    # Size the grid from num_images instead of assuming exactly 8 cells
    columns = 4
    rows = -(-num_images // columns)  # ceiling division
    _, axs = plt.subplots(rows, columns, squeeze=False)
    axs = axs.ravel()

    for ax, image_file in zip(axs, image_files):
      ax.imshow(Image.open(image_file))

    # Hide the axes of every cell, used or not
    for ax in axs:
      ax.axis('off')

    plt.tight_layout()
    plt.show()

def copy_images_to_instance_images_dir(src_dir, instance_images_dir):
  """Recreate `instance_images_dir` and copy the regular files of `src_dir` into it.

  Any existing `instance_images_dir` is deleted first, so afterwards it holds
  only the files found directly in `src_dir`; sub-directories are not copied.
  """
  shutil.rmtree(instance_images_dir, ignore_errors=True)
  os.mkdir(instance_images_dir)
  for entry_name in os.listdir(src_dir):
    candidate = os.path.join(src_dir, entry_name)
    if not os.path.isfile(candidate):
      continue
    shutil.copy(candidate, instance_images_dir)


# Get your photos .zip downloaded from your Google Drive
Do you have a .zip file of your photos already saved in your Google Drive? **If not**, see the instructions at the very top of this page before continuing further.

In [None]:
#@title Connect & mount Google Drive
#@markdown Run this cell to mount your Google Drive.
# Path inside the VM where Google Drive is mounted; later cells build their Drive paths from it.
gdrive_mount_path = "/content/gdrive" #@param {type: "string"}
from google.colab import drive
# force_remount=True is passed so that re-running this cell mounts the drive again.
drive.mount(gdrive_mount_path, force_remount=True)

In [None]:
#@title Settings
#@markdown Setup variables for extracting photos from your Google Drive .zip files to local VM directory.
import os
import shutil

#@markdown `zips_dir` The directory in Google Drive (relative to "My Drive") where you have the photos .zip files
zips_dir = "photo_genie" #@param {type: "string"}

#@markdown `photos_dir` The directory in VM to store the extracted photos
photos_dir = "/content/photos" #@param {type: "string"}

# Full path of the .zip folder inside the mounted drive
gdrive_photo_genie_path = f'{gdrive_mount_path}/MyDrive/{zips_dir}'
# The face crops and the selected training images are all stored inside that same Drive folder
orig_faces_dir = f"{gdrive_photo_genie_path}/orig_faces"
closeup_faces_dir = f"{gdrive_photo_genie_path}/faces"
instance_faces_dir = f"{gdrive_photo_genie_path}/instance_images"

In [None]:
#@title Extract photos from your Google Drive .zip files to local VM directory.
# Start from an empty photos directory so files from an earlier run are not mixed in
shutil.rmtree(photos_dir, ignore_errors=True)
os.makedirs(photos_dir, exist_ok=True)

# Extract all photos from the .zips to local directory
extract_photos_from_zips(gdrive_photo_genie_path, photos_dir)

# Extract unique faces, group them and select one group

In [None]:
#@title Extract faces & group them by similarity.
# Detect faces in every photo and save a close-up plus a wider crop for each of them
extract_faces(photos_dir, orig_faces_dir, closeup_faces_dir)
# Face crops are named `<photo>_face_<index>.jpg`. `\d+` also accepts indices of 10 and above,
# which a single `\d` would silently leave out of the grouping.
embeddings = generate_embedding_vectors(closeup_faces_dir, r'.*_face_\d+\.jpg')
unique_faces = find_best_set_of_unique_faces(embeddings)
# Copy each group's photos into its own face-<n> sub-directories
organize_unique_face_photos_in_directories(unique_faces, orig_faces_dir, closeup_faces_dir)

In [None]:
#@title Select the set of faces to fine-tune the model
from ipywidgets import Button, HBox, Output, VBox, Label

# Create an Output widget for the images
output = Output()

def handle_directory_click(directory, output):
  show_grid_of_images(f"{closeup_faces_dir}/{directory}", output)
  copy_images_to_instance_images_dir(f"{orig_faces_dir}/{directory}", instance_faces_dir)

# Create a Button widget for each directory
buttons = []
for i in range(len(unique_faces)):
    directory = f"face-{i}"
    button = Button(description=directory)
    button.on_click(lambda x, directory=directory: handle_directory_click(directory, output))
    buttons.append(button)

# Create a VBox: one row for the buttons, one row for the output
VBox([Label(value="Select a group below by clicking on the button"), HBox(buttons), output])

# Train & fine-tune the model

In [None]:
#@title Model & Weights settings
import os

#@markdown If model weights should be saved directly in google drive (takes around 4-5 GB).
save_to_gdrive = False #@param {type:"boolean"}

#@markdown Name/Path of the initial model.
MODEL_NAME = "stabilityai/stable-diffusion-2-1" #@param {type:"string"}

#@markdown Enter the directory name to save model at.

OUTPUT_DIR = "stable_diffusion_weights" #@param {type:"string"}
if save_to_gdrive:
    OUTPUT_DIR = f"{gdrive_photo_genie_path}/{OUTPUT_DIR}"
else:
    OUTPUT_DIR = "/content/" + OUTPUT_DIR

#@markdown Maximum number of training steps (passed to the training script as `--max_train_steps`).
MAX_TRAIN_RUNS = 800 #@param {type:"integer"}

print(f"[*] Weights will be saved at {OUTPUT_DIR}")

# Create the directory from Python: an unquoted shell `mkdir -p $OUTPUT_DIR` would
# split a path that contains spaces into several directories
os.makedirs(OUTPUT_DIR, exist_ok=True)

In [None]:
#@title Concepts - instance & class images
# You can also add multiple concepts here.

#@markdown Prompt describing your photos. `zwx` is just a unique name identifying you.
instance_prompt = "photo of zwx's face" #@param {type:"string"}

#@markdown Prompt describing the general class of your photos. Edit this to best describe how you & your photos identify.
class_prompt = "photo of a man's face" #@param {type:"string"}

# One dict per concept; the list is written to concepts_list.json below
concepts_list = [
    {
        "instance_prompt":      instance_prompt,
        "class_prompt":         class_prompt,
        "instance_data_dir":    instance_faces_dir,
        "class_data_dir":       "/content/class_data"
    },
]

# `class_data_dir` contains regularization images
import json
import os
# Make sure every concept's instance image directory exists
for c in concepts_list:
    os.makedirs(c["instance_data_dir"], exist_ok=True)

# Save the concepts to the JSON file that the training command reads via --concepts_list
with open("concepts_list.json", "w") as f:
    json.dump(concepts_list, f, indent=4)

In [None]:
#@title Clear up GPU memory before training
!pip install numba
from numba import cuda
# Reset the current CUDA device through numba; per the cell title this is meant to free GPU memory
device = cuda.get_current_device() 
device.reset()

In [None]:
#@title Run training & fine-tuning
# Runs the downloaded train_dreambooth.py; {MODEL_NAME}, {OUTPUT_DIR} and {MAX_TRAIN_RUNS} are
# filled in from the notebook variables, and the concepts come from concepts_list.json.
!python3 train_dreambooth.py \
  --pretrained_model_name_or_path={MODEL_NAME} \
  --pretrained_vae_name_or_path="stabilityai/sd-vae-ft-mse" \
  --output_dir={OUTPUT_DIR} \
  --revision="fp16" \
  --with_prior_preservation --prior_loss_weight=1.0 \
  --seed=1337 \
  --resolution=512 \
  --train_batch_size=1 \
  --train_text_encoder \
  --mixed_precision="fp16" \
  --use_8bit_adam \
  --gradient_accumulation_steps=1 \
  --learning_rate=1e-6 \
  --lr_scheduler="constant" \
  --lr_warmup_steps=0 \
  --num_class_images=50 \
  --sample_batch_size=4 \
  --max_train_steps={MAX_TRAIN_RUNS} \
  --save_interval=10000 \
  --save_sample_prompt="oil painting on canvas of zwx's face" \
  --concepts_list="concepts_list.json"

# Set `--save_interval` to a value lower than `--max_train_steps` to also save weights from intermediate steps.
# `--save_sample_prompt` can be the same as `--instance_prompt` to generate intermediate samples (saved along with the weights in the samples directory).

In [None]:
#@title (Optional) Convert weights to ckpt to use in web UIs like AUTOMATIC1111.
#@markdown Run conversion.
ckpt_path = f"{OUTPUT_DIR}/{MAX_TRAIN_RUNS}/model.ckpt"

half_arg = ""
#@markdown  Whether to convert to fp16, takes half the space (2GB).
fp16 = True #@param {type: "boolean"}
if fp16:
    half_arg = "--half"
!python convert_diffusers_to_original_stable_diffusion.py --model_path "{OUTPUT_DIR}/{MAX_TRAIN_RUNS}"  --checkpoint_path {ckpt_path} {half_arg}
print(f"[*] Converted ckpt saved at {ckpt_path}")

# Use the model to create your own images

In [None]:
#@title Setup inference
import torch
from torch import autocast
from diffusers import StableDiffusionPipeline, DDIMScheduler

# If you want to use previously trained model saved in gdrive, replace this with the full path of model in gdrive
model_path = f"{OUTPUT_DIR}/{MAX_TRAIN_RUNS}"

# Load the fine-tuned weights in float16 on the GPU, with the safety checker disabled
pipe = StableDiffusionPipeline.from_pretrained(model_path, safety_checker=None, torch_dtype=torch.float16).to("cuda")
# Replace the pipeline's scheduler with a DDIM scheduler built from the loaded scheduler's config
pipe.scheduler = DDIMScheduler.from_config(pipe.scheduler.config)
pipe.enable_xformers_memory_efficient_attention()

#@markdown Can set random seed here for reproducibility.
seed = 52362 #@param {type:"number"}
# Seeded CUDA generator that the image generation cell passes to the pipeline
g_cuda = torch.Generator(device='cuda')
g_cuda.manual_seed(seed)

In [None]:
#@title Create your own images, using the fine-tuned model

prompt = "caricature pencil drawing of zwx chasing a dog" #@param {type:"string"}
negative_prompt = "" #@param {type:"string"}
num_samples = 10 #@param {type:"number"}
guidance_scale = 7.5 #@param {type:"number"}
num_inference_steps = 24 #@param {type:"number"}
height = 512 #@param {type:"number"}
width = 512 #@param {type:"number"}

# Run the pipeline under CUDA autocast and inference mode; `pipe`, `autocast` and `g_cuda`
# come from the "Setup inference" cell, which must be run first
with autocast("cuda"), torch.inference_mode():
    images = pipe(
        prompt,
        height=height,
        width=width,
        negative_prompt=negative_prompt,
        num_images_per_prompt=num_samples,
        num_inference_steps=num_inference_steps,
        guidance_scale=guidance_scale,
        generator=g_cuda
    ).images

# `display` is not imported in this file - presumably the notebook's built-in display function
for img in images:
    display(img)

In [None]:
#@title (Optional) Free runtime memory
# Calls exit(); per the cell title the intent is to release the memory held by the running session
exit()