# Jointure

D'une manière générale, le sujet des jointures en big data est un des sujets de ce domaine les plus complexes, mais aussi sur lequel il y a le plus d'enjeu, tant nous avons une vision relationnelle de la donnée.

Le principe d'une jointure consiste, en effet, à fusionner deux datasets en recherchant dans les deux datasets les lignes qui partagent la même clé. Un algorithme naïf va résoudre une jointure en $O(n^2)$ :

> pour chaque élément du premier dataset, je parcours le second dataset pour retrouver le ou les éléments paratageant la même clé.

Dans un contexte big data, une telle complexité n'est pas envisageable, du fait de la taille des données à traiter, des communications réseau que cela peut engendrer et de la capacité de stockage en mémoire vive qui reste relativement petite.

Il existe heureusement des algorithmes de jointure, proposés par Spark, bien plus efficaces, qui vont consister à utiliser une indexation sur la clé et/ou un tri sur la clé et/ou la diffusion d'un dataset entier sur les différents executors pour réaliser les opérations de recherche en local.

Spark, par défaut, va rechercher l'algorithme le plus adapté et le plus performant, en fonction du type de jointure et de statistiques obtenues sur les données. Mais, vous avez la possibilité de forcer Spark à adopter un algorithme.

**Dans ce notebook**, vous allez voir les différents algorithmes proposés par Spark et leurs conséquences, ainsi que les différents types de jointures possibles et ce qu'elles impliquent en termes de performance.

## Prélude

In [None]:
import $ivy.`org.slf4j:slf4j-reload4j:2.0.6`
import $ivy.`org.apache.logging.log4j:log4j-api:2.8.2`
import $ivy.`org.apache.logging.log4j:log4j-slf4j-impl:2.8.2`

// Avoid disturbing logs
import org.apache.log4j._
import org.apache.log4j.varia._
BasicConfigurator.configure(NullAppender.getNullAppender())

import $ivy.`org.apache.spark::spark-core:3.4.1`
import $ivy.`org.apache.spark::spark-sql:3.4.1`

In [None]:
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.rdd._

val spark = {
  NotebookSparkSession.builder()
    .progress(enable = true, keep = true, useBars = false)
    .master("local[*]")
    .appName("Spark tuning - Jointure")
    .config("spark.sql.legacy.timeParserPolicy", "LEGACY")
    .getOrCreate()
}

import spark.implicits._
import $file.^.internal.spark_helper
import spark_helper.implicits._

## Chargement des datasets

Nous allons utiliser dans ce notebook des datasets différents de ce qui a été vu jusque-là. Nous nous mettons dans le cadre d'une application sur téléphone, qui permet à ses utilisateurs de partager sa position avec leurs amis. Pour cela, l'application détecte par géolocalisation le lieu et propose à l'utilisateur de partager le lieu (_checkin_). Une gamification est ajoutée, qui permet à l'utilisateur de gagner des badges selon la fréquence de ses checkins, la distance entre deux chekins, l'utilisateur qui effectue régulièrement les premiers checkins de la journée d'un même lieu devient "Maire" de ce lieu...

Nous avons deux datasets :

* Venues : représente un ensemble de lieux enregistrés qu'il est possible de visiter. Il contient notamment l'identifiant du lieu et ses coordonnées.
* Checkins : représente l'ensemble des checkins réalisés par les utilisateurs de l'application. Il contient l'identifiant de l'utilisateur, l'identifiant du lieu et le timestamp du checkin.

In [None]:
import java.time.Instant

case class Venue(id: String, latitude: Double, longitude: Double, locationType: String, country: String)
case class Checkin(userId: String, venueId: String, timestamp: Instant)

spark_helper.sparkExport(this)

val venuesFilename   = "data/threetriangle/venues.txt.gz"
val checkinsFilename = "data/threetriangle/checkins.txt.gz"

val venues =
  spark.read
    .option("sep", "\t")
    .schema("id STRING, latitude DOUBLE, longitude DOUBLE, locationType STRING, country STRING")
    .csv(venuesFilename)
    .as[Venue]

val checkins =
  spark.read
    .option("sep", "\t")
    // Option to correctly interpret timestamp in checkin data
    .option("timestampFormat", "EEE MMM d HH:mm:ss Z yyyy")
    .schema("userId STRING, venueId STRING, timestamp TIMESTAMP, tzOffset INT")
    .csv(checkinsFilename)
    .as[Checkin]

venues.createOrReplaceTempView("venues")
checkins.createOrReplaceTempView("checkins")

## Jointure

