# Dans le cadre de ce projet, nous avons pour objectif de construire **un** syst√®me de recommandation de films √† partir du jeu de donn√©es MovieLens. Ce dataset regroupe plus de 26 millions de notes attribu√©es par 270 000 utilisateurs √† environ 45 000 films. Le projet s‚Äôinscrit dans une logique d‚Äôanalyse de donn√©es √† grande √©chelle via Apache Spark (PySpark), et inclut plusieurs √©tapes : pr√©traitement, visualisation, mod√©lisation par r√©gression, et recommandation par filtrage collaboratif. Ce notebook documente les diff√©rentes √©tapes r√©alis√©es, les choix techniques effectu√©s, ainsi que les observations et r√©sultats obtenus.

# Dans cette cellule, nous importons l'ensemble des biblioth√®ques n√©cessaires pour le traitement de donn√©es en Spark (PySpark), ainsi que pour la visualisation (Matplotlib). Ces biblioth√®ques nous permettront de manipuler les DataFrames Spark, de r√©aliser des transformations, d'entra√Æner des mod√®les de r√©gression, et d'√©valuer les performances.**texte en gras**

In [1]:

import matplotlib.pyplot as plt
from pyspark.sql import SparkSession
from collections import Counter
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType
from pyspark.ml.feature import VectorAssembler, StringIndexer
from pyspark.ml.regression import LinearRegression
from pyspark.sql.functions import *
from pyspark.ml.evaluation import *
from pyspark.ml.feature import *
from pyspark.ml import Pipeline
from pyspark.sql import functions as F
from pyspark.sql.functions import col, when, isnull

**1**

# D√©finition des sch√©mas pour les fichiers CSV :
# - ratings_schema : note attribu√©e par un utilisateur √† un film, avec horodatage.
# - movies_schema : informations de base sur les films (titre et genre).
# - users_schema : informations d√©mographiques des utilisateurs.
# Ces sch√©mas assurent un chargement structur√© et typ√© des donn√©es.


In [2]:
ratings_schema = StructType([
    StructField("Userid", IntegerType(), True),
    StructField("Movieid",IntegerType(), True),
    StructField("Rating", IntegerType(), True),
    StructField("Timestamp",IntegerType(), True),

])

movies_schema = StructType([
    StructField("Movieid",IntegerType(), True),
    StructField("Title", StringType(), True),
    StructField("Genre", StringType(), True)

])

users_schema = StructType([
    StructField("Userid", IntegerType(), True),
    StructField("Gender", StringType(), True),
    StructField("Age", IntegerType(), True),
    StructField("Occupation", StringType(), True),
    StructField("Zip-code", StringType(), True)
])

# Initialisation de la session Spark et chargement des fichiers CSV :
# - ratings.csv : notes des utilisateurs
# - movies.csv : informations sur les films
# - users.csv : donn√©es d√©mographiques des utilisateurs
# Les sch√©mas d√©finis pr√©c√©demment sont utilis√©s pour un chargement structur√©.


In [3]:
spark = SparkSession.builder.appName("filmRatingPrediction").getOrCreate()
ratings_df = spark.read.csv("ratings.csv", header=True, schema=ratings_schema)
movies_df = spark.read.csv("movies.csv", header=True, schema=movies_schema)
users_df = spark.read.csv("users.csv", header=True, schema=users_schema)



# Fusion des trois DataFrames :
# 1. Jointure ratings ‚Üî movies sur Movieid
# 2. Puis jointure avec users sur Userid
# R√©sultat : un seul DataFrame combin√© avec toutes les informations n√©cessaires.


In [4]:
dataset = ratings_df.join(movies_df, on='Movieid', how='inner')
dataset = dataset.join(users_df, on='Userid', how='inner')

## Informations suppl√©mentaires sur les variables Age et Occupation

