### 4.10.1 Démarrage de la session Spark

In [1]:
import os
import subprocess

# ========== 1. CONFIGURATION JAVA ==========
JAVA_PATH = '/tmp/java17/jdk-17.0.17+10'

if os.path.exists(f"{JAVA_PATH}/bin/java"):
    print("✓ Java 17 déjà installé")
else:
    print("Installation de Java 17...")
    subprocess.run(['rm', '-rf', '/tmp/java17'], capture_output=True)
    import jdk
    JAVA_PATH = jdk.install('17', path='/tmp/java17')

os.environ['JAVA_HOME'] = JAVA_PATH
os.environ['PATH'] = f"{JAVA_PATH}/bin:{os.environ['PATH']}"
os.environ['SPARK_HOME'] = '/opt/conda/lib/python3.9/site-packages/pyspark'

# ========== 2. CRÉATION SPARKSESSION AVEC S3 ==========
from pyspark.sql import SparkSession

# Version Hadoop 3.4.0 compatible avec Spark 4.0.1
spark = SparkSession.builder \
    .appName('P8') \
    .master('local[*]') \
    .config("spark.driver.memory", "2g") \
    .config("spark.jars.packages", 
            "org.apache.hadoop:hadoop-aws:3.4.0,"
            "com.amazonaws:aws-java-sdk-bundle:1.12.367") \
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
    .config("spark.hadoop.fs.s3a.aws.credentials.provider", 
            "com.amazonaws.auth.InstanceProfileCredentialsProvider") \
    .config("spark.sql.parquet.writeLegacyFormat", "true") \
    .getOrCreate()

print(f"✓ Spark version: {spark.version}")

# ========== 3. DÉFINIR LES CHEMINS S3 ==========
PATH = 's3a://p11-nicop-data'
PATH_Data = PATH + '/Test'
PATH_Result = PATH + '/Results'
PATH_Result_PCA = PATH + '/Results_PCA'

print(f"✓ Chemins configurés")


✓ Java 17 déjà installé
✓ Spark version: 4.0.1
✓ Chemins configurés


In [5]:
spark

<u>Affichage des informations sur la session en cours et liens vers Spark UI</u> :

### 4.10.2 Installation des packages

Les packages nécessaires ont été installé via l'étape de **bootstrap** à l'instanciation du serveur.

### 4.10.3 Import des librairies

In [8]:
%pip install pandas pillow tensorflow pyarrow numpy

Note: you may need to restart the kernel to use updated packages.


In [6]:
import pandas as pd
import numpy as np
import io
import os
import tensorflow as tf
from PIL import Image
from tensorflow.keras.applications.mobilenet_v2 import MobileNetV2, preprocess_input
from tensorflow.keras.preprocessing.image import img_to_array
from tensorflow.keras import Model
from pyspark.sql.functions import col, pandas_udf, PandasUDFType, element_at, split

In [7]:
# Vérifier que spark est disponible
print(spark)
print(spark.version)

<pyspark.sql.session.SparkSession object at 0x7f66dafc0ac0>
4.0.1


In [9]:
print(pd.__version__)

2.3.3


### 4.10.4 Définition des PATH pour charger les images et enregistrer les résultats

Nous accédons directement à nos **données sur S3** comme si elles étaient **stockées localement**.

In [10]:
PATH = 's3a://p11-nicop-data'
PATH_Data = PATH+'/Test'
PATH_Result = PATH+'/Results'

#NP : Ajout pour PCA 
PATH_Result_PCA = PATH+'/Results_PCA'

print('PATH:        '+\
      PATH+'\nPATH_Data:   '+\
      PATH_Data+'\nPATH_Result: '+PATH_Result+'\nPATH_Result_PCA: '+PATH_Result_PCA)

PATH:        s3a://p11-nicop-data
PATH_Data:   s3a://p11-nicop-data/Test
PATH_Result: s3a://p11-nicop-data/Results
PATH_Result_PCA: s3a://p11-nicop-data/Results_PCA


### 4.10.5 Traitement des données

#### 4.10.5.1 Chargement des données

In [12]:
images = spark.read.format("binaryFile") \
    .option("pathGlobFilter", "*.jpg") \
    .option("recursiveFileLookup", "true") \
    .load(PATH_Data)

images.show(5)

