# Introduction

Dans ce Notebook, je reprends la méthode de résolution de la question 3 (voir main.ipynb) mais en utilisant `Spark` pour pouvoir gérer les 35Go de données.

Pour vous résumer rapidement, Spark est un Pandas *amélioré* qui ne charge pas l'intégralité des données en mémoire (**= Lazy Evaluation**) et quand il doit effectuer des calculs il les fait en parallèles sur l'ensemble des processeurs de la machine (**= Parallelism**). ChatGPT m'a donné une bonne analogie que vous pouvez voir plus bas.

cfr: 
* https://app.datacamp.com/learn/courses/big-data-fundamentals-with-pyspark (le prof nous donne l'accès dans un mail)
* ChatGPT
* Le cours dans deux semaines
* **documentation**: https://spark.apache.org/docs/latest/api/python/index.html

**Note importante**: Pour pouvoir utiliser `Spark` sur votre machine vous pouvez suivre le tutoriel suivant: 
* https://medium.com/@marcelopedronidasilva/how-to-install-and-run-pyspark-locally-integrated-with-vscode-via-jupyter-notebook-on-windows-ff209ac8621f

---

## Analogie

Imagine que tes données (35 Go) sont un Caddie géant rempli à ras bord de courses. Ta RAM, c'est le tapis roulant de la caisse.

1. Pandas : La Méthode "Tout ou Rien"
Pandas, c'est une caisse unique avec un caissier très strict.
* Son exigence : "Videz l'intégralité de votre caddie sur le tapis avant que je ne scanne le premier article."
* Le problème : Ton caddie est énorme (35 Go). Le tapis est petit (ta RAM de 16 Go).
* Le résultat : Tu essaies d'empiler tes packs d'eau et tes conserves. Le tapis craque, tout tombe par terre. C'est le Memory Error. Le travail s'arrête avant même d'avoir commencé.

2. Spark : La Méthode "Flux Tendu"
Spark approche ce caddie géant différemment, en combinant ses deux super-pouvoirs :