# Age is chosen from the following ranges:
## 1:  "Under 18"
## 18: "18-24"
## 25: "25-34"
## 35: "35-44"
## 45: "45-49"
## 50: "50-55"
## 6: "56+"

## Occupation is chosen from the following choices:
##0:  "other" or not specified
## 1:  "academic/educator"
## 2:  "artist"
## 3:  "clerical/admin"
## 4:  "college/grad student"
## 5:  "customer service"
## 6:  "doctor/health care"
## 7:  "executive/managerial"
## 8:  "farmer"
## 9:  "homemaker"
## 10: "K-12 student"
## 11: "lawyer"
## 12: "programmer"
## 13: "retired"
## 14: "sales/marketing"
## 15: "scientist"
## 16: "self-employed"
## 17: "technician/engineer"
##1 8: "tradesman/craftsman"
## 19: "unemployed"
## 20: "writer"


In [5]:
 dataset.groupBy("Age").count().orderBy('Age', ascending=False).show()

+---+------+
|Age| count|
+---+------+
| 56| 38780|
| 50| 72490|
| 45| 83633|
| 35|199003|
| 25|395556|
| 18|183536|
|  1| 27211|
+---+------+



In [6]:
 dataset.groupBy("Gender").count().orderBy('count', ascending=False).show()

+------+------+
|Gender| count|
+------+------+
|     M|753769|
|     F|246440|
+------+------+



In [7]:
 dataset.groupBy("Rating").count().orderBy('count', ascending=False).show()

+------+------+
|Rating| count|
+------+------+
|     4|348971|
|     3|261197|
|     5|226310|
|     2|107557|
|     1| 56174|
+------+------+



In [8]:
dataset.select(F.mean('Rating')).show()

median_age = dataset.approxQuantile("Rating", [0.5], 0.0)[0]
print(f"M√©diane de la colonne 'Rating' : {median_age}")

+-----------------+
|      avg(Rating)|
+-----------------+
|3.581564453029317|
+-----------------+

M√©diane de la colonne 'Rating' : 4.0


In [9]:
 dataset.groupBy("Occupation").count().orderBy('count', ascending=False).show()

+----------+------+
|Occupation| count|
+----------+------+
|         4|131032|
|         0|130499|
|         7|105425|
|         1| 85351|
|        17| 72816|
|        20| 60397|
|        12| 57214|
|         2| 50068|
|        14| 49109|
|        16| 46021|
|         6| 37205|
|         3| 31623|
|        10| 23290|
|        15| 22951|
|         5| 21850|
|        11| 20563|
|        19| 14904|
|        13| 13754|
|        18| 12086|
|         9| 11345|
+----------+------+
only showing top 20 rows



In [10]:
dataset.show(5)

