* Author: Mohamed-Amine Baazizi 
* Affiliation: LIP6 - Faculté des Sciences - Sorbonne Université
* Email: mohamed-amine.baazizi@lip6.fr

# Interrogation de données semi-structurées en Spark

Ce TME illustre un exemple de bout en bout qui traite de la préparation de données de la formulation de requêtes d'analyse. 
Les données sont au format JSON qui permet de représenter les données de manière flexible avec des variations structurelles.

Les traitements s'expriement dans l'API Dataset dont la documentation est accessible depuis ces deux liens :
* https://spark.apache.org/docs/latest/sql-programming-guide.html
* https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.Dataset



Les données utilisées correspondent à des évènements décrivant des posts échangés, partagés ou commentés sur un réseau social VK pendant l'élection présidentielle Russe. L'API officielle de ces données est décrite sur le lien https://vk.com/dev/streaming_api_docs_2?f=7.%2BЧтение%2Bпотока   

Nous utiliser ces données pour analyser les posts par types, les tags utilisés dans ces post et la relation implicite entre les auteurs des posts.

Télécharger l'archive  VKRU18s.tgz depuis le répertoire VKRU18 de PUBLIC_DATASET accessile depuis https://nuage.lip6.fr/s/PQM3RgR4FRnMPQ9  

Désarchiver en tapant
`tar xvf VKRU18s.tgz`

Puis copier les données  `cp -r VKRU18s /tmp/BDLE/dataset`

## Questions

### Chargement des données 

In [1]:
val path ="C:/Users/yousef/Desktop/Courses/VKRU18s/"

Intitializing Scala interpreter ...

Spark Web UI available at http://10.30.84.62:4040
SparkContext available as 'sc' (version = 2.4.4, master = local[*], app id = local-1574384157044)
SparkSession available as 'spark'


path: String = C:/Users/yousef/Desktop/Courses/VKRU18s/


In [2]:
val vk = path + "vk_001.json"
val data = spark.read.format("json").load(vk).dropDuplicates
val total = data.count()

vk: String = C:/Users/yousef/Desktop/Courses/VKRU18s/vk_001.json
data: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [_id: struct<$oid: string>, code: bigint ... 1 more field]
total: Long = 30683


#### Affichage du schema 

In [None]:
data.printSchema

#### Fréquence d'apparition des attributs optionnels

In [6]:
val attrs = List("event", 
                 "event.event_id", 
                 "event.event_id.post_id", 
                 "event.event_id.post_owner_id", 
                 "event.event_id.comment_id", 
                 "event.event_id.shared_post_id", 
                 "event.author", 
                 "event.attachments", 
                 "event.geo", 
                 "event.tags",
                 "event.creation_time")

attrs.foreach(x=>println(x + " "+data.where(x+" is not null").count))

event 30683
event.event_id 30683
event.event_id.post_id 30683
event.event_id.post_owner_id 30683
event.event_id.comment_id 16518
event.event_id.shared_post_id 638
event.author 30683
event.attachments 15944
event.geo 22
event.tags 30683
event.creation_time 30683


attrs: List[String] = List(event, event.event_id, event.event_id.post_id, event.event_id.post_owner_id, event.event_id.comment_id, event.event_id.shared_post_id, event.author, event.attachments, event.geo, event.tags, event.creation_time)


#### Quels attributs sont optionnels lesquels sont obligatoires?  Extraire les  valeurs distinctes pour se terminant par _id

D'apres la documentation officielle https://vk.com/dev/streaming_api_docs_2?f=7.%2BЧтение%2Bпотока il est indiqué que la présence de certains attributs en conditionnée par la valeur que prend un autre attribut.
Par exemple, `event.event_id.comment_id` n'est présent que si `event.event_type='comment'`.
Idem pour `event.event_id.shared_post_id` qui n'est présent que si `event.event_type='share'`.

#### Vérifier ces hypothèses à l'aide de requêtes

In [3]:
/*vérifier  que `event.event_type='comment'` implique
`event.event_id.comment_id` */
data.where("event.event_type='comment'").where("event.event_id.comment_id is null").count()