Nous voulons maintenant retrouver les informations des différents lieux où ont été effectués les checkins. Nous avons donc besoin de réaliser une jointure entre les datasets Checkins et Venues.

La méthode `.join()` est assez simple à utiliser. Elle s'applique sur un premier dataframe, puis vous spécifiez en paramètre le dataframe avec lequel vous créez une jointure, en indiquant éventuellement la relation de jointure et le type de jointure (inner, outer, left outer, right outer..., par défaut : inner).

In [None]:
%%data limit=10

val data = checkins.join(venues, checkins("venueId") === venues("id"))

data

Affichons le plan d'exécution de cette requête. Essayer de percevoir les différentes, dont celles qui concernent la jointure (Quelles optimisations sont apportées ? Y a-t-il des échanges de données ?)

Note : un plan d'exécution se lit du bas vers le haut pour suivre les différentes étapes de traitement dans l'ordre.

In [None]:
data.explain()

Allez dans Spark UI et explorez l'onglet "SQL / DataFrame". Spark UI montre un DAG assez complexe pour la requête.

**Approche alternative**

Voici l'équivalent de notre requête, mais cette fois exprimée en SQL.

In [None]:
%%sql limit=10

SELECT *
FROM
  checkins c INNER JOIN venues v ON (v.id = c.venueId)

🤔 **Question** 🤔

Le plan d'exécution de cette requête est-il similaire au plan précédent ?

In [None]:
data.explain()

👀 **Ce qu'il faut voir** 👀

Dans le processus de génération du plan d'exécution physique, nous pouvons voir que Spark fini par sélectionner une stratégie de jointure. Cette stratégie se traduit par une phase d'échange (ou plus exactement, de diffusion) de données entre exécuteurs (`BroadcastExchange`). Puis, interviens la phase de jointure basée sur le _hash_ de la clé de jointure (`BroadcastHashJoin`).

Nous pouvons voir aussi que Spark se laisse la possibilité au dernier moment de changer de stratégie de jointure (`AdaptiveSparkPlan isFinalPlan=false`), sur la base de données statistiques récupérées pendant l'exécution.

Ce type de jointure est similaire à ce que nous avons vu avec les variables broadcast dans le cas des RDD.

## Stratégie de jointure

Spark SQL dispose de différentes stratégies de jointure. Nous venons d'en voir une dans la section précédente.

Sans précision dans le code, cette stratégie est choisie selon une heuristique paramétrable liée, notamment, au type de la jointure, la condition sur les clés (équivalence ou non-équivalence), à taille des données ou à d'autres données statistiques.

Si vous voulez forcer la stratégie de jointure, vous pouvez le préciser dans le code à travers des _hints_ :

```scala
  df1.join(df2.hint("<Stratégie>"), ...)
```

En SQL :

```scala
  spark.sql("""SELECT /*+ <Stratégie> */ ...""")
```

Voici les différentes stratégies (et les valeurs à utiliser dans les hints) :

 * Broadcast Hash Join (BROADCAST / BROADCASTJOIN / MAPJOIN)
 * Shuffle Sort-Merge Join (MERGE / SHUFFLE_MERGE / MERGEJOIN)
 * Shuffle Hash Join (SHUFFLE_HASH)
 * Shuffle-and-Replicate Nested Loop Join (SHUFFLE_REPLICATE_NL)

### Broadcast Hash Join

Cette stratégie est utilisée dans ces conditions :
 * La relation de jointure se base sur l'égalité entre les clés (ie. `dataset1("key") === dataset2("key")`).
 * L'un des datasets est considéré comme étant suffisamment petit.
 * Tous les types de jointure sont supportés, sauf FULL OUTER JOIN.

