In [None]:
import kagglehub
import numpy as np
import cv2
import dlib
from pathlib import Path
import urllib.request
import bz2
from collections import defaultdict
import matplotlib.pyplot as plt
from itertools import combinations
from tqdm import tqdm
import json
import time

# Download the dataset; `path` is the local folder returned by kagglehub
path = kagglehub.dataset_download("sidharthangn/celebrity-face-dataset-augmented")
print("Dataset:", path)

# Config
SIZE = 128                 # morphs are produced as SIZE x SIZE images
ALPHA = 0.5                # blend weight between the two faces
NUM_VARIATIONS = 30        # morphs generated per identity pair
MIN_IMAGES_PER_PERSON = 6  # identities with fewer images are ignored

# Folders
OUTPUT_DIR = Path("./morphed_database")
OUTPUT_DIR.mkdir(exist_ok=True)
LOCAL_DATA_DIR = Path("./dlib_models")
LOCAL_DATA_DIR.mkdir(exist_ok=True)
PREDICTOR_PATH = LOCAL_DATA_DIR / "shape_predictor_68_face_landmarks.dat"

# Dlib landmark model: fetched once, then reused from LOCAL_DATA_DIR
if not PREDICTOR_PATH.exists():
    print("Telechargement Dlib...")
    url = "http://dlib.net/files/shape_predictor_68_face_landmarks.dat.bz2"
    compressed = LOCAL_DATA_DIR / "temp.bz2"
    # Decompress into a side file and rename only on success: an interrupted run
    # must not leave a truncated PREDICTOR_PATH that the exists() check above
    # would accept on the next run.
    partial = LOCAL_DATA_DIR / (PREDICTOR_PATH.name + ".part")
    try:
        urllib.request.urlretrieve(url, compressed)
        with bz2.BZ2File(compressed, 'rb') as f_in, open(partial, 'wb') as f_out:
            # Stream in 1 MiB chunks instead of holding the whole model in memory
            for chunk in iter(lambda: f_in.read(1 << 20), b""):
                f_out.write(chunk)
        partial.replace(PREDICTOR_PATH)
    finally:
        # Remove temporary files whether or not the download succeeded
        for leftover in (compressed, partial):
            if leftover.exists():
                leftover.unlink()

detector = dlib.get_frontal_face_detector()
predictor = dlib.shape_predictor(str(PREDICTOR_PATH))
print("[OK] Dlib charge")

In [None]:
# Index the dataset: map each identity (folder name) to the paths of its images.
persons = defaultdict(list)
# sorted() fixes the traversal order, which rglob/glob leave filesystem-dependent,
# so the identity order (and the pair order derived from it) is stable between runs.
for person_dir in sorted(Path(path).rglob("*")):
    if person_dir.is_dir():
        images = sorted(list(person_dir.glob("*.jpg")) + list(person_dir.glob("*.png")))
        if images:
            # Accumulate instead of assigning: folders that share a name at different
            # depths are merged rather than one silently replacing the other.
            persons[person_dir.name].extend(str(img) for img in images)

# Apply the minimum after merging; keep the defaultdict type for later cells.
persons = defaultdict(list, {
    name: imgs for name, imgs in persons.items()
    if len(imgs) >= MIN_IMAGES_PER_PERSON
})

print(f"\n[OK] {len(persons)} personnes avec {MIN_IMAGES_PER_PERSON}+ images:")
for i, (name, imgs) in enumerate(persons.items()):
    print(f"   {i+1}. {name}: {len(imgs)} images")

# Compute the generation statistics: every unordered pair of identities
# yields NUM_VARIATIONS morphs.
n_persons = len(persons)
n_pairs = n_persons * (n_persons - 1) // 2
n_total_images = n_pairs * NUM_VARIATIONS

print(f"\n{'='*60}")
print(f"CONFIGURATION:")
print(f"   - Personnes: {n_persons}")
print(f"   - Paires possibles: {n_pairs}")
print(f"   - Images par paire: {NUM_VARIATIONS}")
print(f"   - TOTAL IMAGES A GENERER: {n_total_images}")
print(f"   - Alpha: {ALPHA} (50%)")
print(f"   - Dossier sortie: {OUTPUT_DIR}")
print(f"{'='*60}")

In [None]:
def get_landmarks(img_gray, detector, predictor, upsample_times=0):
    """
    Locate the 68 landmarks of the first face found in a grayscale image.

    Args:
        img_gray: image passed to both the detector and the predictor.
        detector: callable ``detector(image, upsample_times)`` returning detections.
        predictor: callable ``predictor(image, detection)`` returning an object
            whose ``part(i)`` exposes ``x`` and ``y``.
        upsample_times: forwarded unchanged to the detector.

    Returns:
        (68, 2) int32 array of (x, y) coordinates, or None when nothing is detected.
    """
    faces = detector(img_gray, upsample_times)
    if not len(faces):
        return None

    # Only the first detection is used; any further faces are ignored.
    shape = predictor(img_gray, faces[0])
    return np.array(
        [(shape.part(k).x, shape.part(k).y) for k in range(68)],
        dtype=np.int32,
    )

def add_corner_points(points, w, h):
    """
    Append eight frame points (four corners, four edge midpoints) to `points`.

    Args:
        points: (N, 2) array of (x, y) coordinates.
        w, h: frame width and height; the far edges sit at w - 1 and h - 1.

    Returns:
        (N + 8, 2) array: the input points followed by the frame points.
    """
    right, bottom = w - 1, h - 1
    mid_x, mid_y = w // 2, h // 2
    frame = np.array(
        [
            # corners, clockwise from top-left
            [0, 0], [right, 0], [right, bottom], [0, bottom],
            # edge midpoints: top, right, bottom, left
            [mid_x, 0], [right, mid_y], [mid_x, bottom], [0, mid_y],
        ],
        dtype=np.int32,
    )
    return np.concatenate((points, frame), axis=0)

def clamp_points(points, w, h):
    """
    Return a float32 copy of `points` with x limited to [0, w - 1] and y to [0, h - 1].

    Args:
        points: sequence or array of (x, y) rows.
        w, h: frame width and height.

    Returns:
        New float32 array; the input is not modified.
    """
    clamped = np.array(points, dtype=np.float32)
    # Column 0 holds x, column 1 holds y
    for column, upper in ((0, w - 1), (1, h - 1)):
        clamped[:, column] = np.clip(clamped[:, column], 0, upper)
    return clamped