res0: Long = 0


In [13]:
/*vérifier que `event.event_type='share'` implique 
`event.event_id.shared_post_id` */
data.where("event.event_type='share'").where("event.event_id.shared_post_id is null").count()

res9: Long = 0


#### Combien y a t il  de post id différents ?

In [None]:
data.where("event.event_id.shared_post_id").count

In [21]:
data.select("event.event_id.post_id").distinct().count()//.agg(countDistinct("shared_post_id")).show()

res16: Long = 21683


#### Retourner le nombre de posts par type d'évenement 

In [46]:
val postPerType = data.
postPerType.show()


+----------+----------+
|event_type|count_post|
+----------+----------+
|   comment|     14202|
|      post|      8137|
|     share|       544|
+----------+----------+



postPerType: org.apache.spark.sql.DataFrame = [event_type: string, count_post: bigint]


In [17]:
data.select("event.event_type","event.event_id.post_id").distinct().groupBy("event_type").count().show()

+----------+-----+
|event_type|count|
+----------+-----+
|   comment|14202|
|      post| 8137|
|     share|  544|
+----------+-----+



#### Applatissement des listes de tags

Dans la valeur `data` chaque objet est associté à un tableau de tags accessible depuis `event.tags`.
L'instruction suivante applatie ce tableau en créant une ligne pour chaque chaine contenue dans tags.

In [5]:
val dataWithTags = data.withColumn("tag", explode($"event.tags"))
dataWithTags.show

