* Master DAC, BDLE, 2022 
* Author: Mohamed-Amine Baazizi
* Affiliation: LIP6 - Faculté des Sciences - Sorbonne Université
* Email: mohamed-amine.baazizi@lip6.fr

# Querying tabular  data

The goal of the lab session is to formulate SQL queries in Spark.

Documentation about expressing SQL using Dataframe
* https://spark.apache.org/docs/latest/sql-programming-guide.html

Documentation about the Dataframe python API:
* https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql.html



## Préparation

Vérifier que des ressources de calcul sont allouées à votre notebook est connecté (cf RAM  de disque indiqués en haut à droite) . Sinon cliquer sur le bouton connecter pour obtenir des ressources.




Pour accéder directement aux fichiers stockées sur votre google drive. Renseigner le code d'authentification lorsqu'il est demandé

Ajuster le nom de votre dossier : MyDrive/ens/bdle/SparkDF. 

In [1]:
import os
from google.colab import drive
drive.mount("/content/drive", force_remount=True)

drive_dir = "/content/drive/MyDrive/ens/bdle/SparkDF/"
os.makedirs(drive_dir, exist_ok=True)
os.listdir(drive_dir)

Mounted at /content/drive


['books.csv', 'users.csv', 'ratings.csv', 'vk_001.json']

Installer pyspark et findspark :


In [2]:
!pip install -q pyspark
!pip install -q findspark

