In [1]:
# Import the required PySpark modules
from pyspark.sql import SparkSession 
from pyspark.ml.evaluation import RegressionEvaluator 
from pyspark.ml.recommendation import ALS
from pyspark.sql.functions import avg, min, max
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator 
  
# Build the session through a named builder, then display it as the cell output.
# getOrCreate() hands back the already-running session when one exists.
session_builder = SparkSession.builder.appName('Recommender')
spark = session_builder.getOrCreate()
spark

In [2]:
# Load the ratings file; the first row is the header and column types are inferred.
ratings = spark.read.csv(
    'ratings.csv',
    header=True,
    inferSchema=True,
)

# Quick look at the first five rows.
ratings.show(5)

+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     1|      1|   4.0|964982703|
|     1|      3|   4.0|964981247|
|     1|      6|   4.0|964982224|
|     1|     47|   5.0|964983815|
|     1|     50|   5.0|964982931|
+------+-------+------+---------+
only showing top 5 rows



In [3]:
# Sparsity: the percentage of all possible (user, movie) pairs that have no rating.

# Distinct users and distinct movies present in the ratings data.
num_users = ratings.select("userId").distinct().count()
num_movies = ratings.select("movieId").distinct().count()

# Observed ratings versus the size of the full user x movie grid.
numerator = ratings.select("rating").count()
denominator = num_users * num_movies

# Share of the grid that is empty, expressed as a percentage.
sparsity = (1.0 - float(numerator) / denominator) * 100
print("The ratings dataframe is ", "%.2f" % sparsity + "% empty.")

The ratings dataframe is  98.30% empty.


In [4]:
# One row per movie with the number of ratings it received.
# NOTE: `min` and `avg` below are the pyspark.sql.functions aggregates imported
# at the top of the notebook, not the Python builtins.
ratings_per_movie = ratings.groupBy("movieId").count()

# Smallest number of ratings received by any movie.
print("Movie with the fewest ratings: ")
ratings_per_movie.select(min("count")).show()

# Mean number of ratings per movie.
print("Avg num ratings per movie: ")
ratings_per_movie.select(avg("count")).show()

Movie with the fewest ratings: 
+----------+
|min(count)|
+----------+
|         1|
+----------+

Avg num ratings per movie: 
+------------------+
|        avg(count)|
+------------------+
|10.369806663924312|
+------------------+



In [5]:
# Print the column names, types and nullability that schema inference produced for the ratings data.
ratings.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: double (nullable = true)
 |-- timestamp: integer (nullable = true)



In [6]:
# Random 80/20 split into training and test sets; the seed is fixed at 42.
training_data, test_data = ratings.randomSplit([0.8, 0.2], seed=42)

In [7]:
# Configure the ALS estimator for explicit ratings (implicitPrefs=False).
als = ALS(
    userCol="userId",
    itemCol="movieId",
    ratingCol="rating",
    rank=10,
    maxIter=15,
    regParam=0.1,
    # Per the PySpark ALS docs, "drop" removes rows whose prediction is NaN.
    coldStartStrategy="drop",
    nonnegative=True,
    implicitPrefs=False,
)

In [8]:
# Fit the ALS estimator on the training split.
model = als.fit(training_data)

In [9]:
# Score the test split; the result carries a `prediction` column alongside the actual `rating`.
test_predictions = model.transform(test_data)

In [10]:
# Preview the first 10 scored rows.
test_predictions.show(10)

+------+-------+------+----------+----------+
|userId|movieId|rating| timestamp|prediction|
+------+-------+------+----------+----------+
|   148|   4896|   4.0|1482548717| 3.5416205|
|   148|   5618|   3.0|1482548682| 3.3992512|
|   148|   7153|   3.0|1482548769| 3.4210632|
|   148|  40629|   5.0|1482548578| 2.8493803|
|   148|  40815|   4.0|1482548512| 3.5793066|
|   148|  60069|   4.5|1482548484| 3.6827738|
|   148|  68954|   4.0|1482548482| 3.9088442|
|   148|  69844|   4.0|1482548500| 3.5541387|
|   148|  79132|   1.5|1482548463| 3.5483084|
|   148|  79702|   4.0|1482548751| 3.2561567|
+------+-------+------+----------+----------+
only showing top 10 rows



In [11]:
# RMSE evaluator comparing the actual `rating` with the model's `prediction`.
evaluator = RegressionEvaluator(
    metricName="rmse",
    labelCol="rating",
    predictionCol="prediction",
)

# Echo the evaluator's configuration, one setting per line.
for read_setting in (
    evaluator.getMetricName,
    evaluator.getLabelCol,
    evaluator.getPredictionCol,
):
    print(read_setting())

rmse
rating
prediction


In [12]:
# Root-mean-square error of the predictions on the test split.
RMSE = evaluator.evaluate(test_predictions)
print(RMSE)

0.8751958467840584


In [13]:
# Number of movies to recommend per user.
n = 5
# Generate the top-n recommendations for all users of the fitted model.
ALS_recommendations = model.recommendForAllUsers(n)

In [14]:
# Preview the recommendations: one row per user, with an array of (movieId, rating) entries.
ALS_recommendations.show()

+------+--------------------+
|userId|     recommendations|
+------+--------------------+
|     1|[{3925, 5.796346}...|
|     2|[{158783, 4.97820...|
|     3|[{158783, 5.70639...|
|     4|[{25771, 5.126978...|
|     5|[{25771, 5.216497...|
|     6|[{82, 5.2655487},...|
|     7|[{5075, 5.0765643...|
|     8|[{2843, 5.1593957...|
|     9|[{5666, 5.202669}...|
|    10|[{112804, 5.44498...|
|    11|[{26133, 5.148702...|
|    12|[{112804, 5.93972...|
|    13|[{96004, 5.586616...|
|    14|[{106100, 4.93915...|
|    15|[{27611, 5.348887...|
|    16|[{177593, 4.49237...|
|    17|[{96004, 5.131351...|
|    18|[{96004, 4.835064...|
|    19|[{96004, 4.116426...|
|    20|[{25771, 5.351633...|
+------+--------------------+
only showing top 20 rows



In [15]:
# Expose the recommendations to Spark SQL under the name ALS_recs_temp.
ALS_recommendations.createOrReplaceTempView("ALS_recs_temp")

# Flatten each user's recommendation array into one row per (userId, movieId, prediction).
# The adjacent literals concatenate into a single SQL statement.
clean_recs_query = (
    "SELECT userId, "
    "movieIds_and_ratings.movieId AS movieId, "
    "movieIds_and_ratings.rating AS prediction "
    "FROM ALS_recs_temp "
    "LATERAL VIEW explode(recommendations) exploded_table AS movieIds_and_ratings"
)
clean_recs = spark.sql(clean_recs_query)

# Same explode, but each recommendation stays as a single struct column (MovieRec).
exploded_recs_query = (
    "SELECT userId, "
    "explode(recommendations) AS MovieRec "
    "FROM ALS_recs_temp"
)
exploded_recs = spark.sql(exploded_recs_query)

exploded_recs.show()


+------+-------------------+
|userId|           MovieRec|
+------+-------------------+
|     1|   {3925, 5.796346}|
|     1|  {177593, 5.64738}|
|     1|  {171495, 5.52247}|
|     1|   {3494, 5.496704}|
|     1|  {96004, 5.475513}|
|     2| {158783, 4.978201}|
|     2| {49347, 4.9653354}|
|     2| {131724, 4.902289}|
|     2| {31878, 4.7977524}|
|     2|{171495, 4.7108917}|
|     3|{158783, 5.7063923}|
|     3|  {6464, 5.1846757}|
|     3|  {6835, 4.9092855}|
|     3|  {5746, 4.9092855}|
|     3|   {5181, 4.849921}|
|     4| {25771, 5.1269784}|
|     4|   {720, 5.1052933}|
|     4|  {1283, 5.0271764}|
|     4|   {1250, 4.946855}|
|     4|   {3851, 4.916095}|
+------+-------------------+
only showing top 20 rows



In [16]:
# Preview the flattened recommendations: one (userId, movieId, prediction) row per recommended movie.
clean_recs.show()

+------+-------+----------+
|userId|movieId|prediction|
+------+-------+----------+
|     1|   3925|  5.796346|
|     1| 177593|   5.64738|
|     1| 171495|   5.52247|
|     1|   3494|  5.496704|
|     1|  96004|  5.475513|
|     2| 158783|  4.978201|
|     2|  49347| 4.9653354|
|     2| 131724|  4.902289|
|     2|  31878| 4.7977524|
|     2| 171495| 4.7108917|
|     3| 158783| 5.7063923|
|     3|   6464| 5.1846757|
|     3|   6835| 4.9092855|
|     3|   5746| 4.9092855|
|     3|   5181|  4.849921|
|     4|  25771| 5.1269784|
|     4|    720| 5.1052933|
|     4|   1283| 5.0271764|
|     4|   1250|  4.946855|
|     4|   3851|  4.916095|
+------+-------+----------+
only showing top 20 rows



In [17]:
# Load the movie metadata; the first row is the header and column types are inferred.
movie_info = spark.read.csv(
    'movies.csv',
    header=True,
    inferSchema=True,
)

# Quick look at the first five rows.
movie_info.show(5)

+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Adventure|Animati...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
|      4|Waiting to Exhale...|Comedy|Drama|Romance|
|      5|Father of the Bri...|              Comedy|
+-------+--------------------+--------------------+
only showing top 5 rows



In [18]:
# Attach title and genres to every recommendation (left join keeps recommendations
# even when a movieId has no entry in movie_info).
#
# This cell reassigns `clean_recs` from itself, so the three base columns are
# selected first. That way re-running the cell replaces the previously joined
# title/genres columns instead of joining movie_info a second time and ending up
# with duplicate column names.
clean_recs = (
    clean_recs
    .select("userId", "movieId", "prediction")
    .join(movie_info, ["movieId"], "left")
)
clean_recs.show()

+-------+------+----------+--------------------+--------------------+
|movieId|userId|prediction|               title|              genres|
+-------+------+----------+--------------------+--------------------+
|   3925|     1|  5.796346|Stranger Than Par...|        Comedy|Drama|
| 177593|     1|   5.64738|Three Billboards ...|         Crime|Drama|
| 171495|     1|   5.52247|              Cosmos|  (no genres listed)|
|   3494|     1|  5.496704|    True Grit (1969)|Adventure|Drama|W...|
|  96004|     1|  5.475513|Dragon Ball Z: Th...|Action|Adventure|...|
| 158783|     2|  4.978201|The Handmaiden (2...|Drama|Romance|Thr...|
|  49347|     2| 4.9653354|Fast Food Nation ...|               Drama|
| 131724|     2|  4.902289|The Jinx: The Lif...|         Documentary|
|  31878|     2| 4.7977524|Kung Fu Hustle (G...|       Action|Comedy|
| 171495|     2| 4.7108917|              Cosmos|  (no genres listed)|
| 158783|     3| 5.7063923|The Handmaiden (2...|Drama|Romance|Thr...|
|   6464|     3| 5.1

In [19]:
# Left-join the recommendations to the existing ratings on (userId, movieId) and keep
# only rows where no rating matched, i.e. movies the user has no rating row for.
(
    clean_recs
    .join(ratings, ["userId", "movieId"], "left")
    .filter(ratings['rating'].isNull())
    .show()
)

+------+-------+----------+--------------------+--------------------+------+---------+
|userId|movieId|prediction|               title|              genres|rating|timestamp|
+------+-------+----------+--------------------+--------------------+------+---------+
|     1|   3925|  5.796346|Stranger Than Par...|        Comedy|Drama|  NULL|     NULL|
|     1| 177593|   5.64738|Three Billboards ...|         Crime|Drama|  NULL|     NULL|
|     1| 171495|   5.52247|              Cosmos|  (no genres listed)|  NULL|     NULL|
|     1|   3494|  5.496704|    True Grit (1969)|Adventure|Drama|W...|  NULL|     NULL|
|     1|  96004|  5.475513|Dragon Ball Z: Th...|Action|Adventure|...|  NULL|     NULL|
|     2| 158783|  4.978201|The Handmaiden (2...|Drama|Romance|Thr...|  NULL|     NULL|
|     2|  49347| 4.9653354|Fast Food Nation ...|               Drama|  NULL|     NULL|
|     2|  31878| 4.7977524|Kung Fu Hustle (G...|       Action|Comedy|  NULL|     NULL|
|     2| 171495| 4.7108917|              Co

In [20]:
#param_grid = ParamGridBuilder().addGrid(als.rank, [5, 40]).addGrid(als.maxIter, [5, 100]).addGrid(als.regParam, [.05, .1]).build()
 

In [21]:
#cv = CrossValidator(estimator=als, estimatorParamMaps=param_grid, evaluator=evaluator, numFolds=3)

In [22]:
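# Disabled: the last recorded run of this cell failed with the ConnectionRefusedError shown below.
# NOTE(review): if re-enabled, `model` is rebound to the cross-validation result instead of the
# ALS model fitted earlier — confirm the recommendation cells still work with it.
#model = cv.fit(training_data)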

ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "c:\Users\Rik\Desktop\Recommender_systems_with_Pyspark\rec_pyspark\lib\site-packages\IPython\core\interactiveshell.py", line 3577, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "C:\Users\Rik\AppData\Local\Temp\ipykernel_19504\3593649120.py", line 1, in <module>
    model = cv.fit(training_data)
  File "c:\Users\Rik\Desktop\Recommender_systems_with_Pyspark\rec_pyspark\lib\site-packages\pyspark\ml\base.py", line 205, in fit
    return self._fit(dataset)
  File "c:\Users\Rik\Desktop\Recommender_systems_with_Pyspark\rec_pyspark\lib\site-packages\pyspark\ml\tuning.py", line 847, in _fit
    for j, metric, subModel in pool.imap_unordered(lambda f: f(), tasks):
  File "C:\Users\Rik\.pyenv\pyenv-win\versions\3.10.11\lib\multiprocessing\pool.py", line 873, in next
    raise value
  File "C:\Users\Rik\.pyenv\pyenv-win\versions\3.10.11\lib\multiprocessing\pool.py", line 125, in worker


ConnectionRefusedError: [WinError 10061] No connection could be made because the target machine actively refused it