### Notebook TP API DataFrame de Spark en Python - MLIA


Le but du TP est d'utiliser l'API Dataframe de Spark en Python.
Pour la documentation à consulter, suivre ce li:
* https://spark.apache.org/docs/latest/sql-programming-guide.html

* Consulter également les fichiers contenus dans le répertoire **Python Spark Cheat Sheets** sur Moodle

## Rappel de quelques fonctions

|Expression |Action|
|:-------------:|:-------------:|
|val ds = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load("/path/file.csv") |loads the content of file.csv into a dataset ds by indicting that it contains a header  and by requesting  Spark to infer the schema |
|ds.printSchema | show the schema of ds |
|ds.show(truncate=false)|shows the first 20 rows without truncating the values |
|ds.describe().show()|collects and shows descriptive statistics (mean, max, count, ..) of numeric values|
|ds.select("c1", "c2", ..., "cn")|projects ds on the columns c1, …, cn|
|ds.withColumnRenamed("c1","c2")|renames the column c1 with c2|
|ds.where(cond)|selects the rows respecting cond|
|ds.groupBy("c1").agg(collect_list($"c2") as "values")|groups the rows by column c1 and creates an new column of values associated to those of c1|
|ds.groupBy("c1").agg(avg("c2"))|computes the sum of c2 for each c1 |
|ds.withColumn("new", Exp)|creates a new column whose values are computed by Exp|
|ds1.crossJoin(ds2)|computes the cross product of ds1 and ds2|
|ds1.join(ds2, "c") |joins ds1 and ds2 on the column c|
|ds1.join(ds2, Seq("c1",...,"cn")) |generalizes the previous one to a sequence of columns c1,…, cn|

## Préparation

*   ***Vérifier que des ressources*** de calcul sont allouées à votre notebook est
connecté (cf RAM de disque indiqués en haut à droite) . Sinon cliquer sur le bouton connecter pour obtenir des ressources.