Py4JJavaError: An error occurred while calling o41.load.
: org.apache.hadoop.fs.UnsupportedFileSystemException: No FileSystem for scheme "s3"
	at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:3581)
	at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3612)
	at org.apache.hadoop.fs.FileSystem.access$300(FileSystem.java:172)
	at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3716)
	at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3667)
	at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:557)
	at org.apache.hadoop.fs.Path.getFileSystem(Path.java:366)
	at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$checkAndGlobPathIfNecessary$1(DataSource.scala:777)
	at scala.collection.immutable.List.map(List.scala:247)
	at scala.collection.immutable.List.map(List.scala:79)
	at org.apache.spark.sql.execution.datasources.DataSource$.checkAndGlobPathIfNecessary(DataSource.scala:775)
	at org.apache.spark.sql.execution.datasources.DataSource.checkAndGlobPathIfNecessary(DataSource.scala:575)
	at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:419)
	at org.apache.spark.sql.catalyst.analysis.ResolveDataSource.org$apache$spark$sql$catalyst$analysis$ResolveDataSource$$loadV1BatchSource(ResolveDataSource.scala:143)
	at org.apache.spark.sql.catalyst.analysis.ResolveDataSource$$anonfun$apply$1.$anonfun$applyOrElse$2(ResolveDataSource.scala:61)
	at scala.Option.getOrElse(Option.scala:201)
	at org.apache.spark.sql.catalyst.analysis.ResolveDataSource$$anonfun$apply$1.applyOrElse(ResolveDataSource.scala:61)
	at org.apache.spark.sql.catalyst.analysis.ResolveDataSource$$anonfun$apply$1.applyOrElse(ResolveDataSource.scala:45)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsUpWithPruning$3(AnalysisHelper.scala:139)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:86)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsUpWithPruning$1(AnalysisHelper.scala:139)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:416)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUpWithPruning(AnalysisHelper.scala:135)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUpWithPruning$(AnalysisHelper.scala:131)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsUpWithPruning(LogicalPlan.scala:37)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUp(AnalysisHelper.scala:112)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUp$(AnalysisHelper.scala:111)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsUp(LogicalPlan.scala:37)
	at org.apache.spark.sql.catalyst.analysis.ResolveDataSource.apply(ResolveDataSource.scala:45)
	at org.apache.spark.sql.catalyst.analysis.ResolveDataSource.apply(ResolveDataSource.scala:43)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:242)
	at scala.collection.LinearSeqOps.foldLeft(LinearSeq.scala:183)
	at scala.collection.LinearSeqOps.foldLeft$(LinearSeq.scala:179)
	at scala.collection.immutable.List.foldLeft(List.scala:79)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:239)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:231)
	at scala.collection.immutable.List.foreach(List.scala:334)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:231)
	at org.apache.spark.sql.catalyst.analysis.Analyzer.org$apache$spark$sql$catalyst$analysis$Analyzer$$executeSameContext(Analyzer.scala:340)
	at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$execute$1(Analyzer.scala:336)
	at org.apache.spark.sql.catalyst.analysis.AnalysisContext$.withNewAnalysisContext(Analyzer.scala:234)
	at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:336)
	at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:299)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:201)
	at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:89)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:201)
	at org.apache.spark.sql.catalyst.analysis.resolver.HybridAnalyzer.resolveInFixedPoint(HybridAnalyzer.scala:190)
	at org.apache.spark.sql.catalyst.analysis.resolver.HybridAnalyzer.$anonfun$apply$1(HybridAnalyzer.scala:76)
	at org.apache.spark.sql.catalyst.analysis.resolver.HybridAnalyzer.withTrackedAnalyzerBridgeState(HybridAnalyzer.scala:111)
	at org.apache.spark.sql.catalyst.analysis.resolver.HybridAnalyzer.apply(HybridAnalyzer.scala:71)
	at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$executeAndCheck$1(Analyzer.scala:330)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.markInAnalyzer(AnalysisHelper.scala:423)
	at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:330)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$lazyAnalyzed$2(QueryExecution.scala:110)
	at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:148)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:278)
	at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:654)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:278)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:804)
	at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:277)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$lazyAnalyzed$1(QueryExecution.scala:110)
	at scala.util.Try$.apply(Try.scala:217)
	at org.apache.spark.util.Utils$.doTryWithCallerStacktrace(Utils.scala:1378)
	at org.apache.spark.util.Utils$.getTryWithCallerStacktrace(Utils.scala:1439)
	at org.apache.spark.util.LazyTry.get(LazyTry.scala:58)
	at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:121)
	at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:80)
	at org.apache.spark.sql.classic.Dataset$.$anonfun$ofRows$1(Dataset.scala:115)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:804)
	at org.apache.spark.sql.classic.Dataset$.ofRows(Dataset.scala:113)
	at org.apache.spark.sql.classic.DataFrameReader.load(DataFrameReader.scala:109)
	at org.apache.spark.sql.classic.DataFrameReader.load(DataFrameReader.scala:100)
	at org.apache.spark.sql.classic.DataFrameReader.load(DataFrameReader.scala:58)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:569)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:184)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:108)
	at java.base/java.lang.Thread.run(Thread.java:840)
	Suppressed: org.apache.spark.util.Utils$OriginalTryStackTraceException: Full stacktrace of original doTryWithCallerStacktrace caller
		at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:3581)
		at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3612)
		at org.apache.hadoop.fs.FileSystem.access$300(FileSystem.java:172)
		at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3716)
		at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3667)
		at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:557)
		at org.apache.hadoop.fs.Path.getFileSystem(Path.java:366)
		at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$checkAndGlobPathIfNecessary$1(DataSource.scala:777)
		at scala.collection.immutable.List.map(List.scala:247)
		at scala.collection.immutable.List.map(List.scala:79)
		at org.apache.spark.sql.execution.datasources.DataSource$.checkAndGlobPathIfNecessary(DataSource.scala:775)
		at org.apache.spark.sql.execution.datasources.DataSource.checkAndGlobPathIfNecessary(DataSource.scala:575)
		at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:419)
		at org.apache.spark.sql.catalyst.analysis.ResolveDataSource.org$apache$spark$sql$catalyst$analysis$ResolveDataSource$$loadV1BatchSource(ResolveDataSource.scala:143)
		at org.apache.spark.sql.catalyst.analysis.ResolveDataSource$$anonfun$apply$1.$anonfun$applyOrElse$2(ResolveDataSource.scala:61)
		at scala.Option.getOrElse(Option.scala:201)
		at org.apache.spark.sql.catalyst.analysis.ResolveDataSource$$anonfun$apply$1.applyOrElse(ResolveDataSource.scala:61)
		at org.apache.spark.sql.catalyst.analysis.ResolveDataSource$$anonfun$apply$1.applyOrElse(ResolveDataSource.scala:45)
		at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsUpWithPruning$3(AnalysisHelper.scala:139)
		at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:86)
		at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsUpWithPruning$1(AnalysisHelper.scala:139)
		at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:416)
		at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUpWithPruning(AnalysisHelper.scala:135)
		at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUpWithPruning$(AnalysisHelper.scala:131)
		at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsUpWithPruning(LogicalPlan.scala:37)
		at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUp(AnalysisHelper.scala:112)
		at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUp$(AnalysisHelper.scala:111)
		at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsUp(LogicalPlan.scala:37)
		at org.apache.spark.sql.catalyst.analysis.ResolveDataSource.apply(ResolveDataSource.scala:45)
		at org.apache.spark.sql.catalyst.analysis.ResolveDataSource.apply(ResolveDataSource.scala:43)
		at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:242)
		at scala.collection.LinearSeqOps.foldLeft(LinearSeq.scala:183)
		at scala.collection.LinearSeqOps.foldLeft$(LinearSeq.scala:179)
		at scala.collection.immutable.List.foldLeft(List.scala:79)
		at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:239)
		at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:231)
		at scala.collection.immutable.List.foreach(List.scala:334)
		at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:231)
		at org.apache.spark.sql.catalyst.analysis.Analyzer.org$apache$spark$sql$catalyst$analysis$Analyzer$$executeSameContext(Analyzer.scala:340)
		at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$execute$1(Analyzer.scala:336)
		at org.apache.spark.sql.catalyst.analysis.AnalysisContext$.withNewAnalysisContext(Analyzer.scala:234)
		at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:336)
		at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:299)
		at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:201)
		at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:89)
		at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:201)
		at org.apache.spark.sql.catalyst.analysis.resolver.HybridAnalyzer.resolveInFixedPoint(HybridAnalyzer.scala:190)
		at org.apache.spark.sql.catalyst.analysis.resolver.HybridAnalyzer.$anonfun$apply$1(HybridAnalyzer.scala:76)
		at org.apache.spark.sql.catalyst.analysis.resolver.HybridAnalyzer.withTrackedAnalyzerBridgeState(HybridAnalyzer.scala:111)
		at org.apache.spark.sql.catalyst.analysis.resolver.HybridAnalyzer.apply(HybridAnalyzer.scala:71)
		at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$executeAndCheck$1(Analyzer.scala:330)
		at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.markInAnalyzer(AnalysisHelper.scala:423)
		at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:330)
		at org.apache.spark.sql.execution.QueryExecution.$anonfun$lazyAnalyzed$2(QueryExecution.scala:110)
		at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:148)
		at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:278)
		at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:654)
		at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:278)
		at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:804)
		at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:277)
		at org.apache.spark.sql.execution.QueryExecution.$anonfun$lazyAnalyzed$1(QueryExecution.scala:110)
		at scala.util.Try$.apply(Try.scala:217)
		at org.apache.spark.util.Utils$.doTryWithCallerStacktrace(Utils.scala:1378)
		at org.apache.spark.util.LazyTry.tryT$lzycompute(LazyTry.scala:46)
		at org.apache.spark.util.LazyTry.tryT(LazyTry.scala:46)
		... 21 more