def find_point_index(points, pt, tol=3.0):
    """
    Find the row of `points` closest to `pt`.

    Args:
        points: (N, 2) collection of coordinates.
        pt: the (x, y) coordinate to look up.
        tol: largest Euclidean distance accepted as a match.

    Returns:
        Index (int) of the nearest row when it lies within `tol`, otherwise None.
    """
    candidates = np.asarray(points, dtype=np.float32)
    target = np.asarray(pt, dtype=np.float32)
    distances = np.linalg.norm(candidates - target, axis=1)
    nearest = int(distances.argmin())
    return nearest if distances[nearest] <= tol else None

def triangle_completely_inside(t, w, h):
    """
    Tell whether every vertex of `t` satisfies 0 <= x < w and 0 <= y < h.

    Args:
        t: iterable of (x, y) vertices.
        w, h: frame width and height.

    Returns:
        False as soon as one vertex falls outside the frame, True otherwise.
    """
    return not any(
        x < 0 or x >= w or y < 0 or y >= h
        for (x, y) in t
    )

def apply_affine_transform(src, src_tri, dst_tri, size):
    """
    Warp `src` with the affine map that sends `src_tri` onto `dst_tri`.

    Args:
        src: source image patch.
        src_tri: three (x, y) vertices in `src` coordinates.
        dst_tri: the three corresponding vertices in output coordinates.
        size: (width, height) of the output patch.

    Returns:
        The warped patch, bilinearly interpolated, with reflected borders
        (cv2.BORDER_REFLECT_101).
    """
    matrix = cv2.getAffineTransform(np.float32(src_tri), np.float32(dst_tri))
    out_size = (int(size[0]), int(size[1]))
    return cv2.warpAffine(
        src,
        matrix,
        out_size,
        None,
        flags=cv2.INTER_LINEAR,
        borderMode=cv2.BORDER_REFLECT_101,
    )

def morph_triangle(img1, img2, img_morphed, t1, t2, t_morphed, alpha):
    """
    Warp one triangle from each source image onto the morphed triangle and blend them.

    Args:
        img1, img2: source images (float arrays).
        img_morphed: output image, modified in place.
        t1, t2: the triangle's three (x, y) vertices in img1 and img2.
        t_morphed: the triangle's three vertices in the output image.
        alpha: weight of img2 in the blend; img1 gets 1 - alpha.

    Returns:
        None. Returns early, without drawing, when a bounding box or a source
        patch is empty.
    """
    box1 = cv2.boundingRect(np.float32([t1]))
    box2 = cv2.boundingRect(np.float32([t2]))
    box_m = cv2.boundingRect(np.float32([t_morphed]))

    # Each box is (x, y, width, height); a non-positive side means nothing to draw
    if any(side <= 0 for box in (box1, box2, box_m) for side in box[2:4]):
        return

    def _relative_to(tri, box):
        # Vertices expressed relative to the top-left corner of `box`
        return [(tri[k][0] - box[0], tri[k][1] - box[1]) for k in range(3)]

    local1 = _relative_to(t1, box1)
    local2 = _relative_to(t2, box2)
    local_m = _relative_to(t_morphed, box_m)

    patch1 = img1[box1[1]:box1[1] + box1[3], box1[0]:box1[0] + box1[2]]
    patch2 = img2[box2[1]:box2[1] + box2[3], box2[0]:box2[0] + box2[2]]
    if patch1.size == 0 or patch2.size == 0:
        return

    x, y, box_w, box_h = box_m

    # Bring both patches into the morphed triangle's frame, then cross-dissolve
    warped1 = apply_affine_transform(patch1, local1, local_m, (box_w, box_h))
    warped2 = apply_affine_transform(patch2, local2, local_m, (box_w, box_h))
    blended = (1.0 - alpha) * warped1 + alpha * warped2

    # Triangle mask; lineType 16 is cv2.LINE_AA (anti-aliased edges)
    mask = np.zeros((box_h, box_w), dtype=np.float32)
    cv2.fillConvexPoly(mask, np.int32(local_m), 1.0, 16, 0)
    weight = mask[:, :, None]

    # Composite: keep existing pixels outside the triangle, blended pixels inside
    region = img_morphed[y:y + box_h, x:x + box_w]
    img_morphed[y:y + box_h, x:x + box_w] = region * (1 - weight) + blended * weight

def prepare_points_for_image(img_gray, w, h):
    """
    Build the 76 control points used for morphing one image.

    Uses the dlib landmarks when a face is detected; otherwise falls back to a
    regular 17 x 4 grid covering the central half of the frame. The points are
    clamped to the frame and the eight frame points are appended.

    Args:
        img_gray: grayscale image handed to the landmark detector.
        w, h: frame width and height.

    Returns:
        (76, 2) float32 array: 68 face/grid points followed by 8 frame points.
    """
    landmarks = get_landmarks(img_gray, detector, predictor, upsample_times=0)

    if landmarks is None:
        # No face found: 17 columns x 4 rows = 68 evenly spaced substitute points
        xs = np.tile(np.linspace(w*0.25, w*0.75, 17), (4,))
        ys = np.repeat(np.linspace(h*0.25, h*0.75, 4), 17)
        landmarks = np.column_stack((xs, ys)).astype(np.int32)

    inside = clamp_points(landmarks, w, h)
    with_frame = add_corner_points(inside.astype(np.int32), w, h)
    return with_frame.astype(np.float32)

