In [1]:
#Initialize spark session
import findspark
findspark.init()
import pyspark
from pyspark.sql import SparkSession 
# Build (or reuse) the SparkSession through the builder. Unlike constructing a
# SparkContext directly, this is safe to re-run in the same kernel and never
# leaves `spark` temporarily bound to a SparkContext.
spark = SparkSession.builder.appName("test").getOrCreate()
# Keep the underlying SparkContext available under its conventional name.
sc = spark.sparkContext

In [2]:
#import all the necessary libraries from Spark ML Library
from pyspark.sql.functions import monotonically_increasing_id
from pyspark.sql.functions import min, avg
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator

Read the Yelp reviews data (JSON format) into a Spark DataFrame

In [3]:
# Location of the Yelp review dump on the local machine
review_json_path = "C:\\Users\\tejan\\Downloads\\practicedata\\yelp-dataset\\yelp_academic_dataset_review.json"
# Load the JSON file into a DataFrame
data_review = spark.read.json(review_json_path)

In [4]:
# Display the first 5 rows of the reviews DataFrame
data_review.show(5)

+--------------------+----+-------------------+-----+--------------------+-----+--------------------+------+--------------------+
|         business_id|cool|               date|funny|           review_id|stars|                text|useful|             user_id|
+--------------------+----+-------------------+-----+--------------------+-----+--------------------+------+--------------------+
|-MhfebM0QIsKt87iD...|   0|2015-04-15 05:21:16|    0|xQY8N_XvtGbearJ5X...|  2.0|As someone who ha...|     5|OwjRMXRC0KyPrIlcj...|
|lbrU8StCq3yDfr-QM...|   0|2013-12-07 03:16:52|    1|UmFMZ8PyXZTY2Qcwz...|  1.0|I am actually hor...|     1|nIJD_7ZXHq-FX8byP...|
|HQl28KMwrEKHqhFrr...|   0|2015-12-05 03:18:11|    0|LG2ZaYiOgpr2DK_90...|  5.0|I love Deagan's. ...|     1|V34qejxNsCbcgD8C0...|
|5JxlZaqCnk1MnbgRi...|   0|2011-05-27 05:30:52|    0|i6g_oA9Yf9Y31qt0w...|  1.0|Dismal, lukewarm,...|     0|ofKDkJKXSKZXu5xJN...|
|IS4cv902ykd8wj1TR...|   0|2017-01-14 21:56:57|    0|6TdNDKywdbjoTkize...|  4.0|Oh happy d

In [5]:
# List the column names of the reviews DataFrame
data_review.columns

['business_id',
 'cool',
 'date',
 'funny',
 'review_id',
 'stars',
 'text',
 'useful',
 'user_id']

In [5]:
# Count the number of rows in the reviews DataFrame
data_review.count()

8021122

In [6]:
# Keep only the three columns needed for collaborative filtering:
# the user, the business and the star rating.
rating_columns = ["user_id", "business_id", "stars"]
data_review = data_review.select(*rating_columns)
data_review.columns

['user_id', 'business_id', 'stars']

In [7]:
# Count the number of rating rows per user_id. show() prints only the first
# 20 rows of the result, in no particular order (the counts are not sorted).
data_review.groupBy("user_id").count().show()

+--------------------+-----+
|             user_id|count|
+--------------------+-----+
|KWFiZKiZBANVxuhm4...|    7|
|VmYpF5C3GL-7wFnvO...|    5|
|1Dul59QEe-Q-7OQHT...|    8|
|xS6kmkMXp0PRrFwkS...|   19|
|j56G3m8vYtA_2Io6F...|   36|
|ruHz-qN-j21kg0iyI...|    1|
|M6-A6F0B3kM5i94Kr...|   13|
|z2Gi5vo-8j544qN_g...|    4|
|CzkWUMIYDxUSetfCR...|   17|
|Uf_TVv1Z4s024jdI4...|   16|
|zWWcik1fRPZviBCQL...|  411|
|zEpzcYlc1wQ4YJtFF...|    5|
|4ZfcCa4m5RWvO4EFz...|   40|
|QSWr7Oivp0mClj_PR...|   12|
|FuSn5ZxN2NV_PpK03...|   46|
|T_ReHc3TRnOw9h1qf...|    2|
|7o473jeLWW-zgKN-Q...|    9|
|ucpIv1E0x8IhxeLGD...|    9|
|ouvhv-E3S57SQ4ser...|    8|
|kAxgIZA-kSCKDy6EP...|    6|
+--------------------+-----+
only showing top 20 rows



