Using Spark DataFrame and SparkSQL to inspect MovieLense dataset and build movie reccomendation system

In [1]:
# Apply CSS styling to display tables with a horizontal scrollbar
from IPython.core.display import HTML
display(HTML("<style>pre { white-space: pre !important; }</style>"))

In [2]:
from pyspark.sql import SparkSession

# Create SparkSession
spark = SparkSession.builder.appName("SparkSQL").getOrCreate()

In [3]:
# import os

from pyspark.sql import functions as f
from pyspark.sql.window import Window
from pyspark.sql.types import StringType

## Table of contents

##### 1. Data exploratoin
##### 2. Data preprocessing
##### 3. Content-based recommendation system
##### 4. Collaborative filtering recommendation system
##### 5. Hybrid (final) recommendation system

## 1. Data exploration

In this notebook we will work with three datasets:
1. u.item file that contains info about movies
2. u.data that contains ratings
3. u.user thath holds user information

**Loading files into PySpark DataFrames**

In [4]:
# u.item file contains information about movies
movies_path = r"..\MovieLense\ml-100k\u.item"
movies_df = spark.read \
                 .option("delimiter", "|") \
                 .csv(movies_path, inferSchema=True, header=False) \
                 .toDF("movie_id", "title", "release_date", "video_release_date", "IMDb_URL", "unknown",
                       "Action", "Adventure", "Animation", "Children", "Comedy", "Crime", "Documentary",
                       "Drama", "Fantasy", "Film-Noir", "Horror", "Musical", "Mystery",
                       "Romance", "Sci-Fi", "Thriller", "War", "Western")
print("Number of rows: ", movies_df.count())
movies_df.show(5)
print("Movies can be in several genres at once.")

Number of rows:  1682
+--------+-----------------+------------+------------------+--------------------+-------+------+---------+---------+--------+------+-----+-----------+-----+-------+---------+------+-------+-------+-------+------+--------+---+-------+
|movie_id|            title|release_date|video_release_date|            IMDb_URL|unknown|Action|Adventure|Animation|Children|Comedy|Crime|Documentary|Drama|Fantasy|Film-Noir|Horror|Musical|Mystery|Romance|Sci-Fi|Thriller|War|Western|
+--------+-----------------+------------+------------------+--------------------+-------+------+---------+---------+--------+------+-----+-----------+-----+-------+---------+------+-------+-------+-------+------+--------+---+-------+
|       1| Toy Story (1995)| 01-Jan-1995|              null|http://us.imdb.co...|      0|     0|        0|        1|       1|     1|    0|          0|    0|      0|        0|     0|      0|      0|      0|     0|       0|  0|      0|
|       2| GoldenEye (1995)| 01-Jan-1995| 

In [5]:
# Find duplicates by grouping by all columns and counting
duplicates = movies_df.groupBy(movies_df.columns).count().filter("count > 1")

duplicates.show()

+--------+-----+------------+------------------+--------+-------+------+---------+---------+--------+------+-----+-----------+-----+-------+---------+------+-------+-------+-------+------+--------+---+-------+-----+
|movie_id|title|release_date|video_release_date|IMDb_URL|unknown|Action|Adventure|Animation|Children|Comedy|Crime|Documentary|Drama|Fantasy|Film-Noir|Horror|Musical|Mystery|Romance|Sci-Fi|Thriller|War|Western|count|
+--------+-----+------------+------------------+--------+-------+------+---------+---------+--------+------+-----+-----------+-----+-------+---------+------+-------+-------+-------+------+--------+---+-------+-----+
+--------+-----+------------+------------------+--------+-------+------+---------+---------+--------+------+-----+-----------+-----+-------+---------+------+-------+-------+-------+------+--------+---+-------+-----+



In [6]:
# u.data contains 100000 ratings by 943 users on 1682 items (movies)
ratings_path =  r"..\MovieLense\ml-100k\u.data"
ratings_df = spark.read \
                  .option("delimiter", "\t") \
                  .csv(ratings_path, inferSchema=True, header=False) \
                  .toDF("user_id", "movie_id", "rating", "timestamp")

print("Number of rows: ", ratings_df.count())
ratings_df.show(5)
print("Each user has rated at least 20 movies.")

Number of rows:  100000
+-------+--------+------+---------+
|user_id|movie_id|rating|timestamp|
+-------+--------+------+---------+
|    196|     242|     3|881250949|
|    186|     302|     3|891717742|
|     22|     377|     1|878887116|
|    244|      51|     2|880606923|
|    166|     346|     1|886397596|
+-------+--------+------+---------+
only showing top 5 rows

Each user has rated at least 20 movies.


In [7]:
# u.user file contains demographic information about the users
users_path =  r"..\MovieLense\ml-100k\u.user"

users_df = spark.read \
                .option("delimiter", "|") \
                .csv(users_path, inferSchema=True) \
                .toDF("user_id", "age", "gender", "occupation", "zip_code")

print("Number of rows: ", users_df.count())
users_df.show(5)

Number of rows:  943
+-------+---+------+----------+--------+
|user_id|age|gender|occupation|zip_code|
+-------+---+------+----------+--------+
|      1| 24|     M|technician|   85711|
|      2| 53|     F|     other|   94043|
|      3| 23|     M|    writer|   32067|
|      4| 24|     M|technician|   43537|
|      5| 33|     F|     other|   15213|
+-------+---+------+----------+--------+
only showing top 5 rows



**1.1. Average rating score across all movies**

In [8]:
# Calculate the average rating across all movies
average_rating = ratings_df.agg(f.round(f.avg("rating"), 2).alias("average rating"))

# Show the average rating for all movies
average_rating.show()

+--------------+
|average rating|
+--------------+
|          3.53|
+--------------+



**1.2. Top 10 movies with the most and least number of views and their average ratings**

In [9]:
# Calculate number of views and average rating for each movie
movie_stats = ratings_df.groupBy("movie_id") \
                        .agg(f.count("user_id").alias("num_views"), f.avg("rating").alias("avg_rating"))

# Join with movies_df to get movie titles
movie_stats = movie_stats.join(movies_df, on=["movie_id"]) \
                        .select(movies_df["title"], "num_views", "avg_rating")

# Show 10 movies with the most views
print("Top 10 movies with the most views:")
most_views = movie_stats.orderBy(f.col("num_views").desc()).limit(10)
most_views.show(truncate=False)

# Show 10 movies with the least views
print("\nTop 10 movies with the least views:")
least_views = movie_stats.orderBy(f.col("num_views")).limit(10)
least_views.show(truncate=False)

Top 10 movies with the most views:
+-----------------------------+---------+------------------+
|title                        |num_views|avg_rating        |
+-----------------------------+---------+------------------+
|Star Wars (1977)             |583      |4.3584905660377355|
|Contact (1997)               |509      |3.8035363457760316|
|Fargo (1996)                 |508      |4.155511811023622 |
|Return of the Jedi (1983)    |507      |4.007889546351085 |
|Liar Liar (1997)             |485      |3.156701030927835 |
|English Patient, The (1996)  |481      |3.656964656964657 |
|Scream (1996)                |478      |3.4414225941422596|
|Toy Story (1995)             |452      |3.8783185840707963|
|Air Force One (1997)         |431      |3.6310904872389793|
|Independence Day (ID4) (1996)|429      |3.438228438228438 |
+-----------------------------+---------+------------------+


Top 10 movies with the least views:
+---------------------------------------------------------+---------+----

**1.3. Top 3 movies for each category**

In [10]:
# Calculate the number of views for each movie
movie_views = ratings_df.groupBy("movie_id").agg(f.count("user_id").alias("num_views"))

# Initialize an empty DataFrame to store the results
top_movies_per_genre = spark.createDataFrame([], schema="genre STRING, rank INT, movie_id INT, title STRING, total_views INT")

# Iterate over each genre column and calculate the top 3 movies for each genre
for genre in ["Action", "Adventure", "Animation", "Children", "Comedy", "Crime", "Documentary", "Drama", "Fantasy",
               "Film-Noir", "Horror", "Musical", "Mystery", "Romance", "Sci-Fi", "Thriller", "War", "Western"]:

    genre_views = movies_df.filter(f.col(genre) == 1).join(movie_views, on=["movie_id"]) \
                            .groupBy("movie_id", "title").agg(f.sum(f.col("num_views")).alias("total_views"))

    window = Window.partitionBy().orderBy(f.desc("total_views"))
    ranked_movies = genre_views.withColumn("rank", f.row_number().over(window))

    # Select the top 3 movies for each genre
    top_genre_movies = ranked_movies.filter(f.col("rank") <= 3) \
                                    .select(f.lit(genre).alias("genre"), f.col("rank"), f.col("movie_id"), f.col("title"), f.col("total_views"))

    top_movies_per_genre = top_movies_per_genre.union(top_genre_movies)

top_movies_per_genre.orderBy("genre", "rank").show(truncate=False)

