In [1]:
#import and create sparksession object
from pyspark.sql import SparkSession 
spark=SparkSession.builder.appName('rc').getOrCreate()

In [2]:
#import the required functions and libraries
from pyspark.sql.functions import *
from pyspark.ml.feature import IndexToString

In [3]:
#load the dataset and create sprk dataframe
df=spark.read.csv('movie_ratings_df.csv',inferSchema=True,header=True)

In [4]:
#validate the shape of the data 
print((df.count(),len(df.columns)))

(100000, 3)


In [5]:
#check columns in dataframe
df.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- rating: integer (nullable = true)



In [6]:
#shuffle the records in the dataframe
df=df.orderBy(rand())

In [7]:
#validate few rows of dataframe
df.show(10,False)

+------+---------------------------------+------+
|userId|title                            |rating|
+------+---------------------------------+------+
|732   |Smile Like Yours, A (1997)       |1     |
|805   |Rock, The (1996)                 |3     |
|621   |Cable Guy, The (1996)            |2     |
|16    |Crimson Tide (1995)              |5     |
|303   |Godfather, The (1972)            |5     |
|435   |Lion King, The (1994)            |3     |
|422   |Interview with the Vampire (1994)|3     |
|474   |White Squall (1996)              |4     |
|796   |Crimson Tide (1995)              |4     |
|806   |Raising Arizona (1987)           |4     |
+------+---------------------------------+------+
only showing top 10 rows



In [8]:
#check number of ratings by each user
df.groupBy('userId').count().orderBy('count',ascending=False).show(10,False)

+------+-----+
|userId|count|
+------+-----+
|405   |737  |
|655   |685  |
|13    |636  |
|450   |540  |
|276   |518  |
|416   |493  |
|537   |490  |
|303   |484  |
|234   |480  |
|393   |448  |
+------+-----+
only showing top 10 rows



In [9]:
#number of times movie been rated 
df.groupBy('title').count().orderBy('count',ascending=False).show(10,False)

+-----------------------------+-----+
|title                        |count|
+-----------------------------+-----+
|Star Wars (1977)             |583  |
|Contact (1997)               |509  |
|Fargo (1996)                 |508  |
|Return of the Jedi (1983)    |507  |
|Liar Liar (1997)             |485  |
|English Patient, The (1996)  |481  |
|Scream (1996)                |478  |
|Toy Story (1995)             |452  |
|Air Force One (1997)         |431  |
|Independence Day (ID4) (1996)|429  |
+-----------------------------+-----+
only showing top 10 rows



In [10]:
#import String indexer to convert string values to numeric values and vice versa
from pyspark.ml.feature import StringIndexer,IndexToString

In [11]:
#creating string indexer to convert the movie title column values into numerical values
stringIndexer = StringIndexer(inputCol="title", outputCol="title_new")

In [12]:
#applying stringindexer object on dataframe movie title column
model = stringIndexer.fit(df)

In [13]:
#creating new dataframe with transformed values
indexed = model.transform(df)

In [14]:
#validate the numerical title values
indexed.show(10)

