## Movie Recommendation Model using Alternating Least Squares (ALS) ML algorithm

#### Requirements to run this notebook:
- Spark environment / Spark DataFrame API
- MovieLens dataset


In [1]:
# check Spark environment

import findspark
findspark.init()
findspark.find()

'C:\\Spark\\spark-3.1.2-bin-hadoop3.2'

In [2]:
# config Spark

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("movie-recommendation").getOrCreate()
spark

#### Process Overview
- Step 1: Load MovieLens dataset and perform exploratory data analysis
- Step 2: Split movie ratings data into training and testing datasets
- Step 3: Build ALS machine learning model to predict users movie ratings
- Step 4: Predict movie ratings for all users using the ALS model 
- Step 5: Check performance metrics of the model
- Step 6: Generate movie recommendations for existing users
- Step 7: Generate movie recommendations for a new user

### Step 1: Load MovieLens dataset and perform exploratory data analysis

In [3]:
movies = spark.read.csv("data_folder/ml-latest-small_movies.csv", header = True, inferSchema=True)
ratings = spark.read.csv("data_folder/ml-latest-small_ratings.csv", header = True, inferSchema=True)
movies.show(5)

+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Adventure|Animati...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
|      4|Waiting to Exhale...|Comedy|Drama|Romance|
|      5|Father of the Bri...|              Comedy|
+-------+--------------------+--------------------+
only showing top 5 rows



In [4]:
ratings.show(5)

+------+-------+------+----------+
|userId|movieId|rating| timestamp|
+------+-------+------+----------+
|     1|     31|   2.5|1260759144|
|     1|   1029|   3.0|1260759179|
|     1|   1061|   3.0|1260759182|
|     1|   1129|   2.0|1260759185|
|     1|   1172|   4.0|1260759205|
+------+-------+------+----------+
only showing top 5 rows



In [5]:
ratings = ratings.drop("timestamp")
ratings.show(5)

+------+-------+------+
|userId|movieId|rating|
+------+-------+------+
|     1|     31|   2.5|
|     1|   1029|   3.0|
|     1|   1061|   3.0|
|     1|   1129|   2.0|
|     1|   1172|   4.0|
+------+-------+------+
only showing top 5 rows



In [6]:
ratings.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: double (nullable = true)



In [7]:
ratings.describe().show()

+-------+------------------+------------------+------------------+
|summary|            userId|           movieId|            rating|
+-------+------------------+------------------+------------------+
|  count|            100004|            100004|            100004|
|   mean| 347.0113095476181|12548.664363425463| 3.543608255669773|
| stddev|195.16383797819535|26369.198968815268|1.0580641091070326|
|    min|                 1|                 1|               0.5|
|    max|               671|            163949|               5.0|
+-------+------------------+------------------+------------------+



### Step 2: Split movie ratings data into training and testing datasets

In [8]:
train_df, test_df = ratings.randomSplit([0.8,0.2], seed = 1234)

### Step 3: Build ALS machine learning model to predict users movie ratings

In [9]:
# Load packages

from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

In [10]:
# Create ALS model

als = ALS(userCol="userId", itemCol="movieId", ratingCol="rating", coldStartStrategy="drop", nonnegative=True)

In [11]:
# Create a pipleline

pipeline = Pipeline(stages=[als])

In [12]:
# Create a param_grid of hyperparameters for tuning the model

param_grid = ParamGridBuilder()\
                    .addGrid(als.rank, [20, 50])\
                    .addGrid(als.maxIter, [20])\
                    .addGrid(als.regParam, [0.1])\
                    .build()

In [13]:
# Set model evaluation metirc (evaluator) as RMSE

evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")

In [14]:
# Build the model with cross validation setup

model_cv = CrossValidator(estimator=pipeline, estimatorParamMaps=param_grid,
                          evaluator=evaluator, numFolds=3)

In [15]:
# Fit the model with train_df

model = model_cv.fit(train_df)

In [16]:
# Check best model hyperparameters

bestModel = model.bestModel.stages[0]

bestModel_rank = bestModel.rank
bestModel_MaxIter = bestModel._java_obj.parent().getMaxIter()
bestModel_RegParam = bestModel._java_obj.parent().getRegParam()