+-----------+----+--------+--------------------------------------------+-----------+
|genre      |rank|movie_id|title                                       |total_views|
+-----------+----+--------+--------------------------------------------+-----------+
|Action     |1   |50      |Star Wars (1977)                            |583        |
|Action     |2   |181     |Return of the Jedi (1983)                   |507        |
|Action     |3   |300     |Air Force One (1997)                        |431        |
|Adventure  |1   |50      |Star Wars (1977)                            |583        |
|Adventure  |2   |181     |Return of the Jedi (1983)                   |507        |
|Adventure  |3   |174     |Raiders of the Lost Ark (1981)              |420        |
|Animation  |1   |1       |Toy Story (1995)                            |452        |
|Animation  |2   |71      |Lion King, The (1994)                       |220        |
|Animation  |3   |95      |Aladdin (1992)                        

**1.4. Add categorical attribute 'age_category' and show top 5 most popular movies for each age group**

In [11]:
# Define age ranges and categorize users
users_df = users_df.withColumn("age_category",
                               f.when(f.col("age").between(0, 25), "Young") \
                               .when(f.col("age").between(26, 50), "Middle Aged") \
                               .otherwise("Senior"))

movie_views = ratings_df.groupBy("movie_id").agg(f.count("user_id").alias("num_views"))

movie_views_with_titles = movie_views.join(movies_df, on=["movie_id"]) \
                                        .select(movie_views["movie_id"], movies_df["title"], "num_views")

# Join with users_df to get the age category of users
top_movies_per_age_category = ratings_df.join(users_df, on=["user_id"]) \
                                        .join(movie_views_with_titles, on=["movie_id"]) \
                                        .groupBy("age_category", "movie_id", "title") \
                                        .agg(f.count("user_id").alias("total_views")) \
                                        .orderBy("age_category", f.desc("total_views")) \
                                        .withColumn("age_category", f.col("age_category").cast(StringType())) \

# Use window function to rank movies within each age category
window_spec = Window.partitionBy("age_category").orderBy(f.desc("total_views"))
top_movies_per_age_category = top_movies_per_age_category.withColumn("rank", f.row_number().over(window_spec))

# Filter only the top 5 movies for each age category
top_movies_per_age_category = top_movies_per_age_category.filter(f.col("rank") <= 5).drop("rank")

# Show the top 5 most popular movies for every age category
top_movies_per_age_category.show(truncate=False)

+------------+--------+---------------------------+-----------+
|age_category|movie_id|title                      |total_views|
+------------+--------+---------------------------+-----------+
|Middle Aged |50      |Star Wars (1977)           |362        |
|Middle Aged |100     |Fargo (1996)               |314        |
|Middle Aged |181     |Return of the Jedi (1983)  |307        |
|Middle Aged |258     |Contact (1997)             |303        |
|Middle Aged |286     |English Patient, The (1996)|300        |
|Senior      |286     |English Patient, The (1996)|80         |
|Senior      |100     |Fargo (1996)               |60         |
|Senior      |300     |Air Force One (1997)       |54         |
|Senior      |269     |Full Monty, The (1997)     |50         |
|Senior      |127     |Godfather, The (1972)      |48         |
|Young       |288     |Scream (1996)              |195        |
|Young       |50      |Star Wars (1977)           |173        |
|Young       |181     |Return of the Jed

**1.5. Top 5 movies by gender**

In [12]:
# Join ratings_df with users_df to include gender information
joined_df = ratings_df.join(users_df, on=["user_id"])

# Calculate the number of views for each movie and gender
movie_gender_views = joined_df.groupBy("movie_id", "gender") \
                              .agg(f.count("user_id").alias("num_views"))

# Convert gender to numeric values (0 for female, 1 for male)
movie_gender_views = movie_gender_views.withColumn("gender_numeric", f.when(movie_gender_views["gender"] == "F", 0).otherwise(1))

# Pivot the dataframe to have separate columns for male and female views
movie_gender_views_pivot = movie_gender_views.groupBy("movie_id") \
                                             .pivot("gender_numeric") \
                                             .agg(f.first("num_views"))

movie_gender_views_pivot = movie_gender_views_pivot.withColumnRenamed("0", "female_views").withColumnRenamed("1", "male_views")

# Calculate the overall number of views for each movie
movie_gender_views_pivot = movie_gender_views_pivot.withColumn("total_views", f.col("female_views") + f.col("male_views"))

# Join with movies_df to get movie titles
movie_gender_views_pivot = movie_gender_views_pivot.join(movies_df, on="movie_id") \
                                                    .select(movies_df["title"], "total_views", "female_views", "male_views")

# Show top 5 most popular movies based on the number of views, segmented by gender
top_movies_views_female = movie_gender_views_pivot.orderBy(f.col("female_views").desc()).limit(5)
print("Top 5 most popular movies based on the number of views for women:")
top_movies_views_female.show(truncate=False)

top_movies_views_male = movie_gender_views_pivot.orderBy(f.col("male_views").desc()).limit(5)
print("\nTop 5 most popular movies based on the number of views for men:")
top_movies_views_male.show(truncate=False)

Top 5 most popular movies based on the number of views for women:
+---------------------------+-----------+------------+----------+
|title                      |total_views|female_views|male_views|
+---------------------------+-----------+------------+----------+
|English Patient, The (1996)|481        |152         |329       |
|Star Wars (1977)           |583        |151         |432       |
|Scream (1996)              |478        |143         |335       |
|Liar Liar (1997)           |485        |141         |344       |
|Contact (1997)             |509        |137         |372       |
+---------------------------+-----------+------------+----------+


Top 5 most popular movies based on the number of views for men:
+-------------------------+-----------+------------+----------+
|title                    |total_views|female_views|male_views|
+-------------------------+-----------+------------+----------+
|Star Wars (1977)         |583        |151         |432       |
|Fargo (1996)     

**1.6. Top 3 movies by gender and occupation**

In [13]:
# Join ratings_df with users_df to include gender and occupation information
joined_df = ratings_df.join(users_df, on="user_id")

# Calculate the number of views for each movie, gender and occupation
movie_gender_occupation_views = joined_df.groupBy("movie_id", "gender", "occupation") \
                                          .agg(f.count("user_id").alias("num_views"))

# Convert gender to numeric values (0 for female, 1 for male)
movie_gender_occupation_views = movie_gender_occupation_views.withColumn("gender_numeric", f.when(movie_gender_occupation_views["gender"] == "F", 0).otherwise(1))

# Pivot the dataframe to have separate columns for male and female views
movie_gender_occupation_views_pivot = movie_gender_occupation_views.groupBy("occupation", "movie_id") \
                                                                     .pivot("gender_numeric") \
                                                                     .agg(f.first("num_views"))

movie_gender_occupation_views_pivot = movie_gender_occupation_views_pivot.withColumnRenamed("0", "female_views").withColumnRenamed("1", "male_views")

# Calculate the overall number of views for each movie and occupation
movie_gender_occupation_views_pivot = movie_gender_occupation_views_pivot.withColumn("total_views", f.col("female_views") + f.col("male_views"))

# Join with movies_df to get movie titles
movie_gender_occupation_views_pivot = movie_gender_occupation_views_pivot.join(movies_df, on="movie_id") \
                                                                         .select("occupation", movies_df["title"], "total_views", "female_views", "male_views")

# Show top 5 most popular movies based on the number of views, segmented by gender and occupation

window_female = Window.partitionBy("occupation").orderBy(f.col("female_views").desc())
window_male = Window.partitionBy("occupation").orderBy(f.col("male_views").desc())

top_movies_views_female = movie_gender_occupation_views_pivot.withColumn("rank_female", f.rank().over(window_female)).filter(f.col("rank_female") <= 5)
print("Top 5 most popular movies based on the number of views for females, for each occupation:")
top_movies_views_female.show(truncate=False)

top_movies_views_male = movie_gender_occupation_views_pivot.withColumn("rank_male", f.rank().over(window_male)).filter(f.col("rank_male") <= 5)
print("\nTop 5 most popular movies based on the number of views for males, for each occupation:")
top_movies_views_male.show(truncate=False)

Top 5 most popular movies based on the number of views for females, for each occupation:
+-------------+----------------------------------------+-----------+------------+----------+-----------+
|occupation   |title                                   |total_views|female_views|male_views|rank_female|
+-------------+----------------------------------------+-----------+------------+----------+-----------+
|administrator|English Patient, The (1996)             |47         |21          |26        |1          |
|administrator|Star Wars (1977)                        |44         |21          |23        |1          |
|administrator|Jerry Maguire (1996)                    |38         |19          |19        |3          |
|administrator|Return of the Jedi (1983)               |40         |18          |22        |4          |
|administrator|Toy Story (1995)                        |31         |18          |13        |4          |
|artist       |Contact (1997)                          |15         |10 

## 2. Data preprocessing

To avoid any potential data leakage we will start off by spliting ratings data into train and test set

In [14]:
# Spliting ratings dataset
train_df, test_df = ratings_df.randomSplit([0.8, 0.2], seed=22)

Helper functions to avoid code duplication

