# <span style='color:SteelBlue'>P9 - Réalisez un traitement dans un environnement Big Data sur le Cloud</span>

![Logo Fruits]()

# <span style='background:red; color:black'>MAJ à faire</span>

# <span style='background:white; color:black'>Sommaire</span>

Importations des librairies<br>

Paramètres d'affichage<br>

Fonctions <br>

**Etape 1 : Importation des données**

Conclusion

# <span style='background:blue'>Introduction</span>

L'entreprise "Fruits!" est une jeune entreprise qui a la volonté de préserver la biodiversité des fruits en permettant des traitements spécifiques pour chaque espèce de fruit en développant des robots cueilleurs intelligents.

Pour cela, l'entreprise souhaite se faire connaitre en mettant à disposition du grand public une application mobile qui permettrait aux utilisateurs de prendre en photo un fruit et d'obtenir des informations sur ce fruit. Le developpement de cette application nécessite un traitement Big Data pour la reconnaissance d'image.
___
**Missions**

1/ Expliquer pas-à-pas le script PySpark implémenté avec :
- Un traitement de diffusion des poids du modèle Tensorflow sur les clusters (broadcast des « weights » du modèle)

- Une étape d’une réduction de dimension de type ACP en PySpark

2/ Faire une démonstration de la mise en place d’une instance EMR opérationnelle.

3/ Démontrer le respect des contraintes RGPD (serveurs situés sur le territoire européen).

4/ Donner mon retour critique sur cette solution (utile avant de la généraliser).
___

**Source des datasets**<br>
Ce jeux de données comporte des images de fruits pour train et test un modèle de reconnaissance d'image.

Nombre d'images : 90_483<br>
Nombre de classes : 131<br>
Taille des images : 100x100 pixels<br>

3 dossiers :
- Training (67_692 images)
- Test (22_688 images)
- test-multiple_fruits