In [12]:
images.show(5)

+--------------------+-------------------+------+--------------------+
|                path|   modificationTime|length|             content|
+--------------------+-------------------+------+--------------------+
|s3a://p11-nicop-d...|2026-01-21 09:57:47| 10412|[FF D8 FF E0 00 1...|
|s3a://p11-nicop-d...|2026-01-21 09:57:47| 10224|[FF D8 FF E0 00 1...|
|s3a://p11-nicop-d...|2026-01-21 09:57:47|  9931|[FF D8 FF E0 00 1...|
|s3a://p11-nicop-d...|2026-01-21 09:57:47|  9873|[FF D8 FF E0 00 1...|
|s3a://p11-nicop-d...|2026-01-21 09:57:47|  9813|[FF D8 FF E0 00 1...|
+--------------------+-------------------+------+--------------------+
only showing top 5 rows


<u>Je ne conserve que le **path** de l'image et j'ajoute <br />
    une colonne contenant les **labels** de chaque image</u> :

In [13]:
images = images.withColumn('label', element_at(split(images['path'], '/'),-2))
print(images.printSchema())
print(images.select('path','label').show(5,False))

root
 |-- path: string (nullable = true)
 |-- modificationTime: timestamp (nullable = true)
 |-- length: long (nullable = true)
 |-- content: binary (nullable = true)
 |-- label: string (nullable = true)

