In [None]:
# Mount Google Drive into the Colab VM at /content/drive.
from google.colab import drive
drive.mount(mountpoint='/content/drive')

In [None]:
#Java JDK
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
#Downloading Spark
!wget -q http://apache.mirrors.pair.com/spark/spark-3.0.1/spark-3.0.1-bin-hadoop3.2.tgz 
#Unzipping the hadoop file
!tar -xvf spark-3.0.1-bin-hadoop3.2.tgz

In [None]:
###################### SPARK SETUP ################################
#Install findspark
!pip install -q findspark

In [None]:
# Point Java and Spark tooling at the JDK and the extracted Spark distribution.
import os

os.environ.update({
    "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
    "SPARK_HOME": "/content/spark-3.0.1-bin-hadoop3.2",
})

In [None]:
# Make pyspark importable via findspark (it locates Spark through SPARK_HOME), then
# start a local Spark session that uses all available cores.
import findspark
findspark.init()

from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .master("local[*]")
         .getOrCreate())

In [None]:
# Input CSV locations (relative to the notebook's working directory).
details_dir = 'data/hotels/details/'
reviews_file = details_dir + 'reviews_clean.csv'
hotels_file = details_dir + 'hotel_info_clean.csv'

In [None]:

def readFiles(filename):
    """Read a CSV file with a header row into a Spark DataFrame.

    Column types are inferred from the data (``inferSchema``), which requires
    one extra pass over the file.

    Args:
        filename: Path of the CSV file to read.

    Returns:
        A ``pyspark.sql.DataFrame`` created through the notebook-level ``spark``
        session.
    """
    # Use the built-in CSV source directly instead of the legacy
    # ``com.databricks.spark.csv`` name, and set the header flag only once.
    return spark.read.csv(filename, header=True, inferSchema=True)

In [None]:

# Load the reviews and the hotel information tables as Spark DataFrames.
reviews, hotels = readFiles(reviews_file), readFiles(hotels_file)

In [None]:
# Peek at the first rows of the reviews table.
reviews.show(n=5)

In [None]:
# Peek at the first rows of the hotel information table.
hotels.show(n=5)

In [None]:
# Use the reviews table as the modelling dataset.
# NOTE: `data` is an alias of `reviews` (not a copy) and no columns are dropped here,
# so the schema printed below is the full reviews schema.
data = reviews
data.printSchema()

In [None]:
# Randomly split the data into ~80% train and ~20% test (randomSplit samples per row,
# so the proportions are approximate). A fixed seed makes the split, and every metric
# computed from it, reproducible across kernel restarts for the same input.
train, test = data.randomSplit([0.8, 0.2], seed=42)

In [None]:
# Inspect the schema of the test split (randomSplit keeps the columns of `data`).
test.printSchema()

In [None]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS

In [None]:
# Baseline: fit an Alternating Least Squares (ALS) recommender on the training split.
# coldStartStrategy="drop" discards predictions for users/hotels absent from training
# so the RMSE is not NaN; nonnegative=True applies non-negativity constraints in the
# least-squares solves. An explicit seed makes factor initialisation reproducible
# (PySpark's default seed is derived from a per-process string hash).
model = ALS(userCol="user_id", itemCol="hotel_id", ratingCol="rating",
            nonnegative=True, coldStartStrategy="drop", maxIter=10,
            seed=42).fit(train)

# RMSE between the observed `rating` and the model's `prediction` column; the
# evaluation itself happens in the next cell.
from pyspark.ml.evaluation import RegressionEvaluator
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating",
                                predictionCol="prediction")

In [None]:
# Score the baseline ALS model on the test split. Transform and evaluate once, and
# reuse the result instead of recomputing the predictions for the printout.
predictions = model.transform(test)
rmse = evaluator.evaluate(predictions)
print("New RMSE: ", rmse)

In [None]:
#Implementing ALS with Cross Validation
    
from pyspark.ml.tuning import CrossValidator
from pyspark.ml.tuning import ParamGridBuilder

In [None]:
# Tune the ALS hyper-parameters with k-fold cross-validation.
# coldStartStrategy="drop" keeps NaN predictions (users/hotels unseen in a training
# fold) out of the evaluation metric. The estimator gets its own name so the fitted
# baseline `model` from the earlier cell is not overwritten.
als_cv = ALS(userCol="user_id", itemCol="hotel_id", ratingCol="rating",
             nonnegative=True, coldStartStrategy="drop", maxIter=10, seed=42)

