In [1]:
import findspark
findspark.init('/home/prateek/spark-3.0.0-preview2-bin-hadoop2.7')
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('MovieRC').getOrCreate()

In [2]:
from pyspark.sql.functions import *
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator

In [3]:
rating_df = spark.read.csv('ratings.csv', header=True, inferSchema=True)
movie_df = spark.read.csv('movies.csv', header=True, inferSchema=True)
rating_df.printSchema(), movie_df.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: double (nullable = true)
 |-- timestamp: integer (nullable = true)

root
 |-- movieId: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- genres: string (nullable = true)



(None, None)

In [4]:
rating_df.show(5), movie_df.show(5)

+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     1|      1|   4.0|964982703|
|     1|      3|   4.0|964981247|
|     1|      6|   4.0|964982224|
|     1|     47|   5.0|964983815|
|     1|     50|   5.0|964982931|
+------+-------+------+---------+
only showing top 5 rows

+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Adventure|Animati...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
|      4|Waiting to Exhale...|Comedy|Drama|Romance|
|      5|Father of the Bri...|              Comedy|
+-------+--------------------+--------------------+
only showing top 5 rows



(None, None)

In [5]:
rating_df.describe().show()

+-------+------------------+----------------+------------------+--------------------+
|summary|            userId|         movieId|            rating|           timestamp|
+-------+------------------+----------------+------------------+--------------------+
|  count|            100836|          100836|            100836|              100836|
|   mean|326.12756356856676|19435.2957177992| 3.501556983616962|1.2059460873684695E9|
| stddev| 182.6184914635004|35530.9871987003|1.0425292390606342|2.1626103599513078E8|
|    min|                 1|               1|               0.5|           828124615|
|    max|               610|          193609|               5.0|          1537799250|
+-------+------------------+----------------+------------------+--------------------+



In [6]:
movie_df.describe().show()

+-------+------------------+--------------------+------------------+
|summary|           movieId|               title|            genres|
+-------+------------------+--------------------+------------------+
|  count|              9742|                9742|              9742|
|   mean|42200.353623485935|                null|              null|
| stddev| 52160.49485443825|                null|              null|
|    min|                 1|"11'09""01 - Sept...|(no genres listed)|
|    max|            193609|À nous la liberté...|           Western|
+-------+------------------+--------------------+------------------+



In [7]:
cols = rating_df.columns
rating_df.select([count(when(isnan(a) | col(a).isNull(), a)).alias(a) for a in cols]).show()

+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     0|      0|     0|        0|
+------+-------+------+---------+



In [23]:
rating_df[rating_df['rating']!=0].count(), movie_df[movie_df['movieId']!=0].count()

(100836, 9742)

In [10]:
rating_df = rating_df.drop('timestamp')
rating_df.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: double (nullable = true)



In [11]:
train, test = rating_df.randomSplit([0.7,0.3])

In [12]:
alsrec = ALS(maxIter=20, regParam=0.1, userCol='userId', itemCol='movieId', ratingCol='rating')
recmodel = alsrec.fit(train)
recmodel.setColdStartStrategy('drop')
preds = recmodel.transform(test)
preds.show(5)

+------+-------+------+----------+
|userId|movieId|rating|prediction|
+------+-------+------+----------+
|   436|    471|   3.0|  4.135099|
|    91|    471|   1.0| 2.7659729|
|   599|    471|   2.5| 3.0338988|
|   603|    471|   4.0| 3.9998624|
|   387|    471|   3.0|  3.025355|
+------+-------+------+----------+
only showing top 5 rows



In [13]:
model_evaluator = RegressionEvaluator(metricName='rmse', labelCol='rating', predictionCol='prediction')

In [14]:
rmse = model_evaluator.evaluate(preds)
rmse

0.8971392256494236

In [37]:
user = test.filter(test['userId']==13).select(['userId','movieId'])
user.show(5)

+------+-------+
|userId|movieId|
+------+-------+
|    13|   1173|
|    13|   3717|
|    13|   3753|
|    13|   3793|
|    13|   3863|
+------+-------+
only showing top 5 rows



In [38]:
rec = recmodel.transform(user)
rec.orderBy('prediction', ascending=False).show(10)

+------+-------+----------+
|userId|movieId|prediction|
+------+-------+----------+
|    13|   3793| 4.0666285|
|    13|   3897| 3.9308279|
|    13|   3753| 3.8411844|
|    13|   3717| 3.6615844|
|    13|   3863| 2.8762736|
|    13|   1173|  2.869637|
+------+-------+----------+



In [39]:
rec.createOrReplaceTempView('Recommendations')
movie_df.createOrReplaceTempView('Movies')

In [40]:
recommendation = spark.sql('select r.userId, m.title from Recommendations r, Movies m where r.movieId=m.movieId')
recommendation.show()

+------+--------------------+
|userId|               title|
+------+--------------------+
|    13| Patriot, The (2000)|
|    13|        X-Men (2000)|
|    13|    Cell, The (2000)|
|    13|Gone in 60 Second...|
|    13|Almost Famous (2000)|
|    13|Cook the Thief Hi...|
+------+--------------------+