None
+------------------------------------------------------+-------------+
|path                                                  |label        |
+------------------------------------------------------+-------------+
|s3a://p11-nicop-data/Test/Blackberrie 2/r0_203_100.jpg|Blackberrie 2|
|s3a://p11-nicop-data/Test/Blackberrie 2/r0_207_100.jpg|Blackberrie 2|
|s3a://p11-nicop-data/Test/Blackberrie 2/r0_199_100.jpg|Blackberrie 2|
|s3a://p11-nicop-data/Test/Blackberrie 2/r0_195_100.jpg|Blackberrie 2|
|s3a://p11-nicop-data/Test/Blackberrie 2/r0_191_100.jpg|Blackberrie 2|
+------------------------------------------------------+-------------+
only showing top 5 rows
None


#### 4.10.5.2 Préparation du modèle

In [14]:
model = MobileNetV2(weights='imagenet',
                    include_top=True,
                    input_shape=(224, 224, 3))

In [15]:
new_model = Model(inputs=model.input,
                  outputs=model.layers[-2].output)

In [16]:
sc = spark.sparkContext

In [17]:
brodcast_weights = sc.broadcast(new_model.get_weights())

In [18]:
new_model.summary()

In [19]:
def model_fn():
    """
    Reconstruit l'architecture MobileNetV2 SANS télécharger les poids,
    puis applique les poids broadcastés (bonne pratique sur cluster).
    """
    # IMPORTANT: weights=None pour ne pas retélécharger "imagenet" sur chaque worker
    base = MobileNetV2(weights=None,
                       include_top=True,
                       input_shape=(224, 224, 3))

    for layer in base.layers:
        layer.trainable = False

    feature_extractor = Model(inputs=base.input,
                              outputs=base.layers[-2].output)

    # On injecte les poids pré-entraînés qui ont été broadcastés par le driver
    feature_extractor.set_weights(brodcast_weights.value)

    return feature_extractor

#### 4.10.5.3 Définition du processus de chargement des images <br/> et application de leur featurisation à travers l'utilisation de pandas UDF

In [20]:
def preprocess(content):
    """
    Preprocesses raw image bytes for prediction.
    """
    img = Image.open(io.BytesIO(content)).resize([224, 224])
    arr = img_to_array(img)
    return preprocess_input(arr)

def featurize_series(model, content_series):
    """
    Featurize a pd.Series of raw images using the input model.
    :return: a pd.Series of image features
    """
    input = np.stack(content_series.map(preprocess))
    preds = model.predict(input)
    # For some layers, output features will be multi-dimensional tensors.
    # We flatten the feature tensors to vectors for easier storage in Spark DataFrames.
    output = [p.flatten() for p in preds]
    return pd.Series(output)

@pandas_udf('array<float>', PandasUDFType.SCALAR_ITER)
def featurize_udf(content_series_iter):
    '''
    This method is a Scalar Iterator pandas UDF wrapping our featurization function.
    The decorator specifies that this returns a Spark DataFrame column of type ArrayType(FloatType).

    :param content_series_iter: This argument is an iterator over batches of data, where each batch
                              is a pandas Series of image data.
    '''
    # With Scalar Iterator pandas UDFs, we can load the model once and then re-use it
    # for multiple data batches.  This amortizes the overhead of loading big models.
    model = model_fn()
    for content_series in content_series_iter:
        yield featurize_series(model, content_series)



#### 4.10.5.4 Exécutions des actions d'extractions de features

In [24]:
spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", "1024")

In [25]:
features_df = images.repartition(24).select(col("path"),
                                            col("label"),
                                            featurize_udf("content").alias("features")
                                           )

In [28]:
# Forcer l'évaluation avec un count() ou collect() sur un petit échantillon
print("Nombre de lignes:", features_df.count())

# Ou tester avec un petit échantillon
sample_df = features_df.limit(10)
sample_df.show()

# Si ça fonctionne, essayez l'écriture avec un petit échantillon d'abord
features_df.limit(100).write.mode("overwrite").parquet(PATH_Result + "_test")


