In [None]:
# =============================================================================
# 0. RECONFIGURATION SPARK POUR LA PCA
# =============================================================================

from pyspark.sql import SparkSession

# Stopper la session actuelle si elle existe
try:
    spark.stop()
    print("✓ Session Spark existante arrêtée.")
except:
    print("ℹ️ Aucune session Spark à arrêter.")

spark = SparkSession.builder \
    .appName("P9-ImageProcessing-PCA") \
    .config("spark.driver.memory", "30g") \
    .config("spark.driver.maxResultSize", "20g") \
    .config("spark.executor.memory", "20g") \
    .config("spark.executor.memoryOverhead", "4g") \
    .config("spark.sql.shuffle.partitions", "200") \
    .getOrCreate()

print("\n✓ Configuration Spark optimisée pour la PCA :")
print(f"  - Driver Memory      : {spark.conf.get('spark.driver.memory')}")
print(f"  - Max Result Size    : {spark.conf.get('spark.driver.maxResultSize')}")
print(f"  - Executor Memory    : {spark.conf.get('spark.executor.memory')}")
print(f"  - Shuffle Partitions : {spark.conf.get('spark.sql.shuffle.partitions')}")

In [None]:
# =============================================================================
# 0.5 ALERTE COÛTS ET INITIALISATION
# =============================================================================

# ALERTE COÛTS : Instance EMR active
# Coût estimé : ~2.70€/heure pour ce cluster (3x i3.2xlarge).
import datetime
print(f"Session démarrée à : {datetime.datetime.now()}")
print("N'oubliez pas d'arrêter le cluster!")

In [None]:
# =============================================================================
# 1. SETUP ET CONFIGURATION DE L'ENVIRONNEMENT
# =============================================================================

# Affichage des informations sur la session Spark
print("\nInformations sur la session Spark :")
spark

try:
    region = spark.conf.get('spark.hadoop.fs.s3a.endpoint.region')
    if region not in ['eu-west-1', 'eu-west-2', 'eu-west-3', 'eu-central-1', 'eu-north-1']:
        raise ValueError(f"❌ ERREUR RGPD : Région {region} hors UE !")
    print(f"\n✓ Région S3 conforme RGPD : {region}")
except Exception as e:
    print(f"\n⚠️ Vérification RGPD échouée : {e}")

# Calcul du nombre de partitions idéal
num_partitions = spark.sparkContext.defaultParallelism * 2
print(f"✓ Nombre de coeurs disponibles : {spark.sparkContext.defaultParallelism}")
print(f"✓ Nombre de partitions utilisé : {num_partitions} (justifié par le nombre de coeurs)")

In [None]:
# =============================================================================
# 2. IMPORT DES LIBRAIRIES
# =============================================================================

import pandas as pd
import numpy as np
import io
import tensorflow as tf
from PIL import Image

from tensorflow.keras.applications.mobilenet_v2 import MobileNetV2, preprocess_input
from tensorflow.keras.preprocessing.image import img_to_array
from tensorflow.keras import Model

from pyspark.sql.functions import col, pandas_udf, element_at, split
from pyspark.ml.feature import PCA
from pyspark.ml.linalg import Vectors, VectorUDT

print("\n✓ Toutes les librairies ont été importées.")

In [None]:
# =============================================================================
# 3. CHARGEMENT ET PRÉPARATION DES DONNÉES
# =============================================================================

PATH_Data = "s3a://projet9-aws/Test_Sample"
#PATH_Data = "s3a://projet9-tmoahs/Training"
PATH_Result = "s3a://projet9-aws/Test_Results"

print(f"\nChargement des images depuis : {PATH_Data}")
images = spark.read.format("binaryFile") \
  .option("pathGlobFilter", "*.jpg") \
  .option("recursiveFileLookup", "true") \
  .load(PATH_Data)

images = images.repartition(num_partitions)

print(f"✓ {images.count()} images chargées et réparties.")

In [None]:
# =============================================================================
# 4. EXTRACTION DE FEATURES AVEC MOBILENETV2 (VERSION CORRIGÉE)
# =============================================================================

from pyspark.sql.types import ArrayType, FloatType

