# Movie Recommendation with Spark

In this notebook, I use the Alternating Least Squares (ALS) algorithm to predict movie ratings on the [MovieLens small](https://files.grouplens.org/datasets/movielens/ml-latest-small.zip) data set (`ml-latest-small`).
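For reference, explicit-feedback ALS, as described in the Spark MLlib collaborative-filtering guide, approximates each rating by the dot product of a user factor $\mathbf{u}_u$ and a movie factor $\mathbf{v}_i$, both of length `rank`, and minimizes a regularized squared error over the set $\Omega$ of observed ratings. Here $\lambda$ is `regParam`, and $n_u$, $n_i$ are the numbers of ratings of user $u$ and movie $i$ (Spark scales the penalty by these counts, following the ALS-WR scheme). The notation is my own:

$$
\min_{U,V}\ \sum_{(u,i)\in\Omega}\left(r_{ui}-\mathbf{u}_u^\top\mathbf{v}_i\right)^2
+\lambda\left(\sum_{u} n_u\lVert\mathbf{u}_u\rVert^2+\sum_{i} n_i\lVert\mathbf{v}_i\rVert^2\right)
$$

Fixing all movie factors turns this into an independent ridge regression per user. In the closed-form solution below, $V_u$ stacks the factors of the movies user $u$ rated and $\mathbf{r}_u$ holds those ratings. The movie update is symmetric, and ALS alternates between the two for `maxIter` iterations; a prediction is $\hat r_{ui}=\mathbf{u}_u^\top\mathbf{v}_i$.

$$
\mathbf{u}_u=\left(V_u^\top V_u+\lambda n_u I\right)^{-1}V_u^\top\mathbf{r}_u
$$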

## Analyze

#### Prepare assets

In [2]:

import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
import math
%matplotlib inline



In [3]:
import os
os.environ["PYSPARK_PYTHON"] = "python3"
# Raw string so the backslashes in the Windows path are not read as escape sequences
os.chdir(r'D:\College\WRK\Machine_learning')
os.getcwd()

'D:\\College\\WRK\\Machine_learning'

In [4]:
from pyspark.sql import SparkSession
spark = SparkSession \
    .builder \
    .appName("movies recommendation") \
    .getOrCreate()

#### Reading data

In [7]:
movies = spark.read.load("data/ml-latest-small/movies.csv", format='csv', header = True)
ratings = spark.read.load("data/ml-latest-small/ratings.csv", format='csv', header = True)
links = spark.read.load("data/ml-latest-small/links.csv", format='csv', header = True)
tags = spark.read.load("data/ml-latest-small/tags.csv", format='csv', header = True)

In [8]:
type(movies)

pyspark.sql.dataframe.DataFrame

In [9]:
movies.count()

9742

In [10]:
movies.createOrReplaceTempView("movies")

limit5 = spark.sql("SELECT * FROM movies limit 5")
limit5.show()

+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Adventure|Animati...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
|      4|Waiting to Exhale...|Comedy|Drama|Romance|
|      5|Father of the Bri...|              Comedy|
+-------+--------------------+--------------------+



In [11]:
ratings.createOrReplaceTempView("ratings")

limit5 = spark.sql("SELECT * FROM ratings limit 5")
limit5.show()

+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     1|      1|   4.0|964982703|
|     1|      3|   4.0|964981247|
|     1|      6|   4.0|964982224|
|     1|     47|   5.0|964983815|
|     1|     50|   5.0|964982931|
+------+-------+------+---------+



In [12]:
tags.createOrReplaceTempView("tags")

limit5 = spark.sql("SELECT * FROM tags limit 5")
limit5.show()

+------+-------+---------------+----------+
|userId|movieId|            tag| timestamp|
+------+-------+---------------+----------+
|     2|  60756|          funny|1445714994|
|     2|  60756|Highly quotable|1445714996|
|     2|  60756|   will ferrell|1445714992|
|     2|  89774|   Boxing story|1445715207|
|     2|  89774|            MMA|1445715200|
+------+-------+---------------+----------+



In [13]:
links.createOrReplaceTempView("links")

limit5 = spark.sql("SELECT * FROM links limit 5")
limit5.show()

+-------+-------+------+
|movieId| imdbId|tmdbId|
+-------+-------+------+
|      1|0114709|   862|
|      2|0113497|  8844|
|      3|0113228| 15602|
|      4|0114885| 31357|
|      5|0113041| 11862|
+-------+-------+------+



#### Analyze data

In [14]:
count_min_rating_per_user = ratings.groupBy("userId").count().toPandas()['count'].min()
count_min_rating_per_movie = ratings.groupBy("movieId").count().toPandas()['count'].min()

print('For the users that rated movies and the movies that were rated:')
print('Minimum number of ratings per user is {}'.format(count_min_rating_per_user))
print('Minimum number of ratings per movie is {}'.format(count_min_rating_per_movie))

For the users that rated movies and the movies that were rated:
Minimum number of ratings per user is 20
Minimum number of ratings per movie is 1


In [15]:
movies_rated_once = sum(ratings.groupBy("movieId").count().toPandas()['count'] == 1)
rated_movies = ratings.select('movieId').distinct().count()

print('{} out of {} rated movies ({:.2f} %) were rated by only one user'.format(
    movies_rated_once, rated_movies, movies_rated_once * 100.0 / rated_movies))

3446 out of 9724 rated movies (35.44 %) were rated by only one user
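These statistics describe how sparse the rating matrix is. As a small pure-Python sketch of the same counts, `collections.Counter` gives the ratings per user and per movie, the share of movies rated exactly once, and the density of the user × movie matrix. The triples below are hypothetical toy data, not the MovieLens file:

```python
from collections import Counter

# Hypothetical (userId, movieId, rating) triples standing in for ratings.csv
triples = [(1, 1, 4.0), (1, 3, 4.0), (2, 1, 3.5), (2, 47, 5.0), (3, 1, 2.0)]

ratings_per_user = Counter(u for u, _, _ in triples)
ratings_per_movie = Counter(m for _, m, _ in triples)

min_per_user = min(ratings_per_user.values())
min_per_movie = min(ratings_per_movie.values())

# Movies that appear in exactly one rating
rated_once = sum(1 for c in ratings_per_movie.values() if c == 1)
share_rated_once = 100.0 * rated_once / len(ratings_per_movie)

# Fraction of the user x movie matrix that is actually observed
density = len(triples) / (len(ratings_per_user) * len(ratings_per_movie))

print(min_per_user, min_per_movie, f"{share_rated_once:.2f} %", f"{density:.3f}")
```

A density far below 1 on the real data is what makes a low-rank factorization such as ALS a natural fit.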


Number of users

In [16]:
num_users = spark.sql("SELECT count (distinct userID) as num_users FROM ratings")
num_users.show()

+---------+
|num_users|
+---------+
|      610|
+---------+



In [17]:
ratings.select("userId").distinct().count()

610

In [18]:
type(num_users)

pyspark.sql.dataframe.DataFrame

Number of movies

In [19]:
# Spark SQL
num_movies = spark.sql("SELECT count (distinct movieID) as num_movies FROM movies")
num_movies.show()

+----------+
|num_movies|
+----------+
|      9742|
+----------+



In [20]:
movies.select('movieID').distinct().count()

9742

How many movies were rated by users?

In [21]:
rated_by_users = ratings.select('movieID').distinct().count()
print('How many movies are rated by users? \n', rated_by_users)

How many movies are rated by users? 
 9724


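Movie genres. The `genres` column stores a pipe-separated list, so `DISTINCT` first returns genre combinations rather than individual genres.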

In [22]:
mv_genres = spark.sql("SELECT DISTINCT(genres) as movies_genres FROM movies ")
mv_genres.show()

+--------------------+
|       movies_genres|
+--------------------+
|Comedy|Horror|Thr...|
|Adventure|Sci-Fi|...|
|Action|Adventure|...|
| Action|Drama|Horror|
|Action|Animation|...|
|Animation|Childre...|
|Action|Adventure|...|
|    Adventure|Sci-Fi|
|Documentary|Music...|
|Adventure|Childre...|
| Adventure|Animation|
| Musical|Romance|War|
|Action|Adventure|...|
|Adventure|Childre...|
|Comedy|Crime|Horr...|
|Crime|Drama|Fanta...|
|Comedy|Mystery|Th...|
|   Adventure|Fantasy|
|Horror|Romance|Sc...|
|Drama|Film-Noir|R...|
+--------------------+
only showing top 20 rows



In [23]:
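# Individual genres: take the 1st to 6th pipe-separated entry of each list and de-duplicate with UNION
# (only the first six genres of each movie are considered by this query)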

genres = spark.sql("""
    SELECT SUBSTRING_INDEX(SUBSTRING_INDEX(genres, '|', 1), '|', -1) as genre FROM movies
    UNION
    SELECT SUBSTRING_INDEX(SUBSTRING_INDEX(genres, '|', 2), '|', -1) as genre FROM movies
    UNION
    SELECT SUBSTRING_INDEX(SUBSTRING_INDEX(genres, '|', 3), '|', -1) as genre FROM movies
    UNION
    SELECT SUBSTRING_INDEX(SUBSTRING_INDEX(genres, '|', 4), '|', -1) as genre FROM movies
    UNION
    SELECT SUBSTRING_INDEX(SUBSTRING_INDEX(genres, '|', 5), '|', -1) as genre FROM movies
    UNION
    SELECT SUBSTRING_INDEX(SUBSTRING_INDEX(genres, '|', 6), '|', -1) as genre FROM movies
    ORDER BY genre
""")

genres.show()

+------------------+
|             genre|
+------------------+
|(no genres listed)|
|            Action|
|         Adventure|
|         Animation|
|          Children|
|            Comedy|
|             Crime|
|       Documentary|
|             Drama|
|           Fantasy|
|         Film-Noir|
|            Horror|
|              IMAX|
|           Musical|
|           Mystery|
|           Romance|
|            Sci-Fi|
|          Thriller|
|               War|
|           Western|
+------------------+
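A split-based alternative handles any number of genres per movie. In Spark the idea would be `explode(split("genres", r"\|"))` from `pyspark.sql.functions` (`split` takes a regular expression, so the pipe must be escaped). Sketched here in plain Python on the genre lists of the first five movies, reconstructed from the one-hot table below:

```python
from collections import Counter

# Genre lists of the first five movies (Toy Story, Jumanji, ...)
genre_strings = [
    "Adventure|Animation|Children|Comedy|Fantasy",
    "Adventure|Children|Fantasy",
    "Comedy|Romance",
    "Comedy|Drama|Romance",
    "Comedy",
]

# Split every list on '|' and flatten, regardless of how many genres a movie has
all_genres = [g for s in genre_strings for g in s.split("|")]

distinct_genres = sorted(set(all_genres))
movies_per_genre = Counter(all_genres)

print(distinct_genres)
print(movies_per_genre.most_common(2))
```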



In [34]:
movie_pdf = movies.toPandas()
movie_pdf['genres'].str.get_dummies(sep='|').head()

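|   | (no genres listed) | Action | Adventure | Animation | Children | Comedy | Crime | Documentary | Drama | Fantasy | Film-Noir | Horror | IMAX | Musical | Mystery | Romance | Sci-Fi | Thriller | War | Western |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 0 | 0 | 1 | 1 | 1 | 1 | 0 | 0 | 0 | 1 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 |
| 1 | 0 | 0 | 1 | 0 | 1 | 0 | 0 | 0 | 0 | 1 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 |
| 2 | 0 | 0 | 0 | 0 | 0 | 1 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 1 | 0 | 0 | 0 | 0 |
| 3 | 0 | 0 | 0 | 0 | 0 | 1 | 0 | 0 | 1 | 0 | 0 | 0 | 0 | 0 | 0 | 1 | 0 | 0 | 0 | 0 |
| 4 | 0 | 0 | 0 | 0 | 0 | 1 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 |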


In [35]:
list_of_movie = list(movie_pdf['title'])

### Spark ALS based approach for training model

We will use Spark ML's `ALS` estimator to predict the ratings. It expects one row per (user, item, rating) triple, so we keep the `ratings` DataFrame loaded above, drop the `timestamp` column and cast the remaining columns to numeric types.
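To make the "alternating" part concrete, here is a minimal NumPy sketch of explicit-feedback ALS on a small dense matrix. It illustrates the idea under simplifying assumptions and is not Spark's distributed implementation; the function name `als_sketch` and its defaults are hypothetical. Each half-step fixes one set of factors and solves a small ridge regression per user (or per movie) over the observed ratings only, with the penalty scaled by the number of ratings involved, as the Spark guide describes for ALS-WR:

```python
import numpy as np


def als_sketch(R, mask, rank=2, reg=0.1, n_iter=20, seed=0):
    """Factor R ~= U @ V.T using only the entries where mask is True.

    R    : (n_users, n_items) array of ratings (unobserved cells are ignored)
    mask : boolean array of the same shape, True where a rating is observed
    """
    rng = np.random.default_rng(seed)
    n_users, n_items = R.shape
    U = rng.normal(scale=0.1, size=(n_users, rank))
    V = rng.normal(scale=0.1, size=(n_items, rank))
    eye = np.eye(rank)

    for _ in range(n_iter):
        # Fix V and solve one ridge regression per user over the movies that user rated
        for u in range(n_users):
            rated = mask[u]
            Vu = V[rated]
            lam = reg * max(int(rated.sum()), 1)  # penalty scaled by the rating count
            U[u] = np.linalg.solve(Vu.T @ Vu + lam * eye, Vu.T @ R[u, rated])
        # Fix U and solve one ridge regression per movie over the users who rated it
        for i in range(n_items):
            raters = mask[:, i]
            Ui = U[raters]
            lam = reg * max(int(raters.sum()), 1)
            V[i] = np.linalg.solve(Ui.T @ Ui + lam * eye, Ui.T @ R[raters, i])
    return U, V


# Toy usage: 0 marks a missing rating; U @ V.T fills in every cell
R = np.array([[5.0, 3.0, 0.0], [4.0, 0.0, 1.0], [1.0, 1.0, 5.0]])
U, V = als_sketch(R, mask=R > 0)
predicted = U @ V.T
```

In this sketch `rank`, `reg` and `n_iter` play the roles of Spark's `rank`, `regParam` and `maxIter`.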

In [37]:
limit5 = spark.sql("SELECT * FROM ratings limit 5")
limit5.show()

+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     1|      1|   4.0|964982703|
|     1|      3|   4.0|964981247|
|     1|      6|   4.0|964982224|
|     1|     47|   5.0|964983815|
|     1|     50|   5.0|964982931|
+------+-------+------+---------+



In [38]:
movie_ratings=ratings.drop('timestamp')

In [39]:
from pyspark.sql.types import IntegerType, FloatType
movie_ratings = movie_ratings.withColumn("userId", movie_ratings["userId"].cast(IntegerType()))
movie_ratings = movie_ratings.withColumn("movieId", movie_ratings["movieId"].cast(IntegerType()))
movie_ratings = movie_ratings.withColumn("rating", movie_ratings["rating"].cast(FloatType()))
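The casts are needed because `spark.read.load(..., format='csv', header=True)` reads every column as a string unless `inferSchema=True` is also passed. The same string-to-(user, item, rating) conversion, sketched with Python's standard `csv` module on the first rows of `ratings.csv` shown earlier:

```python
import csv
import io

# First rows of ratings.csv, as displayed earlier in the notebook
sample = """userId,movieId,rating,timestamp
1,1,4.0,964982703
1,3,4.0,964981247
1,6,4.0,964982224
"""

# DictReader yields strings; cast them and drop the timestamp, like the Spark cells above
triples = [
    (int(row["userId"]), int(row["movieId"]), float(row["rating"]))
    for row in csv.DictReader(io.StringIO(sample))
]
print(triples)  # [(1, 1, 4.0), (1, 3, 4.0), (1, 6, 4.0)]
```

In Spark itself, `spark.read.csv(path, header=True, inferSchema=True)` would produce numeric columns directly, at the cost of an extra pass over the file to infer the types.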

In [40]:
movie_ratings.createOrReplaceTempView("movie_ratings")

limit10 =  (spark.sql("SELECT * FROM movie_ratings limit 10"))
limit10.show()

+------+-------+------+
|userId|movieId|rating|
+------+-------+------+
|     1|      1|   4.0|
|     1|      3|   4.0|
|     1|      6|   4.0|
|     1|     47|   5.0|
|     1|     50|   5.0|
|     1|     70|   3.0|
|     1|    101|   5.0|
|     1|    110|   4.0|
|     1|    151|   5.0|
|     1|    157|   5.0|
+------+-------+------+



#### ALS Model Selection and Evaluation


With the ALS model, we can combine a grid search with cross-validation (`ParamGridBuilder` and `CrossValidator`) to search for the best hyperparameters.
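`ParamGridBuilder` enumerates the Cartesian product of the values passed to each `addGrid` call, and `CrossValidator` trains one model per combination and fold, then refits the best combination on the whole input. A plain-Python sketch of that bookkeeping; the extra `regParam` and `rank` values are hypothetical, not the ones used below:

```python
from itertools import product

# Hypothetical search space; the notebook itself uses a single value per parameter
grid = {"regParam": [0.01, 0.1], "rank": [5, 10], "maxIter": [15]}

# One dict per combination, in the spirit of ParamGridBuilder().build()
param_maps = [dict(zip(grid, values)) for values in product(*grid.values())]

num_folds = 2
# One fit per (combination, fold), plus one refit of the winner on all training data
total_fits = len(param_maps) * num_folds + 1

print(len(param_maps), total_fits)  # 4 9
```

Every extra value multiplies the training cost, which is why the grid below is kept so small.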



In [41]:
# import package
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import CrossValidator,ParamGridBuilder

In [42]:
# Split the data into training (80%) and test (20%) sets
(training, test) = movie_ratings.randomSplit([0.8, 0.2])
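`randomSplit` assigns rows to the splits at random, so the split sizes are only approximately 80/20 and change from run to run unless a seed is passed (for example `movie_ratings.randomSplit([0.8, 0.2], seed=42)`; the value 42 is arbitrary). A minimal pure-Python sketch of a seeded per-row split; `random_split` is a hypothetical helper, not Spark's implementation:

```python
import random


def random_split(rows, weights=(0.8, 0.2), seed=42):
    """Send each row to train or test independently, with probability proportional to weights."""
    rng = random.Random(seed)
    cutoff = weights[0] / sum(weights)
    train, test = [], []
    for row in rows:
        (train if rng.random() < cutoff else test).append(row)
    return train, test


train, test = random_split(range(1000))
print(len(train), len(test))  # roughly 800 / 200, identical on every run with the same seed
```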

In [43]:
# Create ALS model
# Build the recommendation model using ALS on the training data
# Note we set cold start strategy to 'drop' to ensure we don't get NaN evaluation metrics
als = ALS(maxIter=5, rank=10, regParam=0.01, userCol="userId", itemCol="movieId", ratingCol="rating",
          coldStartStrategy="drop")
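Why `coldStartStrategy="drop"` matters: with the default `"nan"`, a user or movie that only appears in the evaluated split gets a NaN prediction, and a single NaN makes the RMSE NaN. A toy illustration with hypothetical (rating, prediction) pairs:

```python
import math

# Hypothetical (rating, prediction) pairs; NaN marks a movie unseen during training
pairs = [(4.0, 3.8), (3.0, float("nan")), (5.0, 4.6)]


def rmse(pairs):
    return math.sqrt(sum((r - p) ** 2 for r, p in pairs) / len(pairs))


print(rmse(pairs))  # nan: one missing prediction poisons the average

# Roughly what coldStartStrategy="drop" does: discard rows whose prediction is NaN
kept = [(r, p) for r, p in pairs if not math.isnan(p)]
print(rmse(kept))
```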

In [44]:
# Print every ALS parameter with its default and current value
print(als.explainParams())

alpha: alpha for implicit preference (default: 1.0)
blockSize: block size for stacking input data in matrices. Data is stacked within partitions. If block size is more than remaining data in a partition then it is adjusted to the size of this data. (default: 4096)
checkpointInterval: set checkpoint interval (>= 1) or disable checkpoint (-1). E.g. 10 means that the cache will get checkpointed every 10 iterations. Note: this setting will be ignored if the checkpoint directory is not set in the SparkContext. (default: 10)
coldStartStrategy: strategy for dealing with unknown or new users/items at prediction time. This may be useful in cross-validation or production scenarios, for handling user/item ids the model has not seen in the training data. Supported values: 'nan', 'drop'. (default: nan, current: drop)
finalStorageLevel: StorageLevel for ALS model factors. (default: MEMORY_AND_DISK)
implicitPrefs: whether to use implicit preference (default: False)
intermediateStorageLevel: StorageLe

In [46]:
# Tune the model with ParamGridBuilder
# Cross-validation is slow, so only one value per parameter is tried here;
# with a single combination, CV estimates its RMSE rather than choosing between settings

paramGrid = (ParamGridBuilder()
             .addGrid(als.regParam, [0.01])
             .addGrid(als.rank, [10])
             .addGrid(als.maxIter, [15])
             .build())

In [49]:
# Define evaluator as RMSE

evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating",
                                predictionCol="prediction")
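For reference, the metric this evaluator computes, where $T$ is the set of (user, movie) pairs in the evaluated DataFrame, $r_{ui}$ is the `rating` column and $\hat r_{ui}$ the `prediction` column:

$$
\mathrm{RMSE}=\sqrt{\frac{1}{|T|}\sum_{(u,i)\in T}\left(r_{ui}-\hat r_{ui}\right)^2}
$$

With `coldStartStrategy="drop"`, $T$ only contains the pairs for which the model could produce a prediction.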

In [50]:
from pyspark.ml.tuning import CrossValidator
# Build a k-fold CrossValidator: 5 folds would be more typical,
# but only 2 are used here because cross-validation is slow
cv = CrossValidator(estimator=als, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=2)

# Run cross-validation. This can take a while: one ALS model is trained per
# parameter combination and fold, plus a final refit of the best combination on all of `training`
cvModel = cv.fit(training)
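What `numFolds=2` means, sketched in plain Python: the data is divided into k folds, each fold is held out once while the model is fitted on the rest, and the k held-out scores are averaged. To keep it self-contained, a hypothetical "predict the global mean rating" model stands in for ALS, and the shuffled round-robin fold assignment is a simplification (Spark assigns folds randomly):

```python
import math
import random


def k_fold_indices(n, k, seed=0):
    """Shuffle row indices and deal them round-robin into k folds."""
    idx = list(range(n))
    random.Random(seed).shuffle(idx)
    return [idx[f::k] for f in range(k)]


def cross_validated_rmse(ratings, k=2, seed=0):
    """Average held-out RMSE of a toy global-mean predictor (stand-in for ALS)."""
    scores = []
    for fold in k_fold_indices(len(ratings), k, seed):
        held_out = set(fold)
        train = [r for j, r in enumerate(ratings) if j not in held_out]
        mean = sum(train) / len(train)  # "fit" on the other folds
        errors = [(ratings[j] - mean) ** 2 for j in fold]
        scores.append(math.sqrt(sum(errors) / len(errors)))
    return sum(scores) / len(scores)


print(cross_validated_rmse([4.0, 3.5, 5.0, 2.0, 4.5, 3.0], k=2))
```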

In [51]:
# Extract the best model selected by CV
best_model = cvModel.bestModel

In [58]:
# Inspect the hyperparameters of the best model selected by CV
print("Best Model")
print("Rank:", best_model.rank)
print("MaxIter:", best_model._java_obj.parent().getMaxIter())
print("RegParam:", best_model._java_obj.parent().getRegParam())

Best Model
Rank: 10
MaxIter: 15
RegParam: 0.01


#### Model testing


In [54]:
#Generate predictions and evaluate using RMSE
predictions=best_model.transform(test)
rmse = evaluator.evaluate(predictions)

In [61]:
#Print RMSE 
print ("RMSE = {}".format(rmse))

RMSE = 1.1456151134572492


In [62]:
# Refit ALS on the training data with the best hyperparameters from the grid search
# (the cells below keep using `best_model` from cross-validation)
als_best = ALS(maxIter=15, rank=10, regParam=0.01, userCol="userId", itemCol="movieId", ratingCol="rating",
          coldStartStrategy="drop")
model = als_best.fit(training)

In [64]:
#predictions.show(10)

predictions.createOrReplaceTempView("predictions")

limit10 =  (spark.sql("SELECT * FROM predictions limit 10"))
limit10.show()

+------+-------+------+----------+
|userId|movieId|rating|prediction|
+------+-------+------+----------+
|   463|   1088|   3.5| 1.9607128|
|   133|    471|   4.0|  3.749315|
|   597|   1959|   4.0| 4.6991343|
|   108|   1959|   5.0| 4.5030694|
|   368|   1645|   3.0| 2.8509874|
|   101|   3175|   4.0| 3.1413093|
|   115|   1645|   4.0|   3.00947|
|   385|    471|   4.0| 3.1198914|
|   436|    471|   3.0| 4.8210683|
|   587|   3175|   5.0| 3.8690786|
+------+-------+------+----------+



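#### Apply the model to the full data set

The full data set includes the training rows, so the RMSE below is an in-sample figure and is expected to be lower than the test RMSE above; the test RMSE is the better estimate of performance on unseen ratings.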

In [65]:
alldata=best_model.transform(movie_ratings)
rmse = evaluator.evaluate(alldata)
print ("RMSE = "+str(rmse))

RMSE = 0.6506614529461175


In [66]:
alldata.createOrReplaceTempView("alldata")



In [67]:
limit10 = spark.sql('SELECT * FROM alldata LIMIT 10')
limit10.show()

+------+-------+------+----------+
|userId|movieId|rating|prediction|
+------+-------+------+----------+
|   463|   1088|   3.5| 1.9607128|
|   137|   1580|   3.5|  3.208893|
|   580|   1580|   4.0| 3.3586917|
|   580|   3175|   2.5| 3.4234302|
|   580|  44022|   3.5| 3.9047294|
|   133|    471|   4.0|  3.749315|
|   322|   1580|   3.5| 3.1298318|
|   362|   1591|   4.0| 3.2896817|
|   362|   1645|   5.0| 3.8134174|
|   593|   1580|   1.5| 2.4678888|
+------+-------+------+----------+



In [68]:
limit10 = spark.sql('SELECT * FROM movies JOIN alldata ON movies.movieId=alldata.movieId LIMIT 10')
limit10.show()

+-------+--------------------+--------------------+------+-------+------+----------+
|movieId|               title|              genres|userId|movieId|rating|prediction|
+-------+--------------------+--------------------+------+-------+------+----------+
|   1088|Dirty Dancing (1987)|Drama|Musical|Rom...|   463|   1088|   3.5| 1.9607128|
|   1580|Men in Black (a.k...|Action|Comedy|Sci-Fi|   137|   1580|   3.5|  3.208893|
|   1580|Men in Black (a.k...|Action|Comedy|Sci-Fi|   580|   1580|   4.0| 3.3586917|
|   3175| Galaxy Quest (1999)|Adventure|Comedy|...|   580|   3175|   2.5| 3.4234302|
|  44022|Ice Age 2: The Me...|Adventure|Animati...|   580|  44022|   3.5| 3.9047294|
|    471|Hudsucker Proxy, ...|              Comedy|   133|    471|   4.0|  3.749315|
|   1580|Men in Black (a.k...|Action|Comedy|Sci-Fi|   322|   1580|   3.5| 3.1298318|
|   1591|        Spawn (1997)|Action|Adventure|...|   362|   1591|   4.0| 3.2896817|
|   1645|The Devil's Advoc...|Drama|Mystery|Thr...|   362|   1645

#### Recommend 10 movies for each user.

In [69]:
# Recommend the top 10 movies for each user
user_recs = best_model.recommendForAllUsers(10)
#user_recs.show(10)
user_recs.createOrReplaceTempView("user_recs")

limit10 = (spark.sql("SELECT * FROM user_recs limit 10"))
limit10.show()

+------+--------------------+
|userId|     recommendations|
+------+--------------------+
|     1|[{94677, 7.363879...|
|     2|[{1963, 6.7120776...|
|     3|[{1211, 6.4078684...|
|     4|[{1866, 9.520394}...|
|     5|[{1866, 8.929421}...|
|     6|[{674, 7.8965893}...|
|     7|[{89753, 8.85826}...|
|     8|[{3521, 7.6060257...|
|     9|[{158872, 6.96302...|
|    10|[{87485, 8.45906}...|
+------+--------------------+



Recommendations for user 1

In [70]:
user_recs.first()

Row(userId=1, recommendations=[Row(movieId=94677, rating=7.363879680633545), Row(movieId=3022, rating=6.843271255493164), Row(movieId=4799, rating=6.754696369171143), Row(movieId=26133, rating=6.685563564300537), Row(movieId=97304, rating=6.603682041168213), Row(movieId=91094, rating=6.540321350097656), Row(movieId=1194, rating=6.505104064941406), Row(movieId=4039, rating=6.501293182373047), Row(movieId=116897, rating=6.468926906585693), Row(movieId=79428, rating=6.466922283172607)])
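Conceptually, `recommendForAllUsers(k)` scores every movie for every user as the dot product of their latent factors and keeps the k highest scores. The scores are not clipped to the 0.5 to 5 rating scale, which is why values such as 7.36 or 9.52 appear above, and movies the user has already rated are not excluded, which is why they are filtered out further down. A NumPy sketch of the ranking step, with hypothetical factor matrices `U` (users) and `V` (movies):

```python
import numpy as np


def top_k_for_all_users(U, V, k):
    """Return, per user, the indices of the k best-scoring items and their scores."""
    scores = U @ V.T                              # (n_users, n_items) dot products
    top = np.argsort(-scores, axis=1)[:, :k]      # highest scores first
    return top, np.take_along_axis(scores, top, axis=1)


# Hypothetical 2-dimensional factors for 2 users and 3 movies
U = np.array([[1.0, 0.0], [0.0, 1.0]])
V = np.array([[3.0, 0.0], [1.0, 2.0], [2.0, 5.0]])
items, item_scores = top_k_for_all_users(U, V, k=2)
print(items)
print(item_scores)
```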

In [71]:
user_recs.createOrReplaceTempView("als_recs_temp")



In [73]:

# Explode the 'recommendations' array so each recommended (movieId, rating) pair gets its own row

explode_rec = spark.sql('SELECT userId,\
                                explode(recommendations) AS MovieRec\
                                FROM als_recs_temp')
#explode_rec.show(10)


explode_rec.createOrReplaceTempView("explode_rec")

limit10 = (spark.sql("SELECT * FROM explode_rec limit 10"))
limit10.show()

+------+------------------+
|userId|          MovieRec|
+------+------------------+
|     1|{94677, 7.3638797}|
|     1| {3022, 6.8432713}|
|     1| {4799, 6.7546964}|
|     1|{26133, 6.6855636}|
|     1| {97304, 6.603682}|
|     1|{91094, 6.5403214}|
|     1|  {1194, 6.505104}|
|     1|  {4039, 6.501293}|
|     1|{116897, 6.468927}|
|     1|{79428, 6.4669223}|
+------+------------------+



In [75]:
final_recs = spark.sql("SELECT userId,\
                               movieIds_and_ratings.movieId AS movieId,\
                               movieIds_and_ratings.rating AS prediction\
                               FROM als_recs_temp\
                               LATERAL VIEW explode(recommendations) exploded_table AS movieIds_and_ratings")


In [76]:
final_recs.createOrReplaceTempView("final_recs")

limit10 = (spark.sql("SELECT * FROM final_recs limit 10"))
limit10.show()

+------+-------+----------+
|userId|movieId|prediction|
+------+-------+----------+
|     1|  94677| 7.3638797|
|     1|   3022| 6.8432713|
|     1|   4799| 6.7546964|
|     1|  26133| 6.6855636|
|     1|  97304|  6.603682|
|     1|  91094| 6.5403214|
|     1|   1194|  6.505104|
|     1|   4039|  6.501293|
|     1| 116897|  6.468927|
|     1|  79428| 6.4669223|
+------+-------+----------+



In [77]:
# Before recommending, keep only the movies each user has not rated yet:
# left-join the recommendations with the known ratings and keep the rows where rating is null

final_rec = final_recs.join(movie_ratings,['userId','movieId'],'left').filter(movie_ratings.rating.isNull())

final_rec.createOrReplaceTempView("final_rec")

limit5 = (spark.sql("SELECT * FROM final_rec LIMIT 5"))
limit5.show()

+------+-------+----------+------+
|userId|movieId|prediction|rating|
+------+-------+----------+------+
|     1|  94677| 7.3638797|  NULL|
|     1|   3022| 6.8432713|  NULL|
|     1|   4799| 6.7546964|  NULL|
|     1|  26133| 6.6855636|  NULL|
|     1|  97304|  6.603682|  NULL|
+------+-------+----------+------+
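The left join plus `rating IS NULL` filter keeps only the recommendations the user has not rated. Spark also offers this directly as an anti join (`final_recs.join(movie_ratings, ['userId', 'movieId'], 'left_anti')`), which returns only the columns of `final_recs`. The same idea in plain Python, using a set of already-rated (userId, movieId) pairs; the third recommendation is hypothetical and only there to show the filter at work:

```python
# Recommendations for user 1 taken from the output above, plus one hypothetical
# entry for movie 1, which user 1 has already rated (4.0 in ratings.csv)
recs = [(1, 94677, 7.3638797), (1, 3022, 6.8432713), (1, 1, 6.0)]

# (userId, movieId) pairs that already have a rating (first rows of ratings.csv)
rated = {(1, 1), (1, 3), (1, 6), (1, 47), (1, 50)}

# Anti join: keep recommendations whose (userId, movieId) pair is not rated yet
unseen = [rec for rec in recs if (rec[0], rec[1]) not in rated]
print(unseen)
```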



In [78]:
# final_rec is already registered as a view above; register movies under the name used below
movies.createOrReplaceTempView("movies_df")




Find recommended films for userId = 3

In [81]:
recs_for_003 = spark.sql("""SELECT userId,
                                    title
                            FROM final_rec t1
                            LEFT JOIN movies_df t2
                                ON t1.movieId = t2.movieId
                            WHERE t1.userId=003
                            LIMIT 10
                         """)

In [82]:
recs_for_003.show()

+------+--------------------+
|userId|               title|
+------+--------------------+
|     3|Wings of Desire (...|
|     3|Hard-Boiled (Lat ...|
|     3|     Scrooged (1988)|
|     3|Cold Comfort Farm...|
|     3|      Lockout (2012)|
|     3|Killer, The (Die ...|
|     3|Ninja Scroll (Jûb...|
|     3|    Barb Wire (1996)|
|     3|         Dune (2000)|
|     3|How the Grinch St...|
+------+--------------------+
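Without an `ORDER BY`, `LIMIT 10` does not guarantee that the titles come back in order of predicted rating. A plain-Python sketch of attaching titles and sorting by prediction, mirroring `LEFT JOIN movies_df ... ORDER BY t1.prediction DESC` (a missing title becomes `None`, like SQL `NULL`); the predictions and the helper `top_titles` are hypothetical:

```python
# Only two titles are included, so movie 47 has no match, mimicking a failed join
titles = {1: "Toy Story (1995)", 2: "Jumanji (1995)"}

# Hypothetical (userId, movieId, prediction) rows after filtering out rated movies
recs = [(3, 2, 4.1), (3, 1, 4.7), (3, 47, 3.9), (5, 1, 4.0)]


def top_titles(recs, titles, user_id, n=10):
    """Return up to n (title, prediction) pairs for one user, best prediction first."""
    mine = sorted((r for r in recs if r[0] == user_id), key=lambda r: r[2], reverse=True)
    return [(titles.get(movie_id), pred) for _, movie_id, pred in mine[:n]]


print(top_titles(recs, titles, user_id=3))
```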

