In [1]:
from pyspark import SparkConf, SparkContext

conf = SparkConf()\
    .setMaster('yarn-client')\
    .set('spark.yarn.queue', 'profiler')\
    .set('spark.local.dir', '/app/PROFILER/tmp')\
    .setAppName('pl_small_notebook')\
    .set('spark.yarn.executor.memoryOverhead', '1000')\
    .set('spark.executor.cores', 2)\
    .set('spark.executor.memory', '4g')\
    .set('spark.executor.instances', 1) 

sc = SparkContext(conf=conf)

# Récupération du HiveContext & ajustement de la configuration
from pyspark.sql import HiveContext

hc = HiveContext(sc)

hc.setConf('hive.exec.dynamic.partition', 'true')
hc.setConf('hive.exec.dynamic.partition.mode', 'nonstrict')
hc.setConf('hive.exec.max.dynamic.partitions', '15000')
hc.setConf('hive.exec.max.dynamic.partitions.pernode', '15000')
hc.setConf('parquet.compression', 'gzip')
hc.setConf("spark.sql.hive.convertMetastoreParquet", "True") # keep False ?
hc.setConf('spark.hadoop.parquet.enable.summary-metadata', 'False')
hc.setConf('spark.sql.parquet.mergeSchema', 'False')
hc.setConf('spark.sql.hive.metastorePartitionPruning', 'True')
hc.setConf('spark.sql.parquet.filterPushdown', 'True')

# Récupération du SQLContext & ajustement de la configuration
from pyspark.sql import SQLContext

sqlc = SQLContext(sc)

sqlc.setConf("spark.sql.shuffle.partitions", '20')

# spark.hadoop.parquet.enable.summary-metadata false
# spark.sql.parquet.mergeSchema false
# spark.sql.parquet.filterPushdown true
# spark.sql.hive.metastorePartitionPruning true

In [3]:
hc.getConf('spark.sql.parquet.filterPushdown', 'False')

u'False'

In [2]:
from pyspark.sql.functions import *

In [9]:
hc.sql("""
create external table test_pl (
    insertion_date timestamp,
    last_updated_date timestamp,
    source string,
    sar int,
    phase int,
    sensor string,
    value float,
    time timestamp
)
PARTITIONED BY (departure_month int,departure_day int, flight_leg_count int)
STORED AS PARQUET
location '/apps/hive/warehouse/historic_measurement/aircraft=a380/registration=F-HPJB/departure_year=2018'
""")

# Remarque : cette table "n'apparaitra pas" dans le browser HDFS
# https://www.phdata.io/hands-on-example-with-hive-partitioning/

DataFrame[result: string]

In [10]:
hc.sql("MSCK REPAIR TABLE test_pl")

DataFrame[result: string]

In [4]:
df = hc.read.table('test_pl')\
    .filter(col('flight_leg_count')==725)\
    .filter(col('sensor')=='ZZNSS00305:1')\
    .select(['time', 'sensor', 'value'])\
    .cache()

df.count()

14766

In [6]:
hc.read.table('test_pl')\
    .filter(col('flight_leg_count')==726)\
    .filter(col('sensor')=='WLSPD.6:2')\
    .select(['time', 'sensor', 'value'])\
    .explain()

