In [1]:
import sqlite3
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import Row
from os import path
from pyspark.sql.functions import UserDefinedFunction
from pyspark.sql.types import *
import seaborn as sns
import numpy as np
from PIL import Image
from wordcloud import WordCloud, STOPWORDS, ImageColorGenerator
import matplotlib.pyplot as plt
%matplotlib inline
import warnings
warnings.filterwarnings('ignore')
import findspark
findspark.init()

In [2]:
spark = SparkSession.builder.enableHiveSupport().appName('YelpRecommenderAnalysis').getOrCreate()
sc = spark.sparkContext
sc.defaultParallelism

24

In [3]:
df_business = spark.read.json("yelp_academic_dataset_business.json")
df_checkin = spark.read.json("yelp_academic_dataset_checkin.json")
df_review = spark.read.json("yelp_academic_dataset_review.json")
df_tip = spark.read.json("yelp_academic_dataset_tip.json")
df_user = spark.read.json("yelp_academic_dataset_user.json")

In [4]:
df_business = df_business.withColumnRenamed('stars', 'business_stars')
df_review = df_review.withColumnRenamed('stars', 'review_stars')
df_review = df_review.withColumnRenamed('business_id', 'review_business_id')
columns_to_drop = ['address','postal_code','latitude','longitude','review_count']
df_business = df_business.drop(*columns_to_drop)
df_business = df_business.filter(df_business.categories.contains('Restaurants'))
df_business = df_business.withColumnRenamed('name', 'business_name')

In [5]:
from pyspark.ml.feature import HashingTF, IDF, Tokenizer,StringIndexer,CountVectorizer
import pyspark.mllib.classification
from pyspark.sql.functions import udf
from pyspark.ml.feature import StopWordsRemover
from pyspark.ml import Pipeline
import string
import re
import nltk

In [6]:
business_restaurant = df_business.select('business_id', 'categories', 'state', 'business_name')


# Get all useful reviews, i.e. a review with at least one useful vote and 
# here we are excluding all neutral reviews as well
review_useful = df_review.select('review_business_id', 'review_id', 'review_stars', 
                              'text', 'useful', 'user_id') \
                      .where("useful >= 1 and stars != 3")


# Join the two dataframes above to get all useful reviews for restaurant businesses
restaurant_useful_review = business_restaurant.join(review_useful, 
                                                    [business_restaurant.business_id == review_useful.review_business_id], 
                                                    how = 'inner')

## 1. Associate Mining

In [7]:
from pyspark.sql.functions import collect_set, col, count

In [8]:
user_categories = restaurant_useful_review.select('user_id', 'categories')
user_cat = user_categories.groupby('user_id').agg(collect_set('categories').alias('categories'))
user_cat.createOrReplaceTempView('user_cat')
user_cat.show(truncate=100)

