<a href="https://colab.research.google.com/github/vaidehi31/RecommenderSystem/blob/main/HW4_Recommender_System.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Spark Installation

In [None]:
# installation 

from google.colab import drive
drive.mount('/content/drive')
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://apache.mirrors.hoobly.com/spark/spark-2.4.5/spark-2.4.5-bin-hadoop2.7.tgz
!tar -xvf spark-2.4.5-bin-hadoop2.7.tgz
!pip install -q findspark

import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.5-bin-hadoop2.7"

import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()


# Recommender System for MovieLens dataset

In [None]:
#Import libraries
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import TrainValidationSplit, ParamGridBuilder

from pyspark.sql import Row

In [None]:
# List the current working directory contents (shell command run via IPython's `!`).
!ls

drive  sample_data  spark-2.4.5-bin-hadoop2.7  spark-2.4.5-bin-hadoop2.7.tgz


# Step 1. Read data - Import the MovieLens Dataset

In [None]:
#Read the tab-separated MovieLens ratings file and turn every line into a typed Row
def _fields_to_rating(fields):
    """Build a rating Row from the four string fields of one u.data line."""
    user, movie, score, stamp = fields[0], fields[1], fields[2], fields[3]
    return Row(userId=int(user), movieId=int(movie),
               rating=float(score), timestamp=float(stamp))

#Each line holds: user id, movie id, rating, timestamp
lines = spark.read.text("/content/drive/Shared drives/Analytics for Big Data/HW4/ml-100k/u.data").rdd
parts = lines.map(lambda row: row.value.split("\t"))
ratingsRDD = parts.map(_fields_to_rating)

#Convert the RDD of Rows into a DataFrame and preview it
ratings = spark.createDataFrame(ratingsRDD)
ratings.show()

+-------+------+------------+------+
|movieId|rating|   timestamp|userId|
+-------+------+------------+------+
|    242|   3.0|8.81250949E8|   196|
|    302|   3.0|8.91717742E8|   186|
|    377|   1.0|8.78887116E8|    22|
|     51|   2.0|8.80606923E8|   244|
|    346|   1.0|8.86397596E8|   166|
|    474|   4.0|8.84182806E8|   298|
|    265|   2.0|8.81171488E8|   115|
|    465|   5.0|8.91628467E8|   253|
|    451|   3.0|8.86324817E8|   305|
|     86|   3.0|8.83603013E8|     6|
|    257|   2.0|8.79372434E8|    62|
|   1014|   5.0|8.79781125E8|   286|
|    222|   5.0| 8.7604234E8|   200|
|     40|   3.0|8.91035994E8|   210|
|     29|   3.0|8.88104457E8|   224|
|    785|   3.0|8.79485318E8|   303|
|    387|   5.0|8.79270459E8|   122|
|    274|   2.0|8.79539794E8|   194|
|   1042|   4.0|8.74834944E8|   291|
|   1184|   2.0|8.92079237E8|   234|
+-------+------+------------+------+
only showing top 20 rows



In [None]:
#Keep every column except the timestamp
kept_columns = [name for name in ratings.columns if name != "timestamp"]
ratings = ratings.select(*kept_columns)
ratings.show()

+-------+------+------+
|movieId|rating|userId|
+-------+------+------+
|    242|   3.0|   196|
|    302|   3.0|   186|
|    377|   1.0|    22|
|     51|   2.0|   244|
|    346|   1.0|   166|
|    474|   4.0|   298|
|    265|   2.0|   115|
|    465|   5.0|   253|
|    451|   3.0|   305|
|     86|   3.0|     6|
|    257|   2.0|    62|
|   1014|   5.0|   286|
|    222|   5.0|   200|
|     40|   3.0|   210|
|     29|   3.0|   224|
|    785|   3.0|   303|
|    387|   5.0|   122|
|    274|   2.0|   194|
|   1042|   4.0|   291|
|   1184|   2.0|   234|
+-------+------+------+
only showing top 20 rows



# Split training and testing

In [None]:
# 80/20 train/test split. The seed is fixed so that a Restart & Run All
# reproduces the same split (and therefore comparable RMSE values);
# without it randomSplit picks a different partition on every run.
SPLIT_SEED = 42
(training, test) = ratings.randomSplit([0.8, 0.2], seed=SPLIT_SEED)

# Step 2. Build the recommendation model using ALS

In [None]:
# Baseline ALS estimator: non-negative factors, every other setting left at its default.
als_original = ALS(
    userCol="userId",
    itemCol="movieId",
    ratingCol="rating",
    nonnegative=True,
)
# Fit the baseline on the training split only.
model = als_original.fit(training)

# Step 3. Reporting the Original Performance

In [None]:
# RMSE evaluator comparing the true "rating" column with the model's "prediction" column
evaluator = RegressionEvaluator(labelCol="rating", predictionCol="prediction", metricName="rmse")

# Score the held-out ratings with the baseline model and report the error
predictions = model.transform(test)
rmse = evaluator.evaluate(predictions)
print(f"Original Root-mean-square error = {rmse}")

Original Root-mean-square error = nan


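As we can see, the RMSE comes out as NaN. This is the cold start problem: the random split can leave users or movies in the test set that have no ratings in the training set, so the model cannot produce a prediction for them.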

In this situation, Spark's `ALSModel` assigns NaN predictions to those rows by default. To solve this problem, Spark allows dropping the rows of the predictions DataFrame that contain NaN values. This is done by setting the `coldStartStrategy` parameter to "drop".

# Step 4a. Solving the cold start problem

In [None]:
# Same ALS configuration as before, except that rows with NaN predictions are dropped
als_new = ALS(
    userCol="userId",
    itemCol="movieId",
    ratingCol="rating",
    nonnegative=True,
    coldStartStrategy="drop",
)
model = als_new.fit(training)

# RMSE evaluator over the "rating" / "prediction" columns
evaluator = RegressionEvaluator(labelCol="rating", predictionCol="prediction", metricName="rmse")

# Score the test split with the refitted model and report the error
predictions = model.transform(test)
rmse = evaluator.evaluate(predictions)
print(f"Root-mean-square error = {rmse}")

Root-mean-square error = 0.9165699485658979


The RMSE comes out to approximately 0.917. Next, we try to improve the performance using cross-validation.

# Step 4b. Performance improvement using Cross Validation

In [None]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

# ALS estimator to tune; NaN predictions are dropped so RMSE stays defined on every fold.
model_new = ALS(userCol="userId", itemCol="movieId", ratingCol="rating", nonnegative = True, coldStartStrategy="drop")

# Parameters for tuning: 3 regularisation strengths x 3 ranks = 9 candidate settings
paramGrid = ParamGridBuilder() \
    .addGrid(model_new.regParam, [0.1, 0.01, 0.001]) \
    .addGrid(model_new.rank, [5, 10, 15]) \
    .build()

# 10-fold cross validation over the grid, scored with the RMSE `evaluator`
# created in the previous evaluation cell. The explicit seed makes the fold
# assignment repeatable across kernel restarts.
CV_SEED = 42
crossvalidation = CrossValidator(estimator = model_new,
                     estimatorParamMaps = paramGrid,
                     evaluator = evaluator,
                     numFolds=10,
                     seed=CV_SEED)

#Using the Best Model (the grid candidate with the best average metric across folds)
model_cv = crossvalidation.fit(training).bestModel

#Evaluate the tuned model on the held-out test split and print the result.
#The label names cross validation so it cannot be confused with the Step 4a RMSE.
cv_rmse = evaluator.evaluate(model_cv.transform(test))
print(f"RMSE value after cross validation is: {cv_rmse}")

RMSE value after solving cold start problem is:  0.9178132964212179


As we can see, even after CV there isn't much improvement.

# Step 5. Top 10 movies for all the users 

In [None]:
# Ask the cross-validated model for the highest-scoring movies of every user
top_n = 10
recommendations = model_cv.recommendForAllUsers(top_n)
recommendations.show()

+------+--------------------+
|userId|     recommendations|
+------+--------------------+
|   471|[[867, 4.883415],...|
|   463|[[19, 4.4390416],...|
|   833|[[1368, 5.366121]...|
|   496|[[320, 4.9004207]...|
|   148|[[1104, 5.207343]...|
|   540|[[1449, 4.8793826...|
|   392|[[1449, 5.308921]...|
|   243|[[1512, 4.404202]...|
|   623|[[1450, 4.577325]...|
|   737|[[1449, 4.739571]...|
|   897|[[1368, 5.0980883...|
|   858|[[1512, 4.4579964...|
|    31|[[1463, 5.4224477...|
|   516|[[1449, 4.6970286...|
|   580|[[793, 5.293725],...|
|   251|[[1643, 4.909021]...|
|   451|[[1269, 4.6691117...|
|    85|[[1643, 4.5083456...|
|   137|[[1500, 5.837367]...|
|   808|[[1449, 5.558436]...|
+------+--------------------+
only showing top 20 rows



In [None]:
import pandas as pd

# Pull the Spark recommendations to the driver as a pandas DataFrame.
# The name `recommendations` is rebound here, so the conversion is guarded:
# re-running this cell would otherwise fail, because a pandas DataFrame has no toPandas().
if not isinstance(recommendations, pd.DataFrame):
    recommendations = recommendations.toPandas()

In [None]:
#Number of users (rows) in the recommendations table
n_rows = len(recommendations)

#First column holds the user ids
list_users = [recommendations.iloc[row, 0] for row in range(n_rows)]

#Second column holds each user's recommendation structs; keep only the movieIds,
#joined into one comma-separated string per user
list_recs = [
  ", ".join(str(rec.asDict()["movieId"]) for rec in recommendations.iloc[row, 1])
  for row in range(n_rows)
]

#Pair every user with their recommendation string and preview the result
recommendations_df = pd.DataFrame(data = zip(list_users, list_recs), columns=["user", "recommendations"])
recommendations_df.head()

Unnamed: 0,user,recommendations
0,471,"867, 1598, 1468, 102, 189, 263, 1154, 916, 477..."
1,463,"19, 947, 958, 1449, 963, 835, 318, 1642, 740, ..."
2,833,"1368, 1463, 320, 1104, 1643, 838, 922, 1597, 8..."
3,496,"320, 1664, 1449, 793, 475, 75, 1467, 1114, 613..."
4,148,"1104, 1449, 1643, 512, 344, 168, 408, 1607, 43..."


In [None]:
#Save the recommendations as a tab-separated text file: one header line, then one line per user
with open("recommendations.txt", "w") as f:
  f.write("userId\trecommendations\n")
  for user, recs in zip(recommendations_df.iloc[:, 0], recommendations_df.iloc[:, 1]):
    f.write("\t".join((str(user), recs)) + "\n")