<a href="https://colab.research.google.com/github/veroorli/ProjetProg/blob/master/TME1_Dataframes_collab_2022.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# BDLE : Graphes en DataFrame

## Données synthétiques sur la musique

##### Description des données: 

- Fichier **data.csv** : contient des informations sur des chansons (trackId) interprétées par des artistes (artistId) et   écoutées par des utilisateurs (userId) à une date donnée par une estampille temporelle timestamp. Contient 260664 lignes.
- Fichier **meta.csv**: contient les noms (champ 'Name') des chansons si type==track ou des artistes si type==artist.
  'Artist' est le nom de l'artiste qui interprète la chanson si type==track ou le nom de l'artiste (même valeur que 'Name') si track==artist. 'Id' est l'identifiant d'une chanson ou d'un artiste. Contient 44319 lignes.

## Lecture des données 




## Construction du graphe


In [None]:
# fonction utilisée par la suite pour calculer les poids des arcs
# df: dataframe, source: nom de la colonne contenant les noeuds source des arcs
# poids: poids initial avant normalisation, n: nombre maximum d'arcs à garder 
#pour chaque source
from pyspark.sql.functions import row_number, sum
from pyspark.sql import Window

def calcul_poids(df, source, poids, n):
    
    window = Window.partitionBy(source).orderBy(col(poids).desc())
    
    # Sert juste à filtrer le nombre d'arc maximum pour chaque source
    filterDF = df.withColumn("row_number", row_number().over(window)) \
        .filter(col("row_number") <= n) \
        .drop(col("row_number")) 
        
    # Somme des poids pour chaque source
    tmpDF = filterDF.groupBy(col(source)).agg(sum(col(poids))
    .alias("sum_" + poids))
   
   # Normalisation des poids
    finalDF = filterDF.join(tmpDF, source, "inner") \
        .withColumn("norm_" + poids, col(poids) / col("sum_" + poids)) \
        .cache()
        
    return finalDF

 ### Construction des liens pondérés entre utilisateurs et chansons 
* Construire un DataFrame userTrack à partir de data pour stocker les arcs entre utilisateurs et chansons. Pour chaque utilisateur (userId) on ajoute un arc vers une chanson (trackId) avec un poids égal au nombre total de fois que l'utilisateur à écouté la chanson. Utiliser la fonction calcul_poids pour garder pour chaque utilisateur les 100 chansons avec le poids le plus élevé et normaliser les poids des arcs gardés.   
    
* Affichage du résultat: garder uniquement les arcs qui ont les 5 plus grandes valeurs possibles des poids (utilisez la fonction rank() et la fenêtre over(window)). Afficher 20 lignes du résultat, en triant le résultat par ordre décroissant des poids, ensuite par ordre croissant des userId et des artistId.

In [None]:
from pyspark.sql.functions import col, rank

#nombre de fois que chaque utilisateurs écoute un track

userTrack = data.groupby('userId', 'trackId').agg(first('artistId')
.alias('artistId'), count('userId').alias('count'))

#calculer le poids final en utilisant calcul_poids
userTrack = calcul_poids(userTrack, 'userId', 'count', 100).persist() 

window = Window.partitionBy('userId').orderBy(col("norm_count").desc())
    
userTrackList = userTrack.withColumn("position", rank().over(window))\
  .where(col("position")<6)\
  .orderBy('userId', 'artistId')
  .select('userId', 'trackId', 'norm_count').take(20)

for val in userTrackList:
   print("%s %s %s" % val)
    
userTrack.count() # résultat: 210675   

### Construction des liens pondérés entre utilisateurs et artistes
* Construire un DataFrame userArtist à partir de data pour stocker les arcs entre utilisateurs et artistes. Pour chaque utilisateur (userId) on ajoute un arc vers un artiste (artistId) avec un poids égal au nombre total de fois que l'utilisateur à écouté des chansons de cet artiste. Utiliser la fonction calcul_poids pour garder pour chaque utilisateur au plus 100 artistes avec le poids le plus élevé et normaliser les poids des arcs gardés.   
    
* Affichage du résultat: garder uniquement les arcs qui ont les 5 plus grandes valeurs possibles des poids (utilisez la fonction rank() et la fenêtre over(window)). 
   Afficher 20 lignes du résultat, en triant le résultat par ordre décroissant des poids, ensuite par ordre croissant des userId et des artistId.

In [None]:
#poids=nombre de fois qu'un utilisateurs a écouté des morceaux de cet artiste
#regrouper les données par userId et artistId
userArtist = data.groupby('userId', 'artistId').count()

#calculer le poids final en utilisant calcul_poids
userArtist = calcul_poids(userArtist, 'userId', 'count', 100).persist()  