In [None]:
def morph_faces(img_path_a, img_path_b, alpha=0.5):
    """
    Morph two face images into a single SIZE x SIZE image.

    Both images are resized to SIZE x SIZE, control points are obtained with
    prepare_points_for_image, the two point sets are blended with weight `alpha`,
    a Delaunay triangulation is built on the blended points, and every triangle
    is warped and cross-dissolved with morph_triangle.

    Args:
        img_path_a: path of the first image (weight 1 - alpha).
        img_path_b: path of the second image (weight alpha).
        alpha: blend weight applied to both the geometry and the colours.

    Returns:
        uint8 image with the same shape as the resized inputs, or None when an
        image cannot be read or the result is almost black (mean below 10).
    """
    imgA = cv2.imread(str(img_path_a))
    imgB = cv2.imread(str(img_path_b))

    # cv2.imread signals an unreadable file by returning None
    if imgA is None or imgB is None:
        return None

    imgA_resized = cv2.resize(imgA, (SIZE, SIZE), interpolation=cv2.INTER_CUBIC)
    imgB_resized = cv2.resize(imgB, (SIZE, SIZE), interpolation=cv2.INTER_CUBIC)

    imgA_gray = cv2.cvtColor(imgA_resized, cv2.COLOR_BGR2GRAY)
    imgB_gray = cv2.cvtColor(imgB_resized, cv2.COLOR_BGR2GRAY)

    imgA_color = imgA_resized.astype(np.float32)
    imgB_color = imgB_resized.astype(np.float32)

    ptsA = prepare_points_for_image(imgA_gray, SIZE, SIZE)
    ptsB = prepare_points_for_image(imgB_gray, SIZE, SIZE)

    # Intermediate geometry: same index = same landmark in A, B and the morph
    points_morphed = (1.0 - alpha) * ptsA + alpha * ptsB
    points_morphed = clamp_points(points_morphed, SIZE, SIZE)

    rect = (0, 0, SIZE, SIZE)
    subdiv = cv2.Subdiv2D(rect)

    for p in points_morphed:
        x, y = float(p[0]), float(p[1])
        if 0 <= x < SIZE and 0 <= y < SIZE:
            try:
                subdiv.insert((x, y))
            except cv2.error:
                # Best effort: skip a point OpenCV rejects, but let anything else
                # (including KeyboardInterrupt) propagate.
                continue

    triangle_list = subdiv.getTriangleList()

    # Subdiv2D returns coordinates, not indices: map each vertex back to its
    # point index so the same triangle can be addressed in A, B and the morph.
    tri_indices = []
    for t in triangle_list:
        tri_pts = [(t[0], t[1]), (t[2], t[3]), (t[4], t[5])]
        inds = []
        valid = True
        for p in tri_pts:
            idx = find_point_index(points_morphed, p, tol=5.0)
            if idx is None:
                # Vertex does not correspond to any inserted point
                valid = False
                break
            inds.append(idx)
        # Reject triangles whose vertices collapse onto the same point
        if valid and len(set(inds)) == 3:
            tri_indices.append(tuple(inds))

    tri_indices = list(set(tri_indices))

    img_morphed = np.zeros_like(imgA_color, dtype=np.float32)

    for tri in tri_indices:
        i1, i2, i3 = tri
        tA = [ptsA[i1], ptsA[i2], ptsA[i3]]
        tB = [ptsB[i1], ptsB[i2], ptsB[i3]]
        tM = [points_morphed[i1], points_morphed[i2], points_morphed[i3]]

        if not (triangle_completely_inside(tA, SIZE, SIZE) and
                triangle_completely_inside(tB, SIZE, SIZE) and
                triangle_completely_inside(tM, SIZE, SIZE)):
            continue

        morph_triangle(imgA_color, imgB_color, img_morphed, tA, tB, tM, alpha)

    morph_result = np.clip(img_morphed, 0, 255).astype(np.uint8)

    # Reject results that stayed (almost) black, i.e. few or no triangles drawn
    if np.mean(morph_result) < 10:
        return None

    return morph_result

In [None]:
def sanitize_name(name):
    """
    Turn an identity name into a fragment that is safe to use in a file name.

    Every character that is not alphanumeric is replaced by an underscore, and
    the result is cut to at most 20 characters.
    """
    safe_chars = [ch if ch.isalnum() else '_' for ch in str(name)]
    return "".join(safe_chars)[:20]

In [None]:
def generate_full_database():
    """
    Generate the complete morphed database.

    For every unordered pair of identities in `persons`, NUM_VARIATIONS morphs
    are attempted; each attempt draws one random image per identity and blends
    them with weight ALPHA. Results are written to OUTPUT_DIR as ``A_B_N.png``
    (A and B are the sanitized identity names, N runs from 1 to NUM_VARIATIONS)
    and run statistics are saved to ``dataset_stats.json`` in the same folder.

    Returns:
        (display_samples, stats):
        display_samples is a list of at most 10 tuples
        (imgA, morph, imgB, nameA, nameB) for preview, and stats is the dict
        that was written to ``dataset_stats.json``.
    """
    # Cap on per-attempt error messages so a systematic failure cannot flood the output
    max_reported_errors = 5
    reported_errors = 0

    person_list = list(persons.keys())
    all_pairs = list(combinations(person_list, 2))

    print(f"\n{'='*60}")
    print("GENERATION DE LA BASE DE DONNEES MORPHEE")
    print(f"{'='*60}")
    print(f"   - Paires a traiter: {len(all_pairs)}")
    print(f"   - Images par paire: {NUM_VARIATIONS}")
    print(f"   - Total: {len(all_pairs) * NUM_VARIATIONS} images")
    print(f"{'='*60}\n")

    # Stats
    stats = {
        "total_pairs": len(all_pairs),
        "images_per_pair": NUM_VARIATIONS,
        "alpha": ALPHA,
        "size": SIZE,
        "successful": 0,
        "failed": 0,
        "start_time": time.strftime("%Y-%m-%d %H:%M:%S")
    }

    start_time = time.time()

    # A few (source A, morph, source B) triplets kept for display
    display_samples = []

    # Generate the morphs
    for nameA, nameB in tqdm(all_pairs, desc="Paires"):

        clean_nameA = sanitize_name(nameA)
        clean_nameB = sanitize_name(nameB)

        imgs_a = persons[nameA]
        imgs_b = persons[nameB]

        for n in range(1, NUM_VARIATIONS + 1):
            try:
                # Pick one image of each person at random
                img_a = np.random.choice(imgs_a)
                img_b = np.random.choice(imgs_b)

                morph = morph_faces(img_a, img_b, alpha=ALPHA)
                if morph is None:
                    stats["failed"] += 1
                    continue

                # File name: A_B_N.png
                filename = f"{clean_nameA}_{clean_nameB}_{n}.png"
                filepath = OUTPUT_DIR / filename

                # cv2.imwrite reports failure through its return value, so an
                # unwritable file must not be counted as a success.
                if not cv2.imwrite(str(filepath), morph):
                    stats["failed"] += 1
                    continue
                stats["successful"] += 1

                # Keep the first morph of the first pairs for display
                if len(display_samples) < 10 and n == 1:
                    imgA = cv2.imread(str(img_a))
                    imgB = cv2.imread(str(img_b))
                    imgA = cv2.resize(imgA, (SIZE, SIZE))
                    imgB = cv2.resize(imgB, (SIZE, SIZE))
                    display_samples.append((imgA, morph, imgB, nameA, nameB))

            except Exception as e:
                # Best effort: one bad image must not abort the whole run, but
                # surface the first few errors instead of hiding them entirely.
                stats["failed"] += 1
                if reported_errors < max_reported_errors:
                    print(f"[WARN] {nameA} x {nameB} #{n}: {type(e).__name__}: {e}")
                    reported_errors += 1

    # Finalize the stats
    elapsed_time = time.time() - start_time
    stats["end_time"] = time.strftime("%Y-%m-%d %H:%M:%S")
    stats["elapsed_seconds"] = elapsed_time
    stats["elapsed_minutes"] = elapsed_time / 60

    # Save the stats
    stats_file = OUTPUT_DIR / "dataset_stats.json"
    with open(stats_file, 'w') as f:
        json.dump(stats, f, indent=2)

    # Print the summary; max(1, ...) guards the ratios when nothing was attempted
    success_rate = stats["successful"] / max(1, stats["successful"] + stats["failed"]) * 100

    print(f"\n{'='*60}")
    print("GENERATION TERMINEE")
    print(f"{'='*60}")
    print(f"   - Paires traitees: {len(all_pairs)}")
    print(f"   - Images generees: {stats['successful']}")
    print(f"   - Echecs: {stats['failed']}")
    print(f"   - Taux de reussite: {success_rate:.1f}%")
    print(f"   - Temps total: {elapsed_time/60:.1f} minutes")
    print(f"   - Vitesse: {stats['successful']/max(1,elapsed_time):.1f} images/seconde")
    print(f"   - Dossier: {OUTPUT_DIR}")
    print(f"   - Stats: {stats_file}")
    print(f"\nFORMAT: A_B_N.png")
    # Report the configured range rather than a hard-coded 30
    print(f"   A = Identite 1, B = Identite 2, N = 1 a {NUM_VARIATIONS}")
    print(f"{'='*60}")

    return display_samples, stats