+------+--------------------+------+---------+
|userId|               title|rating|title_new|
+------+--------------------+------+---------+
|   732|Smile Like Yours,...|     1|    861.0|
|   805|    Rock, The (1996)|     3|     17.0|
|   621|Cable Guy, The (1...|     2|    315.0|
|    16| Crimson Tide (1995)|     5|    196.0|
|   303|Godfather, The (1...|     5|     11.0|
|   435|Lion King, The (1...|     3|     94.0|
|   422|Interview with th...|     3|    223.0|
|   474| White Squall (1996)|     4|    394.0|
|   796| Crimson Tide (1995)|     4|    196.0|
|   806|Raising Arizona (...|     4|     62.0|
+------+--------------------+------+---------+
only showing top 10 rows



In [15]:
#number of times each numerical movie title has been rated 
indexed.groupBy('title_new').count().orderBy('count',ascending=False).show(10,False)

+---------+-----+
|title_new|count|
+---------+-----+
|0.0      |583  |
|1.0      |509  |
|2.0      |508  |
|3.0      |507  |
|4.0      |485  |
|5.0      |481  |
|6.0      |478  |
|7.0      |452  |
|8.0      |431  |
|9.0      |429  |
+---------+-----+
only showing top 10 rows



In [16]:
#split the data into training and test datatset
train,test=indexed.randomSplit([0.75,0.25])

In [17]:
#count number of records in train set
train.count()

75157

In [18]:
#count number of records in test set
test.count()

24841

In [19]:
#import ALS recommender function from pyspark ml library
from pyspark.ml.recommendation import ALS

In [20]:
#Training the recommender model using train datatset
rec=ALS(maxIter=10,regParam=0.01,userCol='userId',itemCol='title_new',ratingCol='rating',nonnegative=True,coldStartStrategy="drop")

In [21]:
#fit the model on train set
rec_model=rec.fit(train)

In [22]:
#making predictions on test set 
predicted_ratings=rec_model.transform(test)

In [23]:
#predicted vs actual ratings for test set 
predicted_ratings.orderBy(rand()).show(10)

+------+--------------------+------+---------+----------+
|userId|               title|rating|title_new|prediction|
+------+--------------------+------+---------+----------+
|   201|Bullets Over Broa...|     4|    392.0| 3.7373939|
|   442|      Alien 3 (1992)|     2|    337.0| 2.1287026|
|   886|  French Kiss (1995)|     4|    399.0|  3.365878|
|   645|Butch Cassidy and...|     4|    101.0|  4.225646|
|   320|William Shakespea...|     4|    313.0| 4.2848535|
|    49|      Jeffrey (1995)|     4|    753.0|  3.065721|
|   409|One Flew Over the...|     5|     57.0| 3.6882381|
|   207|        Bogus (1996)|     3|    894.0|  2.734596|
|     1|Long Kiss Goodnig...|     3|    132.0|  3.344652|
|   535|Nina Takes a Love...|     4|   1282.0| 2.5115514|
+------+--------------------+------+---------+----------+
only showing top 10 rows



In [24]:
#importing Regression Evaluator to measure RMSE
from pyspark.ml.evaluation import RegressionEvaluator

In [25]:
#create Regressor evaluator object for measuring accuracy
evaluator=RegressionEvaluator(metricName='rmse',predictionCol='prediction',labelCol='rating')

In [27]:
#apply the RE on predictions dataframe to calculate RMSE
rmse=evaluator.evaluate(predicted_ratings)

In [28]:
#print RMSE error
print(rmse)

0.8093828311744932


In [29]:
#Recommend top movies  which user might like 

In [30]:
#create dataset of all distinct movies 
unique_movies=indexed.select('title_new').distinct()

In [31]:
#create function to recommend top 'n' movies to any particular user
def top_movies(user_id,n):
    """
    This function returns the top 'n' movies that user has not seen yet but might like 
    
    """
    #assigning alias name 'a' to unique movies df
    a = unique_movies.alias('a')
    
    #creating another dataframe which contains already watched movie by active user 
    watched_movies=indexed.filter(indexed['userId'] == user_id).select('title_new')
    
    #assigning alias name 'b' to watched movies df
    b=watched_movies.alias('b')
    
    #joining both tables on left join 
    total_movies = a.join(b, a.title_new == b.title_new,how='left')
    
    #selecting movies which active user is yet to rate or watch
    remaining_movies=total_movies.where(col("b.title_new").isNull()).select(a.title_new).distinct()
    
    
    #adding new column of user_Id of active useer to remaining movies df 
    remaining_movies=remaining_movies.withColumn("userId",lit(int(user_id)))
    
    
    #making recommendations using ALS recommender model and selecting only top 'n' movies
    recommendations=rec_model.transform(remaining_movies).orderBy('prediction',ascending=False).limit(n)
    
    
    #adding columns of movie titles in recommendations
    movie_title = IndexToString(inputCol="title_new", outputCol="title",labels=model.labels)
    final_recommendations=movie_title.transform(recommendations)
    
    #return the recommendations to active user
    return final_recommendations.show(n,False)

In [34]:
top_movies(67,10)

+---------+------+----------+------------------------------+
|title_new|userId|prediction|title                         |
+---------+------+----------+------------------------------+
|1372.0   |67    |8.241052  |Schizopolis (1996)            |
|1268.0   |67    |8.129553  |City of Industry (1997)       |
|1286.0   |67    |8.090075  |Mina Tannenbaum (1994)        |
|778.0    |67    |7.9398994 |Great Race, The (1965)        |
|1233.0   |67    |7.902425  |Man of No Importance, A (1994)|
|1149.0   |67    |7.7158966 |Chairman of the Board (1998)  |
|469.0    |67    |7.473818  |Dumb & Dumber (1994)          |
|1258.0   |67    |7.3118477 |Secret Agent, The (1996)      |
|1130.0   |67    |7.264929  |Endless Summer 2, The (1994)  |
|995.0    |67    |7.0268645 |Stuart Saves His Family (1995)|
+---------+------+----------+------------------------------+

