# Initialisation de Spark

In [2]:
from pyspark.sql import SparkSession
spark = SparkSession \
    .builder \
    .appName("Exemples avec les formats de stockage") \
    .getOrCreate()
sc = spark.sparkContext

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/11/10 15:35:44 WARN Utils: Your hostname, Dell, resolves to a loopback address: 127.0.1.1; using 10.255.255.254 instead (on interface lo)
25/11/10 15:35:44 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/11/10 15:35:44 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/11/10 15:35:45 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


# Chargement des données

## Chargement des prénoms

In [3]:
from pyspark.sql import Row

# Charge un fichier texte et convertit les lignes en "Row".
lines = sc.textFile("prenoms_sample.txt")
prenoms_as_rdd = lines.map(lambda l: l.split(";"))\
    .map(lambda p: Row(\
        sexe=int(p[0]),\
        prenom=p[1],\
        annee=int(p[2]),\
        dep=p[3],\
        effectif=int(p[4])))

# Infère le schéma et enregistre le DataFrame comme une table.
prenoms = spark.createDataFrame(prenoms_as_rdd)
prenoms.createOrReplaceTempView("prenoms")
prenoms.show()

                                                                                

+----+--------------+-----+---+--------+
|sexe|        prenom|annee|dep|effectif|
+----+--------------+-----+---+--------+
|   1|_PRENOMS_RARES| 1910| 06|      38|
|   1|_PRENOMS_RARES| 1919|974|     122|
|   1|_PRENOMS_RARES| 1921| 84|       3|
|   1|_PRENOMS_RARES| 1929| 43|       7|
|   1|_PRENOMS_RARES| 1934|971|     190|
|   1|_PRENOMS_RARES| 1942| 85|      11|
|   1|_PRENOMS_RARES| 1949| 34|      10|
|   1|_PRENOMS_RARES| 1967| 04|       3|
|   1|_PRENOMS_RARES| 1967| 08|      17|
|   1|_PRENOMS_RARES| 1992| 75|    1166|
|   1|_PRENOMS_RARES| 1995| 32|       4|
|   1|_PRENOMS_RARES| 1998| 82|      19|
|   1|_PRENOMS_RARES| 2011| 57|     210|
|   1|_PRENOMS_RARES| 2016| 29|     158|
|   1|      ABDALLAH| 1967| 78|       3|
|   1|    ABDELHAFID| 1968| 59|       3|
|   1|    ABDELKADER| 1965| 71|       3|
|   1|    ABDELKADER| 1975| 77|       8|
|   1|    ABDELKADER| 1982| 71|       5|
|   1|    ABDELKARIM| 1985| 75|       4|
+----+--------------+-----+---+--------+
only showing top

## Chargement des départements

In [4]:
lines = sc.textFile("dpts.txt")
depts_as_rdd = lines\
    .filter(lambda l: "dep" not in l and "2A" not in l and "2B" not in l)\
    .map(lambda l: l.split(","))\
    .map(lambda p: Row(\
        dep=int(p[0]),\
        reg=int(p[1]),\
        cheflieu=p[2],\
        tncc=p[3],\
        ncc=p[4],\
        nccenr=p[6],\
        libelle=p[6]))

depts = spark.createDataFrame(depts_as_rdd)
depts.createOrReplaceTempView("depts")
depts.show()

+---+---+--------+----+--------------------+--------------------+--------------------+
|dep|reg|cheflieu|tncc|                 ncc|              nccenr|             libelle|
+---+---+--------+----+--------------------+--------------------+--------------------+
|  1| 84|   01053|   5|                 AIN|                 Ain|                 Ain|
|  2| 32|   02408|   5|               AISNE|               Aisne|               Aisne|
|  3| 84|   03190|   5|              ALLIER|              Allier|              Allier|
|  4| 93|   04070|   4|ALPES DE HAUTE PR...|Alpes-de-Haute-Pr...|Alpes-de-Haute-Pr...|
|  5| 93|   05061|   4|        HAUTES ALPES|        Hautes-Alpes|        Hautes-Alpes|
|  6| 93|   06088|   4|     ALPES MARITIMES|     Alpes-Maritimes|     Alpes-Maritimes|
|  7| 84|   07186|   5|             ARDECHE|             Ardèche|             Ardèche|
|  8| 44|   08105|   4|            ARDENNES|            Ardennes|            Ardennes|
|  9| 76|   09122|   5|              ARIEGE

# Sauvegarder les données au format parquet

## Les prénoms partitionnés par départements et années et compressés (Snappy)

In [4]:
prenoms.write\
    .partitionBy('dep', 'annee')\
        .format('parquet')\
            .save('prenomsParDeptsEtAnnees.parquet')

                                                                                

## Les prénoms partitionnés par départements et années et compressés (gzip)

In [5]:
prenoms.write\
    .partitionBy('dep', 'annee')\
        .option("compression", "gzip")\
            .format('parquet')\
                .save("prenomsParDeptsEtAnnees.gzip.parquet")

                                                                                

## Les départements

In [None]:
depts.write\
    .format("parquet")\
        .save("depts.parquet")