## Running this Notebook using Pyspark in the my Anaconda environment(Not in EMR cluster) 

For proceeding further, you must have installed JDK 8, Apache Spark and PySpark on your local system depending on the
OS and successfully run PySpark in your Jupyter Notebook.

In [1]:
import findspark
findspark.init()

In [2]:
import pyspark
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

In [3]:
from pyspark.sql.types import *

In [4]:
spark = SparkSession \
    .builder \
    .appName("Netflix Movie title") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()


In [59]:
file_path = r'C:\Users\user\Desktop\Distributed and Scalable data eng\Netflix\TrainingRatings.txt'

In [9]:
schema1 = StructType([
    StructField("MovieID", IntegerType(), True),
    StructField("UserID", IntegerType(), True),
    StructField("Ratings", DoubleType(), True)])

In [10]:
df_train = spark.read \
 .schema(schema1) \
 .option("inferSchema", "True") \
 .csv(file_path)

In [11]:
df_train.printSchema()

root
 |-- MovieID: integer (nullable = true)
 |-- UserID: integer (nullable = true)
 |-- Ratings: double (nullable = true)



In [12]:
df_train.show(5)

+-------+-------+-------+
|MovieID| UserID|Ratings|
+-------+-------+-------+
|      8|1744889|    1.0|
|      8|1395430|    2.0|
|      8|1205593|    4.0|
|      8|1488844|    4.0|
|      8|1447354|    1.0|
+-------+-------+-------+
only showing top 5 rows



In [13]:
df_train.count()

3255352

In [17]:
df_train.describe().show()

+-------+------------------+-----------------+------------------+
|summary|           MovieID|           UserID|           Ratings|
+-------+------------------+-----------------+------------------+
|  count|           3255352|          3255352|           3255352|
|   mean|  8724.66010434509|1327058.466035931| 3.481187595074204|
| stddev|5107.4015117381605|762688.6901278322|1.0828732789156674|
|    min|                 8|                7|               1.0|
|    max|             17742|          2649285|               5.0|
+-------+------------------+-----------------+------------------+



## Using Model-based approach for Collaborative filtering

### Alternative Least Squares model

In [21]:
from pyspark.ml.recommendation import ALS

# Build the recommendation model using ALS on the training data
als = ALS(maxIter=5, regParam=0.01, userCol="UserID", itemCol="MovieID", ratingCol="Ratings")
model = als.fit(df_train)


In [22]:
# Getting test data into another dataframe
schema2 = StructType([
    StructField("MovieID", IntegerType(), True),
    StructField("UserID", IntegerType(), True),
    StructField("Ratings", DoubleType(), True)])

In [24]:
file_path_test = r'C:\Users\user\Desktop\Distributed and Scalable data eng\Netflix\TestingRatings.txt'

In [26]:
#Building Testing Dataframe
df_test = spark.read \
 .schema(schema2) \
 .option("inferSchema", "True") \
 .csv(file_path_test)

In [27]:
df_test.show(5)

+-------+-------+-------+
|MovieID| UserID|Ratings|
+-------+-------+-------+
|      8| 573364|    1.0|
|      8|2149668|    3.0|
|      8|1089184|    3.0|
|      8|2465894|    3.0|
|      8| 534508|    1.0|
+-------+-------+-------+
only showing top 5 rows



In [101]:
df_test.count()

100478

### Finding Distict Movies and Users in the testing dataset

In [98]:
# getting the count of unique Users
df_uniqueUsers = df_test.groupBy(['UserID']).count()
df_uniqueUsers.describe().show()
# Unique count = 27555

+-------+------------------+------------------+
|summary|            UserID|             count|
+-------+------------------+------------------+
|  count|             27555|             27555|
|   mean|1325637.6160406459|3.6464525494465616|
| stddev| 762738.5020270856|2.3513183230802865|
|    min|                 7|                 1|
|    max|           2649285|                70|
+-------+------------------+------------------+



In [99]:
# getting the count of unique Movies in testing data
df_uniqueMovies = df_test.groupBy(['MovieID']).count()
df_uniqueMovies.describe().show()
#Unique count = 1701

+-------+-----------------+------------------+
|summary|          MovieID|             count|
+-------+-----------------+------------------+
|  count|             1701|              1701|
|   mean| 8919.66784244562| 59.06995884773662|
| stddev|5168.709499952631|121.58856049833442|
|    min|                8|                 1|
|    max|            17742|               811|
+-------+-----------------+------------------+



### Making predictions from Test data

In [28]:
predictions = model.transform(df_test)

In [70]:
predictions.orderBy('prediction',ascending=False).show()

+-------+-------+-------+----------+
|MovieID| UserID|Ratings|prediction|
+-------+-------+-------+----------+
|    218|2302897|    4.0|  7.535534|
|  15480|1460800|    4.0|  5.914879|
|  12700|2173152|    4.0|  5.899558|
|  14283|1229656|    5.0| 5.8130255|
|  14808| 942442|    5.0|  5.800982|
|  14808|1178384|    3.0| 5.7909117|
|  12293| 304581|    5.0|  5.781457|
|  12184|1909349|    5.0|  5.780202|
|    844|2387369|    5.0| 5.7670455|
|  16902|   5980|    3.0|  5.765463|
|   1848| 761430|    5.0|  5.762156|
|  16707|1823836|    5.0| 5.7180157|
|   8933| 117581|    4.0| 5.7034674|
|   6281|2510301|    5.0| 5.6293592|
|   7400|1952860|    5.0| 5.6181192|
|  12293|1001464|    5.0| 5.6138134|
|   2430|1019603|    5.0| 5.5977087|
|   7505|2429098|    5.0|  5.596356|
|   9689| 735560|    5.0| 5.5950675|
|  16022| 657521|    5.0|   5.59129|
+-------+-------+-------+----------+
only showing top 20 rows



