In [1]:
import findspark
findspark.init()
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql.functions import udf, col, when, isnan
import numpy as np


In [3]:
# Name of the dataset folder.
# NOTE(review): the cells visible here read the CSV files by bare filename and never use this value - confirm whether it is still needed.
directory = "movei-lens-small-latest-dataset"

In [31]:
# import os
# os.listdir()

In [5]:
# Create the notebook's SparkSession, or reuse one that is already running.
spark = (
    SparkSession.builder
    .appName('Movie_Recommendation')
    .getOrCreate()
)

In [6]:
# Grab the SparkContext behind the session and wrap it in a SQLContext.
sc = spark.sparkContext
sqlContext = SQLContext(sparkContext=sc)

There are 3 csv files:

ratings.csv : rating that a user gave to a movie 

movies.csv: genre and title of each movie

links.csv: identifiers that link each movie to external movie sites (imdbId and tmdbId):

imdbId is an identifier for movies used by http://www.imdb.com. E.g., the movie Toy Story has the link http://www.imdb.com/title/tt0114709/.

tmdbId is an identifier for movies used by https://www.themoviedb.org. E.g., the movie Toy Story has the link https://www.themoviedb.org/movie/862.


In [7]:
# Load the ratings: the first row supplies column names and Spark infers the column types.
ratings_df = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv('ratings.csv')
)
ratings_df.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: double (nullable = true)
 |-- timestamp: integer (nullable = true)



In [8]:
# Peek at the first 4 ratings.
ratings_df.show(n=4)

+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     1|      1|   4.0|964982703|
|     1|      3|   4.0|964981247|
|     1|      6|   4.0|964982224|
|     1|     47|   5.0|964983815|
+------+-------+------+---------+
only showing top 4 rows



In [11]:
# Peek at the first 5 ratings.
ratings_df.show(n=5)

+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     1|      1|   4.0|964982703|
|     1|      3|   4.0|964981247|
|     1|      6|   4.0|964982224|
|     1|     47|   5.0|964983815|
|     1|     50|   5.0|964982931|
+------+-------+------+---------+
only showing top 5 rows



In [12]:
# Load the movie catalogue: the first row supplies column names and Spark infers the column types.
movies_df = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv('movies.csv')
)
movies_df.printSchema()

root
 |-- movieId: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- genres: string (nullable = true)



In [13]:
# Peek at the first 5 movies.
movies_df.show(n=5)

+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Adventure|Animati...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
|      4|Waiting to Exhale...|Comedy|Drama|Romance|
|      5|Father of the Bri...|              Comedy|
+-------+--------------------+--------------------+
only showing top 5 rows



In [14]:
# Load the external-site links: the first row supplies column names and Spark infers the column types.
links_df = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv('links.csv')
)
links_df.printSchema()

root
 |-- movieId: integer (nullable = true)
 |-- imdbId: integer (nullable = true)
 |-- tmdbId: integer (nullable = true)



In [15]:
# Peek at the first 5 link rows.
links_df.show(n=5)

+-------+------+------+
|movieId|imdbId|tmdbId|
+-------+------+------+
|      1|114709|   862|
|      2|113497|  8844|
|      3|113228| 15602|
|      4|114885| 31357|
|      5|113041| 11862|
+-------+------+------+
only showing top 5 rows



ALS requires all input data to be numeric. No conversion is needed here, since the user, movie and rating columns are already numeric.

In this step we split the ratings DataFrame into training and validation datasets.

In [17]:

# Split the ratings roughly 80/20 into training and validation sets.
# A fixed seed makes the split (and every metric computed from it) reproducible
# across kernel restarts; without it each run trains and evaluates on different rows.
SPLIT_SEED = 42
training_df, validation_df = ratings_df.randomSplit([0.8, 0.2], seed=SPLIT_SEED)

ALS requires the following hyperparameters:

In [18]:
# Hyperparameters required by ALS.
iterations = 10
regularization_parameter = 0.1

# `rank` is a matrix-factorization concept; the value here comes from cross validation.
# In mathematics, given an m x n matrix A of rank r, a rank decomposition (rank factorization)
# of A is a factorization A = CF, where C is an m x r matrix and F is an r x n matrix.
# Try rank 4, 5, 6 and other values and keep the best one.
rank = 4

