# Movie Recommendation System

## Introduction

#### We will create a movie recommendation system based on the MovieLens dataset available [here](https://grouplens.org/datasets/movielens/). The data consists of movie ratings on a 5-star scale (1 to 5 in the 1M release; the larger releases also allow half-star ratings starting at 0.5).

## Outline
  1. Import libraries and load the MovieLens dataset <br>
  2. Handling missing data <br>
  3. Exploring movielens data <br>
  4. Collaborative filtering with <br>
    &emsp;&emsp; I. _ALS_ (before tuning)<br>
    &emsp;&emsp; II. _ALS_ (after tuning)<br>

## Setup


#### Step 1. Create a parallel computing cluster by loading configuration settings from a JSON file.

In [None]:
import ipyparallel as ipp

# Load the cluster description from its JSON file and connect a client to it.
# NOTE(review): "cluster-.json" is presumably the file for the default (empty) cluster id —
# confirm it matches the file present on this machine.
cluster = ipp.Cluster.from_file("/root/.ipython/profile_default/security/cluster-.json")
rc = cluster.connect_client_sync()
rc  # bare expression so the notebook displays the client

#### Step 2. Set up a SparkSession using PySpark and import the set of built-in functions used for data manipulation.

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

# Name the application first, then obtain the session from the configured builder.
session_builder = SparkSession.builder.appName('MovieRecommendationSystem')
spark = session_builder.getOrCreate()

# spark.conf.set("spark.sql.repl.eagerEval.enabled",True) # Switch to eager evaluations for debugging

#### Step 3. Import the storage module from the google.cloud package, which provides functionality to interact with Google Cloud Storage.

In [None]:
from google.cloud import storage

# Single bucket that holds every MovieLens release used in this notebook.
bucket_name = 'bucket-spark-recommender'

# "gs://" URI prefix used when building the Spark input paths.
bucket_path = "gs://" + bucket_name

# Storage client and a reference to the same bucket.
gcs_client = storage.Client()
bucket = gcs_client.bucket(bucket_name)

## I. Define File Paths and Schema, Load DataFrames.

##### Load the MovieLens 1 million dataset

# Lookup tables for the coded columns (occupation, age) and the genre labels.

# Occupation labels; a label's position in this tuple is its numeric code.
_occupation_names = (
    'other',
    'academic/educator',
    'artist',
    'clerical/admin',
    'college/grad student',
    'customer service',
    'doctor/health care',
    'executive/managerial',
    'farmer',
    'homemaker',
    'K-12 student',
    'lawyer',
    'programmer',
    'retired',
    'sales/marketing',
    'scientist',
    'self-employed',
    'technician/engineer',
    'tradesman/craftsman',
    'unemployed',
    'writer',
)
# Keys are the codes as strings ('0' .. '20').
occupation_list = {str(code): name for code, name in enumerate(_occupation_names)}

genres_list = [
    "unknown", "Action", "Adventure", "Animation", "Children's", "Comedy",
    "Crime", "Documentary", "Drama", "Fantasy", "Film-Noir", "Horror",
    "Musical", "Mystery", "Romance", "Sci-Fi", "Thriller", "War", "Western",
]

# Age-bracket code (as a string) -> human-readable bracket.
age_list = dict([
    ('1', "Under 18"),
    ('18', "18-24"),
    ('25', "25-34"),
    ('35', "35-44"),
    ('45', "45-49"),
    ('50', "50-55"),
    ('56', "56+"),
])

def _read_double_colon_file(path, schema):
    """Read the "::"-delimited file at ``path`` with the CSV reader and the DDL ``schema``."""
    return spark.read.format("csv").option("sep", "::").schema(schema).load(path)


# Movies
movies_path_1m = f"{bucket_path}/ml-1m/movies.dat"
movies_schema_1m = "MovieID int, Title string, Genres string"
movie_df_1m = _read_double_colon_file(movies_path_1m, movies_schema_1m)