+----------------------+----------------------------------------------------------------------------------------------------+
|               user_id|                                                                                          categories|
+----------------------+----------------------------------------------------------------------------------------------------+
|--8r3pNaZiG1fN8LCHuL_g|[Pubs, Nightlife, Bars, Sandwiches, American (New), Restaurants, American (Traditional), Sports B...|
|--Kwhcbkh7jxkhVVQZo2uQ|[Bars, Nightlife, American (Traditional), Barbeque, Seafood, Beer Bar, Sports Bars, Restaurants, ...|
|--Vu3Gux9nPnLcG9yO_HxA|[Bakeries, Restaurants, Coffee & Tea, Bagels, Food, Breakfast & Brunch, Sandwiches, American (Tra...|
|--agAy0vRYwG6WqbInorfg|                                              [Delis, Food, Restaurants, Sandwiches, Desserts, Soup]|
|--bAnPT8W3L01Rg17js-Zw|                                                                             [Japanese, Restau

Frequent Itemsets

In [9]:
from pyspark.ml.fpm import FPGrowth

#set the minimum thresholds for support and confidence
fpGrowth = FPGrowth(itemsCol="categories", minSupport=0.001, minConfidence=0)

model = fpGrowth.fit(user_cat)

#Calculate frequent itemsets
mostPopularCategory = model.freqItemsets
mostPopularCategory.createOrReplaceTempView("mostPopularCategory")

# Display frequent itemsets.
model.freqItemsets.show(truncate =False)

+--------------------------------------------------------------------------------------------+-----+
|items                                                                                       |freq |
+--------------------------------------------------------------------------------------------+-----+
|[Restaurants, Japanese]                                                                     |1525 |
|[Live/Raw Food, Restaurants, Seafood, Beer Bar, Beer, Wine & Spirits, Bars, Food, Nightlife]|719  |
|[Chinese, Restaurants, Dim Sum, Asian Fusion, Shanghainese]                                 |827  |
|[Steakhouses, Restaurants]                                                                  |2655 |
|[Restaurants, American (Traditional), Burgers]                                              |648  |
|[Diners, Salad, American (Traditional), Breakfast & Brunch, Restaurants]                    |998  |
|[Restaurants, Mexican]                                                                    

In [10]:
#only show item groups larger than 2
spark.sql('''select items, freq from mostPopularCategory 
          where size(items) > 2 order by freq desc limit 20''').show(truncate =False)

+----------------------------------------------------------------+----+
|items                                                           |freq|
+----------------------------------------------------------------+----+
|[Restaurants, Pizza, Mexican, Restaurants, Restaurants, Mexican]|639 |
+----------------------------------------------------------------+----+



Association Rules

In [11]:
model.associationRules.show(25, truncate=False)

+--------------------------------------------+-----------------------------+-------------------+------------------+---------------------+
|antecedent                                  |consequent                   |confidence         |lift              |support              |
+--------------------------------------------+-----------------------------+-------------------+------------------+---------------------+
|[Restaurants, Thai]                         |[Mexican, Restaurants]       |0.15731989077924804|5.788453310438805 |0.0012356738546920136|
|[Restaurants, Thai]                         |[Restaurants, Mexican]       |0.16845200588111742|5.977792752697247 |0.0013231113904712884|
|[Thai, Restaurants]                         |[Restaurants, Pizza]         |0.10226139008979049|5.5063902299685745|0.001014605368004791 |
|[Thai, Restaurants]                         |[Mexican, Restaurants]       |0.161456601263718  |5.94066009992709  |0.0016019216460693528|
|[Thai, Restaurants]              

In [12]:
#sort by confidence
model.associationRules.orderBy("confidence", ascending=False).show(25, truncate=False)

+--------------------------------------------+----------------------+-------------------+------------------+---------------------+
|antecedent                                  |consequent            |confidence         |lift              |support              |
+--------------------------------------------+----------------------+-------------------+------------------+---------------------+
|[Restaurants, Pizza, Mexican, Restaurants]  |[Restaurants, Mexican]|0.4654042243262928 |16.515624047931   |0.0010541997238293682|
|[Restaurants, Pizza, Restaurants, Mexican]  |[Mexican, Restaurants]|0.4487359550561798 |16.51086275036046 |0.0010541997238293682|
|[Mexican, Restaurants, Restaurants, Mexican]|[Restaurants, Pizza]  |0.24614791987673343|13.254137264770574|0.0010541997238293682|
|[Vietnamese, Restaurants]                   |[Restaurants, Mexican]|0.1737152034261242 |6.164565857451842 |0.0010706973720896087|
|[Restaurants, Thai]                         |[Restaurants, Mexican]|0.168452005881

In [13]:
model.associationRules.orderBy("confidence", ascending=False).toPandas().head()

Unnamed: 0,antecedent,consequent,confidence,lift,support
0,"[Restaurants, Pizza, Mexican, Restaurants]","[Restaurants, Mexican]",0.465404,16.515624,0.001054
1,"[Restaurants, Pizza, Restaurants, Mexican]","[Mexican, Restaurants]",0.448736,16.510863,0.001054
2,"[Mexican, Restaurants, Restaurants, Mexican]","[Restaurants, Pizza]",0.246148,13.254137,0.001054
3,"[Vietnamese, Restaurants]","[Restaurants, Mexican]",0.173715,6.164566,0.001071
4,"[Restaurants, Thai]","[Restaurants, Mexican]",0.168452,5.977793,0.001323


Predictions

In [14]:
model.transform(user_cat).show(truncate=70)

+----------------------+----------------------------------------------------------------------+----------------------------------------------------------------------+
|               user_id|                                                            categories|                                                            prediction|
+----------------------+----------------------------------------------------------------------+----------------------------------------------------------------------+
|--8r3pNaZiG1fN8LCHuL_g|[Pubs, Nightlife, Bars, Sandwiches, American (New), Restaurants, Am...|[Restaurants, Pizza, Italian, Restaurants, Restaurants, Italian, Me...|
|--Kwhcbkh7jxkhVVQZo2uQ|[Bars, Nightlife, American (Traditional), Barbeque, Seafood, Beer B...|                                                                    []|
|--Vu3Gux9nPnLcG9yO_HxA|[Bakeries, Restaurants, Coffee & Tea, Bagels, Food, Breakfast & Bru...|[Restaurants, Mexican, Restaurants, Pizza, Pizza, Restaurants, Rest...

In [15]:
user_cat_pred = model.transform(user_cat).toPandas()

In [16]:
user_cat_pred = user_cat_pred[user_cat_pred["prediction"].str.len() != 0]

In [17]:
user_cat_pred

Unnamed: 0,user_id,categories,prediction
0,--8r3pNaZiG1fN8LCHuL_g,"[Pubs, Nightlife, Bars, Sandwiches, American (...","[Restaurants, Pizza, Italian, Restaurants, Res..."
2,--Vu3Gux9nPnLcG9yO_HxA,"[Bakeries, Restaurants, Coffee & Tea, Bagels, ...","[Restaurants, Mexican, Restaurants, Pizza, Piz..."
5,--gafFJlfUIwzifH583ruw,"[Restaurants, Mexican]","[Mexican, Restaurants, Restaurants, Pizza, Piz..."
11,-05T0q5BxB9g0RCKiGYoyQ,"[Specialty Food, Japanese, Ethnic Food, Sushi ...","[Restaurants, Pizza, Mexican, Restaurants, Piz..."
29,-3s52C4zL_DHRK0ULG6qtg,"[Bars, Restaurants, Burgers, Greek, American (...","[Restaurants, Mexican, Restaurants, Italian, R..."
...,...,...,...
606118,zv7w7qx-8umbe_RrwGo9MQ,"[American (Traditional), Coffee & Tea, Restaur...","[Restaurants, Mexican, Restaurants, Pizza, Piz..."
606127,zwcrm4WMUq0Duz6P5rr0sw,"[Chinese, Restaurants]","[Restaurants, Pizza, Mexican, Restaurants, Piz..."
606130,zxIYRuegLaSHBhn4JErnFw,"[Restaurants, Pizza]","[Mexican, Restaurants, Restaurants, Mexican, P..."
606139,zykLkhcejsC-4y2GaXBcbQ,"[Pizza, Restaurants]","[Restaurants, Pizza, Mexican, Restaurants, Res..."


## 2. ALS

In [18]:
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator

In [19]:
restaurant_useful_review.show(5)

+--------------------+--------------------+-----+---------------+--------------------+--------------------+------------+--------------------+------+--------------------+
|         business_id|          categories|state|  business_name|  review_business_id|           review_id|review_stars|                text|useful|             user_id|
+--------------------+--------------------+-----+---------------+--------------------+--------------------+------------+--------------------+------+--------------------+
|kxX2SOes4o-D3ZQBk...|Halal, Pakistani,...|   PA|          Zaika|kxX2SOes4o-D3ZQBk...|AqPFMleE6RsU23_au...|         5.0|Wow!  Yummy, diff...|     1|_7bHUi9Uuf5__HHc_...|
|e4Vwtrqf-wpJfwesg...|Sandwiches, Beer,...|   LA|           Melt|e4Vwtrqf-wpJfwesg...|Sx8TMOWLNuJBWer-0...|         4.0|Cute interior and...|     1|bcjbaE6dDog4jkNY9...|
|04UD14gamNjLY0IDY...|Mediterranean, Re...|   PA|       Dmitri's|04UD14gamNjLY0IDY...|JrIxlS1TzJ-iCu79u...|         1.0|I am a long term ...|     1|eU

In [20]:
df_als = restaurant_useful_review.select('user_id','business_id','business_name','review_stars')

In [21]:
df_als.show(5)

+--------------------+--------------------+---------------+------------+
|             user_id|         business_id|  business_name|review_stars|
+--------------------+--------------------+---------------+------------+
|_7bHUi9Uuf5__HHc_...|kxX2SOes4o-D3ZQBk...|          Zaika|         5.0|
|bcjbaE6dDog4jkNY9...|e4Vwtrqf-wpJfwesg...|           Melt|         4.0|
|eUta8W_HdHMXPzLBB...|04UD14gamNjLY0IDY...|       Dmitri's|         1.0|
|yfFzsLmaWF2d4Sr0U...|LHSTtnW3YHCeUkRDG...|Fries Rebellion|         5.0|
|j2wlzrntrbKwyOcOi...|rBdG_23USc7DletfZ...|           Olio|         4.0|
+--------------------+--------------------+---------------+------------+
only showing top 5 rows



In [22]:
from pyspark.ml.feature import StringIndexer

user_id_indexer = StringIndexer(inputCol="user_id", outputCol="user_id_int") 
user_id_indexed = user_id_indexer.fit(df_als).transform(df_als) 
user_id_indexed.show()

+--------------------+--------------------+--------------------+------------+-----------+
|             user_id|         business_id|       business_name|review_stars|user_id_int|
+--------------------+--------------------+--------------------+------------+-----------+
|_7bHUi9Uuf5__HHc_...|kxX2SOes4o-D3ZQBk...|               Zaika|         5.0|   440047.0|
|bcjbaE6dDog4jkNY9...|e4Vwtrqf-wpJfwesg...|                Melt|         4.0|     8524.0|
|eUta8W_HdHMXPzLBB...|04UD14gamNjLY0IDY...|            Dmitri's|         1.0|   473296.0|
|yfFzsLmaWF2d4Sr0U...|LHSTtnW3YHCeUkRDG...|     Fries Rebellion|         5.0|   598073.0|
|j2wlzrntrbKwyOcOi...|rBdG_23USc7DletfZ...|                Olio|         4.0|     4689.0|
|RreNy--tOmXMl1en0...|cPepkJeRMtHapc_b2...|Naked Tchopstix E...|         4.0|     3421.0|
|mNsVyC9tQVYtzLOCb...|MWmXGQ98KbRo3vsS5...|Anthony's at Paxo...|         5.0|    59244.0|
|QzCEzH3R7Z6erOGLr...|0pMj5xUAecW9o1P35...|                Wawa|         5.0|   160132.0|
|EZjT2qJN0

In [23]:
business_id_indexer = StringIndexer(inputCol="business_id", outputCol="business_id_int") 
df_als = business_id_indexer.fit(user_id_indexed).transform(user_id_indexed) 
df_als.show()

+--------------------+--------------------+--------------------+------------+-----------+---------------+
|             user_id|         business_id|       business_name|review_stars|user_id_int|business_id_int|
+--------------------+--------------------+--------------------+------------+-----------+---------------+
|_7bHUi9Uuf5__HHc_...|kxX2SOes4o-D3ZQBk...|               Zaika|         5.0|   440047.0|         3634.0|
|bcjbaE6dDog4jkNY9...|e4Vwtrqf-wpJfwesg...|                Melt|         4.0|     8524.0|        22766.0|
|eUta8W_HdHMXPzLBB...|04UD14gamNjLY0IDY...|            Dmitri's|         1.0|   473296.0|         2474.0|
|yfFzsLmaWF2d4Sr0U...|LHSTtnW3YHCeUkRDG...|     Fries Rebellion|         5.0|   598073.0|         9010.0|
|j2wlzrntrbKwyOcOi...|rBdG_23USc7DletfZ...|                Olio|         4.0|     4689.0|         2144.0|
|RreNy--tOmXMl1en0...|cPepkJeRMtHapc_b2...|Naked Tchopstix E...|         4.0|     3421.0|        21160.0|
|mNsVyC9tQVYtzLOCb...|MWmXGQ98KbRo3vsS5...|Ant

In [24]:
from pyspark.sql.types import IntegerType
df_als = df_als.withColumn("user_id_int", df_als["user_id_int"].cast(IntegerType()))
df_als = df_als.withColumn("business_id_int", df_als["business_id_int"].cast(IntegerType()))

In [25]:
df_als.show(5)

+--------------------+--------------------+---------------+------------+-----------+---------------+
|             user_id|         business_id|  business_name|review_stars|user_id_int|business_id_int|
+--------------------+--------------------+---------------+------------+-----------+---------------+
|_7bHUi9Uuf5__HHc_...|kxX2SOes4o-D3ZQBk...|          Zaika|         5.0|     440047|           3634|
|bcjbaE6dDog4jkNY9...|e4Vwtrqf-wpJfwesg...|           Melt|         4.0|       8524|          22766|
|eUta8W_HdHMXPzLBB...|04UD14gamNjLY0IDY...|       Dmitri's|         1.0|     473296|           2474|
|yfFzsLmaWF2d4Sr0U...|LHSTtnW3YHCeUkRDG...|Fries Rebellion|         5.0|     598073|           9010|
|j2wlzrntrbKwyOcOi...|rBdG_23USc7DletfZ...|           Olio|         4.0|       4689|           2144|
+--------------------+--------------------+---------------+------------+-----------+---------------+
only showing top 5 rows



In [27]:
df_als = df_als.drop('business_id','user_id','business_name')

In [30]:
df_als = df_als.sample(fraction=0.1)

In [31]:
print((df_als.count(), len(df_als.columns)))

(52044, 3)


In [32]:
train_df, test_df = df_als.randomSplit([0.7,0.3],seed=1)
als = ALS(maxIter=10, regParam=0.01, userCol="user_id_int", itemCol="business_id_int", ratingCol="review_stars",
          coldStartStrategy="drop", rank=10, nonnegative = True)
#fit and predict
model = als.fit(train_df)
predictions = model.transform(test_df)

ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/opt/conda/lib/python3.10/site-packages/IPython/core/interactiveshell.py", line 3460, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "/tmp/ipykernel_747/3596533824.py", line 5, in <module>
    model = als.fit(train_df)
  File "/usr/local/spark/python/pyspark/ml/base.py", line 205, in fit
    return self._fit(dataset)
  File "/usr/local/spark/python/pyspark/ml/wrapper.py", line 383, in _fit
    java_model = self._fit_java(dataset)
  File "/usr/local/spark/python/pyspark/ml/wrapper.py", line 380, in _fit_java
    return self._java_obj.fit(dataset._jdf)
  File "/usr/local/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py", line 1321, in __call__
    return_value = get_return_value(
  File "/usr/local/spark/python/pyspark/sql/utils.py", line 190, in deco
    return f(*a, **kw)
  File "/usr/local/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/protocol.py", line 326, in get

ConnectionRefusedError: [Errno 111] Connection refused

## Can't run on the local, so I moved this part to GCP