window = Window.partitionBy('userId').orderBy(col("norm_count").desc())
    
userArtistList = userArtist.withColumn("position", rank().over(window))\
  .where(col("position")<6)\
  .orderBy('userId', 'artistId')
  .select('userId', 'artistId', 'norm_count').take(20)

for val in userArtistList:
   print("%s %s %s" % val)
    
userArtist.count() #résultat: 178419

### Construction des liens pondérés entre artistes et chansons
* Construire un DataFrame artistTrack à partir de data pour stocker les arcs entre artistes et chansons. Pour chaque artiste (artistId) on ajoute un arc vers une chanson (trackId) avec un poids égal au nombre total de fois que la chanson de cet artiste a été écoutée par tous les utilisateurs. Utiliser la fonction calcul_poids pour garder pour chaque artiste au plus 100 chansons avec le poids le plus élevé et normaliser les poids des arcs gardés.   
    
* Affichage du résultat: garder uniquement les arcs qui ont les 5 plus grandes valeurs possibles des poids (utilisez la fonction rank() et la fenêtre over(window)). 
   Afficher 20 lignes du résultat, en triant le résultat par ordre décroissant des poids, ensuite par ordre croissant des artistId et des trackId.* 

In [None]:
# poids de l'arc: nombre de fois qu'un track de l'artiste a été écouté 
#par tous les utilisateurs
artistTrack = data.groupby('artistId', 'trackId').count()

#calculer le poids final en utilisant calcul_poids
artistTrack = calcul_poids(artistTrack, 'artistId', 'count', 100).persist() 

window = Window.partitionBy("artistId").orderBy(col("norm_count").desc())
    
artistTrackList = artistTrack.withColumn("position", rank().over(window))\
  .where(col("position")<6)\
  .orderBy('artistId', 'trackId')
  .select('artistId', 'trackId', 'norm_count').take(20)    

for val in artistTrackList:
   print("%s %s %s" % val)
    
artistTrack.count() #résultat: 35408

### Construction des liens pondérés entre chansons
* Construire un DataFrame trackTrack à partir de data pour stocker les arcs entre les chansons. Le poids total d'un arc entre trackId1 et trackId2 est le nombre total d'utilisateurs qui ont écouté à la fois trackId1 et trackId2 dans un laps de temps de 10 minutes (à noter que le graphe est non orienté, trackTrack contient à la fois une entrée pour (trackId1, trackId2) et une entrée pour (trackId2, trackId1)).   
Utiliser la fonction calcul_poids pour garder pour chaque chanson au plus 100 chansons avec le poids le plus élevé et normaliser les poids gardés.   
    
* Affichage du résultat: garder uniquement les arcs qui ont les 5 plus grandes valeurs possibles des poids (utilisez la fonction rank() et la fenêtre over(window)). 
   Afficher 20 lignes du résultat, en triant le résultat par ordre décroissant des poids, ensuite par ordre croissant des artistId et des trackId.

In [None]:
# Construire les couples de trackId écoutés par le même utilisateur
data2 = data.selectExpr("userId as userId2",
"trackId as trackId2", "artistId as artistId2", "timestamp as timestamp2")
trackTrack = data.join(data2, ((col("userId")==col("userId2"))
 & (col('trackId') != col("trackId2"))))
.where(abs(col('timestamp')-col('timestamp2')) < 600)\
.select('userId', "trackId", "trackId2")

#pour chaque couple de trackId le nombre d'utilisateurs qui les ont écouté ensemble
trackTrack = trackTrack.groupby("trackId", "trackId2").count()

#calculer le poids final en utilisant calcul_poids
trackTrack =  calcul_poids(trackTrack, 'trackId', 'count', 100).persist()

window = Window.partitionBy('trackId').orderBy(col("norm_count").desc())

trackTrackList = trackTrack.withColumn("position", rank().over(window))\
    .where(col("position")<6)\
    .orderBy('trackId').select('trackId', "trackId2", 'norm_count').take(20)   

for val in trackTrackList:
   print("%s %s %s" % val)
    
trackTrack.count() #résultat: 136257  

## Construction du graphe final

Construire un DataFrame graphe pour stocker tous les noeuds et tous les liens calculés précédemment. Le dataframe contiendra une colonne 'source' (identifiant du noeud source), une colonne 'destination' et une colonne 'poids'. 
Les colonnes 'source' et 'dest' contiennent à la fois des identifiants utilisateur, chanson et artiste. La colonne 'poids' contient les poids des arcs calculés à partir des poids précédents qui sont multipliés par les coefficients suivants:
* Liens user->artist : 0.5
* Liens user->track: 0.5
* Liens track->track: 1
* Liens artist->track: 1