# Ratings
ratings_path_1m = f"{bucket_path}/ml-1m/ratings.dat"
ratings_schema_1m = "UserID int, MovieID int, Rating int, Timestamp long"
rating_df_1m = _read_double_colon_file(ratings_path_1m, ratings_schema_1m)

# Users (Age and Occupation are kept as the string codes found in the file)
users_path_1m = f"{bucket_path}/ml-1m/users.dat"
users_schema_1m = "UserID int, Gender string, Age string, Occupation string, ZipCode string"
user_df_1m = _read_double_colon_file(users_path_1m, users_schema_1m)

##### Load the MovieLens 10 million dataset

# MovieLens 10M: "::"-delimited files without a header row, read with explicit schemas.
movies_path_10m = f"{bucket_path}/ml-10M100K/movies.dat"
movies_schema_10m = "MovieID int, Title string, Genres string"

movie_df_10m = (spark.read.schema(movies_schema_10m)
            .option("sep","::")
            .format("csv")
            .load(movies_path_10m)
)

####################################################################

ratings_path_10m = f"{bucket_path}/ml-10M100K/ratings.dat"
# Rating is a float: this release uses half-star increments, and a value such as "3.5"
# cannot be parsed as int (the CSV reader would turn it into null).
ratings_schema_10m = "UserID int, MovieID int, Rating float, Timestamp long"

rating_df_10m = (spark.read.schema(ratings_schema_10m)
            .option("sep","::")
            .format("csv")
            .load(ratings_path_10m)
)

##### Load the MovieLens 20 million dataset

# MovieLens 20M: comma-separated files with a header row, read with explicit schemas.
movies_path_20m = f"{bucket_path}/ml-20m/movies.csv"
movies_schema_20m = "MovieID int, Title string, Genres string"

movie_df_20m = (spark.read.option("header",True).schema(movies_schema_20m)
            .format("csv")
            .load(movies_path_20m)
)

####################################################################

ratings_path_20m = f"{bucket_path}/ml-20m/ratings.csv"
# Rating is a float (same as the 25M schema): this release uses half-star increments,
# and a value such as "3.5" cannot be parsed as int (it would be read as null).
ratings_schema_20m = "UserID int, MovieID int, Rating float, Timestamp long"

rating_df_20m = (spark.read.option("header",True).schema(ratings_schema_20m)
            .format("csv")
            .load(ratings_path_20m)
)

##### Load the MovieLens 25 million dataset

In [None]:
# MovieLens 25M: comma-separated files whose first row is a header.

# Movies
movies_path_25m = f"{bucket_path}/ml-25m/movies.csv"
movies_schema_25m = "MovieID int, Title string, Genres string"
movie_reader_25m = spark.read.option("header", True).schema(movies_schema_25m)
movie_df_25m = movie_reader_25m.format("csv").load(movies_path_25m)

# Ratings (Rating is read as a float)
ratings_path_25m = f"{bucket_path}/ml-25m/ratings.csv"
ratings_schema_25m = "UserID int, MovieID int, Rating float, Timestamp long"
rating_reader_25m = spark.read.option("header", True).schema(ratings_schema_25m)
rating_df_25m = rating_reader_25m.format("csv").load(ratings_path_25m)

In [None]:
# Preview the first 10 rows of the 25M ratings.
rating_df_25m.show(10)

In [None]:
# Preview the first 10 rows of the 25M movies.
movie_df_25m.show(10)

In [None]:
# Attach the movie columns to every rating with an inner join on the movie id.
# Only the ratings and movies DataFrames are joined here; no user table is involved.
# NOTE(review): the key is spelled "movieID" while the schemas declare "MovieID" — this
# presumably relies on Spark's default case-insensitive column resolution; confirm that
# spark.sql.caseSensitive is not enabled.

