#### Project 5: Recommender System in Spark
#### Author: Santosh Manjrekar

In this assignment I will use the Spark framework to build a movie recommender system.

First, install the Spark framework on your machine and check that it runs properly. I used the following tutorial to do that:

https://bigdata-madesimple.com/guide-to-install-spark-and-use-pyspark-from-jupyter-in-windows/

Let's check that the installation works.


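```python
import pyspark
from pyspark.sql import SparkSession

# Start (or reuse) a local Spark session
spark = SparkSession.builder.getOrCreate()

# Run a trivial SQL query to confirm that Spark works
df = spark.sql("select 'spark' as hello")
df.show()
```

```
+-----+
|hello|
+-----+
|spark|
+-----+
```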



Let's start building the movie recommender system with Spark. We will use the small MovieLens dataset (`ml-latest-small`, about 100k ratings).
First we run the code locally, and later we deploy it to Databricks in the cloud.

```python
# Read the ratings CSV
ratings_df = spark.read.csv('ratings.csv',
                            header=True,       # first line holds the column names
                            quote='"',         # character used for quoting
                            sep=",",           # field separator
                            inferSchema=True)  # infer column types from the data
ratings_df.printSchema()
```

```
root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: double (nullable = true)
 |-- timestamp: integer (nullable = true)
```



```python
# Work with the underlying RDD of Row objects: r[0] is userId, r[1] is movieId
ratings = ratings_df.rdd

print(ratings)

numRatings = ratings.count()
numUsers = ratings.map(lambda r: r[0]).distinct().count()
numMovies = ratings.map(lambda r: r[1]).distinct().count()

print("Got %d ratings from %d users on %d movies." % (numRatings, numUsers, numMovies))
```

```
MapPartitionsRDD[3395] at javaToPython at <unknown>:0
Got 100836 ratings from 610 users on 9724 movies.
```
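The counts above also show how sparse the rating matrix is. A quick check in plain Python, using only the three numbers printed by the previous cell, shows that only about 1.7% of all user-movie pairs carry a rating. Collaborative-filtering methods such as ALS are meant for exactly this kind of sparse data.

```python
# Counts printed by the previous cell
num_ratings = 100836
num_users = 610
num_movies = 9724

# Every (user, movie) pair could in principle hold a rating
possible_pairs = num_users * num_movies
density = num_ratings / possible_pairs

print(f"{possible_pairs} possible pairs, {density:.2%} of them rated")
```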


```python
# Read the movies CSV
movies_df = spark.read.csv('movies.csv',
                           header=True,       # first line holds the column names
                           quote='"',         # character used for quoting
                           sep=",",           # field separator
                           inferSchema=True)  # infer column types from the data
movies_df.printSchema()
```

```
root
 |-- movieId: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- genres: string (nullable = true)
```



```python
# Number of ratings per movie
movies_counts = ratings_df.groupBy("movieId").count()
movies_counts.show()
```

```
+-------+-----+
|movieId|count|
+-------+-----+
|   1580|  165|
|   2366|   25|
|   3175|   75|
|   1088|   42|
|  32460|    4|
|  44022|   23|
|  96488|    4|
|   1238|    9|
|   1342|   11|
|   1591|   26|
|   1645|   51|
|   4519|    9|
|   2142|   10|
|    471|   40|
|   3997|   12|
|    833|    6|
|   3918|    9|
|   7982|    4|
|   1959|   15|
|  68135|   10|
+-------+-----+
only showing top 20 rows
```
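`groupBy("movieId").count()` tallies how many ratings each movie received. Conceptually, Spark counts within each partition first and then merges the partial counts. The plain-Python sketch below mimics that two-step aggregation on a few hypothetical rows; the data and the two-partition split are made up for illustration. Note that `show()` prints rows in no particular order, so to list the most-rated movies you would sort first, for example with `movies_counts.orderBy("count", ascending=False)`.

```python
from collections import Counter

# Hypothetical sample of (userId, movieId, rating) rows, not taken from ratings.csv
rows = [
    (1, 1580, 4.0),
    (2, 1580, 3.5),
    (3, 471, 2.0),
    (1, 471, 5.0),
    (4, 1580, 4.5),
]

# Pretend the rows live in two partitions and count each one independently
partitions = [rows[:3], rows[3:]]
partial_counts = [Counter(movie_id for _, movie_id, _ in part) for part in partitions]

# Merge the partial counts into the final per-movie totals
movie_counts = sum(partial_counts, Counter())

# Most-rated movies first
for movie_id, count in movie_counts.most_common():
    print(movie_id, count)
```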



```python
# Split the ratings into 60% training, 20% validation and 20% test data
training_df, validation_df, test_df = ratings_df.randomSplit([0.6, 0.2, 0.2])
training_df
```

```
DataFrame[userId: int, movieId: int, rating: double, timestamp: int]
```
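`randomSplit` does not cut the data at exact positions: each row is assigned to one of the splits at random, with probabilities proportional to the weights, so the resulting sizes are only approximately 60/20/20. The hypothetical `random_split` function below is a simplified single-machine sketch of that idea, not Spark's implementation. Because the cell above calls `randomSplit` without a `seed`, each run draws a different split; passing one, for example `ratings_df.randomSplit([0.6, 0.2, 0.2], seed=5)`, makes the experiment repeatable.

```python
import random


def random_split(rows, weights, seed):
    """Assign each row to exactly one split, with probability proportional to its weight.

    Simplified single-machine sketch of the idea behind DataFrame.randomSplit;
    Spark's real implementation samples each partition of the data separately.
    """
    total = sum(weights)
    # Cumulative upper bounds, e.g. [0.6, 0.8, 1.0] for weights [0.6, 0.2, 0.2]
    bounds = []
    acc = 0.0
    for w in weights:
        acc += w / total
        bounds.append(acc)

    rng = random.Random(seed)  # fixed seed -> the same split on every run
    splits = [[] for _ in weights]
    for row in rows:
        u = rng.random()  # uniform draw in [0, 1)
        for i, upper in enumerate(bounds):
            # The last split also catches draws lost to floating-point rounding
            if u < upper or i == len(bounds) - 1:
                splits[i].append(row)
                break
    return splits


train, val, test = random_split(list(range(10_000)), [0.6, 0.2, 0.2], seed=5)
print(len(train), len(val), len(test))  # roughly 6000 / 2000 / 2000, not exact
```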

```python
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml import Pipeline
from pyspark.sql import Row
import numpy as np
import math
```

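```python
# These are the parameters we can tune for the ALS model
seed = 5
iterations = 10                  # maxIter: number of alternating passes
regularization_parameter = 0.1   # regParam: strength of the L2 regularization
ranks = range(4, 12)             # rank: numbers of latent factors to try (4 to 11)
errors = []
err = 0
tolerance = 0.02
```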
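For context on what these parameters control: ALS factorizes the sparse user-by-movie rating matrix into user vectors $x_u$ and movie vectors $y_i$ of length $k$ (`rank`) and predicts $\hat r_{ui} = x_u^\top y_i$. Roughly, for explicit ratings it minimizes the objective below, where $\Omega$ is the set of observed (user, movie) pairs, $\lambda$ is `regParam`, and $n_u$, $n_i$ are the numbers of ratings of user $u$ and movie $i$ (the Spark documentation calls this scaling of $\lambda$ "ALS-WR"). Each of the `maxIter` iterations alternates between fixing the movie vectors and solving a small ridge regression for every user, then the reverse. In the user update, $Y_u$ stacks the vectors $y_i^\top$ of the movies that user $u$ rated and $r_u$ holds those ratings:

$$
\min_{X,\,Y}\ \sum_{(u,i)\in\Omega}\left(r_{ui}-x_u^\top y_i\right)^2+\lambda\left(\sum_u n_u\lVert x_u\rVert^2+\sum_i n_i\lVert y_i\rVert^2\right)
$$

$$
x_u=\left(Y_u^\top Y_u+\lambda\,n_u I_k\right)^{-1}Y_u^\top r_u
$$

A larger `rank` gives the model more capacity but also more room to overfit, which is why the rank is chosen on the validation split.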

Let's find which rank gives the lowest RMSE on the validation set.

```python
min_error = float('inf')
best_rank = -1

for rank in ranks:
    # coldStartStrategy="drop" removes rows whose user or movie is missing from the
    # training split; their prediction would be NaN and would make the RMSE NaN
    als = ALS(maxIter=iterations, regParam=regularization_parameter, rank=rank,
              userCol="userId", itemCol="movieId", ratingCol="rating",
              coldStartStrategy="drop")
    model = als.fit(training_df)
    predictions = model.transform(validation_df)

    evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")
    rmse = evaluator.evaluate(predictions)
    errors.append(rmse)

    print('For rank %s the RMSE is %s' % (rank, rmse))
    if rmse < min_error:
        min_error = rmse
        best_rank = rank
print('The best model was trained with rank %s' % best_rank)
```

```
For rank 4 the RMSE is 0.8993863379097063
...
```
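`RegressionEvaluator(metricName="rmse")` reports the root-mean-square error between the `rating` and `prediction` columns. If any prediction is NaN (a user or movie that does not appear in the training split), the metric itself becomes NaN, which is why such rows have to be removed before evaluation. A plain-Python sketch of the same computation on hypothetical (rating, prediction) pairs:

```python
import math


def rmse(pairs):
    """Root-mean-square error over (actual, predicted) pairs, skipping NaN predictions.

    Skipping NaN rows mirrors what ALS(coldStartStrategy="drop") does before evaluation.
    """
    kept = [(r, p) for r, p in pairs if not math.isnan(p)]
    if not kept:
        raise ValueError("no non-NaN predictions to evaluate")
    return math.sqrt(sum((r - p) ** 2 for r, p in kept) / len(kept))


# Hypothetical pairs; the NaN stands for a movie that was absent from the training split
sample = [(4.0, 3.5), (2.0, 3.0), (5.0, float("nan")), (3.0, 3.0)]
print(rmse(sample))
```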

Rank 4 appears to give the best model, with a validation RMSE of about 0.899.
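Once a rank is chosen, ALS scores a (user, movie) pair as the dot product of their factor vectors, and a recommendation list is the set of highest-scoring movies. In Spark, `ALSModel.recommendForAllUsers(n)` returns the top `n` movies per user by this score; it does not filter out movies the user has already rated, so that filtering is left to the caller. The plain-Python sketch below shows the scoring and filtering step with small hypothetical rank-2 factors (not values learned from this dataset); in practice the vectors would come from `model.userFactors` and `model.itemFactors`, and `recommend` is a hypothetical helper.

```python
# Hypothetical rank-2 factor vectors, not learned from the MovieLens data
user_factors = {133: [1.2, 0.4]}
item_factors = {
    471: [1.5, 0.2],
    1580: [0.3, 1.9],
    2366: [2.0, -0.5],
    3175: [1.1, 1.0],
}
# Movies each user has already rated; these are excluded from the recommendations
already_rated = {133: {471}}


def dot(a, b):
    return sum(x * y for x, y in zip(a, b))


def recommend(user_id, n):
    """Return the n highest-scoring unrated movies for user_id as (movieId, score) pairs."""
    u = user_factors[user_id]
    seen = already_rated.get(user_id, set())
    scored = [(movie_id, dot(u, v)) for movie_id, v in item_factors.items() if movie_id not in seen]
    scored.sort(key=lambda pair: pair[1], reverse=True)
    return scored[:n]


print(recommend(133, 2))
```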

I deployed the code above to Databricks:

https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/6740759789383834/2719118021108212/8218724042301946/latest.html
    
I noticed the performance improvement immediately: the code runs much faster in the Databricks cluster environment.



#### Conclusion:

1. We built a recommender system with the Spark framework that uses MovieLens data to predict users' movie ratings, reaching an RMSE of about 0.899 on the validation set.
2. The approach scales well: the same code can run on a computing cluster and read much larger data files from distributed storage such as HDFS.
3. Spark's parallel architecture makes better use of the hardware than a serial, single-process Python approach, which reduces runtimes for larger calculations.
