In [None]:
import os

import findspark

# Locate the Spark installation. Prefer the SPARK_HOME environment variable so
# the notebook is not tied to one user's home directory; fall back to the
# original hard-coded location when the variable is not set.
findspark.init(os.environ.get('SPARK_HOME', '/home/toyesh30/spark-3.1.2-bin-hadoop3.2'))

In [None]:
import pandas as pd
from pyspark.sql.functions import col, explode
from pyspark import SparkContext
import warnings
warnings.filterwarnings('ignore')

# Spark session

In [None]:
from pyspark.sql import SparkSession

# Create the session for this notebook, or reuse one that is already running.
spark = SparkSession.builder.appName('MovieRecommendation').getOrCreate()

# `sc` previously pointed at the SparkContext *class*, so instance methods
# (e.g. sc.parallelize) could not be called on it. Bind it to the live context
# owned by the session instead.
sc = spark.sparkContext

# Loading data

In [None]:
# Read the movies, ratings and tags CSV files. Every file has a header row and
# column types are inferred from the data.
csv_read_opts = dict(inferSchema=True, header=True)

movies = spark.read.csv("movies.csv", **csv_read_opts)
ratings = spark.read.csv("ratings.csv", **csv_read_opts)
tags = spark.read.csv("tags.csv", **csv_read_opts)

                                                                                

In [None]:
# Preview up to five rows of `ratings`.
ratings.show(5)

+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     1|      1|   4.0|964982703|
|     1|      3|   4.0|964981247|
|     1|      6|   4.0|964982224|
|     1|     47|   5.0|964983815|
|     1|     50|   5.0|964982931|
+------+-------+------+---------+
only showing top 5 rows



In [None]:
# Print the column names and the types inferred for `ratings`.
ratings.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: double (nullable = true)
 |-- timestamp: integer (nullable = true)



In [None]:
# Drop the `timestamp` column and cast `rating` to float.
ratings = (
    ratings
    .drop('timestamp')
    .withColumn('rating', col('rating').cast('float'))
)
ratings.show(5)

+------+-------+------+
|userId|movieId|rating|
+------+-------+------+
|     1|      1|   4.0|
|     1|      3|   4.0|
|     1|      6|   4.0|
|     1|     47|   5.0|
|     1|     50|   5.0|
+------+-------+------+
only showing top 5 rows



In [None]:
# Preview up to five rows of `movies`.
movies.show(5)

+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Adventure|Animati...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
|      4|Waiting to Exhale...|Comedy|Drama|Romance|
|      5|Father of the Bri...|              Comedy|
+-------+--------------------+--------------------+
only showing top 5 rows



In [None]:
# Print the column names and the types inferred for `movies`.
movies.printSchema()

root
 |-- movieId: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- genres: string (nullable = true)



In [None]:
# Attach the movie columns to every rating. The left join keeps ratings whose
# movieId has no match in `movies`.
merge1 = ratings.join(movies, on='movieId', how='left')
merge1.show(5)

