In [1]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m5.2 MB/s[0m eta [36m0:00:00[0m00:01[0m00:01[0m
[?25h  Preparing metadata (setup.py) ... [?25ldone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25ldone
[?25h  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488493 sha256=728bcb8a2c688146ce22eadd3fe2c72db2dc5d2b01bc5d419edd6819f8b46be7
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


In [2]:
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql.functions import array_contains, col, lit, when, expr, size, array, explode, sum as spark_sum, udf
from pyspark.sql.types import ArrayType, StringType, IntegerType, FloatType
import numpy as np
import logging
import re

# REMA Project - Léo BILLANT - SCIA 2025

The code and most of its explanation/choices is in this notebook. The rest is in the README file. Thank you for reading !

# DATA LOADING

In [3]:
# Load MovieLens data
movies = pd.read_csv('/kaggle/input/movielens-ml1m/ml-1m/ml-1m/movies.dat', sep='::', engine='python', names=['movieId', 'title', 'genres'], encoding='ISO-8859-1')
ratings = pd.read_csv('/kaggle/input/movielens-ml1m/ml-1m/ml-1m/ratings.dat', sep='::', engine='python', names=['userId', 'movieId', 'rating', 'timestamp'], encoding='ISO-8859-1')

# Load IMDb data
imdb_titles = pd.read_csv('/kaggle/input/movies/title.basics.tsv/title.basics.tsv', sep='\t', low_memory=False)
imdb_ratings = pd.read_csv('/kaggle/input/movies/title.ratings.tsv/title.ratings.tsv', sep='\t', low_memory=False)

# DATA PREPROCESSING

In [4]:
# We only want movies for a movie night, so we only keep movies

imdb_titles = imdb_titles[imdb_titles['titleType'] == 'movie']

# We don't want movies that are not at least a little popular on imdb

imdb_ratings = imdb_ratings[imdb_ratings['numVotes'] > 500]

In [5]:
# Get the year of the movie from the title

def extract_year(title):
    match = re.search(r'\(\d{4}\)', title)
    year = match.group(0)[1:-1] if match else ''
    return year

movies['year'] = movies['title'].apply(extract_year)

In [6]:
# To make sure that movies and imdb_titles merge well, we put the titles in the same form
# For example, "Women, The (1939)" -> "women the" and "The Women" -> "women the"

def standardize_title(title):
    if pd.isna(title):
        return ''
    title = re.sub(r'\(\d{4}\)', '', title).strip().lower()
    title = re.sub(r'[^a-z0-9\s]', '', title)
    if title.startswith('the '):
        title = title[4:] + ' the'
    return title

movies['title_small'] = movies['title'].apply(standardize_title)
imdb_titles['primaryTitle'] = imdb_titles['primaryTitle'].apply(standardize_title)

In [7]:
# Merging the data
merged_data = pd.merge(movies, ratings, on='movieId')
merged_data = pd.merge(merged_data, imdb_titles, left_on=['title_small', 'year'], right_on=['primaryTitle', 'startYear'], how='left')
merged_data = pd.merge(merged_data, imdb_ratings, on='tconst', how='left')

# If tconst was empty or removed by the previous filter (if titleType != movie), drop row
merged_data = merged_data.dropna(subset=['tconst'])

# If numvotes was empty or removed by the previous filter (if numvotes < 500), drop row
merged_data = merged_data.dropna(subset=['numVotes'])

# Imdb ratings are on a 0-10 scale. Let's put them on a 0-5 scale
merged_data['averageRating'] = merged_data['averageRating']/2


In [8]:
# Extract genres from both datasets for each movie and combine them
def combine_genres(row):
    genres_x = set(row['genres_x'].split('|'))
    genres_y = set(row['genres_y'].split(','))
    combined_genres = genres_x.union(genres_y)
    combined_genres.discard('(no genres listed)')
    return list(combined_genres)

merged_data['genres_combined'] = merged_data.apply(combine_genres, axis=1)

In [9]:
# Drop irrelevant columns
merged_data = merged_data.drop(columns=['genres_x', 'genres_y', 'titleType', 'originalTitle', 'isAdult', 'startYear', 'endYear', 'runtimeMinutes', 'timestamp','title_small', 'primaryTitle'])

In [10]:
# Aggregate duplicate ratings by taking the mean rating
merged_data = merged_data.groupby(['userId', 'movieId']).agg({
    'rating': 'mean',
    'title': 'first',
    'genres_combined': 'first',
    'averageRating': 'first',
    'numVotes': 'first',
    'year': 'first'
    
}).reset_index()

In [11]:
# Be sure to not have any lacking data
merged_data =  merged_data.dropna(subset=['title'])

In [12]:
# We do not want biased movies in our model, so we take only movies with at least 50 reviews in movielens

# Count occurrences of each movieId
movie_counts = merged_data['movieId'].value_counts()

# Filter movieIds that appear more than 10 times
popular_movieIds = movie_counts[movie_counts > 50].index

# Filter ratings DataFrame based on popular movieIds
filtered_data = merged_data[merged_data['movieId'].isin(popular_movieIds)]

In [13]:
# We do not want empty any movieId for the Als Algorithm, otherwise it will generate recommendations for movieId indices that are not present in the training data

# Get unique ids
unique_movieIds = sorted(filtered_data['movieId'].unique())

# Replace movieId with a new one
movieId_mapping = {old_id: new_id for new_id, old_id in enumerate(unique_movieIds)}
filtered_data.loc[:, 'movieId'] = filtered_data['movieId'].map(movieId_mapping)

In [14]:
filtered_data

Unnamed: 0,userId,movieId,rating,title,genres_combined,averageRating,numVotes,year
0,1,0,5.0,Toy Story (1995),"[Comedy, Animation, Children's, Adventure]",4.15,1076316.0,1995
1,1,39,5.0,Pocahontas (1995),"[Drama, Musical, Animation, Children's, Romanc...",3.35,204325.0,1995
2,1,88,5.0,Apollo 13 (1995),"[History, Drama, Adventure]",3.85,317673.0,1995
3,1,154,4.0,Star Wars: Episode IV - A New Hope (1977),"[Action, Fantasy, Adventure, Sci-Fi]",4.30,1459641.0,1977
4,1,319,5.0,Schindler's List (1993),"[Drama, War, History, Biography]",4.50,1461889.0,1993
...,...,...,...,...,...,...,...,...
899276,6040,1968,2.0,Caddyshack (1980),"[Sport, Comedy]",3.60,128004.0,1980
899277,6040,2017,4.0,Blazing Saddles (1974),"[Western, Comedy]",3.85,153768.0,1974
899278,6040,2025,4.0,Blood Simple (1984),"[Drama, Crime, Film-Noir, Thriller]",3.75,106010.0,1984
899279,6040,2067,4.0,Serpico (1973),"[Drama, Crime, Biography]",3.85,136020.0,1973


# Model Setup

In [15]:
# Initialize Spark session
spark = SparkSession.builder.appName("CoupleRecommandations").getOrCreate()

# Load the filtered_data into a Spark DataFrame
spark_df = spark.createDataFrame(filtered_data)

# Extract unique genres
unique_genres = set()
for genres in filtered_data['genres_combined']:
    unique_genres.update(genres)

# Create a one-hot encoding for genres
for genre in unique_genres:
    spark_df = spark_df.withColumn(genre, when(array_contains(col('genres_combined'), genre), 1).otherwise(0))

spark_df = spark_df.repartition(200)
spark_df.cache()

# Creating a User-Item Interaction Matrix for ALS
indexer = [StringIndexer(inputCol=column, outputCol=column + "_index") for column in ["userId", "movieId"]]
pipeline = Pipeline(stages=indexer)
transformed = pipeline.fit(spark_df).transform(spark_df)

# Ensure transformed DataFrame retains necessary columns
spark_df = transformed.select("userId", "movieId", "rating", "title", "genres_combined", "averageRating", "numVotes", "movieId_index", "year")

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/07/07 19:26:58 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/07/07 19:28:25 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
24/07/07 19:28:26 WARN TaskSetManager: Stage 0 contains a task of very large size (20248 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

In [16]:
# Split data into training and test sets
(training, test) = transformed.randomSplit([0.8, 0.2], seed=42)

# ALS model
als = ALS(
    maxIter=10,
    regParam=0.1,
    rank=20,
    userCol="userId_index",
    itemCol="movieId_index",
    ratingCol="rating",
    coldStartStrategy="drop",
    nonnegative=True
)

# Training
model = als.fit(training)

24/07/07 19:29:07 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
                                                                                

# Rating for Single Users

In [17]:
max_rating = 5
min_rating = 0

# Force ALS Score to be between 0 and 5
def clip_score(score):
    return min(max(score, min_rating), max_rating)

clip_udf = udf(clip_score, FloatType())

In [18]:
# Get the individual score for ALS
def get_top_n_recommendations(user_id, n, model):
    user_recs = model.recommendForUserSubset(spark.createDataFrame([(user_id,)], ["userId_index"]), n)
    user_recs = user_recs.withColumn("rec", explode("recommendations"))
    user_recs = user_recs.select("userId_index", col("rec.movieId_index").alias("movieId_index"), col("rec.rating").alias("score"))
    user_recs = user_recs.withColumn("score", clip_udf(col("score")))
    return user_recs

In [19]:
# Get the individual recommendation for one user
def recommend_for_single_user(user_id, n, model, spark_df, filtered_data):
    # Get top N recommendations for the user
    user_recs = get_top_n_recommendations(user_id, n, model)
    
    # Convert Spark DataFrame to Pandas DataFrame
    user_recs_pd = user_recs.toPandas()
    
    # Sort and display the top 10 recommendations
    user_recs_pd = user_recs_pd.sort_values(by='score', ascending=False).head(10)
    
    # Join with filtered_data to get the titles and other details
    user_recs_pd = user_recs_pd.merge(filtered_data, how='left', left_on='movieId_index', right_on='movieId').drop_duplicates(subset=["movieId_index", "score"])
    
    # Display recommendations
    print(f"Recommendations for user {user_id}:")
    print(user_recs_pd[['title', 'score']])

# Example usage for single user
user1_id = 1
recommend_for_single_user(user1_id, 10, model, spark_df, filtered_data)


[Stage 173:>                                                        (0 + 1) / 1]

Recommendations for user 1:
                             title     score
0                     Shaft (1971)  4.541117
222            Superman III (1983)  4.467391
733      Addams Family, The (1991)  4.454474
1447      Leaving Las Vegas (1995)  4.429168
2427  Devil in a Blue Dress (1995)  4.423054
2806  Gentleman's Agreement (1947)  4.394519
2897     Body Snatcher, The (1945)  4.391973
2961  Home for the Holidays (1995)  4.390656
3060                  Balto (1995)  4.381446
3159   Natural Born Killers (1994)  4.375618


                                                                                

# Rating for Couples

In [20]:
# Get favorite movie genres for one user
def get_favorite_genres(user_id):
    user_genres = merged_data[merged_data['userId'] == user_id]['genres_combined'].explode().value_counts()
    return list(user_genres.nlargest(5).index)

# Get favorite movie year/era for one user
def get_favorite_year(user_id):
    # Filter the data for the given user to get the years of the movies they watched
    user_data = merged_data[merged_data['userId'] == user_id]
    watched_years = user_data['year']
    
    # Here I chose to do median instead of mean to not have really old classics and brand new blockbusters that everyone has seen bias the result
    return int(np.median(watched_years))
    
    
# Get movie recommendations for a date night for 2 users
def recommend_for_couple(user1_id, user2_id, model, spark_df, merged_data, filtered_data, n=1000):
    # Get ALS recommendations for each user
    user1_recs = get_top_n_recommendations(user1_id, n, model).withColumnRenamed("score", "score_user1")
    user2_recs = get_top_n_recommendations(user2_id, n, model).withColumnRenamed("score", "score_user2")
    
    # Combine the ALS recommendations
    combined_recs = user1_recs.join(user2_recs, "movieId_index", "outer")
    combined_recs = combined_recs.withColumn("als_score", 
                                             (when(col("score_user1").isNull(), 0).otherwise(col("score_user1")) + 
                                              when(col("score_user2").isNull(), 0).otherwise(col("score_user2"))) / 2)
    
    # Get favorite genres for each user
    user1_genres = get_favorite_genres(user1_id)
    user2_genres = get_favorite_genres(user2_id)
    
    # Keep the favorite genres that the 2 users have in common
    favorite_genres = list(set(user1_genres).intersection(set(user2_genres)))
    
    # Put year in the good format    
    merged_data['year'] = merged_data['title'].str.extract(r'\((\d{4})\)').astype(float)
    spark_df = spark_df.withColumn("year", col("year").cast("float"))
    
    # Get favorite movie year for each user
    user1_year = get_favorite_year(user1_id)
    user2_year = get_favorite_year(user2_id)
    
    # Take the mean of both users favorite year
    # We don't use the median for every movie because we want both of the users to be satisfied
    favorite_year = (user1_year + user2_year) / 2
    
    # Join with movie details
    combined_recs = combined_recs.join(spark_df.select("movieId_index", "genres_combined", "averageRating", "title", "numVotes", "year"), "movieId_index", "left").dropDuplicates(["movieId_index", "als_score"])
    
    # Filter out movies that either user has already watched
    watched_movies_user1 = filtered_data[filtered_data['userId'] == user1_id]['movieId'].unique()
    watched_movies_user2 = filtered_data[filtered_data['userId'] == user2_id]['movieId'].unique()
    watched_movies = set(watched_movies_user1).union(set(watched_movies_user2))
    combined_recs = combined_recs.filter(~col("movieId_index").isin(watched_movies))
    
    # Create a UDF to calculate matching genres
    @udf(returnType=FloatType())
    def calculate_genre_score(genres, favorite_genres):
        if not genres or not favorite_genres:
            return 0.0

        genre_set = set(genres)
        favorite_set = set(favorite_genres)

        matching_genres = len(genre_set.intersection(favorite_set))
        total_favorites = len(favorite_set)
        
        # We check how many favorite genres are present
        return matching_genres / total_favorites
    
    # Create a UDF to calculate matching year
    @udf(returnType=FloatType())
    def calculate_year_score(movie_year, favorite_year):
        if not movie_year or not favorite_year:
            return 0.0

        year_difference = abs(movie_year - favorite_year)
        
        # Score calculation : 30 years maximum otherwise it's too far from the liked era. Each year difference represents 3.33% of the score.
        score = max(0.0, (30 - year_difference) / 30)
        return score
    
    
    # Calculate genre match score
    combined_recs = combined_recs.withColumn("genre_score", 
                                             calculate_genre_score(col("genres_combined"), array(*[lit(g) for g in favorite_genres])))
    
    # Calculate year match score
    combined_recs = combined_recs.withColumn("year_score", calculate_year_score(col("year"), lit(favorite_year)))
    
    
    # Calculate final score :
    # als_score is the most relevent as we trained the model. It is also the center of this exercise.
    # averageRating of imDb is also quite relevant as imdb is a worlwide reference in movie ranking
    # genre_score is only at 0.1 because it creates a very strong bias. Putting more would be pretty much the same as filtering the genres to only get all of the favorites genres
    # year_score is only at 0.1, because it is not as relevant as the rest
    combined_recs = combined_recs.withColumn("final_score", 
                                             0.6 * col("als_score") + 
                                             0.1 * col("genre_score") * 5 + 
                                             0.2 * col("averageRating") +
                                             0.1 * col("year_score") * 5
                                            )
    
    # Sort by final score and get the top 5 movies
    top_movies = combined_recs.orderBy(col("final_score").desc()).limit(5)

    # Collect the top 5 movies
    top_movies_list = top_movies.collect()

    print(f"Favorites genres of the couple {favorite_genres}:")
    print(f"Favorites year of the couple {favorite_year}:")

    # Extract and print details of the top 5 movies
    for idx, movie in enumerate(top_movies_list):
        print(f"\nTop {idx + 1} movie recommendation for users {user1_id} and {user2_id}:")
        print(f"Title: {movie['title']}")
        print(f"Genres: {movie['genres_combined']}")
        print(f"IMDb Rating: {movie['averageRating']} / 5")
        print(f"ALS Score: {movie['als_score']} / 5")
        print(f"Genre Score: {movie['genre_score']*5} / 5")
        print(f"Year Score: {movie['year_score']*5} / 5")
        print(f"Predicted rating for the couple: {movie['final_score']:.2f} / 5")
        
        

# Example usage for a date night for user 1 and 2
user1_id = 1
user2_id = 2
recommend_for_couple(user1_id, user2_id, model, spark_df, merged_data, filtered_data)




Favorites genres of the couple ['Adventure', 'Drama']:
Favorites year of the couple 1991.0:

Top 1 movie recommendation for users 1 and 2:
Title: Schindler's List (1993)
Genres: ['Drama', 'War', 'History', 'Biography']
IMDb Rating: 4.5 / 5
ALS Score: 4.200810432434082 / 5
Genre Score: 2.5 / 5
Year Score: 4.666666686534882 / 5
Predicted rating for the couple: 4.14 / 5

Top 2 movie recommendation for users 1 and 2:
Title: Shawshank Redemption, The (1994)
Genres: ['Drama']
IMDb Rating: 4.65 / 5
ALS Score: 4.112240791320801 / 5
Genre Score: 2.5 / 5
Year Score: 4.4999998807907104 / 5
Predicted rating for the couple: 4.10 / 5

Top 3 movie recommendation for users 1 and 2:
Title: Lion King, The (1994)
Genres: ['Drama', 'Musical', 'Animation', "Children's", 'Adventure']
IMDb Rating: 4.25 / 5
ALS Score: 3.750603675842285 / 5
Genre Score: 5.0 / 5
Year Score: 4.4999998807907104 / 5
Predicted rating for the couple: 4.05 / 5

Top 4 movie recommendation for users 1 and 2:
Title: Silence of the Lambs

                                                                                

# Model Evaluation

In [21]:
# Evaluate the model for rmse
evaluator_rmse = RegressionEvaluator(
    metricName="rmse", labelCol="rating", predictionCol="prediction"
)

# Evaluate the model for mae
evaluator_mae = RegressionEvaluator(
    metricName="mae", labelCol="rating", predictionCol="prediction"
)
predictions = model.transform(test)
rmse = evaluator_rmse.evaluate(predictions)
mae = evaluator_mae.evaluate(predictions)

print("RMSE=" + str(rmse))
print(f"MAE = {mae}")

                                                                                

RMSE=0.8669688747568868
MAE = 0.6957531818563544