[K     |████████████████████████████████| 281.4 MB 55 kB/s 
[K     |████████████████████████████████| 199 kB 55.6 MB/s 
[?25h  Building wheel for pyspark (setup.py) ... [?25l[?25hdone


Démarrer la session spark

In [3]:
import os
# !find /usr/local -name "pyspark"
os.environ["SPARK_HOME"] = "/usr/local/lib/python3.7/dist-packages/pyspark"
os.environ["JAVA_HOME"] = "/usr"

In [4]:
# Principaux import
import findspark
from pyspark.sql import SparkSession 
from pyspark import SparkConf  

# pour les dataframe et udf
from pyspark.sql import *  
from pyspark.sql.functions import *
from pyspark.sql.types import *
from datetime import *

# pour le chronomètre
import time

# initialise les variables d'environnement pour spark
findspark.init()

# Démarrage session spark 
# --------------------------
def demarrer_spark():
  local = "local[*]"
  appName = "TP"
  configLocale = SparkConf().setAppName(appName).setMaster(local).\
  set("spark.executor.memory", "6G").\
  set("spark.driver.memory","6G").\
  set("spark.sql.catalogImplementation","in-memory")
  
  spark = SparkSession.builder.config(conf = configLocale).getOrCreate()
  sc = spark.sparkContext
  sc.setLogLevel("ERROR")
  
  spark.conf.set("spark.sql.autoBroadcastJoinThreshold","-1")

  # On ajuste l'environnement d'exécution des requêtes à la taille du cluster (4 coeurs)
  spark.conf.set("spark.sql.shuffle.partitions","4")    
  print("session démarrée, son id est ", sc.applicationId)
  return spark
spark = demarrer_spark()

session démarrée, son id est  local-1667207432051


In [5]:
# on utilise 8 partitions au lieu de 200 par défaut
spark.conf.set("spark.sql.shuffle.partitions", "8")
print("Nombre de partitions utilisées : ", spark.conf.get("spark.sql.shuffle.partitions"))

Nombre de partitions utilisées :  8


## Data loading

In [6]:
# URL du dossier PUBLIC_DATASET contenant des fichiers de données pour les TP
# ---------------------------------------------------------------------------
# en cas de problème avec le téléchargement des datasets, aller directement sur l'URL ci-dessous
PUBLIC_DATASET_URL = "https://nuage.lip6.fr/s/H3bpyRGgnCq2NR4" 
PUBLIC_DATASET=PUBLIC_DATASET_URL + "/download?path="

print("URL pour les datasets ", PUBLIC_DATASET_URL)

URL pour les datasets  https://nuage.lip6.fr/s/H3bpyRGgnCq2NR4


In [7]:
import os
from urllib import request

def load_file(file,dir):
  if(os.path.isfile(file)):
    print(file, "is already stored")
  else:
    url = PUBLIC_DATASET + "/"+ dir + "/" + file
    print("downloading from URL: ", url, "save in : " + drive_dir   + file)
    request.urlretrieve(url , drive_dir + file)

load_file("books.csv", "Books")
load_file("ratings.csv", "Books")
load_file("users.csv", "Books")
# load_file("vk_001.json", "VKRU18")

# Liste des fichiers de IMDB
os.listdir(drive_dir)

downloading from URL:  https://nuage.lip6.fr/s/H3bpyRGgnCq2NR4/download?path=/Books/books.csv save in : /content/drive/MyDrive/ens/bdle/SparkDF/books.csv
downloading from URL:  https://nuage.lip6.fr/s/H3bpyRGgnCq2NR4/download?path=/Books/ratings.csv save in : /content/drive/MyDrive/ens/bdle/SparkDF/ratings.csv
downloading from URL:  https://nuage.lip6.fr/s/H3bpyRGgnCq2NR4/download?path=/Books/users.csv save in : /content/drive/MyDrive/ens/bdle/SparkDF/users.csv


['books.csv', 'users.csv', 'ratings.csv', 'vk_001.json']

## Data description

We consider the Books dataset which describes books and users rating these books. The schema of this dataset is given as follows:

* `Users (userid: Number, country: Text, age: Number)` 
* `Books (bookid: Number, titlewords: Number, authorwords: Number, year: Number, publisher: Number)`
* `Ratings (userid: Number, bookid: Number, rating: Number)`

In the Ratings table, userid and bookid refer to Users and Books, respectively.

In [8]:
#load 
users =  spark.read\
            .format("csv").option("header", "true")\
            .option("inferSchema", "true")\
            .load(drive_dir +"users.csv")

books =  spark.read\
            .format("csv").option("header", "true")\
            .option("inferSchema", "true")\
            .load(drive_dir +"books.csv")

ratings =  spark.read\
            .format("csv").option("header", "true")\
            .option("inferSchema", "true")\
            .load(drive_dir +"ratings.csv")


Upon loading the three datasets, examine their respective schemas using these instructions:

In [None]:
users.printSchema()
books.printSchema()
ratings.printSchema()

root
 |-- userid: integer (nullable = true)
 |-- country: string (nullable = true)
 |-- age: integer (nullable = true)

root
 |-- bookid: integer (nullable = true)
 |-- titlewords: integer (nullable = true)
 |-- authorwords: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- publisher: integer (nullable = true)

root
 |-- userid: integer (nullable = true)
 |-- bookid: integer (nullable = true)
 |-- rating: integer (nullable = true)



## Questions

### Simple queries

#### s0) Ids of users (column userid) from France. Note that country names are in lower case

In [None]:
s0 = users.filter(col('country') == "france").select('userid')
s0.count()

309

#### s1) Ids of books (column bookid) whose publication year is 2000

In [None]:
s1 = books.where(col('year') == 2000 ).select('bookid')
s1.count()

3692

#### s2) Ids of books rated above 3 (>3)

In [None]:
s2 = ratings.where(col('rating') > 3).select('bookid').distinct()
s2.count()

32623

### Collecting basic statistics

#### Total number of distinct users

In [None]:
users.distinct().count()

27876

#### Total number of distinct  books

In [None]:
books.distinct().count()

49972

### Aggregation queries

#### q1) Number of users per country, sorted in descending order of this number

In [None]:
q1 = users.groupby(col('country')).count().alias('count').orderBy(col('count').desc())
q1.show()

+--------------+-----+
|       country|count|
+--------------+-----+
|           usa|18935|
|        canada| 2505|
|       germany| 1254|
|       unknown| 1069|
|united kingdom| 1019|
|     australia|  581|
|         spain|  518|
|        france|  309|
|         italy|  211|
|      portugal|  184|
|   switzerland|  176|
|   netherlands|  147|
|   new zealand|  113|
|      malaysia|   99|
|       austria|   97|
|     singapore|   52|
|        brazil|   39|
|       finland|   38|
|       ireland|   36|
|   philippines|   32|
+--------------+-----+
only showing top 20 rows



##### Country who has the highest number of users, together with this number. Assume that only one country has this number.

In [None]:
q11 = q1.limit(1).select("country")
q11.show()

+-------+
|country|
+-------+
|    usa|
+-------+



##### Year with the highest number of edited books, together with this number. Assume that only one year has this number.

In [None]:
q12 = books.groupby(col('year')).count().alias('count').orderBy(col('count').desc()).limit(1).select('year')
q12.show()


+----+
|year|
+----+
|2002|
+----+



#### q2) Publishers with more than ten (10) edited books, in total

In [None]:
q2 = books.groupby('publisher').count().alias('count').filter(col('count') > 10).select('publisher')
q2.count()

501

#### q3) Publishers with more than five (5) edited books for each year in which they have published a book

In [None]:
q3 = books.select('publisher').subtract(books.groupby(col('publisher'), col('year')).count().alias('count').where(col('count') < 5).select('publisher'))
q3.count()

14

#### q4) The average rating per book

In [None]:
q4 = ratings.groupby('bookid').avg('rating')
q4.show()

+------+------------------+
|bookid|       avg(rating)|
+------+------------------+
|  1837|               2.5|
|  4885|               3.0|
| 27222|               2.0|
| 28598|               3.0|
| 27517|               4.0|
|  5042|               3.0|
|114615|              4.25|
| 32211|2.8333333333333335|
| 55585|               2.0|
| 25083|3.3333333333333335|
|225454|               5.0|
| 66472|              2.25|
|  3740|3.0273972602739727|
| 66563|               3.0|
|  7295|               5.0|
| 39829|2.8333333333333335|
| 21943|2.3333333333333335|
|   820| 3.235294117647059|
|    38| 4.043478260869565|
|  7084|2.6206896551724137|
+------+------------------+
only showing top 20 rows



### Join queries

#### q5) The publishers of books rated by users living in France

In [None]:
q5 = users.where(col('country') == 'france').select('userid').join(ratings, 'userid').select('bookid').join(books, 'bookid').select('publisher').distinct()
q5.count()

339

#### q6) The publishers of books which were never rated by users living in France

In [None]:
q6 = books.select('publisher').subtract(q5)
q6.count()

3236

### Queries using built-in functions

The Spark API contains many useful built-in functions that can be directly invoked on a dataframe. These are documented:
* https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql.html#functions

The goal of this section is two use some of these functions to compute the Jaccard Similarity between users based on the books they rated.
To do so, we need to collect, for each pair of users (u1,u2), the sets of books they have rated, eg. [b1, ..., bn] for u1 and [b'1, ...,b'm] for u2, than apply the similarity formulae explained in https://en.wikipedia.org/wiki/Jaccard_index, that is, dividing the intersection of the sets of books by the union of these sets.

$$sim(u1,u2) = |([b_1, ..., b_n] \cap [b'_1, ...,b'_m]) /  ([b_1, ..., b_n] \cup [b'_1, ...,b'_m])|$$

**Note. Due to the potential high cost for computing the cross-product between all users, we restrict to users of France.**

#### create a df with ratings restricted to users of 'france' .

In [None]:
users_fr = users.where("country = 'france'")
users_fr.count()

309

In [None]:
ratings_fr = ratings.join(users_fr,"userid")
ratings_fr.count()

2423

**In this part, users always refer to users of France **

#### create a dataframe obtained by collecting, for each user, the set of rated books. 
Hint. group bookids per user than use a built-in function that creates an array from the grouped bookids (examine the schema)

In [None]:
from pyspark.sql.functions import collect_list 

In [None]:
users_books = ratings_fr.groupby(col('userid')).agg(collect_list('bookid')).alias('bookList')

#### create a dataframe containing pairs of distinct users with their rated books.
Hint. You need to rename the dataframe columns.

In [None]:
pair_users_books = users_books.select('userid', 'bookList').alias('userid1', 'bookList1').crossJoin(users_books)

AnalysisException: ignored

#### compute the Jaccard similarity and leave only pairs of books with a non-zero similarity

In [None]:
from pyspark.sql.functions import array_intersect, array_union, size

In [None]:
jaccard_sim = pair_users_books.#...


In [None]:
jaccard_sim.show()

+------+-------+------------------+
|userid|userid1|               sim|
+------+-------+------------------+
| 72211| 268945|               1.0|
|223612| 277660|               0.5|
|155585| 187587|               0.5|
|  2549|  95895|               0.5|
|  2549| 115259|               0.5|
|  7720| 223612|               0.5|
|115259| 225340|               0.5|
|  5644| 187754|               0.5|
|252405| 274882|               0.5|
| 93263| 133057|               0.5|
| 48698| 159394|               0.5|
|  5208|  27762|               0.5|
| 48698| 252405|               0.5|
| 13867| 263926|               0.5|
|  6445| 188133|               0.5|
|170158| 255405|               0.4|
| 34908| 271681|             0.375|
|  4260| 197685|0.3333333333333333|
| 24717|  93263|0.3333333333333333|
|159394| 252405|0.3333333333333333|
+------+-------+------------------+
only showing top 20 rows



In [None]:
jaccard_sim.count()

2023

### Queries with User-defined functions

Spark allows users to define specific functions called `User-Defined Functions`. 
We illustrate this concept with the following example: 

consider that we need to return the number of characters of the `country` column. To do so, we define a function called `slen` which, given a string `s` as  input returns its length computed by the string function `len(s)`.
The `udf` will be invoked on a dataframe by specifying the column(s) on which it is applied.

There are different ways to define a `udf`:
* using the `udf` class and registering it using the `register` method of the `udf` class, or
* by preceding the function siganture with `@udf('type')` where `type` is the return type of the function

We will use the second option which is syntactically simpler.

In [None]:
from pyspark.sql.functions import udf

In [None]:
@udf('integer')
def slen(s):
  return len(s)

In [None]:
len_country = users.withColumn("length",slen("country"))
len_country.show()

+------+--------------+---+------+
|userid|       country|age|length|
+------+--------------+---+------+
|100004|           usa|  0|     3|
|100009|        canada| 49|     6|
| 10001|           usa| 47|     3|
|100029|       germany|  0|     7|
| 10003|           usa| 20|     3|
|100035|        canada|  0|     6|
|100043|           usa| 36|     3|
|100046|           usa| 14|     3|
|100053|           usa| 31|     3|
|100066|           usa| 26|     3|
|100088|united kingdom| 26|    14|
|100094|        canada|  0|     6|
|100098|           usa|  0|     3|
|100115|           usa| 42|     3|
|100119|           usa| 27|     3|
|100121|           usa| 47|     3|
|100131|           usa| 16|     3|
|100164|       germany|  0|     7|
|100167|           usa| 28|     3|
|100176|     australia|  0|     9|
+------+--------------+---+------+
only showing top 20 rows



#### Mapping ratings to categories

We would like to add a textual representation of ratings such that:
* rating <1 is converted to 'bad'
* 1 <= rating <2 is converted to 'average'
* 2 <= rating <3 is converted to 'good'
* 3 <= rating is converted to 'excellent'

#### Complete the stub of `convert_rating(note)` which maps an integer to a string based on the previous rules.

In [None]:
@udf('string')
def convert_rating(note):
   if note<=1:
    return "bad"
   elif note <=2:
        return "average"
   elif note <=3:
         return "good"
   else:
         return "excellent"

#### Using  `convert_rating` map each `rating` to its associated category

In [None]:
text_ratings = ratings.
text_ratings.show()

+------+------+------+-----------+
|userid|bookid|rating|text_rating|
+------+------+------+-----------+
|276747|  4780|     4|  excellent|
|276747|  1837|     4|  excellent|
|276747|  6277|     3|       good|
|276762|  7819|     1|        bad|
|276762|  4885|     3|       good|
|276772| 27222|     2|    average|
|276772| 33829|     5|  excellent|
|276772| 83629|     5|  excellent|
|276786|246867|     3|       good|
|276786|117697|     2|    average|
|276788|  2239|     3|       good|
|276788|  5507|     2|    average|
|276788| 19993|     5|  excellent|
|276798|118271|     1|        bad|
|276798| 82229|     2|    average|
|276798|  3218|     2|    average|
|276813| 28598|     3|       good|
|276813| 76828|     3|       good|
|276813|164425|     2|    average|
|276813| 95317|     2|    average|
+------+------+------+-----------+
only showing top 20 rows



### Queries with vectorized User-defined functions

Adopt another strategy by defining a Panda UDF for mapping rating to categories

## END