df_25m = (rating_df_25m.join(movie_df_25m, on="movieID", how="inner"))
df_25m.show(10)

In [None]:
# Summary statistics for the Rating column of the joined DataFrame.
df_25m.describe(["Rating"]).show()

## II. Handling missing values

In [None]:
# Report, per column, how many rows hold a null or NaN value.
# This cell only counts; if any are found, the rows can be removed with na.drop().
missing_counters = []
for column_name in df_25m.columns:
    is_missing = isnan(column_name) | col(column_name).isNull()
    missing_counters.append(count(when(is_missing, column_name)).alias(column_name))

df_25m.select(missing_counters).show()

## III. Exploring movielens data

In [None]:
# Turn the pipe-delimited 'Genres' string into an array and add one 0/1 indicator column
# per entry of genres_list. (Despite its name, df2_1m is built from the 25M data.)
df2_1m = df_25m.withColumn("Genres", split(col("Genres"), "[|]", -1))

genres_list = ["unknown","Action", "Adventure", "Animation", "Children's", "Comedy", "Crime", "Documentary", "Drama", "Fantasy", "Film-Noir", "Horror", "Musical", "Mystery", "Romance", "Sci-Fi", "Thriller", "War", "Western"]

# genres_list uses the MovieLens 1M spellings, but the 25M README spells two labels
# differently. Match on the label present in the data while keeping the column names,
# so these two indicators are not silently all zeros.
# NOTE(review): the 25M data also contains an "IMAX" genre that has no column here.
genre_label_in_data = {"unknown": "(no genres listed)", "Children's": "Children"}

# Add every indicator in one select(); chaining withColumn() in a loop adds one
# projection to the query plan per genre.
genre_flags = [
    when(array_contains(col("Genres"), genre_label_in_data.get(genre, genre)), 1)
    .otherwise(0)
    .alias(genre)
    for genre in genres_list
]
df2_1m = df2_1m.select("*", *genre_flags)

In [None]:
import matplotlib.pyplot as plt

# Count how many times each rating value occurs, ordered by rating.
user_rating_record = df2_1m.groupBy("Rating").count().sort("Rating").collect()

# Split the collected rows into x values (ratings) and heights (counts).
ratings_label = [row["Rating"] for row in user_rating_record]
ratings_counts = [row["count"] for row in user_rating_record]

# Size the bars from the smallest gap between neighbouring rating values: the default
# width of 0.8 makes bars overlap when ratings are only 0.5 apart.
rating_gaps = [upper - lower for lower, upper in zip(ratings_label, ratings_label[1:])]
bar_width = 0.8 * min(rating_gaps) if rating_gaps else 0.8

plt.bar(ratings_label, ratings_counts, width=bar_width, align='center')
plt.xticks(ratings_label)  # one tick per rating value that actually occurs

# Adding labels and title
plt.xlabel("Rating")
plt.ylabel("Count")
plt.title("Rating Histogram")

# Display the histogram
plt.show()

In [None]:
# Average rating (rounded to 2 decimals) and rating count per genre, highest average first.
# Exploding the genre array yields one row per (rating, genre) pair.
ratings_per_genre = df2_1m.withColumn("Genres", explode(col("Genres")))
genre_summary = ratings_per_genre.groupBy("Genres").agg(
    avg("Rating").alias("avg_ratings"),
    count("Rating"),
)
genre_summary = genre_summary.withColumn("avg_ratings", round("avg_ratings", 2))
genre_summary.sort(col("avg_ratings").desc()).show()

## Recommendation system filtering

#### Similarity Measures

1. Cosine Similarity:

- Measures the cosine of the angle between two vectors.
- Values range from -1 to 1: 1 means the vectors point in the same direction, 0 means they are orthogonal, and a higher value indicates greater similarity.
- Often used when direction matters more than magnitude.

2. Dot Product:

- Multiplies corresponding components of two vectors and sums the results.
- When both vectors are normalized to unit length, the dot product equals the cosine similarity.
- Higher dot product implies higher similarity.

