# Movie Recommendation based on Alternating Least Squares (ALS) with Apache Spark

In [1]:
#Mounting the drive
import os 
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [2]:
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.0.tar.gz (281.3 MB)
[K     |████████████████████████████████| 281.3 MB 65 kB/s 
[?25hCollecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[K     |████████████████████████████████| 199 kB 47.7 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.3.0-py2.py3-none-any.whl size=281764026 sha256=49be15a9db37596b6d6c7ccef39e47e02c570edd4f351d3a2258435c6a213348
  Stored in directory: /root/.cache/pip/wheels/7a/8e/1b/f73a52650d2e5f337708d9f6a1750d451a7349a867f928b885
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.5 pyspark-3.3.0


In [19]:
#Import the libraries

from pyspark.sql import SparkSession
from pyspark.sql.functions import count, desc , col, max, explode
from pyspark.ml.feature import  StringIndexer
from pyspark.ml import Pipeline
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import TrainValidationSplit, ParamGridBuilder

In [4]:
#creating pyspark session
spark = SparkSession.builder.appName("lastfm").getOrCreate()

##Loading the dataset

In [5]:
#ratings file

file_path = '/content/drive/MyDrive/Rec_Project/user_ratedmovies-timestamps.dat'
ratings_df = spark.read.csv(file_path, sep='\t',header=True,inferSchema = True)
ratings_df.show()

+------+-------+------+-------------+
|userID|movieID|rating|    timestamp|
+------+-------+------+-------------+
|    75|      3|   1.0|1162160236000|
|    75|     32|   4.5|1162160624000|
|    75|    110|   4.0|1162161008000|
|    75|    160|   2.0|1162160212000|
|    75|    163|   4.0|1162160970000|
|    75|    165|   4.5|1162160715000|
|    75|    173|   3.5|1162160257000|
|    75|    296|   5.0|1162160689000|
|    75|    353|   3.5|1162160220000|
|    75|    420|   2.0|1162160202000|
|    75|    589|   4.0|1162160901000|
|    75|    653|   3.0|1162160225000|
|    75|    832|   4.5|1162160269000|
|    75|    920|   0.5|1162160228000|
|    75|    996|   4.5|1162160777000|
|    75|   1036|   4.0|1162160685000|
|    75|   1127|   3.5|1162160932000|
|    75|   1215|   4.5|1162160936000|
|    75|   1233|   4.0|1162161005000|
|    75|   1304|   2.5|1162160216000|
+------+-------+------+-------------+
only showing top 20 rows



In [6]:
#movies file

file_path = '/content/drive/MyDrive/Rec_Project/movies.dat'
movies_df = spark.read.csv(file_path, sep='\t',header=True,inferSchema = True)
movies_df.show()

+---+--------------------+------+--------------------+--------------------+----+--------------------+------------------+----------------------+--------------------+---------------------+-----------------+------------------+----------------------+--------------------+---------------------+-----------------+----------------+--------------------+---------------+--------------------+
| id|               title|imdbID|        spanishTitle|      imdbPictureURL|year|                rtID|rtAllCriticsRating|rtAllCriticsNumReviews|rtAllCriticsNumFresh|rtAllCriticsNumRotten|rtAllCriticsScore|rtTopCriticsRating|rtTopCriticsNumReviews|rtTopCriticsNumFresh|rtTopCriticsNumRotten|rtTopCriticsScore|rtAudienceRating|rtAudienceNumRatings|rtAudienceScore|        rtPictureURL|
+---+--------------------+------+--------------------+--------------------+----+--------------------+------------------+----------------------+--------------------+---------------------+-----------------+------------------+-----------

## Basic Recommendations

One basic to recommend movies is to always recommend the movies with the highest average rating. We will filter our movies with high ratings but greater than or equal to 500 reviews because movies with few reviews may not have broad appeal to everyone.

In [7]:
from pyspark.sql import functions as F

# From ratings_df, create a movie_ids_with_avg_ratings_df that combines the two DataFrames
movie_ids_with_avg_ratings_df = ratings_df.groupBy('movieID').agg(F.count(ratings_df.rating).alias("count"), F.avg(ratings_df.rating).alias("average"))
print('movie_ids_with_avg_ratings_df:')
movie_ids_with_avg_ratings_df.show(3, truncate=False)


#joining with movie df and gettting the relevant columns
movie_names_df = movie_ids_with_avg_ratings_df.join(movies_df, movie_ids_with_avg_ratings_df.movieID == movies_df.id)
movie_names_with_avg_ratings_df = (movie_names_df.select(movie_names_df["average"], movie_names_df["title"], movie_names_df["count"], movie_names_df["movieId"])).orderBy(["average"], ascending = 0)
print('movie_names_with_avg_ratings_df:')
movie_names_with_avg_ratings_df.show(3, truncate=False)

movie_ids_with_avg_ratings_df:
+-------+-----+------------------+
|movieID|count|average           |
+-------+-----+------------------+
|1238   |73   |3.8972602739726026|
|3175   |566  |3.57773851590106  |
|6620   |304  |3.794407894736842 |
+-------+-----+------------------+
only showing top 3 rows

movie_names_with_avg_ratings_df:
+-------+------------------------------------------------+-----+-------+
|average|title                                           |count|movieId|
+-------+------------------------------------------------+-----+-------+
|5.0    |Saikaku ichidai onna                            |1    |25975  |
|5.0    |Brother Minister: The Assassination of Malcolm X|1    |404    |
|5.0    |Gabbeh                                          |1    |1575   |
+-------+------------------------------------------------+-----+-------+
only showing top 3 rows



In [8]:
movies_with_500_ratings_or_more = movie_names_with_avg_ratings_df.filter("count >= 500")

print('Movies with highest ratings:')
movies_with_500_ratings_or_more.show(20, truncate=False)

Movies with highest ratings:
+------------------+--------------------------------------------------------------------+-----+-------+
|average           |title                                                               |count|movieId|
+------------------+--------------------------------------------------------------------+-----+-------+
|4.365371269951423 |The Shawshank Redemption                                            |1441 |318    |
|4.3350970017636685|The Godfather                                                       |1134 |858    |
|4.29064039408867  |The Usual Suspects                                                  |1218 |50     |
|4.25278940027894  |Fight Club                                                          |1434 |2959   |
|4.238451528952505 |Pulp Fiction                                                        |1537 |296    |
|4.228211009174312 |The Godfather: Part II                                              |872  |1221   |
|4.228174603174603 |Casablanca     

##Latent Factor based model using ALS

In [9]:
# 60% for training, 20% for validation, and 20% for testing
seed = 1
(split_60_df, split_a_20_df, split_b_20_df) = ratings_df.randomSplit([6.0, 2.0, 2.0], seed)

# Cache datasets for performance
training_df = split_60_df.cache()
validation_df = split_a_20_df.cache()
test_df = split_b_20_df.cache()

print(f'Training: {training_df.count()}, validation: {validation_df.count()}, test: {test_df.count()}\n')
training_df.show(3)
validation_df.show(3)
test_df.show(3)

Training: 513410, validation: 170855, test: 171333

+------+-------+------+-------------+
|userID|movieID|rating|    timestamp|
+------+-------+------+-------------+
|    75|     32|   4.5|1162160624000|
|    75|    110|   4.0|1162161008000|
|    75|    160|   2.0|1162160212000|
+------+-------+------+-------------+
only showing top 3 rows

+------+-------+------+-------------+
|userID|movieID|rating|    timestamp|
+------+-------+------+-------------+
|    75|      3|   1.0|1162160236000|
|    75|    165|   4.5|1162160715000|
|    75|    420|   2.0|1162160202000|
+------+-------+------+-------------+
only showing top 3 rows

+------+-------+------+-------------+
|userID|movieID|rating|    timestamp|
+------+-------+------+-------------+
|    75|    163|   4.0|1162160970000|
|    75|    920|   0.5|1162160228000|
|    75|    996|   4.5|1162160777000|
+------+-------+------+-------------+
only showing top 3 rows



In [10]:
# This step is broken in ML Pipelines: https://issues.apache.org/jira/browse/SPARK-14489
from pyspark.ml.recommendation import ALS

# Initialize learner
als = ALS()

# Now we set the parameters for the method
(als.setMaxIter(5)
   .setSeed(seed)
   .setRegParam(0.1)
   .setUserCol("userID")
   .setItemCol("movieID")
   .setRatingCol("rating"))

#compute an evaluation metric for test dataset
from pyspark.ml.evaluation import RegressionEvaluator

# Create an RMSE evaluator using the label and predicted columns
reg_eval = RegressionEvaluator(predictionCol="prediction", labelCol="rating", metricName="rmse")

tolerance = 0.03
ranks = [4, 8, 12,16,20]
errors = [0, 0, 0, 0, 0]
models = [0, 0, 0, 0, 0]
i = 0
min_error = float('inf')
best_rank = -1
for rank in ranks:
  als.setRank(rank)

  # Create the model and predict using the validation data
  model = als.fit(training_df)
  predict_df = model.transform(validation_df)

  # Remove NaN values from prediction (due to SPARK-14489)
  predicted_ratings_df = predict_df.filter(predict_df.prediction != float('nan'))

  # Run the previously created RMSE evaluator on the predicted_ratings_df
  error = reg_eval.evaluate(predicted_ratings_df)
  errors[i] = error
  models[i] = model
  print(f'For rank {rank} the RMSE is {error}')
  if error < min_error:
    min_error = error
    best_rank = i
  i += 1

als.setRank(ranks[best_rank])
print(f'The best model was trained with rank {ranks[best_rank]}')
my_model = models[best_rank]

For rank 4 the RMSE is 0.7908398514339215
For rank 8 the RMSE is 0.7898455173670315
For rank 12 the RMSE is 0.7874978049069674
For rank 16 the RMSE is 0.7848743388005446
For rank 20 the RMSE is 0.7862274970395939
The best model was trained with rank 16


In [11]:
predict_df = my_model.transform(test_df)

# Remove NaN values from prediction (due to SPARK-14489)
predicted_test_df = predict_df.filter(predict_df.prediction != float('nan'))

# Run the previously created RMSE evaluator on the predicted_test_df
test_RMSE = reg_eval.evaluate(predicted_test_df)

print('The model has a RMSE on the test set of {0}'.format(test_RMSE))

The model has a RMSE on the test set of 0.7859832908292662


##Recommended movies for a given user

In [42]:
recs = model.recommendForAllUsers(10)

In [43]:
#displays the userid and the recommedations
recs.show()

+------+--------------------+
|userID|     recommendations|
+------+--------------------+
|    78|[{61742, 5.935991...|
|   127|[{3881, 6.1496406...|
|   175|[{61742, 5.970932...|
|   190|[{27783, 4.367045...|
|   325|[{60983, 5.158107...|
|   383|[{25898, 5.251923...|
|   476|[{61742, 5.529724...|
|   493|[{61742, 4.887939...|
|   498|[{61742, 5.268247...|
|   545|[{61742, 5.443331...|
|   548|[{25975, 5.842819...|
|   580|[{3881, 5.074946}...|
|   636|[{60983, 4.498276...|
|   732|[{61742, 4.151705...|
|   937|[{60983, 5.107560...|
|  1017|[{27783, 4.585057...|
|  1035|[{3881, 5.0476766...|
|  1047|[{61742, 5.410261...|
|  1118|[{60983, 5.428158...|
|  1122|[{60983, 5.743911...|
+------+--------------------+
only showing top 20 rows



In [44]:
#getting recommendations in better format
recs = recs.withColumn("rec_exp", explode("recommendations")).select('userId', col("rec_exp.movieId"), col("rec_exp.rating"))
recs.show()

+------+-------+---------+
|userId|movieId|   rating|
+------+-------+---------+
|    78|  61742| 5.935991|
|    78|  60983| 5.801132|
|    78|  42783|  5.57746|
|    78|  25975| 5.504363|
|    78|  32657|5.4191976|
|    78|   1164| 5.406566|
|    78|   4777|5.1945515|
|    78|   3456|5.1890903|
|    78|   8264| 5.132485|
|    78|   7699|5.1300178|
|   127|   3881|6.1496406|
|   127|   2573| 5.734455|
|   127|   6511| 5.704766|
|   127|   5472|5.6772757|
|   127|   8731|5.6756654|
|   127|   3849|  5.61863|
|   127|  43744| 5.601955|
|   127|   4433|5.4815707|
|   127|  40969|5.3809466|
|   127|   3830| 5.342137|
+------+-------+---------+
only showing top 20 rows



In [45]:
#top 10 recommendations for a user
top_10_recs = recs.join(movies_df, recs.movieId == movies_df.id).select('userId','title','rating')
top_10_recs.filter('userId == 78').select('userId','title').show(truncate = False)

+------+--------------------------------+
|userId|title                           |
+------+--------------------------------+
|78    |Maradona by Kusturica           |
|78    |Eve and the Fire Horse          |
|78    |Tini zabutykh predkiv           |
|78    |Saikaku ichidai onna            |
|78    |L'homme qui plantait des arbres |
|78    |2 ou 3 choses que je sais d'elle|
|78    |The American Astronaut          |
|78    |Rang-e khoda                    |
|78    |Grey Gardens                    |
|78    |�Qui�n diablos es Juliette?     |
+------+--------------------------------+