In [15]:
def extract_year(df):
    '''
    Creates a new column 'release_year' from existing 'release_date'
    and fills potential missing data with median value
    '''
    # Extract the year
    df = df.withColumn("release_year", f.expr("substring(release_date, -4, 4)").cast("int"))
    
    # Filling missing release_years with median value
    median_release_year = df.approxQuantile("release_year", [0.5], 0.25)[0]
    filled = df.withColumn("release_year",
                            f.when(f.col("release_year").isNull(), f.lit(median_release_year).cast("int"))
                                   .otherwise(f.col("release_year")))

    return filled

In [16]:
def null_counts(df):
    ''' Shows number of missing values for each column '''
    count = df.select([f.sum(f.col(column).isNull().cast("int")).alias(column)
                                   for column in df.columns])

    count.show()

**Training dataset**

In [17]:
print("Number of rows: ", train_df.count())
train_df.show(5)

Number of rows:  79912
+-------+--------+------+---------+
|user_id|movie_id|rating|timestamp|
+-------+--------+------+---------+
|      1|       1|     5|874965758|
|      1|       2|     3|876893171|
|      1|       3|     4|878542960|
|      1|       7|     4|875071561|
|      1|       8|     1|875072484|
+-------+--------+------+---------+
only showing top 5 rows



In [18]:
# Left join movies_df and users_df to ratings training set
training = train_df.join(movies_df, on='movie_id', how='left').join(users_df, on='user_id', how='left')

training.show(5)

+-------+--------+------+---------+--------------------+------------+------------------+--------------------+-------+------+---------+---------+--------+------+-----+-----------+-----+-------+---------+------+-------+-------+-------+------+--------+---+-------+---+------+----------+--------+------------+
|user_id|movie_id|rating|timestamp|               title|release_date|video_release_date|            IMDb_URL|unknown|Action|Adventure|Animation|Children|Comedy|Crime|Documentary|Drama|Fantasy|Film-Noir|Horror|Musical|Mystery|Romance|Sci-Fi|Thriller|War|Western|age|gender|occupation|zip_code|age_category|
+-------+--------+------+---------+--------------------+------------+------------------+--------------------+-------+------+---------+---------+--------+------+-----+-----------+-----+-------+---------+------+-------+-------+-------+------+--------+---+-------+---+------+----------+--------+------------+
|      1|       1|     5|874965758|    Toy Story (1995)| 01-Jan-1995|             

In [19]:
# Extract the year from the release_date column
training = extract_year(training)

training.select("title", "release_year").show(n=5, truncate=False)

+---------------------+------------+
|title                |release_year|
+---------------------+------------+
|Toy Story (1995)     |1995        |
|GoldenEye (1995)     |1995        |
|Four Rooms (1995)    |1995        |
|Twelve Monkeys (1995)|1995        |
|Babe (1995)          |1995        |
+---------------------+------------+
only showing top 5 rows



In [20]:
null_counts(training)
# Here we are only interested in release_year column

+-------+--------+------+---------+-----+------------+------------------+--------+-------+------+---------+---------+--------+------+-----+-----------+-----+-------+---------+------+-------+-------+-------+------+--------+---+-------+---+------+----------+--------+------------+------------+
|user_id|movie_id|rating|timestamp|title|release_date|video_release_date|IMDb_URL|unknown|Action|Adventure|Animation|Children|Comedy|Crime|Documentary|Drama|Fantasy|Film-Noir|Horror|Musical|Mystery|Romance|Sci-Fi|Thriller|War|Western|age|gender|occupation|zip_code|age_category|release_year|
+-------+--------+------+---------+-----+------------+------------------+--------+-------+------+---------+---------+--------+------+-----+-----------+-----+-------+---------+------+-------+-------+-------+------+--------+---+-------+---+------+----------+--------+------------+------------+
|      0|       0|     0|        0|    0|           9|             79912|      11|      0|     0|        0|        0|       

In [21]:
# Select relevant columns
train_data = training.select("age", "gender", "occupation", "release_year",
                             "unknown", "Action", "Adventure", "Animation", "Children", "Comedy", "Crime",
                             "Documentary", "Drama", "Fantasy", "Film-Noir", "Horror", "Musical", "Mystery",
                             "Romance", "Sci-Fi", "Thriller", "War", "Western", "rating")

train_data.show(5)     

+---+------+----------+------------+-------+------+---------+---------+--------+------+-----+-----------+-----+-------+---------+------+-------+-------+-------+------+--------+---+-------+------+
|age|gender|occupation|release_year|unknown|Action|Adventure|Animation|Children|Comedy|Crime|Documentary|Drama|Fantasy|Film-Noir|Horror|Musical|Mystery|Romance|Sci-Fi|Thriller|War|Western|rating|
+---+------+----------+------------+-------+------+---------+---------+--------+------+-----+-----------+-----+-------+---------+------+-------+-------+-------+------+--------+---+-------+------+
| 24|     M|technician|        1995|      0|     0|        0|        1|       1|     1|    0|          0|    0|      0|        0|     0|      0|      0|      0|     0|       0|  0|      0|     5|
| 24|     M|technician|        1995|      0|     1|        1|        0|       0|     0|    0|          0|    0|      0|        0|     0|      0|      0|      0|     0|       1|  0|      0|     3|
| 24|     M|technici

In [22]:
null_counts(train_data)

+---+------+----------+------------+-------+------+---------+---------+--------+------+-----+-----------+-----+-------+---------+------+-------+-------+-------+------+--------+---+-------+------+
|age|gender|occupation|release_year|unknown|Action|Adventure|Animation|Children|Comedy|Crime|Documentary|Drama|Fantasy|Film-Noir|Horror|Musical|Mystery|Romance|Sci-Fi|Thriller|War|Western|rating|
+---+------+----------+------------+-------+------+---------+---------+--------+------+-----+-----------+-----+-------+---------+------+-------+-------+-------+------+--------+---+-------+------+
|  0|     0|         0|           0|      0|     0|        0|        0|       0|     0|    0|          0|    0|      0|        0|     0|      0|      0|      0|     0|       0|  0|      0|     0|
+---+------+----------+------------+-------+------+---------+---------+--------+------+-----+-----------+-----+-------+---------+------+-------+-------+-------+------+--------+---+-------+------+



**Testing dataset**

In [23]:
print("Number of rows: ", test_df.count())
test_df.show(5)

Number of rows:  20088
+-------+--------+------+---------+
|user_id|movie_id|rating|timestamp|
+-------+--------+------+---------+
|      1|       4|     3|876893119|
|      1|       5|     3|889751712|
|      1|       6|     5|887431973|
|      1|      12|     5|878542960|
|      1|      13|     5|875071805|
+-------+--------+------+---------+
only showing top 5 rows



In [24]:
# Left join movies_df and users_df on ratings testing set
testing = test_df.join(movies_df, on='movie_id', how='left').join(users_df, on='user_id', how='left')

testing.show(5)

+-------+--------+------+---------+--------------------+------------+------------------+--------------------+-------+------+---------+---------+--------+------+-----+-----------+-----+-------+---------+------+-------+-------+-------+------+--------+---+-------+---+------+----------+--------+------------+
|user_id|movie_id|rating|timestamp|               title|release_date|video_release_date|            IMDb_URL|unknown|Action|Adventure|Animation|Children|Comedy|Crime|Documentary|Drama|Fantasy|Film-Noir|Horror|Musical|Mystery|Romance|Sci-Fi|Thriller|War|Western|age|gender|occupation|zip_code|age_category|
+-------+--------+------+---------+--------------------+------------+------------------+--------------------+-------+------+---------+---------+--------+------+-----+-----------+-----+-------+---------+------+-------+-------+-------+------+--------+---+-------+---+------+----------+--------+------------+
|      1|       4|     3|876893119|   Get Shorty (1995)| 01-Jan-1995|             

In [25]:
# Extract the year from the release_date column
testing = extract_year(testing)

testing.select("title", "release_year").show(n=5, truncate=False)

+----------------------------------------------------+------------+
|title                                               |release_year|
+----------------------------------------------------+------------+
|Get Shorty (1995)                                   |1995        |
|Copycat (1995)                                      |1995        |
|Shanghai Triad (Yao a yao yao dao waipo qiao) (1995)|1995        |
|Usual Suspects, The (1995)                          |1995        |
|Mighty Aphrodite (1995)                             |1995        |
+----------------------------------------------------+------------+
only showing top 5 rows



In [26]:
null_counts(testing)

+-------+--------+------+---------+-----+------------+------------------+--------+-------+------+---------+---------+--------+------+-----+-----------+-----+-------+---------+------+-------+-------+-------+------+--------+---+-------+---+------+----------+--------+------------+------------+
|user_id|movie_id|rating|timestamp|title|release_date|video_release_date|IMDb_URL|unknown|Action|Adventure|Animation|Children|Comedy|Crime|Documentary|Drama|Fantasy|Film-Noir|Horror|Musical|Mystery|Romance|Sci-Fi|Thriller|War|Western|age|gender|occupation|zip_code|age_category|release_year|
+-------+--------+------+---------+-----+------------+------------------+--------+-------+------+---------+---------+--------+------+-----+-----------+-----+-------+---------+------+-------+-------+-------+------+--------+---+-------+---+------+----------+--------+------------+------------+
|      0|       0|     0|        0|    0|           0|             20088|       2|      0|     0|        0|        0|       