Convert user_id and business_id to numeric ids so that they can be used by the ALS algorithm

In [7]:
# Build a lookup table mapping each distinct string user_id to a numeric userId.
# The distinct ids are coalesced into a single partition before
# monotonically_increasing_id() is applied, so the generated ids run
# consecutively from 0; the result is persisted for reuse in the join below.
users = (
    data_review.select("user_id")
    .distinct()
    .coalesce(1)
    .withColumn("userId", monotonically_increasing_id())
    .persist()
)
users.show()

+--------------------+------+
|             user_id|userId|
+--------------------+------+
|KWFiZKiZBANVxuhm4...|     0|
|VmYpF5C3GL-7wFnvO...|     1|
|1Dul59QEe-Q-7OQHT...|     2|
|xS6kmkMXp0PRrFwkS...|     3|
|j56G3m8vYtA_2Io6F...|     4|
|ruHz-qN-j21kg0iyI...|     5|
|M6-A6F0B3kM5i94Kr...|     6|
|z2Gi5vo-8j544qN_g...|     7|
|CzkWUMIYDxUSetfCR...|     8|
|Uf_TVv1Z4s024jdI4...|     9|
|zWWcik1fRPZviBCQL...|    10|
|zEpzcYlc1wQ4YJtFF...|    11|
|4ZfcCa4m5RWvO4EFz...|    12|
|QSWr7Oivp0mClj_PR...|    13|
|FuSn5ZxN2NV_PpK03...|    14|
|T_ReHc3TRnOw9h1qf...|    15|
|7o473jeLWW-zgKN-Q...|    16|
|ucpIv1E0x8IhxeLGD...|    17|
|ouvhv-E3S57SQ4ser...|    18|
|kAxgIZA-kSCKDy6EP...|    19|
+--------------------+------+
only showing top 20 rows



In [8]:
# Build a lookup table mapping each distinct string business_id to a numeric
# businessId. The distinct ids are coalesced into a single partition before
# monotonically_increasing_id() is applied, and the result is persisted.
business = (
    data_review.select("business_id")
    .distinct()
    .coalesce(1)
    .withColumn("businessId", monotonically_increasing_id())
    .persist()
)

In [9]:
# Attach the numeric userId and businessId to every rating row by left-joining
# the two lookup tables on their string keys.
ratings_with_user = data_review.join(users, on="user_id", how="left")
data_review_final = ratings_with_user.join(business, on="business_id", how="left")
data_review_final.show()

+--------------------+--------------------+-----+------+----------+
|         business_id|             user_id|stars|userId|businessId|
+--------------------+--------------------+-----+------+----------+
|--9e1ONYQuAa-CB_R...|0y8ORuC2X1i1UF6SG...|  5.0|  4812|       596|
|--9e1ONYQuAa-CB_R...|3qz_dfwbFwTQeDRzy...|  5.0|   917|       596|
|--9e1ONYQuAa-CB_R...|9spixZHaqC1JeN1ld...|  2.0|   315|       596|
|--9e1ONYQuAa-CB_R...|A4GnBOU7ZCTcoQK4e...|  5.0|  7867|       596|
|--9e1ONYQuAa-CB_R...|FtUDjNLhVjlIoeFKm...|  4.0|   483|       596|
|--9e1ONYQuAa-CB_R...|H0tfWQsGjEBuhXD4W...|  5.0|   151|       596|
|--9e1ONYQuAa-CB_R...|R0KVWeN9xR-F6j4z5...|  4.0|   155|       596|
|--9e1ONYQuAa-CB_R...|XZaCs-Gs0SXdZgfG3...|  4.0|  3069|       596|
|--9e1ONYQuAa-CB_R...|dSGINC_8KV6fxNjeQ...|  5.0|  7920|       596|
|--9e1ONYQuAa-CB_R...|n9DJHwgYflQ_ms8gB...|  3.0|   796|       596|
|--9e1ONYQuAa-CB_R...|ucXjnxiEKLUOEktHF...|  5.0|   720|       596|
|--9e1ONYQuAa-CB_R...|xOSVPzpIDSd3-2r6k...|  5.0

In [10]:
# Keep only the numeric userId and businessId columns plus the star rating;
# the original string ids are dropped from data_review_final here.
data_review_final = data_review_final.select("userId", "businessId", "stars")

