In [None]:
# If running in Colab, uncomment installs:
# !pip -q install diffusers==0.30.0 transformers==4.43.3 accelerate==0.33.0 \
#           sentencepiece==0.2.0 safetensors==0.4.3 \
#           facenet-pytorch==2.6.0 opencv-python==4.10.0.84 \
#           imagehash==4.3.1 ftfy==6.2 \
#           git+https://github.com/openai/CLIP.git

import os, json, random, time, uuid, math, hashlib, itertools
from pathlib import Path
from collections import defaultdict, Counter

import numpy as np
import pandas as pd
from PIL import Image, ImageOps

import torch
from tqdm.auto import tqdm

# Face detection
from facenet_pytorch import MTCNN

# CLIP for text-image similarity
import clip
import torchvision.transforms as T

# Perceptual hamming hash for near-duplicate removal
import imagehash

# Diffusers (choose SDXL or SD 1.5 below)
from diffusers import StableDiffusionPipeline, StableDiffusionXLPipeline
from diffusers import DPMSolverMultistepScheduler

In [None]:
# ===== Dataset paths (Colab layout) =====
TRAIN_CSV      = "/content/ann-visual-emotion/data/processed/EmoSet_splits/train.csv"
IMAGES_ROOT    = "/content/ann-visual-emotion/data/raw/EmoSet"
SYNTH_ROOT     = "/content/ann-visual-emotion/data/raw/EmoSet"  # generate into the same root (separate subfolders/files)
LABEL_MAP_JSON = "/content/ann-visual-emotion/data/processed/EmoSet_splits/label_map.json"

Path(SYNTH_ROOT).mkdir(parents=True, exist_ok=True)

# Fallback label map (class name -> integer id); replaced below when
# LABEL_MAP_JSON exists on disk.
LABEL_MAP = {
    "amusement": 0,
    "anger": 1,
    "awe": 2,
    "contentment": 3,
    "disgust": 4,
    "excitement": 5,
    "fear": 6,
    "sadness": 7,
}

# Overwrite with file contents if present
if Path(LABEL_MAP_JSON).exists():
    with open(LABEL_MAP_JSON, "r") as f:
        mp = json.load(f)
    # normalize to str->int
    LABEL_MAP = {str(k): int(v) for k, v in mp.items()}

IDX2LABEL = {v: k for k, v in LABEL_MAP.items()}
CLASSES = list(LABEL_MAP.keys())

# Target count per class
TARGET_PER_CLASS = 2000

# Read train.csv and count existing images per class
df_tr = pd.read_csv(TRAIN_CSV)
# Try to find reasonable column names
img_col_candidates = ["image_path", "img_path", "path", "filepath", "image", "img"]
lab_col_candidates = ["label", "class", "target", "emotion"]

IMG_COL = next((c for c in img_col_candidates if c in df_tr.columns), None)
LAB_COL = next((c for c in lab_col_candidates if c in df_tr.columns), None)
if IMG_COL is None or LAB_COL is None:
    raise ValueError(f"Could not infer image/label columns. Found columns: {list(df_tr.columns)}")


def _label_to_class_name(raw) -> str:
    """Map a raw CSV label to a class name.

    Class names pass through unchanged. Integer-valued labels (e.g. ``3`` or
    ``"3.0"``) are translated through IDX2LABEL so a CSV that stores the ids
    from the label map is counted correctly instead of matching no class.
    Anything else is returned as its string form.
    """
    text = str(raw)
    if text in LABEL_MAP:
        return text
    try:
        number = float(text)
    except ValueError:
        return text
    if not number.is_integer():
        return text
    return IDX2LABEL.get(int(number), text)


existing_counts = Counter(_label_to_class_name(v) for v in df_tr[LAB_COL].tolist())
existing_counts = {k: int(existing_counts.get(k, 0)) for k in CLASSES}

# A non-empty CSV in which no label matches any class almost certainly means
# the label column uses a different vocabulary; say so before a full
# TARGET_PER_CLASS worth of images is generated for every class.
if len(df_tr) > 0 and sum(existing_counts.values()) == 0:
    print(
        f"WARNING: no value in column '{LAB_COL}' matches the classes {CLASSES}; "
        "every class will be treated as empty."
    )

need_counts = {lab: max(TARGET_PER_CLASS - existing_counts.get(lab, 0), 0) for lab in CLASSES}
need_counts

In [None]:
# Shared positive style fragment appended to every portrait prompt.
CORE_STYLE = ", ".join([
    "ultra-detailed RAW color photo",
    "85mm lens",
    "shallow depth of field",
    "studio portrait",
    "highly detailed skin",
    "pores visible",
    "sharp eyes",
    "soft key light",
    "rim light",
    "natural color grading",
])

