In [1]:
import pandas as pd
import numpy as np
from pyspark.sql.functions import monotonically_increasing_id,min,avg
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder,CrossValidator

### Loading the Dataset

In [2]:
# Load the merged CSV: the first row supplies column names and Spark infers the column types.
df = (
    spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .csv("/FileStore/tables/merged.csv")
)

In [3]:
# Preview the loaded data (show() prints the first 20 rows by default).
df.show()

In [4]:
# One row per distinct user key, collapsed into a single partition.
# monotonically_increasing_id() (applied in the next cell) encodes the partition
# index in the upper bits of the id, so a single partition presumably is used
# here to obtain small, consecutive ids.
users = (
    df.select("user_id")
    .distinct()
    .coalesce(1)
)

## Creating Numeric Id for users

In [5]:

# Attach a numeric surrogate key to every user.
users = users.withColumn("UserId", monotonically_increasing_id())
# persist() caches the frame - presumably so the generated ids are reused by
# later actions rather than regenerated.
users = users.persist()
users.show()

In [6]:
# One row per distinct restaurant key, collapsed into a single partition
# (same approach as for `users`: a single partition presumably keeps the ids
# generated in the next cell small and consecutive).
restaurants = (
    df.select("business_id")
    .distinct()
    .coalesce(1)
)

## Creating Numeric Id for restaurants

In [7]:

# Attach a numeric surrogate key to every restaurant.
restaurants = restaurants.withColumn("restaurantId", monotonically_increasing_id())
# persist() caches the frame - presumably so the generated ids are reused by
# later actions rather than regenerated.
restaurants = restaurants.persist()
restaurants.show()

In [8]:
# Translate the string keys of every rating row into the numeric ids:
# first user_id -> UserId, then business_id -> restaurantId.
restaurant_rating = (
    df
    .join(users, on="user_id", how="left")
    .join(restaurants, on="business_id", how="left")
)

In [9]:
# Preview the joined frame, which now carries both numeric id columns.
restaurant_rating.show()

In [10]:
# Keep only the columns used from here on: numeric user id, numeric restaurant id, rating.
rating_columns = ["UserId", "restaurantId", "rating"]
restaurant_rating = restaurant_rating.select(*rating_columns)

## Checking the Sparsity of the DataFrame

In [11]:

# Sparsity = percentage of the user x restaurant matrix that holds no rating.

# Total number of ratings in the dataset (one row per rating)
numerator = restaurant_rating.select("rating").count()

# Number of distinct users and distinct restaurants
# (column names spelled exactly as in the schema, so this does not depend on
# Spark's case-insensitive column resolution)
num_users = restaurant_rating.select("UserId").distinct().count()
num_restaurants = restaurant_rating.select("restaurantId").distinct().count()

# Size of the full matrix: every user paired with every restaurant
denominator = num_users * num_restaurants

# An empty frame would otherwise fail below with a bare ZeroDivisionError
if denominator == 0:
    raise ValueError("restaurant_rating has no users or restaurants; sparsity is undefined")

# Share of user/restaurant pairs without a rating, as a percentage
sparsity = (1.0 - (numerator * 1.0) / denominator) * 100
print("The ratings dataframe is ", "%.2f" % sparsity + "% empty.")

### Descriptive Analysis

In [12]:
# Rating counts per restaurant and per user. Each grouped frame is defined once
# and reused for both the minimum and the average.
# NOTE: `min` and `avg` here are the pyspark.sql.functions aggregates imported
# in the first cell, not the Python built-ins.
ratings_per_restaurant = restaurant_rating.groupBy("restaurantId").count()
# "UserId" spelled as in the schema, so this does not rely on case-insensitive resolution
ratings_per_user = restaurant_rating.groupBy("UserId").count()

# Min num ratings for restaurants
print("Restaurants with the fewest ratings: ")
ratings_per_restaurant.select(min("count")).show()

# Avg num ratings per restaurant
print("Avg num ratings per restaurants: ")
ratings_per_restaurant.select(avg("count")).show()

# Min num ratings for user
print("User with the fewest ratings: ")
ratings_per_user.select(min("count")).show()

# Avg num ratings per user
print("Avg num ratings per user: ")
ratings_per_user.select(avg("count")).show()

### Fitting the ALS Model

In [13]:
# Random 80/20 split of the ratings; the fixed seed keeps the split repeatable.
training_data, test_data = restaurant_rating.randomSplit(weights=[0.8, 0.2], seed=101)

In [14]:
# Explicit-feedback ALS estimator for the first (hand-tuned) model.
#   coldStartStrategy="drop" - rows whose prediction is NaN (users/items not
#                              seen in training) are dropped from transform() output
#   nonnegative=True         - constrain the factor matrices to non-negative values
#   seed                     - set explicitly (same value as the split above) so the
#                              fit does not depend on the library's default seed
als = ALS(
    userCol="UserId",
    itemCol="restaurantId",
    ratingCol="rating",
    rank=25,
    maxIter=20,
    regParam=0.1,
    coldStartStrategy="drop",
    nonnegative=True,
    implicitPrefs=False,
    seed=101,
)

In [15]:
# Train the ALS model on the 80% training split.
model = als.fit(training_data)