print("Best Model Hyperparameters:")
print(f" rank = {bestModel_rank}")
print(f" MaxIter = {bestModel_MaxIter}")
print(f" RegParam = {bestModel_RegParam}")

Best Model Hyperparameters:
 rank = 50
 MaxIter = 20
 RegParam = 0.1


### Step 4: Predict movie ratings for all users using the ALS model

In [17]:
# Generate predictions for all user ratings using the best ALS model

pred_df = bestModel.transform(test_df)

pred_df.show(5)

+------+-------+------+----------+
|userId|movieId|rating|prediction|
+------+-------+------+----------+
|   232|    463|   4.0| 3.4205956|
|    85|    471|   3.0| 2.8754594|
|   452|    471|   3.0| 3.3117192|
|   309|    471|   4.0|  4.235129|
|   358|    471|   5.0| 3.8504686|
+------+-------+------+----------+
only showing top 5 rows



In [18]:
# Sort pred_df by "userId" and "rating" in descending order

pred_df.sort("userId","rating", ascending=False).limit(5).show()

+------+-------+------+----------+
|userId|movieId|rating|prediction|
+------+-------+------+----------+
|   671|   2804|   5.0|  3.865137|
|   671|   1035|   5.0| 4.0637326|
|   671|   3114|   5.0| 4.1227484|
|   671|   2291|   5.0| 3.7513726|
|   671|   4886|   5.0| 4.0184727|
+------+-------+------+----------+



### Step 5: Check performance metrics of the model

In [19]:
# Check RMSE evaluation metric for the model predictions

rmse = evaluator.evaluate(pred_df)
print(f"RMSE = {rmse:.4f}")

RMSE = 0.9011


### Step 6: Generate movie recommendations for existing users

In [20]:
# Generate top 10 movie recommendations for all users

recommendations_all_10 = bestModel.recommendForAllUsers(10)
recommendations_all_10.show(5)

