# Pyspark Fonctions Spécifiques

### 1 La fonction collect()

La fonction `collect()` est une fonction d'action qui permet de récupérer toutes les données préalablement dispatchées dans les noeuds de travail afin de les récupérer et de pouvoir les manipuler.

L'opération `collect()` nous renvoie un tableau de "rows" (de type row)

L'avantage est qu'une fois mise à disposition nous pouvons grâce à des "**loop**" itérer de manière à appliquer un traitement.

In [None]:
# Exemple de collect() simple
mon_dataframe = spark.table("formation.arrivees").limit(1).collect()

# Affichage de mon dataframe
print(mon_dataframe)

# Affichage de son type
print(type(mon_dataframe))

# Affichage d'un row
# Dans mon print nous savons que nous avons qu'1 seul row
# nous utiliserons donc le premier [0]
# puis nous lui indiquons que nous voulons la valeur 0 (numero_caf) puis les suivantes avec [0:]
# cette formulation nous sera utile afin de selectionner les rows et les valeurs contenues dans ceux-ci
print(mon_dataframe[0][0:])

#### 1.1 Iteration sur notre liste

Nous allons voir ci-dessous plusieurs exemples d'iterations sur notre liste (une fois le **collect()** réalisé)

In [None]:
from pyspark.sql.functions import col

# Exemple de collect() simple
mon_dataframe = spark.table("formation.arrivees").select("numero_caf","date_jour","service").limit(5).collect()

# Boucle simple
for row in mon_dataframe:
    print(str(row["numero_caf"]),",",str(row["date_jour"]),",",str(row["service"]))
    
print("")   
print("-----------------------------------------------------------------------")
print("")


# Boucle avec limite
for row in mon_dataframe[0:2]:
    print(str(row["numero_caf"]),",",str(row["date_jour"]),",",str(row["service"]))
    
print("")   
print("-----------------------------------------------------------------------")
print("")


# Boucle pour ne récuperer qu'une colonne
for col in mon_dataframe:
    ma = col["service"]
        

## 2 La fonction window()

Le fonction `window()` va nous permettre de manière simple de réaliser des opérations "statistiques" à partir d'un groupe, d'une collection, d'un dataframe.

On va retrouver de manière générale trois grande fonctions lié à `window()` :

- Les  fonctions Analytiques
- Les fonctions permettant d'établir un classement
- Les fonctions d'aggrégations


