In [None]:
%pip install torch torchvision facenet-pytorch pillow scikit-learn tqdm


In [None]:
%pip install kaggle

In [None]:
import kagglehub

path = kagglehub.dataset_download("hearfool/vggface2")
print("Path to dataset files:", path)

In [None]:
print("Path to dataset files:", path)

In [None]:
import os
import zipfile
import hashlib
import shutil
from pathlib import Path
from PIL import Image
from tqdm import tqdm
import torch
from torchvision import transforms
from facenet_pytorch import MTCNN
from sklearn.model_selection import train_test_split

# Dataset location. Prefer the directory returned by the KaggleHub download
# cell (`path`) so the notebook is not tied to a single machine; fall back to
# the original hard-coded cache location when that cell has not been run.
try:
    data_root = path
except NameError:
    data_root = r'C:\Users\LENOVO\.cache\kagglehub\datasets\hearfool\vggface2\versions\1'

# Unzip any .zip archive whose contents are not already on disk.
for file in os.listdir(data_root):
    if not file.endswith('.zip'):
        continue
    zip_path = os.path.join(data_root, file)
    with zipfile.ZipFile(zip_path, 'r') as zip_ref:
        # Re-extracting a large archive on every run is wasted work, so skip
        # it when every member of the archive already exists under data_root.
        already_extracted = all(
            os.path.exists(os.path.join(data_root, member))
            for member in zip_ref.namelist()
        )
        if already_extracted:
            print(f"Skipping {file}: already extracted")
            continue
        print(f"Extracting {zip_path} ...")
        zip_ref.extractall(data_root)
    print(f"Extracted {file}")

# Set base directory for train images
base_dir = os.path.join(data_root, 'train')

# Output directories - change this to your project folder or desired output location
project_root = r'D:\depi\project'
processed_dir = os.path.join(project_root, 'processed_faces')
train_dir = os.path.join(project_root, 'data_split', 'train')
val_dir = os.path.join(project_root, 'data_split', 'val')
val_split = 0.2  # fraction held out for validation

# Check if base_dir is correct and list some files
print("base_dir exists?", os.path.exists(base_dir))
print("Sample of images in base_dir:")
for root, dirs, files in os.walk(base_dir):
    # Only the top level is inspected; this is a quick sanity check.
    print("Root:", root)
    print("Files:", files[:5])
    break

# Set device and MTCNN face detector
device = 'cuda' if torch.cuda.is_available() else 'cpu'
mtcnn = MTCNN(image_size=160, margin=10, min_face_size=10, device=device)

# Define augmentation transforms
augment = transforms.Compose([
    transforms.RandomHorizontalFlip(),
    transforms.RandomRotation(10),
    transforms.ColorJitter(brightness=0.2, contrast=0.2),
])

# Prepare output folders: any previous processed_faces folder is deleted so
# each run starts from an empty directory.
shutil.rmtree(processed_dir, ignore_errors=True)
os.makedirs(processed_dir, exist_ok=True)

# Step 1: Detect faces, crop and save
def save_faces(data_dir, save_dir):
    """Detect the face in every ``.jpg`` under *data_dir* and save the crop.

    Each crop is written to ``save_dir/<parent folder name>/<file name>``, so
    the per-person folder layout of the input is mirrored in the output.

    Args:
        data_dir: Root folder searched recursively for ``*.jpg`` images.
        save_dir: Root folder that receives the cropped faces.

    Images in which no face is detected, or that raise while being processed,
    are reported on stdout and skipped.
    """
    image_paths = list(Path(data_dir).rglob('*.jpg'))
    print("Number of input images found:", len(image_paths))
    for img_path in tqdm(image_paths, desc="Processing images"):
        person_name = img_path.parent.name
        save_person_dir = Path(save_dir) / person_name
        save_person_dir.mkdir(parents=True, exist_ok=True)
        dst_path = save_person_dir / img_path.name
        try:
            # Close the source file promptly; convert() returns an
            # independent in-memory copy.
            with Image.open(img_path) as src:
                img = src.convert('RGB')
            # Let MTCNN write the crop itself. With the library default
            # post_process=True the returned tensor is standardised to
            # roughly [-1, 1], so pushing it through ToPILImage would save a
            # corrupted image. The file written via save_path is the
            # un-normalised crop.
            face = mtcnn(img, save_path=str(dst_path))
        except Exception as e:
            print(f"Error processing {img_path}: {e}")
            continue
        if face is None:
            print(f"No face detected in {img_path}")

# Crop faces from the raw training images into processed_dir.
save_faces(data_dir=base_dir, save_dir=processed_dir)

# Sanity check: count the crops now present under processed_dir.
num_processed = len(list(Path(processed_dir).rglob('*.jpg')))
print(f"Number of processed face images: {num_processed}")