3. Euclidean Distance:

- Measures the straight-line distance between two vectors in Euclidean space.
- Smaller distance indicates higher similarity.
- For unit-length (normalized) vectors, the squared Euclidean distance equals $2 - 2\cos\theta$, so ranking by it is equivalent to ranking by dot product or cosine similarity.

4. Pearson Coefficient

- The Pearson correlation coefficient is a measure of linear correlation between two variables. In the context of recommendation systems, it is used to quantify the similarity between users based on their item ratings.
- The coefficient ranges from -1 to 1. A value of 1 indicates a perfect positive correlation, -1 indicates a perfect negative correlation, and 0 indicates no correlation.

### IV. Collaborative filtering 

#### a. ALS _(Alternating Least Squares (ALS) matrix factorization before tuning)_

In [None]:
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql import Row
import time

# Baseline ALS model trained with fixed hyper-parameters (no tuning).
ratings_df = df_25m.select("UserID", "MovieID", "Rating")

# 80/20 train/test split with a fixed seed.
(train_data, test_data) = ratings_df.randomSplit([0.8, 0.2], seed=42)

# Create ALS model
als = ALS(
    userCol="UserID",
    itemCol="MovieID",
    ratingCol="Rating",
    coldStartStrategy="drop",  # Drop any rows with NaN predictions
    nonnegative=True,  # Ensures that predictions are non-negative
    implicitPrefs=False,  # Treat ratings as explicit feedback
    maxIter=5,
    regParam=0.01
)

# Time only the fit. perf_counter() is the clock intended for measuring intervals;
# time.time() follows the wall clock, which can be adjusted while the fit is running.
start = time.perf_counter()

model = als.fit(train_data)

end = time.perf_counter()

# Generating predictions for the held-out ratings
predictions = model.transform(test_data)

print("The time of execution taken by als model for training is :",
      (end-start) * 10**3, "ms")

In [None]:
# View the first 10 predictions of the baseline model on the test split
predictions.show(10)

In [None]:
# Score the baseline predictions against the true ratings with two metrics:
# RMSE (root-mean-square error) and MAE (mean absolute error).
evaluator = RegressionEvaluator(labelCol="Rating", predictionCol="prediction", metricName="rmse")
evaluator2 = RegressionEvaluator(labelCol="Rating", predictionCol="prediction", metricName="mae")

rmse = evaluator.evaluate(predictions)
print(f"Root-mean-square error = {rmse}\n")

mae = evaluator2.evaluate(predictions)
print(f"Mean Absolute Error = {mae}\n")

In [None]:
# Top 5 movie recommendations for one user (e.g., UserID 1), using the baseline model.
userID = 1
userSpecificRecs = model.recommendForUserSubset(spark.createDataFrame([Row(UserID=userID)]), 5)
print(f"Top 5 movie recommendations for user {userID}")
# Each exploded element is a (MovieID, rating) struct; keep the movie id, in ranked order.
userSpecificRecs_movies = [row[0][0] for row in userSpecificRecs.select(explode("recommendations")).collect()]
# An explicit schema keeps createDataFrame working when there are no recommendations
# (schema inference fails on an empty list).
userSpecificRecs_movies_df = spark.createDataFrame(
    [(m, i) for i, m in enumerate(userSpecificRecs_movies)],
    "MovieID int, `Order` int",
)
# Look titles up in the movies table (one row per movie) rather than in the joined ratings
# table, which would need a distinct() over every rating. Genres is shown as stored in
# the movies file (pipe-delimited).
(userSpecificRecs_movies_df
    .join(movie_df_25m.select("MovieID", "Title", "Genres"), "MovieID", "left")
    .sort("Order")
    .show())