Dans ce cas, le "petit" dataset est transmis (ou diffusé, d'où le terme _broadcast_) sous la forme d'une table de hachage (_hash table_) à l'ensemble des exécuteurs participant à la jointure. La jointure est alors réalisée en local sur chaque nœud.

Le seuil indiquant si la stratégie sera utilisée est fixée par le paramètre `spark.sql.autoBroadcastJoinThreshold`. Il est exprimé en octets (valeur par défaut : 10485760 (= 10 MB)). Si l'un des datasets de la jointure à une taille inférieure à ce seuil, il sera diffusé. Si aucun 

Dans le cadre de la fonctionnalité AQE (_Adaptive Query Execution_), qui permet de pousser Spark à sélectionner une autre startégie sur la base de relevés statistiques, au lieu de la stratégie planifiée initialement, le seuil permettant de passer à la stratégie _Broadcast Hash Join_ est fixé par le paramètre `spark.sql.adaptive.autoBroadcastJoinThreshold` exprimé en octet. Si vous fixez sa valeur à -1, cette fonctionnalité est désactivée.

Un autre paramètre intervient : `spark.sql.broadcastTimeout`. Ce paramètre est exprimé en secondes (valeur par défaut : 300 (= 5mn)). Il est détecté des problèmes de communication durant la diffusion des données. Si la diffusion de la table ne peut pas être terminée dans le délai imparti, Spark interrompt l'exécution de la requête et génère une erreur.

### Shuffle Sort-Merge Join

Cette stratégie est utilisée dans ces conditions :
 * La relation de jointure se base sur l'égalité entre les clés (ie. `dataset1("key") === dataset2("key")`).
 * Il est possible de trier les clés.
 * Tous les types de jointure sont supportés.
 
C'est la stratégie utilisée par défaut (sauf dans le cas ou le paramètre `spark.sql.join.preferSortMergeJoin=false`, dans ce cas, Spark utilisera _Shuffle Hash Join_).

Les étapes de jointure sont :
 1. Échanges des données entre exécuteurs en fonction de la clé.
 2. Tri des données en local en fonction de la clé.
 3. Fusion des deux datasets.

En général, _Shuffle Sort-Merge Join_ a de meilleures performances sur des données volumineuses.

### Shuffle Hash Join

Cette stratégie est utilisée dans ces conditions :
 * La relation de jointure se base sur l'égalité entre les clés (ie. `dataset1("key") === dataset2("key")`).
 * Il est possible de trier les clés.
 * Tous les types de jointure sont supportés, sauf FULL OUTER JOIN.

Les étapes de jointure sont :
 1. Échanges des données entre exécuteurs en fonction de la clé.
 2. Dépôt des données transférées en local dans une table de hachage en fonction de la clé.
 3. Fusion des deux datasets.

_Shuffle Hash Join_ est efficace lorsque la clé utilisée pour la jointure permet un partitionnement équilibré de la donnée.

### Shuffle-and-Replicate Nested Loop Join

Cette stratégie applique l'algorithme naïf vu plus haut : "pour chaque élément du premier dataset, je parcours le second dataset pour retrouver le ou les éléments partageant la même clé".

Ce type de jointure convient lorsque :
 * Le premier dataset est diffusable sur une jointure de type RIGHT OUTER JOIN.
 * Ou le second dataset est diffusable sur des jointures de type LEFT OUTER JOIN, LEFT SEMI JOIN ou LEFT ANTI JOIN.
 * Ou Dans tous les autres cas sur des jointures de type INNER JOIN ou équivalent.

### Produit cartésien

Le produit cartésien (ou CROSS JOIN) consiste à retourner toutes les combinaisons possibles de lignes sur la fusion de deux dataset.

Ce type de jointure intervient lorsque :
 * Le type de jointure est INNER JOIN ou équivalent.
 * Ou `cross` est explicitement indiqué comme type de jointure
 
Dans les autres cas, l'opération de jointure renvoie une erreur de type AnalysisException.

### Exercices

👷 Pour chacun des exercices ci-dessous, vous devez :
 1. Exécuter la cellule contenant la requête.
 2. Observer le plan d'exécution
 3. Retrouver et observer dans Spark UI le DAG de la requête correspondante
 
Essayez de distinguer les différences qu'il peut y avoir entre les diverses stratégies.

### Broadcast Hash Join

In [None]:
val data = checkins.join(venues.hint("BROADCAST"), checkins("venueId") === venues("id"))
data.showHTML(limit = 10)
data.explain()

### Shuffle Sort-Merge Join

In [None]:
val data = checkins.join(venues.hint("MERGE"), checkins("venueId") === venues("id"))
data.showHTML(limit = 10)
data.explain()

### Shuffle Hash Join

In [None]:
val data = checkins.join(venues.hint("SHUFFLE_HASH"), checkins("venueId") === venues("id"))
data.showHTML(limit = 10)
data.explain()

👀 **Question** 👀

En comparant dans Spark UI avec la précédente stratégie, quel pourrait être la meilleure stratégie pour nos dataframe ?

### Shuffle-and-Replicate Nested Loop Join

Pour commencer, nous allons utiliser le hint `SHUFFLE_REPLICATE_NL` pour impliquer implicitement un produit cartésien.

In [None]:
val data = checkins.join(venues.hint("SHUFFLE_REPLICATE_NL"), checkins("venueId") === venues("id"))
data.showHTML(limit = 10)
data.explain()

Recherchons les lieux qui n'ont pas encore reçu de visite des utilisateurs. Pour cela, nous allons utiliser une jointure de type LEFT ANTI JOIN.

In [None]:
val data = venues.join(checkins.hint("SHUFFLE_REPLICATE_NL"), checkins("venueId") === venues("id"), "leftanti")
data.showHTML(limit = 10)
data.explain()

### Jointure sans clé

Cette fois, sans préciser la clé, nous allons générer toutes les combinaisons possibles d'utilisateurs et de lieux.

In [None]:
val data = checkins.select($"userId").join(venues)
data.showHTML(limit = 10)
data.explain()

## Influence du type de jointure sur la stratégie choisie

Nous allons voir, comment le type de jointure influe sur la stratégie de jointure choisie au sein de Spark. Pour rappel, les notions de dataframe gauche et de dataframe droite sont perçues par rapport à l'opération :

`<left_dataframe>.join(<right_dataframe>, ...)`

Voici les différents types de jointure :

* **inner** : par défaut, conserve les éléments ayant une correspondance de chaque côté de la jointure.
* **outer, full, fullouter, full_outer** : tente de trouver une correspondance à tous les éléments des deux dataframes, ou en associant des valeurs nulles par défaut.
* **leftouter, left, left_outer** : tente de trouver une correspondance pour tous les éléments de dataframe de gauche, des valeurs nulles sont utilisées par défaut, pour les éléments qui ne possèdent pas de correspondant à droite.
* **rightouter, right, right_outer** : tente de trouver une correspondance pour tous les éléments de dataframe de droite, des valeurs nulles sont utilisées par défaut, pour les éléments qui ne possèdent pas de correspondant à gauche.
* **leftsemi, left_semi, semi** : équivalent à un _inner join_, sauf que les colonnes du dataframe de droite sont retirées en sortie. Ce type de jointure permet de vérifier que des valeurs d'un dataframe apparaissent bien dans un autre dataframe.
* **leftanti, left_anti, anti** : recherche dans le dataframe de gauche les clés qui n'apparaissent pas dans le dataframe de droite. C'est l'opposé de _left semi join_. 
* **cross** : produit cartésien, essaye toutes les combinaisons de correspondances entre les éléments gauches et droites.

Ici, nous allons utiliser des données plus simples : un ensemble d'utilisateurs, que nous chargeons directement en mémoire.

In [None]:
val users =
  Seq(
    ("123", "1", 32),
    ("456", "2", 25),
    ("789", "3", 16),
    ("321", "8", 55)
  ).toDF("id", "name_id", "age")

val mapping =
  Seq(
    ("1", "Jon"),
    ("2", "Mary"),
    ("3", "Tom"),
    ("4", "Albert")
  ).toDF("id", "name")

val adults = users.where($"age" >= 18)

🤔 **Question** 🤔

* Pour chaque cellule ci-dessous, quelles principales différences relevez-vous concernant le plan d'exécution ?
* Regroupez les différents types de jointure selon l'équivalence des plans d'exécution. Expliquez pourquoi certains plans sont similaires pour des types de jointure différents ?

In [None]:
%%data

adults.join(mapping, adults("name_id") === mapping("id"), "inner")

In [None]:
%%data

adults.join(mapping, adults("name_id") === mapping("id"), "outer")

In [None]:
%%data limit=10

adults.join(mapping, adults("name_id") === mapping("id"), "left_outer")

In [None]:
%%data limit=10

adults.join(mapping, adults("name_id") === mapping("id"), "right_outer")

In [None]:
%%data limit=10

adults.join(mapping, adults("name_id") === mapping("id"), "left_semi")

In [None]:
%%data limit=10

adults.join(mapping, adults("name_id") === mapping("id"), "left_anti")

## Auto-jointure (_self join_)

L'auto-jointure est une jointure où le dataframe de gauche et de droite sont les mêmes.

Nous avons la table d'employés ci-dessous, qui indique dans la colonne `superior` l'employé responsable.

In [None]:
%%data

val employees =
  Seq(
    ("123", "Jon", null),
    ("456", "Mary", "123"),
    ("789", "Tom", "123"),
    ("321", "Frank", null)
  ).toDF("id", "name", "superior")

employees

Nous voulons afficher chaque employé son responsable, s'il en a un.

In [None]:
%%data

employees.as("l")
  .join(employees.as("r"), $"l.id" === $"r.superior")
  .select($"l.id", $"r.name", $"l.name" as "superior")

🤔 **Question** 🤔

* Quelle stratégie est appliquée ?