In [None]:
# InsightFace (not preinstalled)
!pip install insightface==0.7.3

# FAISS (not preinstalled)
!pip install faiss-cpu

# Gradio (Colab may have old version)
!pip install -U gradio

# ONNX runtime (required by InsightFace)
!pip install onnxruntime


Collecting insightface==0.7.3
  Downloading insightface-0.7.3.tar.gz (439 kB)
[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/439.5 kB[0m [31m?[0m eta [36m-:--:--[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m439.5/439.5 kB[0m [31m14.5 MB/s[0m eta [36m0:00:00[0m
[?25h  Installing build dependencies ... [?25l[?25hdone
  Getting requirements to build wheel ... [?25l[?25hdone
  Preparing metadata (pyproject.toml) ... [?25l[?25hdone
Collecting onnx (from insightface==0.7.3)
  Downloading onnx-1.20.0-cp312-abi3-manylinux_2_27_x86_64.manylinux_2_28_x86_64.whl.metadata (8.4 kB)
Downloading onnx-1.20.0-cp312-abi3-manylinux_2_27_x86_64.manylinux_2_28_x86_64.whl (18.1 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m18.1/18.1 MB[0m [31m28.2 MB/s[0m eta [36m0:00:00[0m
[?25hBuilding wheels for collected packages: insightface
  Building wheel for insightface (pyproject.toml) ... [?25l[?25hdone
  Created wheel for ins

In [None]:
import os, cv2, zipfile, uuid, shutil, tempfile
import numpy as np
import faiss
import gradio as gr
from collections import defaultdict
from insightface.app import FaceAnalysis

# Clustering knobs read by face_sorter.
# SIM_THRESHOLD: minimum inner-product similarity between two L2-normalised
# face embeddings for the faces to be linked into the same identity.
# TOP_K: how many nearest neighbours are requested from the FAISS index for
# every face when looking for links.
SIM_THRESHOLD, TOP_K = 0.6, 1_000

_FACE_APP = None  # FaceAnalysis instance, built on the first request and then reused


def _get_face_app():
    """Return the shared FaceAnalysis model, loading it the first time it is needed."""
    global _FACE_APP
    if _FACE_APP is None:
        app = FaceAnalysis(name="buffalo_l", providers=["CPUExecutionProvider"])
        app.prepare(ctx_id=0, det_size=(640, 640))
        _FACE_APP = app
    return _FACE_APP


def _unique_name(name, taken):
    """Return `name`, or `stem_N.ext` if `name` is already in `taken`; records the result."""
    stem, ext = os.path.splitext(name)
    candidate, n = name, 1
    while candidate in taken:
        candidate = f"{stem}_{n}{ext}"
        n += 1
    taken.add(candidate)
    return candidate


def _copy_inputs(image_files, input_dir, progress):
    """Copy every uploaded file into `input_dir` without letting equal basenames collide."""
    total_uploads = len(image_files)
    taken = set()
    for i, src in enumerate(image_files):
        # A directory upload can contain the same file name in several
        # sub-folders; a plain basename copy would overwrite the earlier one.
        dst_name = _unique_name(os.path.basename(src), taken)
        shutil.copy(src, os.path.join(input_dir, dst_name))
        progress((i + 1) / total_uploads, desc=f"Uploading images: {i + 1} / {total_uploads}")


def _detect_faces(app, input_dir, progress):
    """Run detection on every readable image in `input_dir`; return one record per face."""
    faces = []
    # Sorted so that face order (and therefore cluster numbering) is reproducible.
    file_names = sorted(os.listdir(input_dir))
    total_files = len(file_names)

    for i, fname in enumerate(file_names):
        path = os.path.join(input_dir, fname)
        img = cv2.imread(path)  # None for anything OpenCV cannot decode
        if img is not None:
            img_id = os.path.splitext(fname)[0]
            for face in app.get(img):
                raw = face.embedding
                norm = 0.0 if raw is None else float(np.linalg.norm(raw))
                # Skip faces whose embedding cannot be normalised; dividing by
                # a zero norm would feed NaNs into the similarity index.
                if not np.isfinite(norm) or norm == 0.0:
                    continue
                faces.append({
                    "id": uuid.uuid4().hex,
                    "img_id": img_id,
                    "path": path,
                    "bbox": face.bbox.astype(int),
                    "emb": (raw / norm).astype("float32"),
                })
        progress((i + 1) / total_files, desc=f"Detecting Faces: {i + 1} / {total_files}")

    return faces


def _cluster_faces(embeddings):
    """Link faces whose similarity reaches SIM_THRESHOLD; return one cluster root id per row.

    `embeddings` holds L2-normalised float32 rows, so the inner-product index
    scores pairs by cosine similarity. Links are merged transitively with a
    union-find structure.
    """
    index = faiss.IndexFlatIP(embeddings.shape[1])
    index.add(embeddings)

    # Never ask for more neighbours than there are faces.
    k = min(TOP_K, len(embeddings))
    sims, neighbors = index.search(embeddings, k)

    parent = list(range(len(embeddings)))

    def find(x):
        # Path halving keeps the trees shallow.
        while parent[x] != x:
            parent[x] = parent[parent[x]]
            x = parent[x]
        return x

    # Select the qualifying (face, neighbour) pairs in one vectorised pass;
    # `neighbors >= 0` drops the -1 padding FAISS uses for missing results.
    rows, cols = np.nonzero((sims >= SIM_THRESHOLD) & (neighbors >= 0))
    for i, j in zip(rows.tolist(), neighbors[rows, cols].tolist()):
        ri, rj = find(i), find(j)
        if ri != rj:
            parent[rj] = ri

    return [find(i) for i in range(len(parent))]


def _save_face_crop(face, person_dir, margin=20):
    """Write a padded crop of `face` into `person_dir`; return True if a file was written."""
    img = cv2.imread(face["path"])
    if img is None:
        return False
    h, w = img.shape[:2]
    x1, y1, x2, y2 = (int(v) for v in face["bbox"])
    x1, y1 = max(0, x1 - margin), max(0, y1 - margin)
    x2, y2 = min(w, x2 + margin), min(h, y2 + margin)
    crop = img[y1:y2, x1:x2]
    if crop.size == 0:
        # Degenerate box after clamping; imwrite would raise on an empty image.
        return False
    return bool(cv2.imwrite(os.path.join(person_dir, f"{face['id']}_crop.jpg"), crop))


def _write_albums(clusters, output_dir, progress):
    """Create one `person_NNN` folder per cluster with its source images and one face crop."""
    cluster_items = list(clusters.items())
    total_clusters = len(cluster_items)

    for idx, (pid, group) in enumerate(cluster_items):
        person_dir = os.path.join(output_dir, f"person_{pid:03d}")
        os.makedirs(person_dir, exist_ok=True)

        seen_images = set()
        crop_saved = False

        for face in group:
            img_name = os.path.basename(face["path"])
            if img_name not in seen_images:
                shutil.copy(face["path"], os.path.join(person_dir, img_name))
                seen_images.add(img_name)

            # Keep trying later faces of the group until one crop is written.
            if not crop_saved:
                crop_saved = _save_face_crop(face, person_dir)

        progress((idx + 1) / total_clusters, desc=f"Saving Albums: {idx + 1} / {total_clusters}")


def _zip_dir(src_dir, zip_path):
    """Compress everything under `src_dir` into `zip_path`, using paths relative to `src_dir`."""
    with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as z:
        for root, _, files in os.walk(src_dir):
            for name in files:
                p = os.path.join(root, name)
                z.write(p, arcname=os.path.relpath(p, src_dir))


def _build_albums(image_files, base_dir, progress):
    """Run the whole pipeline inside `base_dir`; return (zip path or None, status message)."""
    input_dir = os.path.join(base_dir, "input_images")
    output_dir = os.path.join(base_dir, "face_albums")
    zip_path = os.path.join(base_dir, "face_identity_albums.zip")
    os.makedirs(input_dir, exist_ok=True)
    os.makedirs(output_dir, exist_ok=True)

    progress(0.0, desc="Uploading images...")
    _copy_inputs(image_files, input_dir, progress)

    progress(0.0, desc="Loading AI Model...")
    app = _get_face_app()

    faces = _detect_faces(app, input_dir, progress)
    if not faces:
        return None, "❌ No faces detected"

    progress(0.0, desc="Clustering Faces...")
    roots = _cluster_faces(np.stack([f["emb"] for f in faces]))
    clusters = defaultdict(list)
    for face, root in zip(faces, roots):
        clusters[root].append(face)

    _write_albums(clusters, output_dir, progress)

    progress(0.0, desc="Creating ZIP...")
    _zip_dir(output_dir, zip_path)

    progress(1.0, desc="Done!")
    return zip_path, f"✅ Done! Found {len(clusters)} identities"


def face_sorter(image_files, progress=gr.Progress(track_tqdm=True)):
    """Group uploaded photos by the people in them and return a ZIP of per-person albums.

    Args:
        image_files: a file path or a list of file paths, as delivered by the
            Gradio file input.
        progress: Gradio progress tracker used to report each stage.

    Returns:
        A ``(zip_path, status)`` tuple. ``zip_path`` is ``None`` when nothing
        could be produced; ``status`` is a message for the UI.
    """
    if not image_files:
        return None, "❌ No images provided"
    if isinstance(image_files, str):
        image_files = [image_files]
    elif isinstance(image_files, bool):
        return None, "❌ Invalid input"

    base_dir = tempfile.mkdtemp()
    zip_path = None
    try:
        zip_path, status = _build_albums(image_files, base_dir, progress)
        return zip_path, status
    finally:
        if zip_path is None:
            # Nothing to hand back (no faces, or an error): drop the whole work dir.
            shutil.rmtree(base_dir, ignore_errors=True)
        else:
            # Only the archive is returned; the copied inputs and the unzipped
            # album tree are no longer needed.
            shutil.rmtree(os.path.join(base_dir, "input_images"), ignore_errors=True)
            shutil.rmtree(os.path.join(base_dir, "face_albums"), ignore_errors=True)

# Gradio front end: a folder upload goes into face_sorter, which hands back the
# album archive plus a status line. Flagging is switched off.
_interface_options = dict(
    fn=face_sorter,
    inputs=gr.File(label="Upload Image Folder", file_count="directory", type="filepath"),
    outputs=[
        gr.File(label="Download Face Albums"),
        gr.Textbox(label="Status"),
    ],
    title="AI Face Album Maker",
    description="Upload a folder of images. Each person's folder contains all original images where they appear + one cropped face.",
    flagging_mode="never",
)
ui = gr.Interface(**_interface_options)

# Start the web app with Gradio's default launch settings.
ui.launch()


It looks like you are running Gradio on a hosted Jupyter notebook, which requires `share=True`. Automatically setting `share=True` (you can turn this off by setting `share=False` in `launch()` explicitly).

Colab notebook detected. To show errors in colab notebook, set debug=True in launch()
* Running on public URL: https://615d14c321933add1d.gradio.live

This share link expires in 1 week. For free permanent hosting and GPU upgrades, run `gradio deploy` from the terminal in the working directory to deploy to Hugging Face Spaces (https://huggingface.co/spaces)