# Step 2: Augment faces
def augment_faces(src_dir, num_aug=2):
    """Write ``num_aug`` randomly augmented copies next to each face image.

    For every ``<name>.jpg`` inside a sub-folder of *src_dir*, the module-level
    ``augment`` transform is applied ``num_aug`` times and the results are
    saved in the same folder as ``<name>_aug<i>.jpg``.

    Args:
        src_dir: Folder containing one sub-folder of ``.jpg`` images per person.
        num_aug: Number of augmented copies to create per source image.
    """
    for person_dir in Path(src_dir).iterdir():
        # Stray files at the top level are ignored, as in remove_duplicates.
        if not person_dir.is_dir():
            continue

        # Snapshot the inputs before writing, because the outputs land in the
        # same folder. Files that already carry an ``_aug<i>`` suffix are
        # skipped so that calling this function again refreshes the existing
        # augmentations instead of augmenting them recursively.
        sources = []
        for candidate in sorted(person_dir.glob('*.jpg')):
            _, marker, index = candidate.stem.rpartition('_aug')
            if marker and index.isdigit():
                continue
            sources.append(candidate)

        for img_path in sources:
            # The context manager releases the file handle once all copies
            # of this image have been written.
            with Image.open(img_path) as img:
                for i in range(num_aug):
                    img_aug = augment(img)
                    img_aug.save(person_dir / f"{img_path.stem}_aug{i}{img_path.suffix}")

# Add augmented variants alongside the cropped faces in processed_dir.
augment_faces(src_dir=processed_dir)

# Step 3: Remove duplicate images
def remove_duplicates(folder):
    """Delete ``.jpg`` files whose bytes duplicate an earlier file.

    Every ``*.jpg`` directly inside each sub-folder of *folder* is hashed with
    MD5. The first file seen with a given digest is kept and every later file
    with the same digest is deleted, including matches in other sub-folders.
    The number of deleted files is printed.

    Folders and files are visited in sorted order so that the surviving copy
    is the same on every run and every filesystem. Within a folder ``x.jpg``
    sorts before ``x_aug0.jpg``, so an original is kept in preference to a
    byte-identical augmented copy.

    Args:
        folder: Root folder containing one sub-folder of images per person.
    """
    seen_hashes = set()
    num_removed = 0
    for person_dir in sorted(Path(folder).iterdir()):
        if not person_dir.is_dir():
            continue
        # sorted() also materialises the listing, so unlinking files below
        # cannot disturb the iteration.
        for img_path in sorted(person_dir.glob('*.jpg')):
            # MD5 is used only as a content fingerprint, not for security.
            filehash = hashlib.md5(img_path.read_bytes()).hexdigest()
            if filehash in seen_hashes:
                img_path.unlink()
                num_removed += 1
            else:
                seen_hashes.add(filehash)
    print(f"Removed {num_removed} duplicate images.")

# Drop byte-identical images from processed_dir before splitting.
remove_duplicates(folder=processed_dir)

# Step 4: Split into train/val
def split_train_val(src_dir, train_dir, val_dir, val_split=0.2):
    """Copy each person's images into train and validation folders.

    Images are grouped by their source image: ``x.jpg`` and its augmented
    copies ``x_aug<i>.jpg`` form one group. The split is made on groups, so
    an augmented copy never lands on the opposite side from the image it was
    derived from; that would leak near-duplicates of training images into
    the validation set.

    Args:
        src_dir: Folder containing one sub-folder of ``.jpg`` images per person.
        train_dir: Destination root for the training images.
        val_dir: Destination root for the validation images.
        val_split: Fraction of each person's image groups sent to validation.

    People with fewer than two image groups are skipped because they cannot
    be split.
    """
    for person_dir in sorted(Path(src_dir).iterdir()):
        if not person_dir.is_dir():
            continue

        # Group files under the stem of the image they were derived from.
        groups = {}
        for img_path in sorted(person_dir.glob('*.jpg')):
            key = img_path.stem
            while True:
                base, marker, index = key.rpartition('_aug')
                if not (marker and index.isdigit()):
                    break
                key = base
            groups.setdefault(key, []).append(img_path)

        if len(groups) < 2:
            continue

        # Sorted keys give train_test_split a stable input order, so
        # random_state=42 really does reproduce the same split each run.
        train_keys, val_keys = train_test_split(
            sorted(groups), test_size=val_split, random_state=42
        )

        for keys, dest_root in ((train_keys, train_dir), (val_keys, val_dir)):
            out_dir = Path(dest_root) / person_dir.name
            out_dir.mkdir(parents=True, exist_ok=True)
            for key in keys:
                for img_path in groups[key]:
                    shutil.copy(img_path, out_dir / img_path.name)

# Copy the processed faces into the train/validation folder layout.
split_train_val(
    src_dir=processed_dir,
    train_dir=train_dir,
    val_dir=val_dir,
    val_split=val_split,
)

# Final summary of where the outputs were written.
print(
    "Clean and structured dataset ready for training.",
    f"Train images in: {train_dir}",
    f"Validation images in: {val_dir}",
    sep="\n",
)