![pyspark window](<https://miro.medium.com/max/1200/1*lUxrBqSZ7lci8UGvOlyRUg.png>)


La fonction `window()` est également utilisé afin d'opérer un transformation des données.

La syntaxe concernant `window()` est la suivante :

- `Window.partitionBY("nom_de_ma_colonne").orderBy("nom_de_ma_colonne")`

In [None]:
# importing pyspark
from pyspark.sql.window import Window
from pyspark.sql import SparkSession

# Nous allons créer un dataframe propre pour la compréhension 
spark = SparkSession.builder.getOrCreate()

# Creation de nos données
donnee = (("Killian", "Dev", 3000),("Benoit", "Dev", 4600),("Romain", "Dev", 4100),("Lindsay", "RH", 3000),("Simon", "RH", 3000),("Émilie", "Direction", 3300),("Adrien", "Direction",3900))

# Creation de nos colonnes
colonnes = ["Nom", "Service", "Salaire"]

# Récupération de notre dataframe
mon_dataframe = spark.createDataFrame(data = donnee, schema = colonnes)

# Création de notre window
ma_window = Window.partitionBy("Service").orderBy("Salaire")


### 2.1 Les fonctions de classements ou "ranking fonctions"

La fonction `row_number()`  va numéroter les rows, nous affichant une séquence en fonction de la window que l'on aura créée. 

Dans l'exemple ci-dessous :

- nous ajoutons à la suite de notre dataframe une colonne appelée "numero_row" qui permettra l'affichage de notre séquence
- nous appliquons `row_number()` qui prendra notre window pour faire la séquence

Constater l'affichage de notre dataframe.

In [None]:
# importing pyspark
from pyspark.sql.functions import row_number

# Application de notre row_nuber()
mon_dataframe = mon_dataframe.withColumn("numero_row",row_number().over(ma_window))

# Lorsque nous affichons notre dataframe, l'on se rend bien compte qu'il à numeroter les rows en créant une sequence
display(mon_dataframe)

La fonction `rank()` nous permet d'établir un classement basé sur notre window

Ce classement se traduira sous forme de chiffre (allant de 1 à ...).

Dans notre exemple ci-dessous, le salaire le plus élevé à le rank le plus haut.


> la fonction `rank()` est similaire à la fonction `RANK` en SQL  
> Par défaut les rows présentant une égalité ne seront pas affichés  
> Pour palier à cela nous utiliserons la fonction `dense_rank()`


On trouve également la fonction `percent_rank()` similaire à `rank()` mais exprimé sous forme de pourcentage.   
Appliqué à notre exemple le salaire le plus elevé aura la valeur **1.0** et le salaire le moins élevé aura la valeur **0.0**

La fonction `ntile()` qui prend une valeur en paramètre peut nous permettre d'intéragir directement avec les limites du classement
 - si nous lui donnons "2" en paramètre le ranking exprimé ne dépassera jamais "2" (les rows auront la valeur soit "1" soit "2")

In [None]:
# importing pyspark
from pyspark.sql.window import Window
from pyspark.sql.functions import rank

# Appliquons à notre dataframe la fonction rank
mon_dataframe = mon_dataframe.withColumn("rank",rank().over(ma_window))

# On peut constater qu'il à établit un classement en fonction de notre window()
# Autrement dit, il a établit un classement en fonction du salaire 
display(mon_dataframe)

### 2.2 Les fonctions analytiques

Les fonctions analytiques permettent d'effectuer des opérations sur un nombre définis de rows.  
Le résultat retourné comprant autant de rows , que le nombre de rows en "input".

La fonction `cume_dist()` nous permet de manière simple de déterminer l'emplacement relatif d'une valeur dans un ensemble de valeurs.

In [None]:
# importing pyspark
from pyspark.sql.window import Window
from pyspark.sql.functions import cume_dist

# Appliquons à notre dataframe la fonction rank
mon_dataframe = mon_dataframe.withColumn("cumulative_valeur",cume_dist().over(ma_window))


display(mon_dataframe)

La fonction `lag()` nous permet de manière simple de garder la valeur du précédent row.

La fonction `lag()` prend deux paramètre :

  - la colonne à regarder 
  - le "look-Back" c'est à dire le nombre d'itération précédente sur laquelle se baser.

> Pour utiliser `lag()` il faut que la colonne à laquelle est appliqué la fonction soit cohérente.  
> Dans notre exemple ci-dessous, la fonction `lag()` va nous renvoyer des `null` car les valeures contenues dans ma colonnes salaire ne sont pas cohérente avec son utilisation

In [None]:
# importing pyspark
from pyspark.sql.window import Window
from pyspark.sql.functions import lag

# Appliquons à notre dataframe la fonction rank
mon_dataframe = mon_dataframe.withColumn("valeur_précédente",lag("Salaire",1).over(ma_window))


display(mon_dataframe)

La fonction `lead()` nous permet d'itérer sur la colonne spécifié afin de retrouver le row suivant notre row actuel

> Attention cela dépend de la colonne spécifié et de l'application que l'on veut en faire 

**EXEMPLE** :

Imaginons que l'on est une table composé de 2 colonnes comme suit :

|FILM|DUREE|
|:--:|:--:|
|film1|duree1|
|film2|duree2|

`lead()` avec comme paramètre la colonne "duree" va en premier parcourir la colonne "duree" de manière ascendante , et retourner la valeur supérieur en durée au row actuel

|FILM|DUREE|LEAD|
|:--:|:--:|:--:|
|film1|duree1|duree2|
|film2|duree2|null|

> Tout comme `lag()` il faut que la colonne à laquelle est appliqué la fonction soit cohérente.

In [None]:
# importing pyspark
from pyspark.sql.window import Window
from pyspark.sql.functions import lead

# Appliquons à notre dataframe la fonction rank
mon_dataframe = mon_dataframe.withColumn("lead",lead("Salaire",1).over(ma_window))

 
display(mon_dataframe)

### 2.3 Les fonctions d'aggrégations

Les fonctions d'aggrégations sont similaires à celles déja vu en Pyspark.
Avec la fonction `window()` 

> Les fonctions d'aggrégations reprennent la `window()` que l'on à établit.  
> Par conséquent, les résultat ci-dessous sont valable pour le partitionnement par "Service" que l'on a fait.  
>
> Exemple :  
> Dans la colonne "sum" vous constaterez qu'il additionne uniquement au fur et a mesure les **salaires** concernant **uniquement** le **service**

In [None]:
# importing pyspark
from pyspark.sql.window import Window
from pyspark.sql.functions import col,avg,sum,min

# Appliquons à notre dataframe la fonction rank
#mon_dataframe = mon_dataframe.withColumn("lead",lead("Salaire",1).over(ma_window))

mon_dataframe = mon_dataframe.withColumn("avg",avg(col("Salaire")).over(ma_window)) \
                .withColumn("sum",sum(col("Salaire")).over(ma_window)) \
                .withColumn("min",min(col("Salaire")).over(ma_window)) 

display(mon_dataframe)