In [None]:
# Run the generation: writes the morphs and dataset_stats.json into OUTPUT_DIR
# and returns a few preview triplets plus the statistics dict.
samples, stats = generate_full_database()

print(f"\n[OK] Base de donnees generee!")
print(f"[OK] {stats['successful']} images dans {OUTPUT_DIR}")

In [None]:
# Display the examples: one row per sample, columns = source A | morph | source B
if samples:
    n = len(samples)
    fig, axes = plt.subplots(n, 3, figsize=(12, 4*n))
    
    # With a single row plt.subplots returns a 1-D array of Axes; wrapping it
    # keeps the axes[i][j] indexing below valid.
    if n == 1:
        axes = [axes]
    
    for i, (imgA, morph, imgB, nameA, nameB) in enumerate(samples):
        # Images are BGR (OpenCV); matplotlib expects RGB
        axes[i][0].imshow(cv2.cvtColor(imgA, cv2.COLOR_BGR2RGB))
        axes[i][0].set_title(f"A: {nameA[:15]}")
        axes[i][0].axis('off')
        
        axes[i][1].imshow(cv2.cvtColor(morph, cv2.COLOR_BGR2RGB))
        axes[i][1].set_title("MORPH (50%)")
        axes[i][1].axis('off')
        
        axes[i][2].imshow(cv2.cvtColor(imgB, cv2.COLOR_BGR2RGB))
        axes[i][2].set_title(f"B: {nameB[:15]}")
        axes[i][2].axis('off')
    
    plt.tight_layout()
    # NOTE(review): the preview is saved inside OUTPUT_DIR, the same folder the
    # training loader later scans with glob("*.png").
    plt.savefig(OUTPUT_DIR / "samples_preview.png", dpi=150)
    plt.show()
    print(f"[OK] Apercu sauvegarde: {OUTPUT_DIR}/samples_preview.png")

In [None]:
import numpy as np
import cv2
import os
from pathlib import Path
from tqdm import tqdm
import matplotlib.pyplot as plt
from collections import defaultdict
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, confusion_matrix
import json

# TensorFlow/Keras
import tensorflow as tf
from tensorflow.keras.applications import MobileNetV2
from tensorflow.keras.layers import Dense, GlobalAveragePooling2D, Dropout
from tensorflow.keras.models import Model
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint, ReduceLROnPlateau
from tensorflow.keras.utils import to_categorical

# Report the TensorFlow version and the GPUs visible to it
# (an empty list means no GPU was found).
print(f"TensorFlow version: {tf.__version__}")
print(f"GPU disponible: {tf.config.list_physical_devices('GPU')}")

In [None]:
# Folders
MORPHED_DIR = Path("./morphed_database")  # Folder holding the morphed images
MODEL_DIR = Path("./models")
MODEL_DIR.mkdir(exist_ok=True)

# Model parameters
IMG_SIZE = 128
BATCH_SIZE = 32
# NOTE(review): EPOCHS is only printed below; the model.fit cells in this
# notebook pass literal epoch counts (20, 30, 50) instead of this constant.
EPOCHS = 50
LEARNING_RATE = 0.001

print(f"{'='*60}")
print("CONFIGURATION")
print(f"{'='*60}")
print(f"   - Dossier images: {MORPHED_DIR}")
print(f"   - Taille images: {IMG_SIZE}x{IMG_SIZE}")
print(f"   - Batch size: {BATCH_SIZE}")
print(f"   - Epochs: {EPOCHS}")
print(f"   - Learning rate: {LEARNING_RATE}")
print(f"{'='*60}")

