<a href="https://colab.research.google.com/github/krishangi-deka/Movie-Prediction-Engine/blob/main/MovieRecSystem.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# **Movie Recommender System - Using Spark**

Installing Spark and the JDK, and setting the environment variables.

In [None]:
%%sh
apt-get install openjdk-8-jdk-headless -qq > /dev/null
wget -q https://archive.apache.org/dist/spark/spark-3.0.0/spark-3.0.0-bin-hadoop3.2.tgz
tar -xvf spark-3.0.0-bin-hadoop3.2.tgz
pip install -q findspark

In [None]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.0-bin-hadoop3.2"

Creating a local Spark Session.

In [None]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
spark = SparkSession.builder.master("local[*]").getOrCreate()
sc = spark.sparkContext

Mounting Google Drive

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
%cd /content/drive/Shared drives/IDS561/HW5
#!ls

/content/drive/Shared drives/IDS561/HW5


## **STEP 1**
Importing the ratings file and extracting the columns used in the assignment

In [None]:
data = spark.read.option("delimiter", "\t").csv("/content/drive/Shared drives/IDS561/HW5/u.data").toDF("user_id","item_id", "rating", "timestamp")
#data.show()

In [None]:
df = data.drop('timestamp')  #keeping only the relevant columns for data processing.
#df.show()

In [None]:
#Converting data type from string to integer
from pyspark.sql.types import IntegerType
df = df.withColumn("user_id", df["user_id"].cast(IntegerType()))   
df = df.withColumn("item_id", df["item_id"].cast(IntegerType()))    
df = df.withColumn("rating", df["rating"].cast(IntegerType()))

In [None]:
df = df.sort(df.user_id.asc())  #data sorted by user_id
#df.show()

## **STEP 2**
Building a recommendation model using Alternating Least Squares

In [None]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS

In [None]:
#Create test and train set
(train, test) = df.randomSplit([0.8, 0.2])

In [None]:
#Define evaluator as RMSE
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")

In [None]:
#Create ALS model
als = ALS(userCol="user_id", itemCol="item_id", ratingCol="rating", nonnegative = True)
model = als.fit(train)

In [None]:
#Generate predictions
pred = model.transform(test)
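
As a rough illustration of what happens at prediction time: a matrix factorization model such as ALS represents each user and each item as a vector of length `rank`, and the predicted rating is the dot product of the two. The vectors below are made up for the example (rank 3, not learned from `u.data`), and `predict_rating` is a hypothetical helper, not part of the Spark API.

```python
# Hypothetical rank-3 latent factors; a fitted ALS model learns these from the ratings.
user_factors = [0.9, 0.2, 1.1]
item_factors = [1.5, 0.4, 2.0]

def predict_rating(user_vec, item_vec):
    """Predicted rating = dot product of the user and item factor vectors."""
    return sum(u * v for u, v in zip(user_vec, item_vec))

predicted = predict_rating(user_factors, item_factors)
print(round(predicted, 2))
```

A user or item that never appeared in the training data has no factor vector at all, which is why such rows cannot receive a meaningful prediction.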

## **STEP 3**
Original RMSE Calculation

In [None]:
#Evaluate using RMSE and print evaluation metrics 
rmse = evaluator.evaluate(pred)
print("Root mean square error(RMSE) = " + str(rmse))

Root mean square error(RMSE) = nan


## **STEP 4**
Part 1: Solving the cold-start problem

In [None]:
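#The RMSE is nan because the random split can leave users or items in the test set that never appear in the training set.
#ALS has no learned factors for them, so it predicts NaN, and a single NaN prediction makes the whole RMSE nan.
#This is the cold-start problem: the system cannot draw inferences for users or items about which it has not
#yet gathered sufficient information. Setting coldStartStrategy="drop" removes those rows from the predictions.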

#Retraining the ALS model
als = ALS(userCol="user_id", itemCol="item_id", ratingCol="rating", coldStartStrategy = "drop", nonnegative = True)
model = als.fit(train)

In [None]:
#Print the new RMSE
print("New RMSE = ", evaluator.evaluate(model.transform(test)))

New RMSE =  0.92067680707247


In [None]:
#Print other model parameters
print("Rank: ", model.rank)
print("Max Iter: ", model._java_obj.parent().getMaxIter())
print("Reg Param: ", model._java_obj.parent().getRegParam())

Rank:  10
Max Iter:  10
Reg Param:  0.1


Part 2: Perform 10-fold cross validation

In [None]:
from pyspark.ml.tuning import ParamGridBuilder
from pyspark.ml.tuning import CrossValidator

model = ALS(userCol="user_id", itemCol="item_id", ratingCol="rating", coldStartStrategy = "drop", nonnegative = True)

#Tuning model with ParamGridBuilder
param_grid = ParamGridBuilder() \
    .addGrid(model.regParam, [0.05,0.1, 0.01, 0.001]) \
    .addGrid(model.rank, [5, 10, 15,20]) \
    .build()
#print(param_grid)
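
The grid is the Cartesian product of the two value lists, so its size grows multiplicatively. The sketch below rebuilds the same combinations with `itertools.product` to count how many ALS fits the 10-fold search implies; the dictionary keys are illustrative labels, not Spark `Param` objects.

```python
from itertools import product

reg_params = [0.05, 0.1, 0.01, 0.001]
ranks = [5, 10, 15, 20]
num_folds = 10

# One entry per (regParam, rank) combination, like ParamGridBuilder().build().
grid = [{"regParam": r, "rank": k} for r, k in product(reg_params, ranks)]

# Each combination is trained once per fold.
fits = len(grid) * num_folds
print(len(grid), fits)  # 16 160
```

After the search, `CrossValidator` fits the best combination once more on the full training set, so expect the search to take far longer than a single `als.fit(train)`.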

In [None]:
#Build a 10 fold cross validation
crossvalidation = CrossValidator(estimator = model, estimatorParamMaps = param_grid, evaluator = evaluator, numFolds=10)

#Fit the cross validator on the training data and keep the best model found across the grid
best_model = crossvalidation.fit(train).bestModel
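
For intuition about what the folds look like, here is a minimal sketch of k-fold partitioning over row indices. It assumes round-robin assignment to keep the example deterministic (Spark assigns rows to folds randomly), and `k_fold_indices` is a hypothetical helper.

```python
def k_fold_indices(n_rows, k):
    """Yield (train_idx, validation_idx) pairs for k folds using round-robin assignment."""
    folds = [list(range(i, n_rows, k)) for i in range(k)]
    for held_out in range(k):
        validation_idx = folds[held_out]
        # Train on every fold except the one held out for validation.
        train_idx = [i for f, fold in enumerate(folds) if f != held_out for i in fold]
        yield train_idx, validation_idx

splits = list(k_fold_indices(n_rows=20, k=10))
print(len(splits))   # 10
print(splits[0][1])  # [0, 10]
```

Every row is used for validation exactly once and for training in the other nine rounds.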

## **STEP 5**
RMSE after model optimization

In [None]:
#Calculate the RMSE on test data using the best set of parameters obtained after cross validation
print("RMSE value after cross validation is: ", evaluator.evaluate(best_model.transform(test)))

RMSE value after cross validation is:  0.9174356367648316


**Cross validation lowered the test RMSE from 0.9207 to 0.9174, a relative improvement of about 0.35%.**
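
The improvement can be checked directly from the two RMSE values printed earlier in the notebook. This is plain arithmetic on those reported numbers; the variable names are illustrative.

```python
rmse_before = 0.92067680707247    # default ALS with coldStartStrategy="drop"
rmse_after = 0.9174356367648316   # best model from cross validation

absolute_drop = rmse_before - rmse_after
relative_drop_pct = 100 * absolute_drop / rmse_before
print(f"absolute: {absolute_drop:.4f}, relative: {relative_drop_pct:.2f}%")
```

Because `randomSplit` is called without a seed, both values will change slightly from run to run, so a gap this small should be read with some caution.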

## **STEP 6**
Output top 10 movies for all the users

In [None]:
movie_rec = best_model.recommendForAllUsers(10)   #top 10 movie recommendations for all users
#movie_rec.show()

In [None]:
import pandas as pd
movie_rec = movie_rec.toPandas()
#movie_rec.head()

In [None]:
users = []
recommendations = []
#Iterate over every user row
for i in range(len(movie_rec)):

  users.append(movie_rec.iloc[i,0])         #Add user_id to list
  user_recs = "" 

  for item in movie_rec.iloc[i,1]:          #Fetching only the item ID's from the recommendations
    user_recs = user_recs + ", " + str(item.asDict()["item_id"])
  
  recommendations.append(user_recs[2:])     #Append the itemID's to a list

#Create a dataframe with the appended data
recommendations_df = pd.DataFrame(data = zip(users, recommendations), columns=["UserID", "ItemID"])
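
The same flattening can be written more compactly with `str.join`, which avoids the leading ", " that the loop has to slice off. The sketch below uses made-up rows, with plain dicts standing in for Spark `Row` objects, so it runs without Spark or pandas; `format_item_ids` is a hypothetical helper.

```python
# Made-up rows shaped like the recommendForAllUsers output after toPandas():
# (user_id, list of recommendation records).
rows = [
    (471, [{"item_id": 1233, "rating": 4.9}, {"item_id": 936, "rating": 4.8}]),
    (463, [{"item_id": 1449, "rating": 4.7}, {"item_id": 6, "rating": 4.6}]),
]

def format_item_ids(recs):
    """Join the item IDs of one user's recommendations into a comma-separated string."""
    return ", ".join(str(rec["item_id"]) for rec in recs)

formatted = [(user_id, format_item_ids(recs)) for user_id, recs in rows]
print(formatted)
```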

In [None]:
#Displaying users and movie recommendations(top 10 movies) for the first 10 users
recommendations_df.head(10)

Unnamed: 0,UserID,ItemID
0,471,"1233, 936, 1643, 1159, 1394, 909, 459, 1005, 9..."
1,463,"1449, 6, 611, 408, 100, 958, 169, 114, 1512, 119"
2,833,"179, 1463, 646, 32, 1367, 853, 320, 1558, 1187..."
3,496,"320, 1467, 114, 42, 899, 1463, 190, 10, 61, 56"
4,148,"169, 408, 921, 1167, 1449, 1019, 513, 745, 114..."
5,540,"1398, 1449, 169, 1193, 316, 515, 408, 1122, 64..."
6,392,"1463, 483, 318, 1643, 1449, 178, 963, 1142, 48..."
7,243,"1449, 1193, 408, 1463, 1398, 483, 1643, 134, 2..."
8,623,"1463, 50, 483, 174, 1169, 478, 1449, 694, 659,..."
9,737,"1643, 127, 56, 1449, 60, 1142, 1558, 119, 156,..."


In [None]:
#write to a text file
recommendations_df.to_csv('/content/drive/Shared drives/IDS561/HW5/recommendation_output.txt', sep='\t', index=False)