Source : [Fruits-360 dataset](https://www.kaggle.com/datasets/moltean/fruits) sur Kaggle.com<br>

# <span style='background:grey'>Importations des librairies</span>

Liste des imports généraux :

In [1]:
# !pip install Pandas pillow tensorflow pyspark pyarrow

In [2]:
# Librairies generales
import os
import os.path
import sys
import time
from datetime import datetime

# Librairies data science
import pandas as pd
from PIL import Image
import numpy as np
import io

# Librairies TensorFlow
import tensorflow as tf
from tensorflow.keras.applications.mobilenet_v2 import MobileNetV2, preprocess_input
from tensorflow.keras.preprocessing.image import img_to_array
from tensorflow.keras import Model

# Librairies Spark
from pyspark import SparkConf
from pyspark.sql.functions import col, pandas_udf, PandasUDFType, element_at, split
from pyspark.sql import SparkSession


# Versions
print("Interpréteur python :")
print("Python        : " + sys.version + "\n")

print("Version des librairies utilisées :")
print("NumPy         : " + np.__version__)
print("Pandas        : " + pd.__version__)
print("Pillow (Image): " + Image.__version__)
print("Tensorflow    : " + tf.__version__)
print("PySpark       : " + SparkSession.builder.getOrCreate().version)

# Afficher heure lancement
maintenant = datetime.now().isoformat()
print("\nCode lance le : " + maintenant)

# Enregistrer l'heure de debut
start_time = time.time()

Interpréteur python :
Python        : 3.11.5 (tags/v3.11.5:cce6ba9, Aug 24 2023, 14:38:34) [MSC v.1936 64 bit (AMD64)]

Version des librairies utilisées :
NumPy         : 1.26.4
Pandas        : 2.2.2
Pillow (Image): 10.3.0
Tensorflow    : 2.16.1
PySpark       : 3.5.1

Code lance le : 2024-05-24T08:10:48.176317


J'ai également copié le fichier "hadoop.dll" dans le dossier "*C:\Windows\System32*" pour éviter une erreur lors de l'importation des données. Ce fichier est issue du git de winutils : [git winutils/hadoop-3.4.0-win10-x64/bin](https://github.com/kontext-tech/winutils/tree/master/hadoop-3.4.0-win10-x64/bin) 

Il faut ensuite définir la variable d'environnement "HADOOP_HOME" qui pointe vers le dossier contenant le fichier "hadoop.dll".

In [3]:
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['HADOOP_HOME'] = "C:\\Program Files\\Hadoop\\"

# <span style='background:grey'>Paramètres d'affichage</span>

# <span style='background:grey'>Fonctions</span>


In [4]:
def calculer_duree_notebook(start_time):
    """
    Cette procédure calcule et affiche la durée d'éxécution du notebook.

    Args:
        start_time (float): Le temps de début en secondes depuis l'époque.

    Returns:
        None
    """
    # Calculer la durée totale et convertir en minutes et secondes
    minutes, seconds = divmod(time.time() - start_time, 60)

    # Afficher la durée totale
    print(f"Durée execution notebook : {int(minutes)} min {int(seconds)} sec")

# <span style='background:blue'>Etape 1 : Déploiement de la solution en local</span>

## <span style='background:green'>1/Importation des données</span>

Echantillon de 3 type de pommes (1_416 images)

In [5]:
PATH = os.getcwd()

PATH_Data = PATH + '/data/source/dataset_sample'
PATH_Result = PATH + '/data/cleaned'

print(
    'PATH:        ' +\
    PATH + '\nPATH_Data:   ' +\
    PATH_Data + '\nPATH_Result: '+ PATH_Result
)

PATH:        c:\Users\pierr\VSC_Projects\Projet9_OCR_DataScientist
PATH_Data:   c:\Users\pierr\VSC_Projects\Projet9_OCR_DataScientist/data/source/dataset_sample
PATH_Result: c:\Users\pierr\VSC_Projects\Projet9_OCR_DataScientist/data/cleaned


## <span style='background:green'>2/Création de la SparkSession</span>


In [6]:
# Arrête la session Spark existante, si elle existe
SparkSession.builder.getOrCreate().stop()

# Créer une session Spark
spark = (SparkSession
                    .builder
                    .appName('P9')
                    .master('local')
                    .config("spark.sql.parquet.writeLegacyFormat", 'true')
                    .getOrCreate()
)

In [7]:
sc = spark.sparkContext

In [8]:
spark

## <span style='background:green'>3/Traitement des données</span>

### <span style='background:black'>a/Chargement des données</span>


In [9]:
images = spark.read.format("binaryFile") \
    .option("pathGlobFilter", "*.jpg") \
    .option("recursiveFileLookup", "true") \
    .load(PATH_Data)

In [10]:
# Ajout de la col label à partir du nom du dernier dossier contenant l'image
images = images.withColumn('label', element_at(split(images['path'], '/'), -2))

# Afficher le schema et les 5 premières lignes
print(images.printSchema())
# print(images.select('path', 'label').show(5, False))
print(images.show(5))

# Imprimer le nombre de lignes dans le DataFrame
print("Nombre de lignes dans le DataFrame : ", images.count())

root
 |-- path: string (nullable = true)
 |-- modificationTime: timestamp (nullable = true)
 |-- length: long (nullable = true)
 |-- content: binary (nullable = true)
 |-- label: string (nullable = true)

None
+--------------------+-------------------+------+--------------------+--------------+
|                path|   modificationTime|length|             content|         label|
+--------------------+-------------------+------+--------------------+--------------+
|file:/c:/Users/pi...|2021-09-12 19:25:44|  5879|[FF D8 FF E0 00 1...|Apple Golden 1|
|file:/c:/Users/pi...|2021-09-12 19:25:44|  5865|[FF D8 FF E0 00 1...|Apple Golden 1|
|file:/c:/Users/pi...|2021-09-12 19:25:44|  5861|[FF D8 FF E0 00 1...|Apple Golden 1|
|file:/c:/Users/pi...|2021-09-12 19:25:44|  5856|[FF D8 FF E0 00 1...|Apple Golden 1|
|file:/c:/Users/pi...|2021-09-12 19:25:44|  5856|[FF D8 FF E0 00 1...|Apple Golden 1|
+--------------------+-------------------+------+--------------------+--------------+
only showing top

### <span style='background:black'>b/Préparation du modèle</span>


In [11]:
model = MobileNetV2(
    weights='imagenet',
    include_top=True,
    input_shape=(224, 224, 3)
)

In [12]:
new_model = Model(
    inputs=model.input,
    outputs=model.layers[-2].output
)

A décommenter à la fin :

In [13]:
# new_model.summary()

In [14]:
broadcast_weights = sc.broadcast(new_model.get_weights())

In [15]:
def model_fn():
    """
    Cette fonction crée un modèle MobileNetV2 avec des poids pré-entraînés à partir d'ImageNet, retire la couche supérieure du modèle, rend toutes les couches non entraînables (c'est-à-dire, leurs poids ne seront pas mis à jour pendant l'entraînement), puis définit les poids du nouveau modèle à partir d'une valeur diffusée.

    :return: Le nouveau modèle avec les poids pré-entraînés diffusés.
    """
    # Créer modèle MobileNetV2 avec des poids pré-entraînés à partir d'ImageNet
    model = MobileNetV2(
        weights='imagenet',
        include_top=True,
        input_shape=(224, 224, 3)
    )

    # Rendre les couches non entraînables
    for layer in model.layers:
        layer.trainable = False

    # création nouveau modèle sans la dernière couche
    new_model = Model(
        inputs=model.input,
        outputs=model.layers[-2].output
    )

    # transmettre les poids du modèle
    new_model.set_weights(broadcast_weights.value)

    return new_model

### <span style='background:black'>c/Préparation du processus de chargement des images et application <br/>de leur featurisation à travers l'utilisation de pandas UDF</span>


In [16]:
def preprocess(content):
    """
    Cette fonction charge une image à partir des données brutes, redimensionne
    l'image à une taille de 224x224 pixels, la convertit en un tableau numpy,
    puis applique la fonction de prétraitement de MobileNetV2.

    :param content: Les données brutes de l'image à prétraiter.
    
    :return: Le tableau numpy de l'image prétraitée.
    """
    # Chargement de l'image et redimensionnement à 224x224
    img = Image.open(io.BytesIO(content)).resize([224, 224])

    # Conversion en tableau numpy
    arr = img_to_array(img)

    # Application de la fonction preprocess_input de MobileNetV2
    return preprocess_input(arr)

In [17]:
def featurize_series(model, content_series):
    """
    Effectue une caractérisation (featurization) d'une pd.Series d'images
    brutes en utilisant le modèle en entrée.
    
    :param model: Un modèle de deep learning pré-entraîné pour la
    caractérisation des images.
    :param content_series: Une pd.Series contenant des images brutes à
    caractériser.
    
    :return: Une pd.Series où chaque élément est un vecteur unidimensionnel
    de caractéristiques d'image.
    """
    # Application prétraitement à chaque image brute et empilement dans array
    input = np.stack(content_series.map(preprocess))

    # Utilisation du modèle pour prédire les caractéristiques de l'image
    preds = model.predict(input)

    # Applatissement des caractéristiques en 1 vecteur 1D (besoin pour Spark)
    output = [p.flatten() for p in preds]

    # Conversion en pd.Series
    return pd.Series(output)

In [18]:
# Decorateur pour définir une UDF pandas de type Scalar Iterator
@pandas_udf('array<float>', PandasUDFType.SCALAR_ITER)

def featurize_udf(content_series_iter):
    '''
    Cette méthode est une UDF pandas de type Scalar Iterator qui enveloppe
    notre fonction de featurisation. Le décorateur spécifie qu'elle renvoie
    une colonne de DataFrame Spark de type ArrayType(FloatType).

    :param content_series_iter: Cet argument est un itérateur sur des lots de
    données, où chaque lot est une série pandas de données d'image.
    '''
    # Charger le modèle une fois  (réutiliser pour plusieurs lots de données)
    model = model_fn()

    # POUR chaque lot de données
    for content_series in content_series_iter:
        print("Lot de données de taille : ", content_series.shape)

        # Application de la fonction de featurisation (du modele sur le lot)
        yield featurize_series(model, content_series)



### <span style='background:black'>d/Exécution des actions d'extraction de features</span>

In [19]:
# spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", "1024")

In [20]:
# 20 partitions + select col path & label + featurize & rename col content
features_df = images.repartition(20).select(
    col("path"),
    col("label"),
    featurize_udf("content").alias("features")
)

In [21]:
features_df.columns

['path', 'label', 'features']

In [22]:
features_df.select('features').show(5, False)

Py4JJavaError: An error occurred while calling o98.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 6.0 failed 1 times, most recent failure: Lost task 0.0 in stage 6.0 (TID 90) (ASUS-Vivobook-Pierrick executor driver): java.lang.UnsupportedOperationException: sun.misc.Unsafe or java.nio.DirectByteBuffer.<init>(long, int) not available
	at org.apache.arrow.memory.util.MemoryUtil.directBuffer(MemoryUtil.java:174)
	at org.apache.arrow.memory.ArrowBuf.getDirectBuffer(ArrowBuf.java:229)
	at org.apache.arrow.memory.ArrowBuf.nioBuffer(ArrowBuf.java:224)
	at org.apache.arrow.vector.ipc.WriteChannel.write(WriteChannel.java:133)
	at org.apache.arrow.vector.ipc.message.MessageSerializer.writeBatchBuffers(MessageSerializer.java:303)
	at org.apache.arrow.vector.ipc.message.MessageSerializer.serialize(MessageSerializer.java:276)
	at org.apache.arrow.vector.ipc.ArrowWriter.writeRecordBatch(ArrowWriter.java:147)
	at org.apache.arrow.vector.ipc.ArrowWriter.writeBatch(ArrowWriter.java:133)
	at org.apache.spark.sql.execution.python.BasicPythonArrowInput.writeIteratorToArrowStream(PythonArrowInput.scala:140)
	at org.apache.spark.sql.execution.python.BasicPythonArrowInput.writeIteratorToArrowStream$(PythonArrowInput.scala:124)
	at org.apache.spark.sql.execution.python.ArrowPythonRunner.writeIteratorToArrowStream(ArrowPythonRunner.scala:30)
	at org.apache.spark.sql.execution.python.PythonArrowInput$$anon$1.$anonfun$writeIteratorToStream$1(PythonArrowInput.scala:96)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.sql.execution.python.PythonArrowInput$$anon$1.writeIteratorToStream(PythonArrowInput.scala:102)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:451)
	at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1928)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:282)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2856)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2792)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2791)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2791)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1247)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3060)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2994)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2983)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:989)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2398)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2419)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2438)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:530)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:483)
	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:61)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$executeCollect$1(AdaptiveSparkPlanExec.scala:390)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.withFinalPlanUpdate(AdaptiveSparkPlanExec.scala:418)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.executeCollect(AdaptiveSparkPlanExec.scala:390)
	at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:4332)
	at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:3314)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:4322)
	at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:546)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:4320)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:4320)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:3314)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:3537)
	at org.apache.spark.sql.Dataset.getRows(Dataset.scala:280)
	at org.apache.spark.sql.Dataset.showString(Dataset.scala:315)
	at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103)
	at java.base/java.lang.reflect.Method.invoke(Method.java:580)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:1570)
