In [None]:
# Mount Google Drive at /content/drive. The `google.colab` package is only
# importable inside a Google Colab runtime.
# NOTE(review): the data paths used by the visible cells are relative
# ('data/hotels/...'), so it is unclear whether the mount is needed — confirm.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
#Java JDK
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
#Downloading Spark
!wget -q http://apache.mirrors.pair.com/spark/spark-3.0.1/spark-3.0.1-bin-hadoop3.2.tgz 
#Unzipping the hadoop file
!tar -xvf spark-3.0.1-bin-hadoop3.2.tgz

In [None]:
#Fetching the MovieLens dataset
# NOTE(review): the cells visible in this notebook only read the hotel CSVs
# under data/hotels/; this download looks like a leftover from a MovieLens
# example — confirm before removing.
!wget http://files.grouplens.org/datasets/movielens/ml-latest.zip  

In [None]:
#Unzip the file
# Extracts the ml-latest.zip archive downloaded by the wget cell.
!unzip ml-latest.zip  

In [None]:
###################### SPARK SETUP ################################
#Install findspark
!pip install -q findspark

In [None]:
# Export the JDK and Spark installation directories as environment variables.
import os

os.environ.update({
    "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
    "SPARK_HOME": "/content/spark-3.0.1-bin-hadoop3.2",
})

In [1]:
# Initialise findspark first, then start (or reuse) a Spark session whose
# master is "local[*]".
import findspark
findspark.init()
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .master("local[*]")
    .getOrCreate()
)

23/04/05 18:13:09 WARN Utils: Your hostname, Adityanos-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 192.168.100.166 instead (on interface en0)
23/04/05 18:13:09 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/04/05 18:13:09 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
# Locations of the two input CSVs (relative paths).
hotels_file = 'data/hotels/details/hotel_info_clean.csv'
reviews_file = 'data/hotels/details/reviews_clean.csv'

In [3]:

def readFiles(filename):
    """Load a CSV file into a Spark DataFrame.

    The first line of the file is treated as the header and column types
    are inferred from the data.

    Args:
        filename: Path of the CSV file to read.

    Returns:
        The DataFrame returned by the CSV reader of the module-level
        ``spark`` session.
    """
    # Use Spark's built-in CSV reader instead of the legacy
    # 'com.databricks.spark.csv' source name, and pass the header/schema
    # options once rather than both via options() and load().
    return spark.read.csv(filename, header=True, inferSchema=True)

In [4]:

# Load both input CSVs (reviews first, then hotels) through readFiles.
reviews, hotels = map(readFiles, (reviews_file, hotels_file))

In [5]:
# Preview the first five rows of the reviews data.
reviews.show(n=5)

+--------+------+----------+-------+
|hotel_id|rating| user_name|user_id|
+--------+------+----------+-------+
|    9003|   5.0|  kpatters|      0|
|    9003|   5.0|  Connie B|      1|
|    9003|   5.0|      JoJo|      2|
|    9003|   5.0|Stephani S|      3|
|    9003|   4.0|   BimmerM|      4|
+--------+------+----------+-------+
only showing top 5 rows



In [6]:
# Preview the first five rows of the hotel metadata.
hotels.show(n=5)

