# Recommender System

This notebook builds a recommender system (RS) from scratch using
the Alternating Least Squares (ALS) method in PySpark and Jupyter Notebook.

ALS is used in latent-factor-based collaborative filtering (CF). This approach has the following limitations:
    
    1- Cold start problem
    2- Missing values
    3- Cannot recommend new or unrated items
    4- Poor accuracy
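
To make the latent-factor idea concrete, here is a minimal pure-Python sketch of alternating least squares with a single latent factor per user and per item. The tiny `ratings` dictionary, the `reg` value, and all variable names are made up for illustration; this is a simplified version of the technique, not Spark's implementation.

```python
# Rank-1 ALS on hypothetical ratings (not the MovieLens subset used in this notebook).
ratings = {
    ("u1", "m1"): 5.0, ("u1", "m2"): 3.0,
    ("u2", "m1"): 4.0, ("u2", "m3"): 1.0,
    ("u3", "m2"): 2.0, ("u3", "m3"): 1.0,
}
reg = 0.01  # L2 regularisation strength (assumed value)

users = sorted({u for u, _ in ratings})
items = sorted({m for _, m in ratings})
user_f = {u: 1.0 for u in users}  # one latent factor per user
item_f = {m: 1.0 for m in items}  # one latent factor per item


def squared_error():
    """Sum of squared differences between known ratings and factor products."""
    return sum((r - user_f[u] * item_f[m]) ** 2 for (u, m), r in ratings.items())


initial_error = squared_error()

for _ in range(20):
    # Step 1: hold item factors fixed and solve least squares for each user.
    for u in users:
        num = sum(r * item_f[m] for (uu, m), r in ratings.items() if uu == u)
        den = reg + sum(item_f[m] ** 2 for (uu, m), _ in ratings.items() if uu == u)
        user_f[u] = num / den
    # Step 2: hold user factors fixed and solve least squares for each item.
    for m in items:
        num = sum(r * user_f[u] for (u, mm), r in ratings.items() if mm == m)
        den = reg + sum(user_f[u] ** 2 for (u, mm), _ in ratings.items() if mm == m)
        item_f[m] = num / den

final_error = squared_error()
print(initial_error, final_error)
```

Each half-step has a closed-form solution because fixing one side turns the problem into ordinary regularised least squares for the other side. Only the observed ratings enter the sums, which is how the method copes with a sparse rating matrix.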

The dataset used here is a subset of the well-known open-source
MovieLens dataset and contains 100,000 records with three columns
(userId, title, rating). We will train the recommender model on 75% of the
data and test it on the remaining 25% of the user ratings.

In [30]:
import os
os.environ["PYSPARK_SUBMIT_ARGS"] = "--master local[*] pyspark-shell"

import findspark
findspark.init()

import pyspark
#import SparkSession
from pyspark.sql import SparkSession

In [31]:
#import and create sparksession object
from pyspark.sql import SparkSession 
spark=SparkSession.builder.appName('rc').getOrCreate()

In [32]:
#import the required functions and libraries
from pyspark.sql.functions import col, lit, rand

In [33]:
#load the dataset and create spark dataframe
df=spark.read.csv('movie_ratings_df.csv',inferSchema=True,header=True)

In [34]:
#validate the shape of the data 
print((df.count(),len(df.columns)))

(100000, 3)


In [35]:
#check columns in dataframe
df.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- rating: integer (nullable = true)



In [36]:
#validate few rows of dataframe in random order
df.orderBy(rand()).show(10,False)

+------+-----------------------------------------+------+
|userId|title                                    |rating|
+------+-----------------------------------------+------+
|775   |Desperate Measures (1998)                |3     |
|319   |Air Bud (1997)                           |3     |
|125   |Naked Gun 33 1/3: The Final Insult (1994)|3     |
|412   |Highlander (1986)                        |4     |
|5     |Forrest Gump (1994)                      |1     |
|303   |Billy Madison (1995)                     |5     |
|495   |Demolition Man (1993)                    |3     |
|101   |Rumble in the Bronx (1995)               |4     |
|452   |Killing Fields, The (1984)               |4     |
|627   |How to Make an American Quilt (1995)     |2     |
+------+-----------------------------------------+------+
only showing top 10 rows



The user with the highest number of records has rated 737 movies, and
each user has rated at least 20 movies.

In [37]:
#check number of ratings by each user
df.groupBy('userId').count().orderBy('count',ascending=False).show(10,False)

+------+-----+
|userId|count|
+------+-----+
|405   |737  |
|655   |685  |
|13    |636  |
|450   |540  |
|276   |518  |
|416   |493  |
|537   |490  |
|303   |484  |
|234   |480  |
|393   |448  |
+------+-----+
only showing top 10 rows



In [38]:
#check the users with the fewest ratings
df.groupBy('userId').count().orderBy('count',ascending=True).show(10,False)

+------+-----+
|userId|count|
+------+-----+
|732   |20   |
|636   |20   |
|572   |20   |
|685   |20   |
|631   |20   |
|34    |20   |
|926   |20   |
|596   |20   |
|300   |20   |
|93    |20   |
+------+-----+
only showing top 10 rows



In [39]:
#number of times each movie has been rated
df.groupBy('title').count().orderBy('count',ascending=False).show(10,False)

+-----------------------------+-----+
|title                        |count|
+-----------------------------+-----+
|Star Wars (1977)             |583  |
|Contact (1997)               |509  |
|Fargo (1996)                 |508  |
|Return of the Jedi (1983)    |507  |
|Liar Liar (1997)             |485  |
|English Patient, The (1996)  |481  |
|Scream (1996)                |478  |
|Toy Story (1995)             |452  |
|Air Force One (1997)         |431  |
|Independence Day (ID4) (1996)|429  |
+-----------------------------+-----+
only showing top 10 rows



In [40]:
df.groupBy('title').count().orderBy('count',ascending=True).show(10,False)

+-----------------------------------------+-----+
|title                                    |count|
+-----------------------------------------+-----+
|Leopard Son, The (1996)                  |1    |
|Lashou shentan (1992)                    |1    |
|Mad Dog Time (1996)                      |1    |
|Fear, The (1995)                         |1    |
|Aiqing wansui (1994)                     |1    |
|Vie est belle, La (Life is Rosey) (1987) |1    |
|Next Step, The (1995)                    |1    |
|JLG/JLG - autoportrait de décembre (1994)|1    |
|Modern Affair, A (1995)                  |1    |
|Target (1995)                            |1    |
+-----------------------------------------+-----+
only showing top 10 rows



In [41]:
#import String indexer to convert string values to numeric values
from pyspark.ml.feature import StringIndexer,IndexToString

In [42]:
#creating string indexer to convert the movie title column values into numerical values
stringIndexer = StringIndexer(inputCol="title", outputCol="title_new")

In [43]:
#applying stringindexer object on dataframe movie title column
model = stringIndexer.fit(df)

In [44]:
#creating new dataframe with transformed values
indexed = model.transform(df)

In [45]:
#validate the numerical title values
indexed.show(10)

+------+------------+------+---------+
|userId|       title|rating|title_new|
+------+------------+------+---------+
|   196|Kolya (1996)|     3|    287.0|
|    63|Kolya (1996)|     3|    287.0|
|   226|Kolya (1996)|     5|    287.0|
|   154|Kolya (1996)|     3|    287.0|
|   306|Kolya (1996)|     5|    287.0|
|   296|Kolya (1996)|     4|    287.0|
|    34|Kolya (1996)|     5|    287.0|
|   271|Kolya (1996)|     4|    287.0|
|   201|Kolya (1996)|     4|    287.0|
|   209|Kolya (1996)|     4|    287.0|
+------+------------+------+---------+
only showing top 10 rows



In [59]:
#number of times each numerical movie title has been rated 
indexed.groupBy('title_new','title').count().orderBy('count',ascending=False).show(10,False)

+---------+-----------------------------+-----+
|title_new|title                        |count|
+---------+-----------------------------+-----+
|0.0      |Star Wars (1977)             |583  |
|1.0      |Contact (1997)               |509  |
|2.0      |Fargo (1996)                 |508  |
|3.0      |Return of the Jedi (1983)    |507  |
|4.0      |Liar Liar (1997)             |485  |
|5.0      |English Patient, The (1996)  |481  |
|6.0      |Scream (1996)                |478  |
|7.0      |Toy Story (1995)             |452  |
|8.0      |Air Force One (1997)         |431  |
|9.0      |Independence Day (ID4) (1996)|429  |
+---------+-----------------------------+-----+
only showing top 10 rows
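
The output above suggests that the indexer assigns index 0.0 to the most frequently rated title, 1.0 to the next, and so on. A minimal pure-Python sketch of that frequency-based mapping, and of the reverse lookup that `IndexToString` performs later, might look like this. The short `titles` list is illustrative, and the alphabetical tie-break is an assumption of this sketch.

```python
from collections import Counter

# Illustrative title column (a few titles from the dataset, repeated by hand).
titles = [
    "Star Wars (1977)", "Fargo (1996)", "Star Wars (1977)",
    "Contact (1997)", "Star Wars (1977)", "Contact (1997)",
]

counts = Counter(titles)

# Order labels by descending frequency; ties are broken alphabetically here
# (assumption made for this sketch).
labels = sorted(counts, key=lambda t: (-counts[t], t))

# Forward mapping: title -> numeric index stored as a float, like title_new.
title_to_index = {title: float(i) for i, title in enumerate(labels)}
indexed_titles = [title_to_index[t] for t in titles]

# Reverse mapping: numeric index -> title, using the stored label order.
restored_titles = [labels[int(i)] for i in indexed_titles]

print(indexed_titles)
print(restored_titles == titles)
```

Keeping the ordered `labels` list is what makes the conversion reversible, which is why the fitted indexer is reused when the recommendations are mapped back to titles.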



In [47]:
#split the data into training and test datasets
train,test=indexed.randomSplit([0.75,0.25])

In [48]:
#count number of records in train set
train.count()

75185

In [49]:
#count number of records in test set
test.count()

24815
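
The split sizes are close to, but not exactly, 75,000 and 25,000. One way to see why is to assign each row to a split independently at random, as in the pure-Python sketch below. The seed value and the variable names are assumptions for illustration, and this is not Spark's internal sampling code.

```python
import random

rng = random.Random(42)  # fixed seed (arbitrary choice) so the sketch is repeatable

rows = range(100_000)  # stand-in for the 100,000 rating records
train_rows, test_rows = [], []

for row in rows:
    # Each row lands in the training split with probability 0.75.
    if rng.random() < 0.75:
        train_rows.append(row)
    else:
        test_rows.append(row)

print(len(train_rows), len(test_rows))
```

Because each row is assigned independently, the counts vary slightly from run to run. `randomSplit` also accepts a `seed` argument, which can be set when the same split needs to be reproduced.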

In [50]:
#import ALS recommender function from pyspark ml library
from pyspark.ml.recommendation import ALS

Several hyperparameters can be tuned to improve the performance of the
model. Two important ones are nonnegative=True, which constrains the
learned factors to be nonnegative so that predicted ratings are never
negative, and coldStartStrategy='drop', which drops rows whose prediction
would otherwise be NaN.
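
The effect of these two settings can be sketched in plain Python. The factor values, user ids, and item ids below are hypothetical; the point is only that a user or item missing from the training data has no learned factors, so its prediction is undefined and has to be handled somehow.

```python
import math

# Hypothetical nonnegative latent factors learned from a training set.
user_factors = {1: [0.9, 1.2], 2: [1.1, 0.4]}        # users seen in training
item_factors = {10.0: [1.5, 2.0], 11.0: [2.0, 0.5]}  # items seen in training


def predict(user, item):
    """Dot product of user and item factors, or NaN when either is unknown."""
    if user not in user_factors or item not in item_factors:
        return math.nan
    return sum(a * b for a, b in zip(user_factors[user], item_factors[item]))


# User 3 and item 12.0 never appeared in training (cold start).
test_pairs = [(1, 10.0), (2, 11.0), (3, 10.0), (1, 12.0)]
predictions = [(u, i, predict(u, i)) for u, i in test_pairs]

# Equivalent in spirit to the 'drop' strategy: discard rows with NaN predictions.
kept = [row for row in predictions if not math.isnan(row[2])]

print(predictions)
print(kept)
```

Without the drop step, a single NaN prediction would make an averaged metric such as RMSE evaluate to NaN as well. With nonnegative factors, every remaining prediction is a sum of nonnegative products and so cannot be negative.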

In [51]:
#configure the ALS recommender (it is trained on the train dataset in the next step)
rec = ALS(maxIter=10,regParam=0.01,userCol='userId',itemCol='title_new',ratingCol='rating',nonnegative=True,coldStartStrategy="drop")

### Train the model

In [52]:
#fit the model on train set
rec_model = rec.fit(train)

### Making Prediction

In [53]:
#making predictions on test set 
predicted_ratings = rec_model.transform(test)

In [54]:
#columns in predicted ratings dataframe
predicted_ratings.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- rating: integer (nullable = true)
 |-- title_new: double (nullable = false)
 |-- prediction: float (nullable = false)



In [55]:
#predicted vs actual ratings for test set 
predicted_ratings.orderBy(rand()).show(10)

+------+--------------------+------+---------+----------+
|userId|               title|rating|title_new|prediction|
+------+--------------------+------+---------+----------+
|   489|       Mother (1996)|     3|    169.0|  4.785524|
|   896|Air Up There, The...|     2|    983.0| 2.0463958|
|   500| Pulp Fiction (1994)|     5|     12.0|  4.574325|
|   267|        Alien (1979)|     4|     44.0|  4.698067|
|   896|    GoldenEye (1995)|     3|    238.0| 3.2311964|
|   188|        Speed (1994)|     4|     84.0| 4.4615808|
|   437|Mrs. Doubtfire (1...|     3|    126.0| 3.5331366|
|   833|Die Hard: With a ...|     2|    202.0| 2.2058969|
|    21|Hunchback of Notr...|     3|    253.0|  3.456728|
|   386|    Boot, Das (1981)|     5|    115.0| 4.6373873|
+------+--------------------+------+---------+----------+
only showing top 10 rows



In [56]:
#importing Regression Evaluator to measure RMSE
from pyspark.ml.evaluation import RegressionEvaluator

In [57]:
#create Regressor evaluator object for measuring accuracy
evaluator=RegressionEvaluator(metricName='rmse',predictionCol='prediction',labelCol='rating')

In [60]:
#apply the RE on predictions dataframe to calculate RMSE
rmse=evaluator.evaluate(predicted_ratings)

The RMSE is not very high: the predicted rating typically differs from the
actual rating by about one point. This can be improved further by tuning
the model parameters and using a hybrid approach.

In [61]:
#print RMSE error
print(rmse)

1.0168526253947543
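
For reference, RMSE is the square root of the mean squared difference between actual and predicted ratings. The sketch below applies that formula to the ten (rating, prediction) pairs displayed in the sample output earlier; the function name `rmse` is chosen here for illustration. Since it uses only ten rows, its result is not expected to match the value computed on the full test set.

```python
import math

# (actual rating, predicted rating) pairs copied from the ten sample rows shown earlier.
sample_pairs = [
    (3, 4.785524), (2, 2.0463958), (5, 4.574325), (4, 4.698067),
    (3, 3.2311964), (4, 4.4615808), (3, 3.5331366), (2, 2.2058969),
    (3, 3.456728), (5, 4.6373873),
]


def rmse(pairs):
    """Root mean squared error over (actual, predicted) pairs."""
    squared_errors = [(actual - predicted) ** 2 for actual, predicted in pairs]
    return math.sqrt(sum(squared_errors) / len(squared_errors))


sample_rmse = rmse(sample_pairs)
print(sample_rmse)
```

Squaring the errors before averaging means that a few large misses, such as the first row, weigh more heavily than many small ones.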


In [None]:
#Recommend top movies  which user might like 

In [63]:
#create dataset of all distinct movies 
unique_movies=indexed.select('title_new').distinct()

In [64]:
#number of unique movies
unique_movies.count()

1664

In [66]:
#assigning alias name 'a' to unique movies df
a = unique_movies.alias('a')

We can select any user within the dataset for which we need to
recommend other movies. In our case, we go ahead with userId = 85. 

In [67]:
user_id=85

We first collect the movies that this active user has already rated, so that they can be excluded from the recommendations.

In [68]:
#creating another dataframe which contains already watched movie by active user 
watched_movies=indexed.filter(indexed['userId'] == user_id).select('title_new').distinct()

In [69]:
#number of movies already rated 
watched_movies.count()

287

In [70]:
#assigning alias name 'b' to watched movies df
b=watched_movies.alias('b')

This active user has already rated 287 of the 1664 unique movies, so we
want to recommend movies from the remaining 1377 items. We now combine
both tables with a **LEFT JOIN** and keep only the rows where the joined
value is null; these are the movies we can recommend.

In [71]:
#joining both tables on left join 
total_movies = a.join(b, a.title_new == b.title_new,how='left')


In [72]:
total_movies.show(10,False)

+---------+---------+
|title_new|title_new|
+---------+---------+
|558.0    |null     |
|305.0    |305.0    |
|299.0    |null     |
|596.0    |null     |
|769.0    |null     |
|934.0    |null     |
|496.0    |496.0    |
|1051.0   |null     |
|692.0    |null     |
|810.0    |null     |
+---------+---------+
only showing top 10 rows



In [73]:
#selecting movies which active user is yet to rate or watch
remaining_movies=total_movies.where(col("b.title_new").isNull()).select(a.title_new).distinct()

In [74]:
#number of movies user is yet to rate 
remaining_movies.count()

1377
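
The left join followed by a null filter amounts to keeping the rows of the left table that have no match in the right table. A small pure-Python sketch of the same logic follows, using five of the movie ids from the joined table shown above; the variable names are illustrative.

```python
# Five ids from the joined table above: 305.0 and 496.0 were already rated.
all_movies = [558.0, 305.0, 299.0, 596.0, 496.0]
watched = {305.0, 496.0}

# Left join: every movie is kept; the right-hand value is None when unmatched.
joined = [(movie, movie if movie in watched else None) for movie in all_movies]

# Null filter: keep only the movies with no match on the right-hand side.
remaining = [left for left, right in joined if right is None]

print(joined)
print(remaining)
```

Spark's `join` also accepts `how='left_anti'`, which should express the same intent in one step, although the notebook's two-step version makes the intermediate table easier to inspect.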

In [75]:
#adding new column of userId of active user to remaining movies df
remaining_movies=remaining_movies.withColumn("userId",lit(int(user_id)))

In [76]:
remaining_movies.show(10,False)

+---------+------+
|title_new|userId|
+---------+------+
|558.0    |85    |
|299.0    |85    |
|596.0    |85    |
|769.0    |85    |
|934.0    |85    |
|1051.0   |85    |
|692.0    |85    |
|810.0    |85    |
|720.0    |85    |
|782.0    |85    |
+---------+------+
only showing top 10 rows



In [77]:
#making recommendations using ALS recommender model and sorting by predicted rating
recommendations=rec_model.transform(remaining_movies).orderBy('prediction',ascending=False)

In [78]:
recommendations.show(5,False)

+---------+------+----------+
|title_new|userId|prediction|
+---------+------+----------+
|1358.0   |85    |5.6179676 |
|303.0    |85    |4.7003927 |
|1030.0   |85    |4.640958  |
|1470.0   |85    |4.6015687 |
|1433.0   |85    |4.580033  |
+---------+------+----------+
only showing top 5 rows



In [79]:
#converting title_new values back to movie titles
movie_title = IndexToString(inputCol="title_new", outputCol="title",labels=model.labels)

final_recommendations=movie_title.transform(recommendations)


In [80]:
final_recommendations.show(10,False)

+---------+------+----------+-----------------------------------------------+
|title_new|userId|prediction|title                                          |
+---------+------+----------+-----------------------------------------------+
|1358.0   |85    |5.6179676 |Angel Baby (1995)                              |
|303.0    |85    |4.7003927 |Close Shave, A (1995)                          |
|1030.0   |85    |4.640958  |Braindead (1992)                               |
|1470.0   |85    |4.6015687 |Some Mother's Son (1996)                       |
|1433.0   |85    |4.580033  |Boys, Les (1997)                               |
|853.0    |85    |4.5688143 |Naked (1993)                                   |
|996.0    |85    |4.505178  |In the Bleak Midwinter (1995)                  |
|540.0    |85    |4.49625   |39 Steps, The (1935)                           |
|1005.0   |85    |4.4796925 |Haunted World of Edward D. Wood Jr., The (1995)|
|1195.0   |85    |4.471673  |Pather Panchali (1955)                         |
+---------+------+----------+-----------------------------------------------+

These steps can be wrapped in a single function that executes them in
sequence and generates recommendations for any active user.

In [81]:
#create function to recommend top 'n' movies to any particular user
def top_movies(user_id,n):
    """
    Display the top 'n' movies that the user has not rated yet but might like.
    
    """
    #assigning alias name 'a' to unique movies df
    a = unique_movies.alias('a')
    
    #creating another dataframe which contains already watched movie by active user 
    watched_movies=indexed.filter(indexed['userId'] == user_id).select('title_new')
    
    #assigning alias name 'b' to watched movies df
    b=watched_movies.alias('b')
    
    #joining both tables on left join 
    total_movies = a.join(b, a.title_new == b.title_new,how='left')
    
    #selecting movies which active user is yet to rate or watch
    remaining_movies=total_movies.where(col("b.title_new").isNull()).select(a.title_new).distinct()
    
    
    #adding new column of userId of active user to remaining movies df 
    remaining_movies=remaining_movies.withColumn("userId",lit(int(user_id)))
    
    
    #making recommendations using ALS recommender model and selecting only top 'n' movies
    recommendations=rec_model.transform(remaining_movies).orderBy('prediction',ascending=False).limit(n)
    
    
    #adding columns of movie titles in recommendations
    movie_title = IndexToString(inputCol="title_new", outputCol="title",labels=model.labels)
    final_recommendations=movie_title.transform(recommendations)
    
    #display the recommendations for the active user (show() prints the table and returns None)
    return final_recommendations.show(n,False)

In [82]:
top_movies(85,10)

+---------+------+----------+-----------------------------------------------+
|title_new|userId|prediction|title                                          |
+---------+------+----------+-----------------------------------------------+
|1358.0   |85    |5.6179676 |Angel Baby (1995)                              |
|303.0    |85    |4.7003927 |Close Shave, A (1995)                          |
|1030.0   |85    |4.640958  |Braindead (1992)                               |
|1470.0   |85    |4.6015687 |Some Mother's Son (1996)                       |
|1433.0   |85    |4.580033  |Boys, Les (1997)                               |
|853.0    |85    |4.5688143 |Naked (1993)                                   |
|996.0    |85    |4.505178  |In the Bleak Midwinter (1995)                  |
|540.0    |85    |4.49625   |39 Steps, The (1935)                           |
|1005.0   |85    |4.4796925 |Haunted World of Edward D. Wood Jr., The (1995)|
|1195.0   |85    |4.471673  |Pather Panchali (1955)                         |
+---------+------+----------+-----------------------------------------------+
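
The final ranking step, sorting by predicted rating and keeping the first n rows, can be sketched without Spark. The (title_new, prediction) pairs below are taken from the output above and deliberately listed out of order; the variable names are illustrative.

```python
import heapq

# (title_new, prediction) pairs from the recommendations above, in shuffled order.
scored = [
    (540.0, 4.49625),
    (1358.0, 5.6179676),
    (996.0, 4.505178),
    (303.0, 4.7003927),
    (1030.0, 4.640958),
]

n = 3
# Keep the n pairs with the highest predicted rating, highest first.
top_n = heapq.nlargest(n, scored, key=lambda pair: pair[1])

print(top_n)
```

Selecting the top n this way avoids sorting the whole list, which matters little for five rows but reflects why a limit is applied after ordering.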