In [0]:
#Importing important libraries

In [0]:
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.sql import SparkSession
# Obtain the Spark session for this notebook: reuse an existing one if present,
# otherwise create one with the application name "recom".
spark = (
    SparkSession.builder
    .appName("recom")
    .getOrCreate()
)


In [0]:
#Loading the dataset

In [0]:
# Load the movie catalogue; the first CSV row is the header and column types are inferred.
movie = (
    spark.read
    .option("header", "True")
    .option("inferSchema", "True")
    .csv("/FileStore/tables/movies.csv")
)
# Load the user ratings with the same reader settings.
rating = (
    spark.read
    .option("header", "True")
    .option("inferSchema", "True")
    .csv("/FileStore/tables/ratings.csv")
)

In [0]:
# Preview the two freshly loaded DataFrames (ratings first, then movies)
# to check that headers and column types were read as expected.
display(rating)
display(movie)

userId,movieId,rating,timestamp
1,1,4.0,964982703
1,3,4.0,964981247
1,6,4.0,964982224
1,47,5.0,964983815
1,50,5.0,964982931
1,70,3.0,964982400
1,101,5.0,964980868
1,110,4.0,964982176
1,151,5.0,964984041
1,157,5.0,964984100


movieId,title,genres
1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy
2,Jumanji (1995),Adventure|Children|Fantasy
3,Grumpier Old Men (1995),Comedy|Romance
4,Waiting to Exhale (1995),Comedy|Drama|Romance
5,Father of the Bride Part II (1995),Comedy
6,Heat (1995),Action|Crime|Thriller
7,Sabrina (1995),Comedy|Romance
8,Tom and Huck (1995),Adventure|Children
9,Sudden Death (1995),Action
10,GoldenEye (1995),Action|Adventure|Thriller


In [0]:
#Joining two dataframes

In [0]:
# Attach title/genres to every rating row; the left join keeps ratings
# even when their movieId has no match in the movie table.
ratings = rating.join(movie, on='movieId', how='left')

In [0]:
# Print the first rows of the joined DataFrame to verify the join result.
ratings.show()