Caused by: java.lang.UnsupportedOperationException: sun.misc.Unsafe or java.nio.DirectByteBuffer.<init>(long, int) not available
	at org.apache.arrow.memory.util.MemoryUtil.directBuffer(MemoryUtil.java:174)
	at org.apache.arrow.memory.ArrowBuf.getDirectBuffer(ArrowBuf.java:229)
	at org.apache.arrow.memory.ArrowBuf.nioBuffer(ArrowBuf.java:224)
	at org.apache.arrow.vector.ipc.WriteChannel.write(WriteChannel.java:133)
	at org.apache.arrow.vector.ipc.message.MessageSerializer.writeBatchBuffers(MessageSerializer.java:303)
	at org.apache.arrow.vector.ipc.message.MessageSerializer.serialize(MessageSerializer.java:276)
	at org.apache.arrow.vector.ipc.ArrowWriter.writeRecordBatch(ArrowWriter.java:147)
	at org.apache.arrow.vector.ipc.ArrowWriter.writeBatch(ArrowWriter.java:133)
	at org.apache.spark.sql.execution.python.BasicPythonArrowInput.writeIteratorToArrowStream(PythonArrowInput.scala:140)
	at org.apache.spark.sql.execution.python.BasicPythonArrowInput.writeIteratorToArrowStream$(PythonArrowInput.scala:124)
	at org.apache.spark.sql.execution.python.ArrowPythonRunner.writeIteratorToArrowStream(ArrowPythonRunner.scala:30)
	at org.apache.spark.sql.execution.python.PythonArrowInput$$anon$1.$anonfun$writeIteratorToStream$1(PythonArrowInput.scala:96)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.sql.execution.python.PythonArrowInput$$anon$1.writeIteratorToStream(PythonArrowInput.scala:102)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:451)
	at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1928)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:282)