In [None]:
userTrack_g = userTrack.selectExpr
("userId as source", "trackId as destination", "norm_count * 0.5 as poids")
userArtist_g = userArtist.selectExpr("userId as source", "artistId as destination", "norm_count * 0.5 as poids")
artistTrack_g = artistTrack.selectExpr("artistId as source", "trackId as destination", "norm_count as poids")
trackTrack_g = trackTrack.selectExpr("trackId as source", "trackId2 as destination", "norm_count as poids")

In [None]:
graphe = userTrack_g.union(userArtist_g).union(artistTrack_g).union(trackTrack_g).persist()

graphe.printSchema()

graphe.count() #résultat: #560759

### Calcul de recommendation de chansons avec PPR

En utilisant le calul de PageRank Personalisé, recommender à l'utilisateur avec l'identifiant 10 des chansons qu'il n'a pas écoutées.
Rappel de la formule de mise à jour du score de recommendation à chaque itération du calcul:

x[i] = alpha * u[i] + (1-alpha)* sum(xant[j]*poids[j][i])

    - poids[j][i] : poids de l'arc entre j à i
    - u[i]: valeur de personalisation, u[10]=1 et u[i]=0 si i !=10
    - xant[j] : valeur du score du noeud j à l'itération précédente (x0[10]=1 et x0[i]=0 si i !=10)

On considère alpha=0.15 et on effectue le calcul pour 5 itérations (maxiter=5)

### Calcul du vecteur de recommendation x

In [None]:
import pandas as pd
from pyspark.sql.functions import when

user = 10
d=0.85
alpha=(1-d)
maxiter = 5
# Construire le vecteur d'importance initial
x0  = spark.createDataFrame(pd.DataFrame([(user,1)], columns=["id","rank"]))

print("Importance initiale")
x0.show()

x = x0
for iter in range(maxiter) :

    # nextx = (1-alpha)* sum(xant[j]*poids[j][i])
    nextx = x.join(graphe, x.id == graphe.source)
    .selectExpr('destination', "rank * poids as prod")
    .groupBy('destination').agg(sum('prod').alias('sum_prod'))\
    .select("destination", (col('sum_prod') * d).alias("alpha_sum_prod"))
    
    # x = alpha * u[i] + nextx
    x = nextx.select(col('destination')
    .alias("id"), when(nextx.destination != user, nextx.alpha_sum_prod)
    .otherwise(alpha + nextx.alpha_sum_prod).alias("rank"))
    
    if x.filter(col('id')==user).count() == 0:
      x = x.union(spark.createDataFrame
                  (pd.DataFrame([(user, alpha)], columns=["id","rank"])))

x.persist()
print("Importance finale")
x.show()

x.count() #résultat: 16168

In [None]:
x.where(col('id').isin([841504, 844513, 847474, 853761, 854531, 856152, 856636]))
.show()

+------+--------------------+
|    id|                rank|
+------+--------------------+
|854531|0.001554540512839496|
|856636|9.373849924168476E-6|
|841504|1.696264140221232...|
|847474|3.237165813907451E-5|
|853761|8.599485698358212E-4|
|844513|5.834083375811894E-8|
|856152|6.704348500324492E-8|
+------+--------------------+



### Construction de la liste de chansons non écoutées recommendées
Afficher les 10 chansons les plus recommendées que l'utilisateur n'a pas écoutées, avec leur score
calculé précédemment

In [None]:
x2 = meta.selectExpr('Name', 'Artist', 'Id as id2')
tracksrec = x.join(x2, x.id == x2.id2)
.selectExpr('id', 'rank', 'Name', 'Artist')

window = Window.orderBy(col("rank").desc())

tracksreckList = tracksrec.withColumn("position", rank().over(window))\
    .where(col("position")<=10)\
    .orderBy('position').select('id', "rank", 'Name', 'Artist').take(20)   
       
for val in tracksreckList:
   print("%s %s %s %s" % val)

839649 0.04804175618196066 Bazam Bekhand Artist: Mohsen Yeganeh
828318 0.041461989256473215 Unapologetic Bitch Artist: Madonna
960353 0.038525490989983785 Magneto And Titanium Man Artist: Wings
960214 0.03797141587570513 Procession Artist: Queen
955486 0.034484389884170796 Seni Bilen Biliyor Artist: Arif Susam
855194 0.02795106930767553 Porke Artist: Pochill
958924 0.02584262856304979 Pictures Artist: Gemafrei
801772 0.02535247383791928 Celebration Day Artist: Led Zeppelin
823737 0.02194303602829867 Dj Petros Artist: Bang La Decks
807650 0.02194303602829867 Take care Artist: Imany