In [16]:
# Score the held-out ratings with the fitted model (adds a `prediction` column)
# and preview the result.
test_predictions = model.transform(test_data)
test_predictions.show()

### Defining an evaluator to check model performance

In [17]:

# RMSE between the observed `rating` and the model's `prediction`.
evaluator = RegressionEvaluator(
    metricName="rmse",
    labelCol="rating",
    predictionCol="prediction",
)

In [18]:
# Test-set RMSE of the first ALS model.
RMSE = evaluator.evaluate(test_predictions)

In [19]:
# Display the test-set RMSE computed in the previous cell.
print(RMSE)

### Building the best model using Cross Validation and ParamGridBuilder
### Creating a new training and test split for validation

In [21]:
# Fresh 80/20 split (different seed from the first split) used for the cross-validated search.
train, test = restaurant_rating.randomSplit(weights=[0.8, 0.2], seed=55)

In [22]:
# Base ALS estimator for the grid search. rank, maxIter and regParam are left
# at their defaults here because the parameter grid supplies them.
# The seed is set explicitly (same value as the split above) so each candidate
# fit does not depend on the library's default seed.
als = ALS(
    userCol="UserId",
    itemCol="restaurantId",
    ratingCol="rating",
    coldStartStrategy="drop",
    nonnegative=True,
    implicitPrefs=False,
    seed=55,
)

###  Adding hyperparameters and their respective values to param_grid

In [23]:

# Candidate hyperparameters; build() expands them into every combination
# (3 ranks x 3 iteration counts x 2 regularisation strengths).
param_grid = (
    ParamGridBuilder()
    .addGrid(als.rank, [10, 50, 100])
    .addGrid(als.maxIter, [5, 15, 25])
    .addGrid(als.regParam, [0.01, 0.05])
    .build()
)
print("Num models to be tested: ", len(param_grid))

In [24]:
# RMSE evaluator the cross-validator uses to rank the candidate models.
evaluator_cv = RegressionEvaluator(
    metricName="rmse",
    labelCol="rating",
    predictionCol="prediction",
)

### Building cross validation using CrossValidator

In [25]:

# 5-fold cross-validation over every parameter combination in `param_grid`.
# The seed is set explicitly (same value as the train/test split) so the fold
# assignment does not depend on the library's default seed.
cv = CrossValidator(
    estimator=als,
    estimatorParamMaps=param_grid,
    evaluator=evaluator_cv,
    numFolds=5,
    seed=55,
)

In [26]:
# Fit the cross-validator on the 'train' dataset.
# NOTE: this rebinds `model`, which until now held the ALS model fitted from
# `training_data`; from here on `model` is the object returned by cv.fit.
model = cv.fit(train)

In [27]:
# Pull the best-scoring fitted model out of the cross-validation result.
best_model = model.bestModel

In [28]:
# The result is not assigned, so this cell only displays the returned
# DataFrame's repr; the same transform is run and kept in the next cell.
best_model.transform(test)

In [29]:
# Score the held-out `test` split with the best model.
# NOTE: rebinds `test_predictions`, replacing the first model's predictions.
test_predictions = best_model.transform(test)

In [30]:
# Preview the best model's predictions on the test split.
test_predictions.show()

### Calculate the RMSE of test_predictions

In [31]:

# Test-set RMSE of the best cross-validated model. `evaluator` is the RMSE
# evaluator defined before the first model was scored; it is configured
# identically to `evaluator_cv`.
RMSE = evaluator.evaluate(test_predictions)
print(RMSE)

### Reformatting the ALS recommendation output and generating the top 20 recommendations for every user

In [32]:
# Top-20 restaurant recommendations for every user.
# recommendForAllUsers is a method of the fitted ALS model. At this point
# `model` holds the result of cv.fit (a CrossValidatorModel), which does not
# provide it, so the call goes to `best_model` (its bestModel) instead.
ALS_recommendations = best_model.recommendForAllUsers(20)

In [33]:
# Expose the recommendations to Spark SQL as the view "ALS_rcs_temp".
# createOrReplaceTempView replaces the deprecated registerTempTable and can be
# re-run safely because it overwrites any existing view of that name.
ALS_recommendations.createOrReplaceTempView("ALS_rcs_temp")

In [34]:
# Preview the raw output: one row per user with a `recommendations` array column.
ALS_recommendations.show()

In [35]:
# Flatten the per-user `recommendations` array into one row per
# (user, restaurant) pair, exposing the predicted rating as `prediction`.
# The adjacent string literals concatenate into a single SQL statement.
clean_rcs = spark.sql(
    "SELECT userId,restaurantIds_and_ratings.restaurantId as restaurantId,"
    "restaurantIds_and_ratings.rating as prediction "
    "from ALS_rcs_temp "
    "LATERAL VIEW explode(recommendations) exploded_table as restaurantIds_and_ratings"
)

In [36]:
# Preview the flattened recommendations.
clean_rcs.show()

In [37]:
# Put the observed rating next to each recommended (user, restaurant) pair.
# Because of the left join, pairs with no matching row in `restaurant_rating`
# keep a null `rating`.
recs_with_actuals = clean_rcs.join(
    restaurant_rating,
    on=["restaurantId", "userId"],
    how="left",
)
recs_with_actuals.show()