*   ***Créer le répertoire*** pour stocker les fichiers nécessaires sur votre google
drive (donnez l'autorisation au notebook d'accéder à votre drive lorsque c'est demandé). *Ajuster le nom de votre dossier* : **MyDrive/mlia/TP**

In [None]:
!pwd

/content


In [None]:
import os
from google.colab import drive
drive.mount("/content/drive", force_remount=True)

drive_dir = "/content/drive/MyDrive/ColabNotebooks/"
os.makedirs(drive_dir, exist_ok=True)
os.listdir(drive_dir)

Mounted at /content/drive


[]

**Ajouter les fichiers films.json, notesAMJ.csv, ratings.csv et movies.csv dans Google Drive (/content/drive/MyDrive/mlia/TP)**

***Installer pyspark et findspark :***

In [None]:
!pip install -q pyspark
!pip install -q findspark

***Démarrer la session spark:***

In [None]:
import os
!find /usr/local -name "pyspark"
os.environ["SPARK_HOME"] = "/usr/local/lib/python3.11/dist-packages/pyspark"
os.environ["JAVA_HOME"] = "/usr"

/usr/local/bin/pyspark
/usr/local/lib/python3.11/dist-packages/ibis/backends/pyspark
/usr/local/lib/python3.11/dist-packages/ibis/backends/tests/snapshots/test_sql/test_group_by_has_index/pyspark
/usr/local/lib/python3.11/dist-packages/ibis/backends/tests/snapshots/test_sql/test_union_aliasing/pyspark
/usr/local/lib/python3.11/dist-packages/ibis/backends/tests/snapshots/test_sql/test_rewrite_context/pyspark
/usr/local/lib/python3.11/dist-packages/ibis/backends/tests/snapshots/test_sql/test_to_sql_default_backend/pyspark
/usr/local/lib/python3.11/dist-packages/ibis/backends/tests/snapshots/test_sql/test_isin_bug/pyspark
/usr/local/lib/python3.11/dist-packages/ibis/backends/tests/snapshots/test_sql/test_mixed_qualified_and_unqualified_predicates/pyspark
/usr/local/lib/python3.11/dist-packages/ibis/backends/tests/snapshots/test_sql/test_cte_refs_in_topo_order/pyspark
/usr/local/lib/python3.11/dist-packages/ibis/backends/tests/snapshots/test_generic/test_many_subqueries/pyspark
/usr/local/

In [None]:
# Principaux import
import findspark
from pyspark.sql import SparkSession
from pyspark import SparkConf

# pour les dataframe et udf
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark.sql.types import *
from datetime import *


# initialise les variables d'environnement pour spark
findspark.init()

# Démarrage session spark
# --------------------------
def demarrer_spark():
  local = "local[*]"
  appName = "TP"
  configLocale = SparkConf().setAppName(appName).setMaster(local).\
  set("spark.executor.memory", "6G").\
  set("spark.driver.memory","6G").\
  set("spark.sql.catalogImplementation","in-memory")

  spark = SparkSession.builder.config(conf = configLocale).getOrCreate()
  sc = spark.sparkContext
  sc.setLogLevel("ERROR")

  spark.conf.set("spark.sql.autoBroadcastJoinThreshold","-1")

  # On ajuste l'environnement d'exécution des requêtes à la taille du cluster (4 coeurs)
  spark.conf.set("spark.sql.shuffle.partitions","4")
  print("session démarrée, son id est ", sc.applicationId)
  return spark
spark = demarrer_spark()

session démarrée, son id est  local-1746087109691


## Lire un fichier et le transformer en DataFrame
  - lire le fichier films.json
  - afficher le schéma
  - afficher les colonnes (attributs)
  - afficher le contenu (3 films)
  - afficher le nombre de films
  - décrire la colonne nF (fonction describe())
  - afficher des statistiques sur la table films (fonction summary)

In [None]:
#Le dossier contenant les fichiers csv importés:
DATASET_DIR="/content/drive/MyDrive/mlia/TP"

## Lire le fichier films.json et créer le DataFrame films

In [None]:
#Lire le fichier films.json et création d'un Dataframe films (fonction spark.read.json)
#==============
# Données
#==============
print("Lecture du fichier: ", DATASET_DIR+"/films.json")
films = spark.read.json(DATASET_DIR+"/films.json")

Lecture du fichier:  /content/drive/MyDrive/mlia/TP/films.json


In [None]:
#Afficher le schéma obtenu
...
#résultat:
#root
# |-- g: array (nullable = true)
# |    |-- element: string (containsNull = true)
# |-- nF: long (nullable = true)
# |-- titre: string (nullable = true)

In [None]:
#Afficher les noms des colonnes
....
#résultat:
#['g', 'nF', 'titre']

In [None]:
#Afficher 3 lignes de la structure films (fonction show)
...

Résultat:

```
+--------------------+------+--------------------+
|                   g|    nF|               titre|
+--------------------+------+--------------------+
|             [Drama]|  8754|Prime of Miss Jea...|
|          [Thriller]|111486|Lesson of the Evi...|
|[Animation, Child...|  1033|Fox and the Hound...|
+--------------------+------+--------------------+
```

In [None]:
#Afficher le nombre de films (fonction count)
...
#résultat: 9125

In [None]:
#Décrire (donner les statistiques) de la colonne nF de films (fonction describe)
...

Résultat:


```
+-------+------------------+
|summary|                nf|
+-------+------------------+
|  count|              9125|
|   mean|31123.291835616437|
| stddev|40782.633603974195|
|    min|                 1|
|    max|            164979|
+-------+------------------+
```



In [None]:
# statistiques sur les attributs
...

Résultat:

```
+-------+------------------+--------------------+
|summary|                nF|               titre|
+-------+------------------+--------------------+
|  count|              9125|                9125|
|   mean|31123.291835616437|                null|
| stddev|40782.633603974195|                null|
|    min|                 1|"""Great Performa...|
|    25%|              2849|                null|
|    50%|              6287|                null|
|    75%|             56251|                null|
|    max|            164979| İtirazım Var (2014)|
+-------+------------------+--------------------+
```

## Requêtes: Interrogation  des films
   - afficher 10 titres de films
   - afficher les titres de films, les numéros de films incrémentés de 1 et les genres
   - afficher les films dont le titre commence par 'Police', les ordonner par nF (fonction startswith)
   - créer une nouvelle DataFrame films2 avec un seul genre par film (pour un film avec n genres, il y a n lignes); fonction explode
   - afficher deux lignes de films2
   - afficher le nombre de genres distinct
   - afficher le nombre de films par genre (groupBy)

In [None]:
# Afficher 10 titres de films
...


```
# Résultat:
+--------------------+
|               titre|
+--------------------+
|Prime of Miss Jea...|
|Lesson of the Evi...|
|Fox and the Hound...|
|Sinbad: Legend of...|
|       Gloria (1980)|
|    Lady Jane (1986)|
|4 Months, 3 Weeks...|
|Ella Enchanted (2...|
|In a World... (2013)|
|The Disappearance...|
+--------------------+
only showing top 10 rows
```



In [None]:
#Afficher 3 titres de films, les numéros de films incrémentés de 1 et les genres
...

Résultat:


```
# +--------------------+--------+--------------------+
|               titre|(nF + 1)|                   g|
+--------------------+--------+--------------------+
|Prime of Miss Jea...|    8755|             [Drama]|
|Lesson of the Evi...|  111487|          [Thriller]|
|Fox and the Hound...|    1034|[Animation, Child...|
+--------------------+--------+--------------------+
```



In [None]:
#Afficher les films dont le titre commence par 'Police', les ordonner par nF (filter avec  startswith, orderBy, show)
...

Résultat:
```
# +---------------+----+--------------------+
|              g|  nF|               titre|
+---------------+----+--------------------+
|[Comedy, Crime]|2378|Police Academy (1...|
|[Comedy, Crime]|2379|Police Academy 2:...|
|[Comedy, Crime]|2380|Police Academy 3:...|
+---------------+----+--------------------+
```



In [None]:
# Pour les films sans genre il y films.g est un array de taille 1 avec '(no genres listed)')
# Créer un Dataframe tmp qui contient que les films sans les films sans genre (fonction array_contains) et afficher le nombre de films dans ce Dataframe
#Résultat: 9107
...

In [None]:
#Créer un nouveau DataFrame films_g à partir de tmp avec un seul genre par film (pour un film avec n genres, il y a n lignes); utiliser la fonction explode
...

Résultat:
```
# +--------------------+---------+------+
|               titre|    genre|    nF|
+--------------------+---------+------+
|Prime of Miss Jea...|    Drama|  8754|
|Lesson of the Evi...| Thriller|111486|
|Fox and the Hound...|Animation|  1033|
+--------------------+---------+------+
```


In [None]:
# Calculer le nombre de genres distinct (distinct et count)
...
#Résultat: 19

In [None]:
# Afficher le nombre de films par genre (groupBy et count)



Résultat:
```
# +-----------+-----+
|      genre|count|
+-----------+-----+
|      Crime| 1100|
|     Horror|  877|
|  Adventure| 1117|
|     Sci-Fi|  792|
|    Musical|  394|
|    Western|  168|
|    Romance| 1545|
|     Comedy| 3315|
|    Mystery|  543|
|        War|  367|
|Documentary|  495|
|   Children|  583|
|    Fantasy|  654|
|      Drama| 4365|
|       IMAX|  153|
|   Thriller| 1729|
|  Animation|  447|
|     Action| 1545|
|  Film-Noir|  133|
+-----------+-----+
```



## Lire le fichier notesAMJ.csv et créer le DataFrame notes

In [None]:
schema = """
          nU INT,
          nF LONG,
          note FLOAT,
          annee INT,
          mois INT,
          jour INT
        """
print("Lecture du fichier: ", DATASET_DIR+"/notesAMJ.csv")
notes = spark.read.csv(DATASET_DIR+"/notesAMJ.csv", header='true', schema=schema)
notes.printSchema()
notes=notes.persist()
notes.count() #résultat: 100004

## Requêtes: Interrogation des notes
  - lire le fichier notesAMJ.csv (le fichier contient pour chaque utilisateur les films qu'il a notés
      (avec la date (année, mois, jour) de la note))
  - afficher le schéma obtenu
  - afficher le contenu (3 lignes)
  - afficher le nombre d'années distinctes
  - afficher le nombre de dates (comprenant année, mois, jour) distinctes
  - afficher la note maximale, moyenne et minimale
  - grouper les notes par numéro de film
  - afficher la note moyenne par film
  - pour chaque utilisateur
     - afficher son nombre total de notes differentes, la note maximum, minimum et moyenne
     - trier le resultat de la requête précédente par le nombre de notes décroissant et le numéro d'utilisateur

In [None]:
# Afficher le schéma de notes ainsi que 3 lignes de son contenu
...

Résultat:
```
# root
 |-- nU: integer (nullable = true)
 |-- nF: long (nullable = true)
 |-- note: float (nullable = true)
 |-- annee: integer (nullable = true)
 |-- mois: integer (nullable = true)
 |-- jour: integer (nullable = true)

+---+-----+----+-----+----+----+
|nU |nF   |note|annee|mois|jour|
+---+-----+----+-----+----+----+
|175|48   |3.0 |2003 |5   |14  |
|461|2001 |3.0 |2004 |8   |8   |
|547|89881|4.0 |2011 |10  |8   |
+---+-----+----+-----+----+----+
```



In [None]:
#Afficher le nombre d'années distinctes (countDistinct)
...

Résultat:
```
# +---------------------+
|count(DISTINCT annee)|
+---------------------+
|                   22|
+---------------------+
```



In [None]:
#Afficher le nombre de dates distinctes (annee, mois, jour)
...

Résultat:
```
# +---------------------------------+
|count(DISTINCT annee, mois, jour)|
+---------------------------------+
|                             3840|
+---------------------------------+
```



In [None]:
#Afficher la note maximale, moyenne et minimale (min, max, avg)
...

Résultat:
```
# +---------+---------+-----------------+
|min(note)|max(note)|        avg(note)|
+---------+---------+-----------------+
|      0.5|      5.0|3.543608255669773|
+---------+---------+-----------------+
```



In [None]:
#Grouper les notes par numéro de film (groupBy) et stocker le résultat dans un Dataframe notes_groupee
...
#pas de résultat à afficher

In [None]:
#Afficher la note moyenne par film en utilisant le Dataframe notes_groupees (avg)
...


Résultat:
```
# +-----+------------------+
|   nF|         avg(note)|
+-----+------------------+
|   48|2.9262295081967213|
|89881|               4.0|
|  208| 2.752212389380531|
+-----+------------------+
```



In [None]:
#Afficher les notes moyennes par film triées par ordre décroissant de la note (orderBy avec desc)

...

Résultat:
```
# +---+---------+
| nF|avg(note)|
+---+---------+
| 53|      5.0|
|183|      5.0|
|301|      5.0|
+---+---------+
```



In [None]:
#Creer un dataframe notes_util qui groupe les notes par utilisateur
#pas de rsultat à afficher
...

In [None]:
# Créer un Dataframe tmp qui contient pour chaque utilisateur le nombre total de notes differentes, la la note maximum, minimum et moyenne
...
# pas de résultat à afficher

In [None]:
# Trier le Dataframe tmp par le nombre de notes décroissant et le numéro d'utilisateur et afficher le résultat
...

Résultat:
```
# +---+-----+---+---+------------------+
| nU|total|max|min|           moyenne|
+---+-----+---+---+------------------+
| 15|   10|5.0|0.5|2.6217647058823528|
| 17|   10|5.0|0.5| 3.743801652892562|
| 20|   10|5.0|0.5|3.2908163265306123|
+---+-----+---+---+------------------+
```



### **Jointures films et notes**
  - créer un DataFrame films_notes qui contient les films avec leur notes (une ligne par note)
  - afficher le nombre de notes du film dont le titre contient la chaîne'Pocahontas'
  - afficher pour chaque film, son titre, le nombre de notes, sa note moyenne, sa note maximale, sa note minimale
  - les titres des films qui ne sont pas notés
  - pour chaque genre les utilisateurs qui n'ont noté aucun film de ce genre
  - exporter le DataFrame films_notes dans un fichier JSON et afficher le contenu du fichier obtenu
  - essayer d'exporter films_notes dans un fichier CSV

In [None]:
# Créer un DataFrame films_notes qui contient les films avec leur notes (une ligne par note) (join)
...

Résultat:
```
# +----+--------------------+--------------------+---+----+-----+----+----+
|  nF|                   g|               titre| nU|note|annee|mois|jour|
+----+--------------------+--------------------+---+----+-----+----+----+
|  48|[Animation, Child...|   Pocahontas (1995)|175| 3.0| 2003|   5|  14|
|2001|[Action, Comedy, ...|Lethal Weapon 2 (...|461| 3.0| 2004|   8|   8|
+----+--------------------+--------------------+---+----+-----+----+----+
```



In [None]:
#Afficher le nombre de notes du film dont le titre contient la chaîne'Pocahontas' (contains)
...
#résultat: 61

In [None]:
# Afficher pour chaque film, son titre, le nombre de notes, sa note moyenne, sa note maximale, sa note minimale (groupBy + agg)
...

Résultat:
```
# +--------------------+-------+---+---+------------------+
|               titre|nbNotes|max|min|           moyenne|
+--------------------+-------+---+---+------------------+
|    Toy Story (1995)|      9|5.0|1.0|3.8724696356275303|
|      Jumanji (1995)|      8|5.0|1.5|3.4018691588785046|
|Grumpier Old Men ...|     10|5.0|0.5|3.1610169491525424|
+--------------------+-------+---+---+------------------+
```



### *Jointures externe et produit Cartesien*

In [None]:
# Créer un Dataframe f_films qui renomme l'attribut nF de films en nF1 (withColumnRenamed)
...
#pas de résultat à afficher

In [None]:
# Joignez la table f_film avec la table notes par une jointure externe gauche (left outer join) qui garde aussi les films sans notes; stocker le résultat dans un Dataframe ff_notes
...
#pas de résultat à afficher

In [None]:
# Afficher les films sans notes (isNull)
...


Résultat:


```
# +--------------------+
|               titre|
+--------------------+
|Wild Child, The (...|
|Iron Ladies, The ...|
|Scarlet Street (1...|
+--------------------+
```



In [None]:
# Pour chaque genre les utilisateurs qui n'ont noté aucun film de ce genre
# Indications:

# - créer un premier Dataframe g_u qui contient des couples (genre, nU) où nU a vu des films du genre "genre"
ff_notes.printSchema()

# - créer un deuxième Dataframe gu-tous qui contient tous les couples (genre, nU) possibles (crossjoin)

# - utiliser les deux Dataframes pour calculer les couples (genre, nU) où nU n'a pas vu de films du genre "genre" (subtract)



Résultat:


```
# +---------+---+
|    genre| nU|
+---------+---+
|Animation| 11|
|Animation| 71|
|Animation|230|
+---------+---+
```



# **Recommander des films aux utilisateurs**

Appliquer le filtrage collaboratif (l'aproche centrée utilisateur) pour recommander à chaque utilisateur des films pas encore visionnés (on suppose qu'un film non noté par un utilisateur n'a pas été visionné par celui-ci). Voir une description de l'approche ici (https://en.wikipedia.org/wiki/Collaborative_filtering, section Memory-based).

Nous allons tout d'abord préparer les données à partir des fichiers ratings.csv et movies.csv contenant les films et les notes des utilisateurs pour ces films et construire les strutures DataFrame correpondantes.

## Préparation des données

In [None]:
# charger le fichier ratings.csv dans un Datframe notes_i
# charger le fichier ratings.csv dans un Datframe notes_i
schema = """
          nU INT,
          nF INT,
          note FLOAT,
          date INT
        """

print("Lecture du fichier: ", DATASET_DIR+"/ratings.csv")
notes_i =
notes_i=notes_i.persist()

# Afficher les données chargées
notes_i.printSchema()
notes_i.show(3)
notes_i.count()

Résultat:
```
# root
 |-- nU: integer (nullable = true)
 |-- nF: integer (nullable = true)
 |-- note: float (nullable = true)
 |-- date: integer (nullable = true)

+---+----+----+----------+
| nU|  nF|note|      date|
+---+----+----+----------+
|  1|  31| 2.5|1260759144|
|  1|1029| 3.0|1260759179|
|  1|1061| 3.0|1260759182|
+---+----+----+----------+
only showing top 3 rows

100004
```



### *Extraction du jour, du mois et de l'année à partir de la date :*
Dans le fichier ratings.csv la date à laquelle un utilisateur a noté un film est au format epoch Unix (timestamp). Nous allons extraire de cette date les informations concernant l'année, le mois et le jour. Cette conversion sera réalisée en deux étapes:
- créer 3 fonctions utilisateur, chacune prenant comme paramètre un entier représentant la date à convertir (annotées @udf('integer')) et renvoyant respectivement le jour, le mois (compris entre 1 et   12) et l'année
- invoquer ces fonctions à l'aide de la méthode withColumn(). Vérifier que les valeurs obtenues correspondent à celles  continues dans le fichier notesAMJ.csv

In [None]:
from datetime import *
from pyspark.sql.functions import udf

In [None]:
#définir la fonction qui extrait le jour (compris entre 1 et 31) de la date
@udf('integer')
def getJour(v):
    return datetime.utcfromtimestamp(v).day

In [None]:
#tester la fonction précédente en l'appliquant à la colonne date de notes_i


Résultat:
```
# +----------+
|      date|
+----------+
|1260759144|
|1260759179|
|1260759182|
+----------+
only showing top 3 rows

+---+----+----+----------+----+
| nU|  nF|note|      date|jour|
+---+----+----+----------+----+
|  1|  31| 2.5|1260759144|  14|
|  1|1029| 3.0|1260759179|  14|
|  1|1061| 3.0|1260759182|  14|
+---+----+----+----------+----+
```



In [None]:
#définir la fonction qui extrait le mois (compris entre 1 et 12) de la date


In [None]:
#définir la fonction qui extrait l'année


In [None]:
# appliquer les 3 fonctions précédentes à la colonne date de notes_i pour construire une nouvelle DataFrame notes
# ayant comme colonnes nU, nF, note, jour, mois, annee
....
notes.persist() #garder les notes en mémoire
notes.count()
notes.show(3)

Résultat:
```
# +---+----+----+----+----+-----+
| nU|  nF|note|jour|mois|annee|
+---+----+----+----+----+-----+
|  1|  31| 2.5|  14|  12| 2009|
|  1|1029| 3.0|  14|  12| 2009|
|  1|1061| 3.0|  14|  12| 2009|
+---+----+----+----+----+-----+
```



In [None]:
# Creer un DataFrame films_i pour stocker les films qui se trouvent dans le fichier movies.csv
#son schéma est le suivant: nF INT, titre STRING, g STRING
print("Lecture du fichier: ", DATASET_DIR+"/movies.csv")
....
films_i=films_i.persist()
films_i.printSchema()
films_i.show(3)

Résultat:


```
# root
 |-- nF: integer (nullable = true)
 |-- titre: string (nullable = true)
 |-- g: string (nullable = true)

+---+--------------------+--------------------+
| nF|               titre|                   g|
+---+--------------------+--------------------+
|  1|    Toy Story (1995)|Adventure|Animati...|
|  2|      Jumanji (1995)|Adventure|Childre...|
|  3|Grumpier Old Men ...|      Comedy|Romance|
+---+--------------------+--------------------+
```



#### **Transformation de la colonne des genres**

Les genres de chaque film étant actuellement stockés dans une seule chaîne de caractères, nous allons remplacer
cette chaîne par un tableau de chaînes (par exemple, pour un film avec une colonne g contenant 'Comedy, Romance'
nous allons obtenir une colonne genres ['Comedy', 'Romance']).
- utiliser la fonction split

In [None]:
#Construire le DataFrame films
#résultat: 9125

Le nouveau DataFrame films aura les colonnes nF, titre et genres et sera gardé en mémoire.
- afficher le schéma
- afficher 3 lignes

In [None]:
films.printSchema()
films.show(3)

Résultat:
```
# root
 |-- nF: integer (nullable = true)
 |-- titre: string (nullable = true)
 |-- genres: array (nullable = true)
 |    |-- element: string (containsNull = true)

+---+--------------------+--------------------+
| nF|               titre|              genres|
+---+--------------------+--------------------+
|  1|    Toy Story (1995)|[Adventure, Anima...|
|  2|      Jumanji (1995)|[Adventure, Child...|
|  3|Grumpier Old Men ...|   [Comedy, Romance]|
+---+--------------------+--------------------+
```



## **Calcul de recommandation**

### 1.  Calcul de la similarité entre les utilisateurs (similarité Jaccard)

Nous allons d'abord calculer pour chaque couple d'utilisateurs une valeur de similarité basée sur les films
qu'ils ont notés en commun. Pour un utilisateur u nous avons besoin de connaître l'ensemble v des numéros de
films qu'il a notés. La similarité entre les utilisateurs u1 et u2 sera calculée à partir des ensembles de films v1 et v2 correspondants.

Similarité Jaccard (voir la description ici: https://en.wikipedia.org/wiki/Jaccard_index):
 - la similarité entre u1 et u2 est égale au nombre de films notés en commun par u1 et u2 divisé par le nombre total
   de films notés par u1 ou u2. Par exemple, si u1 a noté les films f1, f3 et f4 (v1=[f1, f3, f4]) et u2 a noté les      films f3, f4, f5 et f6 (v2=[f3, f4, f5, f6]) leur similarité sera 2/5=0,4 ce qui correspond à la cardinalité de      l'intersection entre v1 et v2 divisée par la cardinalité de leur union).
   
Le calcul de similarité sera effectué en plusieurs étapes:

- Étape 1: construire pour chaque utilisateur la liste des films qu'il a notés et les stocker dans le DataFrame liste_films qui aura 2 colonnes: nU et l_films qui contiendra un tableau de numéro de films

In [None]:
liste_films=
liste_films.printSchema()
liste_films.show(2)


Résultat:
```
# root
 |-- nU: integer (nullable = true)
 |-- l_films: array (nullable = false)
 |    |-- element: integer (containsNull = false)

+---+--------------------+
| nU|             l_films|
+---+--------------------+
|  1|[31, 1029, 1061, ...|
|  2|[10, 17, 39, 47, ...|
+---+--------------------+
```



- Étape 2: Construire tous les couples possibles d'utilisateurs avec leur listes de films respectives et les stocker dans le DataFrame couples_u qui aura comme colonnes nU1, nU2, l_films1, l_films2

In [None]:
# Construire un DataFrame intermédiaire t1(nU1, l_films1) à partir de liste films en renommant nU->nU1 et
# l_films -> l_films1
t1=
t1.show(3)

# Construire un DataFrame intermédiaire t2(nU2, l_films2) de manière similaire à t1
t2=
t2.show(3)

#Construire couples_u(nU1, nU2, l_films1, l_films2) à partir de t1 et t2 (éliminer les couples où nU1=nU2)
couples_u=
couples_u.show(3)


Résultat:
```
# +---+--------------------+
|nU1|            l_films1|
+---+--------------------+
|  1|[31, 1029, 1061, ...|
|  2|[10, 17, 39, 47, ...|
|  3|[60, 110, 247, 26...|
+---+--------------------+
only showing top 3 rows

+---+--------------------+
|nU2|            l_films2|
+---+--------------------+
|  1|[31, 1029, 1061, ...|
|  2|[10, 17, 39, 47, ...|
|  3|[60, 110, 247, 26...|
+---+--------------------+
only showing top 3 rows

+---+--------------------+---+--------------------+
|nU1|            l_films1|nU2|            l_films2|
+---+--------------------+---+--------------------+
|  1|[31, 1029, 1061, ...|  2|[10, 17, 39, 47, ...|
|  1|[31, 1029, 1061, ...|  3|[60, 110, 247, 26...|
|  1|[31, 1029, 1061, ...|  4|[10, 34, 112, 141...|
+---+--------------------+---+--------------------+
```



- Étape 3: Définition d'une fonction utilisateur sim_jaccard qui calcule une valeur de similarité Jaccard à partir de deux listes spécifiées comme paramètres

In [None]:
@udf('float')
def sim_jaccard(l1, l2):
    set1=set(l1)
    set2=set(l2)
    l = len(set1.union(set2))
    if (l == 0): return 0
    return float(len(set1.intersection(set2)))/len(set1.union(set2))

- Étape 4: Calcul de la similarité entre chaque couple d'utilisateurs construit à l'étape 2 en appliquant la fonction de similarité définie à l'étape 3 à leur listes de films respectives. La similarité sera stockée dans le DataFrame sim_j(nU1, nU2, sim)

In [None]:
# Construire un DataFrame sim_j(nU1, nU2, sim) en appliquant la méthode withColumn au DataFrame couples_u
# Garder uniquement les entrées où sim != 0
sim_j =
sim_j.persist()
sim_j.count()
sim_j.show(3)

Résultat:
```
# +---+---+------------+
|nU1|nU2|         sim|
+---+---+------------+
| 12| 13|0.0088495575|
| 12| 14|      0.0125|
| 12| 18| 0.018181818|
```



### 2. **Calcul de scores de recommandation pour les films non notés**

- Préparation du calcul: éliminer les infomations concernant la date

In [None]:
u_vu_notes=
u_vu_notes.count()
u_vu_notes.show(3)


Résultat:
```
# +---+----+----+
| nU|  nF|note|
+---+----+----+
|  1|  31| 2.5|
|  1|1029| 3.0|
|  1|1061| 3.0|
+---+----+----+
```



- Étape 1: Construire tous les couples possibles (nU, nF) et enlever les couples qui se trouvent dans u_vu_notes. Stocker le résultat dans le DataFrame uf_pas_vu(nU, nF) qui sera gardé en mémoire

In [None]:
....
u_pas_vu.show(3)

Résultat:


```
# +---+---+
| nU| nF|
+---+---+
|  1|240|
|  1|314|
|  1|420|
+---+---+
```



- Etape 2: calculer un DF u_sim_notes qui contient des quintuples (nU1,nU2,nF,note,sim)

In [None]:
...
u_sim_notes.show(2)
u_sim_notes.count()

Résultat:


```
# +---+---+---+----+------------+
|nU1|nU2| nF|note|         sim|
+---+---+---+----+------------+
| 13| 12|253| 3.0|0.0088495575|
| 14| 12|253| 3.0|      0.0125|
+---+---+---+----+------------+
64389904
```



- Etape 3 : créer un DF u_recom qui étend u_sim_notes avec une colonne recom qui contient le produit sim*note

In [None]:
...
u_recom.show(2)

Résultat:
```
# +---+---+---+----+------------+-----------+
|nU1|nU2| nF|note|         sim|      recom|
+---+---+---+----+------------+-----------+
| 13| 12|253| 3.0|0.0088495575|0.026548672|
| 14| 12|253| 3.0|      0.0125|     0.0375|
+---+---+---+----+------------+-----------+
```



- Etape 4: créer un DF u_recom2 qui contient les colonnes nU1, nF et avg_rec de avg_rec est la moyennes des scores de recommandation pour nU1 et nF. Afficher le résultat.

In [None]:
...
u_recom2.show(2)
u_recom2.count()

Résultat:

```
# +---+---+-----------+
|nU1| nF|      recom|
+---+---+-----------+
| 13|253|0.026548672|
| 14|253|     0.0375|
+---+---+-----------+
6019322
```



- Etape 5: créer un DF u_pas_vu_rec qui contient que les recommandations pour des films pas vus

Résultat:
```
# +---+----+------------------+
| nU|  nF|           avg_rec|
+---+----+------------------+
|514| 966| 1.696969747543335|
|562|5828|1.6906474828720093|
|461|8504|1.6904267072677612|
| 86| 764|1.6736401319503784|
|355|5828|  1.66304349899292|
+---+----+------------------+
only showing top 5 rows
```