**A. Le Parallélisme** (L'Armée de caisse)

Au lieu d'aller à une seule caisse, Spark crie "OUVERTURE DES CAISSES !". Il divise ton caddie géant en 10 petits paniers et les envoie vers 10 caisses différentes en même temps.
* Résultat : Le travail avance 10 fois plus vite.

**B. Le Lazy Evaluation** (Le Flux)

C'est ici que Spark gère les 35 Go sans faire craquer les tapis. Les caissiers Spark disent : "Gardez vos articles dans le panier. Ne posez sur le tapis que ce que je peux scanner maintenant."
* Tu poses une brique de lait sur le tapis.
* Le caissier la scanne (le processeur calcule).
* La brique est immédiatement mise en sac et évacuée (sauvegardée ou envoyée à l'étape suivante).
* La place est libre pour poser le paquet de pâtes suivant.

La magie du mélange : Grâce au Lazy Evaluation, le tapis roulant (la RAM) n'est jamais plein, même si tu traites des tonnes de courses. Grâce au Parallélisme, 10 tapis roulent en même temps pour finir le travail en un temps record.*

--- 
# **Note importante**

C'est un début de solution mais ce n'est pas fini, car j'ai retesté avec une autre musique plus populaire *35kahykNu00FPysz3C2euR* et chez moi il crash. Il faut changer la configuration de la Session Spark et alors ça fonctionne mais le code prend 20 minutes pour run (pas idéal s'il faut une démo live (?)).

Je pense qu'on devrait **changer la méthode** à la question et passer au algorithme de Collaborative filtering directement proposés par Spark. Voir doc:
* https://spark.apache.org/docs/latest/ml-collaborative-filtering.html

# Code

In [1]:
import os
import zipfile
import glob
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import col, explode, collect_set, size, regexp_replace, count, lit, sqrt, desc, broadcast, countDistinct

# Extraction des données

Spark ne sait pas lire efficacement des fichiers zip, on va alors dézipper l'entériété du fichier et le mettre dans un fichier non zippé.
* L'étape ici n'est pas top, on peut imaginer de passer à Parquet ou une autre solution qui nous permet de stocker plsu efficacement les 35 Go ?

In [2]:
ZIP_FILE_PATH = "spotify_million_playlist_dataset.zip" 
EXTRACT_PATH = "data_extracted/"                       

def unzip_data(zip_path, extract_to):
    """Extrait le dataset si ce n'est pas déjà fait."""
    if not os.path.exists(extract_to):
        print(f"Extraction de {zip_path}...")
        with zipfile.ZipFile(zip_path, 'r') as zip_ref:
            zip_ref.extractall(extract_to)
        print("Extraction terminée.")
    else:
        print(f"Le dossier {extract_to} existe déjà. On passe l'extraction.")

In [62]:
#Si déjà dézipper, ne pas exécuter
#unzip_data(ZIP_FILE_PATH, EXTRACT_PATH)

In [3]:
#Chemin pour l'ensemble des fichiers JSON (si on a dézippé avec la fonction au-dessus)
JSON_DATA_PATH = os.path.join(EXTRACT_PATH, "data/*.json")

# Initialisation de Spark en mode local

On démarre une "*Spark Session*" afin de pouvoir utiliser Spark sur notre machine:
* `master("local[*]")` : L'étoile `*` dit à Spark : "Utilise tous les cœurs de processeur disponibles sur mon ordinateur portable". S'il y a *x* cœurs, Spark lancera $x$ tâches en parallèle.

In [4]:
spark = SparkSession.builder \
    .appName("SpotifyPlaylistRecommenderLSH") \
    .master("local[*]") \
    .getOrCreate()

spark.sparkContext.setLogLevel("WARN")

Pour voir la Session Spark que l'on a créée **localement** : http://localhost:4040/jobs/

Le reste du code, je reprend ce qui a été fait à la question 3 mais adapté à Spark, reste à voir ce qu'on peut faire d'autre pour optimiser notre gestion des 35Go (surtout si on doit faire une démo live (voir dernière cellule)).
* Il y a la piste de `parquet` mais on compresse juste nos données, pas sûr que ce soit pertinent mais ça reste pratique
* On peut aussi seulement garder les playlist avec plus de $x$ musiques

# Preprocessing

Schema is the structure of data in DataFrame and helps Spark to optimize queries on the data more efficiently. A schema provides informational detail such as the column name, the type of data in that column, and whether null or empty values are allowed in the column (cfr DataCamp).
* Un fichier JSON (`dataset_schema`) contient une liste appelée *playlists*.
* Chaque élément de cette liste (`playlist_schema`) contient un entier *pid* et une liste *tracks*.
* Chaque élément de la liste tracks (`track_schema`) contient trois strings: *track_uri*, *artist_name* et *track_name*.

Tout ce qui n'est pas décrit ici (par exemple num_followers ou description de la playlist) sera ignoré par Spark lors de la lecture. C'est ce qui rend le chargement beaucoup plus rapide (on passe de 1m30 (sans schema) à 5 secondes (avec)).

In [5]:
#Schema des JSON 
track_schema = StructType([
    StructField("track_uri", StringType(), True),
    StructField("artist_name", StringType(), True),
    StructField("track_name", StringType(), True)
])
playlist_schema = StructType([
    StructField("pid", IntegerType(), True),
    StructField("tracks", ArrayType(track_schema), True)
])
dataset_schema = StructType([
    StructField("playlists", ArrayType(playlist_schema), True)
])

`data` est une Data Frame, au sens Spark du terme (assez similaire à Pandas) qui contient:
* **lignes**: le nombre de fichiers JSON chargés (dataset complet=$1.000$)
* **colonne**: unique contenant une liste de $1000$ playlists par fichiers JSON

---

**Note**: normalement `Spark` doit pouvoir trouver les fichiers lui-même mais il semblerait que l'on doive télécharger un autre fichier pour ça et honnêtement ça me cassait les couilles. C'est pourquoi j'utilise `glob` avec qui Python va scanner le dossier `data` et transformer cette chaîne de caractères en une liste Python contenant les chemins complets.

* Résultat : `['data/mpd.slice.0-999.json', 'data/mpd.slice.1000-1999.json', ...]`

cfr: https://docs.python.org/3/library/glob.html

---

Si vous voulez voir le résultat du code à chaque étape vous pouvez utiliser la méthode `.show()` mais je vous déconseille de le faire sur l'ensemble des $1.000.000$ de playlists car ça rajoute un temps de calcule considérable.

In [6]:
# On utilise glob pour trouver les fichiers à la place de Spark
JSON_DATA_PATH = r"data_extracted\data\mpd.slice.*.json"
json_files = glob.glob(JSON_DATA_PATH)

print(f"Fichiers trouvés par Python : {len(json_files)}")

if not json_files:
    raise Exception(f"Aucun fichier trouvé dans : {JSON_DATA_PATH}")

#On passe la LISTE des fichiers directement à Spark
#Spark va lire la liste comme si vous aviez tapé les noms un par un
data = spark.read \
    .option("multiLine", True) \
    .schema(dataset_schema) \
    .json(json_files)

Fichiers trouvés par Python : 1000


# 3. Recommendation

Et tant donné que `data` contient un ligne par JSON et qu'on veut une ligne par playlist on utilise:
* `.explode(col(playlists))` prend la colonne `playlist` (avec `col`) de `data` et crée une nouvelle ligne pour chaque playlist ($=x$ $fichiers$ $de$ $1000$ $playlists$). Si un fichier contient $1000$ playlists, on passe de $1$ ligne à $1000$ lignes par fichiers.
* `.select()` on ne garde que la colonne sélectionnée, toute les autres son supprimées
* `.alias()` on renomme la colonne au singulier pour que ce soit plus logique

In [7]:
playlists = data.select(explode(col("playlists")).alias("playlist"))

Ensuite, comme discuté dans `main.ipynb`, on ne garde que deux colonnes. `Spark` va dans le JSON et ne garde que:
* `pid`: les playlists uniques
* `tracks`: la **liste** des musiques par playlist


In [8]:
df_track = playlists.select(
    col("playlist.pid").alias("pid"),
    col("playlist.tracks.track_uri").alias("track_uris")
)

On crée une Data Frame temporaire contenant une ligne par musique au lieu d'une ligne par playlist.
* `pid`: qui peut avoir des doublons
* `raw_track_uri`: qui peut avoir des doublons

Puis, on construit une df des tracks cleaned dans le sens ou les id sont débarrasé de *spotify:name:*

In [9]:
exploded_temp_df = df_track.select(
    col("pid"),
    explode(col("track_uris")).alias("raw_track_uri")
)

#Cleaner les track_id
cleaned_df_track = exploded_temp_df.select(
    col("pid"),
    regexp_replace(col("raw_track_uri"), "spotify:track:", "").alias("track_id")
)

#cleaned_df_track.show(5)

A priori, si $\text{track\_id}_i = \text{track\_id}_j$ :  

$$
\text{artist\_id}_i = \text{artist\_id}_j, \quad
\text{album\_id}_i = \text{album\_id}_j
$$

On peut donc chercher par `track_id` unique et créer une liste pour le `pid`. Ainsi, *list_pid* sera un vecteur de dimension $( \text{ndim} = 10 000$. Seules les composantes non nulles sont conservées et valent 0 ou 1 si elles sont présentes dans la playlist.  
Plutôt que de conserver les 0, on conserve uniquement les indices des playlists dans lesquelles la musique apparaît.  
(On pourrait éventuellement retirer les playlists avec zéro track.)

La similarité entre deux musiques peut alors se calculer de la façon suivante :  

$$
\langle a, b \rangle = \frac{|a \cap b|}{\sqrt{\text{len}(a) \cdot \text{len}(b)}} 
$$


In [10]:
#'collect_set' au lieu de 'collect_list' pour éviter les doublons 
inverted_index_df = cleaned_df_track.groupBy("track_id").agg(
    collect_set("pid").alias("playlists_list")
)

#ajouts de la troisième colonne avec la taille de la liste de playlist
df_tracks_vectors = inverted_index_df.withColumn(
    "pid_count", 
    size(col("playlists_list"))
)

#afficher les résultats (seulement sur un petit jeu de données sinon augmente considérablement le temps)
#df_tracks_vectors.printSchema()
#df_tracks_vectors.count()
#df_tracks_vectors.show(5)

In [11]:
def similar_tracks(target_track_id, cleaned_df_track, df_tracks_vectors, limit=10):

    #On va récupérer la valeur de la longueur du vecteur dans df_tracks_vectors
    target_count_row = df_tracks_vectors.filter(col("track_id") == target_track_id).select("pid_count").first()
    if not target_count_row:
        print(f"Track {target_track_id} introuvable")
        return None

    target_global_count = target_count_row["pid_count"]

    #On s'assure qu'on ne liste pas deux fois la même playlist pour la même chanson
    target_pids_df = cleaned_df_track.filter(col("track_id") == target_track_id) \
                                     .select("pid") \
                                     .distinct()


    #On cherche l'intersection entre A et B
    candidates_df = cleaned_df_track.join(broadcast(target_pids_df), on="pid", how="inner")
    intersection_df = candidates_df.groupBy("track_id").agg(
        countDistinct("pid").alias("intersection")
    )

    #On crée un dernier dataset en joignant avec df_tracks_vectors pour récupérer la taille du vecteur B
    recommendations_df = intersection_df.join(df_tracks_vectors, on="track_id", how="inner")

    #Similarity score
    result_df = recommendations_df.withColumn(
        "similarity_score",
        col("intersection") / sqrt(lit(target_global_count) * col("pid_count"))
    )
   
    return result_df.select("track_id", "intersection", "pid_count", "similarity_score") \
                    .orderBy(desc("similarity_score")) \
                    .limit(limit)

In [None]:
from pyspark.sql.functions import col, lit, size, array_intersect, arrays_overlap, sqrt, desc

def similar_tracks(target_track_id, df_tracks_vectors, limit=10):
    
    # 1. Récupérer les infos de la cible (La liste des playlists et la taille)
    # On récupère ça en Python localement (c'est très léger)
    target_row = df_tracks_vectors.filter(col("track_id") == target_track_id).first()
    
    if not target_row:
        print(f"Track {target_track_id} introuvable")
        return None
        
    target_pids_list = target_row['playlists_list'] # C'est une liste Python [1, 5, 8...]
    target_global_count = target_row['pid_count']

    # 2. Filtrer d'abord : On ne garde que les musiques qui ont au moins 
    # UNE playlist en commun avec la cible.
    # arrays_overlap est très efficace pour ça.
    candidates = df_tracks_vectors.filter(
        (col("track_id") != target_track_id) & # On s'exclut soi-même
        arrays_overlap(col("playlists_list"), lit(target_pids_list))
    )

    # 3. Calculer l'intersection directement dans la ligne
    # On utilise array_intersect pour trouver les éléments communs, puis size() pour compter
    result_df = candidates.withColumn(
        "intersection_count", 
        size(array_intersect(col("playlists_list"), lit(target_pids_list)))
    )

    # 4. Calculer le score (Même formule qu'avant)
    final_df = result_df.withColumn(
        "similarity_score",
        col("intersection_count") / sqrt(lit(target_global_count) * col("pid_count"))
    )

    return final_df.select("track_id", "intersection_count", "pid_count", "similarity_score") \
                   .orderBy(desc("similarity_score")) \
                   .limit(limit)

# Exemple d'utilisation de `similar_tracks`

On a bien la track qui est similaire avec elle-même.

**Notes importantes**: 
* ça m'a quand même pris 8 minutes pour run sur le dataset complet, ça reste long si on doit faire une démo live. Mais je calcule l'entérieté des similarité entre toutes les musiques.
* J'ai essayé de comparer seulement une paire mais au final avoir le résultat prend plus de temps que de ressortir l'ensemble des comparaisons

In [None]:
sample_track_id = "35kahykNu00FPysz3C2euR" 

recs = similar_tracks(sample_track_id, cleaned_df_track, df_tracks_vectors)

if recs:
    recs.show(truncate=False)

In [None]:
spark.stop()

# **TO DO** Product recommendation : Collaborative filtering with Spark

https://github.com/databricks/spark-training/blob/master/website/movie-recommendation-with-mllib.md

https://spark.apache.org/docs/latest/ml-collaborative-filtering.html
* solution 1 : ALS
* solution 2: Word2Vec
* Solution 3: sparse vecteur de Spark (?)

Pouvoir expliquer pourquoi on a utilisé l'un et pas l'autre ???