# Shared negative prompt listing artifacts the sampler should steer away from.
NEGATIVE = ", ".join([
    "low-res",
    "blurry",
    "deformed face",
    "extra fingers",
    "extra limbs",
    "cropped head",
    "over-saturated",
    "watermark",
    "text",
    "logo",
    "jpeg artifacts",
    "disfigured",
    "doll-like",
    "unrealistic skin",
    "severe shadow banding",
])

# Attribute pools; make_prompt draws one entry from each per sample.
AGES = [
    "teenager",
    "young adult",
    "adult",
    "middle-aged",
    "senior",
]
GENDERS = [
    "woman",
    "man",
    "non-binary person",
]
SKIN_TONES = [
    "very fair skin",
    "fair skin",
    "medium skin",
    "olive skin",
    "brown skin",
    "dark brown skin",
    "very dark skin",
]
HAIR = [
    "short hair",
    "long hair",
    "curly hair",
    "straight hair",
    "tied hair",
    "shaved head",
]
LIGHTS = [
    "softbox lighting",
    "natural window light",
    "butterfly lighting",
    "Rembrandt lighting",
    "cinematic lighting",
    "overcast daylight",
]

# Per-emotion expression cues; one is inserted in parentheses in each prompt.
EMOTION_VERBS = dict(
    amusement=["laughing", "smiling with joy", "amused grin", "eyes crinkled in laughter"],
    anger=["angry expression", "furious glare", "tense jaw", "brows furrowed in anger"],
    awe=["awe-struck expression", "eyes wide in wonder", "astonished look", "breath taken in awe"],
    contentment=["content smile", "relaxed and satisfied", "peaceful expression", "calm gentle smile"],
    disgust=["disgusted expression", "nose wrinkled", "upper lip raised", "eyes squinting in disgust"],
    excitement=["excited expression", "sparkling eyes", "cheerful wide smile", "thrilled look"],
    fear=["fearful expression", "eyes widened in fear", "tense mouth", "startled face"],
    sadness=["sad expression", "teary eyes", "downturned mouth", "gloomy look"],
)

def make_prompt(emotion: str) -> str:
    """Build one randomized portrait prompt for ``emotion``.

    One entry is drawn from each attribute pool (AGES, GENDERS, SKIN_TONES,
    HAIR, LIGHTS) and one expression cue from EMOTION_VERBS, then combined
    with CORE_STYLE into a single prompt string.

    Args:
        emotion: Class name. Classes can come from the label-map file, so a
            name without an EMOTION_VERBS entry falls back to the generic cue
            ``"<emotion> expression"`` instead of raising KeyError.

    Returns:
        The prompt text.
    """
    # The draw order is fixed so a seeded `random` reproduces the same prompts.
    age = random.choice(AGES)
    gender = random.choice(GENDERS)
    skin = random.choice(SKIN_TONES)
    hair = random.choice(HAIR)
    light = random.choice(LIGHTS)
    cues = EMOTION_VERBS.get(emotion) or [f"{emotion} expression"]
    cue = random.choice(cues)

    # "an adult", but "a teenager" / "a young adult".
    article = "an" if age[:1].lower() in ("a", "e", "i", "o", "u") else "a"

    return (
        f"A photorealistic portrait of {article} {age} {gender} with {skin}, {hair}, "
        f"showing a clear {emotion} facial expression ({cue}), "
        f"{CORE_STYLE}, {light}, high dynamic range, 8k master"
    )

In [None]:
# Resolve the device once so model loading, dtype and the generator all agree.
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"

# Half precision is only appropriate on GPU; on the CPU fallback the weights
# are loaded in float32 because CPU half-precision inference is poorly
# supported (missing kernels on older torch builds, very slow otherwise).
TORCH_DTYPE = torch.float16 if DEVICE == "cuda" else torch.float32

# SDXL is selected only when the first GPU reports at least ~14 GB of memory.
USE_SDXL = DEVICE == "cuda" and torch.cuda.get_device_properties(0).total_memory >= 14_000_000_000

if USE_SDXL:
    model_id = "stabilityai/stable-diffusion-xl-base-1.0"
    pipe = StableDiffusionXLPipeline.from_pretrained(
        model_id, torch_dtype=TORCH_DTYPE, use_safetensors=True
    )
else:
    model_id = "runwayml/stable-diffusion-v1-5"
    pipe = StableDiffusionPipeline.from_pretrained(
        model_id, torch_dtype=TORCH_DTYPE, use_safetensors=True
    )