# Top 5 user recommendations for one movie (e.g., MovieID 101).
movieID = 101
movieSpecificRecs = model.recommendForItemSubset(spark.createDataFrame([Row(MovieID=movieID)]), 5)
print(f"Top 5 user recommendations for movie {movieID}")
# recommendForItemSubset returns *users*, so each exploded struct is (UserID, rating).
# These ids must not be joined against movie titles.
movieSpecificRecs_users_df = (movieSpecificRecs
    .select(explode("recommendations").alias("rec"))
    .select(col("rec.UserID").alias("UserID"), col("rec.rating").alias("PredictedRating")))
movieSpecificRecs_users_df.show()

#### b. ALS _(after tuning)_

In [None]:
# Import the requisite items
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Hyper-parameter grid: every combination of rank and regParam listed below is evaluated.
# maxIter is not part of the grid, so the value set on `als` is used for every candidate.
grid_builder = ParamGridBuilder()
grid_builder.addGrid(als.rank, [10, 50, 100, 150])
grid_builder.addGrid(als.regParam, [.01, .05, .1, 0.15])
param_grid = grid_builder.build()

print("Num models to be tested: ", len(param_grid))

# RMSE drives model selection; MAE is kept for reporting.
evaluator3 = RegressionEvaluator(labelCol="Rating", predictionCol="prediction", metricName="rmse")
evaluator4 = RegressionEvaluator(labelCol="Rating", predictionCol="prediction", metricName="mae")

#### Build Cross Validation Pipeline

In [None]:
# Build 5-fold cross validation over the parameter grid, selecting by evaluator3 (RMSE).
# The seed fixes the fold assignment so repeated runs compare the same folds, matching
# the seeded train/test split.
cv = CrossValidator(estimator=als, estimatorParamMaps=param_grid, evaluator=evaluator3, numFolds=5, seed=42)

# Confirm cv was built
print(cv)

In [None]:
# Fit the cross validator on the training split and time it.
start1 = time.perf_counter()

# Bound to its own name so the baseline ALS model held in `model` is not overwritten;
# the baseline recommendation cell can then still be re-run.
cv_model = cv.fit(train_data)

end1 = time.perf_counter()

# Extract the best model found by cross validation
best_model = cv_model.bestModel
print("The time of execution taken by cross validation is :", (end1-start1) * 10**3, "ms")

In [None]:
# Show the class of the selected model
print(type(best_model))

# Report the hyper-parameters of the selected model. They are read from the parent of the
# underlying Java object via the private `_java_obj` attribute.
print("*************Best Model Parameters*************")

# Print "Rank"
print("Rank:", best_model._java_obj.parent().getRank())

# Print "MaxIter"
print("MaxIter:", best_model._java_obj.parent().getMaxIter())

# Print "RegParam"
print("RegParam:", best_model._java_obj.parent().getRegParam())

In [None]:
# Generate predictions for the test split with the best model (nothing is displayed here)
test_predictions = best_model.transform(test_data)

In [None]:
# Report RMSE and MAE of the tuned model on the test split.
rmse = evaluator3.evaluate(test_predictions)
print(f"Root-mean-square error = {rmse}\n")

mae = evaluator4.evaluate(test_predictions)
print(f"Mean Absolute Error = {mae}\n")

#### Make Recommendations

In [None]:
# Top 5 movie recommendations for one user (e.g., UserID 1), using the tuned model.
userID = 1
userSpecificRecs = best_model.recommendForUserSubset(spark.createDataFrame([Row(UserID=userID)]), 5)
print(f"Top 5 movie recommendations for user {userID}")
# Each exploded element is a (MovieID, rating) struct; keep the movie id, in ranked order.
userSpecificRecs_movies = [row[0][0] for row in userSpecificRecs.select(explode("recommendations")).collect()]
# An explicit schema keeps createDataFrame working when there are no recommendations
# (schema inference fails on an empty list).
userSpecificRecs_movies_df = spark.createDataFrame(
    [(m, i) for i, m in enumerate(userSpecificRecs_movies)],
    "MovieID int, `Order` int",
)
# Look titles up in the movies table (one row per movie) rather than in the joined ratings
# table, which would need a distinct() over every rating. Genres is shown as stored in
# the movies file (pipe-delimited).
(userSpecificRecs_movies_df
    .join(movie_df_25m.select("MovieID", "Title", "Genres"), "MovieID", "left")
    .sort("Order")
    .show())

