# Initialisation de Spark

In [1]:
from pyspark.sql import SparkSession
# Build (or reuse) the Spark session for this notebook and keep a handle on
# its SparkContext, which the RDD-based loading code uses.
spark = (
    SparkSession.builder
    .appName("Exemples avec les formats de stockage")
    .getOrCreate()
)
sc = spark.sparkContext

25/11/04 11:06:17 WARN Utils: Your hostname, MacBook-Air-de-Theo.local resolves to a loopback address: 127.0.0.1; using 192.0.0.2 instead (on interface en0)
25/11/04 11:06:17 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/11/04 11:06:18 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/11/04 11:06:20 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
25/11/04 11:06:20 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.


# Chargement des données

In [2]:
from pyspark.sql import Row

# Read the text file; each line is split on ';' into a list of fields.
lines = sc.textFile("prenoms.txt")

# Keep only records where no field is exactly "sexe" (presumably the header
# line) and no field is exactly "XX", then convert the five fields to a Row.
prenoms_as_rdd = (
    lines
    .map(lambda line: line.split(";"))
    .filter(lambda fields: "sexe" not in fields and "XX" not in fields)
    .map(lambda fields: Row(
        sexe=int(fields[0]),
        prenom=fields[1],
        annee=int(fields[2]),
        dep=fields[3],
        effectif=int(fields[4]),
    ))
)

# Let Spark infer the schema from the Rows and mark the DataFrame as cached.
# No table or view is registered here.
prenoms = spark.createDataFrame(prenoms_as_rdd)
prenoms.cache()

                                                                                

DataFrame[sexe: bigint, prenom: string, annee: bigint, dep: string, effectif: bigint]

# Sauvegarder dans les différents formats

In [3]:
import os

def get_size(start_path):
    """Return the total size in bytes of the files found under ``start_path``.

    The tree is walked with ``os.walk``; entries that are symbolic links are
    not counted. Returns 0 when the walk yields no files.
    """
    file_paths = (
        os.path.join(root, name)
        for root, _subdirs, names in os.walk(start_path)
        for name in names
    )
    # Symbolic links are skipped so that only files stored in the tree count.
    return sum(
        os.path.getsize(path)
        for path in file_paths
        if not os.path.islink(path)
    )

In [4]:
# Compression codecs to try for each storage format: format name -> list of
# values passed to the writer's "compression" option.
formats = {
    "csv": ["uncompressed", "bzip2", "deflate", "gzip"],
    "json": ["uncompressed", "bzip2", "deflate", "gzip"],
    "parquet": ["uncompressed", "gzip", "snappy"],
    "orc": ["uncompressed", "snappy", "zlib"],
}

In [5]:
# Write the DataFrame once per (format, codec) pair and record the size on
# disk of each result as size_by_formats_and_codecs[format][codec] (bytes).
# Every write uses mode("overwrite") on the same path, so the size measured
# right after a write is read from that path once the overwrite has finished.
output_path = "prenoms"
size_by_formats_and_codecs = {}
# `fmt` rather than `format`: a global named `format` would shadow the
# builtin format() for every later cell of the notebook.
for fmt, codecs in formats.items():
    size_by_formats_and_codecs[fmt] = {}
    for codec in codecs:
        (
            prenoms.write
            .mode("overwrite")
            .option("compression", codec)
            .format(fmt)
            .save(output_path)
        )
        size = get_size(output_path)
        print(f'{fmt}, {codec}, {size}')
        size_by_formats_and_codecs[fmt][codec] = size
print(size_by_formats_and_codecs)

                                                                                

csv, uncompressed, 76930655


                                                                                

csv, bzip2, 11935058


                                                                                

csv, deflate, 13405032


                                                                                

csv, gzip, 13405068


                                                                                

json, uncompressed, 256757057


                                                                                

json, bzip2, 10650558


                                                                                

json, deflate, 16475815


                                                                                

json, gzip, 16475851


                                                                                

parquet, uncompressed, 9325773


                                                                                

parquet, gzip, 5868526


                                                                                

parquet, snappy, 7816852


                                                                                

orc, uncompressed, 8764031


                                                                                

orc, snappy, 6230244




orc, zlib, 4882720
{'csv': {'uncompressed': 76930655, 'bzip2': 11935058, 'deflate': 13405032, 'gzip': 13405068}, 'json': {'uncompressed': 256757057, 'bzip2': 10650558, 'deflate': 16475815, 'gzip': 16475851}, 'parquet': {'uncompressed': 9325773, 'gzip': 5868526, 'snappy': 7816852}, 'orc': {'uncompressed': 8764031, 'snappy': 6230244, 'zlib': 4882720}}


                                                                                

# Affichage des résultats

## Taille (en Mio, 1 Mio = 1024 × 1024 octets)

In [6]:
# Print one row per storage format and one column per codec. Sizes are
# converted from bytes by dividing by 1024 * 1024. A codec with no recorded
# size for a format is shown as 0.00.
all_codecs = ["uncompressed", "bzip2", "deflate", "gzip", "snappy", "zlib"]
bytes_per_unit = 1024 * 1024
print("Format | uncomp.  | bzip2    | def.     | gzip     | snappy   | zlib     |")
# `fmt` rather than `format`, so the builtin format() is not shadowed.
for fmt in formats:
    sizes = size_by_formats_and_codecs[fmt]
    cells = ''.join(
        f' {sizes.get(codec, 0) / bytes_per_unit:8.2f} |' for codec in all_codecs
    )
    print(f'{fmt:7}|{cells}')


Format | uncomp.  | bzip2    | def.     | gzip     | snappy   | zlib     |
csv    |    73.37 |    11.38 |    12.78 |    12.78 |     0.00 |     0.00 |
json   |   244.86 |    10.16 |    15.71 |    15.71 |     0.00 |     0.00 |
parquet|     8.89 |     0.00 |     0.00 |     5.60 |     7.45 |     0.00 |
orc    |     8.36 |     0.00 |     0.00 |     0.00 |     5.94 |     4.66 |