+----+--------------------+------------+----------------+--------------------+--------------------+-------+--------------------+--------------------+-----+
|  id|          hotel_name|hotel_rating|hotel_experience|           amenities|             address|country|            locality|            location|price|
+----+--------------------+------------+----------------+--------------------+--------------------+-------+--------------------+--------------------+-----+
|9001|Peterborough Hous...|        null|            null|                  []|     633 Gilmour St | Canada|Kimberley British...|(44.3014954, -78....| null|
|9002|          Ramada Inn|        null|            null|['Bar/Lounge', 'N...|100 Charlotte St ...| Canada|Kimberley British...|(44.3035966, -78....| null|
|9003|     Hotel Le Floral|         4.5|       Excellent|['Free High Speed...|192 Sherbrooke Qu...| Canada|Kimberley British...|(45.403271, -71.8...|  115|
|9004|OTL Gouverneur Sh...|         5.0|       Excellent|['Pool'

In [7]:
data = reviews
# Print the schema of the ratings data used for training
# (columns: hotel_id, rating, user_name, user_id).
data.printSchema()

root
 |-- hotel_id: integer (nullable = true)
 |-- rating: double (nullable = true)
 |-- user_name: string (nullable = true)
 |-- user_id: integer (nullable = true)



In [22]:
# Randomly split the ratings into train (~80%) and test (~20%) sets.
# A fixed seed keeps the split, and every metric computed from it,
# reproducible across kernel restarts.
train, test = data.randomSplit([0.8, 0.2], seed=42)

ConnectionRefusedError: [Errno 61] Connection refused

In [21]:
# Show the column names and types of the held-out test split.
test.printSchema()

ConnectionRefusedError: [Errno 61] Connection refused

In [9]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS

In [10]:
# Fit a baseline recommender with Alternating Least Squares (ALS) on the
# training split.
# coldStartStrategy="drop" removes predictions for users/hotels that were not
# seen during training, so the evaluation metric does not become NaN.
# The seed fixes ALS's random initialisation so reruns produce the same model.
model = ALS(
    userCol="user_id",
    itemCol="hotel_id",
    ratingCol="rating",
    nonnegative=True,
    coldStartStrategy="drop",
    maxIter=10,
    seed=42,
).fit(train)

# RMSE evaluator comparing the "rating" labels with the "prediction" column.
# RegressionEvaluator is already imported by the import cell just above.
evaluator = RegressionEvaluator(
    metricName="rmse",
    labelCol="rating",
    predictionCol="prediction",
)

23/04/05 18:13:19 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
23/04/05 18:13:19 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.ForeignLinkerBLAS


In [11]:
# Score the test split once and report the RMSE of the baseline ALS model.
# The computed value is reused for printing instead of re-running the
# transform and the evaluation a second time.
predictions = model.transform(test)
rmse = evaluator.evaluate(predictions)
print("New RMSE: ", rmse)

New RMSE:  1.6395374331855026


In [12]:
#Implementing ALS with Cross Validation
    
from pyspark.ml.tuning import CrossValidator
from pyspark.ml.tuning import ParamGridBuilder

In [13]:
# Try to improve on the baseline by tuning ALS with cross-validation.
# coldStartStrategy="drop" keeps NaN predictions (users/hotels missing from a
# training fold) out of the evaluation metric.
# NOTE: `model` is rebound here to an *unfitted* ALS estimator.
model = ALS(
    userCol="user_id",
    itemCol="hotel_id",
    ratingCol="rating",
    nonnegative=True,
    coldStartStrategy="drop",
    maxIter=10,
    seed=42,  # reproducible factor initialisation
)

# Hyper-parameter grid built with ParamGridBuilder (4 x 4 = 16 combinations):
#   1. regParam (regularization): 0.1, 0.05, 0.01, 0.001
#   2. rank (matrix factorization rank): 5, 10, 20, 30
paramGrid = (
    ParamGridBuilder()
    .addGrid(model.regParam, [0.1, 0.05, 0.01, 0.001])
    .addGrid(model.rank, [5, 10, 20, 30])
    .build()
)

# 5-fold cross-validator that scores every combination with `evaluator`.
# The seed makes the fold assignment repeatable between runs.
crossvalidation = CrossValidator(
    estimator=model,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator,
    numFolds=5,
    seed=42,
)

In [14]:
# Run cross-validation, and choose the best set of parameters.
# Only the winning model is kept; the object returned by fit() itself is not
# stored anywhere.
Best_model = crossvalidation.fit(train).bestModel

                                                                                

In [15]:
# Report the class of the winning model and the hyper-parameters it was
# trained with.
print(type(Best_model))
print("**Best Model**")
# The values are read from the JVM-side parent object of the fitted model.
best_estimator = Best_model._java_obj.parent()
print("Rank: ", best_estimator.getRank())
print("MaxIter: ", best_estimator.getMaxIter())
print("RegParam: ", best_estimator.getRegParam())

<class 'pyspark.ml.recommendation.ALSModel'>
**Best Model**
Rank:  30
MaxIter:  10
RegParam:  0.05


In [16]:

# RMSE of the cross-validated best model on the held-out test split.
best_rmse = evaluator.evaluate(Best_model.transform(test))
print("Best RMSE value is: ", best_rmse)

Best RMSE value is:  1.4933872763180922


In [17]:

from pyspark.sql.functions import explode_outer

# Per-row predictions of the best model for the test split. The hotel-name
# join cells further down read this `pred` DataFrame; it previously existed
# only as commented-out code, so those cells failed on a fresh kernel.
pred = Best_model.transform(test)

# Top-5 hotel recommendations for every user.
preds = Best_model.recommendForAllUsers(5)
preds.show(10)




+-------+--------------------+
|user_id|     recommendations|
+-------+--------------------+
|      1|[{9283, 5.021367}...|
|     12|[{9283, 4.042997}...|
|     22|[{9004, 5.006264}...|
|     26|[{9004, 4.4838276...|
|     27|[{9006, 4.9881215...|
|     31|[{9587, 5.003485}...|
|     34|[{9016, 4.987218}...|
|     44|[{9004, 5.150865}...|
|     52|[{9587, 5.8435354...|
|     53|[{9121, 3.9814317...|
+-------+--------------------+
only showing top 10 rows



                                                                                

In [18]:
# One row per (user, recommended hotel): explode the recommendations array.
# explode_outer keeps a user whose array is empty or null as a single row
# with a null recommendation.
final_preds = preds.select(
    "user_id",
    explode_outer("recommendations").alias("recommendation"),
)
final_preds.show(10)

# Collect the result as a pandas DataFrame and display it.
hotel_recommendation = final_preds.toPandas()
hotel_recommendation

                                                                                

+-------+-----------------+
|user_id|   recommendation|
+-------+-----------------+
|      1| {9283, 5.021367}|
|      1|  {9093, 4.99111}|
|      1|{9003, 4.9471617}|
|      1|{9587, 4.8832192}|
|      1|{9346, 4.7123375}|
|     12| {9283, 4.042997}|
|     12|{9093, 4.0269694}|
|     12|{9003, 3.9995146}|
|     12|{9203, 3.9611397}|
|     12| {9587, 3.765884}|
+-------+-----------------+
only showing top 10 rows



                                                                                

Unnamed: 0,user_id,recommendation
0,1,"(9283, 5.021367073059082)"
1,1,"(9093, 4.991109848022461)"
2,1,"(9003, 4.947161674499512)"
3,1,"(9587, 4.883219242095947)"
4,1,"(9346, 4.712337493896484)"
...,...,...
8205,1702,"(9587, 5.37185001373291)"
8206,1702,"(9172, 5.341799736022949)"
8207,1702,"(9146, 5.3003058433532715)"
8208,1702,"(9104, 5.262861251831055)"


In [19]:
import pandas as pd

# Split each (hotel_id, rating) recommendation into two flat columns.
# The guard makes the cell safe to re-run: once the "recommendation" column
# has been replaced there is nothing left to unpack.
if "recommendation" in hotel_recommendation.columns:
    # explode_outer can emit a null recommendation; such rows carry no hotel
    # and would otherwise fail when indexed.
    hotel_recommendation = hotel_recommendation.dropna(subset=["recommendation"])
    # Build both columns in one pass instead of creating a pd.Series per row.
    unpacked = pd.DataFrame(
        [(int(rec[0]), float(rec[1])) for rec in hotel_recommendation["recommendation"]],
        columns=["hotel_id", "rating"],
        index=hotel_recommendation.index,
    )
    hotel_recommendation = hotel_recommendation.drop(columns="recommendation").join(unpacked)
hotel_recommendation

Unnamed: 0,user_id,hotel_id,rating
0,1,9283,5.021367
1,1,9093,4.991110
2,1,9003,4.947162
3,1,9587,4.883219
4,1,9346,4.712337
...,...,...,...
8205,1702,9587,5.371850
8206,1702,9172,5.341800
8207,1702,9146,5.300306
8208,1702,9104,5.262861


In [20]:
# Top-5 users recommended for every hotel.
preds_items = Best_model.recommendForAllItems(5)
preds_items.show()

# Persist the fitted model. overwrite() lets this cell be re-run: a plain
# save() raises an error when the target path already exists.
Best_model.write().overwrite().save("weight/als_model_weight")

                                                                                

+--------+--------------------+
|hotel_id|     recommendations|
+--------+--------------------+
|    9003|[{1475, 5.6752896...|
|    9004|[{46, 6.438581}, ...|
|    9005|[{25, 2.979922}, ...|
|    9006|[{1670, 5.0218496...|
|    9007|[{572, 5.453142},...|
|    9008|[{46, 4.978293}, ...|
|    9009|[{48, 3.9474812},...|
|    9011|[{730, 5.079746},...|
|    9013|[{536, 4.522635},...|
|    9014|[{52, 4.023527}, ...|
|    9016|[{671, 5.5963926}...|
|    9017|[{536, 5.269701},...|
|    9024|[{536, 3.704628},...|
|    9033|[{948, 5.5573797}...|
|    9034|[{536, 5.6158924}...|
|    9035|[{536, 5.6172686}...|
|    9036|[{76, 4.9396563},...|
|    9037|[{637, 5.2106323}...|
|    9038|[{637, 5.9491262}...|
|    9039|[{1035, 5.19686},...|
+--------+--------------------+
only showing top 20 rows



[Stage 9156:>                                                      (0 + 8) / 10]

23/04/05 18:17:27 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers


                                                                                

23/04/05 20:57:05 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 4256212 ms exceeds timeout 120000 ms
23/04/05 20:57:06 WARN SparkContext: Killing executors is not supported by current scheduler.
23/04/05 20:57:08 ERROR Inbox: Ignoring error
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:301)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:102)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRef(RpcEnv.scala:110)
	at org.apache.spark.util.RpcUtils$.makeDriverRef(RpcUtils.scala:36)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.driverEndpoint$lzycompute(BlockManagerMasterEndpoint.scala:117)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.org$apache$spark$storage$BlockManagerMasterEndpoint$$driverEndpoint(BlockManagerMasterEndpoint.scala:116)
	at org.apache.spark.storage.

In [19]:
# Load the hotel metadata CSV into `hotels` (same file and helper as the
# earlier load of `hotels`).
hotels = readFiles(hotels_file)

In [26]:
# Attach hotel name and rating to each row of `pred` (matching hotel_id to the
# hotels' id column) and preview five rows.
(
    pred.join(hotels, pred["hotel_id"] == hotels["id"])
    .select("user_id", "hotel_name", "hotel_rating", "prediction")
    .show(5)
)

+-------+--------------------+------------+----------+
|user_id|          hotel_name|hotel_rating|prediction|
+-------+--------------------+------------+----------+
|    463|Quality Inn & Sui...|         3.5|  1.638894|
|   1088|         Alpen Motel|         4.5| 2.8580868|
|   1238|Premier Inn And S...|         4.0| 3.1555507|
|   1591|Days Inn by Wyndh...|         4.5| 2.1650615|
|    623|  Colonial 900 Motel|         4.5|  2.668856|
+-------+--------------------+------------+----------+
only showing top 5 rows



In [34]:
# Rows of `pred` for a single user (user_id 1088), joined with the hotel
# metadata and reduced to the columns of interest.
for_an_user = (
    pred.where(pred.user_id == 1088)
    .join(hotels, pred["hotel_id"] == hotels["id"])
    .select("user_id", "hotel_name", "hotel_rating", "prediction")
)
for_an_user.show(5)

+-------+-----------+------------+----------+
|user_id| hotel_name|hotel_rating|prediction|
+-------+-----------+------------+----------+
|   1088|Alpen Motel|         4.5| 2.8580868|
+-------+-----------+------------+----------+