## Calcul des triangles

Implémenter les différentes étapes de l'algorithme amélioré de calcul des triangles 
présenté en cours sur le graphe trackTrack construit précédemment.

In [None]:
trackTrack.count()

136257

In [None]:
# Définir une fonction qui prend comme argument une liste d'utilisateurs triés 
# par ordre croissant (users) et retourne une liste de couples ordonnés d'utilisateurs

from pyspark.sql.functions import udf
from pyspark.sql.types import *

def parse_string(users):
    
    results = [[users[i], users[j]] for i in range(len(users)-1) 
                                    for j in range(i+1, len(users))]
    
    return results

parse_string_udf = udf(parse_string, ArrayType(StringType()))

In [None]:
# Implémentation de l'algorithme de calcul de triangles

from pyspark.sql.functions import collect_list, sort_array

#Map1 - cours
#Prendre en considération uniquement 
#les couples ordonnés (trackId, track1) (trackId < track1)  
trackOrd = trackTrack.where(col('trackId') < col('trackId2'))
.select('trackId', 'trackId2')
.orderBy('trackId', 'trackId2')

#Reduce 1 - cours: Construire pour chaque trackId la liste de couples 
#ordonnés de ses voisins
# a) regrouper les lignes par trackId en construisant 
#la liste de voisins triés par ordre
#    croissant (utiliser sort_array)
neighbors = trackOrd.groupby('trackId')
.agg(sort_array(collect_list('trackId2'))
.alias('liste'))

# b) utiliser la fonction définie précédemment pour retourner
# la liste de couples de voisins
couples= neighbors.withColumn('liste_couple',
        explode(parse_string_udf(col('liste'))))
.select('trackId', 'liste_couple')

# Map2 + Reduce 2 - cours
# prendre en considération uniquement les lignes telles 
#que les couples de voisins 
# construits précédemment existent également dans le graphe 
from pyspark.sql.functions import concat, lit, count, desc
trackOrd_couple = trackOrd.withColumn('liste_couple',
                                      concat(lit('['), col('trackId'), lit(', ')
                                      , col('trackId2'), lit(']')))
.select('liste_couple')

liste =  couples.join(trackOrd_couple, on='liste_couple')

# Calculer le nombre de triangles pour chaque utilisateur et 
# trier le résultat par le nombre de triangles décroissant
triangles = liste.groupBy('trackId').count().orderBy(col('count').desc())

print(triangles.count()) #résultat: 7156
triangles.agg(sum(col("count"))).show() #Résultat: 93400
triangles.orderBy(col("count").desc()).show()

+-------+--------------------+
|trackId|               liste|
+-------+--------------------+
| 798256|    [923706, 954826]|
| 798258|    [808254, 810685]|
| 798261|[895481, 905671, ...|
| 798290|[853797, 880442, ...|
+-------+--------------------+
only showing top 4 rows



## Calcul du plus court chemin

Implémenter l'algorithme parallèle de calcul de plus court chemin d'origine fixée vers n'importe quelle destination

In [None]:
from graphframes.lib import AggregateMessages as AM
from pyspark.sql import functions as F

origin=10
#Rassembler tous les noeuds  dans un seul DF vertices
vertices = graphe.select("source").distinct().withColumnRenamed("source", "id")
.union(graphe.select("dest").distinct().withColumnRenamed("dest", "id"))
.distinct() 
# Vecteur des distances de tous les neouds par rapport à origin
distances = vertices.withColumn("distance", F.when(vertices["id"] == origin, 0)
.otherwise(float("inf")))
active  = spark.createDataFrame(pd.DataFrame([(origin,0)],
                                             columns=["idA","distanceA"]))

distances= AM.getCachedDataFrame(distances)

while active.first():

   ....
   active=AM.getCachedDataFrame(...)
   distances=AM.getCachedDataFrame((...)

print("Distances finales:")  
distances.filter(col("distance")!=float('inf')).orderBy(col("distance")).show()

```
# Résultat attendu:

Distances finales:
+------+--------+
|    id|distance|
+------+--------+
|    10|     0.0|
|828318| 0.03125|
|807650| 0.03125|
|801772| 0.03125|
|941064| 0.03125|
|823737| 0.03125|
|824440| 0.03125|
|839649| 0.03125|
|960214| 0.03125|
|855194| 0.03125|
|901153| 0.03125|
|934050| 0.03125|
|943645| 0.03125|
|955486| 0.03125|
|956604| 0.03125|
|958924| 0.03125|
|960353| 0.03125|
|970381| 0.03125|
|976111| 0.03125|
|983989| 0.03125|
+------+--------+
only showing top 20 rows
```