pipe.scheduler = DPMSolverMultistepScheduler.from_config(pipe.scheduler.config)
pipe = pipe.to(DEVICE)

# Slight guidance helps clarity; you can tune 6.5–8.0 (SD 1.5) or 5.0–7.0 (SDXL)
GUIDANCE_SCALE = 7.0 if not USE_SDXL else 6.0
IMG_SIZE = (768, 768) if USE_SDXL else (512, 512)
BATCH_SIZE = 4  # increase if your VRAM allows
SEED = 42

# Seed both sources of randomness: the torch generator drives the diffusion
# noise, while Python's `random` drives the prompt attributes in make_prompt.
random.seed(SEED)
generator = torch.Generator(device=pipe.device).manual_seed(SEED)

In [None]:
# The face detector and CLIP run on the GPU when one is available, else CPU.
_filter_device = "cuda" if torch.cuda.is_available() else "cpu"

# MTCNN face detector.
mtcnn = MTCNN(keep_all=False, device=_filter_device)

# CLIP ViT-B/32 model plus its matching image preprocessing transform,
# switched to eval mode for scoring.
clip_model, clip_preprocess = clip.load("ViT-B/32", device=_filter_device)
clip_model.eval()

def clip_score(pil_img: Image.Image, text: str) -> float:
    """Return the CLIP cosine similarity between an image and a text.

    Both embeddings are L2-normalized before the dot product, so the result
    is a cosine similarity.

    Args:
        pil_img: Image to score; it is passed through ``clip_preprocess``.
        text: Text to compare against. CLIP's context window is 77 tokens and
            ``clip.tokenize`` raises on longer input by default. The prompts
            built in this notebook are likely longer than that, so the text is
            truncated; the subject and emotion come first in those prompts and
            are the part that is kept.

    Returns:
        The similarity as a Python float.
    """
    device = clip_model.visual.conv1.weight.device
    with torch.no_grad():
        image_input = clip_preprocess(pil_img).unsqueeze(0).to(device)
        text_tokens = clip.tokenize([text], truncate=True).to(device)
        image_features = clip_model.encode_image(image_input)
        text_features = clip_model.encode_text(text_tokens)
        image_features = image_features / image_features.norm(dim=-1, keepdim=True)
        text_features = text_features / text_features.norm(dim=-1, keepdim=True)
        sim = (image_features @ text_features.T).item()
    return float(sim)

# Simple near-duplicate check via perceptual hash
def is_near_duplicate(pil_img: Image.Image, seen_hashes: set, max_dist: int = 5) -> bool:
    """Report whether ``pil_img`` is perceptually close to an already seen image.

    The image's perceptual hash is compared with every hash in ``seen_hashes``;
    a difference of at most ``max_dist`` counts as a near-duplicate.
    ``seen_hashes`` is only read, never modified.
    """
    candidate = imagehash.phash(pil_img)
    return any(candidate - known <= max_dist for known in seen_hashes)

In [None]:
def save_with_meta(pil_img: Image.Image, out_dir: Path, meta: dict, fname_prefix: str = "synth"):
    """Write an image as PNG together with a JSON sidecar holding ``meta``.

    Both files share the stem ``<fname_prefix>_<8 chars of a uuid4>`` and are
    placed in ``out_dir``, which is created when missing.

    Returns:
        ``(img_fp, json_fp)``: the paths of the PNG and the JSON file.
    """
    out_dir.mkdir(parents=True, exist_ok=True)

    stem = f"{fname_prefix}_{str(uuid.uuid4())[:8]}"
    img_fp = out_dir / f"{stem}.png"
    json_fp = out_dir / f"{stem}.json"

    pil_img.save(img_fp, "PNG")
    with open(json_fp, "w") as handle:
        json.dump(meta, handle, indent=2)

    return img_fp, json_fp