+-------+------+------+--------------------+--------------------+
|movieId|userId|rating|               title|              genres|
+-------+------+------+--------------------+--------------------+
|      1|     1|   4.0|    Toy Story (1995)|Adventure|Animati...|
|      3|     1|   4.0|Grumpier Old Men ...|      Comedy|Romance|
|      6|     1|   4.0|         Heat (1995)|Action|Crime|Thri...|
|     47|     1|   5.0|Seven (a.k.a. Se7...|    Mystery|Thriller|
|     50|     1|   5.0|Usual Suspects, T...|Crime|Mystery|Thr...|
+-------+------+------+--------------------+--------------------+
only showing top 5 rows



In [None]:
# Preview up to five rows of `tags`.
tags.show(5)

+------+-------+---------------+----------+
|userId|movieId|            tag| timestamp|
+------+-------+---------------+----------+
|     2|  60756|          funny|1445714994|
|     2|  60756|Highly quotable|1445714996|
|     2|  60756|   will ferrell|1445714992|
|     2|  89774|   Boxing story|1445715207|
|     2|  89774|            MMA|1445715200|
+------+-------+---------------+----------+
only showing top 5 rows



In [None]:
# Join tags to ratings on the movie id. The key is spelled exactly like the
# column (`movieId`); the previous `movieID` spelling only resolved because
# spark.sql.caseSensitive is false by default.
# NOTE(review): joining on movieId alone pairs each tag with every rating of
# that movie and leaves two `userId` columns in the result (one from each
# side). If per-user rows are wanted, join on ['userId', 'movieId'] - confirm.
merge2 = tags.join(ratings, ['movieId'], 'left')
merge2.show(5)

[Stage 12:>                                                         (0 + 1) / 1]                                                                                

+-------+------+-----+----------+------+------+
|movieId|userId|  tag| timestamp|userId|rating|
+-------+------+-----+----------+------+------+
|  60756|     2|funny|1445714994|   599|   2.5|
|  60756|     2|funny|1445714994|   583|   3.0|
|  60756|     2|funny|1445714994|   564|   5.0|
|  60756|     2|funny|1445714994|   560|   3.5|
|  60756|     2|funny|1445714994|   484|   4.5|
+-------+------+-----+----------+------+------+
only showing top 5 rows



## EDA

In [None]:
# Number of rows in the ratings data.
total_ratings = (
    ratings
    .select("rating")
    .count()
)
total_ratings

100836

In [None]:
# Sparsity = percentage of (user, movie) pairs that have no rating.
num_users = ratings.select("userId").distinct().count()
num_movies = ratings.select("movieId").distinct().count()

matrix_cells = num_users * num_movies          # size of the full user x movie grid
density = float(total_ratings) / matrix_cells  # fraction of the grid that is filled
sparsity = (1.0 - density) * 100
print(f"The Sparsity of data is {sparsity:.2f}%")



The Sparsity of data is 98.30%


                                                                                

## Top 10 active users

In [None]:
# Count ratings per user, most active users first. The frame is sorted once
# here, so it can be shown directly without a second identical orderBy.
userId_ratings = ratings.groupBy("userId").count().orderBy('count', ascending=False)
userId_ratings.show(10)



+------+-----+
|userId|count|
+------+-----+
|   414| 2698|
|   599| 2478|
|   474| 2108|
|   448| 1864|
|   274| 1346|
|   610| 1302|
|    68| 1260|
|   380| 1218|
|   606| 1115|
|   288| 1055|
+------+-----+
only showing top 10 rows



                                                                                

## Top 10 movies with the most ratings

In [None]:
# Count ratings per movie, most-rated first.
movieId_ratings = ratings.groupBy("movieId").count().orderBy('count', ascending=False)

# Look up the titles and show the ten most-rated movies. The join key is
# spelled exactly like the column (`movieId`); the previous `movieID` spelling
# only resolved because spark.sql.caseSensitive is false by default. The sort
# is applied again after the join because a join does not guarantee that the
# input row order is kept.
(
    movieId_ratings
    .join(movies, ['movieId'], 'left')
    .orderBy('count', ascending=False)
    .select('title', 'count')
    .show(10)
)



+--------------------+-----+
|               title|count|
+--------------------+-----+
| Forrest Gump (1994)|  329|
|Shawshank Redempt...|  317|
| Pulp Fiction (1994)|  307|
|Silence of the La...|  279|
|  Matrix, The (1999)|  278|
|Star Wars: Episod...|  251|
|Jurassic Park (1993)|  238|
|   Braveheart (1995)|  237|
|Terminator 2: Jud...|  224|
|Schindler's List ...|  220|
+--------------------+-----+
only showing top 10 rows



                                                                                

## ALS Pipeline

In [None]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

In [None]:
# 80/20 train/test split with a fixed seed.
(train, test) = ratings.randomSplit([0.8, 0.2], seed = 1234)

# Explicit-feedback ALS with non-negative factors. Rows whose prediction would
# be NaN (users/movies unseen during fitting) are dropped so RMSE stays defined.
# The seed is pinned explicitly so factor initialisation, and therefore the
# tuning results, do not depend on the library's default seed.
als = ALS(
    userCol="userId",
    itemCol="movieId",
    ratingCol="rating",
    nonnegative=True,
    implicitPrefs=False,
    coldStartStrategy="drop",
    seed=1234,
)

## Tuning

In [None]:
# Hyper-parameter search space: every combination of the values below is tried.
search_space = [
    (als.rank, [10, 30, 50, 70]),
    (als.regParam, [.01, .05, .1, .15]),
    (als.maxIter, [5, 10, 15, 20]),
]

grid_builder = ParamGridBuilder()
for hyperparam, candidate_values in search_space:
    grid_builder.addGrid(hyperparam, candidate_values)
param_grid = grid_builder.build()

# Candidates are scored by RMSE between the `rating` and `prediction` columns.
evaluator = RegressionEvaluator(labelCol="rating", predictionCol="prediction", metricName="rmse")
print(f"Num models to be tested:{len(param_grid)} ")

Num models to be tested:64 


## Cross-validation

In [None]:
# 5-fold cross-validation over the whole parameter grid, scored with
# `evaluator`. The seed pins how rows are assigned to folds so the selected
# model is repeatable between runs.
# NOTE: this fits every grid combination on every fold and is slow.
cv = CrossValidator(
    estimator=als,
    estimatorParamMaps=param_grid,
    evaluator=evaluator,
    numFolds=5,
    seed=1234,
)
model = cv.fit(train)

21/08/10 17:07:40 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
21/08/10 17:07:40 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS
                                                                                

In [None]:
# Take the winning model from the fitted cross-validator.
best_model = model.bestModel

In [None]:
print("Best Model Attributes:\n")

# The hyper-parameters are read from the JVM-side parent estimator of the
# fitted model (reached through the private `_java_obj` handle); fetch it once.
best_estimator = best_model._java_obj.parent()

print("  Rank:", best_estimator.getRank())
print("  MaxIter:", best_estimator.getMaxIter())
print("  RegParam:", best_estimator.getRegParam())

Best Model Attributes:

  Rank: 70
  MaxIter: 5
  RegParam: 0.15


In [None]:
# Score the held-out test split with the best model and report the metric
# computed by `evaluator`.
test_predictions = best_model.transform(dataset=test)
RMSE = evaluator.evaluate(dataset=test_predictions)
print(RMSE)



0.8690282244549132




In [None]:
# Display the first rows of the test predictions (show() prints 20 by default).
test_predictions.show()



+------+-------+------+----------+
|userId|movieId|rating|prediction|
+------+-------+------+----------+
|   597|    471|   2.0|  4.008968|
|   436|    471|   3.0| 3.6839767|
|   218|    471|   4.0| 2.9261594|
|   387|    471|   3.0|  3.043863|
|   217|    471|   2.0|  2.749609|
|   287|    471|   4.5| 2.8179712|
|    32|    471|   3.0|  3.689608|
|   260|    471|   4.5| 3.4837396|
|   104|    471|   4.5| 3.5412128|
|   111|   1088|   3.0|  3.217746|
|   177|   1088|   3.5| 3.5217268|
|    41|   1088|   1.5| 2.7708287|
|   387|   1088|   1.5| 2.6513364|
|   594|   1088|   4.5|  4.230787|
|   307|   1088|   3.0|   2.57052|
|   509|   1088|   3.0| 3.0907078|
|   104|   1088|   3.0| 3.6971767|
|   268|   1238|   5.0| 3.8791125|
|   462|   1238|   3.5| 3.4524965|
|   307|   1342|   2.0| 2.0326793|
+------+-------+------+----------+
only showing top 20 rows





## Recommendations

In [None]:
# Ask the best model for ten recommendations per user, then peek at ten users.
nrecommendations = best_model.recommendForAllUsers(numItems=10)
nrecommendations.limit(10).show(10)



+------+--------------------+
|userId|     recommendations|
+------+--------------------+
|   471|[{3379, 4.9617186...|
|   463|[{3379, 5.0728035...|
|   496|[{3379, 4.5845876...|
|   148|[{33649, 4.50006}...|
|   540|[{3379, 5.5229683...|
|   392|[{3379, 4.975219}...|
|   243|[{3379, 5.824365}...|
|    31|[{3379, 5.1254587...|
|   516|[{3379, 4.926261}...|
|   580|[{3379, 4.811471}...|
+------+--------------------+





In [None]:
# Flatten the per-user `recommendations` array into one row per
# (userId, movieId, rating).
# The cell rebinds `nrecommendations`, so it is guarded: once flattened, the
# frame no longer has a `recommendations` column and exploding it a second
# time (e.g. when the cell is re-run) would fail.
if "recommendations" in nrecommendations.columns:
    nrecommendations = (
        nrecommendations
        .withColumn("rec_exp", explode("recommendations"))
        .select('userId', col("rec_exp.movieId"), col("rec_exp.rating"))
    )

nrecommendations.limit(10).show()



+------+-------+---------+
|userId|movieId|   rating|
+------+-------+---------+
|   471|   3379|4.9617186|
|   471|   8477| 4.803492|
|   471|  33649| 4.609002|
|   471|   6201| 4.581375|
|   471| 171495| 4.576506|
|   471|  33779|4.5385003|
|   471| 102217|4.5385003|
|   471|  92494|4.5385003|
|   471|   7096|4.4979353|
|   471|  78836| 4.487687|
+------+-------+---------+





## For a particular user, let's say user 471

In [None]:
# Recommendations for user 471 with movie titles and genres attached.
# A join does not guarantee row order, so sort explicitly by the predicted
# rating to list the recommendations from strongest to weakest.
(
    nrecommendations
    .filter('userId = 471')
    .join(movies, on='movieId')
    .orderBy('rating', ascending=False)
    .show()
)

                                                                                

+-------+------+---------+--------------------+--------------------+
|movieId|userId|   rating|               title|              genres|
+-------+------+---------+--------------------+--------------------+
|   3379|   471|4.9617186| On the Beach (1959)|               Drama|
|   8477|   471| 4.803492|    Jetée, La (1962)|      Romance|Sci-Fi|
|  33649|   471| 4.609002|  Saving Face (2004)|Comedy|Drama|Romance|
|   6201|   471| 4.581375|    Lady Jane (1986)|       Drama|Romance|
| 171495|   471| 4.576506|              Cosmos|  (no genres listed)|
|  33779|   471|4.5385003|Eddie Izzard: Dre...|              Comedy|
| 102217|   471|4.5385003|Bill Hicks: Revel...|              Comedy|
|  92494|   471|4.5385003|Dylan Moran: Mons...|  Comedy|Documentary|
|   7096|   471|4.4979353|Rivers and Tides ...|         Documentary|
|  78836|   471| 4.487687|Enter the Void (2...|               Drama|
+-------+------+---------+--------------------+--------------------+



In [None]:
# For comparison: the ten movies user 471 actually rated highest.
(
    ratings
    .join(movies, on='movieId')
    .where(col('userId') == 471)
    .orderBy(col('rating').desc())
    .limit(10)
    .show()
)

+-------+------+------+--------------------+--------------------+
|movieId|userId|rating|               title|              genres|
+-------+------+------+--------------------+--------------------+
|      1|   471|   5.0|    Toy Story (1995)|Adventure|Animati...|
|   2324|   471|   5.0|Life Is Beautiful...|Comedy|Drama|Roma...|
|  79702|   471|   5.0|Scott Pilgrim vs....|Action|Comedy|Fan...|
|    527|   471|   4.5|Schindler's List ...|           Drama|War|
|  60069|   471|   4.5|       WALL·E (2008)|Adventure|Animati...|
|  78499|   471|   4.5|  Toy Story 3 (2010)|Adventure|Animati...|
|  92259|   471|   4.5| Intouchables (2011)|        Comedy|Drama|
| 158966|   471|   4.5|Captain Fantastic...|               Drama|
| 168252|   471|   4.5|        Logan (2017)|       Action|Sci-Fi|
|   7147|   471|   4.0|     Big Fish (2003)|Drama|Fantasy|Rom...|
+-------+------+------+--------------------+--------------------+