# Training minimizes the prediction error on the observed ratings.

# Placeholders for collecting errors; the cells visible here do not use them.
err = 0
errors = list()

In [19]:
# Train an ALS collaborative-filtering model on the training split and measure
# its RMSE on the validation split.
ALS_SEED = 42  # fixed seed so the fitted factors (and the RMSE) are repeatable

als = ALS(
    maxIter=iterations,
    regParam=regularization_parameter,
    rank=rank,  # use the configured rank rather than a hard-coded 4
    userCol="userId",
    itemCol="movieId",
    ratingCol="rating",
    seed=ALS_SEED,
)
model = als.fit(training_df)
predictions = model.transform(validation_df)

# With ALS's default cold-start strategy, users or movies that never appeared in
# the training split receive a NaN prediction. Remove those rows explicitly with
# isnan() so the RMSE is not NaN; comparing against np.nan only works because of
# Spark's special NaN-equality semantics.
new_predictions = predictions.filter(~isnan(col("prediction")))

evaluator = RegressionEvaluator(metricName='rmse', labelCol='rating', predictionCol='prediction')
rmse = evaluator.evaluate(new_predictions)
print("Root-mean-square error ="+str(rmse))

Root-mean-square error =0.8774178786683966


In [20]:
# Show the first 10 validation predictions next to the true ratings.
predictions.show(10)

+------+-------+------+----------+----------+
|userId|movieId|rating| timestamp|prediction|
+------+-------+------+----------+----------+
|   182|    471|   4.5|1054779644| 3.3760562|
|    57|    471|   3.0| 969753604| 3.3690586|
|   462|    471|   2.5|1123890831| 2.5891209|
|   610|    471|   4.0|1479544381| 3.8334165|
|   136|    471|   4.0| 832450058| 3.4742217|
|   411|    471|   4.0| 835532928| 3.3836277|
|   414|    471|   5.0| 961514069|  3.787899|
|   541|    471|   3.0| 835643551| 2.6706078|
|   373|    471|   5.0| 846830388| 3.3302827|
|   357|    471|   3.5|1348627082| 3.6292505|
+------+-------+------+----------+----------+
only showing top 10 rows



In [21]:
# Attach each movie's title and genres to the predictions and show the first 5 rows.
(
    predictions
    .join(movies_df, on='movieId')
    .select("userId", "title", "genres", "prediction")
    .show(n=5)
)

+------+--------------------+------+----------+
|userId|               title|genres|prediction|
+------+--------------------+------+----------+
|   182|Hudsucker Proxy, ...|Comedy| 3.3760562|
|    57|Hudsucker Proxy, ...|Comedy| 3.3690586|
|   462|Hudsucker Proxy, ...|Comedy| 2.5891209|
|   610|Hudsucker Proxy, ...|Comedy| 3.8334165|
|   136|Hudsucker Proxy, ...|Comedy| 3.4742217|
+------+--------------------+------+----------+
only showing top 5 rows



In [25]:
# Predictions for a single user (userId 357), enriched with movie title and genres.
for_one_user = (
    predictions
    .where(col("userId") == 357)
    .join(movies_df, on="movieId")
    .join(links_df, on="movieId")
    .select("userId", "title", "genres", "prediction")
)
for_one_user.show()

+------+--------------------+--------------------+----------+
|userId|               title|              genres|prediction|
+------+--------------------+--------------------+----------+
|   357|Hudsucker Proxy, ...|              Comedy| 3.6292505|
|   357| Pulp Fiction (1994)|Comedy|Crime|Dram...|  4.311175|
|   357|The Machinist (2004)|Drama|Mystery|Thr...|  4.131466|
|   357|Silence of the La...|Crime|Horror|Thri...| 4.3103695|
|   357|Super Size Me (2004)|Comedy|Documentar...| 3.8793893|
|   357|   Young Guns (1988)|Action|Comedy|Wes...| 3.3264928|
|   357|Grosse Pointe Bla...|Comedy|Crime|Romance| 3.8550434|
|   357|Austin Powers: Th...|Action|Adventure|...| 3.3309813|
|   357|    Quiz Show (1994)|               Drama| 3.7013764|
|   357|        Ghost (1990)|Comedy|Drama|Fant...| 3.6219265|
|   357|When Harry Met Sa...|      Comedy|Romance| 4.0273376|
|   357|Charlie and the C...|Adventure|Childre...| 3.4246156|
|   357|Weird Science (1985)|Comedy|Fantasy|Sc...| 3.1713977|
|   357|