model = MobileNetV2(weights="imagenet", include_top=False, input_shape=(224, 224, 3))
model_intermediaire = Model(inputs=model.input, outputs=model.layers[-2].output)
broadcast_weights = spark.sparkContext.broadcast(model_intermediaire.get_weights())
print("\n✓ Poids du modèle broadcastés.")

# Retourner un ArrayType au lieu de VectorUDT
def featurize_series(model, content_series: pd.Series) -> pd.Series:
    def featurize_udf(content):
        local_model = MobileNetV2(weights="imagenet", include_top=False, input_shape=(224, 224, 3))
        intermediate_model = Model(inputs=local_model.input, outputs=local_model.layers[-2].output)
        intermediate_model.set_weights(broadcast_weights.value)

        img = Image.open(io.BytesIO(content)).resize((224, 224))
        img_array = img_to_array(img)
        expanded_img = np.expand_dims(img_array, axis=0)
        preprocessed_img = preprocess_input(expanded_img)

        features = intermediate_model.predict(preprocessed_img)
        flattened_features = features.flatten()

        # Retourner une liste Python, pas un Vector
        return flattened_features.tolist()

    return content_series.apply(featurize_udf)

# Déclarer le returnType comme ArrayType(FloatType())
featurize_udf = pandas_udf(
    lambda content_series: featurize_series(model, content_series),
    returnType=ArrayType(FloatType())
)

print("Début de l'extraction des features...")
features_df = images.select(
    col("path"),
    element_at(split(col("path"), "/"), -2).alias("label"),
    featurize_udf(col("content")).alias("features")
)
print("✓ Extraction terminée.")

In [None]:
# =============================================================================
# 5. RÉDUCTION DE DIMENSION (PCA) - VERSION CORRIGÉE
# =============================================================================

from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql.functions import udf

# ÉTAPE 1 : Convertir les listes en Vectors
list_to_vector_udf = udf(lambda l: Vectors.dense(l), VectorUDT())
features_df_vectors = features_df.withColumn("featuresVector", list_to_vector_udf(col("features")))

# ÉTAPE 2 : Persister pour optimiser
features_df_vectors = features_df_vectors.persist()
print("\n✓ DataFrame de features mis en cache.")

# Vérification
print("✓ Vérification du schéma :")
features_df_vectors.select("featuresVector").printSchema()

# ÉTAPE 3 : Appliquer la PCA sur la colonne Vector
print("\nDébut de l'entraînement de la PCA (k=100)...")
pca = PCA(k=100, inputCol="featuresVector", outputCol="pcaFeatures")
model_pca = pca.fit(features_df_vectors)
df_pca = model_pca.transform(features_df_vectors)
print("✓ PCA terminée.")

# Aperçu du résultat
print(f"\n✓ Nombre d'images traitées : {df_pca.count()}")
df_pca.select("path", "label", "pcaFeatures").show(3, truncate=True)

In [None]:
# =============================================================================
# 6. SAUVEGARDE DES RÉSULTATS
# =============================================================================

print(f"\nSauvegarde des résultats dans : {PATH_Result}")
df_pca.write.mode("overwrite").parquet(PATH_Result)
print("✓ Sauvegarde terminée.")

In [None]:
# =============================================================================
# 7. VALIDATION DES RÉSULTATS
# =============================================================================

print("\n" + "=" * 80)
print("VALIDATION DES DONNÉES TRAITÉES")
print("=" * 80)

# 7.1 Vérification du nombre total d'images
print(f"\n1. Nombre d'images:")
initial_count = images.count()
final_count = df_pca.count()
print(f"   - Images chargées initialement : {initial_count}")
print(f"   - Images après traitement PCA : {final_count}")
if initial_count == final_count:
    print("   ✓ Aucune perte de données")
else:
    print(f"   ✗ ATTENTION : {initial_count - final_count} images perdues !")

# 7.2 Vérification des dimensions
print(f"\n2. Dimensions des features:")
sample_features = df_pca.select("featuresVector", "pcaFeatures").first()
original_dim = len(sample_features['featuresVector'])
pca_dim = len(sample_features['pcaFeatures'])
print(f"   - Dimension originale (MobileNetV2) : {original_dim}")
print(f"   - Dimension après PCA : {pca_dim}")
print(f"   - Réduction : {((1 - pca_dim/original_dim) * 100):.1f}%")