In [27]:
# Select relevant columns
test_data = testing.select("age", "gender", "occupation", "release_year",
                            "unknown", "Action", "Adventure", "Animation", "Children", "Comedy", "Crime",
                            "Documentary","Drama", "Fantasy", "Film-Noir", "Horror", "Musical", "Mystery",
                            "Romance", "Sci-Fi", "Thriller", "War", "Western", "rating")
test_data.show(5)

+---+------+----------+------------+-------+------+---------+---------+--------+------+-----+-----------+-----+-------+---------+------+-------+-------+-------+------+--------+---+-------+------+
|age|gender|occupation|release_year|unknown|Action|Adventure|Animation|Children|Comedy|Crime|Documentary|Drama|Fantasy|Film-Noir|Horror|Musical|Mystery|Romance|Sci-Fi|Thriller|War|Western|rating|
+---+------+----------+------------+-------+------+---------+---------+--------+------+-----+-----------+-----+-------+---------+------+-------+-------+-------+------+--------+---+-------+------+
| 24|     M|technician|        1995|      0|     1|        0|        0|       0|     1|    0|          0|    1|      0|        0|     0|      0|      0|      0|     0|       0|  0|      0|     3|
| 24|     M|technician|        1995|      0|     0|        0|        0|       0|     0|    1|          0|    1|      0|        0|     0|      0|      0|      0|     0|       1|  0|      0|     3|
| 24|     M|technici

In [28]:
null_counts(test_data)

+---+------+----------+------------+-------+------+---------+---------+--------+------+-----+-----------+-----+-------+---------+------+-------+-------+-------+------+--------+---+-------+------+
|age|gender|occupation|release_year|unknown|Action|Adventure|Animation|Children|Comedy|Crime|Documentary|Drama|Fantasy|Film-Noir|Horror|Musical|Mystery|Romance|Sci-Fi|Thriller|War|Western|rating|
+---+------+----------+------------+-------+------+---------+---------+--------+------+-----+-----------+-----+-------+---------+------+-------+-------+-------+------+--------+---+-------+------+
|  0|     0|         0|           0|      0|     0|        0|        0|       0|     0|    0|          0|    0|      0|        0|     0|      0|      0|      0|     0|       0|  0|      0|     0|
+---+------+----------+------------+-------+------+---------+---------+--------+------+-----+-----------+-----+-------+---------+------+-------+-------+-------+------+--------+---+-------+------+



In [29]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml import Pipeline

**Trening dataset - OneHotEncoder, asembler**

In [30]:
def encode_gender(dataframe):
    # StringIndexer to convert string labels into indices
    gender_indexer = StringIndexer(inputCol="gender", outputCol="gender_index")

    # OneHotEncoder to convert indices into one-hot encoded vectors
    gender_encoder = OneHotEncoder(inputCol="gender_index", outputCol="gender_encoded")

    pipeline = Pipeline(stages=[gender_indexer, gender_encoder])
    encoded_df = pipeline.fit(dataframe).transform(dataframe)

    #Drop categorical columns
    encoded_df = encoded_df.drop("gender", "gender_index")
    
    return encoded_df

In [31]:
def encode_occupation(dataframe):
    # StringIndexer to convert string labels into indices
    occupation_indexer = StringIndexer(inputCol="occupation", outputCol="occupation_index")

    # OneHotEncoder to convert indices into one-hot encoded vectors
    occupation_encoder = OneHotEncoder(inputCol="occupation_index", outputCol="occupation_encoded")

    pipeline = Pipeline(stages=[occupation_indexer, occupation_encoder])
    encoded_df = pipeline.fit(dataframe).transform(dataframe)
    
    #Drop categorical columns
    encoded_df = encoded_df.drop("occupation", "occupation_index")

    return encoded_df

In [32]:
training_data = encode_gender(train_data)

In [33]:
training_data = encode_occupation(training_data)

In [34]:
training_data.show()

+---+------------+-------+------+---------+---------+--------+------+-----+-----------+-----+-------+---------+------+-------+-------+-------+------+--------+---+-------+------+--------------+------------------+
|age|release_year|unknown|Action|Adventure|Animation|Children|Comedy|Crime|Documentary|Drama|Fantasy|Film-Noir|Horror|Musical|Mystery|Romance|Sci-Fi|Thriller|War|Western|rating|gender_encoded|occupation_encoded|
+---+------------+-------+------+---------+---------+--------+------+-----+-----------+-----+-------+---------+------+-------+-------+-------+------+--------+---+-------+------+--------------+------------------+
| 24|        1995|      0|     0|        0|        1|       1|     1|    0|          0|    0|      0|        0|     0|      0|      0|      0|     0|       0|  0|      0|     5| (1,[0],[1.0])|    (20,[8],[1.0])|
| 24|        1995|      0|     1|        1|        0|       0|     0|    0|          0|    0|      0|        0|     0|      0|      0|      0|     0|   

In [35]:
# VectorAssembler is used to combine list of columns into a single 'vector column'  
# The resulting 'features' column will be used as input for our models
assembler = VectorAssembler(
    inputCols=["age", "gender_encoded", "occupation_encoded", "release_year", "unknown", "Action", "Adventure",
               "Animation", "Children", "Comedy", "Crime", "Documentary", "Drama", "Fantasy", "Film-Noir",
               "Horror", "Musical", "Mystery", "Romance", "Sci-Fi", "Thriller", "War", "Western"],
    outputCol="features")

In [36]:
assembled_training = assembler.transform(training_data)
assembled_training.select("features").show(n=10, truncate=False)

+-------------------------------------------------------------------------+
|features                                                                 |
+-------------------------------------------------------------------------+
|(42,[0,1,10,22,26,27,28],[24.0,1.0,1.0,1995.0,1.0,1.0,1.0])              |
|(42,[0,1,10,22,24,25,39],[24.0,1.0,1.0,1995.0,1.0,1.0,1.0])              |
|(42,[0,1,10,22,39],[24.0,1.0,1.0,1995.0,1.0])                            |
|(42,[0,1,10,22,31,38],[24.0,1.0,1.0,1995.0,1.0,1.0])                     |
|(42,[0,1,10,22,27,28,31],[24.0,1.0,1.0,1995.0,1.0,1.0,1.0])              |
|(42,[0,1,10,22,31],[24.0,1.0,1.0,1995.0,1.0])                            |
|(42,[0,1,10,22,31,40],[24.0,1.0,1.0,1996.0,1.0,1.0])                     |
|(42,[0,1,10,22,29,39],[24.0,1.0,1.0,1995.0,1.0,1.0])                     |
|(42,[0,1,10,22,31,37],[24.0,1.0,1.0,1994.0,1.0,1.0])                     |
|(42,[0,1,10,22,24,28,29,34,39],[24.0,1.0,1.0,1996.0,1.0,1.0,1.0,1.0,1.0])|
+-----------

In [37]:
assembled_training.show()