def crop_face_if_present(pil_img: Image.Image, margin: float = 0.15) -> Image.Image:
    """Reframe the image around the first detected face.

    The first box returned by ``mtcnn.detect`` is padded by ``margin`` (a
    fraction of the box size) on every side. The crop window is then widened
    or heightened to the aspect ratio of the full image, so that resizing it
    back to the original size does not stretch the face. The window is scaled
    down and shifted as needed to stay inside the image.

    Args:
        pil_img: Source image.
        margin: Padding added around the face box, as a fraction of its size.

    Returns:
        An RGB image of the same size as the input: the reframed crop when a
        usable face box is found, otherwise the (RGB-converted) original.
    """
    img = pil_img.convert("RGB")
    boxes, _ = mtcnn.detect(img)
    if boxes is None or len(boxes) == 0:
        return img

    x1, y1, x2, y2 = (float(v) for v in boxes[0])  # first face
    w, h = img.size
    box_w, box_h = x2 - x1, y2 - y1
    if box_w <= 0 or box_h <= 0:
        # Degenerate detection; cropping would produce an empty image.
        return img

    center_x, center_y = (x1 + x2) / 2.0, (y1 + y2) / 2.0
    crop_w = box_w * (1.0 + 2.0 * margin)
    crop_h = box_h * (1.0 + 2.0 * margin)

    # Grow the shorter side until the window has the image's aspect ratio.
    aspect = w / h
    if crop_w / crop_h < aspect:
        crop_w = crop_h * aspect
    else:
        crop_h = crop_w / aspect

    # The window may not be larger than the image itself.
    shrink = min(1.0, w / crop_w, h / crop_h)
    crop_w *= shrink
    crop_h *= shrink

    # Slide the window so it lies fully inside the image.
    left = min(max(center_x - crop_w / 2.0, 0.0), w - crop_w)
    top = min(max(center_y - crop_h / 2.0, 0.0), h - crop_h)

    crop_box = (
        int(round(left)),
        int(round(top)),
        int(round(left + crop_w)),
        int(round(top + crop_h)),
    )
    if crop_box[2] <= crop_box[0] or crop_box[3] <= crop_box[1]:
        return img
    return img.crop(crop_box).resize(img.size, Image.LANCZOS)

In [None]:
# Minimum CLIP text-image similarity an image needs to be kept (tunable).
# NOTE(review): the SDXL threshold is the lower of the two; confirm that this
# is the intended direction for your data.
CLIP_MIN = 0.28 if not USE_SDXL else 0.26

# One set of perceptual hashes per class, filled as images are accepted and
# consulted for near-duplicate rejection.
seen_hashes_per_class = dict((lab, set()) for lab in CLASSES)

def generate_batch(prompts, negative_prompt):
    """Run the diffusion pipeline on a batch of prompts and return the images.

    Args:
        prompts: A list of prompt strings (a single string is also accepted).
        negative_prompt: One negative prompt applied to every prompt, or a
            list with one negative prompt per prompt.

    Returns:
        The ``.images`` list produced by the pipeline; an empty list when no
        prompt is given.

    The negative prompt is always sent as a list of the same length as the
    prompt list. The Stable Diffusion 1.5 pipeline raises TypeError when
    ``prompt`` and ``negative_prompt`` differ in type (list vs. str), which is
    what the previous non-SDXL branch passed.
    """
    if isinstance(prompts, str):
        prompts = [prompts]
    prompts = list(prompts)
    if not prompts:
        return []

    if isinstance(negative_prompt, str):
        negatives = [negative_prompt] * len(prompts)
    else:
        negatives = list(negative_prompt)

    # Step counts as configured for each model family.
    num_steps = 28 if USE_SDXL else 30

    return pipe(
        prompt=prompts,
        negative_prompt=negatives,
        num_inference_steps=num_steps,
        guidance_scale=GUIDANCE_SCALE,
        width=IMG_SIZE[0], height=IMG_SIZE[1],
        generator=generator
    ).images

# Safety valve: per class, run at most this many times the minimum number of
# batches needed. Without a cap the loop never ends when the filters reject
# (almost) everything, e.g. with CLIP_MIN set too high.
MAX_BATCH_OVERSAMPLE = 25