# Top 5 user recommendations for one movie (e.g., MovieID 101).
movieID = 101
movieSpecificRecs = best_model.recommendForItemSubset(spark.createDataFrame([Row(MovieID=movieID)]), 5)
print(f"Top 5 user recommendations for movie {movieID}")
# recommendForItemSubset returns *users*, so each exploded struct is (UserID, rating).
# These ids must not be joined against movie titles.
movieSpecificRecs_users_df = (movieSpecificRecs
    .select(explode("recommendations").alias("rec"))
    .select(col("rec.UserID").alias("UserID"), col("rec.rating").alias("PredictedRating")))
movieSpecificRecs_users_df.show()

In [None]:
# Generate 10 recommendations for every user with the tuned model,
# then display the rows for 10 of those users.
nrecommendations = best_model.recommendForAllUsers(10)
nrecommendations.limit(10).show()

### User-Based Filtering with KNN and Cosine Similarity

In [None]:
!pip install scikit-surprise
!pip install tabulate
!pip install numpy
!pip install scikit-learn

In [None]:
from collections import defaultdict
from surprise import SVD
from surprise import accuracy
from surprise import KNNBasic
import numpy as np
from sklearn.metrics.pairwise import cosine_similarity
from tabulate import tabulate
from surprise import Dataset, SVD
from surprise.model_selection import GridSearchCV
from surprise import AlgoBase, KNNBasic
from surprise.prediction_algorithms.knns import SymmetricAlgo
from surprise import Reader

class KNNBasicPre(KNNBasic):
    """KNNBasic subclass whose ``test`` consumes ``(uid, iid, rating)`` triples."""

    def __init__(self, *args, **kwargs):
        # Pure pass-through: all options are forwarded to KNNBasic unchanged.
        super().__init__(*args, **kwargs)

    def test(self, testset, verbose=False):
        """Predict every entry of ``testset``.

        Args:
            testset: iterable of ``(uid, iid, r_ui_trans)`` triples.
            verbose: forwarded to ``predict`` for each entry.

        Returns:
            A list with the result of ``predict`` for each triple, in input order.
        """
        results = []
        for raw_user, raw_item, known_rating in testset:
            results.append(
                self.predict(raw_user, raw_item, known_rating, verbose=verbose)
            )
        return results
    

# Configure and evaluate a user-based KNN model (k=50) with Pearson similarity.
# NOTE(review): the section heading mentions cosine similarity, but sim_options selects
# 'pearson' — confirm which one is intended.
# NOTE(review): trainset and testset are not defined in the visible part of this
# notebook; they must exist before this cell runs.
algo_args=[]
algo_kwargs = dict(k=50, sim_options={'name': 'pearson', 'user_based': True, 'verbose' : True})
knn_algo = KNNBasicPre(*algo_args, **algo_kwargs)

# Train, predict the test set, and report RMSE.
knn_algo.fit(trainset)
predictions = knn_algo.test(testset)
accuracy.rmse(predictions)

# Second instance of the same algorithm, trained on the leave-one-out split so the hit
# rate can be measured. It is built from KNNBasicPre, the class that algo_args and
# algo_kwargs were written for (no `algo_class` name exists in this notebook).
# NOTE(review): train_loocv and test_loocv are not defined in the visible part of this
# notebook; they must exist before this cell runs.
knn_algo_for_hitrate = KNNBasicPre(*algo_args, **algo_kwargs)

