<a href="https://colab.research.google.com/github/ocisse-jems/spark_bigdata/blob/main/projet_simul_velos_libre_service/1_PySpark_chargement_manipulation_dataframe.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# **Chargement et Manipulation d'un dataframe avec PySpark**

In [1]:
# install
!pip install pyspark --quiet
!pip install -U -q PyDrive --quiet 
!apt install openjdk-8-jdk-headless &> /dev/null

[K     |████████████████████████████████| 281.4 MB 31 kB/s 
[K     |████████████████████████████████| 199 kB 50.2 MB/s 
[?25h  Building wheel for pyspark (setup.py) ... [?25l[?25hdone


In [2]:
# java env

# import os
# os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"

In [3]:
# SparkSession 

from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

In [4]:
# access to drive for getting data

from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


## **Lecture d'un fichier .csv**

In [5]:
path_csv = "/content/drive/MyDrive/Spark/projet_simul_velos_libre_service/data/Villes/ville_1.csv"
ville = spark.read.format("csv").option("header", "true").load(path_csv)

### 1ères lignes

In [6]:
ville.show(3, vertical=True)

-RECORD 0----------------------------------
 id                 | 5251                 
 vitesse_a_pied     | 0.02                 
 vitesse_a_velo     | 0.05                 
 home               | (lon:26.60 lat:28... 
 travail            | (lon:21.08 lat:14... 
 sportif            | False                
 casseur            | False                
 statut             | reserviste           
 salaire            | 29800.610034665042   
 sexe               | F                    
 age                | 18                   
 sportivite         | 0.1                  
 velo_perf_minimale | 0.4                  
-RECORD 1----------------------------------
 id                 | 5252                 
 vitesse_a_pied     | 0.14974625830876215  
 vitesse_a_velo     | 0.37436564577190534  
 home               | (lon:0.26 lat:42.61) 
 travail            | (lon:36.35 lat:33... 
 sportif            | False                
 casseur            | False                
 statut             | professeur

## **Manipulations de base**

### types des colonnes : **dtypes**

- types pouvant être précisés lors de la lecture
- sinon, colonnes lues par défaut en type **string**

In [7]:
ville.dtypes

[('id', 'string'),
 ('vitesse_a_pied', 'string'),
 ('vitesse_a_velo', 'string'),
 ('home', 'string'),
 ('travail', 'string'),
 ('sportif', 'string'),
 ('casseur', 'string'),
 ('statut', 'string'),
 ('salaire', 'string'),
 ('sexe', 'string'),
 ('age', 'string'),
 ('sportivite', 'string'),
 ('velo_perf_minimale', 'string')]

### nb de lignes : fonction **count**

In [8]:
ville.count()

1083

In [14]:
# nb distinct de lignes 
ville.distinct().count()

1083

### 1ère ligne du dataframe : fonction **first**
- les DF st composées de RDD qui sont elles-memescomposees de rows
- la focntion **first** retourne un objet de la classe **row**

In [9]:
ville.first()

Row(id='5251', vitesse_a_pied='0.02', vitesse_a_velo='0.05', home='(lon:26.60 lat:28.13)', travail='(lon:21.08 lat:14.11)', sportif='False', casseur='False', statut='reserviste', salaire='29800.610034665042', sexe='F', age='18', sportivite='0.1', velo_perf_minimale='0.4')

### Filtrer sur une catégorie : **where**

- pour filtrer le DF sur une condition 

In [10]:
ville.where(ville.sexe == "H").count()

560

### compter par regroupement : **groupBy**

- l'objet DF permet de simuler lefonctionement du **SQL**
- on utilise la méthode **groupby** du DF
- suivie de lamethode **count** pour construire le DAG
- puis on applique l'action **collect** pour obtenir le resultat

In [13]:
ville.groupBy("sexe").count().collect()

[Row(sexe='F', count=523), Row(sexe='H', count=560)]

### table de donnees temporaire et **SQL**

- la fonction **createTempView** cree une table imaginaire temporaire
sur laquelle on peut faire des requetes SQL classiques via l'objet Spark

In [19]:
# on référence le DF comme table SQL
# ville.createTempView("ville")

ville.createOrReplaceTempView("ville")

# requete SQL
query_h = """
  select count(*)
  from ville
  where sexe = "H"
"""

# variable saprk pour appliquer la requete SQL
spark.sql(query_h).collect()

[Row(count(1)=560)]

### calculer une moyenne

- moyenne des saliares par sexe

- ville.groupBy(["sexe"]).mean("salaire").collect() : ne marche pas
car au préalable, il faut  transformer la ou les colonnes utilisees en numerique
si on passe par les methodes des DF,  
avec SQL cela marche sans transformation (**conversion implicite**)  
idem avec pyspark.sql.functions

In [25]:
# methode 1 (SQL avec conversion implicite)

query_avg = """
  select sexe, avg(salaire)
  from ville
  group by sexe
"""

spark.sql(query_avg).collect()

[Row(sexe='F', avg(salaire)=23539.718996012853),
 Row(sexe='H', avg(salaire)=28187.828451177982)]

In [27]:
# methode 2 (pyspark.sql avec conversion implicite)

#-- on groupe par ville puis aggregation dan slaquelle on appelle la fonction avg sur lacolonne salire
#-- puis activation de l'agregation avec l'action collect


from pyspark.sql.functions import avg
ville.groupBy(["sexe"]).agg(avg("salaire")).collect()

[Row(sexe='F', avg(salaire)=23539.718996012853),
 Row(sexe='H', avg(salaire)=28187.828451177982)]

In [28]:
# methode 3 (modification prealable du type de la colonne)

#-- withColumn pour rajouter une colonne à un DF
#-- cast qui permet de chager le type d'une colonne en un type specifique à Saprk

from pyspark.sql.types import DoubleType
ville = ville.withColumn("salaire_float", ville.salaire.cast(DoubleType()))

In [30]:
ville.show(2, vertical=True)

-RECORD 0----------------------------------
 id                 | 5251                 
 vitesse_a_pied     | 0.02                 
 vitesse_a_velo     | 0.05                 
 home               | (lon:26.60 lat:28... 
 travail            | (lon:21.08 lat:14... 
 sportif            | False                
 casseur            | False                
 statut             | reserviste           
 salaire            | 29800.610034665042   
 sexe               | F                    
 age                | 18                   
 sportivite         | 0.1                  
 velo_perf_minimale | 0.4                  
 salaire_float      | 29800.610034665042   
-RECORD 1----------------------------------
 id                 | 5252                 
 vitesse_a_pied     | 0.14974625830876215  
 vitesse_a_velo     | 0.37436564577190534  
 home               | (lon:0.26 lat:42.61) 
 travail            | (lon:36.35 lat:33... 
 sportif            | False                
 casseur            | False     

In [31]:
#-- on calcule la moyenne sur cette nouvelle colonne
ville.groupBy(["sexe"]).avg("salaire_float").collect()

[Row(sexe='F', avg(salaire_float)=23539.718996012853),
 Row(sexe='H', avg(salaire_float)=28187.828451177982)]

### regroupemenst categoriels de valeurs

- compter le nb de personnes qui ont un salaire entre 
    - 10 et 20k€
    - 20 et 30k€
    - 30 et 40k€
    - ...  

pour cela, il nous faut trouver pour chaque salaire dans quelle tranche il se trouve via par ex. une fonction Python


**UDF** (user defined functions)  
permet d'étendre lespossibilités de Spark.
On peut créer et référencer des fonctions que l'on ajoute à Spark.
Ces fonctions seront diffusées sur le cluster pour que tous les workers puissent les appeler

In [33]:
# fonction Python

def categorie(salaire):
  """
  calcule la dizaine de milliers dans lequel se trouve le salaire.
  Ex:
    15000 --> 10000
    34000 --> 30000
  """
  nb_de_dizaine_de_milliers = float(salaire)//10000
  categorie = 10000 * nb_de_dizaine_de_milliers
  return int(categorie)

In [35]:
# UDF
from pyspark.sql.functions import udf
from pyspark.sql.types import * # pour pourvoir typer le resultat

my_function = udf(categorie, IntegerType()) # on enregistre la fonction comme fonction UDF

# application (lazy) de la fonction UDF sur la colonne salaire
# et enregistrement du resultat dans une nouvelle colonne "salaire_categorie"
ville = ville.withColumn("salaire_categorie", my_function("salaire"))

ville.show(1)

+----+--------------+--------------+--------------------+--------------------+-------+-------+----------+------------------+----+---+----------+------------------+------------------+-----------------+
|  id|vitesse_a_pied|vitesse_a_velo|                home|             travail|sportif|casseur|    statut|           salaire|sexe|age|sportivite|velo_perf_minimale|     salaire_float|salaire_categorie|
+----+--------------+--------------+--------------------+--------------------+-------+-------+----------+------------------+----+---+----------+------------------+------------------+-----------------+
|5251|          0.02|          0.05|(lon:26.60 lat:28...|(lon:21.08 lat:14...|  False|  False|reserviste|29800.610034665042|   F| 18|       0.1|               0.4|29800.610034665042|            20000|
+----+--------------+--------------+--------------------+--------------------+-------+-------+----------+------------------+----+---+----------+------------------+------------------+--------------

In [36]:
# pour finalement calculer le nb de perosnnes par tranche de salaires de 10k€

pop_par_tranche = ville.groupBy(["sexe", "salaire_categorie"]).count()

In [40]:
# tri par nb
pop_par_tranche.orderBy("count").show()

+----+-----------------+-----+
|sexe|salaire_categorie|count|
+----+-----------------+-----+
|   F|            60000|    1|
|   H|            80000|    1|
|   H|            70000|    1|
|   F|            50000|    2|
|   H|            60000|    2|
|   F|                0|    2|
|   H|                0|    4|
|   H|            50000|   12|
|   F|            40000|   13|
|   H|            40000|   52|
|   F|            30000|   83|
|   H|            10000|  115|
|   H|            30000|  139|
|   F|            10000|  180|
|   H|            20000|  234|
|   F|            20000|  242|
+----+-----------------+-----+



In [43]:
# tri par sexe et categorie via lamethode sorted de Python
# car la fonction collect renvoie une list Python

sorted(
    ville.groupBy(["sexe", "salaire_categorie"]).count().collect()
)

[Row(sexe='F', salaire_categorie=0, count=2),
 Row(sexe='F', salaire_categorie=10000, count=180),
 Row(sexe='F', salaire_categorie=20000, count=242),
 Row(sexe='F', salaire_categorie=30000, count=83),
 Row(sexe='F', salaire_categorie=40000, count=13),
 Row(sexe='F', salaire_categorie=50000, count=2),
 Row(sexe='F', salaire_categorie=60000, count=1),
 Row(sexe='H', salaire_categorie=0, count=4),
 Row(sexe='H', salaire_categorie=10000, count=115),
 Row(sexe='H', salaire_categorie=20000, count=234),
 Row(sexe='H', salaire_categorie=30000, count=139),
 Row(sexe='H', salaire_categorie=40000, count=52),
 Row(sexe='H', salaire_categorie=50000, count=12),
 Row(sexe='H', salaire_categorie=60000, count=2),
 Row(sexe='H', salaire_categorie=70000, count=1),
 Row(sexe='H', salaire_categorie=80000, count=1)]

### compter les elements dans un group by

- pour faire des calculs par groupe, la fonction **func** du module **functions** de **pyspark.sql** est utile via
  - **collect_list** : qui permet de recuperer la liste des elements de chaque sous-groupe
  - **collect_set** : qui renvoie une liste de valeurs uniques
  - **size** : qui renvoie le nombre d'elements dans une liste

Ex: calcul du nb de personnes par sexe et du nb de tranches de salaire par sexe

In [45]:
from pyspark.sql import functions as func

# nb d'elements par groupe
ville.groupBy("sexe").agg(func.size(func.collect_list('sexe'))).show()

+----+------------------------+
|sexe|size(collect_list(sexe))|
+----+------------------------+
|   F|                     523|
|   H|                     560|
+----+------------------------+



In [47]:
# nb d'elements distincts par groupe
ville.groupBy("sexe").agg(func.size(func.collect_set("salaire_categorie"))).show()

+----+------------------------------------+
|sexe|size(collect_set(salaire_categorie))|
+----+------------------------------------+
|   F|                                   7|
|   H|                                   9|
+----+------------------------------------+



## **Spark DataFrame: creation, transformation, execution**

### charger de la donnee : **spark.read**

In [48]:
path_csv = "/content/drive/MyDrive/Spark/projet_simul_velos_libre_service/data/Villes/ville_1.csv"
df = spark.read.load(path_csv, format="csv")

In [49]:
df.dtypes

[('_c0', 'string'),
 ('_c1', 'string'),
 ('_c2', 'string'),
 ('_c3', 'string'),
 ('_c4', 'string'),
 ('_c5', 'string'),
 ('_c6', 'string'),
 ('_c7', 'string'),
 ('_c8', 'string'),
 ('_c9', 'string'),
 ('_c10', 'string'),
 ('_c11', 'string'),
 ('_c12', 'string')]

In [51]:
# l'option "header" permet de rajouter les noms des colonnes

df = spark.read.format("csv").options(header=True).load(path_csv)

In [52]:
df.dtypes

[('id', 'string'),
 ('vitesse_a_pied', 'string'),
 ('vitesse_a_velo', 'string'),
 ('home', 'string'),
 ('travail', 'string'),
 ('sportif', 'string'),
 ('casseur', 'string'),
 ('statut', 'string'),
 ('salaire', 'string'),
 ('sexe', 'string'),
 ('age', 'string'),
 ('sportivite', 'string'),
 ('velo_perf_minimale', 'string')]

In [53]:
# l'option 'inferSchema' permet de transformer les colonnes en types plus precis
# si le fichier d'origine le permet

df = spark.read.format("csv").options(header=True, inferSchema=True).load(path_csv)

In [54]:
df.dtypes

[('id', 'int'),
 ('vitesse_a_pied', 'double'),
 ('vitesse_a_velo', 'double'),
 ('home', 'string'),
 ('travail', 'string'),
 ('sportif', 'boolean'),
 ('casseur', 'boolean'),
 ('statut', 'string'),
 ('salaire', 'double'),
 ('sexe', 'string'),
 ('age', 'int'),
 ('sportivite', 'double'),
 ('velo_perf_minimale', 'double')]

Avec **Spark**
- les Df sont potentiellement **distribués** sur le cluster d'ordinateur de maniere transparente pour le programmeur
- on peut utiliser le meme code pour un calcul sur un seul ordinateur que pour 1000 ordinateurs
- toute la logique de egstion du reseau, de distribution de calculs, de recuperation sur panneest geree par Spark de maniere autonome
- l'integraion de Spark avec Python permet en plus de faire des tests et de transformer les resulatst Spark en objets Python de maniete tout à fait transparente

In [67]:
path_csv_cyclistes = "/content/drive/MyDrive/Spark/projet_simul_velos_libre_service/data/Cyclistes/"
df_cyclistes = spark.read.load(path_csv_cyclistes, format="csv", header=True, inferSchema="True")

In [68]:
df_cyclistes.printSchema()

root
 |-- id: integer (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- sur_velo: boolean (nullable = true)
 |-- velo: string (nullable = true)
 |-- vitesse: double (nullable = true)
 |-- position: string (nullable = true)
 |-- destination_finale: string (nullable = true)



In [69]:
df_cyclistes.count()

4868396

### transformer la donnee: **les transformations**

In [70]:
df_cyclistes.agg({"vitesse":"mean"}).collect()

[Row(avg(vitesse)=0.5635749865179128)]

In [71]:
path_csv_villes = "/content/drive/MyDrive/Spark/projet_simul_velos_libre_service/data/Villes/"
df_villes = spark.read.load(path_csv_villes, format="csv", header=True, inferSchema="True")

In [72]:
df_villes.printSchema()

root
 |-- id: integer (nullable = true)
 |-- vitesse_a_pied: double (nullable = true)
 |-- vitesse_a_velo: double (nullable = true)
 |-- home: string (nullable = true)
 |-- travail: string (nullable = true)
 |-- sportif: boolean (nullable = true)
 |-- casseur: boolean (nullable = true)
 |-- statut: string (nullable = true)
 |-- salaire: double (nullable = true)
 |-- sexe: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- sportivite: double (nullable = true)
 |-- velo_perf_minimale: double (nullable = true)



In [73]:
df_villes.count()

1083

In [76]:
df_villes.groupBy("sexe").agg({"sportivite":"mean"}).collect()

[Row(sexe='F', avg(sportivite)=2.201847229112897),
 Row(sexe='H', avg(sportivite)=2.161359149481709)]

In [77]:
df_villes.groupBy("sexe").agg({"sportivite":"mean"}).show()

+----+-----------------+
|sexe|  avg(sportivite)|
+----+-----------------+
|   F|2.201847229112897|
|   H|2.161359149481709|
+----+-----------------+



In [78]:
df_villes.groupBy("sexe").agg({"sportivite":"mean", "age":"max"}).collect()

[Row(sexe='F', avg(sportivite)=2.201847229112897, max(age)=84),
 Row(sexe='H', avg(sportivite)=2.161359149481709, max(age)=84)]

In [79]:
df_villes.groupBy("sexe").agg({"sportivite":"mean", "age":"max"}).show()

+----+-----------------+--------+
|sexe|  avg(sportivite)|max(age)|
+----+-----------------+--------+
|   F|2.201847229112897|      84|
|   H|2.161359149481709|      84|
+----+-----------------+--------+