In [12]:
# Display the first 20 rows of the numeric ratings DataFrame
data_review_final.show()

+------+----------+-----+
|userId|businessId|stars|
+------+----------+-----+
|  6940|    139342|  4.0|
|  4005|    200136|  2.0|
|  6146|     83897|  4.0|
|  2444|    125200|  5.0|
|  2444|    166557|  5.0|
|  2444|    193213|  1.0|
|  2444|     43351|  1.0|
|  2444|    116689|  1.0|
|  1803|     83483|  5.0|
|  1803|    175882|  4.0|
|  1803|    180283|  1.0|
|  1803|    121169|  5.0|
|  1803|     78008|  3.0|
|  1803|     22618|  5.0|
|  6431|    121362|  5.0|
|  3439|     40680|  5.0|
|  3439|    113848|  5.0|
|  3439|     97782|  5.0|
|  3439|     63399|  5.0|
|  3439|    192453|  5.0|
+------+----------+-----+
only showing top 20 rows



In [11]:
# Keep only 500 rows of the numeric ratings. This rebinds data_review, so every
# cell below works on this sample rather than on the full dataset.
# NOTE(review): limit() is applied without an explicit ordering - confirm the
# selected rows are reproducible across runs.
data_review = data_review_final.limit(500)

Calculate Matrix sparsity

In [13]:
# Sparsity = percentage of (user, business) pairs that have no rating.

# Number of ratings actually present
numerator = data_review.select("stars").count()

# Number of possible pairs: distinct users times distinct businesses
unique_users, unique_business = (
    data_review.select(id_col).distinct().count()
    for id_col in ("userId", "businessId")
)
denominator = unique_users * unique_business

# Share of filled cells, turned into the percentage of empty cells
filled_fraction = float(numerator) / denominator
sparsity = (1.0 - filled_fraction) * 100
print("The yelp_reviews dataframe is ", "%.9f" % sparsity + "% empty.")

The yelp_reviews dataframe is  99.101688825% empty.


In [14]:
# Number of ratings per business and per user, each grouped once and reused below
ratings_per_business = data_review.groupBy("businessId").count()
ratings_per_user = data_review.groupBy("userId").count()

# Smallest number of ratings received by any business
print("Business with the fewest ratings: ")
ratings_per_business.select(min("count")).show()

# Average number of ratings per business
print("Avg num ratings per Business: ")
ratings_per_business.select(avg("count")).show()

# Smallest number of ratings given by any user
print("User with the fewest ratings: ")
ratings_per_user.select(min("count")).show()

# Average number of ratings per user
print("Avg num ratings per user: ")
ratings_per_user.select(avg("count")).show()

Business with the fewest ratings: 
+----------+
|min(count)|
+----------+
|         1|
+----------+

Avg num ratings per Business: 
+------------------+
|        avg(count)|
+------------------+
|1.0330578512396693|
+------------------+

User with the fewest ratings: 
+----------+
|min(count)|
+----------+
|         1|
+----------+

Avg num ratings per user: 
+------------------+
|        avg(count)|
+------------------+
|4.3478260869565215|
+------------------+



In [12]:
# Print the schema (column names and types) of the sampled ratings DataFrame
data_review.printSchema()

root
 |-- userId: long (nullable = true)
 |-- businessId: long (nullable = true)
 |-- stars: double (nullable = true)



In [13]:
# Cast the two id columns to integer and the rating column to double
data_review = data_review.select(
    data_review["userId"].cast("integer"),
    data_review["businessId"].cast("integer"),
    data_review["stars"].cast("double"),
)

# Print the schema again to confirm the new column types
data_review.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- businessId: integer (nullable = true)
 |-- stars: double (nullable = true)



# Modeling - ALS

Model 1: Latent vectors = 15, reg parameter = .01, maximum iterations = 20

In [16]:
# Hold out 20% of the ratings for testing (80:20 split with a fixed seed)
training_data, test_data = data_review.randomSplit([0.8, 0.2], seed=123)

# ALS configuration for Model 1: 15 latent factors, 20 iterations, regParam 0.01
from pyspark.ml.recommendation import ALS
als = ALS(
    userCol="userId",
    itemCol="businessId",
    ratingCol="stars",
    rank=15,
    maxIter=20,
    regParam=0.01,
    coldStartStrategy="drop",
    nonnegative=True,
    implicitPrefs=False,
)

# Train on the training split
model = als.fit(training_data)

