# TP Plans et partitions BDLE  2021

décembre 2021

date de révision du document : 

# Préparation

installation spark 

In [None]:
!pip install -q pyspark
!pip install -q findspark

import os
# !find /usr/local -name "pyspark"
os.environ["SPARK_HOME"] = "/usr/local/lib/python3.7/dist-packages/pyspark"
os.environ["JAVA_HOME"] = "/usr"

# Principaux import
import findspark
from pyspark.sql import SparkSession 
from pyspark import SparkConf  
 
# pour les dataframe et udf
from pyspark.sql import *  
from pyspark.sql.functions import *
from pyspark.sql.types import *
from datetime import *
 
# pour le chronomètre
import time
 
# initialise les variables d'environnement pour spark
findspark.init()
 
# Démarrage session spark 
# --------------------------
def demarrer_spark():
  local = "local[*]"
  appName = "TP"
  configLocale = SparkConf().setAppName(appName).setMaster(local).\
  set("spark.executor.memory", "6G").\
  set("spark.driver.memory","6G").\
  set("spark.sql.catalogImplementation","in-memory")
  
  spark = SparkSession.builder.config(conf = configLocale).getOrCreate()
  sc = spark.sparkContext
  sc.setLogLevel("ERROR")
  
  spark.conf.set("spark.sql.autoBroadcastJoinThreshold","-1")
 
  # On ajuste l'environnement d'exécution des requêtes à la taille du cluster (4 coeurs)
  spark.conf.set("spark.sql.shuffle.partitions","4")    
  print("session démarrée, son id est ", sc.applicationId)
  return spark
spark = demarrer_spark()