== Physical Plan ==
Filter (sensor#134 = WLSPD.6:2)
+- Scan ParquetRelation: default.test_pl[time#136,sensor#134,value#135] InputPaths: hdfs://tlsdbpfitn01.france.airfrance.fr:8020/apps/hive/warehouse/historic_measurement/aircraft=a380/registration=F-HPJB/departure_year=2018/departure_month=1/departure_day=1/flight_leg_count=713, hdfs://tlsdbpfitn01.france.airfrance.fr:8020/apps/hive/warehouse/historic_measurement/aircraft=a380/registration=F-HPJB/departure_year=2018/departure_month=1/departure_day=10/flight_leg_count=725, hdfs://tlsdbpfitn01.france.airfrance.fr:8020/apps/hive/warehouse/historic_measurement/aircraft=a380/registration=F-HPJB/departure_year=2018/departure_month=1/departure_day=10/flight_leg_count=726, hdfs://tlsdbpfitn01.france.airfrance.fr:8020/apps/hive/warehouse/historic_measurement/aircraft=a380/registration=F-HPJB/departure_year=2018/departure_month=1/departure_day=10/flight_leg_count=727, hdfs://tlsdbpfitn01.france.airfrance.fr:8020/apps/hive/warehouse/historic_mea

In [54]:
# %%timeit -n 5 -r 1

df = hc.read.table('test_pl')\
    .filter(col('flight_leg_count')==713)\
    .select(['time', 'sensor', 'value'])\
    .cache()

df.count()

88680065

In [53]:
path = "/apps/hive/warehouse/historic_measurement/aircraft=a380/registration=F-HPJB/departure_year=2018/departure_month=1/departure_day=1/flight_leg_count=713"

df2 = hc.read\
    .parquet(path)\
    .select(['time', 'sensor', 'value'])\
    .cache()

df2.count()

88680065

In [55]:
path = "/apps/hive/warehouse/historic_measurement/aircraft=a380/registration=F-HPJB/departure_year=2018/departure_month=1/departure_day=1/flight_leg_count=713"

df2 = sqlc.read\
    .parquet(path)\
    .select(['time', 'sensor', 'value'])\
    .cache()

df2.count()

88680065

Mettent toutes le même temps (23-25s) prennent le même volume d'input ("read from Hadoop storage") (~69MB) pour mettre la même quantité de données en mémoire (~525MB) => Même comportement

In [6]:
path = "/apps/hive/warehouse/historic_measurement/aircraft=a380/registration=F-HPJB/departure_year=2018/departure_month=1/departure_day=1/flight_leg_count=713"

df2 = sqlc.read\
    .option('basePath', '/apps/hive/warehouse/historic_measurement/aircraft=a380/registration=F-HPJB/departure_year=2018')\
    .parquet('/apps/hive/warehouse/historic_measurement/aircraft=a380/registration=F-HPJB/departure_year=2018')\
    .filter(col('flight_leg_count')==713)\
    .filter(col('sensor')=='HYDPRY:1')\
    .select(['time', 'sensor', 'value'])\
    .cache()

df2.count()

29817

La cellule du dessus a mis 23s => partition pruning ou données mises en cache ? Tester en tant que premier run du sparkContext

In [32]:
df2.unpersist()

DataFrame[time: timestamp, sensor: string, value: float]

In [16]:
df2.show()

+--------------------+------------+---------+--------------------+
|                time|      sensor|    value|      insertion_date|
+--------------------+------------+---------+--------------------+
|2018-01-01 18:00:...|    HYDPRY:2|245.46886|2018-01-02 06:18:...|
|2018-01-01 18:00:...|    HYDPRY:1|246.52275|2018-01-02 06:18:...|
|2018-01-01 18:00:...|    HYDPRG:2|128.43512|2018-01-02 06:18:...|
|2018-01-01 18:00:...|    HYDPRG:1|128.88994|2018-01-02 06:18:...|
|2018-01-01 18:00:...|ZZNSS00305:1|      0.0|2018-01-02 06:18:...|
|2018-01-01 18:00:...|  ZZNSS00305|      0.0|2018-01-02 06:18:...|
|2018-01-01 18:00:...|   WLSPD.9:2|      0.0|2018-01-02 06:18:...|
|2018-01-01 18:00:...|     WLSPD.7|      0.0|2018-01-02 06:18:...|
|2018-01-01 18:00:...|   WLSPD.6:2|      0.0|2018-01-02 06:18:...|
|2018-01-01 18:00:...|   WLSPD.6:1|      0.0|2018-01-02 06:18:...|
|2018-01-01 18:00:...|   WLSPD.5:2|      0.0|2018-01-02 06:18:...|
|2018-01-01 18:00:...|   WLSPD.4:2|      0.0|2018-01-02 06:18:

In [5]:
sqlc.read\
    .option('basePath', '/apps/hive/warehouse/historic_measurement/aircraft=a380/registration=F-HPJB/departure_year=2018')\
    .parquet('/apps/hive/warehouse/historic_measurement/aircraft=a380/registration=F-HPJB/departure_year=2018')\
    .filter(col('flight_leg_count')==713)\
    .filter(col('sensor')=='HYDPRY:1')\
    .select(['time', 'sensor', 'value'])\
    .explain(True)

== Parsed Logical Plan ==
'Project [unresolvedalias('time),unresolvedalias('sensor),unresolvedalias('value)]
+- Filter (sensor#5 = HYDPRY:1)
   +- Filter (flight_leg_count#10 = 713)
      +- Relation[insertion_date#0,last_updated_date#1,source#2,sar#3,phase#4,sensor#5,value#6,time#7,departure_month#8,departure_day#9,flight_leg_count#10] ParquetRelation

== Analyzed Logical Plan ==
time: timestamp, sensor: string, value: float
Project [time#7,sensor#5,value#6]
+- Filter (sensor#5 = HYDPRY:1)
   +- Filter (flight_leg_count#10 = 713)
      +- Relation[insertion_date#0,last_updated_date#1,source#2,sar#3,phase#4,sensor#5,value#6,time#7,departure_month#8,departure_day#9,flight_leg_count#10] ParquetRelation

== Optimized Logical Plan ==
Project [time#7,sensor#5,value#6]
+- Filter ((flight_leg_count#10 = 713) && (sensor#5 = HYDPRY:1))
   +- Relation[insertion_date#0,last_updated_date#1,source#2,sar#3,phase#4,sensor#5,value#6,time#7,departure_month#8,departure_day#9,flight_leg_count#10] Parquet

In [None]:
hc.setConf("spark.sql.hive.convertMetastoreParquet", "True") # keep False ?
hc.setConf('spark.hadoop.parquet.enable.summary-metadata', 'False')
hc.setConf('spark.sql.parquet.mergeSchema', 'False')
hc.setConf('spark.sql.hive.metastorePartitionPruning', 'True')
hc.setConf('spark.sql.parquet.filterPushdown', 'True')

Paramètres: 
* ```spark.sql.hive.convertMetastoreParquet``` : Défini sur ```True``` par défaut. Quand on écrit/lit sur/depuis des tables Hive, Spark va utiliser sa propre SerDe parquet au lieu de la Hive SerDe qui est semble-t-il moins rapide. Si on souhaite utiliser la SerDe Hive, définir l'option sur ```False```. Dit autrement : ce n'est pas parce qu'on passe par Hive qu'on et obligé d'utiliser sa SerDe qui est moins rapide. Ce n'est en fait pas le cas, on utilise la Spark SerDe du fait que cette option est sur ```True``` par défaut.
* ```spark.hadoop.parquet.enable.summary-metadata``` : Semble défini sur ```False``` par défaut en 2.x. Cette option permet de limiter les opérations de lecture/écriture quand on travaille avec des .parquet en n'écrivant/ne lisant pas un fichier de métadonnées associé à chaque .parquet (*summary file*).
* ```spark.sql.parquet.mergeSchema``` : Défini sur ```False``` par défaut depuis la 1.5 car la fusion de schémas (en fait de métadonnées) de différents fichiers est une opération coûteuse et peu fréquemment nécessaire. Quand on charge plusieurs fichiers .parquet à l'aide de la DataSource correspondante, celle-ci va par défaut prendre le schéma d'un fichier pris au hasard comme schéma de l'ensemble. L'option ```spark.sql.parquet.mergeSchema``` n'est utile que si tous les .parquet n'ont pas le même schéma (ex: pas le même nombre de colonnes). Le schéma pris pour l'ensemble est alors la réunion des schémas de l'ensemble des fichiers.
* ```spark.sql.hive.metastorePartitionPruning``` : Semble défini sur ```False``` par défaut notamment car la feature semblait buggée au moins en 1.5. Quand activée, cette option permet de ne lire que les fichiers des partitions finalement utiles. Utile uniquement si on passe par le Hive Metastore ? Cette option couvre-t-elle aussi le chargement direct de .parquet partionnés ?
* ```spark.sql.parquet.filterPushdown``` : Par défaut ```False``` en 1.6 mais ```True``` en 2.x, quand activée cette option permet au *filter pushdown* de descendre jusque dans les fichiers .parquet. La notion de filtre englobe selection de colonnes (projections) et de lignes (filtres). 

Is a codec splittable ? Il faut préciser qu'on a à l'idée qu'un fichier est stocké en plusieurs blocs. Splittable veut dire : est-ce qu'on peut décompresser les blocs individuellement, on pourra alors d'autant paralléliser la lecture du fichier. Pour un codec non splittable, il faut décompresser l'ensemble du fichier pour pouvoir le lire : un seul coeur va donc devoir décompresser tous les blocs. A contratrio, si le codec est splittable, chaque bloc peut être décompresser individuellement par un coeur ou thread, parallélisation qui accélère d'autant la lecture du fichier. Du fait de sa structure, un fichier .parquet est splittable pour les codec gzip et snappy (au moins). Les différences sont ensuite propres aux codec qui se distinguent notamment par leur ratio de compression, et vitesse d'encodage et de décodage. Suivant l'usage qu'on veut faire des données et nos contraintes on se penchera sur l'un ou l'autre : est-on contraint en stockage ? Requête-t-on souvent les données et voulons nous une lecture rapide ?<br>
https://stackoverflow.com/questions/35789412/spark-sql-difference-between-gzip-vs-snappy-vs-lzo-compression-formats
https://stackoverflow.com/questions/43323882/is-gzipped-parquet-file-splittable-in-hdfs-for-spark

sur spark.yarn.executor.memoryOverhead => https://blog.csdn.net/oufuji/article/details/50387104
RAPPEL SPARK SOUS YARN : EN CLIENT MODE (comme ici) IL Y A QUAND MEME UN APPLICATION MASTER QUI PREND UN CONTENEUR YARN<br>

There are two deploy modes that can be used to launch Spark applications on YARN. In cluster mode, the Spark driver runs inside an application master process (truc général à YARN) which is managed by YARN on the cluster, and the client can go away after initiating the application. In client mode, the driver runs in the client process, and the application master is only used for requesting resources from YARN. spark-submit starts a YARN client program which starts the default Application Master. Then ```<sparkAppName>``` will be run as a child thread of Application Master.

Shuffle read / write : https://stackoverflow.com/questions/27276884/what-is-shuffle-read-shuffle-write-in-apache-spark
Les données sont écrites pour être échangées et sont donc sérialisées. 
Le total du shuffle write peut-il être plus gros que les données ? Ex : j'ai 15Gb de données persistées (désérialisées) en mémoire mais j'ai un shuffle write de 37Gb. Comment c'est possible.

Concernant le timestamp avec une précision à la milliseconde, on le convertit en nombre décimal (norme POSIX aussi utilisée par Spark : nombre de secondes depuis de 01/01/1970). La seule (?) façon d'obtenir dans Spark un TimestampType() à la milliseconde est d'importer cette colonne sous forme de String et de la caster ensuite en DoubleType() (FloatType() ne marche pas) puis en TimestampType().

Remarque : Spark semble capable de faire du filter pushdown sur les csv, au moins en ce qui concerne les projections

### Spark Tables

Glossaire
* Internal table : Quand on drop une internal table, données et métadonnées sont supprimées.
* External table : Quand on drop une external table, seules les métadonnées sont supprimées. On a juste supprimé le pointeur vers les données (la table et les métadonnées associées) mais pas les données elles-même. 
* Managed table : Dans le contexte de Spark, semble signifier que la localisation des données est gérée par le Metastore. Quand on drop (sous-entendu depuis Hive) une managed table, les données associées sont aussi supprimées (et pas juste le pointeur et les métadonnées), conséquence de la première phrase?. Managed = Internal ?
* Unmanaged table
* Persistent table

Avec Spark 1.6, on peut sauvegarder le contenu d'un DataFrame à l'aide de saveAsTable (disponible uniquement avec le HiveContext). La commande va matérialiser le contenu du DataFrame sur le disque et créer un pointeur vers ces données dans le HiveMetastore. La commande crée une managed table persistente. Elle n'est donc perdue à la fermeture de la session Spark et peut être chargée via ```read.table()```.

Spark propose aussi ```registerTempTable``` (déprécié en Spark 2.x et remplcé par ```createOrReplaceTempView```). A quoi ça sert : sans doute si on tient absolument à travailler sous forme de requêtes SQL. La durée de vie de cette table est liée à celle du SQLContext qui l'a créée. Ex : 

```python
sqlContext.read.parquet("people.parquet").registerTempTable("parquetFile")
teenagers = sqlContext.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19")
```

On peut voir une TempTable comme une variable pour requête SQL. On ne peut d'ailleur pas y accéder comme une variable normale (dans notre exemple, on ne peut pas écrire:  ```parquetFile.select('name')```) mais seulement à travers ```sqlContext.sql(sqlQuery)```. Le DataFrame est en fait enregistré dans le SQLContext comme temporary table.

Remarque : ```sqlContext.sql('SHOW tables')``` va retourner une liste des tables (de quelle db ? on y voit par exemple toutes les tables Hive ?) avec entre autres si elles sont temporaires ou nom. Spark 2.x fournit un ensemble de fonctions utilitaires pour travailler avec les tables afin de pouvoir faire ce genre d'opération dans un style programmatique (et donc de ne pas avoir à écrire une requête SQL) via sa nouvelle API Catalog. Dans Spark 2.x l'équivalent d'une TempTable serait enregistrée dans le Catalog.

Spark permet d'écrire et de lire des données stockées dans Hive. Pour pouvoir travailler avec Hive en Spark 1.6, il faut construire un objet HiveContext (qui hérite de SQLContext) et qui ajoute la possibilité d'accéder au Metastore Hive ainsi que de pouvoir passer des requêtes HQL à ```sql``` (Spark support le SQL standard comme le HQL). On peut voir le HiveContext comme un SQLContext aux fonctionnalités étendues : il permet en plus du second d'accéder aux tables du Metastore Hive, aux UDF Hive ainsi que d'écrire ses commandes en HQL (contre du SQL standard). Le SQLContext ne propose en effet qu'un parser de SQL standard alors que le HiveContext permet de parser SQL et HQL. Le HiveContext héritant du SQLContext et n'implémentant que des fonctionnalités supplémentaires, on peut utiliser le premier en remplacement du second pour les fonctionnalités non liées à Hive. Pour ces dernières, l'utilisation du HiveContext ou du SQLContext ne fait aucune différence. Les deux ont été créés à l'origine pour éviter d'avoir à joindre au build Spark (le HiveContext est packagé séparémment) les dépendances nécéssaires au support de Hive.

Remarque : dans le cas de fichiers partitionnés comme parquet, Spark met en cache les métadonnées des fichiers pour une meilleure performance lors d'éventuels imports ultérieurs. Si ```spark.sql.hive.convertMetastoreParquet``` est sur ```True```, Spark va également mettre en cache les métadonnées de tables Hive de fichiers parquet. Cependant si Hive ou un autre programme externe viennent modifier la table, il est est nécessaire de rafraichir le cache Spark des métadonnées à l'aide de  ```hc.refreshTable("my_table")```.

Remarque : Le Metastore sert à stocker les métadonnées des tables Hive. Ex : le schéma, les partitions si la table est partitionnée, etc. Cela évite notamment à Hive d'aller interroger le File System pour ces mêmes métadonnées.

En Spark 2.x (cf lien), la différence entre création d'une table dans Hive ou simplement dans Spark (valable si enableHiveSupport activé) semble se faire sur le mot-clé USING (table Spark) ou STORED AS (table Hive). Une requête destinée à créer une table Hive peut commencer par CREATE TABLE ou CREATE EXTERNAL TABLE suivant qu'on souhaite créer une table interne ou externe respectivement. Créer une table interne quand on souhaite que Hive ait le controle complet du cycle de vie de la table et des données (disent pour des données temporaires mais pourquoi ?). Utiliser une table externe quand on souhaite y accéder depuis l'extérieur de Hive via d'autres application comme Spark. Les tables externes sont indiquées quand on souhaite créer autant de vues des mêmes données sous-jacentes qu'on le souhaite. Aussi quand on ne souhaite pas que Hive ait la main (own) sur les données et qu'on souhaite que cela soit réservé à une autre application.
https://docs.databricks.com/spark/latest/spark-sql/language-manual/create-table.html#create-table-with-hive-format
https://stackoverflow.com/questions/17038414/difference-between-hive-internal-tables-and-external-tables

Remarque : append sur une table partitionné :
```python
df.write.partitionBy('aircraft', 'registration', 'departure_year', 
                                  'departure_month', 'departure_day', 'flight_leg_count') \
                .insertInto(ata28_pivot_table_name, overwrite=False)
```

Fonctions liées à la création de table en Spark 2.x :
* ```df.createTempView('table_name')``` (ou ```df.createOrReplaceTempView('table_name')```) : crée une *temporary view* **locale** dont la durée de vie est liée à celle de la SparkSession qui l'a créé. Il s'agit d'une tranformation et non d'une action. Les données ne sont pas matérialisées sauf si le DataFrame à partir duquel a été créé la *view* a été persisté en mémoire. La *view* peut ensuite être utilisée comme une table Hive dans SparkSQL. N'a d'intérêt que si on préfère écrire ses traitements sous forme de requêtes SQL/HQL. A remplacé ```registerTempTable('table_name')``` de Spark 1.x qui produisait une *temporary view* (locale) dont la durée de vie était liée à celle du SQLContext qui l'avait créé.
* ```df.createGlobalTempView('table_name')``` : crée une *temporary view* **globale** dont la durée de vie est liée à celle de l'application Spark qui l'a créé. La différence avec la *temporary view* locale provient du fait que la durée de vie de la table est liée à celle de l'application et non de celle de la SparkSession qui l'a créé. Une application Spark peut en effet comporter plusieurs sessions. La table sera alors accessible à chacune d'entre elles.
* ```df.write.saveAsTable('table_name')``` : Les *temporary views* qu'elles soient locales ou globales sont parfois appelées tables locales (*local tables*) par opposition aux tables globales (*global tables*) créées par ```df.write.saveAsTable('table_name')```. Cette dernière méthode correspond à une action qui va matérialiser les données (fichiers .snappy.parquet) sur le disque et créer un pointeur vers ces données dans le Hive Metastore. Les données étant persistée sur le disque on parle aussi de *persistent table*. Spark distingue deux types de *global tables* : 
        * Si Spark gère uniquement les métadonnées on parle de *unmanaged table* qu'on appelle aussi table interne (*internal table*) du point de vue de Hive car Hive conserve la gestion des données.
        * Si Spark gère à la fois données et métadonnées on parle de *managed tables* qu'on appelle aussi table externe (*external table*) du point de vue de Hive car la gestion des données est à la charge d'une application extérieure.
     La différence apparaît seulement lorsqu'on supprime la table depuis Spark : si la table est *managed* les métadonnées sont supprimées du Metastore et les données effacées du disques, si la table est *unmanaged* seules les métadonnées sont supprimées. On peut choisir entre *managed* et *unmanaged table* à la création de la table. Attention la différence est subtile : 
     * Création d'une *unmanaged table* : ```df.write.option('path', '/apps/hive/warehouse/table_name').saveAsTable('table_name')``` : Les données sont alors écrites dans le *warehouse* Hive dont Spark trouve le chemin dans le fichier de configuration hive-site.xml (/apps/hive/warehouse dans notre cas).
     * Création d'une *managed table* : ```df.write.saveAsTable('table_name')``` : on ne donne alors aucun chemin particulier, Spark va écrire les données dans le *warehouse directory* (par défaut un répertoire spark-warehouse dans le *current directory*, élément contrôlé par le paramètre ```spark.sql.warehouse.dir```).
* ```df.write.insertInto('table_name')``` : Possède également un argument ```overwrite``` fixé à ```False``` par défaut. Contrairement cette fonction ne crée pas de nouvelle table mais requiert une table déjà existante et que les schémas du DataFrame à écrire et de la table soient identiques (comme les fonctions précédentes en ```SaveMode.Append```). 

Remarque : la principale différence entre une table et un DataFrame est que la première est définie dans le contexte (*scope*) d'une base de données tandis que le second l'est dans celui d'un language de programmation. Quand on crée une table, elle est définie dans la base de données courante (```default``` par défaut).

Table interne vs table externe
Il semble indiqué de créer des tables externes quand les données sont destinées à être utilisées à l'extérieur de Hive, par d'autres programmes que Hive et notamment si ce programme est chargé d'ajouter, de supprimer, etc. des données. On ne veut alors pas que Hive soit l'unique détenteur des droits sur ces données (dans le cas de tables internes, il semble que Hive a aussi la charge de la sécurité des données) bien qu'on puisse souhaiter ponctuellement l'utiliser pour du requêtage. La table externe est également indiquée si on ne veut pas que les données soient supprimées en même temps que la table. C'est notamment le cas quand on utilise plusieurs tables/vues construites sur des mêmes données.

La table interne est indiquée lorsque qu'on souhaite gérer l'ensemble du cycle de vie de la table et des données avec Hive. C'est le cas si on est quelqu'un qui réalise son travail d'analyse uniquement avec des requêtes SQL. On travaille par exemple sur des données brutes stockées dans une table qui on besoin d'être prétraitées avant de faire notre analyse. Ce prétraitement est couteux, on va donc en sauver le produit dans une table interne Hive sur laquelle on fera notre analyse avec Hive et qu'on supprimera une fois notre analyse terminée. Les tables internes sont donc indiquée quand on effectue des travaux d'analyse avec Hive et qu'on a besoin de stocker des résultats intermédiaires dans une table finalement temporaire.

Table partitionnée 
Pour créer une table partitionnée il suffit d'écrire ```df.write.partitionBy(*partition_cols).saveAsTable('table_name')```. Attention : si on souhaite créer une table partitionnée à partir de données (partitionnée) préexistentes, la création de la table ne déclenche pas de *partition discovery*, partitions ensuites ajoutées au Hive Metastore. Découvrir et enregistrer les partitions dans le Metastore impose l'utilisation d'une commande supplémentaire : 
```python
# On crée des données partitionnées
df.write\
    .mode('overwrite')\
    .partitionBy(*partition_cols)\
    .parquet('/apps/hive/warehouse/table_name')
    
# On crée une table interne ou externe (ici interne)
# Il semble qu'on soit obligé d'en passer par SQL et l'écriture du schéma à la main
spark.sql("CREATE TABLE <example-table>(id STRING, value STRING) USING parquet PARTITIONED BY(id) LOCATION "<file-path>"")

# On découvre les partitions et update les métadonnées
# Existe-t-il une interface programmatique ?
spark.sql("MSCK REPAIR TABLE table_name)
```

Remarque : le schéma d'une table est immutable, on peut en revanche modifier les données sous-jascentes et ensuite rafraîchir les métadonnées de la table (marche que la table soit interne ou externe ?) à l'aide de ```spark.catalog.refreshTable('table_name')``` ou ```spark.sql('REFRESH TABLE table_name')```. Plus généralement, l'API Catalog de Spark 2.x fournit une interface programmatique pour toutes les opérations liées au cycle de vie d'une table. Ces fonctions étaient dans l'ensemble accessible via le SQLContext dans Spark 1.x : 
* ```spark.catalog.refreshTable('table_name')``` là où on peut utiliser ```spark.sql('REFRESH TABLE table_name')```
* ```spark.catalog.dropTable('table_name')``` n'existe pas, on doit utiliser ```spark.sql('DROP TABLE table_name')```
* ```spark.catalog.cacheTable('table_name')``` (et ```spark.catalog.uncacheTable('table_name')```) : a un équivalent SQL ? Déclenche une action ?


Temporary views : Spark permet de créer des view temporaires. En Spark 1.6 cela passait par ```sqlc.registerDataFrameAsTable(df, 'tmp_table_name')``` ou ```df.registerTempTable('tmp_table_name')``` pour la création et utilisait ```sqlc.dropTempTable('tmp_table_name')``` pour la suppression. Spark 2.x a déprécié ```registerTempTable``` qui est remplacé par ```df.createTempView('tmp_table_name')``` ou ```df.createOrReplaceTempView('tmp_table_name')``` ou ```df.createGlobalTempView('tmp_table_name')```. La suppression passe déormais par le Catalog : ```spark.catalog.dropTempView('tmp_table_name')```. Comme n'importe quelle table, on peut accéder au DataFrame associé via ```spark.table('tmp_table_name')```. Une application intéressante peut être de lier une Temporary View avec des donnée persistées en mémoire. On peut alors s'éviter d'écrire du code comme : 

```python
df_persisted = df.persist(StorageLevel.MEMORY_AND_DISK)

df_persisted.count()

df_persisted.filter()\
    .groupBy()\
    #etc.
    
df_persisted.unpersist()
```

Une vue temporaire permet un code (un peu plus élégant?) qui nous permet de ne plus nous soucier de préserver la variable ```df_persisted``` (notamment si le ```df_persisted = df.cache()``` est écrit à l'intérieur d'une fonction). On a en fait créé une liaison entre un nom (celui de la Temporary View) et nos données à l'intérieur de l'environnement de l'application Spark et indépendante de l'environnement d'exécution de notre Notebook / script Python, une sorte de "variable Spark". 

```python
df.createOrReplaceTempView('temp_view')
        
spark.table('temp_view')\
    .persist(StorageLevel.MEMORY_AND_DISK)

spark.table('temp_view')\
    .count()
    
spark.table('temp_view')\
    .filter()\
    .groupBy()\
    #etc.
    
spark.catalog.dropTempView('tmp_table_name')
```

**Important** : si notre temp view est liée à des données persistées, dropTempView va dropper aussi les données persistées en mémoire (Spark 2.1 au moins). Pas besoin d'appeler unpersist sur la table avant de la dropper ou clearCache après l'avoir droppée.

Sur le warehouse : 
spark 1.6 and spark 2 have a different hive warehouse.
For spark 1.6 , warehouse location is identified by hive.metastore.warehouse.dir (default = /apps/hive/warehouse)
For spark 2, warehouse location is identified by spark.sql.warehouse.dir (default = <user.dir>/spark-warehouse)
Attention aux permissions. Quand on passe un répertoire à spark.sql.warehouse.dir il doit déjà être créé.

    // When working with Hive, one must instantiate `SparkSession` with Hive support, including
    // connectivity to a persistent Hive metastore, support for Hive serdes, and Hive user-defined
    // functions. Users who do not have an existing Hive deployment can still enable Hive support.
    // When not configured by the hive-site.xml, the context automatically creates `metastore_db`
    // in the current directory and creates a directory configured by `spark.sql.warehouse.dir`,
    // which defaults to the directory `spark-warehouse` in the current directory that the spark
    // application is started.

### Avoir le nombre de lignes par partitions 
df.withColumn("partitionId", sparkPartitionId()).groupBy("partitionId").count.show

Vérifier si historic_measurement est interne ou externe / managed ou unmanaged. Si interne on peut quand même rafraichir les métadonnées ? Au pire on s'en fout, si on passe une requête via .sql() c'est Hive qui se charge de l'exécuter in fine. Si interne on peut quand même aller modifier les données ? ie : aller écrire sous des chemins où on peut ne pas avoir les droits (on = soit le user, soit l'application : Hive interdit à une appli externe d'y aller). A priori le user 'profiler' est owner de /apps/hive/warehouse/historic_measurement mais les permissions sont 'drwxrwxrwx'.

## Memory management

La ```spark.executor.memory``` définie dans l'objet SparkConf se décompose en trois parties : 
* La *Spark memory* qui se décompose elle même en deux partie : 
    * La mémoire dédiée au stockage de données (storage memory). Elle est dédiée à la mise en cache de données (via ```cache``` ou ```persist```) et au stockage de broadcast variables. Les données stockées dans cet espaces peuvent être sujettes à éviction quand l'*execution memory* a besoin de s'étendre sauf pour une part ```spark.memory.storageFraction``` de la *Spark memory*.
    * La mémoire dédiée au traitements (execution memory). C'est ici que sont stockés les résultats intermédiaire induits par des opérations comme les jointures, agrégations, tris, shuffles, etc. Elle peut aller empiéter sur la *storage memory* (avec possible éviction des données stockées vers le disque) jusqu'à une certaine limite (contrôlée par ```spark.memory.storageFraction```, égal à 0.5 par défaut) au delà de laquelle les résultats intermédiaires sont stockés sur le disque (*spill on disk*) OU AU CONTRAIRE DECLENCHE UN OOM ?. L'inverse n'est pas vrai : la *storage memory* ne peut pas venir empiéter sur l'*execution memory*.
* La *user memory*
* La *reserved memory* d'un montant fixe de 300Mb.

Le partage de la mémoire entre *Spark memory* et *user memory* est contrôlée par le paramètre ```spark.memory.fraction``` égal ) 0.75 par défaut.

Points incertains (changements depuis 1.6+) : 
* pas sûr que la storage memory n'ait pas le droit de faire de l'évition de données d'exécution vers le disque.
* on parle parfois aussi de shuffle memory. Qu'est-ce ? Idem quand elle tape sur sa limite (controlée par ?), OOM ou spill to disk avec impact catastrophique sur la perf si autorisé à spill (controlé par ?) ? Bizarre : shuffle on écrit forcément sur le disque ??. ```spark.shuffle.memoryFraction``` https://stackoverflow.com/questions/43342423/what-happens-on-heap-space-during-spark-shuffle-stage
* Globalement il semblerait qu'il y ait eut pas mal de changements entre 1.6 et 2.x. Pas les mêmes noms d'options voire pas les mêmes comportements autorisés.
* Tungsten travaille off-heap mais ```spark.executor.memory``` semble correspondre à la heap de la JVM de l'exécuteur => ???

Exemple : 
Pour un paramètre ```spark.executor.memory``` fixé à ```16g``` tous les autres étant laissés à leur valeur par défaut, le mémoire va se décomposer en : 
* 300 Mb de *reserved memory*
* (16 - 0.3) * (1 - 0.75) = 3.925 Gb de *user memory*
* (16 - 0.3) * 0.75 = 11.775 Gb de *Spark memory* dont 11.775 * 0.5 = 5.887 Gb au maximum peut être utilisée comme storage memory.

Remarque : Il semblerait que ```spark.executor.memory``` fixe la taille de la *heap* de la JVM de l'executor et qu'il existe un paramètre ```spark.memory.offheap.size``` pour contrôler la taille de la mémoire *off-heap*. Il semblerait que cette fraction soit aussi contrôlée par ```spark.yarn.executor.memoryOverhead``` ?. Les ressource demandées au cluster correspondent au moins à la somme de la *heap* et de la mémoire *off-heap*. => ```'spark.memory.offHeap.enabled', True``` et ```'spark.memory.offHeap.size', '16g'``` en Spark 2.x. 

Remarque : Sur YARN, chaque exécuteur va prendre un peu plus que ```spark.executor.memory```, une partie de la mémoire étant allouée au fonctionnement du conteneur YARN (?).

Docs : 
https://spark.apache.org/docs/1.6.3/tuning.html#memory-management-overview
cf slide 8 : https://www.slideshare.net/gmovnlab/spark-tuningpptx
https://stackoverflow.com/questions/21138751/spark-java-lang-outofmemoryerror-java-heap-space
https://databricks.com/session/deep-dive-apache-spark-memory-management

spark.yarn.executor.memoryOverhead

Remarque : ne pas oublier qu'en mode client on a un Application Master qui prend un conteneur et qui a ses propres settings. Ex : on a spark.yarn.am.memory qui est le pendant de spark.driver.memory. Le premier settings n'est évidemment pertinent qu'en client mode YARN. Nécessite de comprendre ce que fait l'Application Master.

https://databricks.com/session/understanding-memory-management-in-spark-for-fun-and-profit
https://blog.csdn.net/oufuji/article/details/50387104

## Trop de partitions : 
* Overwhelming scheduler overhead
* More fetches => more disk seeks (si shuffle car les données sont supposées sur le disque)
* Driver needs to track state per task : on risque d'en demander beaucoup au driver


OOM : GC overhead limit exceeded : >98% du temps passé à GC + <2% de la head recovered. Solutions : 1) Augmenter la taille de la heap de l'exécuteur 2) Changer le mode de GC

## Unités de mesure
Formellement, un byte se définit (indépendamment de l'adressage physique de la mémoire) comme la plus petite unité logiquement adressable par un programme sur un ordinateur. On trouvait autrefois des processeurs utilisant des tailles de byte très variable mais l'industrie s'est aujourd'hui rassemblée autour d'une valeur de 8 bits par byte (eight-bit-byte). Un byte est dès lors devenu synonyme de 8 bits (d'où également la confusion entre byte et octet) mais il subsiste dans des applications spécifiques des processeurs utilisant des mémoires adressables par quantité de 4 bits ou autre. Les microprocesseurs modernes adressent physiquement la mémoire de l'ordinateur par mots de 64 bits voire plus.

Remarque : l'égalité 1 byte = 8 bits est même entérinée par certains organismes de normalisation.

Dans la plupart des usages et comme particulièrement dans l'expression de la taille des disques durs, les préfixes kilo, mega, giga, tera, etc. sont utilisés dans leur sens habituel. Ainsi 1GB=1000MB=1000000kB avec 1B=8bits. Cependant, sur Windows comme dans la quantification des capacités de mémoire RAM, cache, etc. (les adresses mémoires étant des adresses binaires), les termes kilobyte, megabyte, gigabyte, etc. s'entendent différemment. Dans cette dernière situation, le terme kilobyte s'etend comme 1024  bytes (2^10 bytes), le préfixe kilo ayant été gardé car 1024 reste proche de 1000. Cette définition est étendue aux autres préfixes, ainsi 1MB = 1024kB = (2^10)^2 bytes = 2^20 bytes, 1GB = 1024MB = (2^10)^3 bytes = 2^30 bytes, etc. Les deux définitions du kilobyte et de ses dérivés sont difficiles à distinguer. On voit parfois l'usage de KB (au lieu de kB) pour désigner le kilobyte à 1024 byte. Une nouvelle norme à émergé et on parle de kibibytes (KiB), mebibytes (MiB), gibibytes (GiB), tebibytes (TiB), etc. pour désigner et distinguer ces unités fondées sur une définition du kilo à 1024 bytes. Toutefois cette notation ne semble pas être utilisée partout où elle le devrait laissant persister la confusion avec les unités définies en base 10.

L'expression des capacités mémoire pour Spark se fait avec un kilobyte à 1024 byte. Ainsi un paramètre ```'spark.driver.memory'``` définit à ```'1g'``` correspond à 1*(1024)^3 = 1 073 741 824 bytes = 1.07 GB.

Conversions utiles: 
* 1KiB = 1 024 bytes
* 1MiB = 1 024KiB = 1 048 576 bytes = 1 048KB = 1.05MB
* 1GiB = 1 024MiB = 1 048 576KiB = 1 073 741 824 bytes = 1 074MB = 1.07GB

### Remarques sur la config : 
Quand on souhaite modifier des paramètres de configuration d'outils différents de Spark, il faut préfixer le nom du paramètre de façon à ce que Spark sache qu'il ne faut pas qu'il ne passe à son objet SparkConf mais à un autre outil de configuration. Par exemple pour fixer des paramètres Hadoop comme 'fs.s3a.access.key', 'fs.s3a.secret.key', 'parquet.block.size' on les passe à l'objet de config avec le préfixe 'spark.hadoop'. Exemple : config('spark.hadoop.parquet.block.size', 128*1024*1024). De tels paramètres Hadoop sont souvent modifiés via l'objet hadoopConfiguration du SparkContext (sc.hadoopConfiguration.setInt( "parquet.block.size", 128*1024*1024)) mais c'est lourd, peu pratique en Spark 2.x où ça va à l'encontre de la SparkSession comme unique point d'entrée et l'objet ne semble en plus pas disponible hadoopConfiguration avec l'API Python. C'est la même chose avec d'autres outils, comme YARN (préfixe 'spark.yarn').

**Histoires que le driver a déjà démarrer et qu'il faut passer les paramètres au spark-submit c'est vrai ??**

https://stackoverflow.com/questions/43330902/spark-off-heap-memory-config-and-tungsten
https://databricks.com/blog/2015/04/28/project-tungsten-bringing-spark-closer-to-bare-metal.html

## Chargement de l'ensemble de l'historique pour la flotte entière
On passe à ```parquet()``` le *base path*, Spark va alors automatiquement trouver tous les fichiers .parquet situés dans les partitions en aval (feature appelée *partition discovery*) et ajouter au DataFrame les *partitioning columns* correspondante. Passer le *base path* revient à charger l'ensemble des données mais on aurait pu passer n'importe quel autre chemin pour ne charger que certaines partitions. 

Remarque : Le partitionnement se fait dans cet ordre : aircraft > registration > departure_year > departure_month > departure_day > flight_leg_count

Noter la différence : 

```python
path = '/apps/hive/warehouse/at968490.db/ata28_pivot2/aircraft=a380/registration=F-HPJJ/departure_year=2018/departure_month=1'

hc.read\
    .option("basePath", '/apps/hive/warehouse/at968490.db/ata28_pivot2/')\
    .parquet(path)\
    .show()
    
# On obtient une table contenant les données de tous les vols de janvier 2018 et comportant l'ensemble des partitioning columns
```
vs
```python
path = '/apps/hive/warehouse/at968490.db/ata28_pivot2/aircraft=a380/registration=F-HPJJ/departure_year=2018/departure_month=1'

hc.read\
    .option("basePath", '/apps/hive/warehouse/at968490.db/ata28_pivot2/aircraft=a380/registration=F-HPJJ/')\
    .parquet(path)\
    .show()
    
# On obtient une table contenant les données de tous les vols de janvier 2018 mais avec toutes les partitioning columns sauf aircraft et registration car le base path passé en option fait que spark ne voit pas ces répertoires comme des partitions de la table.
```

Attention : ce n'est pas parce que les excécuteurs se sont vue attribuer le même nombre de partitions qu'il n'y a pas de skew. Toutes les partitions ne sont pas forcément de même taille, les exécuteurs peuvent donc avoir un nombre différent d'enregistrements à traiter malgré un nombre de partitions égal.

Spark + Hive : https://andr83.io/1123/

Remarque : on est obligé de passer par un groupage quand on veut utiliser .pivot (contrairement à gather qui peut être effectué partition par partition) : il faut rassembler toutes les lignes d'une même clé de groupage pour pouvoir pivoter, or ces lignes peuvent être dispersées sur plusieurs partitions.

De plus, quand on repartitionne à la main, celà peut donner lieu à des partitions vides. Si on force la repartition suivant les valeurs prises par une colonne mais que celle-ci ne possède que deux valeurs uniques, on va avoir 2 grosses partitions et beaucoup (198?) de partitions vides. De plus, en cas de très grosse partition (qui sera donc localisée sur une seule machine), il est possible que la taille de la partition excède la mémoire allouée à l'exécuteur => OOM error. https://blog.scottlogic.com/2018/03/22/apache-spark-performance.html

Rappel : pour Spark, un shuffle peut impliquer plusieurs des coûteuses opérations suivantes: 
* Un partionnemnet des données qui suppose un coûteux travail de tri
* Des opérations de SerDe suivant comment elles sont sotckées en mémoire afin de pouvoir échanger des données à travers le réseau
* De la compression des données afin de réduire la consommation de bande passante IO dans l'utilisation du réseau
* Des opérations d'écriture sur disque (probablement plusieurs fois pour un même bloc)
Un shuffle est couteux, il implique la sérialisation et l'écriture sur le disque des données et leur transmission sur le réseau, chacune de ces étapes étant réputée lente. De plus, chaque exécuteur dépend alors des autres : s'il y en a un qui n'a pas pas fini de calculer/sérialiser/écrire ses données, les autres attendent.

Pour une agregation, le Physical Plan la fait apparaître deux étapes d'aggrégation de part et d'autre du shuffle. La première étape est une agrégation partielle. On agrège au sein de chaque bloc. Les résultats partiels sont ensuite échangés et réagrégés pour obtenir le résultat final. Cela permet de minimiser la masse d'information échanger sur le réseau.

"Shuffling means the reallocation of data between multiple Spark stages. "Shuffle Write" is the sum of all written serialized data on all executors before transmitting (normally at the end of a stage) and "Shuffle Read" means the sum of read serialized data on all executors at the beginning of a stage."

Mappers	(sources) serializes RDD records and write them	to local disk (shuffle write)	
Reducers (destinations) reads from remote disk over network	for their partition of records (shuffle read)	

On minimise les shuffles => pas de orderBy juste pour faire joli

Remarque :
* https://stackoverflow.com/questions/38981772/spark-shuffle-operation-leading-to-long-gc-pause/39111205
* https://databricks.com/blog/2015/05/28/tuning-java-garbage-collection-for-spark-applications.html
* https://umbertogriffo.gitbooks.io/apache-spark-best-practices-and-tuning/content/tuning-java-garbage-collection.html
* https://stackoverflow.com/questions/45553845/gc-overhead-limit-exceeded-while-reading-data-from-mysql-on-spark
* https://spark.apache.org/docs/latest/configuration.html#memory-management
* http://spark.apache.org/docs/latest/running-on-yarn.html
* https://issues.apache.org/jira/browse/SPARK-15796
* https://forums.databricks.com/questions/2202/javalangoutofmemoryerror-gc-overhead-limit-exceede.html
* https://blog.cloudera.com/blog/2015/03/how-to-tune-your-apache-spark-jobs-part-2/
* https://dlab.epfl.ch/2017-09-30-what-i-learned-from-processing-big-data-with-spark/
* https://stackoverflow.com/questions/41766150/change-default-stack-size-for-spark-driver-running-from-jupyter
* https://becominghuman.ai/setting-up-a-scalable-data-exploration-environment-with-spark-and-jupyter-lab-22dbe7046269
* https://becominghuman.ai/cheat-sheets-for-ai-neural-networks-machine-learning-deep-learning-big-data-678c51b4b463
* https://medium.com/teads-engineering/spark-performance-tuning-from-the-trenches-7cbde521cf60

Remarque : il existe deux types de shuffle en Spark : https://0x0fff.com/spark-architecture-shuffle/

Remarque : http://liuchengxu.org/books/src/Spark/Apache-Spark-2.x-Cookbook.pdf

Remarque : https://spoddutur.github.io/spark-notes/second_generation_tungsten_engine.html
Une stage peut aussi être vue comme une succession d'operateurs optimisables en même temps (WholeStageCodeGen), une chaîne d'opérateurs optimisables en même temps. Il existe des règles sur comment regrouper les opérateurs d'un pipe complexe (un shuffle est par exemple un délimiteur ?), le cas favorable où pourrait être regroupé et optimisé au sein d'un seul appel de fonction est plus l'exception que la règle. La code generation consiste à générer (métaprogrammation) un code optimisé pour l'exécution du groupe d'opérateur. Le but est que le code obtenu doit plus rapide que pour chaque opérateur de la chaîne, le chargement de l'output de l'opérateur précédent, l'appel et l'exécution de l'opérateur courant et l'écriture de ses résultats en mémoire dans un format générique pouvant être passé à n'importe quel opérateur. Ces techniques sont variées : réordonner les opérations (Ex :filter ou project/select pushdown, apparemment c'est cette étape qui génère un optimised query plan ? Ou alors on a une optimisation globale du query plan, un découpage en stage et une code gen par stage ?), loop unfolding etc.
Article de même structure et particulièrement clair : https://databricks.com/blog/2016/05/23/apache-spark-as-a-compiler-joining-a-billion-rows-per-second-on-a-laptop.html

Remarque : On peut sans doute encore gagner en vitesse en changeant le codec de compression de parquet.
Remarque : Gagner en vitesse => Passer à Spark 2.x => Tungsten 2nd Gen. : vectorized scan. (vectorization : SPARK-12992) parquet scans 10x plus rapide (car parquer columnar format, surement pas valable pour tous les formats notamment csv qui est un row format). WholeStageCodeGen (SPARK-12795), etc. Catalyst improvements. Tech talk : apports de Spark 2 : https://pages.databricks.com/Apache-Spark-2.0.html
Remarque : Lacher Hive : on en a pas besoin. La Spark data source parquet comprend (et est capable d'écrire) des tables partionnées.

Lecture de DAG : jargon des RDBMS : project pour select, etc => https://stackoverflow.com/questions/37505638/understanding-spark-physical-plan (il y a des termes spécifiques à Tungsten => ConvertToSafe (memory management), WholeStageCodeGen (query execution). Pourquoi les Agregate-Exchange-Aggregate : se souvenir du framework Mapper-Combiner-Shuffle-Reducer. PartialAgregate ~ combiner. MapPartitionRDD ~ Mapper : on applique une fonction au RDD :https://community.hortonworks.com/questions/36266/spark-physical-plan-doubts-tungstenaggregate-tungs.html + https://stackoverflow.com/questions/37505638/understanding-spark-physical-plan HashAgregate + autres formes ? Cf. https://databricks.com/blog/2015/06/22/understanding-your-spark-application-through-visualization.html partie Execution DAG 

Remarque : bucketBy ne semble supporté par PySpark qu'à partir de la 2.3 : https://stackoverflow.com/questions/49398687/pyspark-does-not-allow-me-to-create-bucket
Comment on repartitionne intelligement ? Manuellement ? Comment on s'assure de la distribution des blocs entre les executeurs? Attention et rappel tous les blocs ne sont pas de même taille. Est-ce que Spark équilibre le nombre d'enregistrements par exécuteur mais comme toutes les partitions de ne sont pas (car n'ont pas à être) de la même taille, tous les exécuteurs n'ont pas le même nombre de partitions.

Remarque : parquet filter pushdown 
* Concerne aussi bien les lignes que les colonnes
* Attention à la fragmentation du pipe : 
```python
df = hc.read.parquet(...)\
    .withCol()\
    .withCol()\
    .withCol()\
    .cache()

df.count()

df = df.select()\
    .filter()\
    .toPandas()

```
On ne va pas profiter du pushdown => sauf à tout évaluer dans la même cellule (Spark devrait alors construire un unique plan),
mettre le plus possible les filter et select avant le persist()

Second generation Tungsten (Spark 2.x - First gen. 1.4) : https://databricks.com/blog/2015/04/28/project-tungsten-bringing-spark-closer-to-bare-metal.html

"
When programming against Spark SQL we have two entry points depending on whether we need Hive support. The recommended entry point is the HiveContext to provide access to HiveQL and other Hive-dependent functionality. The more basic SQLContext provides a subset of the Spark SQL support that does not depend on Hive.

-The separation exists for users who might have conflicts with including all of the Hive dependencies.

-Additional features of HiveContext which are not found in in SQLContext include the ability to write queries using the more complete HiveQL parser, access to Hive UDFs, and the ability to read data from Hive tables.

-Using a HiveContext does not require an existing Hive setup."
https://stackoverflow.com/questions/33666545/what-is-the-difference-between-apache-spark-sqlcontext-vs-hivecontext
https://stackoverflow.com/questions/39898067/how-do-i-enable-partition-pruning-in-spark
https://stackoverflow.com/questions/36171349/using-windowing-functions-in-spark

Partition pruning
Overpartitioning can actually reduce performance:
If a column has only a few rows matching each value, the number of directories to process can become a limiting factor, and the data file in each directory could be too small to take advantage of the Hadoop mechanism for transmitting data in multi-megabyte blocks.

On doit le voir dans le physical plan : on doit avoir une rubrique (comme pour les filters push down) Partition Filters

Spark FileStatusCache (250 Mb par défaut) 
https://fr.slideshare.net/databricks/lessons-from-the-field-applying-best-practices-to-your-apache-spark-applications-with-silvio-fiorito

Le FileScan parquet du physical plan doit faire apparaître un PartitionCount et un PartititionFilter s'il y a pruning. Le premier donne le nombre de partitions qui seront finalement chargées, le second les filtres sur les colonnes de partitionnement qui ont servi au pruning.
Remarque : changer le basepath peut faire gagner du temps quand on a beaucoup de partition : il réduit le nombre de fichiers à lister (par contre on peut perdre des colonnes de partionnement).
Apparemment dans Spark2, si on utilise une table le listing se fait avec le Hive Metastore, listing et pruning peuvent aller très vite (dans 1.x aussi ?).
Cf. Spark Managed et unmanaged tables. Spark 2.x ?
https://databricks.com/session/why-you-should-care-about-data-layout-in-the-filesystem

## Sur les pivots 

Attention ne semble plus d'actualité en 2.x 

Attention : l'étape de pivot semble avoir besoin pour s'optimiser de connaitre l'ensemble des valeurs prisent par les variables de groupage (on peut d'ailleurs les passer à Spark via l'argument ```values``` de la fonction ```pivot``` mais elles sont trop nombreuses ici). ```pivot``` reste une transformation mais la collecte par Spark des valeurs des variables de groupage au moment du pivot peut entrainer de très lourds calculs (presque autant que si on avait lancé une action) et notamment le chargement de l'ensemble des données. Pour éviter de charger les données 2 fois : une fois pour que Spark comprenne comment il va pivoter et une seconde fois pour finalement opérer dessus, ne pivoter qu'une fois les données persistées en mémoire. Si on a pris ses précautions, le pivot peut ne pas être si coûteux que ça d'autant qu'il ne demande pas de *shuffle*.

Dans notre cas, le chargement et la mise en cache des données non pivotées a pris ~15min alors que le pivot n'a mis que ~20s à s'exécuter. Pour connaître les valeurs des variables de groupage Spark n'a qu'à parcourir les données déjà prétraitées et disponibles en mémoire : on a minimisé la durée de cette étape pouvant être très couteuse.

Sans la séparation opérée ci-dessus, le chargement, pivotage et mise en cache des données aurait pris de l'ordre du double du temps soit environ ~30min.

```python
base_path = '/apps/hive/warehouse/historic_measurement'
path = '/apps/hive/warehouse/historic_measurement/aircraft=a380/registration=F-HPJB/departure_year=2018'

# partition_filters = ['aircraft = "a380"', 'registration = "F-HPJB"', '(departure_year = 2018 or departure_year = 2017)']
partition_filters = ['aircraft = "a380"', 'registration = "F-HPJB"', 'departure_year = 2018',
                    '(departure_month = 8 or departure_month = 9 or departure_month = 10)']
partition_conditions = ' and '.join(partition_filters)

sensors = ['_CURM_5QA1_1.2', '_CURM_5QA1_2.2', '_CURM_5QA1_3.2', '_CURM_804QA_1.2', '_CURM_804QA_2.2', '_CURM_804QA_3.2',
           '_CURM_5QA4_1.1', '_CURM_5QA4_2.1', '_CURM_5QA4_3.1', 'FQCC.1:1', 'FQCC.4:1', 'FQFT.1:1', 'FQFT.4:1']

id_cols = ['aircraft', 'registration','departure_year','departure_month','departure_day','flight_leg_count', 'time']

df_unpivoted = hc.read\
    .option("basePath", base_path)\
    .parquet(path)\
    .coalesce(nb_partitions)\
    .filter(partition_conditions)\
    .filter(col('sensor').isin(sensors))\
    .select(id_cols + ['sensor', 'value'])\
    .cache()
    
df_unpivoted.count()

df_pivoted = df_unpivoted.groupBy(id_cols)\
    .pivot('sensor')\
    .agg(max('value'))\
    .cache()

df_pivoted.count()
df_unpivoted.unpersist()
```

## Parquet
Un fichier parquet se décompose en une hiérarchie de trois types d'éléments : 
* Row group : il s'agit d'une partition logique horizontale des données. Un Row group se compose d'un Column chunk par colonne du dataset.
* Column chunk : un Column chunk correspond à une partie des données pour une des colonnes du dataset. Ces données sont écrites de façon contigüe. Un Column chunk consiste en une ou plusieurs Pages. 
* Page : unité élémentaire consituant un Column chunk, une page constitue une unité indivisible en termes de compression et d'encodage : les données d'une même page sont encodées et/ou compressées ensemble. Il faut comprendre ce terme d'unité élémentaire comme la plus petite unité qui doit obligatoirement être scannée pour y trouver ne serait-ce qu'une valeur.
Un fichier .parquet est ainsi constitué d'un ou de plusieurs Row groups chacun comportant un Column chunk par colonne du dataset, un Column chunk contenant à son tour une ou plusieurs Pages.

Un .parquet vient avec ses métadonnées qui permettent sa lecture et l'optimisation de celle-ci. On distingue là encore trois types de métadonnées réparties à l'intérieur du fichier :
* Les métadonnées du fichier écrites en un unique endroit, dans le footer du fichier. 
* Les métadonnées de chaque Column chunk qui sont comprises dans les métadonnées du fichier dans le footer. Elle contiennent en particulier l'adresse du départ de chaque Column chunk.
* Les métadonnées de chaque Page qui sont écrites en tête de chaque page dans un Page header.

Examinons le contenu de chacune : 
* Les métadonnées du fichier contiennent principalement la version de Parquet avec laquelle il a été écrit, le nombre total de ligne et le schéma. Les métadonnées du fichiers rassemblent également la liste de tous les Row groups constituant le fichier avec pour chacun le nombre d'enregistrement, leur taille en bytes et la liste de leurs Column chunks avec leurs métadonnées.
* Les métadonnées d'un Column chunk donnent l'adresse du chunk, son nombre d'enregistrement, sa taille compressé et non compressé mais surtout le type des données, le codec de compression utilisé et le type d'encodage utilisé pour les valeurs de ce Column chunk le cas échéant.
* Les métadonnées d'une Page sont similaires à celles d'un Column chunk. On retrouve notamment le type des données de la Page, sa taille compressée et non compressée, le codec de compression utilisé et l'encodage utilisé pour ce type de données. Suivant le type de Page (DataPage, IndexPage ou DictionaryPage (?) qui dépend du type des données ?) peuvent s'ajouter d'autres métadonnées. Dans le cas d'une DataPage on trouve dans les métadonnées le type d'encodage utilisé ainsi que des valeurs utiles au décodage (?) écritent immédiatemment après le header et juste avant les valeurs encodées (definition levels et/ou (?) les repetition levels (?)).

On remarque que Parquet permet une gestion de l'encodage et potentiellement de la compression au niveau de chaque colonne et non au niveau du fichier : pour chaque (type) de colonne on choisit l'encodage et la compression la plus efficace.

Un lecteur de fichiers Parquet est supposé commencé par en lire les métadonnées afin de n'aller lire que les colonnes qui l'intéresse. Un second filtrage peut ensuite se faire grace aux métadonnées des Pages que le programme peut choisir de sauter en fonction de la description des données que la Page contient (à moins qu'il puisse déjà le faire à l'aide de métadonnées stockées dans le extraKeyValuePairs des Column chunks).

Parquet précise que l'unité de parallélisation pour un job MapReduce est le Row group, la taille duquel on essaye d'ailleurs de faire coïncider avec le celle du bloc HDFS. Ainsi un fichier Parquet est en fait (à confirmer) attaquable par autant de cores que de de Row groups et non par autant de cores que de blocs HDFS (mais leur nombre coïncide le plus souvent). "Row group = smallest task of input split size)

Faire un rappel sur le caractère splittable des codecs de compression.
Rappel : un des avantages de Parquet : à la compression s'ajoute l'encoding pour améliorer la performance de stockage. Autre avantage : columnar : on ne lit que les colonnes qui nous intéressent et la lecture de celle-ci est rapide et efficace.

Intéressant pour la représentation sur disque : https://fr.slideshare.net/RyanBlue3/parquet-performance-tuning-the-missing-guide

Il faut également comprendre un Parquet compressé non comme un gros fichier compressé mais comme la concaténation de multiples entités élémentaires compressées (les Pages). L'avantage de la présence de métadonnées au niveau de chaque page permet de sauter une Page si elle ne contient pas les valeurs recherchées et donc d'en économiser la décompression et le décodage.


Paramètres afférant : 
Quand on parle de taille c'est avant ou après compression ?
spark.sql.parquet.enableVectorizedReader (true par défaut)
spark.hadoop.parquet.block.size (default 128 MiB -> 128*1024*1024) : Remarque : "The block size is the size of a row group being buffered in memory" : des Row groups de taille importante améliorent les performance en lecture mais impliquent une consommation de mémoire plus élevée à l'écriture.
parquet.page.size (default 1 MiB -> 1*1024*1024)
parquet.enable.dictionary (default true) : autorise l'encodage à l'aide de dictionnaires
parquet.dictionary.page.size : "There is one dictionary page per column per row group when dictionary encoding is used. The dictionary page size works like the page size but for dictionary"
parquet.enable.summary-metadata : "To enable/disable summary metadata aggregation at the end of a MR job" default True (False depuis Spark 2.2). Se tradauit par l'écriture d'un summary file.
parquet.compression
parquet.filter.statistics.enabled => pas de préfixe ? spark.hadoop semble-t-il
parquet.filter.dictionary.enabled
Pour ces deux là, ils doivent sûrement être true par défault. Sont sûrement les paramètres qui permettent de filter sur les métadonnées.

Sur ces filtres : https://fr.slideshare.net/RyanBlue3/parquet-performance-tuning-the-missing-guide
Stat filters : l'exemple donné dans la présentation est intéressant : le filtrage montré semble ce faire au niveau du Row group mais dans l'exemple (champ "name" = some string) le filtre n'apporte rien car tous les row groups matchent le prédicat : chaque row group comporte des strings commençant par toutes les lettres de A à Z et chacun d'entre eux va alors être exploré. Le fichier est réécrit mais trié par "name" avant écriture : conséquence, les Row Groups semblant être écrits séquentiellement à l'aide des données triées, chaque Row group contient une partie de l'alphabet, le filtre sur "name" va alors pouvoir jouer à plein. (On parle de stats pour une string car ici on a des min max, ici A et Z dans le premier cas).
Dict filter : un dictionnary est une liste de toutes les valeurs uniques du Column chunk du Row group, cette liste de valeur est écrite en tête de Chunk dans des DictionaryPages. Si on cherche une valeur particulière et que celle-ci est absente du dictionnaire, on saute ce Row group. Le stat filter semble venir avant et peut permettre de s'économiser la lecture inutile de dictionnaires. Ce filtre ne fonctionne que si la colonne a été dictionary-encoded. Il semble qu'à la création du fichier, Parquet peut renoncer à encoder une colonne avec un dictionnaire si la DictionaryPage resultant est trop large (contrôlé par parquet.dictionary.page.size), un a alors un "encoding fallback" vers un autre encoding et la colonne ne sera pas encodée avec un dictionnaire et ne pourra bénéficier du dictionary encoding. On peut palier à ça soit en augmentant un peu la parquet.dictionary.page.size (à 2 ou 3 MiB par exemple) et/ou en diminuant la parquet.block.size (à 64 ou 32 Mib par exemple) car on aura alors moins de valeurs par Row group et donc potentiellement moins de valeurs uniques qui viendraient grossir le dictionnaire.
http://blog.cloudera.com/blog/2017/12/faster-performance-for-selective-queries/

Remarque : Comme toujours pour Spark : le support du predicate push down dépend de la version de Spark.

Paramètres Spark https://fr.slideshare.net/HadoopSummit/producing-spark-on-yarn-for-etl slide 27

Apparemment le page level filtering n'est pas encore implémenté et semble être un projet à plus long terme. Parquet s'enrichit aussi de nouvelles techniques d'encodage et par le support d'autres codecs.


For single-row lookups, it is more efficient to have smaller pages, so there are fewer values to read through before reaching the target value. However, smaller pages incur a higher storage and processing overhead, due to the extra
metadata (offsets, dictionaries) resulting from more pages.

Larger blocks are more efficient to scan through since they contain more rows which improves sequential I/O (as there’s less overhead in setting up each column chunk). However, each block is buffered in memory for both reading and writing, which limits
how large blocks can be. The default block size is 128 MB. The Parquet file block size should be no larger than the HDFS block size for the file so that each Parquet block can be read from a single HDFS block (and therefore from a
single datanode).

Row groups trop petits : "reduces the efficacy of Parquet’s columnar storage format"

Sur l'abondance de WARN à la lecture d'anciens .parquet
https://support.datastax.com/hc/en-us/articles/360001187783-Spark-Executor-reports-CorruptStatistics-warnings-when-reading-older-Parquet-files 

When an executor reads the files written using Spark from an earlier version, the newer version of Parquet fails to parse the file's metadata, in particular the file statistics (PARQUET-251), and generates warning messages. Under normal circumstances, failure to parse the metadata does not affect the executor's ability to read the underlying Parquet file but an update to the way Parquet metadata is handled in Apache Spark 2.1.0 (SPARK-16980) has inadvertently changed the way Parquet logging is redirected and the warnings make their way to the Spark executor's stderr.

The warning messages generate unnecessary noise and the solution was to override the logging level for Parquet to ERROR (SPARK-17993).

Remarque : https://issues.apache.org/jira/browse/SPARK-23852

Retenir aussi que le filter pushdown marche globalement pour les filtres simples et le type de filtre que Spark est capable de pushdown dépend aussi de sa version.

## Why avoiding too many partitions ?
In addition, the number of partitions is also critical for your applications. As mentioned before, the more the partitions, the less data each partition will have. But what’s the trade-off here? Except from the fact your partitions might become too tiny (if they are too many for your current dataset), a large number of partitions means a large number of output files (yes, the number of partitions is equal to the number of part-xxxxx files you will get in the output directory), and usually if the the partitions are too many, the output files are small, which is OK, but the problem appears with the metadata HDFS has to housekeep, which puts pressure in HDFS and decreases its performance. So, finding a sweet spot for the number of partitions is important, usually something relevant with the number of executors and number of cores, like their product*3 

Balancing the data across partitions, is always a good thing to do, for performance issues, and for avoiding spikes in the memory trace, which once it overpasses the memoryOverhead, it will result in your container be killed by YARN. => Au delà de la sous utilisation des CPUs en cas de skew, on évite aussi les pics de consommation de mémoire. Il suffit d'une grosse partition qui nous envoie au dessus de la limite pour que le conteneur se fasse killer par YARN.

increasing the number of dataframe partitions (in this case from 1024 to 2048), reduces the needed memory per partition. => c'est là que toucher aux defaults de spark.sql.shuffle.partitions et spark.default.parallelism peut être intéressant sur de gros volumes

```python
Ntot = spark.read\
    .option('basePath', base_path)\
    .parquet(path)\
    .count()

n=10
wdw = Window.partitionBy('registration').orderBy('departure_year')

df = spark.read\
    .option('basePath', base_path)\
    .parquet(path)\
    .withColumn('repartCol', sqlf.ceil(sqlf.row_number().over(wdw)*sqlf.lit(n)/sqlf.lit(Ntot)))\
    .repartition('repartCol')\
    .drop('repartCol')\
    .cache()

df.count()
```

## Pyspark specifics 
Decreasing the value of ‘spark.executor.memory’ will help, if you are using Python, since Python will be all off-heap memory and would not use the ram we reserved for heap. So, by decreasing this value, you reserve less space for the heap, thus you get more space for the off-heap operations (we want that, since Python will operate there). ‘spark.executor.memory’ is for JVM heap only. You may not need that much, but you may need more off-heap, since there is the Python piece running. => Augmenter spark.yarn.memoryOverhead ? La PVM va quand même se démarrer dans l'exécuteur ?

The reason adjusting the heap helped is because you are running pyspark. That starts both a python process and a java process. The java process is what uses heap memory, the python process uses off heap. 

https://stackoverflow.com/questions/39878846/how-does-spark-running-on-yarn-account-for-python-memory-usage
Paramètre spark.python.worker.memory ?? A l'air spécifique aux taches d'aggrégation 

Voir ça, le process python se nomme le pyspark worker.
https://fr.slideshare.net/SparkSummit/debugging-pyspark-spark-summit-east-talk-by-holden-karau

### Cache vs checkpointing
https://github.com/JerryLead/SparkInternals/blob/master/markdown/english/6-CacheAndCheckpoint.md
https://0x0fff.com/spark-memory-management/
https://blog.box.com/apache-spark-caching-and-checkpointing-under-hood
https://github.com/wctopluto/spark-learning/blob/master/src/main/scala/com/spark/two/updates/CatalogExample.scala#L38

### Partitionnement après lecture : spark.sql.files.maxPartitionBytes

Utiliser spark.sql.files.maxPartitionBytes (et non spark.files.maxPartitionBytes). Il regroupe les fichiers si leur taille est inférieure à cette valeur. Que fait-il si un fichier est plus gros que spark.files.maxPartitionBytes ? Attention, il utilise la taille du fichier dans le file system => s'il est compressé, les partitions obtenues sont 1/compr_ratio plus grosses! Ajuster sa valeur en fonction des fichiers avec lesquels on travaille ? Ca permet d'éviter des repartitionnements idiots. ATTENTION : sur les tables partitionnées donnent l'apparence d'un ratio de compression d'autant plus élevé que s'ajoute une fois la table montée en mémoire les colonnes correspondant aux variables de partitionnement.

De manière plus générale : comment anticiper la taille de ses partitions à partir du nombre de fichiers, de leur taille, de leur nombre de blocs, de row groups, de leur partitionnement (une fois en mémoire les colonnes de partitionnement non incluses dans la taille du fichier se matérialisent) et de leur ratio de compression.

In [4]:
spark.stop()

In [5]:
from pyspark.sql import SparkSession

nb_executors = 1
nb_cores_per_exec = 2
heap_size = 10
off_heap_size = 1
spark_mem_fraction = 0.7
spark_storage_fraction = 0.9
parquet_heap_ratio = 0.3

spark = SparkSession.builder\
    .master('yarn-client')\
    .appName('small_pl_notebook')\
    .config('spark.yarn.queue', 'profiler')\
    .config('spark.local.dir', '/app/PROFILER/tmp')\
    .config('spark.yarn.executor.memoryOverhead', str(off_heap_size)+'g')\
    .config('spark.executor.cores', nb_cores_per_exec)\
    .config('spark.executor.memory', str(heap_size)+'g')\
    .config('spark.memory.fraction', spark_mem_fraction)\
    .config('spark.memory.storageFraction', spark_storage_fraction)\
    .config('spark.executor.instances', nb_executors)\
    .config('spark.driver.extraJavaOptions', '-Dlog4j.configuration=/app/PROFILER/travail/pilienhart/notes_perso/log4j.properties')\
    .config('spark.executor.extraJavaOptions', '-Dlog4j.configuration=/app/PROFILER/travail/pilienhart/notes_perso/log4j.properties')\
    .config('spark.speculation', 'false')\
    .config('spark.sql.warehouse.dir', 'hdfs:///user/at053351/spark_warehouse_pl')\
    .config('spark.hadoop.parquet.memory.pool.ratio', parquet_heap_ratio)\
    .config('spark.hadoop.parquet.enable.summary-metadata', 'False')\
    .config('spark.hadoop.mapreduce.fileoutputcommitter.marksuccessfuljobs', 'False')\
    .config('spark.hadoop.parquet.block.size', 32*1024*1024)\
    .config('spark.sql.files.maxPartitionBytes', 12*1024*1024)\
    .config('spark.sql.files.openCostInBytes', 16*1024*1024)\
    .config('spark.task.maxFailures', 1)\
    .enableHiveSupport()\
    .getOrCreate()
    
#     .enableHiveSupport()\
# spark.sql.warehouse.dir
#     .config('hive.metastore.uris', 'thrift://tlsdbpfitn01.france.airfrance.fr:9083')\
#     .config('spark.sql.sources.maxConcurrentWrites', '1')\
#     .config('spark.hadoop.parquet.memory.pool.ratio', '0.9')\
#     .config('spark.hadoop.parquet.enable.summary-metadata', 'False')\ => supprime le summary file _METADATA
# cf https://stackoverflow.com/a/44237411/9216761 et le https://issues.apache.org/jira/browse/SPARK-15719 associé.
# de toute façon c'est False par défaut en 2.x donc on a pas à se sentir coupable.
#     .config('spark.hadoop.mapreduce.fileoutputcommitter.marksuccessfuljobs', 'False')\ => supprime le _SUCCESS file

# Input 11.5MB

## 4Gb de heap | 2 cores
## 'spark.memory.fraction' || 'spark.hadoop.parquet.memory.pool.ratio'
## 0.5 || 0.3 => OK => 0.925 GiB Spark memory/core
## 0.6 || 0.3 => OK => 1.11 GiB Spark memory/core
## 0.7 || 0.3 => OK => 1.29 GiB Spark memory/core
## 0.8 || 0.3 => OK => 1.48 GiB Spark memory/core
## 0.9 || 0.3 => OK => 1.66 GiB Spark memory/core
## 0.9 || 0.4 => OK => 1.66 GiB Spark memory/core
## 0.9 || 0.5 => OK => 1.66 GiB Spark memory/core
## 0.9 || 0.6 => OK => 1.66 GiB Spark memory/core
## 0.9 || 0.9 => OK => 1.66 GiB Spark memory/core

In [26]:
# 15 fichiers 2 cores => (totsize+15*cost)/2 = 214
# => split = maxPartByte = 12
# (368+15*cost)/12 = 35.6 => 36 partitions 
# CSV splittable : devrait être assez even : 30 partitions pleines de 12MB, une partielle à 8 MB et une vide ?

df = spark.read\
    .option('header', 'True')\
    .csv('/user/at053351/csv_test')\
    .cache()

df.count()

# cost 4 > 16
# 15 fichiers 2 cores => (totsize+15*cost)/2 = 3064
# => split = maxPartByte = 12
# (368+15*cost)/12 = 255.3 => 255 partitions 

15000000

In [44]:
import pyspark.sql.functions as sqlf
partition_cols = ['aircraft', 'registration', 'departure_year', 'departure_month', 'departure_day']

spark.read\
    .option('basePath', '/apps/hive/warehouse/historic_measurement')\
    .parquet('/apps/hive/warehouse/historic_measurement/aircraft=a380/registration=F-HPJI/departure_year=2016/departure_month=12/departure_day=12')\
    .withColumn('registration', sqlf.lower(sqlf.col('registration')))\
    .repartition(*partition_cols)\
    .write\
    .partitionBy(*partition_cols)\
    .option('compression', 'snappy')\
    .mode('overwrite')\
    .parquet('/user/at053351/test_sc_dir')

In [34]:
spark.sql('MSCK REPAIR TABLE test_internal_table_pl')

DataFrame[]

In [39]:
spark.sql('SHOW PARTITIONS test_internal_table_pl').show(20, False)

+-----------------------------------------------------------------------------------------+
|partition                                                                                |
+-----------------------------------------------------------------------------------------+
|aircraft=a380/registration=f-hpji/departure_year=2016/departure_month=12/departure_day=10|
|aircraft=a380/registration=f-hpji/departure_year=2016/departure_month=12/departure_day=11|
|aircraft=a380/registration=f-hpji/departure_year=2016/departure_month=12/departure_day=14|
+-----------------------------------------------------------------------------------------+



In [40]:
spark.catalog.recoverPartitions('test_internal_table_pl')

In [41]:
spark.sql('SHOW PARTITIONS test_internal_table_pl').show(20, False)

+-----------------------------------------------------------------------------------------+
|partition                                                                                |
+-----------------------------------------------------------------------------------------+
|aircraft=a380/registration=f-hpji/departure_year=2016/departure_month=12/departure_day=10|
|aircraft=a380/registration=f-hpji/departure_year=2016/departure_month=12/departure_day=11|
|aircraft=a380/registration=f-hpji/departure_year=2016/departure_month=12/departure_day=14|
|aircraft=a380/registration=f-hpji/departure_year=2016/departure_month=12/departure_day=9 |
+-----------------------------------------------------------------------------------------+



In [35]:
# parquet.block.size : Pour l'écriture. Taille maximale d'un row group sur le disque après compression / encodage.
# Trouver pour NOS DONNEES et le codec choisi quelle taille de row group entre 128, 64, 32, 16, 8, etc. donne des partitions
# de la bonne taille.

dfp = spark.read\
    .parquet('/user/at053351/parquet_test')\
    .cache()

dfp.count()

15000000

In [6]:
import pyspark.sql.functions as sqlf

spark.read\
    .option('basePath', '/user/at053351/spark_warehouse_pl/test_ext_table')\
    .parquet('/user/at053351/spark_warehouse_pl/test_ext_table')\
    .filter(sqlf.col('departure_day')==10)\
    .write\
    .partitionBy('aircraft', 'registration', 'departure_year', 'departure_month', 'departure_day')\
    .mode('overwrite')\
    .parquet('/user/at053351/spark_warehouse_pl/test_ext_table')

AnalysisException: u'Path does not exist: hdfs://tlsdbpfitn01.france.airfrance.fr:8020/user/at053351/spark_warehouse_pl/test_ext_table;'

Problème similaire : SPARK-12546
11.5MB parquet gzippé en input
314MB persistés en mémoire, colonnes de partitionnement comprises
310MB sans les colonnes de partitionnement (??)

In [3]:
import itertools
import pandas as pd
import numpy as np
from py4j.protocol import Py4JError
from pyspark.sql import SparkSession
import time

# heap_range = range(1,9)
# spark_mem_fraction_range = np.arange(0.3, 0.95, 0.05)
# parquet_heap_ratio_range = np.arange(0.3, 1.0, 0.05)
# conc_write_range = range(1,6)

heap_range = [1]
spark_mem_fraction_range = np.arange(0.2, 0.95, 0.1)
parquet_heap_ratio_range = np.arange(0.1, 0.95, 0.1)
conc_write_range = [5]

nb_executors = 1
nb_cores_per_exec = 3
off_heap_size = 1
spark_storage_fraction = 0.25

base_path = '/apps/hive/warehouse/test_insert_pl'
path_pattern = '/apps/hive/warehouse/test_insert_pl/aircraft=a380/registration=F-HPJI/departure_year=16/departure_month=12/departure_day='
part_list = [path_pattern + str(x) for x in [10,11]]
partition_cols = ['aircraft', 'registration', 'departure_year', 'departure_month', 'departure_day']

res = []

for i, x in enumerate(list(itertools.product(heap_range, spark_mem_fraction_range, parquet_heap_ratio_range, conc_write_range))[45:]):
    
    print str(i) + ' Starting session with total heap=' + str(x[0])+'g + mem.fraction=' + str(round(x[1],2)) + ' + parquet.ratio='+\
    str(round(x[2],2)) + ' + maxConcWrites=' + str(x[3])
    
    spark = SparkSession.builder\
            .master('yarn-client')\
            .appName('small_pl_notebook')\
            .config('spark.yarn.queue', 'profiler')\
            .config('spark.local.dir', '/app/PROFILER/tmp')\
            .config('spark.yarn.executor.memoryOverhead', str(off_heap_size)+'g')\
            .config('spark.executor.cores', nb_cores_per_exec)\
            .config('spark.executor.memory', str(x[0])+'g')\
            .config('spark.memory.fraction', round(x[1],2))\
            .config('spark.memory.storageFraction', spark_storage_fraction)\
            .config('spark.executor.instances', nb_executors)\
            .config('spark.driver.extraJavaOptions', '-Dlog4j.configuration=/app/PROFILER/travail/pilienhart/notes_perso/log4j.properties')\
            .config('spark.executor.extraJavaOptions', '-Dlog4j.configuration=/app/PROFILER/travail/pilienhart/notes_perso/log4j.properties')\
            .config('spark.speculation', 'false')\
            .config('spark.sql.sources.maxConcurrentWrites', x[3])\
            .config('spark.hadoop.parquet.memory.pool.ratio', round(x[2],2))\
            .config('spark.hadoop.parquet.enable.summary-metadata', 'False')\
            .config('spark.hadoop.mapreduce.fileoutputcommitter.marksuccessfuljobs', 'False')\
            .config('spark.hadoop.parquet.block.size', 128*1024*1024)\
            .config('spark.sql.files.maxPartitionBytes', 12*1024*1024)\
            .config('spark.task.maxFailures', 1)\
            .getOrCreate()
            
    t1 = time.time()
    
    try:
        spark.read\
            .option('basePath', base_path)\
            .option('compression', 'gzip')\
            .parquet(*part_list)\
            .write\
            .partitionBy(*partition_cols)\
            .mode('overwrite')\
            .parquet('/apps/hive/warehouse/test_ext_table')
        
        duration = round(time.time()-t1, 2)   
        res.append(('SUCCESS',) + x + ((x[0]-0.3)*x[1]/nb_cores_per_exec, duration))
        print 'Test successful'
    
    except Py4JError as e:
        
        duration = round(time.time()-t1, 2)
        res.append(('FAIL',) + x + ((x[0]-0.3)*x[1]/nb_cores_per_exec, 0))
        print 'Test failed'
        print repr(e)
    
    finally:
        spark.stop()
        time.sleep(5)
    
results = pd.DataFrame.from_records(res, columns=['write_result', 'spark_executor_memory', 'spark_memory_fraction',
                                        'parquet_memory_pool_ratio', 'maxConcurrentWrites', 'mem_per_core', 'job_duration'])

results.to_pickle('grid_search_results_parquet.pkl')
pd.read_pickle('grid_search_results_parquet.pkl')

0 Starting session with total heap=1g + mem.fraction=0.7 + parquet.ratio=0.1 + maxConcWrites=5
Test successful
1 Starting session with total heap=1g + mem.fraction=0.7 + parquet.ratio=0.2 + maxConcWrites=5
Test successful
2 Starting session with total heap=1g + mem.fraction=0.7 + parquet.ratio=0.3 + maxConcWrites=5
Test successful
3 Starting session with total heap=1g + mem.fraction=0.7 + parquet.ratio=0.4 + maxConcWrites=5
Test successful
4 Starting session with total heap=1g + mem.fraction=0.7 + parquet.ratio=0.5 + maxConcWrites=5
Test successful
5 Starting session with total heap=1g + mem.fraction=0.7 + parquet.ratio=0.6 + maxConcWrites=5
Test successful
6 Starting session with total heap=1g + mem.fraction=0.7 + parquet.ratio=0.7 + maxConcWrites=5
Test successful
7 Starting session with total heap=1g + mem.fraction=0.7 + parquet.ratio=0.8 + maxConcWrites=5
Test successful
8 Starting session with total heap=1g + mem.fraction=0.7 + parquet.ratio=0.9 + maxConcWrites=5
Test successful
9

Py4JJavaError: An error occurred while calling None.org.apache.spark.api.java.JavaSparkContext.
: org.apache.spark.SparkException: Yarn application has already ended! It might have been killed or unable to launch application master.
	at org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.waitForApplication(YarnClientSchedulerBackend.scala:85)
	at org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.start(YarnClientSchedulerBackend.scala:62)
	at org.apache.spark.scheduler.TaskSchedulerImpl.start(TaskSchedulerImpl.scala:156)
	at org.apache.spark.SparkContext.<init>(SparkContext.scala:509)
	at org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:58)
	at sun.reflect.GeneratedConstructorAccessor44.newInstance(Unknown Source)
	at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
	at java.lang.reflect.Constructor.newInstance(Constructor.java:408)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:236)
	at py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
	at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
	at py4j.GatewayConnection.run(GatewayConnection.java:214)
	at java.lang.Thread.run(Thread.java:745)


In [8]:
list(itertools.product(heap_range, spark_mem_fraction_range, parquet_heap_ratio_range, conc_write_range))[68]

(1, 0.9000000000000001, 0.6, 1)

In [9]:
# %%timeit -r 2 -n 3

spark.read\
    .option('basePath', base_path)\
    .option('compression', 'gzip')\
    .parquet(*part_list)\
    .write\
    .partitionBy(*partition_cols)\
    .mode('overwrite')\
    .parquet('/apps/hive/warehouse/test_ext_table')

In [20]:
spark.stop()

In [1]:
%%time

import itertools
import pandas as pd
import numpy as np
from py4j.protocol import Py4JError
from pyspark.sql import SparkSession
import time

# heap_range = range(1,9)
# spark_mem_fraction_range = np.arange(0.3, 0.95, 0.05)
# parquet_heap_ratio_range = np.arange(0.3, 1.0, 0.05)
# conc_write_range = range(1,6)

heap_range = [2,3]
spark_mem_fraction_range = np.arange(0.85, 0.95, 0.05)
parquet_heap_ratio_range = np.arange(0.90, 0.95, 0.05)
conc_write_range = [1,5]

nb_executors = 2
nb_cores_per_exec = 2
off_heap_size = 2
spark_storage_fraction = 0.25

base_path = '/apps/hive/warehouse/test_insert_pl'
path_pattern = '/apps/hive/warehouse/test_insert_pl/aircraft=a380/registration=F-HPJI/departure_year=16/departure_month=12/departure_day='
part_list = [path_pattern + str(x) for x in [10,11]]
partition_cols = ['aircraft', 'registration', 'departure_year', 'departure_month', 'departure_day']

spark = SparkSession.builder\
            .master('yarn-client')\
            .appName('small_pl_notebook')\
            .config('spark.yarn.queue', 'profiler')\
            .config('spark.local.dir', '/app/PROFILER/tmp')\
            .config('spark.yarn.executor.memoryOverhead', str(off_heap_size)+'g')\
            .config('spark.executor.cores', nb_cores_per_exec)\
            .config('spark.executor.memory', str(10)+'g')\
            .config('spark.memory.fraction', 0.4)\
            .config('spark.hadoop.parquet.memory.pool.ratio', 0.3)\
            .config('spark.sql.shuffle.partitions', 5)\
            .config('spark.memory.storageFraction', spark_storage_fraction)\
            .config('spark.executor.instances', nb_executors)\
            .config('spark.driver.extraJavaOptions', '-Dlog4j.configuration=/app/PROFILER/travail/pilienhart/notes_perso/log4j.properties')\
            .config('spark.executor.extraJavaOptions', '-Dlog4j.configuration=/app/PROFILER/travail/pilienhart/notes_perso/log4j.properties')\
            .config('spark.speculation', 'false')\
            .config('spark.sql.sources.maxConcurrentWrites', 1)\
            .config('spark.hadoop.parquet.enable.summary-metadata', 'False')\
            .config('spark.hadoop.mapreduce.fileoutputcommitter.marksuccessfuljobs', 'False')\
            .config('spark.hadoop.parquet.block.size', 16*1024*1024)\
            .config('spark.sql.files.maxPartitionBytes', 128*1024*1024)\
            .config('spark.task.maxFailures', 1)\
            .enableHiveSupport()\
            .getOrCreate()

#             .config('spark.yarn.executor.memoryOverhead', str(off_heap_size)+'g')\
# spark.read\
#             .option('basePath', base_path)\
#             .option('compression', 'gzip')\
#             .parquet(*part_list)\
#             .write\
#             .partitionBy(*partition_cols)\
#             .mode('overwrite')\
#             .parquet('/apps/hive/warehouse/test_ext_table')

CPU times: user 473 ms, sys: 643 ms, total: 1.12 s
Wall time: 15.3 s


In [2]:
spark.stop()

In [4]:
part_list

['/apps/hive/warehouse/test_insert_pl/aircraft=a380/registration=F-HPJI/departure_year=16/departure_month=12/departure_day=10',
 '/apps/hive/warehouse/test_insert_pl/aircraft=a380/registration=F-HPJI/departure_year=16/departure_month=12/departure_day=11']

In [31]:
import pyspark.sql.functions as sqlf

base_path = '/apps/hive/warehouse/test_insert_pl/aircraft=a380/registration=F-HPJI/departure_year=16'
#     .option('basePath', base_path)\ utiliser ça signale qu'on lit du parquet partitionné (si on ne recours pas à une table)


spark.read\
    .option('basePath', base_path)\
    .parquet(base_path)\
    .select('time', 'sensor', 'value')\
    .filter(sqlf.length(sqlf.col('sensor'))==8)\
    .explain()

== Physical Plan ==
*Project [time#646, sensor#650, value#651]
+- *Filter (length(sensor#650) = 8)
   +- *FileScan parquet [time#646,sensor#650,value#651,departure_month#652,departure_day#653] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs://tlsdbpfitn01.france.airfrance.fr:8020/apps/hive/warehouse/test_insert_pl..., PartitionCount: 25, PartitionFilters: [], PushedFilters: [], ReadSchema: struct<time:timestamp,sensor:string,value:float>


In [25]:
spark.catalog.listTables()

[]

In [4]:
base_path = '/apps/hive/warehouse/test_insert_pl'
path_pattern = '/apps/hive/warehouse/test_insert_pl/aircraft=a380/registration=F-HPJI/departure_year=16/departure_month=12/departure_day='
part_list = [path_pattern + str(x) for x in [10,11]]
partition_cols = ['aircraft', 'registration', 'departure_year', 'departure_month', 'departure_day']

# 

spark.read\
        .option('basePath', base_path)\
        .option('compression', 'gzip')\
        .parquet(*part_list)\
        .write\
        .partitionBy(*partition_cols)\
        .mode('overwrite')\
        .format('parquet')\
        .saveAsTable('test_ext_table')
        
#         .parquet('/apps/hive/warehouse/test_ext_table')

        
#         .format('parquet')\
#         .saveAsTable('test_ext_table') #on peut lui passer un format mais aussi un mode, etc. On peut aussi passer via mode() format() ?
# Demande de mettre le warehouse sur hdfs ? on le crée avec chmod 777 et on le passe à spark.sql.warehouse.dir
# https://stackoverflow.com/a/46821539/9216761 Table externe/interne sans HiveSupport ? Comment ça se passe ? Si externe on doit alors
# aller effacer les fichiers nous même
# https://www.appsflyer.com/blog/the-bleeding-edge-spark-parquet-and-s3/

In [7]:
# 405Mb sur 4 blocs
# 34 tasks pour la lecture, seules 4 avec des inputs non-nuls => 34 partitions dont 4 non-vides 
# Correspond bien au nombre de row groups

df128 = spark.read\
        .parquet('/apps/hive/warehouse/historic_measurement_pl_test_rg128')\
        .cache()
        
df128.count()

50000000

In [24]:
# 3 partitions 98, 76 et 125 MiB
import pyspark.sql.functions as sqlf
partition_cols = ['aircraft', 'registration', 'departure_year', 'departure_month', 'departure_day', 'flight_leg_count']
partition = '/apps/hive/warehouse/historic_measurement/aircraft=a380/registration=F-HPJJ/departure_year=2018/departure_month=8/departure_day=15/flight_leg_count=112'

spark.read\
    .option('basePath', '/apps/hive/warehouse/historic_measurement')\
    .parquet(partition)\
    .filter(sqlf.col('flight_leg_count')==112)\
    .repartition(*partition_cols)\
    .write\
    .partitionBy(*partition_cols)\
    .option('compression', 'gzip')\
    .mode('overwrite')\
    .saveAsTable('test_hist_meas')

In [23]:
df128.unpersist()

DataFrame[insertion_date: timestamp, last_updated_date: timestamp, source: string, sar: int, phase: int, sensor: string, value: float, time: timestamp, departure_day: int, flight_leg_count: int]

In [24]:
df16 = spark.read\
        .parquet('/user/at053351/parquet_test')\
        .cache()
        
df16.count()

50000000

In [27]:
df16.coalesce(1)\
    .write\
    .option('compression', 'gzip')\
    .mode('overwrite')\
    .parquet('/user/at053351/parquet_test2')

In [28]:
df16.unpersist()

DataFrame[insertion_date: timestamp, last_updated_date: timestamp, source: string, sar: int, phase: int, sensor: string, value: float, time: timestamp, flight_leg_count: int, departure_day: int]

In [32]:
df16 = spark.read\
        .parquet('/user/at053351/parquet_test2')\
        .cache()
        
df16.count()

50000000

In [5]:
# 292 Mb sur 3 blocs 
# 25 tasks pour la lecture, seules 5 avec des inputs non-nuls => 25 partitions dont 5 non-vides
# Correspond bien au nombre de row groups

df64 =spark.read\
        .parquet('/apps/hive/warehouse/historic_measurement_pl_test_rg')\
        .cache()
        
df64.count()

50000000

In [6]:
spark.stop()

In [19]:
import pandas as pd

for i in range(0, 15):
    pd.DataFrame({'a' : range(0,1000000), 'b' : range(1000000,2000000), 'c' : 'ddd'}, columns=['a', 'b', 'c'])\
    .to_csv('./Test_csv/file' + str(i) + '.csv')

In [22]:
import subprocess
subprocess.call(['hdfs', 'dfs', '-put'] + ['./Test_csv/file' + str(i) + '.csv' for i in range(0,15)] + ['hdfs:///user/at053351/csv_test'])

0