+-------+------+------+---------+--------------------+--------------------+
|movieId|userId|rating|timestamp|               title|              genres|
+-------+------+------+---------+--------------------+--------------------+
|      1|     1|   4.0|964982703|    Toy Story (1995)|Adventure|Animati...|
|      3|     1|   4.0|964981247|Grumpier Old Men ...|      Comedy|Romance|
|      6|     1|   4.0|964982224|         Heat (1995)|Action|Crime|Thri...|
|     47|     1|   5.0|964983815|Seven (a.k.a. Se7...|    Mystery|Thriller|
|     50|     1|   5.0|964982931|Usual Suspects, T...|Crime|Mystery|Thr...|
|     70|     1|   3.0|964982400|From Dusk Till Da...|Action|Comedy|Hor...|
|    101|     1|   5.0|964980868|Bottle Rocket (1996)|Adventure|Comedy|...|
|    110|     1|   4.0|964982176|   Braveheart (1995)|    Action|Drama|War|
|    151|     1|   5.0|964984041|      Rob Roy (1995)|Action|Drama|Roma...|
|    157|     1|   5.0|964984100|Canadian Bacon (1...|          Comedy|War|
|    163|   

In [0]:
#Train and test data

In [0]:
# Split the joined ratings into ~80% training and ~20% test rows.
# A fixed seed makes the split (and every metric computed from it)
# reproducible across notebook re-runs; without it each run samples differently.
(train, test) = ratings.randomSplit([0.8, 0.2], seed=42)

In [0]:
# Number of rows that ended up in the held-out test split.
print(test.count())

20248


In [0]:
# Number of rows that ended up in the training split.
print(train.count())

80588


In [0]:
#ALS Method

In [0]:
# ALS estimator for explicit feedback (implicitPrefs=False) on the rating column.
# - nonnegative=True constrains the learned factors to be non-negative.
# - coldStartStrategy="drop" removes rows whose prediction would be NaN
#   (users/items unseen during fitting) so RMSE can be computed.
# - seed fixes the random factor initialisation so results are reproducible.
als = ALS(
    userCol="userId",
    itemCol="movieId",
    ratingCol="rating",
    nonnegative=True,
    implicitPrefs=False,
    coldStartStrategy="drop",
    seed=42,
)

In [0]:
#Hyperparameter tuning and cross validation

In [0]:
# Search space for tuning: 4 ranks x 4 regularisation strengths = 16 candidates.
param_grid = (
    ParamGridBuilder()
    .addGrid(als.rank, [10, 50, 100, 150])
    .addGrid(als.regParam, [0.01, 0.05, 0.1, 0.15])
    .build()
)

In [0]:
# RMSE between the true "rating" column and the model's "prediction" column.
evaluator = RegressionEvaluator(
    labelCol="rating",
    predictionCol="prediction",
    metricName="rmse",
)

In [0]:
# 5-fold cross-validation over every combination in param_grid, scored with `evaluator`.
# The seed fixes how rows are assigned to folds so that model selection
# is reproducible between runs.
cv = CrossValidator(
    estimator=als,
    estimatorParamMaps=param_grid,
    evaluator=evaluator,
    numFolds=5,
    seed=42,
)

In [0]:
#fitting the model

In [0]:
# Run the cross-validated grid search on the training split.
model = cv.fit(train)
# Keep the model chosen by the cross-validator.
best_model = model.bestModel
# Score the chosen model on the held-out test split.
test_predictions = best_model.transform(test)
# `evaluator` is configured for RMSE, so lower is better.
Score = evaluator.evaluate(test_predictions)
print(Score)

0.8667964744890653


In [0]:
#Recommendations

In [0]:
# Ask the best model for 5 recommended items per user; each row holds a userId
# and a `recommendations` array of (movieId, predicted rating) entries.
recommendations = best_model.recommendForAllUsers(5)
# Print the first rows as a quick check.
recommendations.show()

+------+--------------------+
|userId|     recommendations|
+------+--------------------+
|     1|[{132333, 5.66636...|
|     2|[{131724, 4.80750...|
|     3|[{5746, 4.842065}...|
|     4|[{3851, 4.8489127...|
|     5|[{132333, 4.51109...|
|     6|[{33649, 4.773507...|
|     7|[{3379, 4.6497636...|
|     8|[{3379, 4.791485}...|
|     9|[{132333, 4.96950...|
|    10|[{71579, 4.409375...|
|    11|[{5867, 4.978396}...|
|    12|[{5867, 5.5843787...|
|    13|[{3379, 5.1917706...|
|    14|[{33649, 4.576783...|
|    15|[{60943, 4.373869...|
|    16|[{3379, 4.519663}...|
|    17|[{3379, 5.1226077...|
|    18|[{3379, 4.888902}...|
|    19|[{3379, 3.9683187...|
|    20|[{5915, 4.981031}...|
+------+--------------------+
only showing top 20 rows



In [0]:
# Alias for the recommendations DataFrame (same object, no copy is made);
# the following cells flatten it step by step.
df21 = recommendations

In [0]:
# Show the nested per-user recommendation lists before flattening.
display(df21)

userId,recommendations
1,"List(List(132333, 5.6663694), List(5490, 5.6663694), List(5915, 5.6663694), List(3379, 5.650161), List(33649, 5.6116505))"
2,"List(List(131724, 4.807505), List(179135, 4.5250998), List(72171, 4.5250998), List(117531, 4.5250998), List(184245, 4.5250998))"
3,"List(List(5746, 4.842065), List(5181, 4.7434907), List(5919, 4.7415147), List(7991, 4.60728), List(4518, 4.5177674))"
4,"List(List(3851, 4.8489127), List(1733, 4.731344), List(4765, 4.7114496), List(2204, 4.5613313), List(3365, 4.516425))"
5,"List(List(132333, 4.5110936), List(5490, 4.5110936), List(5915, 4.5110936), List(112804, 4.3531017), List(33649, 4.3524528))"
6,"List(List(33649, 4.7735076), List(5867, 4.7477913), List(42730, 4.7477913), List(3086, 4.7379756), List(67618, 4.6831207))"
7,"List(List(3379, 4.6497636), List(132333, 4.5889935), List(5915, 4.5889935), List(5490, 4.5889935), List(8477, 4.4786797))"
8,"List(List(3379, 4.791485), List(33649, 4.7234383), List(4495, 4.548978), List(2295, 4.548978), List(6201, 4.548978))"
9,"List(List(132333, 4.969508), List(5490, 4.969508), List(5915, 4.969508), List(2295, 4.8539143), List(6201, 4.8539143))"
10,"List(List(71579, 4.409375), List(113275, 4.344688), List(7169, 4.1646156), List(67618, 4.1173), List(3086, 4.0966377))"


In [0]:
from pyspark.sql.functions import col, explode

In [0]:
# Flatten the recommendation array: one output row per (user, recommended item),
# with the array element stored in the new "movie_Id_Rating" column.
df22 = df21.withColumn(
    "movie_Id_Rating",
    explode(col("recommendations")),
)

In [0]:
# Inspect the exploded rows; the original array column is still present alongside
# the new single-element "movie_Id_Rating" column.
display(df22)

userId,recommendations,movie_Id_Rating
1,"List(List(132333, 5.6663694), List(5490, 5.6663694), List(5915, 5.6663694), List(3379, 5.650161), List(33649, 5.6116505))","List(132333, 5.6663694)"
1,"List(List(132333, 5.6663694), List(5490, 5.6663694), List(5915, 5.6663694), List(3379, 5.650161), List(33649, 5.6116505))","List(5490, 5.6663694)"
1,"List(List(132333, 5.6663694), List(5490, 5.6663694), List(5915, 5.6663694), List(3379, 5.650161), List(33649, 5.6116505))","List(5915, 5.6663694)"
1,"List(List(132333, 5.6663694), List(5490, 5.6663694), List(5915, 5.6663694), List(3379, 5.650161), List(33649, 5.6116505))","List(3379, 5.650161)"
1,"List(List(132333, 5.6663694), List(5490, 5.6663694), List(5915, 5.6663694), List(3379, 5.650161), List(33649, 5.6116505))","List(33649, 5.6116505)"
2,"List(List(131724, 4.807505), List(179135, 4.5250998), List(72171, 4.5250998), List(117531, 4.5250998), List(184245, 4.5250998))","List(131724, 4.807505)"
2,"List(List(131724, 4.807505), List(179135, 4.5250998), List(72171, 4.5250998), List(117531, 4.5250998), List(184245, 4.5250998))","List(179135, 4.5250998)"
2,"List(List(131724, 4.807505), List(179135, 4.5250998), List(72171, 4.5250998), List(117531, 4.5250998), List(184245, 4.5250998))","List(72171, 4.5250998)"
2,"List(List(131724, 4.807505), List(179135, 4.5250998), List(72171, 4.5250998), List(117531, 4.5250998), List(184245, 4.5250998))","List(117531, 4.5250998)"
2,"List(List(131724, 4.807505), List(179135, 4.5250998), List(72171, 4.5250998), List(117531, 4.5250998), List(184245, 4.5250998))","List(184245, 4.5250998)"


In [0]:
# Pull the nested fields out into flat columns: userId, movieId, rating.
# Note: "rating" here is the model's predicted score, not an observed rating.
df23 = df22.select(
    "userId",
    col("movie_Id_Rating.movieId"),
    col("movie_Id_Rating.rating"),
)

In [0]:
# Show the final flat table of recommended movies per user.
display(df23)

userId,movieId,rating
1,132333,5.6663694
1,5490,5.6663694
1,5915,5.6663694
1,3379,5.650161
1,33649,5.6116505
2,131724,4.807505
2,179135,4.5250998
2,72171,4.5250998
2,117531,4.5250998
2,184245,4.5250998
