In [113]:
#----------------initialisation de la session ----------------

from typing import Optional, List
import pathlib, os, logging
import findspark
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import col
from pyspark.ml.feature import StringIndexer,VectorAssembler 


class Session:
    
    def __init__(self):
        logging.info("Initialisation de le Session !")
    
    def createSession(self,master:Optional['str']="local[*]", app_name:Optional['str']="FiltrageCollaborative(MovieLens") -> SparkSession:
        spark = SparkSession.builder.master(master).appName(app_name).getOrCreate()
        print("Session Démarré")
        return spark
    

In [115]:
a = Session()
session = a.createSession()

Session Démarré


In [116]:
#----------------Préparation des données----------------


class PreparationDonnees:
    
    def __init__(self):
        logging.info("Préparation des données !")
    
    def dataFrame(self,spark:SparkSession,filePath:str) -> DataFrame:
        if os.path.exists(filePath) and os.path.isfile(filePath) :
            extension = pathlib.Path(filePath).suffix
        else: raise Exception('Erreur, essayer encore')
        
        def dataCSV(filePath:str) -> DataFrame:
            df = spark.read.options(header='True', inferSchema='True', delimiter=',').csv(filePath)
            return df   
        def dataJSON(filePath:str) -> DataFrame:
            df = spark.read.json(filePath)
            return df
        return dataCSV(filePath) if extension==".csv" else dataJSON(filePath)
    
    def transformation(self,df:DataFrame, cols:List['str'], output:str) -> DataFrame:
        assemble=VectorAssembler(inputCols=cols,outputCol =output)
        assembled_data=assemble.transform(df)
        return assembled_data
    


In [123]:
#----------------Préparation des données----------------

#---Chargement des données
movies = "MovieLens/movies.csv"
ratings = "MovieLens/ratings.csv"

data = PreparationDonnees()
movies=data.dataFrame(session, movies)
ratings=data.dataFrame(session, ratings)

#---Affichage des données
movies.show(10)
ratings.show(10)

#----------------Joint du bloc de données de movie dans les ratings  ----------------

movie_ratings = ratings.join(movies, ['movieId'], 'left') 
movie_ratings.show(10)

#--------------------------------Dans cette études on s'interesse plus sur les notes(ratings)-------------------------

#---Récupération des noms des colonnes des notes (ratings)
cols=ratings.columns

#---Récupération des données ratings transformé
ratings_trans=data.transformation(ratings, cols,'features')

#----Affichage des données ratings transformé
ratings_trans.show(10,False)


+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Adventure|Animati...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
|      4|Waiting to Exhale...|Comedy|Drama|Romance|
|      5|Father of the Bri...|              Comedy|
|      6|         Heat (1995)|Action|Crime|Thri...|
|      7|      Sabrina (1995)|      Comedy|Romance|
|      8| Tom and Huck (1995)|  Adventure|Children|
|      9| Sudden Death (1995)|              Action|
|     10|    GoldenEye (1995)|Action|Adventure|...|
+-------+--------------------+--------------------+
only showing top 10 rows

+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     1|      1|   4.0|964982703|
|     1|      3|   4.0|964981247|
|     1|      6|   4.0|964982224|
|     1|     47|   5.0|964983815|
|     1|

In [128]:
#----------------Construction et test du modèle ----------------

from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