# 7.3 Vérification de l'intégrité (pas de NaN/Null)
print(f"\n3. Intégrité des données:")
null_features = df_pca.filter(col("featuresVector").isNull()).count()
null_pca = df_pca.filter(col("pcaFeatures").isNull()).count()
print(f"   - Features NULL : {null_features}")
print(f"   - PCA Features NULL : {null_pca}")
if null_features == 0 and null_pca == 0:
    print("   ✓ Aucune valeur manquante")
else:
    print("   ✗ ATTENTION : Valeurs manquantes détectées !")

# 7.4 Variance expliquée par la PCA
print(f"\n4. Variance expliquée par la PCA:")
explained_variance = model_pca.explainedVariance
total_variance = sum(explained_variance)
print(f"   ✓ Variance totale expliquée par {pca_dim} composantes : {total_variance*100:.2f}%")

print("\n" + "=" * 80)
print("VALIDATION TERMINÉE")
print("=" * 80 + "\n")

In [None]:
# =============================================================================
# 8. RETOUR CRITIQUE
# =============================================================================
"""
## RETOUR CRITIQUE SUR L'ARCHITECTURE BIG DATA

### Avantages
- **Scalabilité Horizontale** : L'architecture EMR a prouvé sa capacité à traiter un volume de données ingérable sur une seule machine, simplement en ajoutant des nœuds.
- **Performance** : En utilisant des instances optimisées pour le stockage (`i3`) et en parallélisant les calculs avec Spark, nous avons atteint des temps de traitement performants pour un coût maîtrisé.
- **Écosystème Intégré** : L'intégration native d'EMR avec S3 simplifie grandement le pipeline de données, du stockage brut au stockage des résultats.

### Inconvénients
- **Coût de Démarrage ("Cold Start")** : Le temps de provisionnement du cluster (~10-15 min) représente un coût fixe et un délai incompressibles, rendant la solution peu rentable pour des traitements très courts ou des volumes de données faibles (< 50 Go).
- **Complexité Opérationnelle** : La configuration, l'optimisation (gestion de la mémoire, des partitions) et le débogage d'un cluster EMR demandent des compétences spécifiques, comme l'a démontré ce projet.

### Recommandations
- **Utilisation Ciblée** : Réserver l'usage d'EMR pour les traitements dépassant un seuil de rentabilité (estimé à > 50-100 Go de données).
- **Automatisation** : Pour une utilisation en production, automatiser le cycle de vie du cluster (création, exécution, arrêt) via des services comme AWS Step Functions ou Lambda pour fiabiliser le processus et maîtriser les coûts.
- **Spot Instances** : Utiliser systématiquement des instances Spot pour les nœuds de travail afin de réduire les coûts d'EC2 jusqu'à 70%.
"""

In [None]:
# =============================================================================
# 9. RECOMMANDATIONS POUR LA PRODUCTION
# =============================================================================
print("\n" + "="*80)
print("RECOMMANDATIONS POUR LE PASSAGE EN PRODUCTION")
print("="*80)
print("""
1. INFRASTRUCTURE
   - Utiliser EMR avec Auto Scaling (min 2, max 10 nodes) pour s'adapter à la charge.
   - Privilégier les Spot instances pour réduire les coûts de 70%.
   - Configurer un timeout automatique si inactivité > 1h.

2. MONITORING
   - Activer CloudWatch pour suivre les métriques Spark et l'utilisation des ressources.
   - Mettre en place des alertes si la durée du traitement dépasse un seuil.

3. DONNÉES
   - Partitionner les données sur S3 par date (ex: `year=2025/month=10/`) pour éviter de scanner tout le dataset.
   - Utiliser une politique de cycle de vie S3 pour archiver les vieilles données.

4. SÉCURITÉ & RGPD
   - Activer le chiffrement S3 au repos (SSE-S3).
   - Lancer le cluster dans un VPC privé sans accès public direct.
   - Activer les logs d'audit avec AWS CloudTrail.
""")

In [None]:
# =============================================================================
# 10. NETTOYAGE FINAL
# =============================================================================

# Libération des ressources mises en cache
features_df_vectors.unpersist()
broadcast_weights.unpersist()
print("\n✓ Cache mémoire libéré.")

# Arrêt de la session Spark
spark.stop()
print("✓ Session Spark terminée. Vous pouvez maintenant arrêter le cluster EMR.")