+---+------------+-------+------+---------+---------+--------+------+-----+-----------+-----+-------+---------+------+-------+-------+-------+------+--------+---+-------+------+--------------+------------------+--------------------+
|age|release_year|unknown|Action|Adventure|Animation|Children|Comedy|Crime|Documentary|Drama|Fantasy|Film-Noir|Horror|Musical|Mystery|Romance|Sci-Fi|Thriller|War|Western|rating|gender_encoded|occupation_encoded|            features|
+---+------------+-------+------+---------+---------+--------+------+-----+-----------+-----+-------+---------+------+-------+-------+-------+------+--------+---+-------+------+--------------+------------------+--------------------+
| 24|        1995|      0|     0|        0|        1|       1|     1|    0|          0|    0|      0|        0|     0|      0|      0|      0|     0|       0|  0|      0|     5| (1,[0],[1.0])|    (20,[8],[1.0])|(42,[0,1,10,22,26...|
| 24|        1995|      0|     1|        1|        0|       0|     0

In [38]:
# Now that 'features' holds all the information from our dataset it's all we need as independent variable
final_train = assembled_training.select("features", 'rating')

**Testing dataset - OneHotEncoder, asembler**

In [39]:
testing_data = encode_gender(test_data)
testing_data = encode_occupation(testing_data)

In [40]:
testing_data.columns

['age',
 'release_year',
 'unknown',
 'Action',
 'Adventure',
 'Animation',
 'Children',
 'Comedy',
 'Crime',
 'Documentary',
 'Drama',
 'Fantasy',
 'Film-Noir',
 'Horror',
 'Musical',
 'Mystery',
 'Romance',
 'Sci-Fi',
 'Thriller',
 'War',
 'Western',
 'rating',
 'gender_encoded',
 'occupation_encoded']

In [41]:
asembled_testing = assembler.transform(testing_data)
asembled_testing.select("features").show(truncate=False)

+-------------------------------------------------------------------------+
|features                                                                 |
+-------------------------------------------------------------------------+
|(42,[0,1,11,22,24,28,31],[24.0,1.0,1.0,1995.0,1.0,1.0,1.0])              |
|(42,[0,1,11,22,29,31,39],[24.0,1.0,1.0,1995.0,1.0,1.0,1.0])              |
|(42,[0,1,11,22,31],[24.0,1.0,1.0,1995.0,1.0])                            |
|(42,[0,1,11,22,29,39],[24.0,1.0,1.0,1995.0,1.0,1.0])                     |
|(42,[0,1,11,22,28],[24.0,1.0,1.0,1995.0,1.0])                            |
|(42,[0,1,11,22,31],[24.0,1.0,1.0,1996.0,1.0])                            |
|(42,[0,1,11,22,28,37],[24.0,1.0,1.0,1995.0,1.0,1.0])                     |
|(42,[0,1,11,22,31],[24.0,1.0,1.0,1995.0,1.0])                            |
|(42,[0,1,11,22,24,29,38],[24.0,1.0,1.0,1995.0,1.0,1.0,1.0])              |
|(42,[0,1,11,22,28],[24.0,1.0,1.0,1994.0,1.0])                            |
|(42,[0,1,11

In [42]:
final_test = asembled_testing.select("features", 'rating')


____________
In this notebook we apply __*hybrid recommendation system*__ that combines:
1. content based filttering for cold-start problems (for users with little to no interaction history) and
2. collaborative filltering for users with known prefereneces
____________
*final_train* and *final_test* contain preprocessed data ready for the content-based recommendation model, while <br> *train_df* and *test_df* holds unprocessed data used for collaborative filttering  

## 3. Content-based recommendation system

**Linear Regression**

In [43]:
from pyspark.ml.regression import LinearRegression
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator

In [44]:
# Initialize the Linear Regression model
lr = LinearRegression(featuresCol="features", labelCol="rating")

# Define a grid of hyperparameters to search over
paramGrid = ParamGridBuilder() \
            .addGrid(lr.maxIter, [5, 10, 50]) \
            .addGrid(lr.regParam, [0.1, 0.01, 0.001]) \
            .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0]) \
            .build()

# Define an evaluator
evaluator = RegressionEvaluator(metricName="rmse", labelCol=lr.getLabelCol(), predictionCol=lr.getPredictionCol())

# Initialize CrossValidator
crossval = CrossValidator(estimator=lr,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator,
                          numFolds=5)

cvModel = crossval.fit(final_train)
predictions = cvModel.transform(final_test)

# Evaluate the model
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

# Best model's hyperparameters
best_maxIter = cvModel.bestModel._java_obj.getMaxIter()
best_regParam = cvModel.bestModel._java_obj.getRegParam()
best_elasticNetParam = cvModel.bestModel._java_obj.getElasticNetParam()

print("Best Linear Model Hyperparameters:")
print("maxIter: ", best_maxIter)
print("regParam: ", best_regParam)
print("elasticNetParam: ", best_elasticNetParam)

Root Mean Squared Error (RMSE) on test data = 1.08049
Best Linear Model Hyperparameters:
maxIter:  5
regParam:  0.001
elasticNetParam:  0.0


**Random Forest**

In [45]:
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator

In [46]:
# Initialize the Random Forest model
rf = RandomForestRegressor(featuresCol="features", labelCol="rating")

# Define a grid of hyperparameters to search over
paramGrid = ParamGridBuilder() \
            .addGrid(rf.numTrees, [20, 30, 50]) \
            .addGrid(rf.maxDepth, [5, 10, 15]) \
            .addGrid(rf.minInstancesPerNode, [1, 5, 10]) \
            .build()

# Define an evaluator
evaluator = RegressionEvaluator(metricName="rmse", labelCol=rf.getLabelCol(), predictionCol=rf.getPredictionCol())

# Initialize CrossValidator
crossval = CrossValidator(estimator=rf,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator,
                          numFolds=5)

cvModel = crossval.fit(final_train)
predictions = cvModel.transform(final_test)

# Evaluate the model
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

# Best model's hyperparameters
best_numTrees = cvModel.bestModel.getNumTrees
best_maxDepth = cvModel.bestModel.getOrDefault("maxDepth")
best_minInstancesPerNode = cvModel.bestModel.getOrDefault("minInstancesPerNode")

print("Best Random Forest Model Hyperparameters:")
print("numTrees:", best_numTrees)
print("maxDepth:", best_maxDepth)
print("minInstancesPerNode:", best_minInstancesPerNode)

Root Mean Squared Error (RMSE) on test data = 1.03129
Best Random Forest Model Hyperparameters:
numTrees: 50
maxDepth: 15
minInstancesPerNode: 5


---
The best Random Forest model had a slightly better RMSE than the best Linear Regressor. <br>
However, since the improvement was minimal while it took much more time to train and make predictions, I chose Linear Regression as the final model for content-based recommender.

In [47]:
lr = LinearRegression(maxIter=best_maxIter,
                     regParam=best_regParam,
                     elasticNetParam=best_elasticNetParam,
                     featuresCol="features",
                     labelCol="rating")

lr_model = lr.fit(final_train)

**Implementing the model**

Creating dataset top 50 movies. <br>
This serves to limit the recommendation space to the 50 most watched movies, because those are interesting for evaluating preferences.

In [48]:
# Count the number of ratings for each movie
ratings_count_df = ratings_df.groupBy("movie_id").agg(f.count("rating").alias("num_ratings"))

# Join the ratings count with the movies DataFrame
popular_movies_df = movies_df.join(ratings_count_df, on="movie_id")

# Create a new DataFrame with the top 50 most popular movies based on the number of ratings
popular_movies_df = popular_movies_df.orderBy(f.desc("num_ratings")).limit(50)

popular_movies_df.show(truncate=False)

+--------+--------------------------------+------------+------------------+---------------------------------------------------------------------------+-------+------+---------+---------+--------+------+-----+-----------+-----+-------+---------+------+-------+-------+-------+------+--------+---+-------+-----------+
|movie_id|title                           |release_date|video_release_date|IMDb_URL                                                                   |unknown|Action|Adventure|Animation|Children|Comedy|Crime|Documentary|Drama|Fantasy|Film-Noir|Horror|Musical|Mystery|Romance|Sci-Fi|Thriller|War|Western|num_ratings|
+--------+--------------------------------+------------+------------------+---------------------------------------------------------------------------+-------+------+---------+---------+--------+------+-----+-----------+-----+-------+---------+------+-------+-------+-------+------+--------+---+-------+-----------+
|50      |Star Wars (1977)                |01-Jan-19

In [49]:
popular_movies_df.columns

['movie_id',
 'title',
 'release_date',
 'video_release_date',
 'IMDb_URL',
 'unknown',
 'Action',
 'Adventure',
 'Animation',
 'Children',
 'Comedy',
 'Crime',
 'Documentary',
 'Drama',
 'Fantasy',
 'Film-Noir',
 'Horror',
 'Musical',
 'Mystery',
 'Romance',
 'Sci-Fi',
 'Thriller',
 'War',
 'Western',
 'num_ratings']

In [50]:
# Extract the year from the title column
popular_movies_df = extract_year(popular_movies_df)

popular_movies_df.select("title", "release_year").show(n=5, truncate=False)

+-------------------------+------------+
|title                    |release_year|
+-------------------------+------------+
|Star Wars (1977)         |1977        |
|Contact (1997)           |1997        |
|Fargo (1996)             |1997        |
|Return of the Jedi (1983)|1997        |
|Liar Liar (1997)         |1997        |
+-------------------------+------------+
only showing top 5 rows



In [51]:
null_counts(popular_movies_df)

+--------+-----+------------+------------------+--------+-------+------+---------+---------+--------+------+-----+-----------+-----+-------+---------+------+-------+-------+-------+------+--------+---+-------+-----------+------------+
|movie_id|title|release_date|video_release_date|IMDb_URL|unknown|Action|Adventure|Animation|Children|Comedy|Crime|Documentary|Drama|Fantasy|Film-Noir|Horror|Musical|Mystery|Romance|Sci-Fi|Thriller|War|Western|num_ratings|release_year|
+--------+-----+------------+------------------+--------+-------+------+---------+---------+--------+------+-----+-----------+-----+-------+---------+------+-------+-------+-------+------+--------+---+-------+-----------+------------+
|       0|    0|           0|                50|       0|      0|     0|        0|        0|       0|     0|    0|          0|    0|      0|        0|     0|      0|      0|      0|     0|       0|  0|      0|          0|           0|
+--------+-----+------------+------------------+--------+---

In [52]:
top50_df = popular_movies_df.select("movie_id", "title", "release_year",
                                    "unknown", "Action", "Adventure", "Animation", "Children",
                                    "Comedy", "Crime", "Documentary", "Drama", "Fantasy", "Film-Noir", "Horror",
                                    "Musical", "Mystery", "Romance", "Sci-Fi", "Thriller", "War", "Western")

top50_df.show(5)

+--------+--------------------+------------+-------+------+---------+---------+--------+------+-----+-----------+-----+-------+---------+------+-------+-------+-------+------+--------+---+-------+
|movie_id|               title|release_year|unknown|Action|Adventure|Animation|Children|Comedy|Crime|Documentary|Drama|Fantasy|Film-Noir|Horror|Musical|Mystery|Romance|Sci-Fi|Thriller|War|Western|
+--------+--------------------+------------+-------+------+---------+---------+--------+------+-----+-----------+-----+-------+---------+------+-------+-------+-------+------+--------+---+-------+
|      50|    Star Wars (1977)|        1977|      0|     1|        1|        0|       0|     0|    0|          0|    0|      0|        0|     0|      0|      0|      1|     1|       0|  1|      0|
|     258|      Contact (1997)|        1997|      0|     0|        0|        0|       0|     0|    0|          0|    1|      0|        0|     0|      0|      0|      0|     1|       0|  0|      0|
|     100|     

---
Columns *movie_id* and *title* will not go into the model, but we are keeping them here so we can retrieve and display the movie titles when generating recommendations

In [53]:
# Encoding user gender and occupation
encoded_users = encode_gender(users_df)
encoded_users = encode_occupation(encoded_users)
encoded_users.columns

['user_id',
 'age',
 'zip_code',
 'age_category',
 'gender_encoded',
 'occupation_encoded']

**Creating predictions**

In [54]:
from pyspark.sql.functions import udf
from pyspark.ml.linalg import VectorUDT

In [55]:
def get_user_info(user_id):
    '''
    Retrieves the user information for a given user ID.

    Returns:
    Row: The first matching row from the DataFrame (contains 'gender_encoded', 'occupation_encoded', 'age').
    '''
    user_info = encoded_users.filter(encoded_users.user_id == user_id) \
                                    .select('gender_encoded', 'occupation_encoded', 'age') \
                                    .collect()
    return user_info[0]

In [56]:
def create_df_for_model(user_id):
    '''
    Creates a DataFrame with features for model training based on user information.

    Parameters:
    user_id (int): The unique identifier of the user for whom the recommendation model is being created.

    Returns:
    DataFrame: A new DataFrame for recommendation modeling that includes columns for gender, occupation, and age for the user.
    '''
    # Retrieve user information (gender, occupation, age) based on user_id
    gender_encoded, occupation_encoded, age = get_user_info(user_id)
    
    # Define User Defined Functions (UDFs) to convert the encoded gender and occupation to a Vector
    gender_encoded_udf = udf(lambda: gender_encoded, VectorUDT())
    occupation_encoded_udf = udf(lambda: occupation_encoded, VectorUDT())

    # Select columns from top50_df excluding 'title' and 'movie_id' for the recommendation model
    selected_columns = [col for col in top50_df.columns if col not in ["title", "movie_id"]]
    top50_movies_df = top50_df.select(selected_columns)

    # Add the user-specific information to the DataFrame: gender, occupation, and age
    recomendation_df = top50_movies_df.withColumn("gender_encoded", gender_encoded_udf())
    recomendation_df = recomendation_df.withColumn("occupation_encoded", occupation_encoded_udf())
    recomendation_df = recomendation_df.withColumn("age", f.lit(age))
    
    # Return the modified DataFrame for the recommendation model    
    return recomendation_df

In [57]:
# Function simulation
create_df_for_model(1).show()

+------------+-------+------+---------+---------+--------+------+-----+-----------+-----+-------+---------+------+-------+-------+-------+------+--------+---+-------+--------------+------------------+---+
|release_year|unknown|Action|Adventure|Animation|Children|Comedy|Crime|Documentary|Drama|Fantasy|Film-Noir|Horror|Musical|Mystery|Romance|Sci-Fi|Thriller|War|Western|gender_encoded|occupation_encoded|age|
+------------+-------+------+---------+---------+--------+------+-----+-----------+-----+-------+---------+------+-------+-------+-------+------+--------+---+-------+--------------+------------------+---+
|        1977|      0|     1|        1|        0|       0|     0|    0|          0|    0|      0|        0|     0|      0|      0|      1|     1|       0|  1|      0| (1,[0],[1.0])|   (20,[11],[1.0])| 24|
|        1997|      0|     0|        0|        0|       0|     0|    0|          0|    1|      0|        0|     0|      0|      0|      0|     1|       0|  0|      0| (1,[0],[1.0])

In [58]:
def get_user_ratings(user_id):
    '''
    Generates movie ratings predictions for a given user based on a recommendation model.

    Parameters:
    user_id (int): The unique identifier of the user for whom the ratings are being predicted.

    Returns:
    DataFrame: A DataFrame containing the predicted ratings for the user.
    '''
    # Create the DataFrame for the user containing movie features
    dataframe = create_df_for_model(user_id)

    # Transform the DataFrame to assemble the features into a single vector
    output = assembler.transform(dataframe)

    # Select the 'features' column from the transformed DataFrame
    unlabeled_data = output.select("features")
    
    # Use the trained recommendation model to predict ratings for the user
    predictions_for_user = lr_model.transform(unlabeled_data)

    # Return the DataFrame containing the predicted ratings
    return predictions_for_user

In [59]:
#Function simulation
get_user_ratings(1).show(truncate=False)

+--------------------------------------------------------------------------------+------------------+
|features                                                                        |prediction        |
+--------------------------------------------------------------------------------+------------------+
|(42,[0,1,13,22,24,25,37,38,40],[24.0,1.0,1.0,1977.0,1.0,1.0,1.0,1.0,1.0])       |3.916140356976399 |
|(42,[0,1,13,22,31,38],[24.0,1.0,1.0,1997.0,1.0,1.0])                            |3.6056849540271294|
|(42,[0,1,13,22,29,31,39],[24.0,1.0,1.0,1997.0,1.0,1.0,1.0])                     |3.7033074843830107|
|(42,[0,1,13,22,24,25,37,38,40],[24.0,1.0,1.0,1997.0,1.0,1.0,1.0,1.0,1.0])       |3.63433229994099  |
|(42,[0,1,13,22,28],[24.0,1.0,1.0,1997.0,1.0])                                   |3.2060523970543464|
|(42,[0,1,13,22,31,37,40],[24.0,1.0,1.0,1996.0,1.0,1.0,1.0])                     |3.7967793102602485|
|(42,[0,1,13,22,34,39],[24.0,1.0,1.0,1996.0,1.0,1.0])                            |

In [60]:
def get_user_recommendation(user_id):
    '''
    Generates movie recommendations for a given user based on predicted ratings.

    Parameters:
    user_id (int): The unique identifier of the user for whom the movie recommendations are being generated.

    Returns:
    DataFrame: A DataFrame containing the top movie recommendations for the user, sorted by predicted ratings.
    
    '''
    
    # Retrieve predicted ratings for the user
    predictions_for_user = get_user_ratings(user_id)

    # Add an index column to both predictions and movie data to facilitate joining
    df1_with_index = predictions_for_user.withColumn("row_index", f.monotonically_increasing_id())
    df2_with_index = top50_df.withColumn("row_index", f.monotonically_increasing_id())

    # Join the predictions with the top50_df to get info about movies ids and titles
    merged_df = df1_with_index.join(df2_with_index, on="row_index").drop("row_index")

    # Order the merged DataFrame by predicted ratings in descending order and select relevant columns
    merged_df = merged_df.orderBy(f.desc("prediction")) \
                         .select(f.lit(user_id).alias("user_id"),"movie_id","prediction")
    
    # Return the DataFrame containing the recommended movies with predicted ratings
    return merged_df

In [61]:
# Function simulation
get_user_recommendation(1).show()

+-------+--------+------------------+
|user_id|movie_id|        prediction|
+-------+--------+------------------+
|      1|     172| 4.165857360733671|
|      1|     127|3.9790089157570705|
|      1|      50| 3.916140356976399|
|      1|     286|3.7967793102602485|
|      1|     191| 3.743808356302406|
|      1|     318|3.7291040770617307|
|      1|     176| 3.709028598569958|
|      1|     100|3.7033074843830107|
|      1|      22| 3.683114260695721|
|      1|      56|  3.67273866082882|
|      1|     302|3.6630322904608086|
|      1|      98|3.6589089136876467|
|      1|     276|3.6396537119248933|
|      1|     181|  3.63433229994099|
|      1|       7|3.6338657597306714|
|      1|     237|3.6255633090731223|
|      1|     313|3.6077542984106543|
|      1|     258|3.6056849540271294|
|      1|     195|3.5659934030863703|
|      1|      64|3.5437976730228335|
+-------+--------+------------------+
only showing top 20 rows



## 4. Collaborative filtering recommendation system

**Alternating Least Squares**

In [62]:
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
# from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

# Initialize the ALS model
als = ALS(userCol="user_id", itemCol="movie_id", ratingCol="rating", coldStartStrategy="drop", nonnegative=True)

# Define a grid of hyperparameters
paramGrid = ParamGridBuilder() \
            .addGrid(als.rank, [10, 20, 30]) \
            .addGrid(als.regParam, [0.1, 0.01, 0.001]) \
            .addGrid(als.maxIter, [5, 10, 15]) \
            .build()

# Define an evaluator
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")

# Initialize CrossValidator
crossval = CrossValidator(estimator=als,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator,
                          numFolds=5,
                          parallelism=2)

# Fit CrossValidator on the training data
cvModel = crossval.fit(train_df)

In [63]:
# Make predictions on test data
predictions = cvModel.transform(test_df)

# Evaluate the model
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

# Best model's hyperparameters
best_rank = cvModel.bestModel.rank
best_regParam = cvModel.bestModel._java_obj.parent().getRegParam()
best_maxIter = cvModel.bestModel._java_obj.parent().getMaxIter()

print("Best Model Hyperparameters:")
print("rank: ", best_rank)
print("regParam: ", best_regParam)
print("maxIter: ", best_maxIter)

Root Mean Squared Error (RMSE) on test data = 0.91613
Best Model Hyperparameters:
rank:  30
regParam:  0.1
maxIter:  15


In [64]:
# Build the recommendation model using ALS on the training data using best model's hyperparameters
als = ALS(rank=best_rank, regParam=best_regParam, maxIter=best_maxIter,
          userCol="user_id", itemCol="movie_id", ratingCol="rating", coldStartStrategy="drop" )

als_model = als.fit(train_df)

**Implementing the model**

This new split will give us simulation of new data (users that are new to the system)

In [65]:
# Spliting ratings dataset
train_df2, test_df2 = ratings_df.randomSplit([0.8, 0.2], seed=4)

Using train_df2, we identify users who have given fewer than 5 ratings or do not appear in train_df and create two variables: <br> *users_with_grades* (users from traind_df2 who appear in train_df and have at least 5 ratings) and <br> *users_without_grades* (users from train_df2 who either do not appear in train_df or have provided fewer than 5 ratings) <br>
That way we simulate 'new' users who are not yet known to the trained model.

In [66]:
train_df.show()

+-------+--------+------+---------+
|user_id|movie_id|rating|timestamp|
+-------+--------+------+---------+
|      1|       1|     5|874965758|
|      1|       2|     3|876893171|
|      1|       3|     4|878542960|
|      1|       7|     4|875071561|
|      1|       8|     1|875072484|
|      1|       9|     5|878543541|
|      1|      10|     3|875693118|
|      1|      11|     2|875072262|
|      1|      14|     5|874965706|
|      1|      17|     3|875073198|
|      1|      18|     4|887432020|
|      1|      20|     4|887431883|
|      1|      21|     1|878542772|
|      1|      22|     4|875072404|
|      1|      23|     4|875072895|
|      1|      24|     3|875071713|
|      1|      25|     4|875071805|
|      1|      26|     3|875072442|
|      1|      27|     2|876892946|
|      1|      28|     4|875072173|
+-------+--------+------+---------+
only showing top 20 rows



In [67]:
train_df2.show()

+-------+--------+------+---------+
|user_id|movie_id|rating|timestamp|
+-------+--------+------+---------+
|      1|       2|     3|876893171|
|      1|       3|     4|878542960|
|      1|       4|     3|876893119|
|      1|       5|     3|889751712|
|      1|       6|     5|887431973|
|      1|       7|     4|875071561|
|      1|       9|     5|878543541|
|      1|      10|     3|875693118|
|      1|      11|     2|875072262|
|      1|      12|     5|878542960|
|      1|      13|     5|875071805|
|      1|      14|     5|874965706|
|      1|      16|     5|878543541|
|      1|      17|     3|875073198|
|      1|      18|     4|887432020|
|      1|      19|     5|875071515|
|      1|      20|     4|887431883|
|      1|      22|     4|875072404|
|      1|      23|     4|875072895|
|      1|      24|     3|875071713|
+-------+--------+------+---------+
only showing top 20 rows



**Segmenting users in users_with_grades and users_without_grades categories**

In [68]:
# Perform a left anti join to find user IDs only in train_df2
users_only_in_df2 = train_df2.join(train_df, on=['user_id'], how='left_anti')

users_only_in_df2.show()     

+-------+--------+------+---------+
|user_id|movie_id|rating|timestamp|
+-------+--------+------+---------+
+-------+--------+------+---------+



In [69]:
user_rating_counts = train_df2.groupBy('user_id').count()

# Filter users who have rated less than 5 movies
users_less_than_5_movies = user_rating_counts.filter(f.col('count') < 5)

users_less_than_5_movies.show()

+-------+-----+
|user_id|count|
+-------+-----+
+-------+-----+



In [70]:
user_rating_counts = train_df2.groupBy('user_id').count()

# Filter users who have rated less than 5 movies
users_less_than_5_movies = user_rating_counts.filter(f.col('count') < 5)

# Join the original DataFrame with the filtered users to get their ratings
filtered_dataframe = train_df2.join(users_less_than_5_movies, on=['user_id'], how='inner')

# Show the resulting DataFrame containing users who have rated less than 5 movies
filtered_dataframe.show()

+-------+--------+------+---------+-----+
|user_id|movie_id|rating|timestamp|count|
+-------+--------+------+---------+-----+
+-------+--------+------+---------+-----+



We see that the condition of five ratings is not enough for us to simulate a cold start, as it results in an empty set.
That is why we first need to explore which rating threshold would be more appropriate for our scenario.

In [71]:
user_ratings_count = train_df2.groupBy("user_id").agg(f.count("rating").alias("num_ratings"))

sorted_users = user_ratings_count.orderBy(f.asc("num_ratings"))

# Percentiles of interest
percentiles = [0.1, 0.20, 0.25, 0.50, 0.75, 0.90]

ratings_percentiles = sorted_users.approxQuantile("num_ratings", percentiles, relativeError=0.01)

for percentile, value in zip(percentiles, ratings_percentiles):
    print(f"Percentile {percentile * 100}%: {value}")

Percentile 10.0%: 19.0
Percentile 20.0%: 23.0
Percentile 25.0%: 26.0
Percentile 50.0%: 52.0
Percentile 75.0%: 118.0
Percentile 90.0%: 189.0


To extract around 25% of users for the model, we can filter those who have rated no more than 25 movies.

In [72]:
user_rating_counts = train_df2.groupBy('user_id').count()

# Filter users who have rated less than 25 movies
users_less_than_25_movies = user_rating_counts.filter(f.col('count') < 25)

# Join the original DataFrame with the filtered users to get their ratings
filtered_dataframe = train_df2.join(users_less_than_25_movies, on=['user_id'], how='inner')

# Show the resulting DataFrame containing users who have rated less than 25 movies
filtered_dataframe.show()

+-------+--------+------+---------+-----+
|user_id|movie_id|rating|timestamp|count|
+-------+--------+------+---------+-----+
|      4|      50|     5|892003526|   20|
|      4|     210|     3|892003374|   20|
|      4|     258|     5|892001374|   20|
|      4|     260|     4|892004275|   20|
|      4|     264|     3|892004275|   20|
|      4|     271|     4|892001690|   20|
|      4|     294|     5|892004409|   20|
|      4|     300|     5|892001445|   20|
|      4|     301|     5|892002353|   20|
|      4|     303|     5|892002352|   20|
|      4|     324|     5|892002353|   20|
|      4|     327|     5|892002352|   20|
|      4|     328|     3|892001537|   20|
|      4|     329|     5|892002352|   20|
|      4|     356|     3|892003459|   20|
|      4|     357|     4|892003525|   20|
|      4|     358|     2|892004275|   20|
|      4|     359|     5|892002352|   20|
|      4|     361|     5|892002353|   20|
|      4|     362|     5|892002352|   20|
+-------+--------+------+---------

In [73]:
# Create users_without_grades
users_without_grades = filtered_dataframe.select("user_id").distinct()

row_count = users_without_grades.count()
print("Number of rows in the DataFrame:", row_count)

Number of rows in the DataFrame: 215


For users_without_grades we will use content-based recommendation system (LR).

In [74]:
# Create users_with_grades 

all_users_df2 = train_df2.select('user_id').distinct()
with_grades_df2 = all_users_df2.join(users_without_grades, on=['user_id'], how='left_anti')

users_with_grades = with_grades_df2.select('user_id').distinct()

user_id_count = users_with_grades.count()

print("Number of unique user IDs in the DataFrame:", user_id_count)

Number of unique user IDs in the DataFrame: 728


For users_with_grades we will use collaborative recommendation system (ALS).

In [75]:
# Count the number of unique user IDs train_df2 to double-check
user_id_count = train_df2.select('user_id').distinct().count()

print("Number of unique user IDs in the DataFrame:", user_id_count)

Number of unique user IDs in the DataFrame: 943


In [76]:
# Count distinct movies that appear only in train_df2

movies_only_in_df2 = train_df2.join(train_df, on=['movie_id'], how='left_anti')

movies_only_in_df2 = movies_only_in_df2.select('movie_id').distinct()

movies_only_in_df2_count = movies_only_in_df2.count()

print("Number of unique movie IDs in the DataFrame:", movies_only_in_df2_count)

Number of unique movie IDs in the DataFrame: 18


In [77]:
# Count distinct movies that appear in both train datasets

movies_in_both_df_df2 = train_df2.join(train_df, on=['movie_id'], how='inner')

movies_in_both_df_df2 = movies_in_both_df_df2.select('movie_id').distinct()

movies_in_df_df2_count = movies_in_both_df_df2.count()

print("Number of unique movie IDs in both train DataFrames:", movies_in_df_df2_count)

Number of unique movie IDs in both train DataFrames: 1634


**Creating predictions for known users**

In [78]:
movies_only_in_df2.show()

+--------+
|movie_id|
+--------+
|    1507|
|    1533|
|    1525|
|    1675|
|    1156|
|    1505|
|    1586|
|    1663|
|    1603|
|    1649|
|    1624|
|    1657|
|    1494|
|    1358|
|    1320|
|    1548|
|    1604|
|    1080|
+--------+



In [79]:
users_with_grades.show()


+-------+
|user_id|
+-------+
|    148|
|    463|
|    496|
|    833|
|    243|
|    392|
|    540|
|    623|
|    737|
|    897|
|     31|
|     85|
|    137|
|    251|
|    451|
|    580|
|     65|
|    458|
|    883|
|     53|
+-------+
only showing top 20 rows



In [80]:
# Perform a cross join to get all combinations of user_id and movie_id
combinations_df = users_with_grades.crossJoin(movies_only_in_df2)

# Show the resulting DataFrame
combinations_df.show()

+-------+--------+
|user_id|movie_id|
+-------+--------+
|    148|    1507|
|    148|    1533|
|    148|    1525|
|    148|    1675|
|    148|    1156|
|    148|    1505|
|    148|    1586|
|    148|    1663|
|    148|    1603|
|    148|    1649|
|    148|    1624|
|    148|    1657|
|    148|    1494|
|    148|    1358|
|    148|    1320|
|    148|    1548|
|    148|    1604|
|    148|    1080|
|    463|    1507|
|    463|    1533|
+-------+--------+
only showing top 20 rows



In [81]:
combinations_filtered_df = combinations_df.join(train_df, on=['user_id','movie_id'], how='left_anti')

In [82]:
predictions = als_model.transform(combinations_filtered_df)

In [83]:
predictions.show()

+-------+--------+----------+
|user_id|movie_id|prediction|
+-------+--------+----------+
+-------+--------+----------+



In [84]:
# Sorting each user predictions in descending order
window_spec = Window.partitionBy("user_id").orderBy(f.desc("prediction"))

sorted_predictions = predictions.withColumn("rank", f.row_number().over(window_spec))

sorted_predictions.show()

+-------+--------+----------+----+
|user_id|movie_id|prediction|rank|
+-------+--------+----------+----+
+-------+--------+----------+----+



In [85]:
# Filter out only the top 3 prediction for each user
top_predictions = sorted_predictions.filter(f.col("rank") < 4)

top_predictions = top_predictions.drop("rank")

top_predictions.show()

+-------+--------+----------+
|user_id|movie_id|prediction|
+-------+--------+----------+
+-------+--------+----------+



## 5. Hybrid recommendation system

In [86]:
train_df2.show()

+-------+--------+------+---------+
|user_id|movie_id|rating|timestamp|
+-------+--------+------+---------+
|      1|       2|     3|876893171|
|      1|       3|     4|878542960|
|      1|       4|     3|876893119|
|      1|       5|     3|889751712|
|      1|       6|     5|887431973|
|      1|       7|     4|875071561|
|      1|       9|     5|878543541|
|      1|      10|     3|875693118|
|      1|      11|     2|875072262|
|      1|      12|     5|878542960|
|      1|      13|     5|875071805|
|      1|      14|     5|874965706|
|      1|      16|     5|878543541|
|      1|      17|     3|875073198|
|      1|      18|     4|887432020|
|      1|      19|     5|875071515|
|      1|      20|     4|887431883|
|      1|      22|     4|875072404|
|      1|      23|     4|875072895|
|      1|      24|     3|875071713|
+-------+--------+------+---------+
only showing top 20 rows



In [87]:
users_without_grades.show()

+-------+
|user_id|
+-------+
|    471|
|    858|
|    516|
|    808|
|    879|
|    799|
|    133|
|     78|
|    513|
|    362|
|    613|
|    857|
|    375|
|    876|
|    108|
|    155|
|    744|
|     34|
|    596|
|    762|
+-------+
only showing top 20 rows



In [88]:
from functools import reduce

def get_users_content_based_recommendation(dataframe):
    # Collect unique user_ids from your DataFrame
    user_ids = dataframe.select("user_id").distinct().rdd.map(lambda x: x[0]).collect()

    # Apply the function to each user_id and collect the resulting DataFrames
    # get_user_recommendation returns content based predictions
    user_dataframes = [get_user_recommendation(user_id) for user_id in user_ids]

    # Combine the DataFrames into a single DataFrame
    combined_dataframe = reduce(lambda df1, df2: df1.union(df2), user_dataframes)

    
    return combined_dataframe

In [89]:
def recommendation_systems(df2, df1, movies_with_grades):

    # Filter users who have rated less than 25 movies
    user_rating_counts = df2.groupBy('user_id').count()
    users_without_grades = user_rating_counts.filter(f.col('count') < 25).select('user_id')

    all_users = df2.select('user_id').distinct()

    # Filter users who have rated more than 25 movies
    users_with_grades = all_users.join(users_without_grades, on=['user_id'], how='left_anti')
    users_with_grades = users_with_grades.select('user_id').distinct()
    print("User segmentation based on grades done.")

    # ALS model for users with grades
    combinations_df = users_with_grades.crossJoin(movies_with_grades)
    
    combinations_filtered_df = combinations_df.join(df1, on=['user_id','movie_id'], how='left_anti')
    als_predictions = als_model.transform(combinations_filtered_df)
    window_spec = Window.partitionBy("user_id").orderBy(f.desc("prediction"))
    als_sorted_predictions = als_predictions.withColumn("rank", f.row_number().over(window_spec))
    als_sorted_predictions= als_sorted_predictions.drop("rank")
    print("Colaborative filtering done.")

    # Prediction model for users without grades
    lr_predictions = get_users_content_based_recommendation(users_without_grades)
    print("Content based done.")

    final_df = als_sorted_predictions.union(lr_predictions)

    sorted_final = final_df.orderBy(final_df.user_id.asc(), final_df.prediction.desc())

    return sorted_final

In [90]:
final = recommendation_systems(train_df2, train_df, movies_in_both_df_df2)

User segmentation based on grades done.
Colaborative filtering done.
Content based done.


In [91]:
# Grouping by user_id and counting the number of ratings for each user
ratings_count = train_df2.groupBy('user_id').agg(f.count('rating').alias('ratings_count'))

# Sorting the users based on the number of ratings they've given
ratings_count_sorted = ratings_count.orderBy('ratings_count', ascending=False)

# Displaying the result
ratings_count_sorted.show()

+-------+-------------+
|user_id|ratings_count|
+-------+-------------+
|    405|          601|
|    655|          548|
|     13|          503|
|    450|          430|
|    276|          410|
|    303|          399|
|    416|          396|
|    234|          385|
|    537|          375|
|    181|          353|
|    393|          348|
|    279|          346|
|    429|          340|
|    846|          339|
|     92|          327|
|     94|          326|
|    293|          325|
|      7|          321|
|    682|          320|
|    308|          316|
+-------+-------------+
only showing top 20 rows



**How the user will rate top 50 movies**

In [92]:
user_450_data = final.filter(final.user_id == 450)

In [93]:
user_450_data.show()

+-------+--------+------------------+
|user_id|movie_id|        prediction|
+-------+--------+------------------+
|    450|    1463| 5.038302421569824|
|    450|    1449| 4.831721305847168|
|    450|     515| 4.748378276824951|
|    450|      57|4.7307257652282715|
|    450|     357| 4.723079681396484|
|    450|     863| 4.678903102874756|
|    450|     963|4.6661601066589355|
|    450|      98| 4.660663604736328|
|    450|    1122| 4.627396106719971|
|    450|    1398|4.6055378913879395|
|    450|     408|  4.59337043762207|
|    450|    1064| 4.587924003601074|
|    450|    1524| 4.581523895263672|
|    450|    1467| 4.576347827911377|
|    450|    1643|4.5728373527526855|
|    450|     480| 4.571084022521973|
|    450|    1368| 4.562071323394775|
|    450|     659| 4.551117420196533|
|    450|     174| 4.547391891479492|
|    450|     958| 4.499762535095215|
+-------+--------+------------------+
only showing top 20 rows