# Score the held-out split and display the predictions
test_predictions = model.transform(test_data)
test_predictions.show()

+------+----------+-----+----------+
|userId|businessId|stars|prediction|
+------+----------+-----+----------+
|   123|    173512|  5.0|  4.997417|
|   123|    173512|  5.0|  4.997417|
|  1929|     46356|  5.0|  4.997871|
|   123|    185233|  5.0|  4.997417|
|  3477|     11742|  5.0| 4.9966435|
|   548|     22143|  5.0| 2.2418418|
+------+----------+-----+----------+



In [17]:
# Import RegressionEvaluator
from pyspark.ml.evaluation import RegressionEvaluator

# RMSE evaluator comparing the "stars" label with the "prediction" column
evaluator = RegressionEvaluator(
    labelCol="stars",
    predictionCol="prediction",
    metricName="rmse",
)

# Echo the three configured parameters: metric, label column, prediction column
for read_param in (
    evaluator.getMetricName,
    evaluator.getLabelCol,
    evaluator.getPredictionCol,
):
    print(read_param())

rmse
stars
prediction


In [18]:
# Compute the RMSE of Model 1 on its test-set predictions
RMSE = evaluator.evaluate(test_predictions)
# Print the RMSE
print(RMSE)

1.1260160233737941


Model 2: Latent Vectors = 15, Maximum iterations = 20, reg parameter = 0.05

In [19]:
# ALS configuration for Model 2: 15 latent factors, 20 iterations, regParam 0.05
als2 = ALS(
    userCol="userId",
    itemCol="businessId",
    ratingCol="stars",
    rank=15,
    maxIter=20,
    regParam=0.05,
    coldStartStrategy="drop",
    nonnegative=True,
    implicitPrefs=False,
)

# Train on the training split
model2 = als2.fit(training_data)

# Score the held-out split
test_predictions2 = model2.transform(test_data)


In [18]:
# Display Model 2's predictions on the held-out split
test_predictions2.show()
# RMSE evaluator comparing the "stars" label with the "prediction" column
evaluator2 = RegressionEvaluator(metricName="rmse", labelCol="stars", predictionCol="prediction")

# Echo the three configured parameters
print("The three parameters are : ")
print(" ")
print(evaluator2.getMetricName())
print(evaluator2.getLabelCol())
print(evaluator2.getPredictionCol())
print(" ")

# Evaluate RMSE on Model 2's own predictions. The previous code scored
# test_predictions (Model 1's output), so the reported number did not
# belong to this model.
RMSE2 = evaluator2.evaluate(test_predictions2)
# Print the RMSE
print("The RMSE for this model is :", RMSE2)

+------+----------+-----+----------+
|userId|businessId|stars|prediction|
+------+----------+-----+----------+
|   123|    173512|  5.0| 4.9717736|
|   123|    173512|  5.0| 4.9717736|
|  1929|     46356|  5.0| 4.9639187|
|   123|    185233|  5.0| 4.9717736|
|  3477|     11742|  5.0|  4.967866|
|   548|     22143|  5.0|  2.015633|
+------+----------+-----+----------+

rmse
stars
prediction
1.218685794906584


Model 3: Latent Vectors = 15, Max iterations = 20, reg parameter = 0.001

In [20]:
# ALS configuration for Model 3: 15 latent factors, 20 iterations, regParam 0.001
als3 = ALS(userCol = "userId", itemCol = "businessId", ratingCol = "stars", rank = 15, maxIter = 20, regParam = 0.001,
          coldStartStrategy = "drop", nonnegative = True, implicitPrefs = False)

# Fit the model to the training data
model3 = als3.fit(training_data)

# Generate predictions on the test data
test_predictions3 = model3.transform(test_data)

test_predictions3.show()

# RMSE evaluator comparing the "stars" label with the "prediction" column
evaluator3 = RegressionEvaluator(metricName="rmse", labelCol="stars", predictionCol="prediction")

# Echo the three configured parameters
print("The three parameters are : ")
print(" ")
print(evaluator3.getMetricName())
print(evaluator3.getLabelCol())
print(evaluator3.getPredictionCol())
print(" ")

# Evaluate RMSE on Model 3's own predictions. The previous code scored
# test_predictions (Model 1's output) instead of test_predictions3.
RMSE3 = evaluator3.evaluate(test_predictions3)
# Print the RMSE
print("The RMSE for this model is :", RMSE3)