In [None]:
def load_morphed_dataset(min_images=10):
    """
    Load the morphed images and label them by fictitious identity.

    File names follow ``A_B_N.png``: everything before the last underscore
    (``A_B``) is the class, ``N`` is the numeric variation index.

    Args:
        min_images: identities with fewer files than this are discarded.

    Returns:
        (images, labels, label_to_idx, idx_to_label):
        images is a float32 array of RGB images scaled to [0, 1], labels the
        matching integer class indices, and the two dicts map identity names
        to indices and back.
    """
    print("\n[INFO] Chargement des images morphees...")

    images = []
    labels = []
    label_to_idx = {}
    idx_to_label = {}

    # List all PNG files; sorted so the sample order does not depend on the filesystem
    all_files = sorted(MORPHED_DIR.glob("*.png"))
    print(f"   {len(all_files)} fichiers trouves")

    # Group by fictitious identity (A_B)
    identities = defaultdict(list)

    for filepath in all_files:
        # Format: A_B_N.png -> split on the last underscore
        parts = filepath.stem.rsplit("_", 1)
        # Require a numeric suffix so other PNGs stored in the same folder
        # (e.g. samples_preview.png) are not mistaken for a morph identity.
        if len(parts) == 2 and parts[1].isdigit():
            identity = parts[0]  # A_B
            identities[identity].append(str(filepath))

    print(f"   {len(identities)} identites fictives")

    # Keep only identities with enough images
    valid_identities = {k: v for k, v in identities.items() if len(v) >= min_images}
    print(f"   {len(valid_identities)} identites avec {min_images}+ images")

    # Build the label <-> index mappings (alphabetical order of identity names)
    for idx, identity in enumerate(sorted(valid_identities.keys())):
        label_to_idx[identity] = idx
        idx_to_label[idx] = identity

    # Load the images; unreadable files are skipped
    for identity, filepaths in tqdm(valid_identities.items(), desc="Chargement"):
        for filepath in filepaths:
            img = cv2.imread(filepath)
            if img is not None:
                img = cv2.resize(img, (IMG_SIZE, IMG_SIZE))
                img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
                images.append(img)
                labels.append(label_to_idx[identity])

    images = np.array(images, dtype=np.float32) / 255.0  # Normalize to [0, 1]
    labels = np.array(labels)

    print(f"\n[OK] Dataset charge:")
    print(f"   - Images: {images.shape}")
    print(f"   - Classes: {len(label_to_idx)}")

    return images, labels, label_to_idx, idx_to_label

# Load the data; NUM_CLASSES sizes the one-hot labels and the softmax layer
# in the cells that follow.
X, y, label_to_idx, idx_to_label = load_morphed_dataset()
NUM_CLASSES = len(label_to_idx)
print(f"   - Nombre de classes: {NUM_CLASSES}")

In [None]:
# Split: 70% train, 15% val, 15% test.
# Done in two steps: 30% is held out first, then halved into validation and test.
# Both splits are stratified on the class label and use a fixed seed.
X_train, X_temp, y_train, y_temp = train_test_split(
    X, y, test_size=0.3, random_state=42, stratify=y
)
X_val, X_test, y_val, y_test = train_test_split(
    X_temp, y_temp, test_size=0.5, random_state=42, stratify=y_temp
)

# Convert the labels to one-hot (required by the categorical_crossentropy loss)
y_train_cat = to_categorical(y_train, NUM_CLASSES)
y_val_cat = to_categorical(y_val, NUM_CLASSES)
y_test_cat = to_categorical(y_test, NUM_CLASSES)

print(f"\n[OK] Split des donnees:")
print(f"   - Train: {X_train.shape[0]} images")
print(f"   - Validation: {X_val.shape[0]} images")
print(f"   - Test: {X_test.shape[0]} images")

In [None]:
def create_mobilenet_model(num_classes, img_size=128):
    """
    Build a MobileNetV2-based classifier with a frozen ImageNet backbone.

    Args:
        num_classes: size of the final softmax layer.
        img_size: side of the square RGB input.

    Returns:
        (model, base_model): the full classifier and the MobileNetV2 backbone
        it is built on, so the caller can unfreeze backbone layers later.

    NOTE(review): this notebook feeds the model images scaled to [0, 1], while
    Keras documents mobilenet_v2.preprocess_input as scaling to [-1, 1] for the
    ImageNet weights - confirm the intended input range.
    """
    # Pre-trained backbone without its ImageNet classification head
    base_model = MobileNetV2(
        weights='imagenet',
        include_top=False,
        input_shape=(img_size, img_size, 3)
    )

    # Transfer learning: only the new head is trainable at first
    for backbone_layer in base_model.layers:
        backbone_layer.trainable = False

    # Classification head: pooling, then two Dense+Dropout stages, then softmax
    features = GlobalAveragePooling2D()(base_model.output)
    for units, drop_rate in ((512, 0.5), (256, 0.3)):
        features = Dense(units, activation='relu')(features)
        features = Dropout(drop_rate)(features)
    predictions = Dense(num_classes, activation='softmax')(features)

    model = Model(inputs=base_model.input, outputs=predictions)

    return model, base_model

# Create the model; base_model is kept so phase 2 can unfreeze backbone layers
model, base_model = create_mobilenet_model(NUM_CLASSES, IMG_SIZE)

# Compile; categorical_crossentropy matches the one-hot labels (y_*_cat)
model.compile(
    optimizer=Adam(learning_rate=LEARNING_RATE),
    loss='categorical_crossentropy',
    metrics=['accuracy']
)

model.summary()
print(f"\n[OK] Modele MobileNetV2 cree avec {NUM_CLASSES} classes")

In [None]:
# Data augmentation for training.
# X_train is already scaled to [0, 1], but ImageDataGenerator implements
# brightness_range by round-tripping each image through an 8-bit PIL image,
# which expects 0-255 pixel values. Fed with [0, 1] floats, the augmented
# batches no longer match the [0, 1] validation data. The training generator
# is therefore given 0-255 pixels and rescales to [0, 1] itself; rescale is
# applied after the random transforms.
train_datagen = ImageDataGenerator(
    rotation_range=20,
    width_shift_range=0.2,
    height_shift_range=0.2,
    horizontal_flip=True,
    zoom_range=0.2,
    brightness_range=[0.8, 1.2],
    rescale=1.0 / 255
)

val_datagen = ImageDataGenerator()  # No augmentation for validation

# Create the generators.
# np.rint undoes the earlier /255.0 exactly, so the 8-bit cast inside the
# brightness step does not truncate e.g. 126.99999 down to 126.
train_generator = train_datagen.flow(
    np.rint(X_train * 255.0), y_train_cat, batch_size=BATCH_SIZE
)
val_generator = val_datagen.flow(X_val, y_val_cat, batch_size=BATCH_SIZE)

print("[OK] Data augmentation configuree")