### Evaluating the predictions made using metrics : Root Mean Squared Error and Absolute Mean Error

In [33]:
from pyspark.ml.evaluation import RegressionEvaluator

evaluator = RegressionEvaluator(metricName="rmse", labelCol="Ratings",predictionCol="prediction")
rmse = evaluator.evaluate(predictions)
print("Root means quare error = " + str(rmse))


Root means quare error = 0.8493166767705578


In [35]:
evaluator_2 = RegressionEvaluator(metricName="mae", labelCol="Ratings",predictionCol="prediction")
mae = evaluator_2.evaluate(predictions)
print("Mean Absolute Error = " + str(mae))


Mean Absolute Error = 0.6668206177535538


In [60]:
"""Adding a new user. Let it be the user '1' and let's say I have watched 10 movies and did rating them.I have manually 
added in Training set and creating a new dataframe. I'll be outputing predictions for the movies I haven't watched. """

file_path_addedUser1 = r'C:\Users\user\Desktop\Distributed and Scalable data eng\Netflix\TrainingRatings_addedUser1.txt'
df_train_addedUser1 = spark.read \
 .schema(schema1) \
 .option("inferSchema", "True") \
 .csv(file_path_addedUser1)


In [61]:
df_train_addedUser1.describe().show()

+-------+-----------------+------------------+------------------+
|summary|          MovieID|            UserID|           Ratings|
+-------+-----------------+------------------+------------------+
|  count|          3255362|           3255362|           3255362|
|   mean| 8724.63380048056|1327054.3895078336|3.4811864241211885|
| stddev|5107.415718447187| 762691.0652031554|1.0828736515089448|
|    min|                8|                 1|               1.0|
|    max|            17742|           2649285|               5.0|
+-------+-----------------+------------------+------------------+



In [62]:
user_1 = df_train_addedUser1.filter(df_train_addedUser1['UserID']==1).select(['UserID','MovieID','Ratings'])
user_1.show()

+------+-------+-------+
|UserID|MovieID|Ratings|
+------+-------+-------+
|     1|     69|    4.0|
|     1|     91|    4.0|
|     1|     76|    3.0|
|     1|    129|    2.0|
|     1|    152|    1.0|
|     1|    183|    3.0|
|     1|    192|    4.0|
|     1|    204|    5.0|
|     1|    233|    3.0|
|     1|    289|    2.0|
+------+-------+-------+



In [71]:
model2 = als.fit(df_train_addedUser1)

In [72]:
recommendations = model2.transform(df_test)

In [73]:
#Obtaining the top 10 recommendations for the UserID = 1 
recommendations.orderBy('prediction',ascending=False).show()

+-------+-------+-------+----------+
|MovieID| UserID|Ratings|prediction|
+-------+-------+-------+----------+
|    218|2302897|    4.0|  6.608642|
|   6991|1390768|    4.0| 6.1331286|
|  16902|   5980|    3.0|  5.923931|
|   8561| 685113|    5.0| 5.8187985|
|  10578|1476006|    5.0|  5.794419|
|   6756|2614232|    5.0| 5.7690296|
|  12184|1909349|    5.0| 5.7673874|
|  12232|2617068|    5.0|  5.711573|
|  14808|1178384|    3.0|  5.701107|
|   3824|1663569|    5.0|  5.676264|
|   8933| 117581|    4.0| 5.6736484|
|  14808| 942442|    5.0| 5.6624107|
|   1848| 761430|    5.0| 5.6581693|
|    359| 501823|    5.0|  5.640541|
|  10947|1783036|    5.0|  5.629932|
|  16707|1823836|    5.0|  5.629205|
|  12293| 304581|    5.0| 5.6265254|
|  15183|1391668|    5.0| 5.6258717|
|   3928| 753532|    5.0| 5.6249366|
|  15183|1476102|    4.0|  5.617443|
+-------+-------+-------+----------+
only showing top 20 rows



In [110]:
# Getting Movie titles into another dataframe
schema3 = StructType([
    StructField("MovieID", IntegerType(), True),
    StructField("YearOfRelease", IntegerType(), True),
    StructField("MovieName", StringType(), True)])

In [114]:
movies_titles_df = spark.read \
 .schema(schema3) \
 .option("inferSchema", "True") \
 .csv(r'C:\Users\user\Desktop\Distributed and Scalable data eng\Netflix\movie_titles.txt')

In [115]:
movies_titles_df.show(5)

+-------+-------------+--------------------+
|MovieID|YearOfRelease|           MovieName|
+-------+-------------+--------------------+
|      1|         2003|     Dinosaur Planet|
|      2|         2004|Isle of Man TT 20...|
|      3|         1997|           Character|
|      4|         1994|Paula Abdul's Get...|
|      5|         2004|The Rise and Fall...|
+-------+-------------+--------------------+
only showing top 5 rows



In [119]:
 movies_titles_df.filter(movies_titles_df['MovieID']==218).select(['YearOfRelease','MovieName']).show()
# Triumph is the first movie recommended to me with high prediction Rating.  

+-------------+--------------------+
|YearOfRelease|           MovieName|
+-------------+--------------------+
|         1983|Triumph: Live at ...|
+-------------+--------------------+