# Parameter grid tuned via ParamGridBuilder:
#   1. regParam (regularization): 0.1, 0.05, 0.01, 0.001
#   2. rank (number of latent factors): 5, 10, 20, 30
# 16 combinations x 5 folds = 80 ALS fits, so the next cell is slow.
paramGrid = (
    ParamGridBuilder()
    .addGrid(als_cv.regParam, [0.1, 0.05, 0.01, 0.001])
    .addGrid(als_cv.rank, [5, 10, 20, 30])
    .build()
)

# 5-fold cross-validator scored with the RMSE evaluator; the seed fixes the
# assignment of rows to folds.
crossvalidation = CrossValidator(estimator=als_cv,
                                 estimatorParamMaps=paramGrid,
                                 evaluator=evaluator,
                                 numFolds=5,
                                 seed=42)

In [None]:
# Run the cross-validation over the parameter grid. CrossValidator refits the
# winning parameter set on the whole training split and exposes it as `bestModel`.
cv_model = crossvalidation.fit(train)
Best_model = cv_model.bestModel

In [None]:
# Report the tuned hyper-parameters. They are read from the fitted model's parent
# estimator on the JVM side through the private `_java_obj` handle (internal API,
# so this may break between Spark versions).
print(type(Best_model))
print("**Best Model**")
best_als_jvm = Best_model._java_obj.parent()
for label, getter in (
    ("Rank: ", best_als_jvm.getRank),
    ("MaxIter: ", best_als_jvm.getMaxIter),
    ("RegParam: ", best_als_jvm.getRegParam),
):
    print(label, getter())

In [None]:

# Test-split RMSE of the cross-validated model, scored with the same evaluator.
best_test_predictions = Best_model.transform(test)
print("Best RMSE value is: ", evaluator.evaluate(best_test_predictions))

In [None]:

# Top-5 hotel recommendations for every user from the tuned model. Each row holds a
# user_id and a `recommendations` array of (hotel_id, rating) rows.
from pyspark.sql.functions import explode_outer

preds = Best_model.recommendForAllUsers(numItems=5)
preds.show(10)


In [None]:
# One row per (user, recommended hotel): explode each user's recommendation array,
# then collect the result to the driver as a pandas DataFrame. toPandas() pulls every
# row into driver memory, so this only suits a modest number of users.
exploded = explode_outer(preds.recommendations).alias("recommendation")
final_preds = preds.select(preds.user_id, exploded)
final_preds.show(10)
hotel_recommendation = final_preds.toPandas()
hotel_recommendation

In [None]:
import pandas as pd

# Split each recommendation row (hotel_id, rating) into two flat columns.
# `rating` here is the model's predicted score, not an observed review rating.
# The guard keeps the cell re-runnable: after the first run the `recommendation`
# column is gone and the frame is already flat (the original raised KeyError).
if "recommendation" in hotel_recommendation.columns:
    rec_rows = hotel_recommendation["recommendation"]
    hotel_recommendation = hotel_recommendation.drop(columns="recommendation").assign(
        # map() keeps ints as int64 directly; no float round-trip + astype needed.
        hotel_id=rec_rows.map(lambda rec: int(rec[0])),
        rating=rec_rows.map(lambda rec: rec[1]),
    )
hotel_recommendation

In [None]:
# Top-5 users for every hotel (the item-side view of the same model).
preds_items = Best_model.recommendForAllItems(5)
preds_items.show()

# Persist the tuned model. Plain `.save()` raises if the path already exists, so
# overwrite explicitly to keep this cell re-runnable.
Best_model.write().overwrite().save("weight/als_model_weight")

In [None]:
# Re-read the hotel information table into `hotels` before joining it with predictions.
# NOTE(review): the same file was already loaded into `hotels` earlier; this reload
# looks redundant unless `hotels` was modified in between — confirm.
hotels = readFiles(hotels_file)

In [None]:
# Tuned-model predictions for every (user, hotel) pair in the test split, joined with
# the hotel information. `pred` is computed here so the cell does not depend on a
# variable that no other cell defines.
pred = Best_model.transform(test)
pred.join(hotels, pred["hotel_id"] == hotels["id"]).select(
    "user_id", "hotel_name", "hotel_rating", "prediction"
).show(5)

In [None]:
# Tuned-model predictions for one example user's test-split reviews, joined with the
# hotel information. Predictions are computed locally so this cell does not rely on
# an externally defined `pred`.
EXAMPLE_USER_ID = 1088
test_pred = Best_model.transform(test)
for_an_user = (
    test_pred.where(test_pred.user_id == EXAMPLE_USER_ID)
    .join(hotels, test_pred["hotel_id"] == hotels["id"])
    .select("user_id", "hotel_name", "hotel_rating", "prediction")
)
for_an_user.show(5)