+--------------------+----+--------------------+---------+
|                 _id|code|               event|      tag|
+--------------------+----+--------------------+---------+
|[5a66276e7f254c35...| 100|[new,, [https://v...| grudinin|
|[5a66296f7f254c35...| 100|[new,, [https://v...|    putin|
|[5a68d75a713e4d08...| 100|[new,, [https://v...|    putin|
|[5a68d75a713e4d08...| 100|[new,, [https://v...|yavlinsky|
|[5a68f01c713e4d08...| 100|[new,, [https://v...|  navalny|
|[5a68fe3c713e4d08...| 100|[new,, [https://v...|    putin|
|[5a690364713e4d08...| 100|[new,, [https://v...|    putin|
|[5a69a403713e4d08...| 100|[new, [[,,, [,,, ...|    putin|
|[5a69b263713e4d08...| 100|[new, [[,,,,,,,,,...|  navalny|
|[5a6a03f7713e4d08...| 100|[new, [[,,,,,,,,,...|    putin|
|[5a6af994713e4d08...| 100|[new, [[,,,,,, [c...|    putin|
|[5a6b13e4713e4d08...| 100|[new,, [https://v...|    putin|
|[5a6b6900713e4d08...| 100|[new, [[,,, [,,, ...|    putin|
|[5a6b7918713e4d08...| 100|[new, [[,,,,,,,,,...|  navaln

dataWithTags: org.apache.spark.sql.DataFrame = [_id: struct<$oid: string>, code: bigint ... 2 more fields]


#### Retourner le nombre de posts par tag

In [13]:
val postsPerTag = dataWithTags.
postsPerTag.orderBy($"count".desc).show()

<console>: 27: error: value postsPerTag is not a member of org.apache.spark.sql.DataFrame

In [22]:
val postsPerTag = dataWithTags.groupBy("tag").count().orderBy($"count".desc)
postsPerTag.show()

+-----------+-----+
|        tag|count|
+-----------+-----+
|      putin|18869|
|   grudinin| 8485|
|    navalny| 2768|
|    sobchak| 2407|
|zhirinovsky| 1290|
|      titov|  592|
|  yavlinsky|  365|
+-----------+-----+



postsPerTag: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [tag: string, count: bigint]


In [27]:
import org.apache.spark.sql.expressions.UserDefinedFunction
def tuple: UserDefinedFunction =  udf((l: String, r: Integer) => (l,"vaj",r))

import org.apache.spark.sql.expressions.UserDefinedFunction
tuple: org.apache.spark.sql.expressions.UserDefinedFunction


In [29]:
postsPerTag.withColumn("dashagh",($"tag","hala",$"count")).show()

<console>: 30: error: type mismatch;

In [19]:
postsPerTag.crossJoin(postsPerTag).show()

<console>: 28: error: too many arguments for method crossJoin: (right: org.apache.spark.sql.Dataset[_])org.apache.spark.sql.DataFrame

####  Retourner le nombre de posts par auteur id

In [56]:
val authCountPerTag = dataWithTags.
authCountPerTag.orderBy($"nbAuths".desc).show()


+-----------+-------+
|        tag|nbAuths|
+-----------+-------+
|      putin|  15673|
|   grudinin|   6207|
|    navalny|   2580|
|    sobchak|   2288|
|zhirinovsky|   1214|
|      titov|    572|
|  yavlinsky|    347|
+-----------+-------+



authCountPerTag: org.apache.spark.sql.DataFrame = [tag: string, nbAuths: bigint]


In [77]:
val authCountPerTag =dataWithTags.groupBy("tag").agg(expr("count(_id)").as("nbAuths"))
authCountPerTag.orderBy($"nbAuths".desc).show()

+-----------+-------+
|        tag|nbAuths|
+-----------+-------+
|      putin|  18869|
|   grudinin|   8485|
|    navalny|   2768|
|    sobchak|   2407|
|zhirinovsky|   1290|
|      titov|    592|
|  yavlinsky|    365|
+-----------+-------+



authCountPerTag: org.apache.spark.sql.DataFrame = [tag: string, nbAuths: bigint]


#### Fact-checking en utilisant Wikipédia 

Chaque tag correspond à un candidat de l'élection Russe de 2018 (ex. Putin, Titov, etc).
Nous voulons savoir s'il existe une relation entre le nombre de tag par auteur et le nombre de votes de chaque candidat.
On récupere depuis Wikipedia le nombre de votes de chaque candidat et on le stocke dans la valeur `votes` décrite ci-dessous.

In [50]:
import spark.implicits._

case class Vote(name: String, party: String, votes: Long)
val votes = Seq(Vote("putin", "Independent", 56430712), 
                Vote("grudinin", "Communist",8659206), 
                Vote("zhirinovsky","Liberal Democratic Party",4154985),
                Vote("sobchak","Civic Initiative",1238031),
                Vote("yavlinsky","Yabloko",769644), 
                Vote("titov","Party of Growth",556801)).toDS()
votes.printSchema
votes.show()

root
 |-- name: string (nullable = true)
 |-- party: string (nullable = true)
 |-- votes: long (nullable = false)

+-----------+--------------------+--------+
|       name|               party|   votes|
+-----------+--------------------+--------+
|      putin|         Independent|56430712|
|   grudinin|           Communist| 8659206|
|zhirinovsky|Liberal Democrati...| 4154985|
|    sobchak|    Civic Initiative| 1238031|
|  yavlinsky|             Yabloko|  769644|
|      titov|     Party of Growth|  556801|
+-----------+--------------------+--------+



import spark.implicits._
defined class Vote
votes: org.apache.spark.sql.Dataset[Vote] = [name: string, party: string ... 1 more field]


#### Créeer une table contenant pour chaque candidate le nombre de votes et le nombre d'auteurs ayant utilisé son nom comme un tag

In [58]:
val votesCount = votes.
votesCount.show

+-----------+--------+-------+
|       name|   votes|nbAuths|
+-----------+--------+-------+
|      putin|56430712|  15673|
|   grudinin| 8659206|   6207|
|    sobchak| 1238031|   2288|
|zhirinovsky| 4154985|   1214|
|      titov|  556801|    572|
|  yavlinsky|  769644|    347|
+-----------+--------+-------+



votesCount: org.apache.spark.sql.DataFrame = [name: string, votes: bigint ... 1 more field]


In [78]:
val votesCount = votes.
    join(authCountPerTag,votes("name") === authCountPerTag("tag")).
    orderBy($"votes".desc).
    select("name","votes","nbAuths")
votesCount.show

+-----------+--------+-------+
|       name|   votes|nbAuths|
+-----------+--------+-------+
|      putin|56430712|  18869|
|   grudinin| 8659206|   8485|
|zhirinovsky| 4154985|   1290|
|    sobchak| 1238031|   2407|
|  yavlinsky|  769644|    365|
|      titov|  556801|    592|
+-----------+--------+-------+



votesCount: org.apache.spark.sql.DataFrame = [name: string, votes: bigint ... 1 more field]


#### Rajouter à la table de votes le rang obtenu à partir du nombre de votes et celui obtenu à partir du nombre d'auteurs. Que remarquez-vous ?


In [59]:
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

...

+-----------+--------+-------+----------+-------------+
|       name|   votes|nbAuths|votes_rank|nbAuths_ranks|
+-----------+--------+-------+----------+-------------+
|      putin|56430712|  15673|         1|            1|
|   grudinin| 8659206|   6207|         2|            2|
|    sobchak| 1238031|   2288|         4|            3|
|zhirinovsky| 4154985|   1214|         3|            4|
|      titov|  556801|    572|         6|            5|
|  yavlinsky|  769644|    347|         5|            6|
+-----------+--------+-------+----------+-------------+



import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
windowSpecVotes: org.apache.spark.sql.expressions.WindowSpec = org.apache.spark.sql.expressions.WindowSpec@16301589
windowSpecCount: org.apache.spark.sql.expressions.WindowSpec = org.apache.spark.sql.expressions.WindowSpec@36d3a591


In [54]:
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._


In [80]:
var ranked=votesCount.
    withColumn("votes_rank",rank().over(Window.orderBy($"votes".desc))).
    withColumn("nbAuths_rank",rank().over(Window.orderBy($"nbAuths".desc)))
ranked.show()

+-----------+--------+-------+----------+------------+
|       name|   votes|nbAuths|votes_rank|nbAuths_rank|
+-----------+--------+-------+----------+------------+
|      putin|56430712|  18869|         1|           1|
|   grudinin| 8659206|   8485|         2|           2|
|    sobchak| 1238031|   2407|         4|           3|
|zhirinovsky| 4154985|   1290|         3|           4|
|      titov|  556801|    592|         6|           5|
|  yavlinsky|  769644|    365|         5|           6|
+-----------+--------+-------+----------+------------+



ranked: org.apache.spark.sql.DataFrame = [name: string, votes: bigint ... 3 more fields]


### Création de cubes

Nous voulons créer une cube pour agéger le posts sur trois dimensions : tag, type d'évenement and mois de création.

#### Rajouter un attribut month contenant le mois extrait pour chaque objet 

Astuce. importer le package org.apache.spark.sql.types.DateType et utiliser une fonction Spark SQL prédéfinie.
Vous remarquerez qu'il n y a que trois mois : 1,2 et 3.

In [82]:
import org.apache.spark.sql.types.DateType

val dataTagMon = dataWithTags.withColumn("month",from_unixtime($"event.creation_time","M"))
dataTagMon.show()

+--------------------+----+--------------------+---------+-----+
|                 _id|code|               event|      tag|month|
+--------------------+----+--------------------+---------+-----+
|[5a66276e7f254c35...| 100|[new,, [https://v...| grudinin|    1|
|[5a66296f7f254c35...| 100|[new,, [https://v...|    putin|    1|
|[5a68d75a713e4d08...| 100|[new,, [https://v...|    putin|    1|
|[5a68d75a713e4d08...| 100|[new,, [https://v...|yavlinsky|    1|
|[5a68f01c713e4d08...| 100|[new,, [https://v...|  navalny|    1|
|[5a68fe3c713e4d08...| 100|[new,, [https://v...|    putin|    1|
|[5a690364713e4d08...| 100|[new,, [https://v...|    putin|    1|
|[5a69a403713e4d08...| 100|[new, [[,,, [,,, ...|    putin|    1|
|[5a69b263713e4d08...| 100|[new, [[,,,,,,,,,...|  navalny|    1|
|[5a6a03f7713e4d08...| 100|[new, [[,,,,,,,,,...|    putin|    1|
|[5a6af994713e4d08...| 100|[new, [[,,,,,, [c...|    putin|    1|
|[5a6b13e4713e4d08...| 100|[new,, [https://v...|    putin|    1|
|[5a6b6900713e4d08...| 10

import org.apache.spark.sql.types.DateType
dataTagMon: org.apache.spark.sql.DataFrame = [_id: struct<$oid: string>, code: bigint ... 3 more fields]


#### Que font les instructions suivantes ? 

In [83]:
val cub_ev_tag_mo =  dataTagMon.rollup("event.event_type", "tag", "month").count()
val cub_ev_tag_mo_notnull = cub_ev_tag_mo.where("event_type is not null and tag is not null and month is not null")
cub_ev_tag_mo_notnull.orderBy($"count".desc).show()
// it makes a calcutes number of --- in for each event_type,tag and month

+----------+--------+-----+-----+
|event_type|     tag|month|count|
+----------+--------+-----+-----+
|   comment|   putin|    2| 4601|
|   comment|   putin|    3| 3997|
|      post|   putin|    2| 3654|
|      post|   putin|    3| 3561|
|   comment|grudinin|    2| 2276|
|      post|grudinin|    2| 1836|
|   comment|grudinin|    3| 1705|
|      post|grudinin|    3| 1688|
|   comment|   putin|    1| 1498|
|      post|   putin|    1| 1186|
|   comment| navalny|    2|  810|
|   comment| sobchak|    3|  555|
|      post| sobchak|    2|  553|
|   comment| navalny|    1|  494|
|      post| navalny|    2|  478|
|   comment|grudinin|    1|  475|
|   comment| sobchak|    2|  445|
|      post| sobchak|    3|  438|
|   comment| navalny|    3|  423|
|      post|grudinin|    1|  388|
+----------+--------+-----+-----+
only showing top 20 rows



cub_ev_tag_mo: org.apache.spark.sql.DataFrame = [event_type: string, tag: string ... 2 more fields]
cub_ev_tag_mo_notnull: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [event_type: string, tag: string ... 2 more fields]


### Tableau croisé 

#### En utilisant le cube construit précédemment, construire un tableau croisé réduisant le cube aux dimensions mois et type d'évenement.

In [65]:
val monthEvent = cub_ev_tag_mo_notnull.groupBy("event_type","month")
monthEvent.printSchema
monthEvent.show

root
 |-- month: integer (nullable = true)
 |-- comment: long (nullable = true)
 |-- post: long (nullable = true)
 |-- share: long (nullable = true)

+-----+-------+----+-----+
|month|comment|post|share|
+-----+-------+----+-----+
|    1|   2729|2101|  100|
|    3|   7130|6442|  208|
|    2|   8594|7057|  415|
+-----+-------+----+-----+



monthEvent: org.apache.spark.sql.DataFrame = [month: int, comment: bigint ... 2 more fields]


In [86]:
val monthEvent = cub_ev_tag_mo_notnull.groupBy("month").pivot("event_type").sum("count")
monthEvent.show()
//groupBy("id").pivot("type")

+-----+-------+----+-----+
|month|comment|post|share|
+-----+-------+----+-----+
|    3|   7130|6442|  208|
|    1|   2729|2101|  100|
|    2|   8594|7057|  415|
+-----+-------+----+-----+



monthEvent: org.apache.spark.sql.DataFrame = [month: string, comment: bigint ... 2 more fields]


<console>: 26: error: not found: value monthEvent

###  Matrice de co-occurrence des tag

#### Créer une table indiquant pour chaque paire de tag le nombre d'auteur qui les utilisent en même temps.

In [67]:
val authTag = dataWithTags.
val tagCoOcc = authTag.
tagCoOcc.show()


+-----------+-----------+-----+
|        tag|   otherTag|count|
+-----------+-----------+-----+
|      putin|   grudinin| 7267|
|   grudinin|      putin| 7267|
|      putin|    navalny| 1806|
|    navalny|      putin| 1806|
|    sobchak|      putin| 1299|
|      putin|    sobchak| 1299|
|    navalny|   grudinin| 1113|
|   grudinin|    navalny| 1113|
|      putin|zhirinovsky|  901|
|zhirinovsky|      putin|  901|
|   grudinin|    sobchak|  722|
|    sobchak|   grudinin|  722|
|zhirinovsky|   grudinin|  714|
|   grudinin|zhirinovsky|  714|
|zhirinovsky|    sobchak|  439|
|    sobchak|zhirinovsky|  439|
|      putin|  yavlinsky|  355|
|  yavlinsky|      putin|  355|
|    sobchak|    navalny|  300|
|    navalny|    sobchak|  300|
+-----------+-----------+-----+
only showing top 20 rows



authTag: org.apache.spark.sql.DataFrame = [authorID: bigint, tag: string]
tagCoOcc: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [authorID: bigint, otherTag: string ... 1 more field]


In [30]:
val dataWithTags_a = dataWithTags.withColumnRenamed("event", "event1").withColumnRenamed("tag", "otherTag")
val TupletagsPerAuthor_a = dataWithTags.join(dataWithTags_a).where("(event.event_id = event1.event_id) and (event.author.id = event1.author.id) and (tag > otherTag)")
val tagCoOcc = TupletagsPerAuthor_a.groupBy("tag", "otherTag").agg(size(collect_set("event.author.id")) as "count")
tagCoOcc.show(30)

+-----------+---------+-----+
|        tag| otherTag|count|
+-----------+---------+-----+
|      putin| grudinin| 1479|
|  yavlinsky|  sobchak|  164|
|    sobchak| grudinin|  332|
|zhirinovsky|yavlinsky|  116|
|    sobchak|  navalny|  187|
|  yavlinsky| grudinin|  138|
|  yavlinsky|    putin|  142|
|zhirinovsky|    putin|  332|
|  yavlinsky|    titov|   78|
|      titov|  navalny|    8|
|    sobchak|    putin|  507|
|zhirinovsky|  navalny|   48|
|      putin|  navalny|  480|
|zhirinovsky|    titov|   79|
|      titov|  sobchak|   90|
|zhirinovsky| grudinin|  289|
|  yavlinsky|  navalny|   35|
|      titov|    putin|  101|
|    navalny| grudinin|  264|
|zhirinovsky|  sobchak|  346|
|      titov| grudinin|   86|
+-----------+---------+-----+



dataWithTags_a: org.apache.spark.sql.DataFrame = [_id: struct<$oid: string>, code: bigint ... 2 more fields]
TupletagsPerAuthor_a: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [_id: struct<$oid: string>, code: bigint ... 6 more fields]
tagCoOcc: org.apache.spark.sql.DataFrame = [tag: string, otherTag: string ... 1 more field]


#### Construire une matrice de co-occurrence des tags. 

In [68]:
val tagMat = tagCoOcc.groupBy("tag").pivot("otherTag").agg(sum("count") as "count")
tagMat.show()

+-----------+--------+-------+-----+-------+-----+---------+-----------+
|        tag|grudinin|navalny|putin|sobchak|titov|yavlinsky|zhirinovsky|
+-----------+--------+-------+-----+-------+-----+---------+-----------+
|   grudinin|    null|   1113| 7267|    722|  140|      210|        714|
|    navalny|    1113|   null| 1806|    300|   21|       60|         90|
|      putin|    7267|   1806| null|   1299|  246|      355|        901|
|    sobchak|     722|    300| 1299|   null|  111|      199|        439|
|      titov|     140|     21|  246|    111| null|       94|         90|
|  yavlinsky|     210|     60|  355|    199|   94|     null|        145|
|zhirinovsky|     714|     90|  901|    439|   90|      145|       null|
+-----------+--------+-------+-----+-------+-----+---------+-----------+



tagMat: org.apache.spark.sql.DataFrame = [tag: string, grudinin: bigint ... 6 more fields]