In [None]:
# Callbacks shared by both training phases.
# Note the two different criteria: early stopping and the learning-rate
# schedule follow val_loss, while the checkpoint keeps the best val_accuracy.
callbacks = [
    # Stop after 10 epochs without val_loss improvement and restore the best weights
    EarlyStopping(
        monitor='val_loss',
        patience=10,
        restore_best_weights=True,
        verbose=1
    ),
    # Save the model to MODEL_DIR whenever val_accuracy improves
    ModelCheckpoint(
        filepath=str(MODEL_DIR / 'mobilenet_morphed_best.keras'),
        monitor='val_accuracy',
        save_best_only=True,
        verbose=1
    ),
    # Halve the learning rate after 5 stagnant epochs, never below 1e-6
    ReduceLROnPlateau(
        monitor='val_loss',
        factor=0.5,
        patience=5,
        min_lr=1e-6,
        verbose=1
    )
]

print("[OK] Callbacks configures")

In [None]:
# Phase 1: train only the new classification head (backbone layers are frozen).
print(f"\n{'='*60}")
print("PHASE 1: TRANSFER LEARNING (couches gelees)")
print(f"{'='*60}\n")

# Step counts use floor division, so a final partial batch is not counted
# in the per-epoch step budget.
history1 = model.fit(
    train_generator,
    steps_per_epoch=len(X_train) // BATCH_SIZE,
    epochs=20,
    validation_data=val_generator,
    validation_steps=len(X_val) // BATCH_SIZE,
    callbacks=callbacks,
    verbose=1
)

print("\n[OK] Phase 1 terminee")

In [None]:
# Phase 2: fine-tune the last backbone layers together with the head.
print(f"\n{'='*60}")
print("PHASE 2: FINE-TUNING (couches degelees)")
print(f"{'='*60}\n")

# Unfreeze the last 50 layers of the base model
for layer in base_model.layers[-50:]:
    layer.trainable = True

# Recompile with a learning rate 10x lower than in phase 1
model.compile(
    optimizer=Adam(learning_rate=LEARNING_RATE / 10),
    loss='categorical_crossentropy',
    metrics=['accuracy']
)

# Same generators and callback objects as phase 1
history2 = model.fit(
    train_generator,
    steps_per_epoch=len(X_train) // BATCH_SIZE,
    epochs=30,
    validation_data=val_generator,
    validation_steps=len(X_val) // BATCH_SIZE,
    callbacks=callbacks,
    verbose=1
)

print("\n[OK] Phase 2 terminee")

In [None]:
# Save the final model
model.save(MODEL_DIR / 'mobilenet_morphed_final.keras')

# Save the label mappings.
# JSON object keys are always strings, so the integer keys of idx_to_label
# come back as strings when this file is reloaded.
mappings = {
    'label_to_idx': label_to_idx,
    'idx_to_label': idx_to_label
}
with open(MODEL_DIR / 'label_mappings.json', 'w') as f:
    json.dump(mappings, f, indent=2)

print(f"[OK] Modele sauvegarde: {MODEL_DIR / 'mobilenet_morphed_final.keras'}")
print(f"[OK] Mappings sauvegardes: {MODEL_DIR / 'label_mappings.json'}")

In [None]:
# Config override.
# NOTE(review): these rebind globals already set in the first cell. Only cells
# executed after this one see the new values; the database is regenerated with
# 100 variations only if the generation cell is run again afterwards.
SIZE = 128
ALPHA = 0.5
NUM_VARIATIONS = 100  # Raised from 30 to 100
MIN_IMAGES_PER_PERSON = 6

In [None]:
def load_morphed_dataset(min_images=50, max_classes=20):
    """
    Load the morphed images, keeping only the largest fictitious identities.

    File names follow ``A_B_N.png``: everything before the last underscore
    (``A_B``) is the class, ``N`` is the numeric variation index. This
    definition replaces any earlier `load_morphed_dataset` in the notebook.

    Args:
        min_images: identities with fewer files than this are discarded.
        max_classes: at most this many identities are kept, largest first.

    Returns:
        (images, labels, label_to_idx, idx_to_label):
        images is a float32 array of RGB images scaled to [0, 1], labels the
        matching integer class indices, and the two dicts map identity names
        to indices and back. All four are empty when no identity qualifies.
    """
    print("\n[INFO] Chargement des images morphees...")

    images = []
    labels = []
    label_to_idx = {}
    idx_to_label = {}

    # Sorted so that ties in the "largest first" selection below are broken
    # the same way on every run, independently of the filesystem order.
    all_files = sorted(MORPHED_DIR.glob("*.png"))
    print(f"   {len(all_files)} fichiers trouves")

    identities = defaultdict(list)

    for filepath in all_files:
        parts = filepath.stem.rsplit("_", 1)
        # Require a numeric suffix so other PNGs stored in the same folder
        # (e.g. samples_preview.png) are not mistaken for a morph identity.
        if len(parts) == 2 and parts[1].isdigit():
            identity = parts[0]
            identities[identity].append(str(filepath))

    print(f"   {len(identities)} identites fictives")

    # Raise the minimum number of images per class
    valid_identities = {k: v for k, v in identities.items() if len(v) >= min_images}

    # Limit the number of classes: keep the identities with the most images
    valid_identities = dict(list(sorted(valid_identities.items(),
                                         key=lambda x: len(x[1]),
                                         reverse=True))[:max_classes])

    print(f"   {len(valid_identities)} identites selectionnees (top {max_classes})")

    for idx, identity in enumerate(sorted(valid_identities.keys())):
        label_to_idx[identity] = idx
        idx_to_label[idx] = identity

    # Load the images; unreadable files are skipped
    for identity, filepaths in tqdm(valid_identities.items(), desc="Chargement"):
        for filepath in filepaths:
            img = cv2.imread(filepath)
            if img is not None:
                img = cv2.resize(img, (IMG_SIZE, IMG_SIZE))
                img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
                images.append(img)
                labels.append(label_to_idx[identity])

    images = np.array(images, dtype=np.float32) / 255.0
    labels = np.array(labels)

    print(f"\n[OK] Dataset charge:")
    print(f"   - Images: {images.shape}")
    print(f"   - Classes: {len(label_to_idx)}")
    n_classes = len(label_to_idx)
    if n_classes:
        print(f"   - Images/classe: ~{len(images)//n_classes}")
    else:
        # No identity reached min_images (e.g. database generated with fewer
        # variations per pair): report it instead of dividing by zero.
        print(f"   [ATTENTION] Aucune identite avec {min_images}+ images - dataset vide")

    return images, labels, label_to_idx, idx_to_label