# Get hitrate results
knn_algo_for_hitrate.fit(train_loocv)
# Predictions for the ratings that were left out of training.
left_out_predictions = knn_algo_for_hitrate.test(test_loocv)
# Predictions for every entry of the anti-testset built from the training set.
loocv_anti_testset = train_loocv.build_anti_testset()
all_predictions = knn_algo_for_hitrate.test(loocv_anti_testset)

# Build each user's top-N list from all_predictions.
minimumRating = 4.0  # estimates below this threshold are never recommended
n = 10               # maximum list length per user
topN = defaultdict(list)

for userID, movieID, actualRating, estimatedRating, _ in all_predictions:
    if not (estimatedRating >= minimumRating):
        continue
    topN[userID].append((movieID, estimatedRating))

# Keep only the n highest estimates per user, best first.
for userID in list(topN):
    ratings = sorted(topN[userID], key=lambda pair: pair[1], reverse=True)
    topN[userID] = ratings[:n]

top_n_predicted = topN

# Hit rate: the fraction of left-out ratings whose movie appears in that user's
# predicted top-N list.
hits = 0
total = 0

for leftOut in left_out_predictions:
    userID = leftOut[0]
    leftOutMovieID = leftOut[1]
    # .get() leaves top_n_predicted untouched for users without a list (indexing a
    # defaultdict would insert an empty entry for them).
    recommended = top_n_predicted.get(userID, [])
    # Is the left-out movie in the predicted top N for this user?
    if any(movieID == leftOutMovieID for movieID, _ in recommended):
        hits += 1
    total += 1

# An empty left-out set has no hits; report 0.0 instead of dividing by zero.
hitrate = hits / total if total else 0.0

print(f'HitRate: {hitrate}')
# Return all_predictions
# Number of rows shown in both tables below. It is assigned first because the
# similarity branch reads it.
top_k = 10

# NOTE(review): calc_most_similar, target_movie_id, target_user_id and movies_df are not
# defined in the visible part of this notebook; they must exist before this cell runs.
# movies_df is presumably a pandas DataFrame indexed by raw movie id with 'movie_name'
# and 'genre' columns — confirm.
if calc_most_similar:
    # Use the model's `qi` attribute when it has one, otherwise its `sim` matrix.
    # NOTE(review): with user_based=True, `sim` presumably holds user-user similarities,
    # yet it is indexed by an inner *movie* id below — confirm this is intended.
    if hasattr(knn_algo_for_hitrate, 'qi'):
        sims = knn_algo_for_hitrate.qi
    else:
        sims = knn_algo_for_hitrate.sim

    # Rank all items by cosine similarity to the target movie, most similar first.
    inner_movie_id = train_loocv.to_inner_iid(target_movie_id)
    sims = cosine_similarity(sims, sims)
    target_movie_sims_sorted = [train_loocv.to_raw_iid(x) for x in np.argsort(sims[inner_movie_id])[::-1]]
    most_similar_movies = movies_df.loc[target_movie_sims_sorted].iloc[:top_k]
    print(f'Most similar movies to {movies_df.loc[target_movie_id].movie_name}:')
    print(tabulate(most_similar_movies.head(top_k)[['movie_name', 'genre']], headers='keys'))

# Keep the target user's top_k predictions, highest estimate first.
top_preds = sorted([pred for pred in all_predictions if pred.uid == target_user_id], key=lambda pred: pred.est, reverse=True)[:top_k]
movie_ids = [pred.iid for pred in top_preds]
# assign() returns a new frame, so the rating column is not written onto a slice of
# movies_df (which can raise pandas' SettingWithCopyWarning).
relevant_movies = movies_df.loc[movie_ids].assign(rating=[pred.est for pred in top_preds])

print(f'Top predictions for user {target_user_id}:')
# The estimated rating is printed alongside the movie details.
print(tabulate(relevant_movies.head(top_k)[['movie_name', 'genre', 'rating']], headers='keys'))