In [1]:
from pyspark.sql import SparkSession
from pyspark.ml.recommendation import ALS
from pyspark.sql import SparkSession ,Row
from pyspark.sql.functions import col
from pyspark.sql import SQLContext
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql.types import StructType,StructField,IntegerType
import matplotlib.pyplot as plt
%matplotlib inline
spark = SparkSession.builder.appName("Food Hunters").getOrCreate()

In [2]:
df_business = spark.read.json('./yelp/yelp_academic_dataset_business.json')
df_review = spark.read.json('./yelp/yelp_academic_dataset_review.json')

22/04/02 00:05:58 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

### pre-processing reviews

In [3]:
indexer = [StringIndexer(inputCol=column, outputCol=column+"_index") for column in ['business_id', 'user_id']]
pipeline = Pipeline(stages=indexer)
transformed = pipeline.fit(df_review).transform(df_review)
reviews = transformed.select(['business_id', 'business_id_index', 'user_id', 'user_id_index', 'stars'])
reviews.show()

22/04/02 00:06:28 WARN DAGScheduler: Broadcasting large task binary with size 97.9 MiB
[Stage 8:>                                                          (0 + 1) / 1]

+--------------------+-----------------+--------------------+-------------+-----+
|         business_id|business_id_index|             user_id|user_id_index|stars|
+--------------------+-----------------+--------------------+-------------+-----+
|XQfwVwDr-v0ZS3_Cb...|           7807.0|mh_-eMZ6K5RLWhZyI...|      27777.0|  3.0|
|7ATYjTIgM3jUlt4UM...|           9481.0|OyoGAe7OKpv6SyGZT...|     144389.0|  5.0|
|YjUWPpI6HXG530lwP...|          31055.0|8g_iMtfSiwikVnbP2...|      12673.0|  3.0|
|kxX2SOes4o-D3ZQBk...|           7022.0|_7bHUi9Uuf5__HHc_...|     262396.0|  5.0|
|e4Vwtrqf-wpJfwesg...|          40824.0|bcjbaE6dDog4jkNY9...|       3102.0|  4.0|
|04UD14gamNjLY0IDY...|           3859.0|eUta8W_HdHMXPzLBB...|    1607324.0|  1.0|
|gmjsEdUsKpj9Xxu6p...|           2529.0|r3zeYsv1XFBRA4dJp...|    1829751.0|  5.0|
|LHSTtnW3YHCeUkRDG...|          13352.0|yfFzsLmaWF2d4Sr0U...|    1964786.0|  5.0|
|B5XSoSG3SfvQGtKEG...|          81995.0|wSTuiTk-sKNdcFypr...|    1925049.0|  3.0|
|gebiRewfieSdtt1

                                                                                

### pre-processing businesses

In [4]:
businesses = df_business.select("business_id","name", "stars", "review_count", "attributes", "categories", "city") \
                        .withColumnRenamed("stars", "stars_restaurant") \
                        .filter((df_business['city'] == 'Levittown') & (df_business.categories.contains('Restaurants'))).drop('city')

### Calculate sparsity

In [46]:
# Count the total number of stars in the dataset
numerator = reviews.select("stars").count()

# Count the number of distinct user_id and distinct business_id
num_users = reviews.select("user_id_index").distinct().count()
num_restaurants = reviews.select("business_id_index").distinct().count()

# Set the denominator equal to the number of users multiplied by the number of num_restaurants
denominator = num_users * num_restaurants

# Divide the numerator by the denominator
sparsity = (1.0 - (numerator *1.0)/denominator)*100
print("The ratings dataframe is ", "%.2f" % sparsity + "% empty.")



The ratings dataframe is  100.00% empty.


                                                                                

### Build Out An ALS Model

In [8]:
# Import the required functions
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

In [9]:
# Create test and train set
(train, test) = reviews.randomSplit([0.8, 0.2], seed = 1234)

# Create ALS model
als = ALS(userCol="user_id_index", itemCol="business_id_index", ratingCol="stars", nonnegative = True, implicitPrefs = False, coldStartStrategy="drop")

# Confirm that a model called "als" was created
type(als)

pyspark.ml.recommendation.ALS

### Tell Spark how to tune your ALS model

In [10]:
# Import the requisite items
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Add hyperparameters and their respective values to param_grid
param_grid = ParamGridBuilder() \
            .addGrid(als.rank, [10, 50, 100, 150]) \
            .addGrid(als.regParam, [.01, .05, .1, .15]) \
            .build()
            #             .addGrid(als.maxIter, [5, 50, 100, 200]) \

           
# Define evaluator as RMSE and print length of evaluator
evaluator = RegressionEvaluator(metricName="rmse", labelCol="stars", predictionCol="prediction") 
print ("Num models to be tested: ", len(param_grid))

Num models to be tested:  16


### Build your cross validation pipeline

In [11]:
# Build cross validation using CrossValidator
cv = CrossValidator(estimator=als, estimatorParamMaps=param_grid, evaluator=evaluator, numFolds=5)

# Confirm cv was built
print(cv)

CrossValidator_57f0ebd14245


### Best Model and Best Model Parameters

In [12]:
#Fit cross validator to the 'train' dataset
model = cv.fit(train)

#Extract best model from the cv model above
best_model = model.bestModel

22/04/02 00:07:38 WARN DAGScheduler: Broadcasting large task binary with size 98.0 MiB
22/04/02 00:07:43 WARN DAGScheduler: Broadcasting large task binary with size 98.0 MiB
[Stage 10:>                                                        (0 + 8) / 40]
Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "RemoteBlock-temp-file-clean-thread"

Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "refresh progress"

Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "Spark Context Cleaner"

Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "SparkUI-45"

Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "SparkUI-43"

Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "driver-heartbeater"

Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler

ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/opt/homebrew/Cellar/apache-spark/3.2.1/libexec/python/lib/py4j-0.10.9.3-src.zip/py4j/clientserver.py", line 480, in send_command
    raise Py4JNetworkError("Answer from Java side is empty")
py4j.protocol.Py4JNetworkError: Answer from Java side is empty

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/homebrew/Cellar/apache-spark/3.2.1/libexec/python/lib/py4j-0.10.9.3-src.zip/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/opt/homebrew/Cellar/apache-spark/3.2.1/libexec/python/lib/py4j-0.10.9.3-src.zip/py4j/clientserver.py", line 503, in send_command
    raise Py4JNetworkError(
py4j.protocol.Py4JNetworkError: Error while sending or receiving
ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/opt/homebrew/Cellar/apache-spark/3.2.1/libexec/p

ConnectionRefusedError: [Errno 61] Connection refused

In [None]:
# Print best_model
print(type(best_model))

# Complete the code below to extract the ALS model parameters
print("**Best Model**")

# # Print "Rank"
print("  Rank:", best_model._java_obj.parent().getRank())

# Print "MaxIter"
print("  MaxIter:", best_model._java_obj.parent().getMaxIter())

# Print "RegParam"
print("  RegParam:", best_model._java_obj.parent().getRegParam())

In [None]:
# View the predictions
test_predictions = best_model.transform(test)
RMSE = evaluator.evaluate(test_predictions)
print(RMSE)

In [None]:
test_predictions.show()

In [None]:
# Generate n Recommendations for all users
nrecommendations = best_model.recommendForAllUsers(10)
nrecommendations.limit(10).show()

In [None]:
nrecommendations.join(businesses, on='business_id').filter('user_id = 100').show()