Setup for getting data from Kaggle via API


In [1]:
!pip install kaggle

from google.colab import drive
drive.mount('/content/drive')

! mkdir ~/.kaggle/

! cp /content/drive/MyDrive/kaggle/kaggle.json ~/.kaggle/

! chmod 600 ~/.kaggle/kaggle.json

! kaggle datasets download yelp-dataset/yelp-dataset -f yelp_academic_dataset_review.json

!unzip /content/yelp_academic_dataset_review.json.zip

Mounted at /content/drive
Dataset URL: https://www.kaggle.com/datasets/yelp-dataset/yelp-dataset
License(s): other
Downloading yelp_academic_dataset_review.json.zip to /content
 99% 2.05G/2.07G [00:20<00:00, 137MB/s]
100% 2.07G/2.07G [00:20<00:00, 107MB/s]
Archive:  /content/yelp_academic_dataset_review.json.zip
  inflating: yelp_academic_dataset_review.json  


Import necessary libraries for Spark session, DataFrame operations, ALS model, evaluation, and hyperparameter tuning.


In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, expr, rank, countDistinct, count
from pyspark.sql.window import Window
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import StringIndexer
from pyspark.ml.tuning import TrainValidationSplit, ParamGridBuilder, CrossValidator
from pyspark.mllib.evaluation import RankingMetrics
from pyspark.sql.functions import col, collect_list

In [3]:
spark = SparkSession.builder.appName('TasteMatch').config("spark.driver.memory", "20g").getOrCreate()
df = spark.read.json("/content/yelp_academic_dataset_review.json")

# Select columns of interest
df = df.select("business_id", "user_id", "stars")

In [4]:
# Select only the top 80 restaurants
popularity_restaurants = df.groupBy('business_id') \
                 .agg(count('*').alias('popularity')) \
                 .orderBy(col('popularity').desc())
top_restaurants = popularity_restaurants.limit(80)
df = df.join(top_restaurants, on='business_id', how='inner')

In [5]:
# Select users who have rated atleast 5 restaurants - Avoid cold start problem
df = df.withColumn("num_ratings", expr("count(*) over (partition by user_id)"))
df = df.filter(col("num_ratings")>=5)

Prepare train and test data.

1. For each user, mask 30% of their ratings to be evaluated as test set.
2. Encode the user_id and business_id columns with StringIndexer into numerical indices.

In [6]:
mask_percentage = 0.3

# For each user, select 30% of ratings for test set and the remaining for train set
user_window = Window.partitionBy("user_id").orderBy(col("business_id").desc())
df = df.withColumn("num_items_to_mask", (col("num_ratings") * mask_percentage).cast("int"))
df = df.withColumn("item_rank", rank().over(user_window))

# Create indexing models for user_id and business_id, and apply transformation
indexer_user = StringIndexer(inputCol='user_id', outputCol='userIndex').setHandleInvalid("keep")
indexer_biz = StringIndexer(inputCol='business_id', outputCol='bizIndex').setHandleInvalid("keep")
df = indexer_user.fit(df).transform(df)
df = indexer_biz.fit(df).transform(df)

df = df.withColumn('userIndex', df['userIndex'].cast('integer')).withColumn('bizIndex', df['bizIndex'].cast('integer'))

train_df = df.filter(col("item_rank") > col("num_items_to_mask"))
test_df = df.filter(col("item_rank") <= col("num_items_to_mask"))

In [7]:
# Optimal parameters calculated based on parameter search
best_rank = 1
best_maxIter = 20
best_regParam = 0.05

als = ALS(userCol='userIndex', itemCol='bizIndex', ratingCol='stars',
          coldStartStrategy='drop', nonnegative=True,
          rank=best_rank, maxIter=best_maxIter, regParam=best_regParam)

# Fit the model to the training data
model = als.fit(train_df)

In [8]:
# Generate predictions on the test data
predictions = model.transform(test_df)
predictions = predictions.withColumn("prediction", expr("CASE WHEN prediction < 1 THEN 1 WHEN prediction > 5 THEN 5 ELSE prediction END"))

evaluator = RegressionEvaluator(metricName='rmse', labelCol='stars', predictionCol='prediction')
rmse = evaluator.evaluate(predictions)
print(f'Root Mean Squared Error (RMSE): {rmse}')

Root Mean Squared Error (RMSE): 1.0005418885851705


In [9]:
# Generate restaurant recommendations
n_recommend = 5
userRecs = model.recommendForAllUsers(n_recommend)

# Evaluate quality of recommendations by side-by-side comparison
ground_truth = test_df.groupby('userIndex').agg(collect_list('bizIndex').alias('ground_truth_items'))
model_pred = train_df.groupby('userIndex').agg(collect_list('bizIndex').alias('model_pred'))
user_eval = userRecs.join(ground_truth, on='userIndex').join(model_pred, on='userIndex') \
          .select('userIndex', 'recommendations.bizIndex', 'ground_truth_items', 'model_pred', 'recommendations.rating')
user_eval = user_eval.toPandas()
user_eval['Recommendations'] = user_eval.apply(lambda x:[b for (b,z) in zip(x.bizIndex, x.rating) if b not in x.model_pred], axis=1)
user_eval['Predicted Ratings'] = user_eval.apply(lambda x:[z for (b,z) in zip(x.bizIndex, x.rating) if b not in x.model_pred], axis=1)

In [10]:
user_eval.head()

Unnamed: 0,userIndex,bizIndex,ground_truth_items,model_pred,rating,Recommendations,Predicted Ratings
0,0,"[69, 9, 43, 79, 47]","[38, 41, 23, 23, 52, 31, 49, 6, 6, 4]","[13, 0, 2, 32, 39, 15, 11, 20, 57, 29, 29, 29,...","[3.514249324798584, 3.4588253498077393, 3.3888...","[69, 9, 79]","[3.514249324798584, 3.4588253498077393, 3.3838..."
1,1,"[69, 9, 43, 79, 47]","[38, 41, 23, 49, 6, 48, 4, 50, 13]","[0, 2, 2, 2, 33, 32, 39, 39, 39, 11, 20, 29, 6...","[3.78212571144104, 3.7224769592285156, 3.64714...","[69, 9, 79]","[3.78212571144104, 3.7224769592285156, 3.64179..."
2,2,"[69, 9, 43, 79, 47]","[1, 41, 56, 52, 19, 31, 48, 4, 53]","[9, 2, 33, 39, 11, 29, 21, 60, 58, 54, 22, 24,...","[4.493710994720459, 4.422839641571045, 4.33334...","[69, 43, 79]","[4.493710994720459, 4.333340167999268, 4.32698..."
3,3,"[69, 9, 43, 79, 47]","[41, 31, 67, 48, 48, 4, 4, 2, 32, 32, 32]","[11, 29, 29, 29, 29, 29, 29, 45, 10, 54, 3, 3,...","[3.7492284774780273, 3.690098762512207, 3.6154...","[69, 9, 43, 79, 47]","[3.7492284774780273, 3.690098762512207, 3.6154..."
4,4,"[69, 9, 43, 79, 47]","[38, 41, 31, 31, 49, 49, 48, 4]","[2, 33, 32, 29, 29, 29, 29, 45, 3, 3, 3, 37, 3...","[4.140809535980225, 4.075503826141357, 3.99303...","[69, 9, 79]","[4.140809535980225, 4.075503826141357, 3.98717..."