Nombre de lignes: 42749
+--------------------+------------------+--------------------+
|                path|             label|            features|
+--------------------+------------------+--------------------+
|s3a://p11-nicop-d...|    Apple Rotten 1|[0.0, 0.06954818,...|
|s3a://p11-nicop-d...|    Apple Rotten 1|[0.0, 0.008441594...|
|s3a://p11-nicop-d...|Cactus fruit red 1|[0.06812268, 0.04...|
|s3a://p11-nicop-d...|    Apple Rotten 1|[0.0, 0.0, 0.0, 0...|
|s3a://p11-nicop-d...|          Quince 2|[0.23798592, 0.01...|
|s3a://p11-nicop-d...|Cactus fruit red 1|[0.0, 0.0, 0.0, 0...|
|s3a://p11-nicop-d...|           Apple 9|[0.0, 0.030782398...|
|s3a://p11-nicop-d...|           Apple 9|[0.0, 0.029734066...|
|s3a://p11-nicop-d...|          Quince 3|[0.010510766, 0.0...|
|s3a://p11-nicop-d...|           Apple 8|[0.11645726, 0.0,...|
+--------------------+------------------+--------------------+



In [29]:
print(PATH_Result)

s3a://p11-nicop-data/Results


In [30]:
features_df.limit(300).write.mode("overwrite").parquet(PATH_Result)

#### Ajout de la PCA

In [31]:
from pyspark.ml.feature import PCA as PCAml
from pyspark.sql.functions import col
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql.functions import udf

# 1) Relire les features depuis Parquet
features_saved = spark.read.parquet(PATH_Result)

# 2) Convertir array<float> -> Vector (format attendu par Spark ML)
to_vec = udf(lambda xs: Vectors.dense(xs), VectorUDT())
df_vec = features_saved.withColumn("features_vec", to_vec(col("features")))

# 3) PCA
k = 50  
pca = PCAml(k=k, inputCol="features_vec", outputCol="pca_features")
pca_model = pca.fit(df_vec)

# voir la variance expliquée
# explainedVariance est un vecteur de taille k
ev = list(pca_model.explainedVariance)
print("Explained variance (first 10):", ev[:10])
print("Cumulative explained variance:", float(sum(ev)))

# 4) Appliquer la PCA
df_pca = pca_model.transform(df_vec)

# 5) Sortie
df_pca_out = df_pca.select("path", "label", "pca_features")

# 6) Ecriture du résultat PCA
df_pca_out.write.mode("overwrite").parquet(PATH_Result_PCA)

print("PCA written to:", PATH_Result_PCA)


Explained variance (first 10): [0.14906987145161313, 0.09098528831394485, 0.056672128754518374, 0.054762197612571006, 0.04255106778275444, 0.035728407159139754, 0.031680558146240996, 0.02667718917588276, 0.025507184844251, 0.021473488136460377]
Cumulative explained variance: 0.8301709235346295
PCA written to: s3a://p11-nicop-data/Results_PCA


### 4.10.6 Chargement des données enregistrées et validation du résultat

In [32]:
df = pd.read_parquet(PATH_Result_PCA, engine='pyarrow')

severe performance issues, see also https://github.com/dask/dask/issues/10276

To fix, you should specify a lower version bound on s3fs, or
update the current installation.



In [33]:
df.head()

Unnamed: 0,path,label,pca_features
0,s3a://p11-nicop-data/Test/Apple Rotten 1/r1_13...,Apple Rotten 1,"{'type': 1, 'size': None, 'indices': None, 'va..."
1,s3a://p11-nicop-data/Test/Apple Rotten 1/r1_13...,Apple Rotten 1,"{'type': 1, 'size': None, 'indices': None, 'va..."
2,s3a://p11-nicop-data/Test/Cactus fruit red 1/r...,Cactus fruit red 1,"{'type': 1, 'size': None, 'indices': None, 'va..."
3,s3a://p11-nicop-data/Test/Apple Rotten 1/r1_10...,Apple Rotten 1,"{'type': 1, 'size': None, 'indices': None, 'va..."
4,s3a://p11-nicop-data/Test/Quince 2/r2_155_100.jpg,Quince 2,"{'type': 1, 'size': None, 'indices': None, 'va..."


In [None]:
#df.loc[0,'features'].shape

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

(1280,)

In [34]:
import numpy as np

# Les pca_features sont des dictionnaires Spark ML Vector
# Il faut extraire le champ 'values' pour obtenir l'array numpy
pca_feature_dict = df.loc[0,'pca_features']

# Vérifier le type et extraire les valeurs
if isinstance(pca_feature_dict, dict):
    pca_array = np.array(pca_feature_dict['values'])
    print(f"Shape: {pca_array.shape}")
    print(f"Dimension: {pca_array.shape[0]}")
else:
    # Si c'est déjà un array (cas normal)
    print(f"Shape: {pca_feature_dict.shape}")
    # Voir la structure du premier élément
print(df.loc[0,'pca_features'])
print(type(df.loc[0,'pca_features']))