In [None]:
print(PATH_Result)

c:\Users\pierr\VSC_Projects\Projet9_OCR_DataScientist/data/cleaned


In [None]:
features_df.write.mode("overwrite").parquet(PATH_Result)

Py4JJavaError: An error occurred while calling o251.parquet.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 20.0 failed 1 times, most recent failure: Lost task 0.0 in stage 20.0 (TID 272) (ASUS-Vivobook-Pierrick executor driver): org.apache.spark.SparkException: [TASK_WRITE_FAILED] Task failed while writing rows to file:/c:/Users/pierr/VSC_Projects/Projet9_OCR_DataScientist/data/cleaned.
	at org.apache.spark.sql.errors.QueryExecutionErrors$.taskFailedWhileWritingRowsError(QueryExecutionErrors.scala:774)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:420)
	at org.apache.spark.sql.execution.datasources.WriteFilesExec.$anonfun$doExecuteWrite$1(WriteFiles.scala:100)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:893)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:893)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
	at java.base/java.lang.Thread.run(Thread.java:1570)
Caused by: java.lang.UnsupportedOperationException: sun.misc.Unsafe or java.nio.DirectByteBuffer.<init>(long, int) not available
	at org.apache.arrow.memory.util.MemoryUtil.directBuffer(MemoryUtil.java:174)
	at org.apache.arrow.memory.ArrowBuf.getDirectBuffer(ArrowBuf.java:229)
	at org.apache.arrow.memory.ArrowBuf.nioBuffer(ArrowBuf.java:224)
	at org.apache.arrow.vector.ipc.WriteChannel.write(WriteChannel.java:133)
	at org.apache.arrow.vector.ipc.message.MessageSerializer.writeBatchBuffers(MessageSerializer.java:303)
	at org.apache.arrow.vector.ipc.message.MessageSerializer.serialize(MessageSerializer.java:276)
	at org.apache.arrow.vector.ipc.ArrowWriter.writeRecordBatch(ArrowWriter.java:147)
	at org.apache.arrow.vector.ipc.ArrowWriter.writeBatch(ArrowWriter.java:133)
	at org.apache.spark.sql.execution.python.BasicPythonArrowInput.writeIteratorToArrowStream(PythonArrowInput.scala:140)
	at org.apache.spark.sql.execution.python.BasicPythonArrowInput.writeIteratorToArrowStream$(PythonArrowInput.scala:124)
	at org.apache.spark.sql.execution.python.ArrowPythonRunner.writeIteratorToArrowStream(ArrowPythonRunner.scala:30)
	at org.apache.spark.sql.execution.python.PythonArrowInput$$anon$1.$anonfun$writeIteratorToStream$1(PythonArrowInput.scala:96)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.sql.execution.python.PythonArrowInput$$anon$1.writeIteratorToStream(PythonArrowInput.scala:102)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:451)
	at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1928)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:282)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2856)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2792)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2791)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2791)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1247)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3060)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2994)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2983)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:989)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2398)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$executeWrite$4(FileFormatWriter.scala:307)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.writeAndCommit(FileFormatWriter.scala:271)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeWrite(FileFormatWriter.scala:304)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:190)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:190)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:113)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:111)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.executeCollect(commands.scala:125)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$executeCollect$1(AdaptiveSparkPlanExec.scala:390)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.withFinalPlanUpdate(AdaptiveSparkPlanExec.scala:418)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.executeCollect(AdaptiveSparkPlanExec.scala:390)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:107)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:107)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:461)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:461)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:437)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:85)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:83)
	at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:142)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:859)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:388)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:361)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:240)
	at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:792)
	at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103)
	at java.base/java.lang.reflect.Method.invoke(Method.java:580)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:1570)