+------+--------------------+
|userId|     recommendations|
+------+--------------------+
|   471|[{54328, 4.633193...|
|   463|[{83411, 4.550748...|
|   496|[{108583, 5.09886...|
|   148|[{83411, 5.209703...|
|   540|[{3462, 5.0655766...|
+------+--------------------+
only showing top 5 rows



In [21]:
# Check top 10 movie recommendations for userID = 1

recommendations_all_10.filter(recommendations_all_10.userId == 1).show()

+------+--------------------+
|userId|     recommendations|
+------+--------------------+
|     1|[{2563, 3.6745405...|
+------+--------------------+



In [22]:
# Unpack the list of top 10 movieIds from recommendations column using <explode> function

recommendations_user1 = recommendations_all_10.filter(recommendations_all_10.userId == 1)

recommendations_user1.registerTempTable("ALS_recommendations_user1") 
 
recommendations_user1_exploded = spark.sql("SELECT userId, recommendation.movieId AS movieId,\
                               recommendation.rating AS pred_rating \
                               FROM ALS_recommendations_user1 \
                               LATERAL VIEW explode(recommendations) exploded_table \
                               AS recommendation")

recommendations_user1_exploded.show() 

+------+-------+-----------+
|userId|movieId|pred_rating|
+------+-------+-----------+
|     1|   2563|  3.6745405|
|     1|   5114|  3.4208624|
|     1|   2267|  3.4150085|
|     1|    290|  3.4125144|
|     1|    390|  3.3266888|
|     1|   4226|  3.2958186|
|     1| 116797|  3.2381463|
|     1|   2304|  3.2291195|
|     1|    735|  3.2255266|
|     1|   3019|  3.2117074|
+------+-------+-----------+



In [23]:
# Add movie info (movie title and genres) to top 10 movie recommendations data (with movieIds and ratings)

recommendations_user1_exploded.join(movies, ["movieId"], "left").show()

+-------+------+-----------+--------------------+------------------+
|movieId|userId|pred_rating|               title|            genres|
+-------+------+-----------+--------------------+------------------+
|   2563|     1|  3.6745405|Dangerous Beauty ...|             Drama|
|   5114|     1|  3.4208624|Bad and the Beaut...|             Drama|
|   2267|     1|  3.4150085|Mortal Thoughts (...|  Mystery|Thriller|
|    290|     1|  3.4125144|Once Were Warrior...|       Crime|Drama|
|    390|     1|  3.3266888|Faster Pussycat! ...|Action|Crime|Drama|
|   4226|     1|  3.2958186|      Memento (2000)|  Mystery|Thriller|
| 116797|     1|  3.2381463|The Imitation Gam...|Drama|Thriller|War|
|   2304|     1|  3.2291195|Love Is the Devil...|             Drama|
|    735|     1|  3.2255266|Cemetery Man (Del...|            Horror|
|   3019|     1|  3.2117074|Drugstore Cowboy ...|       Crime|Drama|
+-------+------+-----------+--------------------+------------------+



### Step 7: Generate movie recommendations for a new user

#### Step 7.1: Add new user data to the existing ratings data

In [24]:
# Let's assume a new user's favorite movies are the following:
# Toy Story (1995), Lion King, The (1994), Shrek (2001), Finding Nemo (2003) 
# which can be converted as a list of movieIds as below:
new_user_movieIds = [1, 364, 4306, 6377]

# set new userId as the highest userId value among the existing users + 1
new_userId = ratings.agg({"userId": "max"}).first()[0] + 1
rating_max = 5.0

# Create a list of new user's movie rating data
new_user_ratings = [(new_userId, movieId, rating_max) for movieId in new_user_movieIds]

# ratings.columns = ['userId', 'movieId', 'rating']

# Create a new dataframe that contains new user's ratings
new_user_df = spark.createDataFrame(new_user_ratings, ratings.columns)
new_user_df.show()

+------+-------+------+
|userId|movieId|rating|
+------+-------+------+
|   672|      1|   5.0|
|   672|    364|   5.0|
|   672|   4306|   5.0|
|   672|   6377|   5.0|
+------+-------+------+



In [25]:
# Add new_user_df to the existing ratings data

ratings_all_new = ratings.union(new_user_df)

ratings_all_new.filter(ratings_all_new.userId == new_userId).show()

+------+-------+------+
|userId|movieId|rating|
+------+-------+------+
|   672|      1|   5.0|
|   672|    364|   5.0|
|   672|   4306|   5.0|
|   672|   6377|   5.0|
+------+-------+------+



#### Step 7.2: Split movie ratings data into training and testing datasets

In [26]:
train_df, test_df = ratings_all_new.randomSplit([0.8,0.2], seed = 1234)

#### Step 7.3: Build ALS machine learning model to predict users movie ratings

In [27]:
## Rebuild ALS model with updated ratings datasets that include new user's ratings

als = ALS(userCol="userId", itemCol="movieId", ratingCol="rating", coldStartStrategy="drop", nonnegative=True)

pipeline = Pipeline(stages=[als])

# param_grid contains only the bestModel hyperparameters that were obtained above
param_grid = ParamGridBuilder()\
                    .addGrid(als.rank, [bestModel_rank])\
                    .addGrid(als.maxIter, [bestModel_MaxIter])\
                    .addGrid(als.regParam, [bestModel_RegParam])\
                    .build()

# Set model evaluation metirc (evaluator) as RMSE
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")

# Build the model with cross validation setup
model_cv = CrossValidator(estimator=pipeline, estimatorParamMaps=param_grid,
                          evaluator=evaluator, numFolds=3)

# Fit the model with train_df
model = model_cv.fit(train_df)

#### Step 7.4: Predict movie ratings for all users using the ALS model

In [28]:
# Generate predictions for all user ratings using the new ALS model

pred_df = model.transform(test_df)

pred_df.show(5)

+------+-------+------+----------+
|userId|movieId|rating|prediction|
+------+-------+------+----------+
|   232|    463|   4.0| 3.4215581|
|    85|    471|   3.0| 2.8765335|
|   452|    471|   3.0| 3.3119018|
|   309|    471|   4.0| 4.2360477|
|   358|    471|   5.0| 3.8490489|
+------+-------+------+----------+
only showing top 5 rows



In [29]:
# Sort pred_df by "userId" and "rating" in descending order

pred_df.sort("userId","rating", ascending=False).limit(5).show()

+------+-------+------+----------+
|userId|movieId|rating|prediction|
+------+-------+------+----------+
|   672|    364|   5.0| 4.3420334|
|   671|   4886|   5.0|  4.016987|
|   671|   2804|   5.0| 3.8661559|
|   671|   3114|   5.0| 4.1227794|
|   671|   2291|   5.0| 3.7502224|
+------+-------+------+----------+



In [30]:
# Check best model hyperparameters, which should remain the same as given above

bestModel = model.bestModel.stages[0]

bestModel_rank = bestModel.rank
bestModel_MaxIter = bestModel._java_obj.parent().getMaxIter()
bestModel_RegParam = bestModel._java_obj.parent().getRegParam()

print("Best Model Hyperparameters:")
print(f" rank = {bestModel_rank}")
print(f" MaxIter = {bestModel_MaxIter}")
print(f" RegParam = {bestModel_RegParam}")

Best Model Hyperparameters:
 rank = 50
 MaxIter = 20
 RegParam = 0.1


#### Step 7.5: Check performance metrics of the model

In [31]:
# Check RMSE evaluation metric for the model predictions

rmse = evaluator.evaluate(pred_df)
print(f"RMSE = {rmse:.4f}")

RMSE = 0.9011


#### Step 7.6: Generate movie recommendations for existing users

In [32]:
# Generate top 10 movie recommendations for all users

recommendations_all_10 = bestModel.recommendForAllUsers(10)
recommendations_all_10.show(5)

+------+--------------------+
|userId|     recommendations|
+------+--------------------+
|   471|[{54328, 4.632806...|
|   463|[{83411, 4.550862...|
|   496|[{108583, 5.09745...|
|   148|[{83411, 5.210514...|
|   540|[{3462, 5.074485}...|
+------+--------------------+
only showing top 5 rows



#### Step 7.7: Retrieve movie recommendations for the new user

In [33]:
# Unpack the list of top 10 movieIds from recommendations column using <explode> function

recommendations_new_user = recommendations_all_10.filter(recommendations_all_10.userId == new_userId)

recommendations_new_user.registerTempTable("ALS_recommendations_new_user") 
 
recommendations_new_user_exploded = spark.sql("SELECT userId, recommendation.movieId AS movieId,\
                               recommendation.rating AS pred_rating \
                               FROM ALS_recommendations_new_user \
                               LATERAL VIEW explode(recommendations) exploded_table \
                               AS recommendation")

recommendations_new_user_exploded.show() 

+------+-------+-----------+
|userId|movieId|pred_rating|
+------+-------+-----------+
|   672|  83411|  5.3297186|
|   672|  67504|  5.3297186|
|   672|  83318|  5.3297186|
|   672|   4306|  4.9407563|
|   672|   3462|  4.9163284|
|   672|      1|  4.8858476|
|   672|   6377|  4.8782406|
|   672|  54328|   4.877879|
|   672|  59684|   4.865235|
|   672|    527|  4.8487167|
+------+-------+-----------+



In [34]:
# Add movie info (movie title and genres) to top 10 movie recommendations data (with movieIds and ratings)

recommendations_new_user_exploded.join(movies, ["movieId"], "left").show()

+-------+------+-----------+--------------------+--------------------+
|movieId|userId|pred_rating|               title|              genres|
+-------+------+-----------+--------------------+--------------------+
|  83411|   672|  5.3297186|         Cops (1922)|              Comedy|
|  67504|   672|  5.3297186|Land of Silence a...|         Documentary|
|  83318|   672|  5.3297186|    Goat, The (1921)|              Comedy|
|   4306|   672|  4.9407563|        Shrek (2001)|Adventure|Animati...|
|   3462|   672|  4.9163284| Modern Times (1936)|Comedy|Drama|Romance|
|      1|   672|  4.8858476|    Toy Story (1995)|Adventure|Animati...|
|   6377|   672|  4.8782406| Finding Nemo (2003)|Adventure|Animati...|
|  54328|   672|   4.877879|My Best Friend (M...|              Comedy|
|  59684|   672|   4.865235| Lake of Fire (2006)|         Documentary|
|    527|   672|  4.8487167|Schindler's List ...|           Drama|War|
+-------+------+-----------+--------------------+--------------------+