# Si c'est un dict, voir les clés
if isinstance(df.loc[0,'pca_features'], dict):
    print("Clés du dictionnaire:", df.loc[0,'pca_features'].keys())
    print("Valeurs:", df.loc[0,'pca_features']['values'])
    print("Shape des valeurs:", np.array(df.loc[0,'pca_features']['values']).shape)

Shape: (50,)
Dimension: 50
{'type': 1, 'size': None, 'indices': None, 'values': array([ 1.46687415, -3.06909955,  9.84996953,  5.10762901,  4.4712448 ,
        3.03942696, -1.56680524,  2.43527858,  2.80454338,  6.61404252,
       -4.24430132, -3.90641859, -1.13092713, -2.36325644,  6.75642989,
        1.20004964,  2.15831561,  4.3796629 ,  2.03076089, -0.64057792,
       -3.94313467, -1.31432799, -0.37545969, -0.58146649, -3.16755208,
       -0.41718472,  1.8622552 ,  0.1132985 ,  2.63843922, -0.01391684,
        3.07472606,  0.18394617,  2.66440911, -2.33571535, -1.47700721,
        0.71154961,  0.39747872,  1.58213155, -1.57926817,  0.53213297,
        1.66945992, -0.2849328 , -0.94444458, -2.61590553,  1.45044449,
       -2.3074089 , -0.56912288,  2.57386782,  1.10271692,  0.88427369])}