for emotion in CLASSES:
    need = int(need_counts.get(emotion, 0))
    if need <= 0:
        print(f"[{emotion}] Already >= {TARGET_PER_CLASS}, skipping.")
        continue

    print(f"\nGenerating for '{emotion}' -> need {need} images to reach {TARGET_PER_CLASS}")
    out_dir = Path(SYNTH_ROOT) / emotion
    out_dir.mkdir(parents=True, exist_ok=True)

    max_batches = MAX_BATCH_OVERSAMPLE * math.ceil(need / BATCH_SIZE)
    batches_run = 0
    accepted = 0

    # The context manager closes the bar even if generation raises.
    with tqdm(total=need, desc=f"{emotion}") as pbar:
        while accepted < need and batches_run < max_batches:
            batches_run += 1

            # Build a small batch of diverse prompts
            batch_prompts = [make_prompt(emotion) for _ in range(BATCH_SIZE)]
            images = generate_batch(batch_prompts, NEGATIVE)

            for img, prompt in zip(images, batch_prompts):
                # Optional: crop to face framing to make expression clearer
                img_c = crop_face_if_present(img)

                # Filter 1: at least one face
                boxes, _ = mtcnn.detect(img_c)
                if boxes is None or len(boxes) == 0:
                    continue

                # Filter 2: CLIP prompt-image similarity
                sim = clip_score(img_c, prompt)
                if sim < CLIP_MIN:
                    continue

                # Filter 3: de-duplication
                if is_near_duplicate(img_c, seen_hashes_per_class[emotion], max_dist=5):
                    continue

                # Save + metadata. "seed" is the seed of the shared generator
                # for the whole run, not a per-image seed.
                meta = {
                    "emotion": emotion,
                    "prompt": prompt,
                    "negative_prompt": NEGATIVE,
                    "model_id": model_id,
                    "guidance_scale": GUIDANCE_SCALE,
                    "seed": SEED,
                    "clip_similarity": sim,
                    "use_sdxl": bool(USE_SDXL)
                }
                save_with_meta(img_c, out_dir, meta, fname_prefix="synth")
                # Remember the hash only for images that were actually kept.
                seen_hashes_per_class[emotion].add(imagehash.phash(img_c))

                accepted += 1
                pbar.update(1)
                if accepted >= need:
                    break

    if accepted < need:
        print(
            f"[{emotion}] WARNING: stopped after {batches_run} batches with only "
            f"{accepted}/{need} accepted images; relax the filters (e.g. CLIP_MIN) "
            "or raise MAX_BATCH_OVERSAMPLE."
        )

print("\nDone! Synthetic balancing complete.")

In [None]:
def append_synth_to_csv(train_csv, images_root, classes, synth_root=None):
    """Append rows for generated ``synth_*.png`` files to the training CSV.

    Args:
        train_csv: Path of the CSV to read and rewrite in place.
        images_root: Root that stored image paths are made relative to. A file
            outside this root is stored with the path it was found under.
        classes: Class names; each is looked up as a subfolder of the
            synthetic root.
        synth_root: Folder holding the per-class subfolders. Defaults to the
            module-level SYNTH_ROOT.

    Returns:
        The number of rows that were not already in the CSV, so re-running
        the cell reports 0 instead of the number of files on disk.

    The image and label column names come from the module-level IMG_COL and
    LAB_COL. When the existing label column is numeric, the class id from
    LABEL_MAP is written instead of the class name so the column keeps a
    single type. Exact duplicate (image, label) rows are dropped before the
    CSV is written back.
    """
    df = pd.read_csv(train_csv)
    img_col = IMG_COL
    lab_col = LAB_COL

    synth_base = Path(SYNTH_ROOT if synth_root is None else synth_root)
    numeric_labels = pd.api.types.is_numeric_dtype(df[lab_col])
    known = set(zip(df[img_col].astype(str), df[lab_col].astype(str)))

    new_rows = []
    for c in classes:
        folder = synth_base / c
        if not folder.exists():
            continue
        label_value = LABEL_MAP.get(c, c) if numeric_labels else c
        # Sorted so the appended rows come out in the same order on every run.
        for p in sorted(folder.glob("synth_*.png")):
            try:
                rel = str(p.relative_to(images_root))
            except ValueError:
                # The file is not under images_root; keep its full path.
                rel = str(p)
            key = (rel, str(label_value))
            if key in known:
                continue
            known.add(key)
            new_rows.append({img_col: rel, lab_col: label_value})

    if new_rows:
        df_new = pd.concat([df, pd.DataFrame(new_rows)], ignore_index=True)
    else:
        df_new = df
    # Drop exact duplicates just in case
    df_new = df_new.drop_duplicates(subset=[img_col, lab_col])
    df_new.to_csv(train_csv, index=False)
    return len(new_rows)

# Register the generated images in the training CSV and report the count
# returned by the helper.
added = append_synth_to_csv(
    train_csv=TRAIN_CSV,
    images_root=IMAGES_ROOT,
    classes=CLASSES,
)
print(f"Appended {added} synthetic rows to {TRAIN_CSV}")

In [None]:
import shutil

# Folder to archive.
# NOTE(review): nothing in the cells shown here writes to this folder (the
# synthetic images go under SYNTH_ROOT); confirm it exists and is the
# intended folder before running.
folder_path = "/content/ann-visual-emotion/data/processed/NewEmoSet"

# Archive base name; make_archive appends the ".zip" extension itself.
output_path = "/content/NewEmoSet"

# Zip the folder contents; the cell displays the path of the created archive.
shutil.make_archive(base_name=output_path, format="zip", root_dir=folder_path)

In [None]:
# Colab-only: google.colab is importable only inside a Colab runtime.
from google.colab import files
# Trigger a browser download of the archive; the path is hard-coded and is
# expected to match the zip file produced by the archiving cell.
files.download("/content/NewEmoSet.zip")