# Initialisation de Spark

In [1]:
from pyspark.sql import SparkSession
# Obtain the Spark session for this notebook (getOrCreate hands back an
# already-running session when there is one) and keep a handle on its
# SparkContext, which the RDD-based loading step uses.
spark = (
    SparkSession.builder
    .appName("Exemples avec les formats de stockage")
    .getOrCreate()
)
sc = spark.sparkContext

23/10/29 15:02:48 WARN Utils: Your hostname, sal9000 resolves to a loopback address: 127.0.1.1; using 192.168.1.201 instead (on interface eno2)
23/10/29 15:02:48 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/10/29 15:02:48 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/10/29 15:02:49 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


# Chargement des données

In [32]:
from pyspark.sql import Row

# Load the text file and convert its lines into Row objects.
lines = sc.textFile("prenoms.txt")


def _is_data_record(fields):
    """Keep a split line only if no field is exactly "sexe" or "XX".

    `fields` is the list produced by splitting a line on ";", so the `in`
    tests below compare whole fields, not substrings.
    """
    return not ("sexe" in fields or "XX" in fields)


def _to_row(fields):
    """Build a typed Row from the first five fields of a split line."""
    return Row(
        sexe=int(fields[0]),
        prenom=fields[1],
        annee=int(fields[2]),
        dep=fields[3],
        effectif=int(fields[4]),
    )


prenoms_as_rdd = (
    lines
    .map(lambda line: line.split(";"))
    .filter(_is_data_record)
    .map(_to_row)
)

# Let Spark infer the schema from the Rows, then mark the DataFrame as cached.
prenoms = spark.createDataFrame(prenoms_as_rdd)
prenoms.cache()

DataFrame[sexe: bigint, prenom: string, annee: bigint, dep: string, effectif: bigint]

# Sauvegarder dans les différents formats

In [8]:
import os

def get_size(start_path):
    """Return the total size, in bytes, of the files found under start_path.

    The tree rooted at `start_path` is traversed with os.walk and the size of
    every listed file is summed. Entries that are symbolic links are left out
    of the total.
    """
    candidate_paths = (
        os.path.join(folder, name)
        for folder, _subfolders, names in os.walk(start_path)
        for name in names
    )
    return sum(
        os.path.getsize(path)
        for path in candidate_paths
        if not os.path.islink(path)  # symbolic links do not count
    )

In [10]:
# Compression codecs to try for each output format. Each format gets its own
# list object, and the keys keep the order in which they are declared.
formats = dict(
    csv=["uncompressed", "bzip2", "deflate", "gzip"],
    json=["uncompressed", "bzip2", "deflate", "gzip"],
    parquet=["uncompressed", "gzip", "snappy"],
    orc=["uncompressed", "snappy", "zlib"],
)

In [33]:
# Write the DataFrame once per (format, codec) pair and record the size that
# get_size reports for the output directory after each write. Every write
# targets the same "prenoms" path in "overwrite" mode.
size_by_formats_and_codecs = {}
# The loop variable is `fmt`, not `format`: a top-level `for format in ...`
# would rebind the builtin format() for every later cell run in this kernel.
for fmt, codecs in formats.items():
    size_by_formats_and_codecs[fmt] = {}
    for codec in codecs:
        (
            prenoms.write
            .mode("overwrite")
            .option("compression", codec)
            .format(fmt)
            .save("prenoms")
        )
        size = get_size("prenoms")
        print(f'{fmt}, {codec}, {size}')
        size_by_formats_and_codecs[fmt][codec] = size
print(size_by_formats_and_codecs)

                                                                                

csv, uncompressed, 73932529


                                                                                

csv, bzip2, 11459537


                                                                                

csv, deflate, 12860959


                                                                                

csv, gzip, 12860995


                                                                                

json, uncompressed, 246360316


                                                                                

json, bzip2, 10214914


                                                                                

json, deflate, 15795079


                                                                                

json, gzip, 15795115


                                                                                

parquet, uncompressed, 8947986


                                                                                

parquet, gzip, 5611550


                                                                                

parquet, snappy, 7468201


                                                                                

orc, uncompressed, 8386496


                                                                                

orc, snappy, 5953622




orc, zlib, 4671873
{'csv': {'uncompressed': 73932529, 'bzip2': 11459537, 'deflate': 12860959, 'gzip': 12860995}, 'json': {'uncompressed': 246360316, 'bzip2': 10214914, 'deflate': 15795079, 'gzip': 15795115}, 'parquet': {'uncompressed': 8947986, 'gzip': 5611550, 'snappy': 7468201}, 'orc': {'uncompressed': 8386496, 'snappy': 5953622, 'zlib': 4671873}}


                                                                                

# Affichage des résultats

## Taille (en Mio, 1 Mio = 1024 × 1024 octets)

In [34]:
# Print one table row per format and one column per codec, with sizes
# converted from bytes by dividing by 1024 * 1024.
all_codecs = ["uncompressed", "bzip2", "deflate", "gzip", "snappy", "zlib"]
print("Format | uncomp.  | bzip2    | def.     | gzip     | snappy   | zlib     |")
# The loop variable is `fmt`, not `format`, so the builtin format() is not
# rebound in the kernel namespace.
for fmt in formats:
    print(f'{fmt:7}|', end='')
    for codec in all_codecs:
        # A codec that was not measured for this format is shown as 0.00.
        size = size_by_formats_and_codecs[fmt].get(codec, 0) / (1024 * 1024)
        print(f' {size:8.2f} |', end='')
    print()


Format | uncomp.  | bzip2    | def.     | gzip     | snappy   | zlib     |
csv    |    70.51 |    10.93 |    12.27 |    12.27 |     0.00 |     0.00 |
json   |   234.95 |     9.74 |    15.06 |    15.06 |     0.00 |     0.00 |
parquet|     8.53 |     0.00 |     0.00 |     5.35 |     7.12 |     0.00 |
orc    |     8.00 |     0.00 |     0.00 |     0.00 |     5.68 |     4.46 |