+------+----------+-----+----------+
|userId|businessId|stars|prediction|
+------+----------+-----+----------+
|   123|    173512|  5.0| 4.9997854|
|   123|    173512|  5.0| 4.9997854|
|  1929|     46356|  5.0|  4.999812|
|   123|    185233|  5.0| 4.9997854|
|  3477|     11742|  5.0|  4.999905|
|   548|     22143|  5.0| 1.5839012|
+------+----------+-----+----------+

The three parameters are : 
 
rmse
stars
prediction
 
The RMSE for this model is : 1.3946165195630487


Model 4: Latent Vectors = 10, Max iterations = 10, reg parameter = 0.01

In [14]:

# Set the ALS hyperparameters. Select 10 latent vectors and select 10 iterations. 
from pyspark.ml.recommendation import ALS
als4 = ALS(userCol = "userId", itemCol = "businessId", ratingCol = "stars", rank = 10, maxIter = 10, regParam = 0.01,
          coldStartStrategy = "drop", nonnegative = True, implicitPrefs = False)

# Fit the model to the training data
model4 = als4.fit(training_data)

# Generate predictions on the test data
test_predictions4 = model4.transform(test_data)

print("The glimpse test predictions are shown in the beslow table")
print(" ")
test_predictions4.show()


# RMSE evaluator comparing the "stars" label with the "prediction" column
evaluator4 = RegressionEvaluator(metricName="rmse", labelCol="stars", predictionCol="prediction")

# Echo the three configured parameters
print("The three parameters are : ")
print(" ")
print(evaluator4.getMetricName())
print(evaluator4.getLabelCol())
print(evaluator4.getPredictionCol())
print(" ")

# Evaluate RMSE on Model 4's own predictions. The previous code scored
# test_predictions (Model 1's output) instead of test_predictions4.
RMSE4 = evaluator4.evaluate(test_predictions4)
# Print the RMSE
print("The RMSE for this model is :", RMSE4)

The glimpse test predictions are shown in the beslow table
 
+------+----------+-----+----------+
|userId|businessId|stars|prediction|
+------+----------+-----+----------+
|   123|    173512|  5.0| 4.9973364|
|   123|    173512|  5.0| 4.9973364|
|  1929|     46356|  5.0| 4.9971924|
|   123|    185233|  5.0| 4.9973364|
|  3477|     11742|  5.0|  4.998615|
|   548|     22143|  5.0|  2.845271|
+------+----------+-----+----------+

The three parameters are : 
 
rmse
stars
prediction
 
The RMSE for this model is : 0.8796673304072976


Model 5: Latent Vectors = 15, Max iterations = 10, reg parameter = 0.01

In [16]:
# Set the ALS hyperparameters. Select 15 latent vectors and select 10 iterations.

als5 = ALS(userCol = "userId", itemCol = "businessId", ratingCol = "stars", rank = 15, maxIter = 10, regParam = 0.01,
          coldStartStrategy = "drop", nonnegative = True, implicitPrefs = False)

# Fit the model to the training data
model5 = als5.fit(training_data)

# Generate predictions on the test data
test_predictions5 = model5.transform(test_data)

print("The glimpse test predictions are shown in the below table")
print(" ")
test_predictions5.show()

# RMSE evaluator comparing the "stars" label with the "prediction" column
evaluator5 = RegressionEvaluator(metricName="rmse", labelCol="stars", predictionCol="prediction")

# Echo the three configured parameters
print("The three parameters are : ")
print(" ")
print(evaluator5.getMetricName())
print(evaluator5.getLabelCol())
print(evaluator5.getPredictionCol())
print(" ")

# Evaluate RMSE on Model 5's own predictions. The previous code scored
# test_predictions (Model 1's output) instead of test_predictions5.
RMSE5 = evaluator5.evaluate(test_predictions5)
# Print the RMSE
print("The RMSE for this model is :", RMSE5)

The glimpse test predictions are shown in the beslow table
 
+------+----------+-----+----------+
|userId|businessId|stars|prediction|
+------+----------+-----+----------+
|   123|    173512|  5.0|  4.997821|
|   123|    173512|  5.0|  4.997821|
|  1929|     46356|  5.0| 4.9968524|
|   123|    185233|  5.0|  4.997821|
|  3477|     11742|  5.0| 4.9973073|
|   548|     22143|  5.0| 1.3190264|
+------+----------+-----+----------+

The three parameters are : 
 
rmse
stars
prediction
 
The RMSE for this model is : 1.5027529409317795