In [26]:
# For every user, generate the top 5 recommended movies.
userRecomments = model.recommendForAllUsers(numItems=5)

In [27]:
# For every movie, generate the top 5 users to recommend it to.
movieRecomments = model.recommendForAllItems(numUsers=5)

In [28]:
# Show only the recommended movie ids per user, without truncating the arrays.
(
    userRecomments
    .select("userId", "recommendations.movieId")
    .show(n=10, truncate=False)
)

+------+-------------------------------------+
|userId|movieId                              |
+------+-------------------------------------+
|471   |[141718, 40491, 104875, 6818, 89904] |
|463   |[102217, 92494, 141718, 59018, 33649]|
|496   |[40491, 6818, 96004, 102217, 92494]  |
|148   |[3925, 51931, 6732, 89904, 2327]     |
|540   |[141718, 33649, 102217, 92494, 59018]|
|392   |[104875, 141718, 33649, 6201, 4495]  |
|243   |[5666, 87234, 51931, 67618, 62293]   |
|31    |[87234, 7842, 5666, 67618, 84847]    |
|516   |[4495, 6201, 33649, 141718, 8235]    |
|580   |[51931, 5666, 3925, 86290, 87234]    |
+------+-------------------------------------+
only showing top 10 rows



In [29]:
# Show only the recommended user ids per movie, without truncating the arrays.
(
    movieRecomments
    .select("movieId", "recommendations.userId")
    .show(n=10, truncate=False)
)

+-------+-------------------------+
|movieId|userId                   |
+-------+-------------------------+
|1580   |[53, 543, 43, 276, 452]  |
|4900   |[53, 243, 12, 548, 224]  |
|5300   |[236, 138, 154, 258, 295]|
|6620   |[295, 55, 502, 266, 535] |
|7340   |[543, 43, 327, 584, 544] |
|32460  |[43, 53, 452, 337, 429]  |
|54190  |[53, 452, 337, 578, 554] |
|471    |[154, 53, 598, 51, 441]  |
|1591   |[147, 138, 595, 44, 96]  |
|140541 |[360, 337, 492, 456, 175]|
+-------+-------------------------+
only showing top 10 rows



In [30]:
# Pick 3 distinct user ids.
# Note: limit(3) returns whichever 3 rows Spark produces first - an arbitrary subset, not a random sample.
users = (
    ratings_df
    .select("userId")
    .distinct()
    .limit(3)
)
users.show()

+------+
|userId|
+------+
|   148|
|   463|
|   471|
+------+



In [46]:
# Top 10 movie recommendations for just the selected users.
useSubsetRec = model.recommendForUserSubset(dataset=users, numItems=10)
useSubsetRec.show()

+------+--------------------+
|userId|     recommendations|
+------+--------------------+
|   471|[{3379, 5.1067796...|
|   463|[{4789, 5.4247403...|
|   148|[{2068, 4.9471707...|
+------+--------------------+



In [48]:
# Show only the recommended movie ids for the selected users, without truncating the arrays.
(
    useSubsetRec
    .select("userId", "recommendations.movieId")
    .show(n=10, truncate=False)
)

+------+---------------------------------------------------------------+
|userId|movieId                                                        |
+------+---------------------------------------------------------------+
|471   |[3379, 96004, 6818, 3266, 2295, 3473, 89759, 7121, 58301, 7096]|
|463   |[4789, 3379, 3200, 5075, 446, 1237, 84847, 3347, 184245, 7071] |
|148   |[2068, 5607, 27611, 8235, 3223, 3341, 3531, 2972, 2512, 1310]  |
+------+---------------------------------------------------------------+

