In [1]:
from pyspark.sql import SparkSession, DataFrame
from pyspark.ml.functions import vector_to_array
from pyspark.ml import Pipeline, Transformer
from pyspark.sql.functions import col, udf
from pyspark import keyword_only

from pyspark.ml.param.shared import HasInputCol, HasOutputCol, Param, Params, TypeConverters
from pyspark.ml.util import DefaultParamsReadable, DefaultParamsWritable

import urllib.parse
import os

out_dir = "PCA_transformed"

Création de la session SPARK

In [2]:
spark = (SparkSession
         .builder
         .appName('P8_parquet_to_csv')
         .config("spark.sql.parquet.writeLegacyFormat", 'true')
         .getOrCreate()
)

23/08/29 14:00:48 WARN Utils: Your hostname, nordine-optiplex7040 resolves to a loopback address: 127.0.1.1; using 192.168.0.201 instead (on interface enp0s31f6)
23/08/29 14:00:48 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/08/29 14:00:49 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Lecture des fichiers Parquet

In [3]:
df = spark.read.parquet("Results_PCA")

                                                                                

In [4]:
df.show(5, 60)

+--------------------------------------------------------+------------------+------------------------------------------------------------+-----------+------------------------------------------------------------+
|                                                    path|             label|                                                   features2|label_index|                                                         pca|
+--------------------------------------------------------+------------------+------------------------------------------------------------+-----------+------------------------------------------------------------+
|            s3://kockot-bucket/Test/Banana/r_104_100.jpg|            Banana|[0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...|       57.0|[0.17817222594192905,1.331924851271245,-1.807314859435009...|
|         s3://kockot-bucket/Test/Banana Red/r_17_100.jpg|        Banana Red|[0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...|       58.0|[0

Ici on définit une fonction qui transforme une URI S3 en URL

In [5]:
def s3_2_http_path(s3_filepath):
    bucket, key = s3_filepath.replace("s3://", "").split("/", 1)
    return f"https://kockot-bucket.s3.eu-west-3.amazonaws.com/{urllib.parse.quote(key)}"


Testons

In [6]:
s3_2_http_path("s3://kockot-bucket/Test/Banana Lady Finger/r_80_100.jpg")

'https://kockot-bucket.s3.eu-west-3.amazonaws.com/Test/Banana%20Lady%20Finger/r_80_100.jpg'

On peut maintenant créer un pipeline qui transforme la colonne PCA qui est un tableau en 1024 colonnes différentes.

On va aussi en profiter pour transformer le path S3 en path HTTP

In [7]:
s3_2_http_path_udf = udf(s3_2_http_path)

class S3toHTTPTransformer(Transformer, DefaultParamsReadable, DefaultParamsWritable):

    @keyword_only
    def __init__(self):
        super(S3toHTTPTransformer, self).__init__()

    def _transform(self, dataset):
        r= dataset.select(
            s3_2_http_path_udf(col("path")).alias("path"),
            col("label"), 
            col("label_index"),
            col("pca")
        )
        return r

class V2ATranformer(Transformer, DefaultParamsReadable, DefaultParamsWritable):

    @keyword_only
    def __init__(self):
        super(V2ATranformer, self).__init__()

    def _transform(self, dataset):
        r= dataset.select(
            col("path"), 
            col("label"), 
            col("label_index"),
            vector_to_array("pca").alias("pca")
        )
        return r

class PCAV2PCACTranformer(Transformer, DefaultParamsReadable, DefaultParamsWritable):

    @keyword_only
    def __init__(self):
        super(PCAV2PCACTranformer, self).__init__()

    def _transform(self, dataset):
        r= dataset.select(
            col("path"), 
            col("label"), 
            col("label_index"),
            *[col('pca').getItem(i).alias(f'pca_{i+1}') for i in range(0, 1024)]
        )
        return r

On crée un pipeline avec les transformations mais on aurait aussi pu ajouter les transformations dans le fichier spark_pca.py

In [8]:
s3_2_http = S3toHTTPTransformer()
v2a = V2ATranformer()
pcav2pcac = PCAV2PCACTranformer()

pipe = Pipeline(stages=[s3_2_http, v2a, pcav2pcac])


In [9]:
pipe_model = pipe.fit(df)


In [10]:
df2 = pipe_model.transform(df)


In [11]:
df2.printSchema()

root
 |-- path: string (nullable = true)
 |-- label: string (nullable = true)
 |-- label_index: double (nullable = true)
 |-- pca_1: double (nullable = true)
 |-- pca_2: double (nullable = true)
 |-- pca_3: double (nullable = true)
 |-- pca_4: double (nullable = true)
 |-- pca_5: double (nullable = true)
 |-- pca_6: double (nullable = true)
 |-- pca_7: double (nullable = true)
 |-- pca_8: double (nullable = true)
 |-- pca_9: double (nullable = true)
 |-- pca_10: double (nullable = true)
 |-- pca_11: double (nullable = true)
 |-- pca_12: double (nullable = true)
 |-- pca_13: double (nullable = true)
 |-- pca_14: double (nullable = true)
 |-- pca_15: double (nullable = true)
 |-- pca_16: double (nullable = true)
 |-- pca_17: double (nullable = true)
 |-- pca_18: double (nullable = true)
 |-- pca_19: double (nullable = true)
 |-- pca_20: double (nullable = true)
 |-- pca_21: double (nullable = true)
 |-- pca_22: double (nullable = true)
 |-- pca_23: double (nullable = true)
 |-- pca_24: d

Afin d'éviter d'avoir l'export CSV répartis sur plusieurs fichiers, nous utilisons la fonction coalesce

In [None]:
df2.coalesce(1).write.mode("overwrite").option("header",True).csv(out_dir)

23/08/29 14:00:57 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
[Stage 2:>                                                          (0 + 1) / 1]

In [None]:
arr_f = os.listdir(out_dir)
for f in arr_f:
    if f.endswith(".csv"):
        os.rename(f"{out_dir}/{f}", f"{out_dir}/PCA_1024columns.csv")