Caused by: org.apache.spark.SparkException: [TASK_WRITE_FAILED] Task failed while writing rows to file:/c:/Users/pierr/VSC_Projects/Projet9_OCR_DataScientist/data/cleaned.
	at org.apache.spark.sql.errors.QueryExecutionErrors$.taskFailedWhileWritingRowsError(QueryExecutionErrors.scala:774)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:420)
	at org.apache.spark.sql.execution.datasources.WriteFilesExec.$anonfun$doExecuteWrite$1(WriteFiles.scala:100)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:893)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:893)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
	... 1 more
Caused by: java.lang.UnsupportedOperationException: sun.misc.Unsafe or java.nio.DirectByteBuffer.<init>(long, int) not available
	at org.apache.arrow.memory.util.MemoryUtil.directBuffer(MemoryUtil.java:174)
	at org.apache.arrow.memory.ArrowBuf.getDirectBuffer(ArrowBuf.java:229)
	at org.apache.arrow.memory.ArrowBuf.nioBuffer(ArrowBuf.java:224)
	at org.apache.arrow.vector.ipc.WriteChannel.write(WriteChannel.java:133)
	at org.apache.arrow.vector.ipc.message.MessageSerializer.writeBatchBuffers(MessageSerializer.java:303)
	at org.apache.arrow.vector.ipc.message.MessageSerializer.serialize(MessageSerializer.java:276)
	at org.apache.arrow.vector.ipc.ArrowWriter.writeRecordBatch(ArrowWriter.java:147)
	at org.apache.arrow.vector.ipc.ArrowWriter.writeBatch(ArrowWriter.java:133)
	at org.apache.spark.sql.execution.python.BasicPythonArrowInput.writeIteratorToArrowStream(PythonArrowInput.scala:140)
	at org.apache.spark.sql.execution.python.BasicPythonArrowInput.writeIteratorToArrowStream$(PythonArrowInput.scala:124)
	at org.apache.spark.sql.execution.python.ArrowPythonRunner.writeIteratorToArrowStream(ArrowPythonRunner.scala:30)
	at org.apache.spark.sql.execution.python.PythonArrowInput$$anon$1.$anonfun$writeIteratorToStream$1(PythonArrowInput.scala:96)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.sql.execution.python.PythonArrowInput$$anon$1.writeIteratorToStream(PythonArrowInput.scala:102)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:451)
	at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1928)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:282)


In [None]:
features_df.show()

<div class="alert alert-block alert-success"><b>Bilan: </b>

Le fichier....<br>

# <span style='background:blue'>Conclusion</span>

Ici

In [None]:
# Afficher temps d'exécution du notebook
calculer_duree_notebook(start_time)

Durée execution notebook : 0 min 1 sec