In [None]:
# Stronger augmentation settings for training.
# X_train is already scaled to [0, 1], but ImageDataGenerator implements
# brightness_range through an 8-bit PIL image that expects 0-255 pixel values,
# so the generator is given 0-255 pixels and rescales to [0, 1] itself
# (rescale is applied after the random transforms).
train_datagen = ImageDataGenerator(
    rotation_range=30,
    width_shift_range=0.3,
    height_shift_range=0.3,
    horizontal_flip=True,
    zoom_range=0.3,
    brightness_range=[0.6, 1.4],
    shear_range=0.2,
    fill_mode='nearest',
    rescale=1.0 / 255
)

# Rebuild the training generator: redefining train_datagen alone leaves the
# existing train_generator bound to the previous settings.
train_generator = train_datagen.flow(
    np.rint(X_train * 255.0), y_train_cat, batch_size=BATCH_SIZE
)

In [None]:
# Combiner les historiques
def combine_histories(h1, h2):
    """
    Concatenate the per-epoch metric lists of two training histories.

    Args:
        h1, h2: objects exposing a ``history`` dict of metric name -> list.

    Returns:
        Dict with the metrics of `h1`, each list followed by the matching list
        from `h2`. A metric present in `h1` but missing from `h2` raises
        KeyError; metrics that appear only in `h2` are dropped.
    """
    first, second = h1.history, h2.history
    return {metric: first[metric] + second[metric] for metric in first}

# Phase 1 epochs followed by phase 2 epochs, per metric
history = combine_histories(history1, history2)

# Plot: accuracy on the left, loss on the right, train vs. validation
fig, axes = plt.subplots(1, 2, figsize=(14, 5))

# Accuracy
axes[0].plot(history['accuracy'], label='Train')
axes[0].plot(history['val_accuracy'], label='Validation')
axes[0].set_title('Accuracy')
axes[0].set_xlabel('Epoch')
axes[0].set_ylabel('Accuracy')
axes[0].legend()
axes[0].grid(True)

# Loss
axes[1].plot(history['loss'], label='Train')
axes[1].plot(history['val_loss'], label='Validation')
axes[1].set_title('Loss')
axes[1].set_xlabel('Epoch')
axes[1].set_ylabel('Loss')
axes[1].legend()
axes[1].grid(True)

plt.tight_layout()
plt.savefig(MODEL_DIR / 'training_history.png', dpi=150)
plt.show()

print(f"[OK] Historique sauvegarde: {MODEL_DIR / 'training_history.png'}")

In [None]:
# NOTE(review): this rebinds history1 and keeps training whatever `model`
# currently is. Run after the fine-tuning cell, it continues from the
# fine-tuned weights rather than repeating phase 1 - confirm where this cell
# is meant to sit in the run order.
history1 = model.fit(
    train_generator,
    steps_per_epoch=len(X_train) // BATCH_SIZE,
    epochs=50,  # Raised from 20 to 50
    validation_data=val_generator,
    validation_steps=len(X_val) // BATCH_SIZE,
    callbacks=callbacks,
    verbose=1
)

In [None]:
print(f"\n{'='*60}")
print("PREPARATION POUR MEMBERSHIP INFERENCE ATTACK (MIA)")
print(f"{'='*60}\n")

# The MIA needs:
# 1. Members (train set) - samples the model was trained on
# 2. Non-members (test set) - samples the model has not seen

# Get the softmax outputs for both sets (un-augmented images)
train_predictions = model.predict(X_train, verbose=0)
test_predictions = model.predict(X_test, verbose=0)

# Confidence score = highest class probability
train_confidence = np.max(train_predictions, axis=1)
test_confidence = np.max(test_predictions, axis=1)

# 1 where the predicted class equals the true label, 0 otherwise
train_correct = (np.argmax(train_predictions, axis=1) == y_train).astype(int)
test_correct = (np.argmax(test_predictions, axis=1) == y_test).astype(int)

print(f"[INFO] Statistiques de confiance:")
print(f"   Train - Confiance moyenne: {train_confidence.mean():.4f}")
print(f"   Test  - Confiance moyenne: {test_confidence.mean():.4f}")
print(f"   Train - Accuracy: {train_correct.mean()*100:.2f}%")
print(f"   Test  - Accuracy: {test_correct.mean()*100:.2f}%")

In [None]:
# Creer les features pour l'attaque MIA
def create_mia_features(predictions, labels, correct):
    """
    Build the attack features for a membership-inference attack.

    Args:
        predictions: (N, C) array of class probabilities.
        labels: N true class indices.
        correct: N flags telling whether each prediction was right.

    Returns:
        (N, 5) array with one row per sample:
        [max confidence, entropy, confidence on the true class,
         margin between the top-1 and top-2 probabilities, correct flag].
    """
    def _feature_row(pred, true_label, was_correct):
        # Probabilities in descending order, for the top-1 / top-2 margin
        ranked = np.sort(pred)[::-1]
        return [
            np.max(pred),
            # Entropy; the 1e-10 offset avoids log(0)
            -np.sum(pred * np.log(pred + 1e-10)),
            pred[true_label],
            ranked[0] - ranked[1],
            was_correct,
        ]

    return np.array([
        _feature_row(predictions[i], labels[i], correct[i])
        for i in range(len(predictions))
    ])

# Build the attack features for members and non-members
train_mia_features = create_mia_features(train_predictions, y_train, train_correct)
test_mia_features = create_mia_features(test_predictions, y_test, test_correct)

# MIA labels: 1 = member (train), 0 = non-member (test)
train_mia_labels = np.ones(len(train_mia_features))
test_mia_labels = np.zeros(len(test_mia_features))

# Combine into a single attack dataset (members first, then non-members)
X_mia = np.vstack([train_mia_features, test_mia_features])
y_mia = np.concatenate([train_mia_labels, test_mia_labels])

print(f"[OK] Dataset MIA cree:")
print(f"   - Total samples: {len(X_mia)}")
print(f"   - Membres (train): {int(train_mia_labels.sum())}")
print(f"   - Non-membres (test): {int(len(test_mia_labels) - test_mia_labels.sum())}")
print(f"   - Features: {X_mia.shape[1]}")

In [None]:
from sklearn.ensemble import RandomForestClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score, roc_auc_score, roc_curve