+------+-------+------+---------+--------------------+--------------------+------+---+----------+--------+
|Userid|Movieid|Rating|Timestamp|               Title|               Genre|Gender|Age|Occupation|Zip-code|
+------+-------+------+---------+--------------------+--------------------+------+---+----------+--------+
|     1|   1193|     5|978300760|One Flew Over the...|               Drama|     F|  1|        10|   48067|
|     1|    661|     3|978302109|James and the Gia...|Animation|Childre...|     F|  1|        10|   48067|
|     1|    914|     3|978301968| My Fair Lady (1964)|     Musical|Romance|     F|  1|        10|   48067|
|     1|   3408|     4|978300275|Erin Brockovich (...|               Drama|     F|  1|        10|   48067|
|     1|   2355|     5|978824291|Bug's Life, A (1998)|Animation|Childre...|     F|  1|        10|   48067|
+------+-------+------+---------+--------------------+--------------------+------+---+----------+--------+
only showing top 5 rows



###  - Moyenne et m√©diane des notes par utilisateur (Avg_User_Rating, Median_User_Rating)
### - Moyenne et m√©diane des notes par film (Avg_Movie_Rating, Median_Movie_Rating)
### Ces informations enrichissent le dataset pour une future analyse ou mod√©lisation.


In [11]:
user_ratings = dataset.groupBy("Userid").agg(
    avg("Rating").alias("Avg_User_Rating"),
    percentile_approx("Rating", 0.5).alias("Median_User_Rating")
)

film_ratings = dataset.groupBy("Movieid").agg(
    avg("Rating").alias("Avg_Movie_Rating"),
    percentile_approx("Rating", 0.5).alias("Median_Movie_Rating")
)
dataset = dataset.join(film_ratings, on='Movieid', how='inner')
dataset = dataset.join(user_ratings, on='Userid', how='inner')

# Affichage des premi√®res lignes du dataset enrichi avec les nouvelles colonnes de statistiques.


In [12]:
dataset.show()

+------+-------+------+---------+--------------------+--------------------+------+---+----------+--------+------------------+-------------------+-----------------+------------------+
|Userid|Movieid|Rating|Timestamp|               Title|               Genre|Gender|Age|Occupation|Zip-code|  Avg_Movie_Rating|Median_Movie_Rating|  Avg_User_Rating|Median_User_Rating|
+------+-------+------+---------+--------------------+--------------------+------+---+----------+--------+------------------+-------------------+-----------------+------------------+
|     1|      1|     5|978824268|    Toy Story (1995)|Animation|Childre...|     F|  1|        10|   48067| 4.146846413095811|                  4|4.188679245283019|                 4|
|     1|     48|     5|978824351|   Pocahontas (1995)|Animation|Childre...|     F|  1|        10|   48067|2.9764397905759163|                  3|4.188679245283019|                 4|
|     1|    531|     4|978302149|Secret Garden, Th...|    Children's|Drama|     F|  1

In [13]:

df_clean = dataset.dropna(subset=['Gender', 'Age', 'Occupation', 'Zip-code'])


In [14]:
df_clean.show()

+------+-------+------+---------+--------------------+--------------------+------+---+----------+--------+------------------+-------------------+-----------------+------------------+
|Userid|Movieid|Rating|Timestamp|               Title|               Genre|Gender|Age|Occupation|Zip-code|  Avg_Movie_Rating|Median_Movie_Rating|  Avg_User_Rating|Median_User_Rating|
+------+-------+------+---------+--------------------+--------------------+------+---+----------+--------+------------------+-------------------+-----------------+------------------+
|     1|      1|     5|978824268|    Toy Story (1995)|Animation|Childre...|     F|  1|        10|   48067| 4.146846413095811|                  4|4.188679245283019|                 4|
|     1|     48|     5|978824351|   Pocahontas (1995)|Animation|Childre...|     F|  1|        10|   48067|2.9764397905759163|                  3|4.188679245283019|                 4|
|     1|    531|     4|978302149|Secret Garden, Th...|    Children's|Drama|     F|  1

## Pr√©traitement des genres
## 1. S√©paration des genres multiples (ex: "Action|Adventure") en une liste
## 2. Extraction de tous les genres uniques avec explode
## 3. Ajout de colonnes binaires (one-hot encoding) pour chaque genre
## 4. Conversion explicite du genre "F" en 1 et les autres en 0 dans la colonne "Gender"


In [15]:
df_genres = df_clean.withColumn("Genre_split", split(col("Genre"), "\\|"))

genres_exploded = df_genres.select(explode(col("Genre_split")).alias("single_genre"))
all_genres = genres_exploded.distinct().rdd.flatMap(lambda x: x).collect()

for genre in all_genres:
    df_genres = df_genres.withColumn(f"Genre_{genre}",
        when(array_contains(col("Genre_split"), genre), 1).otherwise(0))
df_genres = df_genres.withColumn("Gender", when(col("Gender") == "F", 0).otherwise(1))


## Affichage du DataFrame avec colonnes binaires de genres
## Chaque genre devient une colonne avec valeur 0 ou 1 selon pr√©sence pour le film


In [16]:
df_genres.show()

+------+-------+------+---------+--------------------+--------------------+------+---+----------+--------+------------------+-------------------+-----------------+------------------+--------------------+-----------+-------------+--------------+---------------+-----------+----------------+---------+-----------------+-------------+-------------+-------------+---------------+-------------------------+---------------+------------+-------------+------------+------------+------------+
|Userid|Movieid|Rating|Timestamp|               Title|               Genre|Gender|Age|Occupation|Zip-code|  Avg_Movie_Rating|Median_Movie_Rating|  Avg_User_Rating|Median_User_Rating|         Genre_split|Genre_Crime|Genre_Romance|Genre_Thriller|Genre_Adventure|Genre_Drama|Genre_Children's|Genre_War|Genre_Documentary|Genre_Fantasy|Genre_Mystery|Genre_Musical|Genre_Animation|Genre_ Miami Beach (1988)|Genre_Film-Noir|Genre_Horror|Genre_Western|Genre_Comedy|Genre_Action|Genre_Sci-Fi|
+------+-------+------+---------

#2  Justification de l‚Äôabsence de r√©gression :

#Nous n‚Äôavons pas r√©alis√© de mod√®le de r√©gression, car les donn√©es ne pr√©sentaient pas de relation lin√©aire exploitable entre les variables. Apr√®s avoir analys√© et nettoy√© le dataset, nous avons estim√© que la r√©gression n‚Äô√©tait pas pertinente dans notre cas. Nous avons donc choisi de passer directement √† la partie 3.1 du projet, en d√©veloppant un syst√®me de recommandation bas√© sur la similarit√© entre films.



#3.1

# Suggestion des films similaires
## 1. On s√©lectionne les colonnes binaires de genre (hors "Genre_split") comme vecteur d‚Äôentr√©e.
## 2. Utilisation de VectorAssembler pour combiner ces colonnes en un seul vecteur de genre.
## 3. R√©cup√©ration du vecteur du film cible (ici : "Toy Story (1995)").
## 4. D√©finition d‚Äôune fonction de similarit√© cosinus et application √† tous les films.
## 5. Classement des films par similarit√© cosinus d√©croissante, puis par note moyenne d√©croissante.
## R√©sultat : les 5 films les plus similaires au film cible sont affich√©s.


In [17]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.linalg import Vectors
from pyspark.sql.functions import col, lit, udf
from pyspark.sql.types import DoubleType
import math


target_title = "Toy Story (1995)"
top_n = 5

genre_columns = [col for col in df_genres.columns if col.startswith("Genre_") and col != "Genre_split"]

assembler = VectorAssembler(inputCols=genre_columns, outputCol="genre_vector")
df_vect = assembler.transform(df_genres).select("Title", "genre_vector", "Avg_Movie_Rating").dropDuplicates(["Title"])

target_vector = df_vect.filter(col("Title") == target_title).select("genre_vector").collect()

if target_vector:
    target_vec = target_vector[0]["genre_vector"]

    def cosine_similarity(v1, v2):
        dot = float(v1.dot(v2))
        norm1 = math.sqrt(v1.dot(v1))
        norm2 = math.sqrt(v2.dot(v2))
        if norm1 == 0.0 or norm2 == 0.0:
            return 0.0
        return dot / (norm1 * norm2)

    cosine_sim_udf = udf(lambda x: float(cosine_similarity(target_vec, x)), DoubleType())

    df_sim = df_vect.withColumn("cosine_similarity", cosine_sim_udf(col("genre_vector")))

    top_similar = df_sim.filter(col("Title") != target_title) \
                        .orderBy(col("cosine_similarity").desc(), col("Avg_Movie_Rating").desc()) \
                        .select("Title", "cosine_similarity", "Avg_Movie_Rating") \
                        .limit(top_n)

    top_similar.show(truncate=False)

else:
    print(f"Le film '{target_title}' n'a pas √©t√© trouv√© dans le dataset.")

+------------------------+------------------+------------------+
|Title                   |cosine_similarity |Avg_Movie_Rating  |
+------------------------+------------------+------------------+
|Toy Story 2 (1999)      |1.0000000000000002|4.218927444794953 |
|Chicken Run (2000)      |1.0000000000000002|3.8796087283671934|
|Bug's Life, A (1998)    |1.0000000000000002|3.854374633000587 |
|American Tail, An (1986)|1.0000000000000002|3.4282178217821784|
|Saludos Amigos (1943)   |1.0000000000000002|3.066666666666667 |
+------------------------+------------------+------------------+



## Suggestion al√©atoire de films similaires
## Cette cellule reprend le principe de la similarit√© cosinus mais avec un titre de film s√©lectionn√© al√©atoirement.
## √âtapes :
## 1. Construction du vecteur des genres pour tous les films.
## 2. S√©lection al√©atoire d‚Äôun titre depuis la liste des films disponibles.
## 3. Calcul du vecteur du film cible et de la similarit√© cosinus avec les autres films.
## 4. Affichage des films les plus similaires √† celui choisi, tri√©s par similarit√© et note moyenne.


In [18]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.linalg import Vectors
from pyspark.sql.functions import col, lit, udf
from pyspark.sql.types import DoubleType
import math
import random

top_n = 5
genre_columns = [col for col in df_genres.columns if col.startswith("Genre_") and col != "Genre_split"]


assembler = VectorAssembler(inputCols=genre_columns, outputCol="genre_vector")
df_vect = assembler.transform(df_genres).select("Title", "genre_vector", "Avg_Movie_Rating").dropDuplicates(["Title"])


titles = df_vect.select("Title").rdd.map(lambda r: r[0]).collect()
target_title = random.choice(titles)
print(f"Titre choisi al√©atoirement : {target_title}")


target_vector = df_vect.filter(col("Title") == target_title).select("genre_vector").collect()

if target_vector:
    target_vec = target_vector[0]["genre_vector"]


    def cosine_similarity(v1, v2):
        dot = float(v1.dot(v2))
        norm1 = math.sqrt(v1.dot(v1))
        norm2 = math.sqrt(v2.dot(v2))
        if norm1 == 0.0 or norm2 == 0.0:
            return 0.0
        return dot / (norm1 * norm2)

    cosine_sim_udf = udf(lambda x: float(cosine_similarity(target_vec, x)), DoubleType())


    df_sim = df_vect.withColumn("cosine_similarity", cosine_sim_udf(col("genre_vector")))

    top_similar = df_sim.filter(col("Title") != target_title) \
                        .orderBy(col("cosine_similarity").desc(), col("Avg_Movie_Rating").desc()) \
                        .select("Title", "cosine_similarity", "Avg_Movie_Rating") \
                        .limit(top_n)

    top_similar.show(truncate=False)

else:
    print(f"Le film '{target_title}' n'a pas √©t√© trouv√© dans le dataset.")


Titre choisi al√©atoirement : Damien
+-----------------------------------------+-----------------+----------------+
|Title                                    |cosine_similarity|Avg_Movie_Rating|
+-----------------------------------------+-----------------+----------------+
|Lured (1947)                             |0.0              |5.0             |
|Song of Freedom (1936)                   |0.0              |5.0             |
|One Little Indian (1973)                 |0.0              |5.0             |
|Bittersweet Motel (2000)                 |0.0              |5.0             |
|Schlafes Bruder (Brother of Sleep) (1995)|0.0              |5.0             |
+-----------------------------------------+-----------------+----------------+



#3.2

## Pr√©diction de notes utilisateurs
## Objectif : pr√©dire la note qu‚Äôun utilisateur donnerait √† un film qu‚Äôil n‚Äôa pas encore not√©.
## √âtapes principales :
## 1. Regroupement des utilisateurs et films en clusters avec KMeans selon leurs patterns de notes.
## 2. R√©cup√©ration du cluster de l‚Äôutilisateur s√©lectionn√©.
## 3. Recherche des films non encore not√©s par cet utilisateur.
## 4. Calcul de la note moyenne donn√©e √† ces films par les membres du m√™me cluster.
## 5. Affichage des recommandations avec les films pr√©dits comme les mieux not√©s.


In [20]:
from pyspark.ml.clustering import KMeans
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.functions import col, avg, count


spark = SparkSession.builder \
    .appName("MovieRecommenderCluster") \
    .config("spark.driver.memory", "4g") \
    .config("spark.executor.memory", "4g") \
    .getOrCreate()


ratings = df_clean.select("Userid", "Movieid", "Rating")


top_movies = ratings.groupBy("Movieid").agg(count("*").alias("n")) \
    .orderBy(col("n").desc()).limit(100)
filtered_ratings = ratings.join(top_movies.select("Movieid"), on="Movieid")


user_movie_matrix = filtered_ratings.groupBy("Userid") \
    .pivot("Movieid").avg("Rating").na.fill(0)


assembler = VectorAssembler(inputCols=user_movie_matrix.columns[1:], outputCol="features")
assembled_data = assembler.transform(user_movie_matrix)

kmeans = KMeans(k=5, seed=1)
model = kmeans.fit(assembled_data)

user_clusters = model.transform(assembled_data) \
    .select("Userid", "prediction") \
    .withColumnRenamed("prediction", "user_cluster")


ratings_with_clusters = filtered_ratings.join(user_clusters, on="Userid")

def recommander_films(user_id, top_n=5):
    try:

        user_cluster_id = user_clusters.filter(col("Userid") == user_id) \
            .select("user_cluster").first()["user_cluster"]


        films_notes = filtered_ratings.filter(col("Userid") == user_id) \
            .select("Movieid").distinct()


        tous_films = filtered_ratings.select("Movieid").distinct()
        films_non_notes = tous_films.join(films_notes, on="Movieid", how="left_anti")

        cluster_notes = ratings_with_clusters.filter(col("user_cluster") == user_cluster_id)


        recommendations_raw = cluster_notes.join(films_non_notes, on="Movieid") \
            .groupBy("Movieid") \
            .agg(avg("Rating").alias("predicted_rating")) \
            .orderBy(col("predicted_rating").desc()) \
            .cache()

        recommendations_raw.count()


        df_titles = df_genres.select(col("Movieid").cast("int").alias("Movieid"), "Title").dropDuplicates(["Movieid"])
        recommandations_avec_titres = recommendations_raw.join(df_titles, on="Movieid").dropDuplicates(["Title"])

        top_recos = recommandations_avec_titres.orderBy(col("predicted_rating").desc()).limit(top_n).collect()

        if top_recos:
            print(f"\n Top {top_n} recommandations pour l'utilisateur {user_id} :")
            for i, row in enumerate(top_recos, 1):
                print(f"{i}.  {row['Title']} (note estim√©e : {row['predicted_rating']:.2f})")
        else:
            print(" Aucun film √† recommander (utilisateur a peut-√™tre tout not√©).")

    except Exception as e:
        print(f" Erreur pour l'utilisateur {user_id} : {str(e)}")


recommander_films(1234, top_n=5)



üéØ Top 5 recommandations pour l'utilisateur 1234 :
1. üé¨ Dr. Strangelove or (note estim√©e : 4.48)
2. üé¨ Shawshank Redemption, The (1994) (note estim√©e : 4.46)
3. üé¨ Usual Suspects, The (1995) (note estim√©e : 4.45)
4. üé¨ Sixth Sense, The (1999) (note estim√©e : 4.39)
5. üé¨ Schindler's List (1993) (note estim√©e : 4.37)