[K     |████████████████████████████████| 281.4 MB 29 kB/s 
[K     |████████████████████████████████| 198 kB 43.1 MB/s 
[?25h  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
session démarrée, son id est  local-1644517281732


In [None]:
import pandas as pd
from google.colab import data_table
 
# alternatives to Databricks display function.
 
def display(df, n=100):
  return data_table.DataTable(df.limit(n).toPandas(), include_index=False, num_rows_per_page=10)
 
def display2(df, n=20):
  pd.set_option('max_columns', None)
  pd.set_option('max_colwidth', None)
  return df.limit(n).toPandas().head(n)

print("display redéfini")

display redéfini


Définir le tag %%sql pour pouvoir écrire plus simplement des requêtes en SQL dans une cellule

In [None]:
from IPython.core.magic import (register_line_magic, register_cell_magic, register_line_cell_magic)

@register_line_cell_magic
def sql(line, cell=None):
    "To run a sql query. Use:  %%sql"
    val = cell if cell is not None else line
    tabRequetes = val.split(";")
    for r in tabRequetes:
        if len(r) > 2:
          derniere = spark.sql(r)
    return display(derniere)
print("tag %%SQL défini")

tag %%SQL défini


In [None]:
# on désactive la modification adaptative du plan
spark.conf.set("spark.sql.adaptive.enabled", False)

## Classe utilitaire : Mesure
Sert pour mesurer la quantité de shuffle lors de l'exécution d'un requête

In [None]:
#---------------------
# rapport d'execution (seulement pour les TP en local ou avec colab)
#---------------------

# Consulter le rapport d'exécution pour connaitre la quantité de données transférée lors de l'opération Exchange: shuffle read. L'accès au rapport d'exécution peut se faire soit avec un navigateur, soit directement en python en indiquant une URL qui se termine par exemple par: /api/v1/applications/A/jobs/B/ avec A étant le numéro de l'application et B étant le numéro de job. S'inspirer de l'exemple ci dessous.

# Indications: consulter la liste des jobs, obtenir le numéro du dernier job traité, i.e, celui ayant le plus grand numéro. Parcourir les stages du job souhaité.

# Pour chaque stage, obtenir les infos de shuffle : shuffle read et shuffle write. Caculer la quantité totale de shuffle read, idem pour les shuffle write.

# Rmq, cette classe est également définie https://nuage.lip6.fr/s/FpqGorZiBbF3Sb2/download?path=/python/mesure.py

import urllib
import json
import dateutil.parser

# -----------
# classe Mesure
# -----------
class Mesure(object):
  def __init__(self, spark):
    self.spark = spark

  def appURL(self):
    appId = self.spark.sparkContext.applicationId
#     contextId = self.spark.conf.get("spark.databricks.sparkContextId")
#     clusterId = self.spark.conf.get('spark.databricks.clusterUsageTags.clusterId')
#     url = "https://community.cloud.databricks.com/sparkui/" + clusterId + "/driver-" + contextId + f"/api/v1/applications/{appId}"
#     return url
    return f"http://localhost:4040/api/v1/applications/{appId}"


  def getJson(self, request):
    webURL = urllib.request.urlopen(request)
    encoding = webURL.info().get_content_charset('utf-8')
    obj = json.loads(webURL.read().decode(encoding))
    return obj


  def recentJobs(self):
    prefix = self.appURL()
    jobsRequest = f"{prefix}/jobs"
    jobs = self.getJson(jobsRequest)
    return [x['jobId'] for x in jobs][:10]


  def showJobs(self):
    prefix = self.appURL()
    jobsRequest = f"{prefix}/jobs"
    jobs = self.getJson(jobsRequest)
    return jobs


  def jobShuffle(self, jobId):
    prefix = self.appURL()
    jobsRequest = f"{prefix}/jobs"
    jobs = self.getJson(jobsRequest)
    jobStageIds = {x['jobId']:x['stageIds'] for x in jobs}

    if(jobStageIds.get(jobId) != None):
      shuffleRead =0
      for stageId in jobStageIds[jobId]:
        stageRequest = f"{prefix}/stages/{stageId}"
        stage = self.getJson(stageRequest)[0]
        # print(stage)
        if(stage['status'] == 'COMPLETE'):
          # voir les attributs décrivant les shuffle: localBytesRead, remoteBytesRead, ...
          # for executor in stage['executorSummary'].values() :
          #   shuffleRead += executor['shuffleRead']
          for task in stage['tasks'].values():
            read = task['taskMetrics']['shuffleReadMetrics']
            shuffleRead += (read['localBytesRead'] + read['remoteBytesRead'])
      return shuffleRead
    else:
      print("JobId not found:", jobId)
      return 0


  def jobTime(self, jobId):
    prefix = self.appURL()
    jobsRequest = f"{prefix}/jobs"
    jobs = self.getJson(jobsRequest)
    startTimes = {x['jobId']:x['submissionTime'] for x in jobs}
    endTimes = {x['jobId']:x['completionTime'] for x in jobs}

    if(startTimes.get(jobId) != None):
      start = dateutil.parser.parse(startTimes[jobId]) 
      end = dateutil.parser.parse(endTimes[jobId]) 
      # time in milliseconds
      return (end - start).microseconds /1000
    else:
      print("JobId not found:", jobId)
      return 0


  def mesure(self, df):
    df.count()
    j = self.recentJobs()[0]
    print("job", j, ": shuffle =", self.jobShuffle(j), "octets, time =", self.jobTime(j), "ms")

# fin de la classe Mesure

print("Mesure definie")


Mesure definie


L'objet mesure  _m_ servira à mesurer les requêtes

In [None]:
m = Mesure(spark)
print("m est défini")

m est défini


## Chronomètres

Fonction auxilliaire pour chronométrer l'exécution d'une requête

In [None]:
#------------------------------
# Chronometre : chronoPersist2
#------------------------------
import time

# Ce chronometre garantit que chaque tuple du dataframe est lu entièrement.
# En effet il est nécessaire de lire le détail de chaque tuple avant de les 'copier' en mémoire.
def chronoPersist(df):
    df.unpersist()
    t1 = time.perf_counter()
    count = df.persist().count()
    t2 = time.perf_counter()
    df.unpersist()
    print('durée: {:.1f} s'.format(t2 - t1), 'pour lire', count , 'elements')

def chronoPersist2(df):
  dest = df.selectExpr("*", "1")
  t1 = time.perf_counter()
  count = dest.persist().count()
  t2 = time.perf_counter()
  dest.unpersist()
  print('durée: {:.1f} s'.format(t2 - t1), 'pour lire', count , 'elements')
        
def chronoCount(df):
  t1 = time.perf_counter()
  count = df.count()
  t2 = time.perf_counter()
  print('durée: {:.1f} s'.format(t2 - t1), 'pour dénombrer', count , 'elements')

    
print("fonctions définies")

fonctions définies


## Fonctions pour afficher le contenu des partitions d'un dataframe
On définit 
* showPartitions : affiche les _n_ premiers éléments de chaque partition
* showPartitionSize : affiche le nombre d'éléments dans chaque partition

In [None]:
# fonction auxilliaire
def partSize(partID, iterateur):
  c=0
  suivant = next(iterateur, None)
  while suivant is not None :
    c+=1
    suivant = next(iterateur, None)
  return [(partID, c)]


def showPartitionSize(df):  
  t = df.selectExpr("1").rdd.mapPartitionsWithIndex(partSize)
  for (partID, nbElt) in t.collect():
    print("partition", partID, ":", nbElt, "éléments")
  print()


def showPartitions(df, N=5):
  size = df.selectExpr("1").rdd.mapPartitionsWithIndex(partSize).collectAsMap()
  
  def topN(partID, iterateur):
    c=0
    head=[]
    suivant = next(iterateur, None)
    while suivant is not None and c < N :
      c+=1
      head.append(suivant)
      suivant = next(iterateur, None)
    return [(partID, head)]  
  t = df.rdd.mapPartitionsWithIndex(topN)
  for (partID, head) in t.collect():
    print("Partition", partID, ",", size[partID], "éléments")
    for row in head:
        print(row)
    print()
    
print('showPartitions et showPartitionSize définies')

showPartitions et showPartitionSize définies


## Données

Les fichiers de Movielens contenant des notes et des films aux formats JSON ou CSV
* https://nuage.lip6.fr/s/FpqGorZiBbF3Sb2?path=movielens


Les commandes ci-dessous permettent d'importer _directement_ ce fichier dans l'espace de stockage, sans passer par une copie intermédiaire sur votre ordinateur perso. 
Avantages : cette méthode est rapide quelle que soit la connectivité de votre accès à internet et permet d'importer de "larges" fichiers.

Créer un dossier temporaire vide,  localement sur le driver

In [None]:
import os
temp = "/temp/"
os.makedirs(temp, exist_ok=True)
os.listdir(temp)

[]

In [None]:
from urllib import request
import zipfile

PUBLIC_DATASET_URL = "https://nuage.lip6.fr/s/H3bpyRGgnCq2NR4" 
PUBLIC_DATASET=PUBLIC_DATASET_URL + "/download?path="

def loadMovieLensFile(file):
  url = PUBLIC_DATASET + "/movielens/" + file
  local_file = temp + file

  if(os.path.isfile(local_file)):
    print(local_file, "is already downloaded")
  else:
    print("downloading from URL: ", url, "save in : ", local_file)
    request.urlretrieve(url, local_file)
    print("download done")

loadMovieLensFile("notes1M.zip")
loadMovieLensFile("ratings3M.zip")
loadMovieLensFile("films.json")
request.urlretrieve(PUBLIC_DATASET + "/movielens/ml-latest/movies.csv", temp + "movies.csv")

with zipfile.ZipFile("/temp/notes1M.zip", 'r') as zip_ref:
    zip_ref.extractall("/temp")
with zipfile.ZipFile("/temp/ratings3M.zip", 'r') as zip_ref:
    zip_ref.extractall("/temp")
os.remove("/temp/notes1M.zip")
os.remove("/temp/ratings3M.zip")
os.listdir(temp)

downloading from URL:  https://nuage.lip6.fr/s/H3bpyRGgnCq2NR4/download?path=/movielens/notes1M.zip save in :  /temp/notes1M.zip
download done
downloading from URL:  https://nuage.lip6.fr/s/H3bpyRGgnCq2NR4/download?path=/movielens/ratings3M.zip save in :  /temp/ratings3M.zip
download done
downloading from URL:  https://nuage.lip6.fr/s/H3bpyRGgnCq2NR4/download?path=/movielens/films.json save in :  /temp/films.json
download done


['movies.csv', 'ratings3M.csv', 'films.json', 'notes1M.json']

Afficher les premières lignes des films

In [None]:
%%sh
head -n 5 /temp/films.json

{"nF":8754,"titre":"Prime of Miss Jean Brodie, The (1969)","g":["Drama"]}
{"nF":111486,"titre":"Lesson of the Evil (Aku no kyôten) (2012)","g":["Thriller"]}
{"nF":1033,"titre":"Fox and the Hound, The (1981)","g":["Animation","Children","Drama"]}
{"nF":6536,"titre":"Sinbad: Legend of the Seven Seas (2003)","g":["Adventure","Animation","Children","Fantasy"]}
{"nF":5179,"titre":"Gloria (1980)","g":["Drama","Thriller"]}


Afficher les premières lignes des notes

In [None]:
%%sh
head -n 5 /temp/notes1M.json

{"nU":1,"nF":2959,"note":4.0,"date":1425941601,"annee":2015}
{"nU":1,"nF":69844,"note":5.0,"date":1425942139,"annee":2015}
{"nU":1,"nF":73017,"note":5.0,"date":1425942699,"annee":2015}
{"nU":2,"nF":788,"note":1.0,"date":867039288,"annee":1997}
{"nU":4,"nF":1422,"note":4.0,"date":1042674861,"annee":2003}


Définir le **dossier** utilisé pour ce TP

In [None]:
dir = "/temp/"
dbfsDir = dir

### Définir les dataframes associés aux données

Les **films**

In [None]:
films = spark.read.json(dir + "films.json").selectExpr("nF", "titre", "g as genres")
display(films)

Unnamed: 0,nF,titre,genres
0,8754,"Prime of Miss Jean Brodie, The (1969)",[Drama]
1,111486,Lesson of the Evil (Aku no kyôten) (2012),[Thriller]
2,1033,"Fox and the Hound, The (1981)","[Animation, Children, Drama]"
3,6536,Sinbad: Legend of the Seven Seas (2003),"[Adventure, Animation, Children, Fantasy]"
4,5179,Gloria (1980),"[Drama, Thriller]"
...,...,...,...
95,4690,"Cotton Club, The (1984)","[Crime, Musical]"
96,6598,Step Into Liquid (2002),[Documentary]
97,2562,Bandits (1997),[Drama]
98,6287,Anger Management (2003),[Comedy]


Les **notes**

In [None]:
notes = spark.read.json(dir + "notes1M.json").selectExpr("nF", "nU", "note", "annee")
display(notes)

Unnamed: 0,nF,nU,note,annee
0,2959,1,4.0,2015
1,69844,1,5.0,2015
2,73017,1,5.0,2015
3,788,2,1.0,1997
4,1422,4,4.0,2003
...,...,...,...,...
95,1974,24,3.0,2001
96,2145,24,4.0,2001
97,2381,24,3.0,2001
98,2395,24,5.0,2001


# Exercice 1 : partitionnement des données en MEMOIRE
Une des particularités de Spark est la possibilité de charger et de “maintenir” (persister) les données en **mémoire** vive. Cette fonctionnalité permet d'accélérer les accès fréquents aux données en évitant de relire plusieurs fois des données stockées sur disque.

Nettoyer l'espace mémoire (vider le cache) en début d'exercice

In [None]:
spark.catalog.clearCache()

### Question 1 : Notes par hachage de l'attribut année : REPARTITION
Créer un nouveau dataframe contenant les notes fragmentées sur l’attribut année en _N_ partitions. Le numéro de partition d'une note est obtenu par hachage de l'année.

Comprendre le plan:
* FileScan 
* Exchange hashpartitioning(annee, _N_)

Rmq: **repartition** est une transformation et non une action. Le (re)partitionnement est défini mais n'est pas encore exécuté

In [None]:
notes.rdd.getNumPartitions()

2

In [None]:
notes.explain()

== Physical Plan ==
*(1) Project [nF#29L, nU#30L, note#31, annee#27L]
+- FileScan json [annee#27L,nF#29L,nU#30L,note#31] Batched: false, DataFilters: [], Format: JSON, Location: InMemoryFileIndex(1 paths)[file:/temp/notes1M.json], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<annee:bigint,nF:bigint,nU:bigint,note:double>




In [None]:
N=4
notesMemoire_par_annne = notes.repartition(N, "annee")
notesMemoire_par_annne.explain()

== Physical Plan ==
Exchange hashpartitioning(annee#27L, 4), REPARTITION_BY_NUM, [id=#50]
+- *(1) Project [nF#29L, nU#30L, note#31, annee#27L]
   +- FileScan json [annee#27L,nF#29L,nU#30L,note#31] Batched: false, DataFilters: [], Format: JSON, Location: InMemoryFileIndex(1 paths)[file:/temp/notes1M.json], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<annee:bigint,nF:bigint,nU:bigint,note:double>




Rendre ce dataframe persistant en mémoire. Rmq: ici l'action **count** exécute le repartionnement défini ci-dessus.

In [None]:
notesMemoire_par_annne.persist()
notesMemoire_par_annne.count()

1301573

In [None]:
# notesMemoire_par_annne.unpersist()
m.mesure(notesMemoire_par_annne)

job 5 : shuffle = 236 octets, time = 142.0 ms


Afficher un extrait du contenu de chacune des partitions.
Les partitions de notesMemoire_par_annne sont :

In [None]:
notesMemoire_par_annne.explain()

== Physical Plan ==
Exchange hashpartitioning(annee#27L, 4), REPARTITION_BY_NUM, [id=#50]
+- *(1) Project [nF#29L, nU#30L, note#31, annee#27L]
   +- FileScan json [annee#27L,nF#29L,nU#30L,note#31] Batched: false, DataFilters: [], Format: JSON, Location: InMemoryFileIndex(1 paths)[file:/temp/notes1M.json], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<annee:bigint,nF:bigint,nU:bigint,note:double>




Contenu initial des partitions du dataframe Notes

In [None]:
showPartitions(notes, 10)

Partition 0 , 686782 éléments
Row(nF=2959, nU=1, note=4.0, annee=2015)
Row(nF=69844, nU=1, note=5.0, annee=2015)
Row(nF=73017, nU=1, note=5.0, annee=2015)
Row(nF=788, nU=2, note=1.0, annee=1997)
Row(nF=1422, nU=4, note=4.0, annee=2003)
Row(nF=2355, nU=4, note=4.0, annee=2003)
Row(nF=2541, nU=4, note=3.0, annee=2003)
Row(nF=3476, nU=4, note=4.0, annee=2003)
Row(nF=541, nU=5, note=5.0, annee=2000)
Row(nF=3948, nU=7, note=1.5, annee=2017)

Partition 1 , 614791 éléments
Row(nF=2995, nU=142907, note=4.0, annee=2001)
Row(nF=3178, nU=142907, note=5.0, annee=2001)
Row(nF=3578, nU=142907, note=4.0, annee=2001)
Row(nF=3693, nU=142907, note=4.0, annee=2001)
Row(nF=3699, nU=142907, note=2.0, annee=2001)
Row(nF=4366, nU=142907, note=4.0, annee=2001)
Row(nF=4618, nU=142907, note=2.0, annee=2001)
Row(nF=1, nU=142908, note=0.5, annee=2016)
Row(nF=410, nU=142908, note=0.5, annee=2016)
Row(nF=1079, nU=142908, note=0.5, annee=2016)



In [None]:
showPartitions(notesMemoire_par_annne)

Partition 0 , 104893 éléments
Row(nF=4881, nU=20, note=3.5, annee=2016)
Row(nF=260, nU=32, note=5.0, annee=2016)
Row(nF=1196, nU=32, note=5.0, annee=2016)
Row(nF=2571, nU=32, note=5.0, annee=2016)
Row(nF=2700, nU=32, note=2.5, annee=2016)

Partition 1 , 237179 éléments
Row(nF=2054, nU=11, note=2.5, annee=2009)
Row(nF=3977, nU=11, note=3.0, annee=2009)
Row(nF=6870, nU=11, note=3.5, annee=2009)
Row(nF=48774, nU=11, note=3.5, annee=2009)
Row(nF=52245, nU=11, note=3.5, annee=2009)

Partition 2 , 394749 éléments
Row(nF=788, nU=2, note=1.0, annee=1997)
Row(nF=541, nU=5, note=5.0, annee=2000)
Row(nF=1240, nU=15, note=4.0, annee=2012)
Row(nF=1265, nU=15, note=5.0, annee=2012)
Row(nF=1278, nU=15, note=5.0, annee=2012)

Partition 3 , 564752 éléments
Row(nF=2959, nU=1, note=4.0, annee=2015)
Row(nF=69844, nU=1, note=5.0, annee=2015)
Row(nF=73017, nU=1, note=5.0, annee=2015)
Row(nF=1422, nU=4, note=4.0, annee=2003)
Row(nF=2355, nU=4, note=4.0, annee=2003)



##### Accès aux données cachées
Comprendre le plan d'accès :
* InMemoryRelation : cache mémoire
* InMemoryTableScan : accès au cache

Rmq: les autres opérateur en dessous de InMemoryRelation ne sont précisés qu'à titre d'info rappelant l'historique de création du cache, ils ne sont pas exécutés

In [None]:
notesMemoire_par_annne.explain()

== Physical Plan ==
Exchange hashpartitioning(annee#27L, 4), REPARTITION_BY_NUM, [id=#50]
+- *(1) Project [nF#29L, nU#30L, note#31, annee#27L]
   +- FileScan json [annee#27L,nF#29L,nU#30L,note#31] Batched: false, DataFilters: [], Format: JSON, Location: InMemoryFileIndex(1 paths)[file:/temp/notes1M.json], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<annee:bigint,nF:bigint,nU:bigint,note:double>




Durée de **lecture** des données cachées

In [None]:
chronoPersist2(notesMemoire_par_annne)

durée: 2.9 s pour lire 1301573 elements


In [None]:
chronoPersist2(notes)

durée: 4.1 s pour lire 1301573 elements


### Question 2 : Notes par hachage du n° de film nF
Créer un nouveau dataframe contenant les notes fragmentées sur l’attribut nF. Le numéro de partition d'une note est obtenu par hachage de l'attribut.

Afficher le plan

In [None]:
notesMemoire_par_nF = notes.repartition(4, "nF")
notesMemoire_par_nF.explain()

== Physical Plan ==
Exchange hashpartitioning(nF#29L, 4), REPARTITION_BY_NUM, [id=#252]
+- *(1) Project [nF#29L, nU#30L, note#31, annee#27L]
   +- FileScan json [annee#27L,nF#29L,nU#30L,note#31] Batched: false, DataFilters: [], Format: JSON, Location: InMemoryFileIndex(1 paths)[file:/temp/notes1M.json], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<annee:bigint,nF:bigint,nU:bigint,note:double>




Que font ces opérations ?
* FileScan 
* Exchange hashpartitioning(nF, 4)

In [None]:
notesMemoire_par_nF.persist()
notesMemoire_par_nF.count()

1301573

In [None]:
#showPartitions(notesMemoire_par_nF, 10)

### Question 3 : Cacher les Films en les repartitionnant par nF

In [None]:
# A compléter
films_par_nF = films.repartition(6, "nF").persist()
films_par_nF.count()


9125

### Question 4 : PARTITIONNEMENT déclenché par une requête GROUP BY

Requete group by année ==> repartitionnement par année

In [None]:
#notes.unpersist()
showPartitions(notes, 20)

Partition 0 , 686782 éléments
Row(nF=2959, nU=1, note=4.0, annee=2015)
Row(nF=69844, nU=1, note=5.0, annee=2015)
Row(nF=73017, nU=1, note=5.0, annee=2015)
Row(nF=788, nU=2, note=1.0, annee=1997)
Row(nF=1422, nU=4, note=4.0, annee=2003)
Row(nF=2355, nU=4, note=4.0, annee=2003)
Row(nF=2541, nU=4, note=3.0, annee=2003)
Row(nF=3476, nU=4, note=4.0, annee=2003)
Row(nF=541, nU=5, note=5.0, annee=2000)
Row(nF=3948, nU=7, note=1.5, annee=2017)
Row(nF=6934, nU=7, note=2.0, annee=2017)
Row(nF=54503, nU=7, note=3.5, annee=2017)
Row(nF=1210, nU=8, note=4.0, annee=2002)
Row(nF=1792, nU=8, note=1.0, annee=2002)
Row(nF=4361, nU=8, note=2.0, annee=2002)
Row(nF=4367, nU=8, note=3.0, annee=2002)
Row(nF=4571, nU=8, note=3.0, annee=2002)
Row(nF=4643, nU=8, note=2.0, annee=2002)
Row(nF=342, nU=9, note=5.0, annee=2003)
Row(nF=1059, nU=9, note=5.0, annee=2003)

Partition 1 , 614791 éléments
Row(nF=2995, nU=142907, note=4.0, annee=2001)
Row(nF=3178, nU=142907, note=5.0, annee=2001)
Row(nF=3578, nU=142907, not

Calculer pour chaque année, le nombre de notes et leur moyenne.

In [None]:
from pyspark.sql.functions import count
from pyspark.sql.functions import avg

notes.persist()

r1 = notes.groupBy("annee").agg(count("note").alias("nb"), avg("note").alias("moyenne"))
r1.explain()

# notes.createOrReplaceTempView("NOTES")
# r1 = spark.sql("""
# select annee, count(*) as nb, avg(note) as moyenne
# from NOTES
# group by annee
# """)

== Physical Plan ==
*(2) HashAggregate(keys=[annee#27L], functions=[count(note#31), avg(note#31)])
+- Exchange hashpartitioning(annee#27L, 4), ENSURE_REQUIREMENTS, [id=#392]
   +- *(1) HashAggregate(keys=[annee#27L], functions=[partial_count(note#31), partial_avg(note#31)])
      +- *(1) ColumnarToRow
         +- InMemoryTableScan [note#31, annee#27L]
               +- InMemoryRelation [nF#29L, nU#30L, note#31, annee#27L], StorageLevel(disk, memory, deserialized, 1 replicas)
                     +- *(1) Project [nF#29L, nU#30L, note#31, annee#27L]
                        +- FileScan json [annee#27L,nF#29L,nU#30L,note#31] Batched: false, DataFilters: [], Format: JSON, Location: InMemoryFileIndex(1 paths)[file:/temp/notes1M.json], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<annee:bigint,nF:bigint,nU:bigint,note:double>




In [None]:
display(r1)

Unnamed: 0,annee,nb,moyenne
0,2016,104893,3.541232
1,2009,50128,3.479034
2,1999,61508,3.624732
3,2010,48991,3.515105
4,2004,60046,3.426257
5,1998,16506,3.534593
6,1997,38310,3.602088
7,2000,101580,3.579986
8,2012,39625,3.581767
9,2001,62315,3.529198


Que font ces opérations ?
* **HashAggregate**  keys = _année_, functions = _partial_count_(note)  
* **Exchange** hashpartitioning  _année_
* **HashAggregate** keys = _année_, functions = _finalmerge_count_(merge count)

In [None]:
r1.rdd.getNumPartitions()

4

In [None]:
showPartitions(r1)

Partition 0 , 1 éléments
Row(annee=2016, nb=104893, moyenne=3.541232494065381)

Partition 1 , 5 éléments
Row(annee=2009, nb=50128, moyenne=3.4790336737950844)
Row(annee=1999, nb=61508, moyenne=3.6247317422123952)
Row(annee=2010, nb=48991, moyenne=3.515104815170133)
Row(annee=2004, nb=60046, moyenne=3.426256536655231)
Row(annee=1998, nb=16506, moyenne=3.5345934811583666)

Partition 2 , 7 éléments
Row(annee=1997, nb=38310, moyenne=3.60208822761681)
Row(annee=2000, nb=101580, moyenne=3.5799862177594015)
Row(annee=2012, nb=39625, moyenne=3.5817665615141956)
Row(annee=2001, nb=62315, moyenne=3.529198427344941)
Row(annee=2013, nb=31631, moyenne=3.6245297334892985)

Partition 3 , 9 éléments
Row(annee=2015, nb=95630, moyenne=3.5788873784377286)
Row(annee=2003, nb=54165, moyenne=3.4724729991692054)
Row(annee=2017, nb=64654, moyenne=3.5603984285581713)
Row(annee=2002, nb=45606, moyenne=3.4821514712976365)
Row(annee=1996, nb=86449, moyenne=3.5503129012481347)



afficher la liste des notes de chaque utilisateur. Expliquer le plan d'exécution.

In [None]:
from pyspark.sql.functions import collect_list

r1Liste = notes.groupBy("nU").agg(collect_list("note"))
r1Liste.rdd.getNumPartitions()
r1Liste.explain()

== Physical Plan ==
ObjectHashAggregate(keys=[nU#30L], functions=[collect_list(note#31, 0, 0)])
+- Exchange hashpartitioning(nU#30L, 4), ENSURE_REQUIREMENTS, [id=#494]
   +- ObjectHashAggregate(keys=[nU#30L], functions=[partial_collect_list(note#31, 0, 0)])
      +- *(1) ColumnarToRow
         +- InMemoryTableScan [nU#30L, note#31]
               +- InMemoryRelation [nF#29L, nU#30L, note#31, annee#27L], StorageLevel(disk, memory, deserialized, 1 replicas)
                     +- *(1) Project [nF#29L, nU#30L, note#31, annee#27L]
                        +- FileScan json [annee#27L,nF#29L,nU#30L,note#31] Batched: false, DataFilters: [], Format: JSON, Location: InMemoryFileIndex(1 paths)[file:/temp/notes1M.json], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<annee:bigint,nF:bigint,nU:bigint,note:double>




In [None]:
r1Liste.persist()
r1Liste.count()

In [None]:
display(r1Liste)

In [None]:
showPartitions(r1Liste)

### Question 5 : Requête GROUP BY sur des données déjà partitionnées

In [None]:
r2 = notesMemoire_par_annne.groupBy("annee").agg(count("note"))
r2.explain()

== Physical Plan ==
*(1) HashAggregate(keys=[annee#27L], functions=[count(note#31)])
+- *(1) HashAggregate(keys=[annee#27L], functions=[partial_count(note#31)])
   +- *(1) ColumnarToRow
      +- InMemoryTableScan [note#31, annee#27L]
            +- InMemoryRelation [nF#29L, nU#30L, note#31, annee#27L], StorageLevel(disk, memory, deserialized, 1 replicas)
                  +- Exchange hashpartitioning(annee#27L, 4), REPARTITION_BY_NUM, [id=#70]
                     +- *(1) Project [nF#29L, nU#30L, note#31, annee#27L]
                        +- FileScan json [annee#27L,nF#29L,nU#30L,note#31] Batched: false, DataFilters: [], Format: JSON, Location: InMemoryFileIndex(1 paths)[file:/temp/notes1M.json], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<annee:bigint,nF:bigint,nU:bigint,note:double>




Que font ces opérations ?
* **InMemoryTableScan**  note, année
* **HashAggregate**  keys = _année_, functions = _count_(note)  

Pourquoi il n'y a pas d'exchange ?

Rappel: les opéations qui précèdent l'opération InMemoryRelation ne sont pas traitées. Elles ne sont indiquées que pour rappeler l'historique de création de la relation InMemory.

In [None]:
display(r2)

Unnamed: 0,annee,count(note)
0,2016,104893
1,2009,50128
2,1999,61508
3,2010,48991
4,2004,60046
5,1998,16506
6,1997,38310
7,2000,101580
8,2012,39625
9,2001,62315


### Question 6: PARTITIONNEMENT déclenché par une requête  ORDER BY

In [None]:
tri = notes.orderBy("annee")
tri.explain()

== Physical Plan ==
*(2) Sort [annee#27L ASC NULLS FIRST], true, 0
+- Exchange rangepartitioning(annee#27L ASC NULLS FIRST, 4), ENSURE_REQUIREMENTS, [id=#629]
   +- *(1) ColumnarToRow
      +- InMemoryTableScan [nF#29L, nU#30L, note#31, annee#27L]
            +- InMemoryRelation [nF#29L, nU#30L, note#31, annee#27L], StorageLevel(disk, memory, deserialized, 1 replicas)
                  +- *(1) Project [nF#29L, nU#30L, note#31, annee#27L]
                     +- FileScan json [annee#27L,nF#29L,nU#30L,note#31] Batched: false, DataFilters: [], Format: JSON, Location: InMemoryFileIndex(1 paths)[file:/temp/notes1M.json], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<annee:bigint,nF:bigint,nU:bigint,note:double>




Que font les opérations ?
* Exchange rangepartitioning année
* Sort année

In [None]:
showPartitions(tri)

In [None]:
chronoPersist2(tri)
# tri.count()

durée: 3.8 s pour lire 1301573 elements


### Question 7: ORDER BY sur des données déjà partitionnées
Pourquoi est-ce que cela déclenche un partitionnement ?

In [None]:
# COMPLETER 
notesMemoire_par_annne.orderBy("annee").explain()

== Physical Plan ==
*(2) Sort [annee#27L ASC NULLS FIRST], true, 0
+- Exchange rangepartitioning(annee#27L ASC NULLS FIRST, 4), ENSURE_REQUIREMENTS, [id=#749]
   +- *(1) ColumnarToRow
      +- InMemoryTableScan [nF#29L, nU#30L, note#31, annee#27L]
            +- InMemoryRelation [nF#29L, nU#30L, note#31, annee#27L], StorageLevel(disk, memory, deserialized, 1 replicas)
                  +- Exchange hashpartitioning(annee#27L, 4), REPARTITION_BY_NUM, [id=#70]
                     +- *(1) Project [nF#29L, nU#30L, note#31, annee#27L]
                        +- FileScan json [annee#27L,nF#29L,nU#30L,note#31] Batched: false, DataFilters: [], Format: JSON, Location: InMemoryFileIndex(1 paths)[file:/temp/notes1M.json], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<annee:bigint,nF:bigint,nU:bigint,note:double>




### Question 8: PARTITIONNEMENT déclenché par une requête DISTINCT

In [None]:
d = notes.select("nU").distinct()
d.explain()

== Physical Plan ==
*(2) HashAggregate(keys=[nU#30L], functions=[])
+- Exchange hashpartitioning(nU#30L, 4), ENSURE_REQUIREMENTS, [id=#779]
   +- *(1) HashAggregate(keys=[nU#30L], functions=[])
      +- *(1) ColumnarToRow
         +- InMemoryTableScan [nU#30L]
               +- InMemoryRelation [nF#29L, nU#30L, note#31, annee#27L], StorageLevel(disk, memory, deserialized, 1 replicas)
                     +- *(1) Project [nF#29L, nU#30L, note#31, annee#27L]
                        +- FileScan json [annee#27L,nF#29L,nU#30L,note#31] Batched: false, DataFilters: [], Format: JSON, Location: InMemoryFileIndex(1 paths)[file:/temp/notes1M.json], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<annee:bigint,nF:bigint,nU:bigint,note:double>




Pourquoi n'y a-t-il pas de fonction ( on constate que _functions=[]_ ) dans les opérations HashAggregate ?

# Exercice 2  : Jointures

### Question 1 : algorithme de jointure par défaut

parametre par défaut

In [None]:
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 1000 * 1000)

Vérifier si une table est cachée en mémoire

In [None]:
notes.is_cached

True

Jointure et notes et films

In [None]:
j1 = notes.join(films, "nF")
j1.explain()

# select *
# from NOTES JOIN FILMS on (NOTES.nF = Films.nF)

# select *
# from NOTES, FILMS
# where NOTES.nF = Films.nF

# select *
# from NOTES natural join FILMS

== Physical Plan ==
*(2) Project [nF#29L, nU#30L, note#31, annee#27L, titre#9, genres#13]
+- *(2) BroadcastHashJoin [nF#29L], [nF#8L], Inner, BuildRight, false
   :- *(2) Filter isnotnull(nF#29L)
   :  +- *(2) ColumnarToRow
   :     +- InMemoryTableScan [nF#29L, nU#30L, note#31, annee#27L], [isnotnull(nF#29L)]
   :           +- InMemoryRelation [nF#29L, nU#30L, note#31, annee#27L], StorageLevel(disk, memory, deserialized, 1 replicas)
   :                 +- *(1) Project [nF#29L, nU#30L, note#31, annee#27L]
   :                    +- FileScan json [annee#27L,nF#29L,nU#30L,note#31] Batched: false, DataFilters: [], Format: JSON, Location: InMemoryFileIndex(1 paths)[file:/temp/notes1M.json], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<annee:bigint,nF:bigint,nU:bigint,note:double>
   +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true]),false), [id=#826]
      +- *(1) Project [nF#8L, titre#9, g#7 AS genres#13]
         +- *(1) Filter isnotnull(nF#8L)


Que font les opérations ?
* BroadcastExchange 
* BroadcastHashJoin

In [None]:
display(j1)

In [None]:
chronoPersist2(j1)
m.mesure(j1)

In [None]:
showPartitions(notes, 10)

In [None]:
print('Partition de Notes :')
showPartitionSize(notes)
print('Partition de J1 :')
showPartitionSize(j1)

Contenu des partitions de J1

In [None]:
showPartitions(j1,20)
# identique à celui de notes

### Question 2
Etudier la jointure Film,Notes selons plusieurs algorithmes de jointure
- J1: parallel hash sortmerge (HSM): N.join(F, "movieId")
- J2: broadcast des films: N.join(broadcast(F), "movieId"))
- J3: broadcast des notes: F.join(broadcast(N), "movieId"))

In [None]:
spark.conf.set("spark.sql.autoBroadcastJoinThreshold","-1")
j2 = notes.join(films, "nF")
j2.explain()

== Physical Plan ==
*(5) Project [nF#29L, nU#30L, note#31, annee#27L, titre#9, genres#13]
+- *(5) SortMergeJoin [nF#29L], [nF#8L], Inner
   :- *(2) Sort [nF#29L ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(nF#29L, 4), ENSURE_REQUIREMENTS, [id=#877]
   :     +- *(1) Filter isnotnull(nF#29L)
   :        +- *(1) ColumnarToRow
   :           +- InMemoryTableScan [nF#29L, nU#30L, note#31, annee#27L], [isnotnull(nF#29L)]
   :                 +- InMemoryRelation [nF#29L, nU#30L, note#31, annee#27L], StorageLevel(disk, memory, deserialized, 1 replicas)
   :                       +- *(1) Project [nF#29L, nU#30L, note#31, annee#27L]
   :                          +- FileScan json [annee#27L,nF#29L,nU#30L,note#31] Batched: false, DataFilters: [], Format: JSON, Location: InMemoryFileIndex(1 paths)[file:/temp/notes1M.json], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<annee:bigint,nF:bigint,nU:bigint,note:double>
   +- *(4) Sort [nF#8L ASC NULLS FIRST], false, 0
     

In [None]:
j2.count()

In [None]:
showPartitions(j2, 20)

Que font ?
* Exchange hashpartitioning
* SortMergeJoin

In [None]:
chronoPersist2(j2)

In [None]:
from pyspark.sql.functions import broadcast

J3 = notes.join(broadcast(films), "nF")
J3.explain()

== Physical Plan ==
*(2) Project [nF#29L, nU#30L, note#31, annee#27L, titre#9, genres#13]
+- *(2) BroadcastHashJoin [nF#29L], [nF#8L], Inner, BuildRight, false
   :- *(2) Filter isnotnull(nF#29L)
   :  +- *(2) ColumnarToRow
   :     +- InMemoryTableScan [nF#29L, nU#30L, note#31, annee#27L], [isnotnull(nF#29L)]
   :           +- InMemoryRelation [nF#29L, nU#30L, note#31, annee#27L], StorageLevel(disk, memory, deserialized, 1 replicas)
   :                 +- *(1) Project [nF#29L, nU#30L, note#31, annee#27L]
   :                    +- FileScan json [annee#27L,nF#29L,nU#30L,note#31] Batched: false, DataFilters: [], Format: JSON, Location: InMemoryFileIndex(1 paths)[file:/temp/notes1M.json], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<annee:bigint,nF:bigint,nU:bigint,note:double>
   +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true]),false), [id=#946]
      +- *(1) Project [nF#8L, titre#9, g#7 AS genres#13]
         +- *(1) Filter isnotnull(nF#8L)


In [None]:
chronoPersist2(J3)

In [None]:
J4 = films.join(broadcast(notes), "nF")
J4.explain()

In [None]:
chronoPersist2(J4)

# Exercice 3. Numérotation avec des entiers consécutifs

* Etudier le plan d'une opération qui numérote les éléments d'un dataset avec des nombres *consécutifs* (1, 2, ..., n), sans tri spécifique du dataset.
* Etudier la solution écrite en SQL avec la fonction de fenêtrage row_number(). Comprendre que la numérotation n'est pas traitée en parallèle.
* Comparaison avec le plan de la solution qui effectue une numérotation croissante mais avec des entiers **non** consécutifs. Indication: utiliser la fonction **monotonically_increasing_id()**.
* Proposer une solution qui s'applique à un Dataset et qui soit traitée en **parallèle**. Indication : s'inspirer de la méthode zipWithIndex() pour un RDD.

In [None]:
from pyspark.sql.functions import monotonically_increasing_id

notesNumerotees1= notes.withColumn("numeroNote",  monotonically_increasing_id())

display(notesNumerotees1)

Unnamed: 0,nF,nU,note,annee,numeroNote
0,2959,1,4.0,2015,0
1,69844,1,5.0,2015,1
2,73017,1,5.0,2015,2
3,788,2,1.0,1997,3
4,1422,4,4.0,2003,4
...,...,...,...,...,...
95,1974,24,3.0,2001,95
96,2145,24,4.0,2001,96
97,2381,24,3.0,2001,97
98,2395,24,5.0,2001,98


In [None]:
showPartitions(notesNumerotees1, 3)

Partition 0 , 686782 éléments
Row(nF=2959, nU=1, note=4.0, annee=2015, numeroNote=0)
Row(nF=69844, nU=1, note=5.0, annee=2015, numeroNote=1)
Row(nF=73017, nU=1, note=5.0, annee=2015, numeroNote=2)

Partition 1 , 614791 éléments
Row(nF=2995, nU=142907, note=4.0, annee=2001, numeroNote=8589934592)
Row(nF=3178, nU=142907, note=5.0, annee=2001, numeroNote=8589934593)
Row(nF=3578, nU=142907, note=4.0, annee=2001, numeroNote=8589934594)



Afficher les 10 plus grandes valeurs de numeroNote

In [None]:
from pyspark.sql.functions import desc

display(notesNumerotees1.orderBy(desc("numeroNote")))

Unnamed: 0,nF,nU,note,annee,numeroNote
0,63082,270896,4.5,2009,8590549382
1,60069,270896,5.0,2009,8590549381
2,54001,270896,4.0,2009,8590549380
3,51662,270896,4.5,2009,8590549379
4,48516,270896,4.5,2009,8590549378
...,...,...,...,...,...
95,76293,270887,5.0,2016,8590549287
96,74510,270887,3.0,2016,8590549286
97,73266,270887,0.5,2017,8590549285
98,72722,270887,4.0,2016,8590549284


In [None]:
notesNumerotees1.explain()

== Physical Plan ==
*(1) Project [nF#29L, nU#30L, note#31, annee#27L, monotonically_increasing_id() AS numeroNote#2786L]
+- *(1) ColumnarToRow
   +- InMemoryTableScan [nF#29L, nU#30L, note#31, annee#27L]
         +- InMemoryRelation [nF#29L, nU#30L, note#31, annee#27L], StorageLevel(disk, memory, deserialized, 1 replicas)
               +- *(1) Project [nF#29L, nU#30L, note#31, annee#27L]
                  +- FileScan json [annee#27L,nF#29L,nU#30L,note#31] Batched: false, DataFilters: [], Format: JSON, Location: InMemoryFileIndex(1 paths)[file:/temp/notes1M.json], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<annee:bigint,nF:bigint,nU:bigint,note:double>




Adapter le showPartitionsSize en getPartitionsSize pour recupérer dans le notebook le nbre d'éléments des partitions

Définir une fonction *numeroter(partId, iterateur)* pour numéroter chaque partition à partir de la "bonne" valeur de départ.

Invoquer cette fonction *numeroter* dans mapPartitionsWithIndex

# (facultatif) Exercice 4 : Similarité cosinus

## Question 1
Proposer une solution parallèle et distribuée pour calculer le cosinus entre toute les paires d'utilisateurs. Pour chaque utilisateur on connait l'ensemble des couples (nF,note) pour les films qu'il a notés.

## Question 2
Proposer une implémentation avec les méthodes mapPartitionWithIndex et repartition.

# (facultatif) Exercice 5  Similarité Jaccard
Définir un dataframe Similarité(u1, u2, sim) contenant la similarité entre chaque paire d'utilisateur.

similarité dans ]0, 1] en fonction (des notes et) des films...
jaccard entre les listes de films de 2 utilisateurs
https://fr.wikipedia.org/wiki/Indice_et_distance_de_Jaccard

jaccard(liste1, liste2) = nombre de films en communs dans liste1 et liste2 / nombre de films distincts dans l'union des 2 listes

$$jaccard(liste1, liste2) = \frac{|liste1 \cap liste2| } {|liste1 \cup lite2| }$$

jaccard([1,3,4], [3,4,5,8]) = 2 / 5 


FilmsUtil(nU, listeFilms)   avec listeFilms etant un attribut list   (collect_list)

FilmsUtil2(nu2, listeFilms2) : idem mais en renommant les attributs

PairesUtilisateurs(nU, listeFilms, nu2, listeFilms2) : avec un produit cartesien  (crossJoin)