# Split the MIA dataset (70% to fit the attack models, 30% to evaluate them).
# NOTE(review): members come from the 70% train split and non-members from the
# 15% test split, so members outnumber non-members by roughly 4.7 to 1. A
# classifier that always answers "member" already scores about 82% accuracy;
# read the accuracies below against that baseline and prefer the AUC.
X_mia_train, X_mia_test, y_mia_train, y_mia_test = train_test_split(
    X_mia, y_mia, test_size=0.3, random_state=42, stratify=y_mia
)

print(f"\n{'='*60}")
print("ENTRAINEMENT DU MODELE D'ATTAQUE MIA")
print(f"{'='*60}\n")

# Model 1: Random Forest
rf_attack = RandomForestClassifier(n_estimators=100, random_state=42)
rf_attack.fit(X_mia_train, y_mia_train)
rf_pred = rf_attack.predict(X_mia_test)
rf_proba = rf_attack.predict_proba(X_mia_test)[:, 1]  # probability of "member"

rf_accuracy = accuracy_score(y_mia_test, rf_pred)
rf_auc = roc_auc_score(y_mia_test, rf_proba)

print(f"[Random Forest]")
print(f"   - Accuracy: {rf_accuracy*100:.2f}%")
print(f"   - AUC: {rf_auc:.4f}")

# Model 2: Logistic Regression
lr_attack = LogisticRegression(random_state=42)
lr_attack.fit(X_mia_train, y_mia_train)
lr_pred = lr_attack.predict(X_mia_test)
lr_proba = lr_attack.predict_proba(X_mia_test)[:, 1]  # probability of "member"

lr_accuracy = accuracy_score(y_mia_test, lr_pred)
lr_auc = roc_auc_score(y_mia_test, lr_proba)

print(f"\n[Logistic Regression]")
print(f"   - Accuracy: {lr_accuracy*100:.2f}%")
print(f"   - AUC: {lr_auc:.4f}")

# Baseline: confidence-threshold attack - predict "member" whenever the
# max-confidence feature (column 0) exceeds a fixed threshold
threshold = 0.5
threshold_pred = (X_mia_test[:, 0] > threshold).astype(int)  # Max confidence
threshold_accuracy = accuracy_score(y_mia_test, threshold_pred)

print(f"\n[Baseline - Seuil de confiance]")
print(f"   - Accuracy: {threshold_accuracy*100:.2f}%")

In [None]:
# Three panels: confidence histograms, ROC curves of both attacks,
# and the Random Forest feature importances.
fig, axes = plt.subplots(1, 3, figsize=(15, 5))

# 1. Confidence distributions (raw counts, so the larger train set dominates)
axes[0].hist(train_confidence, bins=50, alpha=0.7, label='Membres (Train)', color='blue')
axes[0].hist(test_confidence, bins=50, alpha=0.7, label='Non-membres (Test)', color='red')
axes[0].set_xlabel('Confiance')
axes[0].set_ylabel('Frequence')
axes[0].set_title('Distribution des Confiances')
axes[0].legend()
axes[0].grid(True)

# 2. ROC curves
fpr_rf, tpr_rf, _ = roc_curve(y_mia_test, rf_proba)
fpr_lr, tpr_lr, _ = roc_curve(y_mia_test, lr_proba)

axes[1].plot(fpr_rf, tpr_rf, label=f'Random Forest (AUC={rf_auc:.3f})', color='blue')
axes[1].plot(fpr_lr, tpr_lr, label=f'Logistic Regression (AUC={lr_auc:.3f})', color='green')
axes[1].plot([0, 1], [0, 1], 'k--', label='Random (AUC=0.5)')
axes[1].set_xlabel('False Positive Rate')
axes[1].set_ylabel('True Positive Rate')
axes[1].set_title('Courbe ROC - MIA')
axes[1].legend()
axes[1].grid(True)

# 3. Feature importances (Random Forest); the names follow the column order
# produced by create_mia_features
feature_names = ['Max Conf', 'Entropy', 'True Conf', 'Margin', 'Correct']
importances = rf_attack.feature_importances_
axes[2].barh(feature_names, importances, color='steelblue')
axes[2].set_xlabel('Importance')
axes[2].set_title('Importance des Features (RF)')
axes[2].grid(True)

plt.tight_layout()
plt.savefig(MODEL_DIR / 'mia_results.png', dpi=150)
plt.show()

print(f"\n[OK] Resultats MIA sauvegardes: {MODEL_DIR / 'mia_results.png'}")

In [None]:
# Final report: target-model accuracies, attack scores, and a verdict
# derived from the Random Forest AUC.
print(f"\n{'='*60}")
print("RESUME - MEMBERSHIP INFERENCE ATTACK")
print(f"{'='*60}")
print(f"""
OBJECTIF:
   Determiner si le modele a memorise les visages sources
   utilises pour creer les identites fictives (morphees).

RESULTATS:
   - Modele cible (MobileNetV2):
     * Train Accuracy: {train_correct.mean()*100:.2f}%
     * Test Accuracy: {test_correct.mean()*100:.2f}%
   
   - Attaque MIA (Random Forest):
     * Accuracy: {rf_accuracy*100:.2f}%
     * AUC: {rf_auc:.4f}
   
   - Attaque MIA (Logistic Regression):
     * Accuracy: {lr_accuracy*100:.2f}%
     * AUC: {lr_auc:.4f}

INTERPRETATION:
   - AUC = 0.5: Pas de fuite d'information (modele securise)
   - AUC > 0.5: Fuite d'information detectee
   - AUC > 0.7: Fuite significative (modele vulnerable)
   - AUC > 0.9: Fuite severe (modele tres vulnerable)

   Votre modele: AUC = {rf_auc:.4f}
""")

# Verdict thresholds: above 0.7 is an alert, above 0.55 a warning.
# NOTE(review): the members/non-members compared here are morphed images from
# the train/test splits, not the original source faces - confirm the wording
# of the stated objective.
if rf_auc > 0.7:
    print("   [ALERTE] Le modele presente une vulnerabilite MIA significative!")
    print("   Les informations sur les visages sources peuvent etre inferees.")
elif rf_auc > 0.55:
    print("   [ATTENTION] Le modele presente une legere vulnerabilite MIA.")
else:
    print("   [OK] Le modele semble resistant a l'attaque MIA.")

print(f"{'='*60}")