class Recommandation:
    def __init__(self):
        logging.info("filtering Collaboration en utilisant le modele ALS !")
        
          
    def modele(self,data_trans:DataFrame):# -> DataFrame:
        
        #----------------division des données (Test et Entraînement) ----------------
        (train, test) = ratings_trans.randomSplit([0.8, 0.2])
        
        #----------------Création du model ALS --------------------------
        als = ALS(maxIter=5, rank=10, regParam=0.01, userCol="userId", itemCol="movieId", ratingCol="rating", 
          nonnegative = True, coldStartStrategy="drop")
        
        # Vérification de la création du model
        type(als)
        
        #----------------Ajout des hyperparamètres et leurs valeurs à param_grid ----------------
        #----------------cela prendra beaucoup de temps dans la période de cv, 
        #-----------------donc il suffit d'utiliser quelques paramètres pour essayer--------------        
        paramGrid = (ParamGridBuilder()
             .addGrid(als.regParam, [0.01])
             .addGrid(als.rank, [10])
             .addGrid(als.maxIter, [15])
             .build())
        
        # paramGrid = (ParamGridBuilder()
        # .addGrid(als.regParam, [0.01, 0.5, 1, 1.5])
        # .addGrid(als.rank, [10, 15, 20, 25])
        # .addGrid(als.maxIter, [1, 5, 10, 15])
        # .build())
        
        #----------------Définition l'évaluateur comme RMSE et imprimer la longueur de l'évaluateur evaluator-----------
        evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction") 
        
        #----------------Construire la validation croisée en utilisant CrossValidator ----------------
        cv = CrossValidator(estimator=als, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=2)
        
        #----------------Ajustement du validateur croisé à l'ensemble de données 'train' ---------------
        model = cv.fit(train)
        
        #----------------Extraire le meilleur modèle du modèle cv ci-dessus --------------- 
        best_model = model.bestModel

        # Print best_model
        print(type(best_model))

        # extraction des paramètres du modèle ALS
        print("----------Best Model---------")
        print("  Rank:", best_model._java_obj.parent().getRank())
        print("  MaxIter:", best_model._java_obj.parent().getMaxIter())
        print("  RegParam:", best_model._java_obj.parent().getRegParam())
        
        print("----------------------------------------------------------")
     
        #----------------Afficher les prédictions  ----------------

        test_predictions = best_model.transform(test)

        RMSE = evaluator.evaluate(test_predictions) 
        
        print("RMSE : ", RMSE)
        
        test_predictions.show()
        
        print("----------------------------------------------------------")

        #----------------Générer n recommandations pour tous les utilisateurs ----------------
        nRecommendations = best_model.recommendForAllUsers(5) 
        
        print("----------------------------------------------------------")

        #----------------Conversion des recommandations en format interprétable ----------------
        nRecommendations = nRecommendations\
            .withColumn("rec_exp", explode("recommendations"))\
            .select('userId', col("rec_exp.movieId"), col("rec_exp.rating"))
        
        
        #----------------Recommandations ALS du 100e utilisateur ----------------
        print("Films recommandé au 100e utilisateur")
        nRecommendations.join(movies, on='movieId').filter('userId = 100').show()
        
        print("----------------------------------------------------------")

        
        #---------------- Préférence réelle du 100e utilisateur ----------------
        print("Préférence réelle du 100e utilisateur")
        ratings.join(movies, on='movieId').filter('userId = 100').sort('rating', ascending=False).limit(10).show()

In [129]:
f = Recommandation()
rec = f.modele(data_trans)

<class 'pyspark.ml.recommendation.ALSModel'>
----------Best Model---------
  Rank: 10
  MaxIter: 15
  RegParam: 0.01
----------------------------------------------------------
RMSE :  1.0404759650986148
+------+-------+------+----------+--------------------+----------+
|userId|movieId|rating| timestamp|            features|prediction|
+------+-------+------+----------+--------------------+----------+
|   133|    471|   4.0| 843491793|[133.0,471.0,4.0,...| 2.2583044|
|   597|    471|   2.0| 941558175|[597.0,471.0,2.0,...| 3.1255198|
|   603|    471|   4.0| 954482443|[603.0,471.0,4.0,...| 3.6480277|
|   474|    471|   3.0| 974668858|[474.0,471.0,3.0,...| 3.4387953|
|   387|    471|   3.0|1139047519|[387.0,471.0,3.0,...| 3.4384892|
|   136|    471|   4.0| 832450058|[136.0,471.0,4.0,...| 1.8491004|
|   411|    471|   4.0| 835532928|[411.0,471.0,4.0,...| 4.0569997|
|   414|    471|   5.0| 961514069|[414.0,471.0,5.0,...| 2.5205312|
|   426|    471|   5.0|1451081135|[426.0,471.0,5.0,...| 3.23