<class 'dict'>
Clés du dictionnaire: dict_keys(['type', 'size', 'indices', 'values'])
Valeurs: [ 1.46687415 -3.06909955  9.84996953  5.10762901  4.4712448   3.03942696
 -1.56680524  2.43527858  2.80

In [35]:
df.shape

(300, 3)

<u>On peut également constater la présence des fichiers <br />
    au format "**parquet**" sur le **serveur S3**</u> :

![Affichage des résultats sur S3](img/S3_Results.png)

## 4.11 Suivi de l'avancement des tâches avec le Serveur d'Historique Spark

Il est possible de voir l'avancement des tâches en cours <br />
avec le **serveur d'historique Spark**.

![Accès au serveur d'historique spark](img/EMR_serveur_historique_spark_acces.png)

**Il est également possible de revenir et d'étudier les tâches <br />
qui ont été réalisé, afin de debugger, optimiser les futurs <br />
tâches à réaliser.**

<u>Lorsque la commande "**features_df.write.mode("overwrite").parquet(PATH_Result)**" <br />
était en cours, nous pouvions observer son état d'avancement</u> :

![Progression execution script](img/EMR_jupyterhub_avancement.png)

<u>Le **serveur d'historique Spark** nous permet une vision beaucoup plus précise <br />
de l'exécution des différentes tâche sur les différentes machines du cluster</u> :

![Suivi des tâches spark](img/EMR_SHSpark_01.png)

On peut également constater que notre cluster de calcul a mis <br />
un tout petit peu **moins de 8 minutes** pour traiter les **22 688 images**.

![Temps de traitement](img/EMR_SHSpark_02.png)


## 4.12 Résiliation de l'instance EMR

Notre travail est maintenant terminé. <br />
Le cluster de machines EMR est **facturé à la demande**, <br />
et nous continuons d'être facturé même lorsque <br />
les machines sont au repos.<br />
Pour **optimiser la facturation**, il nous faut <br />
maintenant **résilier le cluster**.

<u>Je réalise cette commande depuis l'interface AWS</u> :

1. Commencez par **désactiver le tunnel ssh dans FoxyProxy** pour éviter des problèmes de **timeout**.
![Désactivation de FoxyProxy](img/EMR_foxyproxy_desactivation.png)
2. Cliquez sur "**Résilier**"
![Cliquez sur Résilier](img/EMR_resiliation_01.png)
3. Confirmez la résiliation
![Confirmez la résiliation](img/EMR_resiliation_02.png)
4. La résiliation prend environ **1 minute**
![Résiliation en cours](img/EMR_resiliation_03.png)
5. La résiliation est effectuée
![Résiliation terminée](img/EMR_resiliation_04.png)

## 4.13 Cloner le serveur EMR (si besoin)

Si nous devons de nouveau exécuter notre notebook dans les mêmes conditions, <br />
il nous suffit de **cloner notre cluster** et ainsi en obtenir une copie fonctionnelle <br />
sous 15/20 minutes, le temps de son instanciation.

<u>Pour cela deux solutions</u> :
1. <u>Depuis l'interface AWS</u> :
 1. Cliquez sur "**Cloner**"
   ![Cloner un cluster](img/EMR_cloner_01.png)
 2. Dans notre cas nous ne souhaitons pas inclure d'étapes
   ![Ne pas inclure d'étapes](img/EMR_cloner_02.png)
 3. La configuration du cluster est recréée à l’identique. <br />
    On peut revenir sur les différentes étapes si on souhaite apporter des modifications<br />
    Quand tout est prêt, cliquez sur "**Créer un cluster**"
  ![Vérification/Modification/Créer un cluster](img/EMR_cloner_03.png)
2. <u>En ligne de commande</u> (avec AWS CLI d'installé et de configuré et en s'assurant <br />
   de s'attribuer les droits nécessaires sur le compte AMI utilisé)
 1. Cliquez sur "**Exporter AWS CLI**"
 ![Exporter AWS CLI](img/EMR_cloner_cli_01.png)
 2. Copier/Coller la commande **depuis un terminal**
 ![Copier Coller Commande](img/EMR_cloner_cli_02.png)

## 4.14 Arborescence du serveur S3 à la fin du projet

<u>Pour information, voici **l'arborescence complète de mon bucket S3 p8-data** à la fin du projet</u> : <br />
*Par soucis de lisibilité, je ne liste pas les 131 sous dossiers du répertoire "Test"*

1. Results/_SUCCESS
1. Results/part-00000-2cc36f38-19ef-4d8a-a0d1-5ddb309b3894-c000.snappy.parquet
1. Results/part-00001-2cc36f38-19ef-4d8a-a0d1-5ddb309b3894-c000.snappy.parquet
1. Results/part-00002-2cc36f38-19ef-4d8a-a0d1-5ddb309b3894-c000.snappy.parquet
1. Results/part-00003-2cc36f38-19ef-4d8a-a0d1-5ddb309b3894-c000.snappy.parquet
1. Results/part-00004-2cc36f38-19ef-4d8a-a0d1-5ddb309b3894-c000.snappy.parquet
1. Results/part-00005-2cc36f38-19ef-4d8a-a0d1-5ddb309b3894-c000.snappy.parquet
1. Results/part-00006-2cc36f38-19ef-4d8a-a0d1-5ddb309b3894-c000.snappy.parquet
1. Results/part-00007-2cc36f38-19ef-4d8a-a0d1-5ddb309b3894-c000.snappy.parquet
1. Results/part-00008-2cc36f38-19ef-4d8a-a0d1-5ddb309b3894-c000.snappy.parquet
1. Results/part-00009-2cc36f38-19ef-4d8a-a0d1-5ddb309b3894-c000.snappy.parquet
1. Results/part-00010-2cc36f38-19ef-4d8a-a0d1-5ddb309b3894-c000.snappy.parquet
1. Results/part-00011-2cc36f38-19ef-4d8a-a0d1-5ddb309b3894-c000.snappy.parquet
1. Results/part-00012-2cc36f38-19ef-4d8a-a0d1-5ddb309b3894-c000.snappy.parquet
1. Results/part-00013-2cc36f38-19ef-4d8a-a0d1-5ddb309b3894-c000.snappy.parquet
1. Results/part-00014-2cc36f38-19ef-4d8a-a0d1-5ddb309b3894-c000.snappy.parquet
1. Results/part-00015-2cc36f38-19ef-4d8a-a0d1-5ddb309b3894-c000.snappy.parquet
1. Results/part-00016-2cc36f38-19ef-4d8a-a0d1-5ddb309b3894-c000.snappy.parquet
1. Results/part-00017-2cc36f38-19ef-4d8a-a0d1-5ddb309b3894-c000.snappy.parquet
1. Results/part-00018-2cc36f38-19ef-4d8a-a0d1-5ddb309b3894-c000.snappy.parquet
1. Results/part-00019-2cc36f38-19ef-4d8a-a0d1-5ddb309b3894-c000.snappy.parquet
1. Results/part-00020-2cc36f38-19ef-4d8a-a0d1-5ddb309b3894-c000.snappy.parquet
1. Results/part-00021-2cc36f38-19ef-4d8a-a0d1-5ddb309b3894-c000.snappy.parquet
1. Results/part-00022-2cc36f38-19ef-4d8a-a0d1-5ddb309b3894-c000.snappy.parquet
1. Results/part-00023-2cc36f38-19ef-4d8a-a0d1-5ddb309b3894-c000.snappy.parquet
1. Test/
1. bootstrap-emr.sh
1. jupyter-s3-conf.json
1. jupyter/jovyan/.s3keep
1. jupyter/jovyan/P8_01_Notebook.ipynb
1. jupyter/jovyan/_metadata
1. jupyter/jovyan/e-5OTY4VKPDT21945FF6DN15E35/.aws-editors-workspace-metadata/
1. jupyter/jovyan/e-5OTY4VKPDT21945FF6DN15E35/.aws-editors-workspace-metadata/file-perm.sqlite
1. jupyter/jovyan/e-5OTY4VKPDT21945FF6DN15E35/.aws-editors-workspace-metadata/nbconvert/
1. jupyter/jovyan/e-5OTY4VKPDT21945FF6DN15E35/.aws-editors-workspace-metadata/nbconvert/templates/
1. jupyter/jovyan/e-5OTY4VKPDT21945FF6DN15E35/.aws-editors-workspace-metadata/nbconvert/templates/html/
1. jupyter/jovyan/e-5OTY4VKPDT21945FF6DN15E35/.aws-editors-workspace-metadata/nbconvert/templates/latex/
1. jupyter/jovyan/e-5OTY4VKPDT21945FF6DN15E35/.aws-editors-workspace-metadata/nbsignatures.db
1. jupyter/jovyan/e-5OTY4VKPDT21945FF6DN15E35/.aws-editors-workspace-metadata/notebook_secret
1. jupyter/jovyan/e-5OTY4VKPDT21945FF6DN15E35/.ipynb_checkpoints/
1. jupyter/jovyan/e-5OTY4VKPDT21945FF6DN15E35/.ipynb_checkpoints/Untitled-checkpoint.ipynb
1. jupyter/jovyan/e-5OTY4VKPDT21945FF6DN15E35/.ipynb_checkpoints/Untitled1-checkpoint.ipynb
1. jupyter/jovyan/e-5OTY4VKPDT21945FF6DN15E35/.ipynb_checkpoints/test3-checkpoint.ipynb
1. jupyter/jovyan/e-5OTY4VKPDT21945FF6DN15E35/Untitled.ipynb
1. jupyter/jovyan/e-5OTY4VKPDT21945FF6DN15E35/Untitled1.ipynb
1. jupyter/jovyan/e-5OTY4VKPDT21945FF6DN15E35/test3.ipynb

# 5. Conclusion

Nous avons réalisé ce projet **en deux temps** en tenant <br />
compte des contraintes qui nous ont été imposées.

Nous avons **dans un premier temps développé notre solution en local** <br />
sur une machine virtuelle dans un environnement Linux Ubuntu.

La <u>première phase</u> a consisté à **installer l'environnement de travail Spark**. <br />
**Spark** a un paramètre qui nous permet de travaillé en local et nous permet <br />
ainsi de **simuler du calcul partagé** en considérant <br />
**chaque cœur d'un processeur comme un worker indépendant**.<br />
Nous avons travaillé sur un plus **petit jeu de donnée**, l'idée était <br />
simplement de **valider le bon fonctionnement de la solution**.

Nous avons fait le choix de réaliser du **transfert learning** <br />
à partir du model **MobileNetV2**.<br />
Ce modèle a été retenu pour sa **légèreté** et sa **rapidité d'exécution** <br />
ainsi que pour la **faible dimension de son vecteur en sortie**.

Les résultats ont été enregistrés sur disque en plusieurs <br />
partitions au format "**parquet**".

<u>**La solution a parfaitement fonctionné en mode local**</u>.

La <u>deuxième phase</u> a consisté à créer un **réel cluster de calculs**. <br />
L'objectif était de pouvoir **anticiper une future augmentation de la charge de travail**.

Le meilleur choix retenu a été l'utilisation du prestataire de services **Amazon Web Services** <br />
qui nous permet de **louer à la demande de la puissance de calculs**, <br />
pour un **coût tout à fait acceptable**.<br />
Ce service se nomme **EC2** et se classe parmi les offres **Infrastructure As A Service** (IAAS).

Nous sommes allez plus loin en utilisant un service de plus <br />
haut niveau (**Plateforme As A Service** PAAS)<br />
en utilisant le service **EMR** qui nous permet d'un seul coup <br />
d'**instancier plusieurs serveur (un cluster)** sur lesquels <br />
nous avons pu demander l'installation et la configuration de plusieurs<br />
programmes et librairies nécessaires à notre projet comme **Spark**, <br />
**Hadoop**, **JupyterHub** ainsi que la librairie **TensorFlow**.

En plus d'être plus **rapide et efficace à mettre en place**, nous avons <br />
la **certitude du bon fonctionnement de la solution**, celle-ci ayant été <br />
préalablement validé par les ingénieurs d'Amazon.

Nous avons également pu installer, sans difficulté, **les packages <br />
nécessaires sur l'ensembles des machines du cluster**.

Enfin, avec très peu de modification, et plus simplement encore, <br />
nous avons pu **exécuter notre notebook comme nous l'avions fait localement**.<br />
Nous avons cette fois-ci exécuté le traitement sur **l'ensemble des images de notre dossier "Test"**.

Nous avons opté pour le service **Amazon S3** pour **stocker les données de notre projet**. <br />
S3 offre, pour un faible coût, toutes les conditions dont nous avons besoin pour stocker <br />
et exploiter de manière efficace nos données.<br />
L'espace alloué est potentiellement **illimité**, mais les coûts seront fonction de l'espace utilisé.

Il nous sera **facile de faire face à une monté de la charge de travail** en **redimensionnant** <br />
simplement notre cluster de machines (horizontalement et/ou verticalement au besoin), <br />
les coûts augmenteront en conséquence mais resteront nettement inférieurs aux coûts engendrés <br />
par l'achat de matériels ou par la location de serveurs dédiés.